StratoSDK is a framework with a declarative approach, similar to Flutter and React, designed for and written entirely in Rust.
| 1 | use std::borrow::Cow; |
| 2 | |
| 3 | use bounded_vec_deque::BoundedVecDeque; |
| 4 | use chrono::{DateTime, Duration, Utc}; |
| 5 | use serde_json::Value; |
| 6 | |
| 7 | use crate::time::get_current_time; |
| 8 | |
/// Capacity passed to the [`BoundedVecDeque`] that backs [`EventStore`]; it caps how many events
/// are buffered in memory at once.
const MAX_BUFFER_SIZE: usize = 1024;
| 10 | |
/// The length of time between events used to determine a 'session' boundary. In other words, if
/// `new_event` occurs more than `SESSION_CONTINUATION_THRESHOLD_SECONDS` after `old_event`,
/// `new_event` is associated with a new session.
///
/// The value is five minutes, expressed in seconds.
pub(super) const SESSION_CONTINUATION_THRESHOLD_SECONDS: u64 = 5 * 60;
| 15 | |
/// A data store for telemetry events. This is a thin wrapper around a [`BoundedVecDeque`] with
/// domain-specific APIs for recording events (appending to the deque).
///
/// Besides the buffered events, the store tracks the creation time of the current session and
/// the latest event timestamp it has seen; together these decide when a new session starts.
pub(super) struct EventStore {
    // Bounded for now to save memory.
    // TODO: write to disk periodically
    /// Buffered events, appended at the back. The capacity is fixed when the store is created.
    pub(super) events: BoundedVecDeque<Event>,
    /// Creation time of the session that newly recorded events are attributed to.
    current_session_created_at: DateTime<Utc>,
    /// Latest event timestamp observed so far; session staleness is measured from this value.
    last_event_timestamp_seen: DateTime<Utc>,
}
| 25 | |
/// A single telemetry event together with the session it is attributed to.
#[derive(Clone, Debug)]
pub struct Event {
    /// The type of the event and its payload.
    pub payload: EventPayload,

    /// Creation time of the session this event is attributed to.
    ///
    /// We are using the session creation time as the identifier for the session.
    /// Some metrics platforms (e.g. Amplitude) expect this.
    pub session_created_at: DateTime<Utc>,

    /// The time at which the event occurred.
    pub timestamp: DateTime<Utc>,

