1use std::collections::HashMap;
2
3use crate::StorageError;
4use crate::impl_store;
5
6use super::{
7 ConnectionExt,
8 db_connection::DbConnection,
9 schema::identity_updates::{self, dsl},
10};
11use derive_builder::Builder;
12use diesel::{dsl::max, prelude::*};
13
14#[derive(Insertable, Identifiable, Queryable, Debug, Clone, PartialEq, Eq, Builder)]
16#[diesel(table_name = identity_updates)]
17#[diesel(primary_key(inbox_id, sequence_id))]
18#[builder(setter(into), build_fn(error = "StorageError"))]
19pub struct StoredIdentityUpdate {
20 pub inbox_id: String,
21 pub sequence_id: i64,
22 pub server_timestamp_ns: i64,
23 pub payload: Vec<u8>,
24 pub originator_id: i32,
25}
26
27impl StoredIdentityUpdate {
28 pub fn build() -> StoredIdentityUpdateBuilder {
29 StoredIdentityUpdateBuilder::default()
30 }
31
32 pub fn new(
33 inbox_id: String,
34 sequence_id: i64,
35 server_timestamp_ns: i64,
36 payload: Vec<u8>,
37 originator_id: i32,
38 ) -> Self {
39 Self {
40 inbox_id,
41 sequence_id,
42 server_timestamp_ns,
43 payload,
44 originator_id,
45 }
46 }
47}
48
49impl_store!(StoredIdentityUpdate, identity_updates);
50
51pub trait QueryIdentityUpdates {
52 fn get_identity_updates<InboxId: AsRef<str>>(
55 &self,
56 inbox_id: InboxId,
57 from_sequence_id: Option<i64>,
58 to_sequence_id: Option<i64>,
59 ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError>;
60
61 fn insert_or_ignore_identity_updates(
63 &self,
64 updates: &[StoredIdentityUpdate],
65 ) -> Result<(), crate::ConnectionError>;
66
67 fn get_latest_sequence_id_for_inbox(
68 &self,
69 inbox_id: &str,
70 ) -> Result<i64, crate::ConnectionError>;
71
72 fn get_latest_sequence_id(
74 &self,
75 inbox_ids: &[&str],
76 ) -> Result<HashMap<String, i64>, crate::ConnectionError>;
77
78 fn count_inbox_updates(
80 &self,
81 inbox_ids: &[&str],
82 ) -> Result<HashMap<String, i64>, crate::ConnectionError>;
83}
84
85impl<T> QueryIdentityUpdates for &T
86where
87 T: QueryIdentityUpdates,
88{
89 fn get_identity_updates<InboxId: AsRef<str>>(
90 &self,
91 inbox_id: InboxId,
92 from_sequence_id: Option<i64>,
93 to_sequence_id: Option<i64>,
94 ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError> {
95 (**self).get_identity_updates(inbox_id, from_sequence_id, to_sequence_id)
96 }
97
98 fn insert_or_ignore_identity_updates(
99 &self,
100 updates: &[StoredIdentityUpdate],
101 ) -> Result<(), crate::ConnectionError> {
102 (**self).insert_or_ignore_identity_updates(updates)
103 }
104
105 fn get_latest_sequence_id_for_inbox(
106 &self,
107 inbox_id: &str,
108 ) -> Result<i64, crate::ConnectionError> {
109 (**self).get_latest_sequence_id_for_inbox(inbox_id)
110 }
111
112 fn get_latest_sequence_id(
113 &self,
114 inbox_ids: &[&str],
115 ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
116 (**self).get_latest_sequence_id(inbox_ids)
117 }
118
119 fn count_inbox_updates(
120 &self,
121 inbox_ids: &[&str],
122 ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
123 (**self).count_inbox_updates(inbox_ids)
124 }
125}
126
127impl<C: ConnectionExt> QueryIdentityUpdates for DbConnection<C> {
128 fn get_identity_updates<InboxId: AsRef<str>>(
131 &self,
132 inbox_id: InboxId,
133 from_sequence_id: Option<i64>,
134 to_sequence_id: Option<i64>,
135 ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError> {
136 let mut query = dsl::identity_updates
137 .order(dsl::sequence_id.asc())
138 .filter(dsl::inbox_id.eq(inbox_id.as_ref()))
139 .into_boxed();
140
141 if let Some(sequence_id) = from_sequence_id {
142 query = query.filter(dsl::sequence_id.gt(sequence_id));
143 }
144
145 if let Some(sequence_id) = to_sequence_id {
146 query = query.filter(dsl::sequence_id.le(sequence_id));
147 }
148
149 self.raw_query_read(|conn| query.load::<StoredIdentityUpdate>(conn))
150 }
151
152 #[tracing::instrument(level = "trace", skip(updates))]
154 fn insert_or_ignore_identity_updates(
155 &self,
156 updates: &[StoredIdentityUpdate],
157 ) -> Result<(), crate::ConnectionError> {
158 self.raw_query_write(|conn| {
159 diesel::insert_or_ignore_into(dsl::identity_updates)
160 .values(updates)
161 .execute(conn)
162 })?;
163 Ok(())
164 }
165
166 fn get_latest_sequence_id_for_inbox(
167 &self,
168 inbox_id: &str,
169 ) -> Result<i64, crate::ConnectionError> {
170 let query = dsl::identity_updates
171 .select(dsl::sequence_id)
172 .order(dsl::sequence_id.desc())
173 .limit(1)
174 .filter(dsl::inbox_id.eq(inbox_id))
175 .into_boxed();
176
177 self.raw_query_read(|conn| query.first::<i64>(conn))
178 }
179
180 #[tracing::instrument(level = "trace", skip_all)]
182 fn get_latest_sequence_id(
183 &self,
184 inbox_ids: &[&str],
185 ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
186 let query = dsl::identity_updates
188 .group_by(dsl::inbox_id)
189 .select((dsl::inbox_id, max(dsl::sequence_id)))
190 .filter(dsl::inbox_id.eq_any(inbox_ids));
191
192 let result_tuples: Vec<(String, i64)> = self
194 .raw_query_read(|conn| query.load::<(String, Option<i64>)>(conn))?
195 .into_iter()
196 .filter_map(|(inbox_id, sequence_id_opt)| {
199 sequence_id_opt.map(|sequence_id| (inbox_id, sequence_id))
200 })
201 .collect();
202
203 Ok(HashMap::from_iter(result_tuples))
205 }
206
207 fn count_inbox_updates(
208 &self,
209 inbox_ids: &[&str],
210 ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
211 use diesel::dsl::count_star;
212 let query = dsl::identity_updates
213 .group_by(dsl::inbox_id)
214 .select((dsl::inbox_id, count_star()))
215 .filter(dsl::inbox_id.eq_any(inbox_ids));
216 self.raw_query_read(|conn| {
217 query
218 .load_iter::<(String, i64), _>(conn)?
219 .collect::<Result<HashMap<_, _>, _>>()
220 })
221 }
222}
223
224#[cfg(test)]
225pub(crate) mod tests {
226 #[cfg(target_arch = "wasm32")]
227 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);
228
229 use crate::{Store, test_utils::with_connection};
230 use xmtp_common::{rand_time, rand_vec};
231
232 use super::*;
233
234 fn build_update(inbox_id: &str, sequence_id: i64) -> StoredIdentityUpdate {
235 StoredIdentityUpdate::new(
236 inbox_id.to_string(),
237 sequence_id,
238 rand_time(),
239 rand_vec::<24>(),
240 1,
241 )
242 }
243
244 #[xmtp_common::test]
245 fn insert_and_read() {
246 with_connection(|conn| {
247 let inbox_id = "inbox_1";
248 let update_1 = build_update(inbox_id, 1);
249 let update_1_payload = update_1.payload.clone();
250 let update_2 = build_update(inbox_id, 2);
251 let update_2_payload = update_2.payload.clone();
252
253 update_1.store(conn).expect("should store without error");
254 update_2.store(conn).expect("should store without error");
255
256 let all_updates = conn
257 .get_identity_updates(inbox_id, None, None)
258 .expect("query should work");
259
260 assert_eq!(all_updates.len(), 2);
261 let first_update = all_updates.first().unwrap();
262 assert_eq!(first_update.payload, update_1_payload);
263 let second_update = all_updates.last().unwrap();
264 assert_eq!(second_update.payload, update_2_payload);
265 })
266 }
267
268 #[xmtp_common::test]
269 fn test_filter() {
270 with_connection(|conn| {
271 let inbox_id = "inbox_1";
272 let update_1 = build_update(inbox_id, 1);
273 let update_2 = build_update(inbox_id, 2);
274 let update_3 = build_update(inbox_id, 3);
275
276 conn.insert_or_ignore_identity_updates(&[update_1, update_2, update_3])
277 .expect("insert should succeed");
278
279 let update_1_and_2 = conn
280 .get_identity_updates(inbox_id, None, Some(2))
281 .expect("query should work");
282
283 assert_eq!(update_1_and_2.len(), 2);
284
285 let all_updates = conn
286 .get_identity_updates(inbox_id, None, None)
287 .expect("query should work");
288
289 assert_eq!(all_updates.len(), 3);
290
291 let only_update_2 = conn
292 .get_identity_updates(inbox_id, Some(1), Some(2))
293 .expect("query should work");
294
295 assert_eq!(only_update_2.len(), 1);
296 assert_eq!(only_update_2[0].sequence_id, 2);
297 })
298 }
299
300 #[xmtp_common::test]
301 fn test_get_latest_sequence_id() {
302 with_connection(|conn| {
303 let inbox_1 = "inbox_1";
304 let inbox_2 = "inbox_2";
305 let update_1 = build_update(inbox_1, 1);
306 let update_2 = build_update(inbox_1, 3);
307 let update_3 = build_update(inbox_2, 5);
308 let update_4 = build_update(inbox_2, 6);
309
310 conn.insert_or_ignore_identity_updates(&[update_1, update_2, update_3, update_4])
311 .expect("insert should succeed");
312
313 let latest_sequence_ids = conn
314 .get_latest_sequence_id(&[inbox_1, inbox_2])
315 .expect("query should work");
316
317 assert_eq!(latest_sequence_ids.get(inbox_1), Some(&3));
318 assert_eq!(latest_sequence_ids.get(inbox_2), Some(&6));
319
320 let latest_sequence_ids_with_missing_member = conn
321 .get_latest_sequence_id(&[inbox_1, "missing_inbox"])
322 .expect("should still succeed");
323
324 assert_eq!(
325 latest_sequence_ids_with_missing_member.get(inbox_1),
326 Some(&3)
327 );
328 assert_eq!(
329 latest_sequence_ids_with_missing_member.get("missing_inbox"),
330 None
331 );
332 })
333 }
334
335 #[xmtp_common::test]
336 fn get_single_sequence_id() {
337 with_connection(|conn| {
338 let inbox_id = "inbox_1";
339 let update = build_update(inbox_id, 1);
340 let update_2 = build_update(inbox_id, 2);
341 update.store(conn).expect("should store without error");
342 update_2.store(conn).expect("should store without error");
343
344 let sequence_id = conn
345 .get_latest_sequence_id_for_inbox(inbox_id)
346 .expect("query should work");
347 assert_eq!(sequence_id, 2);
348 })
349 }
350
351 #[xmtp_common::test]
352 fn test_count_inbox_updates() {
353 with_connection(|conn| {
354 let inbox_1 = "inbox_1";
355 let inbox_2 = "inbox_2";
356 conn.insert_or_ignore_identity_updates(&[
357 build_update(inbox_1, 1),
358 build_update(inbox_1, 2),
359 build_update(inbox_2, 1),
360 ])
361 .unwrap();
362 let counts = conn
363 .count_inbox_updates(&[inbox_1, inbox_2, "missing"])
364 .unwrap();
365 assert_eq!(counts.get(inbox_1), Some(&2));
366 assert_eq!(counts.get(inbox_2), Some(&1));
367 assert_eq!(counts.get("missing"), None);
368 })
369 }
370}