Seregon/StratoSDK

StratoSDK is a framework with a declarative approach similar to Flutter/React, written and designed entirely for Rust.

Rust/27.3 KB/No license
crates/strato-ui-core/src/assets/asset_cache.rs
StratoSDK / crates / strato-ui-core / src / assets / asset_cache.rs
1use anyhow::anyhow;
2use anyhow::{Error, Result};
3use async_channel::{self, Receiver, Sender};
4use bytes::Bytes;
5use derivative::Derivative;
6use futures::FutureExt as _;
7use futures::{future::BoxFuture, Future};
8use std::any::{Any, TypeId};
9use std::pin::Pin;
10use std::{cell::RefCell, collections::HashMap, hash::Hash, rc::Rc, sync::Arc};
11 
12use crate::image_cache::ImageCache;
13use crate::{r#async::executor, Entity, ModelContext, SingletonEntity};
14 
15use super::AssetProvider;
16 
17pub trait FetchAsset: crate::r#async::Spawnable + Future<Output = Result<Bytes>> {}
18impl<T: crate::r#async::Spawnable + Future<Output = Result<Bytes>> + ?Sized> FetchAsset for T {}
19 
/// Namespace marker for asynchronously fetched assets.
///
/// Every kind of async asset source declares its own zero-sized type and
/// implements this trait for it. [`AsyncAssetId`] records the marker's
/// [`TypeId`] next to the key string, so two sources that happen to use the
/// same key still produce distinct IDs.
pub trait AsyncAssetType: 'static {}
27 
/// Cache identifier for an [`AssetSource::Async`] entry, made of a namespace
/// and a key.
///
/// The namespace is the [`TypeId`] of a marker type implementing
/// [`AsyncAssetType`]; equality and hashing cover both fields, so IDs coming
/// from different async sources never compare equal even when their key
/// strings match.
#[derive(Clone, Hash, PartialEq, Eq)]
pub struct AsyncAssetId {
    /// `TypeId` of the marker type that owns this ID.
    namespace: TypeId,
    /// Source-specific key within the namespace.
    key: String,
}
38 
39impl AsyncAssetId {
40 /// Creates a new ID in the namespace defined by `N`.
41 pub fn new<N: AsyncAssetType>(key: impl Into<String>) -> Self {
42 Self {
43 namespace: TypeId::of::<N>(),
44 key: key.into(),
45 }
46 }
47 
48 /// Returns the key portion of this ID.
49 pub fn key(&self) -> &str {
50 &self.key
51 }
52}
53 
54impl std::fmt::Debug for AsyncAssetId {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 // TypeId's Debug output is opaque, so just print the key.
57 f.debug_struct("AsyncAssetId")
58 .field("key", &self.key)
59 .finish()
60 }
61}
62 
63/// A "URI" for some data file. In other words, the location of an asset.
64#[derive(Derivative)]
65#[derivative(Clone, Hash, PartialEq, Eq, Debug)]
66pub enum AssetSource {
67 /// Loaded from an arbitrary asynchronous source (e.g. a URL fetch).
68 Async {
69 /// A namespaced identifier used as the cache key.
70 id: AsyncAssetId,
71 /// A factory that produces the future to fetch the asset bytes.
72 /// Called at most once per unique `id` — only when the asset is
73 /// not already loaded or loading.
74 #[derivative(Hash = "ignore", PartialEq = "ignore", Debug = "ignore")]
75 fetch: Arc<dyn Fn() -> Pin<Box<dyn FetchAsset>> + Send + Sync>,
76 },
77 /// Included in the app bundle.
78 Bundled {
79 // Assets that are statically included in the bundle can be statically
80 // referenced, hence using a `&'static str` here and not a `String`.
81 path: &'static str,
82 },
83 /// Accessible in the user's local filesystem at the provided path.
84 LocalFile { path: String },
85 /// Image loaded directly with bytes
86 Raw { id: String },
87}
88 
89/// The public representation of an asset's current state (i.e., in-memory availability).
90pub enum AssetState<T> {
91 Loading { handle: AssetHandle },
92 Loaded { data: Rc<T> },
93 Evicted,
94 FailedToLoad(Rc<Error>),
95}
96 
97/// An external type so views can refer to the asset they requested.
98/// Transforms into a future that resolves once the asset is finished loading, allowing
99/// work to be scheduled at the time of load completion.
100#[derive(Clone, Hash, PartialEq, Eq, Debug)]
101pub struct AssetHandle {
102 source: AssetSource,
103 asset_type: TypeId,
104}
105 
106impl AssetHandle {
107 /// Creates a future that resolves whenever the asset is finished loading.
108 pub fn when_loaded(&self, asset_cache: &AssetCache) -> Option<BoxFuture<'static, ()>> {
109 asset_cache.create_future_for_loading_asset(self)
110 }
111}
112 
113/// An internal representation of an asset's state, as it's tracked and updated by the
114/// AssetCache. An implementation.
115enum AssetStateInternal {
116 Loading {
117 channel: (Sender<()>, Receiver<()>),
118 },
119 Loaded {
120 data: Rc<dyn Any>,
121 timestamp: u64,
122 size_in_bytes: usize,
123 },
124 Evicted,
125 Error(Rc<Error>),
126}
127 
128impl AssetStateInternal {
129 fn loading() -> Self {
130 // Whenever we add an asset in a loading state, we create a channel that
131 // can be alerted once the asset load completes (i.e., becomes available
132 // or encounters an error). The channel must support the ability to clone one
133 // side of the channel.
134 let channel = async_channel::bounded(1);
135 AssetStateInternal::Loading { channel }
136 }
137 
138 fn to_external_type<T: Asset>(&self, source: AssetSource) -> AssetState<T> {
139 match self {
140 AssetStateInternal::Loading { .. } => AssetState::Loading {
141 handle: AssetHandle {
142 source,
143 asset_type: TypeId::of::<T>(),
144 },
145 },
146 AssetStateInternal::Loaded { data, .. } => AssetState::Loaded {
147 data: data
148 .clone()
149 .downcast::<T>()
150 .expect("should not fail to downcast"),
151 },
152 AssetStateInternal::Evicted => AssetState::Evicted,
153 AssetStateInternal::Error(err) => AssetState::FailedToLoad(err.clone()),
154 }
155 }
156}
157 
158/// A general-purpose data cache for managing assets. Generalized to any file type.
159/// Internally handles networking and persistence caching.
160pub struct AssetCache {
161 // Note: interior mutability allows us to update the state of an asset
162 // without requiring a mutable reference to the AssetCache.
163 inner: Rc<RefCell<HashMap<AssetHandle, AssetStateInternal>>>,
164 
165 bundled_asset_provider: Box<dyn AssetProvider>,
166 foreground_executor: Rc<executor::Foreground>,
167 background_executor: Arc<executor::Background>,
168}
169 
170pub trait Asset: Any {
171 fn try_from_bytes(data: &[u8]) -> anyhow::Result<Self>
172 where
173 Self: Sized;
174 
175 fn size_in_bytes(&self) -> usize;
176}
177 
178impl Asset for String {
179 fn try_from_bytes(data: &[u8]) -> anyhow::Result<Self>
180 where
181 Self: Sized,
182 {
183 std::str::from_utf8(data)
184 .map(|s| s.to_string())
185 .map_err(|e| e.into())
186 }
187 
188 fn size_in_bytes(&self) -> usize {
189 self.len()
190 }
191}
192 
193impl AssetCache {
194 const MAX_RAW_ASSET_SIZE: usize = 320 * 1000 * 1000; // 320MB
195 
196 pub fn new(
197 bundled_asset_provider: Box<dyn AssetProvider>,
198 foreground_executor: Rc<executor::Foreground>,
199 background_executor: Arc<executor::Background>,
200 ) -> Self {
201 Self {
202 inner: Rc::new(RefCell::new(HashMap::new())),
203 bundled_asset_provider,
204 foreground_executor,
205 background_executor,
206 }
207 }
208 
209 /// Tracks the current total size of raw assets in memory.
210 pub fn get_total_raw_asset_size(&self) -> usize {
211 self.inner
212 .borrow()
213 .iter()
214 .filter_map(|(handle, state)| {
215 if let AssetStateInternal::Loaded { size_in_bytes, .. } = state {
216 if matches!(handle.source, AssetSource::Raw { .. }) {
217 return Some(*size_in_bytes);
218 }
219 }
220 None
221 })
222 .sum()
223 }
224 
225 /// Removes the least recently added raw assets until the total size is within the limit.
226 fn evict_raw_assets_if_needed(&self, ctx: &ModelContext<Self>) -> Vec<u32> {
227 let mut total_size = self.get_total_raw_asset_size();
228 let mut assets = self.inner.borrow_mut();
229 
230 if total_size <= Self::MAX_RAW_ASSET_SIZE {
231 return vec![];
232 }
233 
234 // Collect all raw assets with their timestamps
235 let mut raw_assets: Vec<_> = assets
236 .iter()
237 .filter_map(|(handle, state)| {
238 if matches!(handle.source, AssetSource::Raw { .. }) {
239 if let AssetStateInternal::Loaded {
240 timestamp,
241 size_in_bytes,
242 ..
243 } = state
244 {
245 return Some((handle.clone(), *timestamp, *size_in_bytes));
246 }
247 }
248 None
249 })
250 .collect();
251 
252 // Sort by timestamp (oldest first)
253 raw_assets.sort_by_key(|&(_, timestamp, _)| timestamp);
254 
255 let mut evicted_image_ids = vec![];
256 
257 // Evict until within the limit
258 for (handle, _, size_in_bytes) in raw_assets {
259 if total_size <= Self::MAX_RAW_ASSET_SIZE {
260 break;
261 }
262 if let AssetSource::Raw { id } = &handle.source {
263 if assets.remove(&handle).is_some() {
264 assets.insert(handle.clone(), AssetStateInternal::Evicted);
265 ImageCache::as_ref(ctx).evict_image(&handle.source);
266 total_size -= size_in_bytes;
267 
268 if let Ok(id) = id.parse::<u32>() {
269 evicted_image_ids.push(id);
270 }
271 }
272 }
273 }
274 
275 evicted_image_ids
276 }
277 
278 /// The main API of the asset cache. Given the location of an asset, returns an indicator of the
279 /// in-memory availability of the asset. If the asset is not already loaded or loading, a background
280 /// task is spawned to perform the retrieval.
281 ///
282 /// Note: this is an idempotent operation. It can be called as many times as needed on a given
283 /// asset and won't duplicate work.
284 pub fn load_asset<T: Asset>(&self, source: AssetSource) -> AssetState<T> {
285 let mut assets = self.inner.borrow_mut();
286 
287 // If we've already seen this asset source, we can simply return the current state of it. Otherwise,
288 // begin the load.
289 let key = AssetHandle {
290 source: source.clone(),
291 asset_type: TypeId::of::<T>(),
292 };
293 if !assets.contains_key(&key) {
294 match source.clone() {
295 AssetSource::Async { fetch, .. } => {
296 assets.insert(key.clone(), AssetStateInternal::loading());
297 let future = (fetch)();
298 self.load_asynchronously::<T>(source.clone(), future);
299 }
300 AssetSource::Bundled { path } => {
301 let asset_state = match self
302 .bundled_asset_provider
303 .get(path)
304 .and_then(|bytes| T::try_from_bytes(&bytes))
305 {
306 Ok(asset) => {
307 let timestamp = instant::now() as u64;
308 let size_in_bytes = asset.size_in_bytes();
309 
310 AssetStateInternal::Loaded {
311 data: Rc::new(asset) as Rc<dyn Any>,
312 timestamp,
313 size_in_bytes,
314 }
315 }
316 Err(err) => AssetStateInternal::Error(Rc::new(err)),
317 };
318 assets.insert(key.clone(), asset_state);
319 }
320 AssetSource::LocalFile { path } => {
321 assets.insert(key.clone(), AssetStateInternal::loading());
322 self.load_asynchronously::<T>(
323 source.clone(),
324 Box::pin(async move {
325 let buffer = async_fs::read(path).await?;
326 Ok(buffer.into())
327 }),
328 );
329 }
330 AssetSource::Raw { id } => {
331 assets.insert(
332 key.clone(),
333 AssetStateInternal::Error(Rc::new(anyhow!(
334 "Raw image with ID {:?} did not exist",
335 id
336 ))),
337 );
338 }
339 };
340 }
341 
342 assets[&key].to_external_type(source)
343 }
344 
345 pub fn insert_raw_asset_bytes<T: Asset>(
346 &self,
347 id: String,
348 bytes: &[u8],
349 ctx: &mut ModelContext<Self>,
350 ) {
351 let mut assets = self.inner.borrow_mut();
352 let source = AssetSource::Raw { id: id.clone() };
353 let key = AssetHandle {
354 source: source.clone(),
355 asset_type: TypeId::of::<T>(),
356 };
357 match T::try_from_bytes(bytes) {
358 Ok(asset) => {
359 let timestamp = instant::now() as u64;
360 let size_in_bytes = asset.size_in_bytes();
361 
362 assets.insert(
363 key.clone(),
364 AssetStateInternal::Loaded {
365 data: Rc::new(asset) as Rc<dyn Any>,
366 timestamp,
367 size_in_bytes,
368 },
369 );
370 }
371 Err(err) => {
372 log::warn!("Raw asset conversion failed (ID: {id}): {err:#}");
373 assets.insert(key.clone(), AssetStateInternal::Error(Rc::new(err)));
374 }
375 };
376 
377 ImageCache::as_ref(ctx).evict_image(&source);
378 
379 drop(assets);
380 let image_ids = self.evict_raw_assets_if_needed(ctx);
381 
382 if !image_ids.is_empty() {
383 ctx.emit(AssetCacheEvent::ImagesEvicted { image_ids });
384 }
385 }
386 
387 // Creates a future that resolves when an asset is loaded into moemory.
388 fn create_future_for_loading_asset(
389 &self,
390 asset_handle: &AssetHandle,
391 ) -> Option<BoxFuture<'static, ()>> {
392 let assets = self.inner.borrow_mut();
393 
394 assets.get(asset_handle).map(|asset_state| {
395 match asset_state {
396 AssetStateInternal::Loading { channel } => {
397 // Internally, the future works by cloning a new receiver on the channel that's assigned
398 // to this asset. Inside the future, we simply wait on the receiving end of the channel.
399 // Note that the channel is held by the AssetStateInternal::Loading variant, so when the asset
400 // is promoted to the Loaded or FailedToLoad variants, the channel is dropped. This returns a
401 // RecvError to any receivers, serving as our notification that the asset is no longer loading.
402 let rx = channel.1.clone();
403 async move {
404 let _ = rx.recv().await;
405 }
406 .boxed()
407 }
408 // If the asset isn't currently loading, it is either already loaded or it's in an error state. Either
409 // way, we should return a future that resolves immediately since there's no more pending updates
410 // for this asset.
411 _ => futures::future::ready(()).boxed(),
412 }
413 })
414 }
415 
416 // Helper method to spawn the futures that perform an asset load and place the results into the asset cache.
417 fn load_asynchronously<T: Asset>(
418 &self,
419 asset_source: AssetSource,
420 future: Pin<Box<dyn FetchAsset>>,
421 ) {
422 let (tx, rx) = futures::channel::oneshot::channel();
423 
424 // Spawn the work on the background executor.
425 self.background_executor
426 .spawn(async move {
427 let result = future.await;
428 // When the fetch finished, send the results to the future running on the foreground executor.
429 if tx.send(result).is_err() {
430 log::error!("Error sending background task result to main thread");
431 }
432 })
433 .detach();
434 
435 // Spawn a receiver on the foreground executor.
436 let assets = Rc::downgrade(&self.inner);
437 self.foreground_executor
438 .spawn_boxed(Box::pin(async move {
439 let result = match rx.await {
440 Ok(result) => result,
441 Err(_) => {
442 let msg = "sender unexpectedly dropped before receiver";
443 log::error!("{msg}");
444 Err(anyhow!(msg))
445 }
446 };
447 
448 let Some(assets) = assets.upgrade() else {
449 return;
450 };
451 
452 let mut assets = assets.borrow_mut();
453 
454 // Populate the asset cache with the result.
455 let handle = AssetHandle {
456 source: asset_source.clone(),
457 asset_type: TypeId::of::<T>(),
458 };
459 match result {
460 Ok(bytes) => match T::try_from_bytes(&bytes) {
461 Ok(asset) => {
462 log::debug!("Asset fetch succeeded: {asset_source:?}");
463 
464 let timestamp = instant::now() as u64;
465 let size_in_bytes = asset.size_in_bytes();
466 
467 assets.insert(
468 handle,
469 AssetStateInternal::Loaded {
470 data: Rc::new(asset) as Rc<dyn Any>,
471 timestamp,
472 size_in_bytes,
473 },
474 );
475 }
476 Err(err) => {
477 log::warn!("Asset conversion failed ({asset_source:?}): {err:#}");
478 assets.insert(handle, AssetStateInternal::Error(Rc::new(err)));
479 }
480 },
481 Err(err) => {
482 log::warn!("Asset fetch failed ({asset_source:?}): {err:#}");
483 assets.insert(handle, AssetStateInternal::Error(Rc::new(err)));
484 }
485 }
486 }))
487 .detach();
488 }
489}
490 
/// Events emitted by the [`AssetCache`].
#[derive(Debug, Clone)]
pub enum AssetCacheEvent {
    /// Raw assets were evicted to keep the cache within its size limit.
    ImagesEvicted {
        /// Numeric IDs of the evicted raw assets.
        image_ids: Vec<u32>,
    },
}
495 
496impl Entity for AssetCache {
497 type Event = AssetCacheEvent;
498}
499 
500impl SingletonEntity for AssetCache {}
501