    /// Whether the event contains user-generated content.
    pub contains_ugc: bool,
}
| 41 | |
/// Represents the type of telemetry event and its contents.
///
/// Every variant carries an `anonymous_id`; `user_id` is required only for
/// [`EventPayload::IdentifyUser`] and optional otherwise.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum EventPayload {
    /// Payload of an "identify user" event. Unlike the other variants, `user_id` is mandatory.
    IdentifyUser {
        user_id: String,
        anonymous_id: String,
    },
    /// Payload of an "app active" event.
    AppActive {
        user_id: Option<String>,
        anonymous_id: String,
    },
    /// Payload of a named event with an optional JSON value attached.
    NamedEvent {
        user_id: Option<String>,
        anonymous_id: String,
        /// Event name; the `Cow` allows `'static` names to be stored without allocating.
        name: Cow<'static, str>,
        /// Optional JSON data attached to the event.
        value: Option<Value>,
    },
}
| 60 | |
| 61 | impl EventStore { |
| 62 | pub(super) fn new() -> Self { |
| 63 | let initial_timestamp = get_current_time(); |
| 64 | Self { |
| 65 | events: BoundedVecDeque::new(MAX_BUFFER_SIZE), |
| 66 | current_session_created_at: initial_timestamp, |
| 67 | last_event_timestamp_seen: initial_timestamp, |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | // Register a named telemetry event |
| 72 | // Create a new session if the session is stale |
| 73 | pub(super) fn record_event( |
| 74 | &mut self, |
| 75 | user_id: Option<String>, |
| 76 | anonymous_id: String, |
| 77 | name: Cow<'static, str>, |
| 78 | payload: Option<Value>, |
| 79 | contains_ugc: bool, |
| 80 | timestamp: DateTime<Utc>, |
| 81 | ) { |
| 82 | let event = self.create_event( |
| 83 | user_id, |
| 84 | anonymous_id, |
| 85 | name, |
| 86 | payload, |
| 87 | contains_ugc, |
| 88 | timestamp, |
| 89 | ); |
| 90 | |
| 91 | #[cfg(feature = "log_named_telemetry_events")] |
| 92 | log::info!("Recorded telemetry event: {event:#?}"); |
| 93 | |
| 94 | self.events.push_back(event); |
| 95 | } |
| 96 | |
| 97 | // Register an Identify User telemetry event |
| 98 | // Create a new session if the session is stale |
| 99 | pub(super) fn record_identify_user_event( |
| 100 | &mut self, |
| 101 | user_id: String, |
| 102 | anonymous_id: String, |
| 103 | timestamp: DateTime<Utc>, |
| 104 | ) { |
| 105 | let session_created_at = if self.is_session_stale(timestamp) { |
| 106 | self.current_session_created_at = timestamp; |
| 107 | timestamp |
| 108 | } else { |
| 109 | self.current_session_created_at |
| 110 | }; |
| 111 | self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp); |
| 112 | self.events.push_back(Event { |
| 113 | session_created_at, |
| 114 | payload: EventPayload::IdentifyUser { |
| 115 | user_id, |
| 116 | anonymous_id, |
| 117 | }, |
| 118 | timestamp, |
| 119 | contains_ugc: false, |
| 120 | }); |
| 121 | } |
| 122 | |
| 123 | // Called every time app is active |
| 124 | // If session is fresh and the last event on the queue is an App Active event, collapse them |
| 125 | // Else, it behaves like `record_event` |
| 126 | pub(super) fn record_app_active( |
| 127 | &mut self, |
| 128 | user_id: Option<String>, |
| 129 | anonymous_id: String, |
| 130 | timestamp: DateTime<Utc>, |
| 131 | ) { |
| 132 | if !self.is_session_stale(timestamp) { |
| 133 | if let Some(last_event) = self.events.back_mut().filter(|event| { |
| 134 | event.payload |
| 135 | == EventPayload::AppActive { |
| 136 | user_id: user_id.clone(), |
| 137 | anonymous_id: anonymous_id.clone(), |
| 138 | } |
| 139 | }) { |
| 140 | last_event.timestamp = timestamp; |
| 141 | self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp); |
| 142 | return; |
| 143 | } |
| 144 | } |
| 145 | |
| 146 | let session_created_at = if self.is_session_stale(timestamp) { |
| 147 | self.current_session_created_at = timestamp; |
| 148 | timestamp |
| 149 | } else { |
| 150 | self.current_session_created_at |
| 151 | }; |
| 152 | self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp); |
| 153 | self.events.push_back(Event { |
| 154 | session_created_at, |
| 155 | payload: EventPayload::AppActive { |
| 156 | user_id, |
| 157 | anonymous_id, |
| 158 | }, |
| 159 | timestamp, |
| 160 | contains_ugc: false, |
| 161 | }); |
| 162 | } |
| 163 | |
| 164 | /// Returns a newly [`Event`], while also updating `Self::last_event_timestamp_seen` and |
| 165 | /// `Self::current_session_created_at`, if necessary. |
| 166 | pub(super) fn create_event( |
| 167 | &mut self, |
| 168 | user_id: Option<String>, |
| 169 | anonymous_id: String, |
| 170 | name: Cow<'static, str>, |
| 171 | payload: Option<Value>, |
| 172 | contains_ugc: bool, |
| 173 | timestamp: DateTime<Utc>, |
| 174 | ) -> Event { |
| 175 | let session_created_at = if self.is_session_stale(timestamp) { |
| 176 | self.current_session_created_at = timestamp; |
| 177 | timestamp |
| 178 | } else { |
| 179 | self.current_session_created_at |
| 180 | }; |
| 181 | self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp); |
| 182 | Event { |
| 183 | session_created_at, |
| 184 | payload: EventPayload::NamedEvent { |
| 185 | user_id, |
| 186 | anonymous_id, |
| 187 | name, |
| 188 | value: payload, |
| 189 | }, |
| 190 | timestamp, |
| 191 | contains_ugc, |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | fn is_session_stale(&self, now: DateTime<Utc>) -> bool { |
| 196 | let session_freshness_threshold = self.last_event_timestamp_seen |
| 197 | + Duration::seconds(SESSION_CONTINUATION_THRESHOLD_SECONDS as i64); |
| 198 | now > session_freshness_threshold |
| 199 | } |
| 200 | } |
| 201 | |
// Unit tests are kept in `event_store_test.rs` and are only compiled for test builds.
#[cfg(test)]
#[path = "event_store_test.rs"]
mod tests;
| 205 |