xmtp_db/encrypted_store/
identity_update.rs

1use std::collections::HashMap;
2
3use crate::StorageError;
4use crate::impl_store;
5
6use super::{
7    ConnectionExt,
8    db_connection::DbConnection,
9    schema::identity_updates::{self, dsl},
10};
11use derive_builder::Builder;
12use diesel::{dsl::max, prelude::*};
13
/// StoredIdentityUpdate holds a serialized IdentityUpdate record
///
/// One value corresponds to one row of the `identity_updates` table. Rows are keyed by the
/// composite primary key `(inbox_id, sequence_id)`.
///
/// The derived builder accepts any value convertible into each field's type (`setter(into)`)
/// and reports build failures as [`StorageError`].
///
/// The field order below is significant: the `Queryable` derive maps selected columns to
/// fields by position, so fields must not be reordered independently of the table schema.
#[derive(Insertable, Identifiable, Queryable, Debug, Clone, PartialEq, Eq, Builder)]
#[diesel(table_name = identity_updates)]
#[diesel(primary_key(inbox_id, sequence_id))]
#[builder(setter(into), build_fn(error = "StorageError"))]
pub struct StoredIdentityUpdate {
    /// Inbox this update belongs to; first half of the primary key.
    pub inbox_id: String,
    /// Position of this update within the inbox; second half of the primary key. The queries
    /// in this file order and range-filter on this column.
    pub sequence_id: i64,
    /// NOTE(review): presumably the server-assigned timestamp in nanoseconds — confirm
    /// against the writer of this column.
    pub server_timestamp_ns: i64,
    /// Raw bytes of the update; presumably the serialized IdentityUpdate mentioned above.
    pub payload: Vec<u8>,
    /// NOTE(review): identifier of the originator of this update — exact semantics are not
    /// visible in this file; confirm against the caller.
    pub originator_id: i32,
}
26
27impl StoredIdentityUpdate {
28    pub fn build() -> StoredIdentityUpdateBuilder {
29        StoredIdentityUpdateBuilder::default()
30    }
31
32    pub fn new(
33        inbox_id: String,
34        sequence_id: i64,
35        server_timestamp_ns: i64,
36        payload: Vec<u8>,
37        originator_id: i32,
38    ) -> Self {
39        Self {
40            inbox_id,
41            sequence_id,
42            server_timestamp_ns,
43            payload,
44            originator_id,
45        }
46    }
47}
48
// Wires `StoredIdentityUpdate` to the `identity_updates` table via the crate's `impl_store!`
// macro.
// NOTE(review): presumably this generates the `Store` impl that the tests in this file rely on
// when calling `.store(conn)` — confirm against the macro definition.
impl_store!(StoredIdentityUpdate, identity_updates);
50
/// Read and write operations on stored identity updates.
///
/// Every method reports failures as [`crate::ConnectionError`].
pub trait QueryIdentityUpdates {
    /// Returns all identity updates for the given inbox ID up to the provided sequence_id.
    /// Returns updates greater than `from_sequence_id` and less than _or equal to_ `to_sequence_id`
    ///
    /// A bound passed as `None` is not applied, so `(None, None)` returns every update stored
    /// for the inbox.
    fn get_identity_updates<InboxId: AsRef<str>>(
        &self,
        inbox_id: InboxId,
        from_sequence_id: Option<i64>,
        to_sequence_id: Option<i64>,
    ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError>;

    /// Batch insert identity updates, ignoring duplicates.
    fn insert_or_ignore_identity_updates(
        &self,
        updates: &[StoredIdentityUpdate],
    ) -> Result<(), crate::ConnectionError>;

    /// Returns the highest sequence ID stored for a single inbox.
    ///
    /// NOTE(review): the return type has no "absent" case, so an inbox without any stored
    /// update is expected to surface as an error — confirm how callers handle that.
    fn get_latest_sequence_id_for_inbox(
        &self,
        inbox_id: &str,
    ) -> Result<i64, crate::ConnectionError>;

    /// Given a list of inbox_ids return a HashMap of each inbox ID -> highest known sequence ID
    ///
    /// Inbox IDs without any stored update have no entry in the returned map.
    fn get_latest_sequence_id(
        &self,
        inbox_ids: &[&str],
    ) -> Result<HashMap<String, i64>, crate::ConnectionError>;

    /// Returns the count of identity updates for inbox_ids
    ///
    /// The map is keyed by inbox ID; inbox IDs without any stored update have no entry.
    fn count_inbox_updates(
        &self,
        inbox_ids: &[&str],
    ) -> Result<HashMap<String, i64>, crate::ConnectionError>;
}
84
85impl<T> QueryIdentityUpdates for &T
86where
87    T: QueryIdentityUpdates,
88{
89    fn get_identity_updates<InboxId: AsRef<str>>(
90        &self,
91        inbox_id: InboxId,
92        from_sequence_id: Option<i64>,
93        to_sequence_id: Option<i64>,
94    ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError> {
95        (**self).get_identity_updates(inbox_id, from_sequence_id, to_sequence_id)
96    }
97
98    fn insert_or_ignore_identity_updates(
99        &self,
100        updates: &[StoredIdentityUpdate],
101    ) -> Result<(), crate::ConnectionError> {
102        (**self).insert_or_ignore_identity_updates(updates)
103    }
104
105    fn get_latest_sequence_id_for_inbox(
106        &self,
107        inbox_id: &str,
108    ) -> Result<i64, crate::ConnectionError> {
109        (**self).get_latest_sequence_id_for_inbox(inbox_id)
110    }
111
112    fn get_latest_sequence_id(
113        &self,
114        inbox_ids: &[&str],
115    ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
116        (**self).get_latest_sequence_id(inbox_ids)
117    }
118
119    fn count_inbox_updates(
120        &self,
121        inbox_ids: &[&str],
122    ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
123        (**self).count_inbox_updates(inbox_ids)
124    }
125}
126
127impl<C: ConnectionExt> QueryIdentityUpdates for DbConnection<C> {
128    /// Returns all identity updates for the given inbox ID up to the provided sequence_id.
129    /// Returns updates greater than `from_sequence_id` and less than _or equal to_ `to_sequence_id`
130    fn get_identity_updates<InboxId: AsRef<str>>(
131        &self,
132        inbox_id: InboxId,
133        from_sequence_id: Option<i64>,
134        to_sequence_id: Option<i64>,
135    ) -> Result<Vec<StoredIdentityUpdate>, crate::ConnectionError> {
136        let mut query = dsl::identity_updates
137            .order(dsl::sequence_id.asc())
138            .filter(dsl::inbox_id.eq(inbox_id.as_ref()))
139            .into_boxed();
140
141        if let Some(sequence_id) = from_sequence_id {
142            query = query.filter(dsl::sequence_id.gt(sequence_id));
143        }
144
145        if let Some(sequence_id) = to_sequence_id {
146            query = query.filter(dsl::sequence_id.le(sequence_id));
147        }
148
149        self.raw_query_read(|conn| query.load::<StoredIdentityUpdate>(conn))
150    }
151
152    /// Batch insert identity updates, ignoring duplicates.
153    #[tracing::instrument(level = "trace", skip(updates))]
154    fn insert_or_ignore_identity_updates(
155        &self,
156        updates: &[StoredIdentityUpdate],
157    ) -> Result<(), crate::ConnectionError> {
158        self.raw_query_write(|conn| {
159            diesel::insert_or_ignore_into(dsl::identity_updates)
160                .values(updates)
161                .execute(conn)
162        })?;
163        Ok(())
164    }
165
166    fn get_latest_sequence_id_for_inbox(
167        &self,
168        inbox_id: &str,
169    ) -> Result<i64, crate::ConnectionError> {
170        let query = dsl::identity_updates
171            .select(dsl::sequence_id)
172            .order(dsl::sequence_id.desc())
173            .limit(1)
174            .filter(dsl::inbox_id.eq(inbox_id))
175            .into_boxed();
176
177        self.raw_query_read(|conn| query.first::<i64>(conn))
178    }
179
180    /// Given a list of inbox_ids return a HashMap of each inbox ID -> highest known sequence ID
181    #[tracing::instrument(level = "trace", skip_all)]
182    fn get_latest_sequence_id(
183        &self,
184        inbox_ids: &[&str],
185    ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
186        // Query IdentityUpdates grouped by inbox_id, getting the max sequence_id
187        let query = dsl::identity_updates
188            .group_by(dsl::inbox_id)
189            .select((dsl::inbox_id, max(dsl::sequence_id)))
190            .filter(dsl::inbox_id.eq_any(inbox_ids));
191
192        // Get the results as a Vec of (inbox_id, sequence_id) tuples
193        let result_tuples: Vec<(String, i64)> = self
194            .raw_query_read(|conn| query.load::<(String, Option<i64>)>(conn))?
195            .into_iter()
196            // Diesel needs an Option type for aggregations like max(sequence_id), so we
197            // unwrap the option here
198            .filter_map(|(inbox_id, sequence_id_opt)| {
199                sequence_id_opt.map(|sequence_id| (inbox_id, sequence_id))
200            })
201            .collect();
202
203        // Convert the Vec to a HashMap
204        Ok(HashMap::from_iter(result_tuples))
205    }
206
207    fn count_inbox_updates(
208        &self,
209        inbox_ids: &[&str],
210    ) -> Result<HashMap<String, i64>, crate::ConnectionError> {
211        use diesel::dsl::count_star;
212        let query = dsl::identity_updates
213            .group_by(dsl::inbox_id)
214            .select((dsl::inbox_id, count_star()))
215            .filter(dsl::inbox_id.eq_any(inbox_ids));
216        self.raw_query_read(|conn| {
217            query
218                .load_iter::<(String, i64), _>(conn)?
219                .collect::<Result<HashMap<_, _>, _>>()
220        })
221    }
222}
223
#[cfg(test)]
pub(crate) mod tests {
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);

    use crate::{Store, test_utils::with_connection};
    use xmtp_common::{rand_time, rand_vec};

    use super::*;

    /// Creates an update for `inbox_id` at `sequence_id` with a random timestamp, a random
    /// payload and originator `1`.
    fn build_update(inbox_id: &str, sequence_id: i64) -> StoredIdentityUpdate {
        let server_timestamp_ns = rand_time();
        let payload = rand_vec::<24>();
        StoredIdentityUpdate::new(
            inbox_id.to_string(),
            sequence_id,
            server_timestamp_ns,
            payload,
            1,
        )
    }

    /// Two updates stored one by one come back in sequence order with their payloads intact.
    #[xmtp_common::test]
    fn insert_and_read() {
        with_connection(|conn| {
            let inbox_id = "inbox_1";
            let first = build_update(inbox_id, 1);
            let second = build_update(inbox_id, 2);
            // Copy the payloads before handing the rows to `store`.
            let expected_first = first.payload.clone();
            let expected_second = second.payload.clone();

            first.store(conn).expect("should store without error");
            second.store(conn).expect("should store without error");

            let stored = conn
                .get_identity_updates(inbox_id, None, None)
                .expect("query should work");

            assert_eq!(stored.len(), 2);
            assert_eq!(stored[0].payload, expected_first);
            assert_eq!(stored[1].payload, expected_second);
        })
    }

    /// The lower bound is exclusive, the upper bound inclusive, and `None` disables a bound.
    #[xmtp_common::test]
    fn test_filter() {
        with_connection(|conn| {
            let inbox_id = "inbox_1";
            let batch = [
                build_update(inbox_id, 1),
                build_update(inbox_id, 2),
                build_update(inbox_id, 3),
            ];

            conn.insert_or_ignore_identity_updates(&batch)
                .expect("insert should succeed");

            let fetch = |from: Option<i64>, to: Option<i64>| {
                conn.get_identity_updates(inbox_id, from, to)
                    .expect("query should work")
            };

            let up_to_two = fetch(None, Some(2));
            assert_eq!(up_to_two.len(), 2);

            let everything = fetch(None, None);
            assert_eq!(everything.len(), 3);

            let just_two = fetch(Some(1), Some(2));
            assert_eq!(just_two.len(), 1);
            assert_eq!(just_two[0].sequence_id, 2);
        })
    }

    /// The batch lookup reports the maximum per inbox and omits inboxes without rows.
    #[xmtp_common::test]
    fn test_get_latest_sequence_id() {
        with_connection(|conn| {
            let inbox_1 = "inbox_1";
            let inbox_2 = "inbox_2";
            let batch = [
                build_update(inbox_1, 1),
                build_update(inbox_1, 3),
                build_update(inbox_2, 5),
                build_update(inbox_2, 6),
            ];

            conn.insert_or_ignore_identity_updates(&batch)
                .expect("insert should succeed");

            let latest = conn
                .get_latest_sequence_id(&[inbox_1, inbox_2])
                .expect("query should work");

            assert_eq!(latest.get(inbox_1).copied(), Some(3));
            assert_eq!(latest.get(inbox_2).copied(), Some(6));

            let latest_with_unknown = conn
                .get_latest_sequence_id(&[inbox_1, "missing_inbox"])
                .expect("should still succeed");

            assert_eq!(latest_with_unknown.get(inbox_1).copied(), Some(3));
            assert!(!latest_with_unknown.contains_key("missing_inbox"));
        })
    }

    /// The single-inbox lookup returns the highest stored sequence ID.
    #[xmtp_common::test]
    fn get_single_sequence_id() {
        with_connection(|conn| {
            let inbox_id = "inbox_1";
            let older = build_update(inbox_id, 1);
            let newer = build_update(inbox_id, 2);
            older.store(conn).expect("should store without error");
            newer.store(conn).expect("should store without error");

            let latest = conn
                .get_latest_sequence_id_for_inbox(inbox_id)
                .expect("query should work");
            assert_eq!(latest, 2);
        })
    }

    /// Counts are reported per inbox and inboxes without rows are omitted.
    #[xmtp_common::test]
    fn test_count_inbox_updates() {
        with_connection(|conn| {
            let inbox_1 = "inbox_1";
            let inbox_2 = "inbox_2";
            let batch = [
                build_update(inbox_1, 1),
                build_update(inbox_1, 2),
                build_update(inbox_2, 1),
            ];
            conn.insert_or_ignore_identity_updates(&batch).unwrap();

            let counts = conn
                .count_inbox_updates(&[inbox_1, inbox_2, "missing"])
                .unwrap();

            assert_eq!(counts.get(inbox_1).copied(), Some(2));
            assert_eq!(counts.get(inbox_2).copied(), Some(1));
            assert!(!counts.contains_key("missing"));
        })
    }
}