StratoSDK is a framework with a declarative approach similar to Flutter/React, written and designed entirely for Rust.
| 1 | use std::{ |
| 2 | marker::PhantomData, |
| 3 | pin::Pin, |
| 4 | rc::Rc, |
| 5 | sync::{ |
| 6 | atomic::{AtomicUsize, Ordering}, |
| 7 | Arc, |
| 8 | }, |
| 9 | task::{Context, Poll}, |
| 10 | }; |
| 11 | |
| 12 | use async_executor::LocalExecutor; |
| 13 | use futures::{ |
| 14 | future::{BoxFuture, LocalBoxFuture}, |
| 15 | Future, FutureExt, |
| 16 | }; |
| 17 | use futures_util::future::{AbortHandle, Abortable}; |
| 18 | |
| 19 | use crate::{platform, r#async::executor::Error}; |
| 20 | |
| 21 | pub type ForegroundTask = async_task::Task<()>; |
| 22 | |
| 23 | pub struct BackgroundTask { |
| 24 | inner: Option<tokio::task::JoinHandle<()>>, |
| 25 | } |
| 26 | |
| 27 | impl BackgroundTask { |
| 28 | pub fn abort(&self) { |
| 29 | if let Some(inner) = &self.inner { |
| 30 | inner.abort(); |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | pub fn detach(self) { |
| 35 | // Nothing to do here; dropping the join handle will cause the task |
| 36 | // to be detached. |
| 37 | } |
| 38 | } |
| 39 | |
| 40 | impl Future for BackgroundTask { |
| 41 | type Output = Result<(), tokio::task::JoinError>; |
| 42 | |
| 43 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 44 | match &mut self.inner { |
| 45 | Some(inner) => inner.poll_unpin(cx), |
| 46 | None => Poll::Pending, |
| 47 | } |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | pub enum Foreground { |
| 52 | Platform { |
| 53 | not_send_or_sync: PhantomData<Rc<()>>, // Make sure the type is `!Send` and `!Sync`. |
| 54 | delegate: Arc<dyn platform::DispatchDelegate>, |
| 55 | }, |
| 56 | Test { |
| 57 | executor: LocalExecutor<'static>, |
| 58 | }, |
| 59 | } |
| 60 | |
| 61 | impl Foreground { |
| 62 | pub fn platform(delegate: Arc<dyn platform::DispatchDelegate>) -> Result<Self, Error> { |
| 63 | if delegate.is_main_thread() { |
| 64 | Ok(Self::Platform { |
| 65 | not_send_or_sync: PhantomData, |
| 66 | delegate, |
| 67 | }) |
| 68 | } else { |
| 69 | Err(Error::NotOnMainThread) |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | pub fn test() -> Self { |
| 74 | Self::Test { |
| 75 | executor: LocalExecutor::new(), |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | /// Schedule an asynchronous task to run on the main thread. |
| 80 | /// |
| 81 | /// If you have a boxed future, use `spawn_boxed` instead. |
| 82 | pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> ForegroundTask { |
| 83 | self.spawn_boxed(future.boxed_local()) |
| 84 | } |
| 85 | |
| 86 | /// Schedule an asynchronous task to run on the main thread. |
| 87 | /// |
| 88 | /// This takes in a boxed future in order to avoid monomorphizing the |
| 89 | /// underlying task implementation. `spawn_boxed` generates significantly |
| 90 | /// less code than a generic implementation, with no noticeable performance |
| 91 | /// impact. |
| 92 | pub fn spawn_boxed(&self, future: LocalBoxFuture<'static, ()>) -> ForegroundTask { |
| 93 | match self { |
| 94 | Foreground::Platform { |
| 95 | not_send_or_sync: _, |
| 96 | delegate: platform, |
| 97 | } => { |
| 98 | let platform = platform.clone(); |
| 99 | let schedule = move |task: async_task::Runnable| platform.run_on_main_thread(task); |
| 100 | let (runnable, handle) = async_task::spawn_local(future, schedule); |
| 101 | runnable.schedule(); |
| 102 | handle |
| 103 | } |
| 104 | Foreground::Test { executor } => executor.spawn(future), |
| 105 | } |
| 106 | } |
| 107 | |
| 108 | /// Schedules an abortable asynchronous task to run on the main thread. |
| 109 | /// |
| 110 | /// This is the same as `spawn()` except the task may be aborted using the returned |
| 111 | /// [`AbortHandle`]. |
| 112 | pub fn spawn_abortable( |
| 113 | &self, |
| 114 | future: impl Future<Output = ()> + 'static, |
| 115 | ) -> (ForegroundTask, AbortHandle) { |
| 116 | let (handle, registration) = AbortHandle::new_pair(); |
| 117 | let task = self.spawn(Abortable::new(future, registration).map(|_| ())); |
| 118 | (task, handle) |
| 119 | } |
| 120 | |
| 121 | pub async fn run<T>(&'_ self, future: impl Future<Output = T>) -> T { |
| 122 | match self { |
| 123 | Foreground::Platform { |
| 124 | not_send_or_sync: _, |
| 125 | delegate: _, |
| 126 | } => unimplemented!("only the test executor can be run"), |
| 127 | Foreground::Test { executor } => executor.run(future).await, |
| 128 | } |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | pub struct Background { |
| 133 | runtime: Option<tokio::runtime::Runtime>, |
| 134 | } |
| 135 | |
| 136 | impl Drop for Background { |
| 137 | fn drop(&mut self) { |
| 138 | if let Some(runtime) = self.runtime.take() { |
| 139 | // Cancel all running tasks immediately instead of blocking until they complete. |
| 140 | runtime.shutdown_background(); |
| 141 | } |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | impl Default for Background { |
| 146 | fn default() -> Self { |
| 147 | let num_threads = if cfg!(any(test, feature = "integration_tests")) { |
| 148 | // For tests, limit each test to a single background thread. |
| 149 | // When running unit tests via [`App::test()`] on machines with |
| 150 | // many logical cores, the time it takes to spawning the background |
| 151 | // threads can far exceed the time it takes to actually run the |
| 152 | // test. |
| 153 | 1 |
| 154 | } else { |
| 155 | // In production, create a thread for each logical CPU core, |
| 156 | // maximizing our possible parallelism. |
| 157 | num_cpus::get() |
| 158 | }; |
| 159 | |
| 160 | Self::new(num_threads, |i| format!("background-executor-{i}")) |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | impl Background { |
| 165 | pub fn new( |
| 166 | num_threads: usize, |
| 167 | name_fn: impl Fn(usize) -> String + Send + Sync + 'static, |
| 168 | ) -> Self { |
| 169 | let runtime = tokio::runtime::Builder::new_multi_thread() |
| 170 | .worker_threads(num_threads) |
| 171 | .thread_name_fn(move || { |
| 172 | static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); |
| 173 | let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); |
| 174 | name_fn(id) |
| 175 | }) |
| 176 | .enable_all() |
| 177 | .build() |
| 178 | .expect("should not fail to create tokio runtime for background executor"); |
| 179 | |
| 180 | Self { |
| 181 | runtime: Some(runtime), |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | /// Schedule an asynchronous task to run on a background thread. |
| 186 | /// |
| 187 | /// If you have a boxed future, use `spawn_boxed` instead. |
| 188 | pub fn spawn(&self, future: impl Send + Future<Output = ()> + 'static) -> BackgroundTask { |
| 189 | self.spawn_boxed(future.boxed()) |
| 190 | } |
| 191 | |
| 192 | /// Schedule an asynchronous task to run on a background thread. |
| 193 | /// |
| 194 | /// This takes in a boxed future in order to avoid monomorphizing the |
| 195 | /// underlying task implementation. `spawn_boxed` generates significantly |
| 196 | /// less code than a generic implementation, with no noticeable performance |
| 197 | /// impact. |
| 198 | pub fn spawn_boxed(&self, future: BoxFuture<'static, ()>) -> BackgroundTask { |
| 199 | let inner = match &self.runtime { |
| 200 | Some(runtime) => Some(runtime.spawn(future)), |
| 201 | None => { |
| 202 | log::error!("tried to spawn a background task after the executor was shut down"); |
| 203 | None |
| 204 | } |
| 205 | }; |
| 206 | BackgroundTask { inner } |
| 207 | } |
| 208 | |
| 209 | /// Schedules an abortable asynchronous task to run on a background thread. |
| 210 | /// |
| 211 | /// This is the same as `spawn()` except the task may be aborted using the returned |
| 212 | /// [`AbortHandle`]. |
| 213 | pub fn spawn_abortable( |
| 214 | &self, |
| 215 | future: impl Send + Future<Output = ()> + 'static, |
| 216 | ) -> (BackgroundTask, AbortHandle) { |
| 217 | let (handle, registration) = AbortHandle::new_pair(); |
| 218 | let task = self.spawn(Abortable::new(future, registration).map(|_| ())); |
| 219 | (task, handle) |
| 220 | } |
| 221 | } |
| 222 |