Seregon/StratoSDK

StratoSDK is a framework with a declarative approach similar to Flutter/React, written and designed entirely for Rust.

Rust/27.3 KB/No license
crates/strato-ui-core/src/telemetry/event_store.rs
StratoSDK / crates / strato-ui-core / src / telemetry / event_store.rs
1use std::borrow::Cow;
2 
3use bounded_vec_deque::BoundedVecDeque;
4use chrono::{DateTime, Duration, Utc};
5use serde_json::Value;
6 
7use crate::time::get_current_time;
8 
9const MAX_BUFFER_SIZE: usize = 1024;
10 
11/// The length of time between events used to determine a 'session' boundary. In other words, if
12/// new_event occurs >SESSION_CONTINUATION_THRESHOLD_SECONDS after old_event, new_event is
13/// associated with a new session.
14pub(super) const SESSION_CONTINUATION_THRESHOLD_SECONDS: u64 = 5 * 60;
15 
16/// A data store for telemetry events. This is a thin wrapper around a [`BoundedVecDeque`] with
17/// APIs for domain-specific APIs for recording events (appending to the deque).
18pub(super) struct EventStore {
19 // Bounded for now to save memory.
20 // TODO: write to disk periodically
21 pub(super) events: BoundedVecDeque<Event>,
22 current_session_created_at: DateTime<Utc>,
23 last_event_timestamp_seen: DateTime<Utc>,
24}
25 
26#[derive(Clone, Debug)]
27pub struct Event {
28 /// The type of the event and its payload.
29 pub payload: EventPayload,
30 
31 // We are using the session creation time as the identifier for the session.
32 // Some metrics platforms (e.g. Amplitude) expect this.
33 pub session_created_at: DateTime<Utc>,
34 
35 /// The time at which the event occurred.
36 pub timestamp: DateTime<Utc>,
37 
38 /// Whether the event contains user-generated content.
39 pub contains_ugc: bool,
40}
41 
42/// Represents the type of telemetry event and its contents.
43#[derive(Clone, PartialEq, Eq, Debug)]
44pub enum EventPayload {
45 IdentifyUser {
46 user_id: String,
47 anonymous_id: String,
48 },
49 AppActive {
50 user_id: Option<String>,
51 anonymous_id: String,
52 },
53 NamedEvent {
54 user_id: Option<String>,
55 anonymous_id: String,
56 name: Cow<'static, str>,
57 value: Option<Value>,
58 },
59}
60 
61impl EventStore {
62 pub(super) fn new() -> Self {
63 let initial_timestamp = get_current_time();
64 Self {
65 events: BoundedVecDeque::new(MAX_BUFFER_SIZE),
66 current_session_created_at: initial_timestamp,
67 last_event_timestamp_seen: initial_timestamp,
68 }
69 }
70 
71 // Register a named telemetry event
72 // Create a new session if the session is stale
73 pub(super) fn record_event(
74 &mut self,
75 user_id: Option<String>,
76 anonymous_id: String,
77 name: Cow<'static, str>,
78 payload: Option<Value>,
79 contains_ugc: bool,
80 timestamp: DateTime<Utc>,
81 ) {
82 let event = self.create_event(
83 user_id,
84 anonymous_id,
85 name,
86 payload,
87 contains_ugc,
88 timestamp,
89 );
90 
91 #[cfg(feature = "log_named_telemetry_events")]
92 log::info!("Recorded telemetry event: {event:#?}");
93 
94 self.events.push_back(event);
95 }
96 
97 // Register an Identify User telemetry event
98 // Create a new session if the session is stale
99 pub(super) fn record_identify_user_event(
100 &mut self,
101 user_id: String,
102 anonymous_id: String,
103 timestamp: DateTime<Utc>,
104 ) {
105 let session_created_at = if self.is_session_stale(timestamp) {
106 self.current_session_created_at = timestamp;
107 timestamp
108 } else {
109 self.current_session_created_at
110 };
111 self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp);
112 self.events.push_back(Event {
113 session_created_at,
114 payload: EventPayload::IdentifyUser {
115 user_id,
116 anonymous_id,
117 },
118 timestamp,
119 contains_ugc: false,
120 });
121 }
122 
123 // Called every time app is active
124 // If session is fresh and the last event on the queue is an App Active event, collapse them
125 // Else, it behaves like `record_event`
126 pub(super) fn record_app_active(
127 &mut self,
128 user_id: Option<String>,
129 anonymous_id: String,
130 timestamp: DateTime<Utc>,
131 ) {
132 if !self.is_session_stale(timestamp) {
133 if let Some(last_event) = self.events.back_mut().filter(|event| {
134 event.payload
135 == EventPayload::AppActive {
136 user_id: user_id.clone(),
137 anonymous_id: anonymous_id.clone(),
138 }
139 }) {
140 last_event.timestamp = timestamp;
141 self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp);
142 return;
143 }
144 }
145 
146 let session_created_at = if self.is_session_stale(timestamp) {
147 self.current_session_created_at = timestamp;
148 timestamp
149 } else {
150 self.current_session_created_at
151 };
152 self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp);
153 self.events.push_back(Event {
154 session_created_at,
155 payload: EventPayload::AppActive {
156 user_id,
157 anonymous_id,
158 },
159 timestamp,
160 contains_ugc: false,
161 });
162 }
163 
164 /// Returns a newly [`Event`], while also updating `Self::last_event_timestamp_seen` and
165 /// `Self::current_session_created_at`, if necessary.
166 pub(super) fn create_event(
167 &mut self,
168 user_id: Option<String>,
169 anonymous_id: String,
170 name: Cow<'static, str>,
171 payload: Option<Value>,
172 contains_ugc: bool,
173 timestamp: DateTime<Utc>,
174 ) -> Event {
175 let session_created_at = if self.is_session_stale(timestamp) {
176 self.current_session_created_at = timestamp;
177 timestamp
178 } else {
179 self.current_session_created_at
180 };
181 self.last_event_timestamp_seen = self.last_event_timestamp_seen.max(timestamp);
182 Event {
183 session_created_at,
184 payload: EventPayload::NamedEvent {
185 user_id,
186 anonymous_id,
187 name,
188 value: payload,
189 },
190 timestamp,
191 contains_ugc,
192 }
193 }
194 
195 fn is_session_stale(&self, now: DateTime<Utc>) -> bool {
196 let session_freshness_threshold = self.last_event_timestamp_seen
197 + Duration::seconds(SESSION_CONTINUATION_THRESHOLD_SECONDS as i64);
198 now > session_freshness_threshold
199 }
200}
201 
202#[cfg(test)]
203#[path = "event_store_test.rs"]
204mod tests;
205