StratoSDK is a framework with a declarative approach similar to Flutter/React, written and designed entirely for Rust.
| 1 | use anyhow::anyhow; |
| 2 | use anyhow::{Error, Result}; |
| 3 | use async_channel::{self, Receiver, Sender}; |
| 4 | use bytes::Bytes; |
| 5 | use derivative::Derivative; |
| 6 | use futures::FutureExt as _; |
| 7 | use futures::{future::BoxFuture, Future}; |
| 8 | use std::any::{Any, TypeId}; |
| 9 | use std::pin::Pin; |
| 10 | use std::{cell::RefCell, collections::HashMap, hash::Hash, rc::Rc, sync::Arc}; |
| 11 | |
| 12 | use crate::image_cache::ImageCache; |
| 13 | use crate::{r#async::executor, Entity, ModelContext, SingletonEntity}; |
| 14 | |
| 15 | use super::AssetProvider; |
| 16 | |
| 17 | pub trait FetchAsset: crate::r#async::Spawnable + Future<Output = Result<Bytes>> {} |
| 18 | impl<T: crate::r#async::Spawnable + Future<Output = Result<Bytes>> + ?Sized> FetchAsset for T {} |
| 19 | |
| 20 | /// Marker trait for async asset ID namespaces. |
| 21 | /// |
| 22 | /// Each distinct kind of async asset source defines its own zero-sized marker |
| 23 | /// type that implements this trait. The marker's [`TypeId`] is stored inside |
| 24 | /// [`AsyncAssetId`] so that IDs from different sources can never collide, even |
| 25 | /// if they happen to share the same key string. |
| 26 | pub trait AsyncAssetType: 'static {} |
| 27 | |
| 28 | /// A namespaced identifier for an [`AssetSource::Async`] entry. |
| 29 | /// |
| 30 | /// The namespace is stored as a [`TypeId`] derived from a marker type that |
| 31 | /// implements [`AsyncAssetType`]. This guarantees that two different async |
| 32 | /// sources cannot accidentally produce colliding cache keys. |
| 33 | #[derive(Clone, Hash, PartialEq, Eq)] |
| 34 | pub struct AsyncAssetId { |
| 35 | namespace: TypeId, |
| 36 | key: String, |
| 37 | } |
| 38 | |
| 39 | impl AsyncAssetId { |
| 40 | /// Creates a new ID in the namespace defined by `N`. |
| 41 | pub fn new<N: AsyncAssetType>(key: impl Into<String>) -> Self { |
| 42 | Self { |
| 43 | namespace: TypeId::of::<N>(), |
| 44 | key: key.into(), |
| 45 | } |
| 46 | } |
| 47 | |
| 48 | /// Returns the key portion of this ID. |
| 49 | pub fn key(&self) -> &str { |
| 50 | &self.key |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | impl std::fmt::Debug for AsyncAssetId { |
| 55 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 56 | // TypeId's Debug output is opaque, so just print the key. |
| 57 | f.debug_struct("AsyncAssetId") |
| 58 | .field("key", &self.key) |
| 59 | .finish() |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | /// A "URI" for some data file. In other words, the location of an asset. |
| 64 | #[derive(Derivative)] |
| 65 | #[derivative(Clone, Hash, PartialEq, Eq, Debug)] |
| 66 | pub enum AssetSource { |
| 67 | /// Loaded from an arbitrary asynchronous source (e.g. a URL fetch). |
| 68 | Async { |
| 69 | /// A namespaced identifier used as the cache key. |
| 70 | id: AsyncAssetId, |
| 71 | /// A factory that produces the future to fetch the asset bytes. |
| 72 | /// Called at most once per unique `id` — only when the asset is |
| 73 | /// not already loaded or loading. |
| 74 | #[derivative(Hash = "ignore", PartialEq = "ignore", Debug = "ignore")] |
| 75 | fetch: Arc<dyn Fn() -> Pin<Box<dyn FetchAsset>> + Send + Sync>, |
| 76 | }, |
| 77 | /// Included in the app bundle. |
| 78 | Bundled { |
| 79 | // Assets that are statically included in the bundle can be statically |
| 80 | // referenced, hence using a `&'static str` here and not a `String`. |
| 81 | path: &'static str, |
| 82 | }, |
| 83 | /// Accessible in the user's local filesystem at the provided path. |
| 84 | LocalFile { path: String }, |
| 85 | /// Image loaded directly with bytes |
| 86 | Raw { id: String }, |
| 87 | } |
| 88 | |
| 89 | /// The public representation of an asset's current state (i.e., in-memory availability). |
| 90 | pub enum AssetState<T> { |
| 91 | Loading { handle: AssetHandle }, |
| 92 | Loaded { data: Rc<T> }, |
| 93 | Evicted, |
| 94 | FailedToLoad(Rc<Error>), |
| 95 | } |
| 96 | |
| 97 | /// An external type so views can refer to the asset they requested. |
| 98 | /// Transforms into a future that resolves once the asset is finished loading, allowing |
| 99 | /// work to be scheduled at the time of load completion. |
| 100 | #[derive(Clone, Hash, PartialEq, Eq, Debug)] |
| 101 | pub struct AssetHandle { |
| 102 | source: AssetSource, |
| 103 | asset_type: TypeId, |
| 104 | } |
| 105 | |
| 106 | impl AssetHandle { |
| 107 | /// Creates a future that resolves whenever the asset is finished loading. |
| 108 | pub fn when_loaded(&self, asset_cache: &AssetCache) -> Option<BoxFuture<'static, ()>> { |
| 109 | asset_cache.create_future_for_loading_asset(self) |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | /// An internal representation of an asset's state, as it's tracked and updated by the |
| 114 | /// AssetCache. An implementation. |
| 115 | enum AssetStateInternal { |
| 116 | Loading { |
| 117 | channel: (Sender<()>, Receiver<()>), |
| 118 | }, |
| 119 | Loaded { |
| 120 | data: Rc<dyn Any>, |
| 121 | timestamp: u64, |
| 122 | size_in_bytes: usize, |
| 123 | }, |
| 124 | Evicted, |
| 125 | Error(Rc<Error>), |
| 126 | } |
| 127 | |
| 128 | impl AssetStateInternal { |
| 129 | fn loading() -> Self { |
| 130 | // Whenever we add an asset in a loading state, we create a channel that |
| 131 | // can be alerted once the asset load completes (i.e., becomes available |
| 132 | // or encounters an error). The channel must support the ability to clone one |
| 133 | // side of the channel. |
| 134 | let channel = async_channel::bounded(1); |
| 135 | AssetStateInternal::Loading { channel } |
| 136 | } |
| 137 | |
| 138 | fn to_external_type<T: Asset>(&self, source: AssetSource) -> AssetState<T> { |
| 139 | match self { |
| 140 | AssetStateInternal::Loading { .. } => AssetState::Loading { |
| 141 | handle: AssetHandle { |
| 142 | source, |
| 143 | asset_type: TypeId::of::<T>(), |
| 144 | }, |
| 145 | }, |
| 146 | AssetStateInternal::Loaded { data, .. } => AssetState::Loaded { |
| 147 | data: data |
| 148 | .clone() |
| 149 | .downcast::<T>() |
| 150 | .expect("should not fail to downcast"), |
| 151 | }, |
| 152 | AssetStateInternal::Evicted => AssetState::Evicted, |
| 153 | AssetStateInternal::Error(err) => AssetState::FailedToLoad(err.clone()), |
| 154 | } |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | /// A general-purpose data cache for managing assets. Generalized to any file type. |
| 159 | /// Internally handles networking and persistence caching. |
| 160 | pub struct AssetCache { |
| 161 | // Note: interior mutability allows us to update the state of an asset |
| 162 | // without requiring a mutable reference to the AssetCache. |
| 163 | inner: Rc<RefCell<HashMap<AssetHandle, AssetStateInternal>>>, |
| 164 | |
| 165 | bundled_asset_provider: Box<dyn AssetProvider>, |
| 166 | foreground_executor: Rc<executor::Foreground>, |
| 167 | background_executor: Arc<executor::Background>, |
| 168 | } |
| 169 | |
| 170 | pub trait Asset: Any { |
| 171 | fn try_from_bytes(data: &[u8]) -> anyhow::Result<Self> |
| 172 | where |
| 173 | Self: Sized; |
| 174 | |
| 175 | fn size_in_bytes(&self) -> usize; |
| 176 | } |
| 177 | |
| 178 | impl Asset for String { |
| 179 | fn try_from_bytes(data: &[u8]) -> anyhow::Result<Self> |
| 180 | where |
| 181 | Self: Sized, |
| 182 | { |
| 183 | std::str::from_utf8(data) |
| 184 | .map(|s| s.to_string()) |
| 185 | .map_err(|e| e.into()) |
| 186 | } |
| 187 | |
| 188 | fn size_in_bytes(&self) -> usize { |
| 189 | self.len() |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | impl AssetCache { |
| 194 | const MAX_RAW_ASSET_SIZE: usize = 320 * 1000 * 1000; // 320MB |
| 195 | |
| 196 | pub fn new( |
| 197 | bundled_asset_provider: Box<dyn AssetProvider>, |
| 198 | foreground_executor: Rc<executor::Foreground>, |
| 199 | background_executor: Arc<executor::Background>, |
| 200 | ) -> Self { |
| 201 | Self { |
| 202 | inner: Rc::new(RefCell::new(HashMap::new())), |
| 203 | bundled_asset_provider, |
| 204 | foreground_executor, |
| 205 | background_executor, |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | /// Tracks the current total size of raw assets in memory. |
| 210 | pub fn get_total_raw_asset_size(&self) -> usize { |
| 211 | self.inner |
| 212 | .borrow() |
| 213 | .iter() |
| 214 | .filter_map(|(handle, state)| { |
| 215 | if let AssetStateInternal::Loaded { size_in_bytes, .. } = state { |
| 216 | if matches!(handle.source, AssetSource::Raw { .. }) { |
| 217 | return Some(*size_in_bytes); |
| 218 | } |
| 219 | } |
| 220 | None |
| 221 | }) |
| 222 | .sum() |
| 223 | } |
| 224 | |
| 225 | /// Removes the least recently added raw assets until the total size is within the limit. |
| 226 | fn evict_raw_assets_if_needed(&self, ctx: &ModelContext<Self>) -> Vec<u32> { |
| 227 | let mut total_size = self.get_total_raw_asset_size(); |
| 228 | let mut assets = self.inner.borrow_mut(); |
| 229 | |
| 230 | if total_size <= Self::MAX_RAW_ASSET_SIZE { |
| 231 | return vec![]; |
| 232 | } |
| 233 | |
| 234 | // Collect all raw assets with their timestamps |
| 235 | let mut raw_assets: Vec<_> = assets |
| 236 | .iter() |
| 237 | .filter_map(|(handle, state)| { |
| 238 | if matches!(handle.source, AssetSource::Raw { .. }) { |
| 239 | if let AssetStateInternal::Loaded { |
| 240 | timestamp, |
| 241 | size_in_bytes, |
| 242 | .. |
| 243 | } = state |
| 244 | { |
| 245 | return Some((handle.clone(), *timestamp, *size_in_bytes)); |
| 246 | } |
| 247 | } |
| 248 | None |
| 249 | }) |
| 250 | .collect(); |
| 251 | |
| 252 | // Sort by timestamp (oldest first) |
| 253 | raw_assets.sort_by_key(|&(_, timestamp, _)| timestamp); |
| 254 | |
| 255 | let mut evicted_image_ids = vec![]; |
| 256 | |
| 257 | // Evict until within the limit |
| 258 | for (handle, _, size_in_bytes) in raw_assets { |
| 259 | if total_size <= Self::MAX_RAW_ASSET_SIZE { |
| 260 | break; |
| 261 | } |
| 262 | if let AssetSource::Raw { id } = &handle.source { |
| 263 | if assets.remove(&handle).is_some() { |
| 264 | assets.insert(handle.clone(), AssetStateInternal::Evicted); |
| 265 | ImageCache::as_ref(ctx).evict_image(&handle.source); |
| 266 | total_size -= size_in_bytes; |
| 267 | |
| 268 | if let Ok(id) = id.parse::<u32>() { |
| 269 | evicted_image_ids.push(id); |
| 270 | } |
| 271 | } |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | evicted_image_ids |
| 276 | } |
| 277 | |
| 278 | /// The main API of the asset cache. Given the location of an asset, returns an indicator of the |
| 279 | /// in-memory availability of the asset. If the asset is not already loaded or loading, a background |
| 280 | /// task is spawned to perform the retrieval. |
| 281 | /// |
| 282 | /// Note: this is an idempotent operation. It can be called as many times as needed on a given |
| 283 | /// asset and won't duplicate work. |
| 284 | pub fn load_asset<T: Asset>(&self, source: AssetSource) -> AssetState<T> { |
| 285 | let mut assets = self.inner.borrow_mut(); |
| 286 | |
| 287 | // If we've already seen this asset source, we can simply return the current state of it. Otherwise, |
| 288 | // begin the load. |
| 289 | let key = AssetHandle { |
| 290 | source: source.clone(), |
| 291 | asset_type: TypeId::of::<T>(), |
| 292 | }; |
| 293 | if !assets.contains_key(&key) { |
| 294 | match source.clone() { |
| 295 | AssetSource::Async { fetch, .. } => { |
| 296 | assets.insert(key.clone(), AssetStateInternal::loading()); |
| 297 | let future = (fetch)(); |
| 298 | self.load_asynchronously::<T>(source.clone(), future); |
| 299 | } |
| 300 | AssetSource::Bundled { path } => { |
| 301 | let asset_state = match self |
| 302 | .bundled_asset_provider |
| 303 | .get(path) |
| 304 | .and_then(|bytes| T::try_from_bytes(&bytes)) |
| 305 | { |
| 306 | Ok(asset) => { |
| 307 | let timestamp = instant::now() as u64; |
| 308 | let size_in_bytes = asset.size_in_bytes(); |
| 309 | |
| 310 | AssetStateInternal::Loaded { |
| 311 | data: Rc::new(asset) as Rc<dyn Any>, |
| 312 | timestamp, |
| 313 | size_in_bytes, |
| 314 | } |
| 315 | } |
| 316 | Err(err) => AssetStateInternal::Error(Rc::new(err)), |
| 317 | }; |
| 318 | assets.insert(key.clone(), asset_state); |
| 319 | } |
| 320 | AssetSource::LocalFile { path } => { |
| 321 | assets.insert(key.clone(), AssetStateInternal::loading()); |
| 322 | self.load_asynchronously::<T>( |
| 323 | source.clone(), |
| 324 | Box::pin(async move { |
| 325 | let buffer = async_fs::read(path).await?; |
| 326 | Ok(buffer.into()) |
| 327 | }), |
| 328 | ); |
| 329 | } |
| 330 | AssetSource::Raw { id } => { |
| 331 | assets.insert( |
| 332 | key.clone(), |
| 333 | AssetStateInternal::Error(Rc::new(anyhow!( |
| 334 | "Raw image with ID {:?} did not exist", |
| 335 | id |
| 336 | ))), |
| 337 | ); |
| 338 | } |
| 339 | }; |
| 340 | } |
| 341 | |
| 342 | assets[&key].to_external_type(source) |
| 343 | } |
| 344 | |
| 345 | pub fn insert_raw_asset_bytes<T: Asset>( |
| 346 | &self, |
| 347 | id: String, |
| 348 | bytes: &[u8], |
| 349 | ctx: &mut ModelContext<Self>, |
| 350 | ) { |
| 351 | let mut assets = self.inner.borrow_mut(); |
| 352 | let source = AssetSource::Raw { id: id.clone() }; |
| 353 | let key = AssetHandle { |
| 354 | source: source.clone(), |
| 355 | asset_type: TypeId::of::<T>(), |
| 356 | }; |
| 357 | match T::try_from_bytes(bytes) { |
| 358 | Ok(asset) => { |
| 359 | let timestamp = instant::now() as u64; |
| 360 | let size_in_bytes = asset.size_in_bytes(); |
| 361 | |
| 362 | assets.insert( |
| 363 | key.clone(), |
| 364 | AssetStateInternal::Loaded { |
| 365 | data: Rc::new(asset) as Rc<dyn Any>, |
| 366 | timestamp, |
| 367 | size_in_bytes, |
| 368 | }, |
| 369 | ); |
| 370 | } |
| 371 | Err(err) => { |
| 372 | log::warn!("Raw asset conversion failed (ID: {id}): {err:#}"); |
| 373 | assets.insert(key.clone(), AssetStateInternal::Error(Rc::new(err))); |
| 374 | } |
| 375 | }; |
| 376 | |
| 377 | ImageCache::as_ref(ctx).evict_image(&source); |
| 378 | |
| 379 | drop(assets); |
| 380 | let image_ids = self.evict_raw_assets_if_needed(ctx); |
| 381 | |
| 382 | if !image_ids.is_empty() { |
| 383 | ctx.emit(AssetCacheEvent::ImagesEvicted { image_ids }); |
| 384 | } |
| 385 | } |
| 386 | |
| 387 | // Creates a future that resolves when an asset is loaded into moemory. |
| 388 | fn create_future_for_loading_asset( |
| 389 | &self, |
| 390 | asset_handle: &AssetHandle, |
| 391 | ) -> Option<BoxFuture<'static, ()>> { |
| 392 | let assets = self.inner.borrow_mut(); |
| 393 | |
| 394 | assets.get(asset_handle).map(|asset_state| { |
| 395 | match asset_state { |
| 396 | AssetStateInternal::Loading { channel } => { |
| 397 | // Internally, the future works by cloning a new receiver on the channel that's assigned |
| 398 | // to this asset. Inside the future, we simply wait on the receiving end of the channel. |
| 399 | // Note that the channel is held by the AssetStateInternal::Loading variant, so when the asset |
| 400 | // is promoted to the Loaded or FailedToLoad variants, the channel is dropped. This returns a |
| 401 | // RecvError to any receivers, serving as our notification that the asset is no longer loading. |
| 402 | let rx = channel.1.clone(); |
| 403 | async move { |
| 404 | let _ = rx.recv().await; |
| 405 | } |
| 406 | .boxed() |
| 407 | } |
| 408 | // If the asset isn't currently loading, it is either already loaded or it's in an error state. Either |
| 409 | // way, we should return a future that resolves immediately since there's no more pending updates |
| 410 | // for this asset. |
| 411 | _ => futures::future::ready(()).boxed(), |
| 412 | } |
| 413 | }) |
| 414 | } |
| 415 | |
| 416 | // Helper method to spawn the futures that perform an asset load and place the results into the asset cache. |
| 417 | fn load_asynchronously<T: Asset>( |
| 418 | &self, |
| 419 | asset_source: AssetSource, |
| 420 | future: Pin<Box<dyn FetchAsset>>, |
| 421 | ) { |
| 422 | let (tx, rx) = futures::channel::oneshot::channel(); |
| 423 | |
| 424 | // Spawn the work on the background executor. |
| 425 | self.background_executor |
| 426 | .spawn(async move { |
| 427 | let result = future.await; |
| 428 | // When the fetch finished, send the results to the future running on the foreground executor. |
| 429 | if tx.send(result).is_err() { |
| 430 | log::error!("Error sending background task result to main thread"); |
| 431 | } |
| 432 | }) |
| 433 | .detach(); |
| 434 | |
| 435 | // Spawn a receiver on the foreground executor. |
| 436 | let assets = Rc::downgrade(&self.inner); |
| 437 | self.foreground_executor |
| 438 | .spawn_boxed(Box::pin(async move { |
| 439 | let result = match rx.await { |
| 440 | Ok(result) => result, |
| 441 | Err(_) => { |
| 442 | let msg = "sender unexpectedly dropped before receiver"; |
| 443 | log::error!("{msg}"); |
| 444 | Err(anyhow!(msg)) |
| 445 | } |
| 446 | }; |
| 447 | |
| 448 | let Some(assets) = assets.upgrade() else { |
| 449 | return; |
| 450 | }; |
| 451 | |
| 452 | let mut assets = assets.borrow_mut(); |
| 453 | |
| 454 | // Populate the asset cache with the result. |
| 455 | let handle = AssetHandle { |
| 456 | source: asset_source.clone(), |
| 457 | asset_type: TypeId::of::<T>(), |
| 458 | }; |
| 459 | match result { |
| 460 | Ok(bytes) => match T::try_from_bytes(&bytes) { |
| 461 | Ok(asset) => { |
| 462 | log::debug!("Asset fetch succeeded: {asset_source:?}"); |
| 463 | |
| 464 | let timestamp = instant::now() as u64; |
| 465 | let size_in_bytes = asset.size_in_bytes(); |
| 466 | |
| 467 | assets.insert( |
| 468 | handle, |
| 469 | AssetStateInternal::Loaded { |
| 470 | data: Rc::new(asset) as Rc<dyn Any>, |
| 471 | timestamp, |
| 472 | size_in_bytes, |
| 473 | }, |
| 474 | ); |
| 475 | } |
| 476 | Err(err) => { |
| 477 | log::warn!("Asset conversion failed ({asset_source:?}): {err:#}"); |
| 478 | assets.insert(handle, AssetStateInternal::Error(Rc::new(err))); |
| 479 | } |
| 480 | }, |
| 481 | Err(err) => { |
| 482 | log::warn!("Asset fetch failed ({asset_source:?}): {err:#}"); |
| 483 | assets.insert(handle, AssetStateInternal::Error(Rc::new(err))); |
| 484 | } |
| 485 | } |
| 486 | })) |
| 487 | .detach(); |
| 488 | } |
| 489 | } |
| 490 | |
| 491 | #[derive(Debug, Clone)] |
| 492 | pub enum AssetCacheEvent { |
| 493 | ImagesEvicted { image_ids: Vec<u32> }, |
| 494 | } |
| 495 | |
| 496 | impl Entity for AssetCache { |
| 497 | type Event = AssetCacheEvent; |
| 498 | } |
| 499 | |
| 500 | impl SingletonEntity for AssetCache {} |
| 501 |