Seregon/StratoSDK

StratoSDK is a framework with a declarative approach similar to Flutter/React, written and designed entirely for Rust.

Rust/27.3 KB/No license
crates/strato-ui-core/src/core/model/context.rs
StratoSDK / crates / strato-ui-core / src / core / model / context.rs
1use std::{any::Any, future::Future, marker::PhantomData, sync::Arc};
2 
3use crate::{
4 r#async::{SpawnableOutput, Timer},
5 windowing::WindowManager,
6 ReadModel, ReadView, UpdateView, View, ViewAsRef, ViewContext, ViewHandle, WeakModelHandle,
7};
8use anyhow::Result;
9use futures::{
10 stream::{AbortHandle, Abortable},
11 FutureExt,
12};
13use thiserror::Error;
14 
15use crate::{
16 accessibility::AccessibilityContent,
17 core::{Observation, Subscription, SubscriptionKey, TaskCallback},
18 r#async::{executor, SpawnedFutureHandle, SpawnedLocalStream},
19 AppContext, Effect, Entity, EntityId, GetSingletonModelHandle, ModelAsRef, ModelHandle,
20 RequestState, RetryOption, UpdateModel,
21};
22 
23/// Error returned when a model has been dropped, and so references to it are invalid.
24#[derive(Debug, Error, PartialEq, Eq)]
25#[error("Model has been dropped")]
26pub struct ModelDropped;
27 
28/// Structure that combines model identifiers and a handle to the application
29/// context/application state.
30pub struct ModelContext<'a, T: ?Sized> {
31 app: &'a mut AppContext,
32 model_id: EntityId,
33 model_type: PhantomData<T>,
34}
35 
36impl<'a, T: Entity> ModelContext<'a, T> {
37 pub(in crate::core) fn new(app: &'a mut AppContext, model_id: EntityId) -> Self {
38 Self {
39 app,
40 model_id,
41 model_type: PhantomData,
42 }
43 }
44 
45 pub fn handle(&self) -> WeakModelHandle<T> {
46 WeakModelHandle::new(self.model_id)
47 }
48 
49 pub fn background_executor(&self) -> Arc<executor::Background> {
50 self.app.background_executor().clone()
51 }
52 
53 pub fn model_id(&self) -> EntityId {
54 self.model_id
55 }
56 
57 pub fn windows(&self) -> &WindowManager {
58 self.app.windows()
59 }
60 
61 pub fn add_model<S, F>(&mut self, build_model: F) -> ModelHandle<S>
62 where
63 S: Entity,
64 F: FnOnce(&mut ModelContext<S>) -> S,
65 {
66 self.app.add_model(build_model)
67 }
68 
69 pub fn subscribe_to_model<S: Entity, F>(&mut self, handle: &ModelHandle<S>, mut callback: F)
70 where
71 S::Event: 'static,
72 F: 'static + FnMut(&mut T, &S::Event, &mut ModelContext<T>),
73 {
74 self.app
75 .subscriptions
76 .entry(handle.id())
77 .or_default()
78 .push(Subscription::FromModel {
79 model_id: self.model_id,
80 callback: Box::new(move |model, payload, app, model_id| {
81 let model = model.downcast_mut().expect("downcast is type safe");
82 let payload: &<S as Entity>::Event =
83 payload.downcast_ref().expect("downcast is type safe");
84 let mut ctx = ModelContext::new(app, model_id);
85 callback(model, payload, &mut ctx);
86 }),
87 });
88 }
89 
90 pub fn unsubscribe_from_model<E>(&mut self, handle: &ModelHandle<E>)
91 where
92 E: Entity,
93 E::Event: 'static,
94 {
95 let target_entity = handle.id();
96 
97 // If we're currently emitting events for this entity, defer the unsubscribe.
98 if let Some(ref mut pending) = self.app.pending_unsubscribes {
99 if pending.entity_id == target_entity {
100 pending.keys.insert(SubscriptionKey::Model(self.model_id));
101 
102 // Remove subscriptions created earlier in this emission so subscribe-then-unsubscribe ordering is preserved.
103 if let std::collections::hash_map::Entry::Occupied(mut entry) =
104 self.app.subscriptions.entry(target_entity)
105 {
106 entry.get_mut().retain(|subscription| match subscription {
107 Subscription::FromView { .. } | Subscription::FromApp { .. } => true,
108 Subscription::FromModel { model_id, .. } => *model_id != self.model_id,
109 });
110 
111 if entry.get().is_empty() {
112 entry.remove();
113 }
114 }
115 
116 return;
117 }
118 }
119 
120 // Otherwise process immediately.
121 self.app
122 .subscriptions
123 .entry(target_entity)
124 .or_default()
125 .retain(|subscription| match subscription {
126 Subscription::FromView { .. } | Subscription::FromApp { .. } => true,
127 Subscription::FromModel { model_id, .. } => *model_id != self.model_id,
128 })
129 }
130 
131 pub fn subscribe_to_view<V, F>(&mut self, handle: &ViewHandle<V>, mut callback: F)
132 where
133 V: View,
134 V::Event: 'static,
135 F: 'static + FnMut(&mut T, &V::Event, &mut ModelContext<T>),
136 {
137 self.app
138 .subscriptions
139 .entry(handle.id())
140 .or_default()
141 .push(Subscription::FromModel {
142 model_id: self.model_id,
143 callback: Box::new(move |model, payload, app, model_id| {
144 let model = model.downcast_mut().expect("downcast is type safe");
145 let payload = payload.downcast_ref().expect("downcast is type safe");
146 let mut ctx = ModelContext::new(app, model_id);
147 callback(model, payload, &mut ctx);
148 }),
149 });
150 }
151 
152 pub fn unsubscribe_from_view<V>(&mut self, handle: &ViewHandle<V>)
153 where
154 V: View,
155 V::Event: 'static,
156 {
157 let target_entity = handle.id();
158 
159 // If we're currently emitting events for this entity, defer the unsubscribe.
160 if let Some(ref mut pending) = self.app.pending_unsubscribes {
161 if pending.entity_id == target_entity {
162 pending.keys.insert(SubscriptionKey::Model(self.model_id));
163 
164 // Remove subscriptions created earlier in this emission so subscribe-then-unsubscribe ordering is preserved.
165 if let std::collections::hash_map::Entry::Occupied(mut entry) =
166 self.app.subscriptions.entry(target_entity)
167 {
168 entry.get_mut().retain(|subscription| match subscription {
169 Subscription::FromView { .. } | Subscription::FromApp { .. } => true,
170 Subscription::FromModel { model_id, .. } => *model_id != self.model_id,
171 });
172 
173 if entry.get().is_empty() {
174 entry.remove();
175 }
176 }
177 
178 return;
179 }
180 }
181 
182 // Otherwise process immediately.
183 self.app
184 .subscriptions
185 .entry(target_entity)
186 .or_default()
187 .retain(|subscription| match subscription {
188 Subscription::FromView { .. } | Subscription::FromApp { .. } => true,
189 Subscription::FromModel { model_id, .. } => *model_id != self.model_id,
190 })
191 }
192 
193 pub fn emit(&mut self, payload: T::Event) {
194 self.app.pending_effects.push_back(Effect::Event {
195 entity_id: self.model_id,
196 payload: Box::new(payload),
197 });
198 }
199 
200 /// Global actions are being phased out. Prefer dispatching typed actions instead of global actions.
201 /// Dispatch a global action to be handled by the registered handler
202 ///
203 /// Note: The dispatch of the global action will be registered as an effect and flushed after
204 /// the current model update is complete. This will ensure that the model has been re-inserted
205 /// into the `AppContext`, so it will be accessible to the global action, if necessary
206 #[track_caller]
207 pub fn dispatch_global_action<A: Any>(&mut self, name: &'static str, arg: A) {
208 let location = std::panic::Location::caller();
209 self.app.pending_effects.push_back(Effect::GlobalAction {
210 name,
211 location,
212 arg: Box::new(arg),
213 });
214 }
215 
216 pub fn observe<S, F>(&mut self, handle: &ModelHandle<S>, mut callback: F)
217 where
218 S: Entity,
219 F: 'static + FnMut(&mut T, ModelHandle<S>, &mut ModelContext<T>),
220 {
221 self.app
222 .observations
223 .entry(handle.id())
224 .or_default()
225 .push(Observation::FromModel {
226 model_id: self.model_id,
227 callback: Box::new(move |model, observed_id, app, model_id| {
228 let model = model.downcast_mut().expect("downcast is type safe");
229 let observed = ModelHandle::new(observed_id, &app.ref_counts);
230 let mut ctx = ModelContext::new(app, model_id);
231 callback(model, observed, &mut ctx);
232 }),
233 });
234 }
235 
236 pub fn notify(&mut self) {
237 // If the last effect is a model notification for this model,
238 // don't add another one.
239 if let Some(Effect::ModelNotification { model_id }) = self.app.pending_effects.back() {
240 if *model_id == self.model_id {
241 return;
242 }
243 }
244 
245 self.app
246 .pending_effects
247 .push_back(Effect::ModelNotification {
248 model_id: self.model_id,
249 });
250 }
251 
252 /// Emit AccessibilityContent
253 /// This method lets propagate any content to the screen reader on demand (doesn't need to be
254 /// tied with actions or specific events).
255 pub fn emit_a11y_content(&mut self, content: AccessibilityContent) {
256 self.app
257 .platform_delegate
258 .set_accessibility_contents(content);
259 }
260 
261 // Only public in crate::core so it can be used by ui/src/core/mod_test.rs.
262 pub(in crate::core) fn spawn_local<S, F, U>(
263 &mut self,
264 future: S,
265 callback: F,
266 ) -> impl Future<Output = ()>
267 where
268 S: 'static + Future,
269 F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
270 U: 'static,
271 {
272 let (tx, rx) = futures::channel::oneshot::channel();
273 
274 let task_id = self.app.spawn_local(future);
275 
276 self.app.task_callbacks.insert(
277 task_id,
278 TaskCallback::ModelFromFuture {
279 model_id: self.model_id,
280 callback: Box::new(move |model, output, app, model_id| {
281 let model = model.downcast_mut().unwrap();
282 let output = *output.downcast().unwrap();
283 let result = callback(model, output, &mut ModelContext::new(app, model_id));
284 let _ = tx.send(result);
285 }),
286 },
287 );
288 
289 async move {
290 if rx.await.is_err() {
291 log::error!("sender unexpectedly dropped before receiver");
292 }
293 }
294 }
295 
296 /// Schedules a future that returns a Result type to run on the background thread.
297 /// If the future resolves to success (Ok), call the set callback with RequestState::RequestSucceeded.
298 /// If the future fails and we still have remaining retry counts, call the set callback
299 /// with RequestState::RequestFailedRetryPending and retry based on the RetryOption.
300 /// Otherwise, call the set callback with RequestState::RequestFailed.
301 pub fn spawn_with_retry_on_error<P, S, F, M>(
302 &mut self,
303 future_closure: P,
304 retry_option: RetryOption,
305 callback: F,
306 ) -> SpawnedFutureHandle
307 where
308 P: 'static + FnMut() -> S,
309 S: crate::r#async::Spawnable + Future<Output = Result<M>>,
310 <S as Future>::Output: crate::r#async::SpawnableOutput,
311 F: 'static + FnMut(&mut T, RequestState<M>, &mut ModelContext<T>),
312 {
313 self.spawn_with_retry_on_error_when(future_closure, retry_option, |_| true, callback)
314 }
315 
316 /// Like [`Self::spawn_with_retry_on_error`], but additionally consults `should_retry` on
317 /// each failure. The chain stops immediately (calling the callback with
318 /// [`RequestState::RequestFailed`]) when `should_retry` returns false, even if retries
319 /// remain on the [`RetryOption`]. Use this for errors that are known to be permanent so
320 /// they don't issue redundant requests — e.g. classify a 403/404 with
321 /// `is_transient_http_error` and skip retries.
322 pub fn spawn_with_retry_on_error_when<P, S, R, F, M>(
323 &mut self,
324 mut future_closure: P,
325 mut retry_option: RetryOption,
326 mut should_retry: R,
327 mut callback: F,
328 ) -> SpawnedFutureHandle
329 where
330 P: 'static + FnMut() -> S,
331 S: crate::r#async::Spawnable + Future<Output = Result<M>>,
332 <S as Future>::Output: crate::r#async::SpawnableOutput,
333 R: 'static + FnMut(&anyhow::Error) -> bool,
334 F: 'static + FnMut(&mut T, RequestState<M>, &mut ModelContext<T>),
335 {
336 let future = future_closure();
337 
338 self.spawn(future, move |me, res, ctx| match res {
339 Ok(success) => {
340 callback(me, RequestState::RequestSucceeded(success), ctx);
341 }
342 Err(e) => {
343 if retry_option.remaining_retry_count == 0 || !should_retry(&e) {
344 callback(me, RequestState::RequestFailed(e), ctx);
345 } else {
346 callback(me, RequestState::RequestFailedRetryPending(e), ctx);
347 
348 let _ = ctx.spawn(
349 async move { Timer::after(retry_option.duration()).await },
350 move |_, _, ctx| {
351 retry_option.advance();
352 ctx.spawn_with_retry_on_error_when(
353 future_closure,
354 retry_option,
355 should_retry,
356 callback,
357 )
358 },
359 );
360 }
361 }
362 })
363 }
364 
365 /// Schedules a future to run on a background thread, invoking a callback on
366 /// the _main_ thread upon completion.
367 ///
368 /// This function is useful in situations where a long-running process needs
369 /// to occur (e.g.: a network request), after which the model needs to be
370 /// updated based on the result.
371 ///
372 /// The callback receives the output of the future, if any, in addition to
373 /// mutable references to the spawning view and its context, allowing for
374 /// dirtying of the model (via [`Self::notify`]) if appropriate.
375 ///
376 /// The future can be aborted by calling `abort` on the returned `SpawnedFutureHandle`. Note the
377 /// future will only be aborted the _next_ time the future is polled.
378 ///
379 /// See [`Self::spawn_abortable`] for an alternative version of this function that accepts an
380 /// `on_abort` function that is called when the future is aborted.
381 pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> SpawnedFutureHandle
382 where
383 S: crate::r#async::Spawnable,
384 <S as Future>::Output: crate::r#async::SpawnableOutput,
385 F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
386 U: 'static,
387 {
388 self.spawn_abortable::<S, _, _>(
389 future,
390 |view, output, ctx| {
391 callback(view, output, ctx);
392 },
393 |_, _| {},
394 )
395 }
396 
397 /// Schedules a future to run on a background thread, invoking the `on_resolve`
398 /// callback on the _main_ thread upon completion. If the future is aborted, the
399 /// `on_abort` function is called.
400 ///
401 /// This function is useful in situations where a long-running process needs
402 /// to occur (e.g.: a network request), after which the model needs to be
403 /// updated based on the result.
404 ///
405 /// The `on_resolve` callback receives the output of the future, if any, in addition to
406 /// mutable references to the spawning model and its context, allowing for
407 /// dirtying of the view (via [`Self::notify`]) if appropriate.
408 ///
409 /// The future can be aborted by calling `abort` on the returned `SpawnedFutureHandle`. Note, a
410 /// future is not immediately killed on `abort`--it will only be aborted once the future's
411 /// `poll` method returns.
412 ///
413 /// See [`Self::spawn`] for an alternative version of this function that doesn't
414 /// require a callback if/when the future is aborted.
415 pub fn spawn_abortable<S, F, A>(
416 &mut self,
417 future: S,
418 on_resolve: F,
419 on_abort: A,
420 ) -> SpawnedFutureHandle
421 where
422 S: crate::r#async::Spawnable,
423 <S as Future>::Output: crate::r#async::SpawnableOutput,
424 F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>),
425 A: 'static + FnOnce(&mut T, &mut ModelContext<T>),
426 {
427 let (tx, rx) = futures::channel::oneshot::channel();
428 
429 let (abort_handle, abort_registration) = AbortHandle::new_pair();
430 self.app
431 .background_executor()
432 .spawn_boxed(Box::pin(async move {
433 let abortable = Abortable::new(future, abort_registration);
434 let result = abortable.await;
435 if tx.send(result).is_err() {
436 log::error!("Error sending background task result to main thread",);
437 }
438 }))
439 .detach();
440 
441 let future = self.spawn_local(rx, |model, rx_result, ctx| {
442 let output = match rx_result {
443 Ok(output) => output,
444 Err(_) => {
445 log::error!("sender unexpectedly dropped before receiver");
446 on_abort(model, ctx);
447 return;
448 }
449 };
450 
451 // Call the appropriate callback based on the output of resolving the future. If the
452 // future returned `Ok`, the future was not aborted so we can call `on_resolve`. If
453 // the future returned `Err`--the future was aborted.
454 match output {
455 Ok(output) => on_resolve(model, output, ctx),
456 Err(_) => on_abort(model, ctx),
457 }
458 });
459 
460 let future_id = self.app.register_spawned_future(future.boxed());
461 SpawnedFutureHandle::new(abort_handle, future_id)
462 }
463 
464 /// Creates a handle which background tasks can use to spawn work for this model. Spawned tasks
465 /// are executed on the main thread in the context of the model, and results are sent back to
466 /// the background task.
467 ///
468 /// Note that the spawner *does not* keep a strong reference to the model. If the model is
469 /// dropped, any pending or future tasks will be discarded.
470 pub fn spawner(&mut self) -> ModelSpawner<T> {
471 let (task_tx, task_rx) = async_channel::unbounded();
472 let (completion_tx, _completion_rx) = futures::channel::oneshot::channel();
473 
474 let task_id = self.app.spawn_stream_local(task_rx, completion_tx);
475 self.app.task_callbacks.insert(
476 task_id,
477 TaskCallback::ModelFromStream {
478 model_id: self.model_id,
479 on_item: Box::new(move |model, task, app, model_id| {
480 let model = model.downcast_mut().expect("unexpected model type");
481 let task: ModelTask<T> = *task
482 .downcast()
483 .expect("task from spawner should be ModelTask<T>");
484 let mut ctx = ModelContext::new(app, model_id);
485 task(model, &mut ctx);
486 }),
487 on_done: Box::new(move |_model, _app, _model_id| {}),
488 },
489 );
490 
491 ModelSpawner {
492 task_sender: task_tx,
493 }
494 }
495 
496 pub fn spawn_stream_local<S, F, G>(
497 &mut self,
498 stream: S,
499 mut on_item: F,
500 on_done: G,
501 ) -> SpawnedLocalStream
502 where
503 S: 'static + crate::r#async::Stream,
504 S::Item: SpawnableOutput,
505 F: 'static + FnMut(&mut T, S::Item, &mut ModelContext<T>),
506 G: 'static + FnOnce(&mut T, &mut ModelContext<T>),
507 {
508 let (tx, rx) = futures::channel::oneshot::channel();
509 
510 let task_id = self.app.spawn_stream_local(stream, tx);
511 self.app.task_callbacks.insert(
512 task_id,
513 TaskCallback::ModelFromStream {
514 model_id: self.model_id,
515 on_item: Box::new(move |model, output, app, model_id| {
516 let model = model.downcast_mut().unwrap();
517 let output = *output.downcast().unwrap();
518 let mut ctx = ModelContext::new(app, model_id);
519 on_item(model, output, &mut ctx);
520 }),
521 on_done: Box::new(move |model, app, model_id| {
522 let model = model.downcast_mut().unwrap();
523 let mut ctx = ModelContext::new(app, model_id);
524 on_done(model, &mut ctx);
525 }),
526 },
527 );
528 
529 SpawnedLocalStream::new(
530 async move {
531 if rx.await.is_err() {
532 log::error!("sender unexpectedly dropped before receiver");
533 }
534 }
535 .boxed_local(),
536 )
537 }
538}
539 
540impl<T> std::ops::Deref for ModelContext<'_, T> {
541 type Target = AppContext;
542 
543 fn deref(&self) -> &Self::Target {
544 self.app
545 }
546}
547 
548impl<T> std::ops::DerefMut for ModelContext<'_, T> {
549 fn deref_mut(&mut self) -> &mut Self::Target {
550 self.app
551 }
552}
553 
554impl<M> ViewAsRef for ModelContext<'_, M> {
555 fn view<T: View>(&self, handle: &ViewHandle<T>) -> &T {
556 self.app.view(handle)
557 }
558 
559 fn try_view<T: View>(&self, handle: &ViewHandle<T>) -> Option<&T> {
560 self.app.try_view(handle)
561 }
562}
563 
564impl<M> ReadView for ModelContext<'_, M> {
565 fn read_view<T, F, S>(&self, handle: &ViewHandle<T>, read: F) -> S
566 where
567 T: View,
568 F: FnOnce(&T, &AppContext) -> S,
569 {
570 self.app.read_view(handle, read)
571 }
572}
573 
574impl<M> UpdateView for ModelContext<'_, M> {
575 fn update_view<T, F, S>(&mut self, handle: &ViewHandle<T>, update: F) -> S
576 where
577 T: View,
578 F: FnOnce(&mut T, &mut ViewContext<T>) -> S,
579 {
580 self.app.update_view(handle, update)
581 }
582}
583 
584impl<M> ModelAsRef for ModelContext<'_, M> {
585 fn model<T: Entity>(&self, handle: &ModelHandle<T>) -> &T {
586 self.app.model(handle)
587 }
588}
589 
590impl<M> ReadModel for ModelContext<'_, M> {
591 fn read_model<T, F, S>(&self, handle: &ModelHandle<T>, read: F) -> S
592 where
593 T: Entity,
594 F: FnOnce(&T, &AppContext) -> S,
595 {
596 self.app.read_model(handle, read)
597 }
598}
599 
600impl<M> UpdateModel for ModelContext<'_, M> {
601 fn update_model<T, F, S>(&mut self, handle: &ModelHandle<T>, update: F) -> S
602 where
603 T: Entity,
604 F: FnOnce(&mut T, &mut ModelContext<T>) -> S,
605 {
606 self.app.update_model(handle, update)
607 }
608}
609 
610impl<M> GetSingletonModelHandle for ModelContext<'_, M> {
611 fn get_singleton_model_handle<T: crate::SingletonEntity>(&self) -> ModelHandle<T> {
612 self.app.get_singleton_model_handle()
613 }
614}
615 
616/// A task which must run in the context of a model of type `M`.
617type ModelTask<M> = Box<dyn FnOnce(&mut M, &mut ModelContext<M>) + Send + 'static>;
618 
619/// A handle for spawning model tasks from background threads.
620pub struct ModelSpawner<M> {
621 task_sender: async_channel::Sender<ModelTask<M>>,
622}
623 
624impl<M> Clone for ModelSpawner<M> {
625 fn clone(&self) -> Self {
626 Self {
627 task_sender: self.task_sender.clone(),
628 }
629 }
630}
631 
632impl<M> ModelSpawner<M> {
633 /// Spawn a task that will execute on the main thread, in the context of a model.
634 pub async fn spawn<R: Send + 'static>(
635 &self,
636 work: impl FnOnce(&mut M, &mut ModelContext<M>) -> R + Send + 'static,
637 ) -> Result<R, ModelDropped> {
638 let (tx, rx) = futures::channel::oneshot::channel();
639 
640 self.task_sender
641 .send(Box::new(move |me, ctx| {
642 let result = work(me, ctx);
643 // If the background task has dropped the receiver, then we don't need to send
644 // the result, and there's no one to inform regardless.
645 let _ = tx.send(result);
646 }))
647 .await
648 .map_err(|_| ModelDropped)?;
649 
650 rx.await.map_err(|_| ModelDropped)
651 }
652}
653 
654#[cfg(test)]
655#[path = "context_test.rs"]
656mod tests;
657