Seregon/StratoSDK

StratoSDK is a UI framework with a declarative approach similar to Flutter and React, written in Rust and designed entirely for Rust.

Rust/27.3 KB/No license
crates/strato-ui-core/src/async/native/executor.rs
StratoSDK / crates / strato-ui-core / src / async / native / executor.rs
1use std::{
2 marker::PhantomData,
3 pin::Pin,
4 rc::Rc,
5 sync::{
6 atomic::{AtomicUsize, Ordering},
7 Arc,
8 },
9 task::{Context, Poll},
10};
11 
12use async_executor::LocalExecutor;
13use futures::{
14 future::{BoxFuture, LocalBoxFuture},
15 Future, FutureExt,
16};
17use futures_util::future::{AbortHandle, Abortable};
18 
19use crate::{platform, r#async::executor::Error};
20 
21pub type ForegroundTask = async_task::Task<()>;
22 
23pub struct BackgroundTask {
24 inner: Option<tokio::task::JoinHandle<()>>,
25}
26 
27impl BackgroundTask {
28 pub fn abort(&self) {
29 if let Some(inner) = &self.inner {
30 inner.abort();
31 }
32 }
33 
34 pub fn detach(self) {
35 // Nothing to do here; dropping the join handle will cause the task
36 // to be detached.
37 }
38}
39 
40impl Future for BackgroundTask {
41 type Output = Result<(), tokio::task::JoinError>;
42 
43 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44 match &mut self.inner {
45 Some(inner) => inner.poll_unpin(cx),
46 None => Poll::Pending,
47 }
48 }
49}
50 
51pub enum Foreground {
52 Platform {
53 not_send_or_sync: PhantomData<Rc<()>>, // Make sure the type is `!Send` and `!Sync`.
54 delegate: Arc<dyn platform::DispatchDelegate>,
55 },
56 Test {
57 executor: LocalExecutor<'static>,
58 },
59}
60 
61impl Foreground {
62 pub fn platform(delegate: Arc<dyn platform::DispatchDelegate>) -> Result<Self, Error> {
63 if delegate.is_main_thread() {
64 Ok(Self::Platform {
65 not_send_or_sync: PhantomData,
66 delegate,
67 })
68 } else {
69 Err(Error::NotOnMainThread)
70 }
71 }
72 
73 pub fn test() -> Self {
74 Self::Test {
75 executor: LocalExecutor::new(),
76 }
77 }
78 
79 /// Schedule an asynchronous task to run on the main thread.
80 ///
81 /// If you have a boxed future, use `spawn_boxed` instead.
82 pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> ForegroundTask {
83 self.spawn_boxed(future.boxed_local())
84 }
85 
86 /// Schedule an asynchronous task to run on the main thread.
87 ///
88 /// This takes in a boxed future in order to avoid monomorphizing the
89 /// underlying task implementation. `spawn_boxed` generates significantly
90 /// less code than a generic implementation, with no noticeable performance
91 /// impact.
92 pub fn spawn_boxed(&self, future: LocalBoxFuture<'static, ()>) -> ForegroundTask {
93 match self {
94 Foreground::Platform {
95 not_send_or_sync: _,
96 delegate: platform,
97 } => {
98 let platform = platform.clone();
99 let schedule = move |task: async_task::Runnable| platform.run_on_main_thread(task);
100 let (runnable, handle) = async_task::spawn_local(future, schedule);
101 runnable.schedule();
102 handle
103 }
104 Foreground::Test { executor } => executor.spawn(future),
105 }
106 }
107 
108 /// Schedules an abortable asynchronous task to run on the main thread.
109 ///
110 /// This is the same as `spawn()` except the task may be aborted using the returned
111 /// [`AbortHandle`].
112 pub fn spawn_abortable(
113 &self,
114 future: impl Future<Output = ()> + 'static,
115 ) -> (ForegroundTask, AbortHandle) {
116 let (handle, registration) = AbortHandle::new_pair();
117 let task = self.spawn(Abortable::new(future, registration).map(|_| ()));
118 (task, handle)
119 }
120 
121 pub async fn run<T>(&'_ self, future: impl Future<Output = T>) -> T {
122 match self {
123 Foreground::Platform {
124 not_send_or_sync: _,
125 delegate: _,
126 } => unimplemented!("only the test executor can be run"),
127 Foreground::Test { executor } => executor.run(future).await,
128 }
129 }
130}
131 
132pub struct Background {
133 runtime: Option<tokio::runtime::Runtime>,
134}
135 
136impl Drop for Background {
137 fn drop(&mut self) {
138 if let Some(runtime) = self.runtime.take() {
139 // Cancel all running tasks immediately instead of blocking until they complete.
140 runtime.shutdown_background();
141 }
142 }
143}
144 
145impl Default for Background {
146 fn default() -> Self {
147 let num_threads = if cfg!(any(test, feature = "integration_tests")) {
148 // For tests, limit each test to a single background thread.
149 // When running unit tests via [`App::test()`] on machines with
150 // many logical cores, the time it takes to spawning the background
151 // threads can far exceed the time it takes to actually run the
152 // test.
153 1
154 } else {
155 // In production, create a thread for each logical CPU core,
156 // maximizing our possible parallelism.
157 num_cpus::get()
158 };
159 
160 Self::new(num_threads, |i| format!("background-executor-{i}"))
161 }
162}
163 
164impl Background {
165 pub fn new(
166 num_threads: usize,
167 name_fn: impl Fn(usize) -> String + Send + Sync + 'static,
168 ) -> Self {
169 let runtime = tokio::runtime::Builder::new_multi_thread()
170 .worker_threads(num_threads)
171 .thread_name_fn(move || {
172 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
173 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
174 name_fn(id)
175 })
176 .enable_all()
177 .build()
178 .expect("should not fail to create tokio runtime for background executor");
179 
180 Self {
181 runtime: Some(runtime),
182 }
183 }
184 
185 /// Schedule an asynchronous task to run on a background thread.
186 ///
187 /// If you have a boxed future, use `spawn_boxed` instead.
188 pub fn spawn(&self, future: impl Send + Future<Output = ()> + 'static) -> BackgroundTask {
189 self.spawn_boxed(future.boxed())
190 }
191 
192 /// Schedule an asynchronous task to run on a background thread.
193 ///
194 /// This takes in a boxed future in order to avoid monomorphizing the
195 /// underlying task implementation. `spawn_boxed` generates significantly
196 /// less code than a generic implementation, with no noticeable performance
197 /// impact.
198 pub fn spawn_boxed(&self, future: BoxFuture<'static, ()>) -> BackgroundTask {
199 let inner = match &self.runtime {
200 Some(runtime) => Some(runtime.spawn(future)),
201 None => {
202 log::error!("tried to spawn a background task after the executor was shut down");
203 None
204 }
205 };
206 BackgroundTask { inner }
207 }
208 
209 /// Schedules an abortable asynchronous task to run on a background thread.
210 ///
211 /// This is the same as `spawn()` except the task may be aborted using the returned
212 /// [`AbortHandle`].
213 pub fn spawn_abortable(
214 &self,
215 future: impl Send + Future<Output = ()> + 'static,
216 ) -> (BackgroundTask, AbortHandle) {
217 let (handle, registration) = AbortHandle::new_pair();
218 let task = self.spawn(Abortable::new(future, registration).map(|_| ()));
219 (task, handle)
220 }
221}
222