xmtp_db/encrypted_store/
icebox.rs

1use super::{ConnectionExt, db_connection::DbConnection};
2use crate::icebox::types::{IceboxOrphans, IceboxWithDep};
3use crate::schema::icebox::dsl;
4use crate::schema::icebox_dependencies;
5use crate::{impl_store, schema::icebox};
6use diesel::prelude::*;
7use itertools::Itertools;
8use serde::{Deserialize, Serialize};
9use xmtp_proto::types::{
10    Cursor, OriginatorId, OrphanedEnvelope, OrphanedEnvelopeBuilder, SequenceId,
11};
12
13mod types;
14
15#[derive(
16    Debug,
17    Clone,
18    Serialize,
19    Deserialize,
20    Insertable,
21    Identifiable,
22    Queryable,
23    Eq,
24    PartialEq,
25    QueryableByName,
26)]
27#[diesel(table_name = icebox)]
28#[diesel(primary_key(originator_id, sequence_id))]
29#[diesel(belongs_to(crate::group::StoredGroup, foreign_key = group_id))]
30pub struct Icebox {
31    pub originator_id: i64,
32    pub sequence_id: i64,
33    pub group_id: Vec<u8>,
34    pub envelope_payload: Vec<u8>,
35}
36
37impl_store!(Icebox, icebox);
38
39#[derive(
40    Debug,
41    Clone,
42    Serialize,
43    Deserialize,
44    Insertable,
45    Identifiable,
46    Queryable,
47    Eq,
48    PartialEq,
49    QueryableByName,
50)]
51#[diesel(table_name = icebox_dependencies)]
52#[diesel(primary_key(
53    envelope_originator_id,
54    envelope_sequence_id,
55    dependency_originator_id,
56    dependency_sequence_id
57))]
58pub struct IceboxDependency {
59    pub envelope_originator_id: i64,
60    pub envelope_sequence_id: i64,
61    pub dependency_originator_id: i64,
62    pub dependency_sequence_id: i64,
63}
64
65impl_store!(IceboxDependency, icebox_dependencies);
66
67pub trait QueryIcebox {
68    /// Returns the envelopes (if they exist) plus all their dependencies, and
69    /// dependencies of dependencies, along with each envelope's own dependencies.
70    /// This could be useful for resolving issues where a commit that could have been
71    /// processed, was accidentally committed to the icebox.
72    /// Generally, if an envelope has a dependency on something in the icebox already
73    /// it means its dependency could not be processed, so it must also be iceboxed.
74    fn past_dependents(
75        &self,
76        cursors: &[Cursor],
77    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;
78
79    /// Returns envelopes that depend on any of the specified cursors,
80    /// along with each envelope's own dependencies.
81    /// Does not return the cursors themselves, if they exist in the chain.
82    fn future_dependents(
83        &self,
84        cursors: &[Cursor],
85    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;
86
87    /// cache the orphans until its parent(s) may be found.
88    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError>;
89
90    /// Removes icebox entries that have been processed according to refresh_state.
91    /// Deletes entries where the refresh_state cursor for the group is at or beyond
92    /// the icebox entry's sequence_id, indicating the envelope has been processed.
93    fn prune_icebox(&self) -> Result<usize, crate::ConnectionError>;
94}
95
96impl<T> QueryIcebox for &T
97where
98    T: QueryIcebox,
99{
100    fn past_dependents(
101        &self,
102        cursors: &[Cursor],
103    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
104        (**self).past_dependents(cursors)
105    }
106
107    fn future_dependents(
108        &self,
109        cursors: &[Cursor],
110    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
111        (**self).future_dependents(cursors)
112    }
113
114    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError> {
115        (**self).ice(orphans)
116    }
117
118    fn prune_icebox(&self) -> Result<usize, crate::ConnectionError> {
119        (**self).prune_icebox()
120    }
121}
122
123impl<C: ConnectionExt> DbConnection<C> {
124    fn do_icebox_query(
125        &self,
126        query_str: String,
127    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
128        self.raw_query_read(|conn| {
129            diesel::sql_query(query_str)
130                .load_iter::<IceboxWithDep, _>(conn)?
131                .process_results(|iter| {
132                    // since we're using load_iter
133                    // to optimize, we load a *const [u8] into `IceboxWithDep` for group_id and
134                    // envelope_payload, cloning it only once in `fold_with`.
135                    // as long as we are in the scope of `load_iter` (attached to the lifetime of
136                    // `conn` or `&mut SqliteConnection` within `raw_query_read`) the lifetime of group_id and
137                    // envelope_payload is safe.
138                    // the other raw pointers are safe as long as they aren't accessed once
139                    // iteration ends, which is guaranteed by the end of grouping operation and
140                    // conversion to `OrphanedEnvelope` type.
141                    // diesel `Vec<u8>` deserialization implementation for reference:
142                    // https://github.com/diesel-rs/diesel/blob/0abaf1b3f2ed24ac5643227baf841da9a63d9f1f/diesel/src/type_impls/primitives.rs#L164
143                    iter.into_grouping_map_by(|row| (row.originator_id, row.sequence_id))
144                        .fold_with(
145                            |_key, row| {
146                                let mut builder = OrphanedEnvelopeBuilder::default();
147                                // safe b/c we are within the lifetime of `row_iter`
148                                // so the slice in sqlites memory still exists
149                                // and is immediately copied to a `Vec<u8>`.
150                                let group_id = unsafe { row.group_id() };
151                                let payload = unsafe { row.envelope_payload() };
152                                builder
153                                    .cursor(Cursor::new(
154                                        row.sequence_id as SequenceId,
155                                        row.originator_id as OriginatorId,
156                                    ))
157                                    .payload(payload)
158                                    .group_id(group_id);
159                                builder
160                            },
161                            |mut acc, _key, row| {
162                                acc.depending_on(Cursor::new(
163                                    row.dependency_sequence_id as SequenceId,
164                                    row.dependency_originator_id as OriginatorId,
165                                ));
166                                acc
167                            },
168                        )
169                        .into_values()
170                        .map(|v| v.build())
171                        .try_collect()
172                        .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e) as _))
173                })?
174        })
175    }
176}
177
178impl<C: ConnectionExt> QueryIcebox for DbConnection<C> {
179    fn past_dependents(
180        &self,
181        cursors: &[Cursor],
182    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
183        if cursors.is_empty() {
184            return Ok(Vec::new());
185        }
186
187        let values_clause = cursors
188            .iter()
189            .map(|c| format!("({}, {})", c.originator_id, c.sequence_id))
190            .join(", ");
191
192        let query_str = format!(
193            r#"
194            WITH RECURSIVE
195            start_cursors(originator_id, sequence_id) AS (
196                VALUES {}
197            ),
198            dependency_chain AS (
199                -- Base case: Start with the specified envelopes if they exist
200                SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
201                FROM icebox i
202                JOIN start_cursors sc ON i.originator_id = sc.originator_id
203                                      AND i.sequence_id = sc.sequence_id
204
205                UNION
206
207                -- OR start with their immediate dependencies if they don't
208                SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
209                FROM icebox i
210                JOIN icebox_dependencies d ON i.originator_id = d.dependency_originator_id
211                                           AND i.sequence_id = d.dependency_sequence_id
212                JOIN start_cursors sc ON d.envelope_originator_id = sc.originator_id
213                                      AND d.envelope_sequence_id = sc.sequence_id
214
215                UNION ALL
216
217                -- Recursive case: Continue traversing the dependency chain
218                SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
219                FROM icebox i
220                JOIN icebox_dependencies d ON i.originator_id = d.dependency_originator_id
221                                           AND i.sequence_id = d.dependency_sequence_id
222                JOIN dependency_chain dc ON d.envelope_originator_id = dc.originator_id
223                                         AND d.envelope_sequence_id = dc.sequence_id
224            )
225            SELECT
226                dc.originator_id,
227                dc.sequence_id,
228                dc.group_id,
229                dc.envelope_payload,
230                d.dependency_originator_id,
231                d.dependency_sequence_id
232            FROM (SELECT DISTINCT * FROM dependency_chain) dc
233            INNER JOIN icebox_dependencies d
234                ON dc.originator_id = d.envelope_originator_id
235                AND dc.sequence_id = d.envelope_sequence_id
236            ORDER BY dc.originator_id DESC, dc.sequence_id DESC
237            "#,
238            values_clause
239        );
240
241        self.do_icebox_query(query_str)
242    }
243
244    fn future_dependents(
245        &self,
246        cursors: &[Cursor],
247    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
248        if cursors.is_empty() {
249            return Ok(Vec::new());
250        }
251
252        // Build the VALUES clause with actual values (safe since they're i64)
253        let values_clause = cursors
254            .iter()
255            .map(|c| format!("({}, {})", c.originator_id, c.sequence_id))
256            .join(", ");
257
258        let query_str = format!(
259            r#"
260            WITH RECURSIVE
261            start_cursors(originator_id, sequence_id) AS (
262                VALUES {}
263            ),
264            dependency_chain AS (
265                -- Base case: Find all immediate dependents from any starting cursor
266                SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
267                FROM icebox i
268                JOIN icebox_dependencies d ON i.originator_id = d.envelope_originator_id
269                                           AND i.sequence_id = d.envelope_sequence_id
270                JOIN start_cursors sc ON d.dependency_originator_id = sc.originator_id
271                                      AND d.dependency_sequence_id = sc.sequence_id
272
273                UNION ALL
274
275                -- Recursive case: Continue traversing the dependent chain
276                SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
277                FROM icebox i
278                JOIN icebox_dependencies d ON i.originator_id = d.envelope_originator_id
279                                           AND i.sequence_id = d.envelope_sequence_id
280                JOIN dependency_chain dc ON d.dependency_originator_id = dc.originator_id
281                                         AND d.dependency_sequence_id = dc.sequence_id
282            )
283            SELECT
284                dc.originator_id,
285                dc.sequence_id,
286                dc.group_id,
287                dc.envelope_payload,
288                d.dependency_originator_id,
289                d.dependency_sequence_id
290            FROM dependency_chain dc
291            INNER JOIN icebox_dependencies d
292                ON dc.originator_id = d.envelope_originator_id
293                AND dc.sequence_id = d.envelope_sequence_id
294            "#,
295            values_clause
296        );
297
298        self.do_icebox_query(query_str)
299    }
300
301    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError> {
302        if orphans.is_empty() {
303            return Ok(0);
304        }
305        self.raw_query_write(|conn| {
306            conn.transaction::<_, diesel::result::Error, _>(|conn| {
307                let mut total = 0;
308
309                for orphan in &orphans {
310                    let inserted = diesel::insert_into(dsl::icebox)
311                        .values(Icebox::from(orphan.clone()))
312                        .on_conflict_do_nothing()
313                        .execute(conn)?;
314                    total += inserted;
315                }
316
317                let dependencies = orphans.iter().flat_map(|o| o.deps()).collect::<Vec<_>>();
318                for dep in dependencies {
319                    let inserted = diesel::insert_into(icebox_dependencies::table)
320                        .values(dep)
321                        .on_conflict_do_nothing()
322                        .execute(conn)?;
323                    total += inserted;
324                }
325
326                Ok(total)
327            })
328        })
329    }
330
331    fn prune_icebox(&self) -> Result<usize, crate::ConnectionError> {
332        use super::refresh_state::EntityKind;
333        use super::schema::{icebox, refresh_state};
334
335        self.raw_query_write(|conn| {
336            diesel::delete(
337                icebox::table.filter(diesel::dsl::exists(
338                    refresh_state::table
339                        .filter(refresh_state::entity_id.eq(icebox::group_id))
340                        .filter(
341                            refresh_state::originator_id
342                                .cast::<diesel::sql_types::BigInt>()
343                                .eq(icebox::originator_id),
344                        )
345                        .filter(refresh_state::sequence_id.ge(icebox::sequence_id))
346                        .filter(
347                            refresh_state::entity_kind.eq_any(&[
348                                EntityKind::ApplicationMessage,
349                                EntityKind::CommitMessage,
350                            ]),
351                        ),
352                )),
353            )
354            .execute(conn)
355        })
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use xmtp_proto::types::Cursor;
362
363    use crate::Store;
364    use crate::group::{ConversationType, GroupMembershipState, StoredGroup};
365    use crate::with_connection;
366
367    use super::*;
368
369    fn create_test_group(conn: &impl crate::DbQuery) -> Vec<u8> {
370        let group_id = xmtp_common::rand_vec::<24>();
371        let group = StoredGroup {
372            id: group_id.clone(),
373            created_at_ns: 0,
374            membership_state: GroupMembershipState::Allowed,
375            installations_last_checked: 0,
376            added_by_inbox_id: "test".to_string(),
377            sequence_id: None,
378            rotated_at_ns: 0,
379            conversation_type: ConversationType::Group,
380            dm_id: None,
381            last_message_ns: None,
382            message_disappear_from_ns: None,
383            message_disappear_in_ns: None,
384            paused_for_version: None,
385            maybe_forked: false,
386            fork_details: "{}".to_string(),
387            originator_id: None,
388            should_publish_commit_log: false,
389            commit_log_public_key: None,
390            is_commit_log_forked: None,
391            has_pending_leave_request: None,
392        };
393        group.store(conn).unwrap();
394        group_id
395    }
396
397    fn iced(group_id: Vec<u8>) -> Vec<OrphanedEnvelope> {
398        vec![
399            OrphanedEnvelope::builder()
400                .cursor(Cursor::new(41, 1u32))
401                .depending_on(Cursor::new(40, 1u32))
402                .payload(vec![1, 2, 3])
403                .group_id(group_id.clone())
404                .build()
405                .unwrap(),
406            OrphanedEnvelope::builder()
407                .cursor(Cursor::new(40, 1u32))
408                .depending_on(Cursor::new(39, 2u32))
409                .payload(vec![1, 2, 3])
410                .group_id(group_id.clone())
411                .build()
412                .unwrap(),
413            OrphanedEnvelope::builder()
414                .cursor(Cursor::new(39, 2u32))
415                .depending_on(Cursor::new(38, 2u32))
416                .payload(vec![1, 2, 3])
417                .group_id(group_id)
418                .build()
419                .unwrap(),
420        ]
421    }
422
423    #[xmtp_common::test(unwrap_try = true)]
424    fn icebox_dependency_chain() {
425        with_connection(|conn| {
426            let group_id = create_test_group(conn);
427            let orphans = iced(group_id);
428
429            // Store envelopes and dependencies
430            conn.ice(orphans.clone())?;
431
432            let dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
433            assert_eq!(dep_chain.len(), 3);
434
435            assert_eq!(orphans[0].depends_on[&1], 40);
436            assert_eq!(orphans[1].depends_on[&2], 39);
437            assert_eq!(orphans[2].depends_on[&2], 38);
438
439            let mut dep_chain = conn.future_dependents(&[Cursor::new(39, 2u32)])?;
440            dep_chain.sort_by_key(|d| d.cursor.sequence_id);
441            assert_eq!(dep_chain.len(), 2);
442            assert_eq!(dep_chain[0].cursor.sequence_id, 40);
443            assert_eq!(dep_chain[0].cursor.originator_id, 1);
444            assert_eq!(dep_chain[0].depends_on[&2], 39);
445
446            assert_eq!(dep_chain[1].cursor.sequence_id, 41);
447            assert_eq!(dep_chain[1].cursor.originator_id, 1);
448            assert_eq!(dep_chain[1].depends_on[&1], 40);
449        })
450    }
451
452    #[xmtp_common::test(unwrap_try = true)]
453    fn test_icebox_wrong_originator() {
454        with_connection(|conn| {
455            let group_id = create_test_group(conn);
456            // Break the chain by changing the originator
457            let mut orphans = iced(group_id.clone());
458            // Change envelope (39, 2) to (39, 1), breaking the chain
459            orphans[2] = OrphanedEnvelope::builder()
460                .cursor(Cursor::new(39, 1u32))
461                .depending_on(Cursor::new(38, 1u32))
462                .payload(vec![1, 2, 3])
463                .group_id(group_id)
464                .build()
465                .unwrap();
466
467            conn.ice(orphans)?;
468
469            let mut dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
470            dep_chain.sort_by_key(|d| d.cursor.sequence_id);
471            // The last iced message should not be there due to the wrong originator_id.
472            // past_dependents returns starting envelope + dependencies
473            // Should only return (41, 1) and (40, 1) because (40, 1) depends on (39, 2) which doesn't exist
474            assert_eq!(dep_chain.len(), 2);
475            assert_eq!(dep_chain[0].depends_on[&2], 39);
476            assert_eq!(dep_chain[1].depends_on[&1], 40);
477
478            // With the changed originator, envelope (39, 1) has no dependents
479            // (40, 1) depends on (39, 2), not (39, 1)
480            let dep_chain = conn.future_dependents(&[Cursor::new(39, 1u32)])?;
481            assert_eq!(dep_chain.len(), 0);
482        })
483    }
484
485    #[xmtp_common::test(unwrap_try = true)]
486    fn test_icebox_wrong_sequence() {
487        with_connection(|conn| {
488            let group_id = create_test_group(conn);
489            // Break the chain by changing the sequence_id to a non-conflicting value
490            let mut orphans = iced(group_id.clone());
491            // Change envelope (39, 2) to (100, 2), breaking the chain
492            orphans[2] = OrphanedEnvelope::builder()
493                .cursor(Cursor::new(100, 2u32))
494                .depending_on(Cursor::new(38, 2u32))
495                .payload(vec![1, 2, 3])
496                .group_id(group_id)
497                .build()
498                .unwrap();
499
500            conn.ice(orphans)?;
501
502            let mut dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
503            dep_chain.sort_by_key(|d| d.cursor.sequence_id);
504
505            // The last iced message should not be there due to the wrong sequence_id.
506            // past_dependents returns starting envelope + dependencies
507            // Should only return (41, 1) and (40, 1) because (40, 1) depends on (39, 2) which doesn't exist
508            assert_eq!(dep_chain.len(), 2);
509            assert_eq!(dep_chain[0].depends_on[&2], 39);
510            assert_eq!(dep_chain[1].depends_on[&1], 40);
511            // With the changed sequence_id, envelope (100, 2) has no dependents
512            // Nothing depends on (100, 2) in the dependency chain
513            let dep_chain = conn.future_dependents(&[Cursor::new(100, 2u32)])?;
514            assert_eq!(dep_chain.len(), 0);
515        })
516    }
517
518    // commit + two dependant application messages
519    #[xmtp_common::test(unwrap_try = true)]
520    fn test_icebox_multiple_dependencies() {
521        with_connection(|conn| {
522            let group_id = create_test_group(conn);
523            // Test that two envelopes can depend on the same envelope
524            let orphans = vec![
525                OrphanedEnvelope::builder()
526                    .cursor(Cursor::new(1, 100u32))
527                    .depending_on(Cursor::new(10, 0u32))
528                    .payload(vec![1; 5])
529                    .group_id(group_id.clone())
530                    .build()
531                    .unwrap(),
532                OrphanedEnvelope::builder()
533                    .cursor(Cursor::new(2, 100u32))
534                    .depending_on(Cursor::new(10, 0u32))
535                    .payload(vec![1; 5])
536                    .group_id(group_id)
537                    .build()
538                    .unwrap(),
539            ];
540
541            let result = conn.ice(orphans);
542            assert!(result.is_ok());
543
544            let mut got = conn.future_dependents(&[Cursor::new(10, 0u32)])?;
545            got.sort_by_key(|d| d.cursor.sequence_id);
546            assert_eq!(got.len(), 2);
547            assert_eq!(got[0].cursor.sequence_id, 1);
548            assert_eq!(got[0].cursor.originator_id, 100);
549            assert_eq!(got[1].cursor.sequence_id, 2);
550            assert_eq!(got[1].cursor.originator_id, 100);
551
552            // Verify both envelopes have the dependency on commit
553            for envelope in &got {
554                assert_eq!(envelope.depends_on[&0], 10);
555            }
556        })
557    }
558
559    // chained commits & app messages
560    #[xmtp_common::test(unwrap_try = true)]
561    fn test_icebox_chain() {
562        with_connection(|conn| {
563            let group_id = create_test_group(conn);
564            // Test a chain where envelope 3 depends on 2, and both 1 and 2 depend on 3
565            let orphans = vec![
566                OrphanedEnvelope::builder()
567                    .cursor(Cursor::new(1, 100u32))
568                    .depending_on(Cursor::new(3, 0u32))
569                    .payload(vec![1])
570                    .group_id(group_id.clone())
571                    .build()
572                    .unwrap(),
573                OrphanedEnvelope::builder()
574                    .cursor(Cursor::new(2, 100u32))
575                    .depending_on(Cursor::new(3, 0u32))
576                    .payload(vec![1])
577                    .group_id(group_id.clone())
578                    .build()
579                    .unwrap(),
580                OrphanedEnvelope::builder()
581                    .cursor(Cursor::new(3, 0u32))
582                    .depending_on(Cursor::new(2, 0u32))
583                    .payload(vec![1])
584                    .group_id(group_id)
585                    .build()
586                    .unwrap(),
587            ];
588
589            let result = conn.ice(orphans);
590            assert!(result.is_ok());
591
592            let mut got = conn.future_dependents(&[Cursor::new(2, 0u32)])?;
593            got.sort_by_key(|i| i.cursor.sequence_id);
594            assert_eq!(got.len(), 3);
595
596            assert_eq!(got[0].cursor.sequence_id, 1);
597            assert_eq!(got[0].cursor.originator_id, 100);
598            assert_eq!(got[1].cursor.sequence_id, 2);
599            assert_eq!(got[1].cursor.originator_id, 100);
600            assert_eq!(got[2].cursor.sequence_id, 3);
601            assert_eq!(got[2].cursor.originator_id, 0);
602        })
603    }
604
605    #[xmtp_common::test(unwrap_try = true)]
606    fn test_future_dependents_multiple_cursors() {
607        with_connection(|conn| {
608            let group_id = create_test_group(conn);
609            let orphans = iced(group_id);
610
611            // Store envelopes and dependencies
612            conn.ice(orphans)?;
613
614            // Test query with multiple cursors
615            let cursors = vec![Cursor::new(39, 2u32), Cursor::new(40, 1u32)];
616
617            let mut result = conn.future_dependents(&cursors)?;
618            result.sort_by_key(|d| d.cursor.sequence_id);
619
620            // Verify we get the union of dependants
621            // (39, 2) is depended on by (40, 1) and (41, 1)
622            // (40, 1) is depended on by (41, 1)
623            // So we should get (40, 1) and (41, 1), deduplicated
624            assert_eq!(result.len(), 2);
625            assert_eq!(result[0].cursor.sequence_id, 40);
626            assert_eq!(result[0].cursor.originator_id, 1);
627            assert_eq!(result[1].cursor.sequence_id, 41);
628            assert_eq!(result[1].cursor.originator_id, 1);
629
630            // Verify dependencies are correct
631            assert_eq!(result[0].depends_on[&2], 39);
632            assert_eq!(result[1].depends_on[&1], 40);
633        })
634    }
635
636    #[xmtp_common::test(unwrap_try = true)]
637    fn test_future_dependents_empty() {
638        with_connection(|conn| {
639            // Test with empty cursor list
640            let result = conn.future_dependents(&[])?;
641            assert_eq!(result.len(), 0);
642        })
643    }
644
645    #[xmtp_common::test(unwrap_try = true)]
646    fn test_querying_dependencies_in_middle_works() {
647        with_connection(|conn| {
648            let group_id = create_test_group(conn);
649            let orphans = iced(group_id);
650
651            conn.ice(orphans.clone())?;
652
653            let mut result = conn.past_dependents(&[Cursor::new(40, 1u32)])?;
654            assert_eq!(result.len(), 2);
655            result.sort_by_key(|d| d.cursor.originator_id);
656            assert_eq!(result[0].cursor, Cursor::new(40, 1u32));
657            assert_eq!(result[0].depends_on, Cursor::new(39, 2u32).into());
658            assert_eq!(result[1].cursor, Cursor::new(39, 2u32));
659            assert_eq!(result[1].depends_on, Cursor::new(38, 2u32).into());
660
661            let result = conn.future_dependents(&[Cursor::new(40, 1u32)])?;
662            assert_eq!(result.len(), 1);
663            assert_eq!(result[0].cursor, Cursor::new(41, 1u32));
664            assert_eq!(result[0].depends_on, Cursor::new(40, 1u32).into());
665        })
666    }
667
668    #[xmtp_common::test(unwrap_try = true)]
669    fn test_prune_icebox() {
670        use crate::StoreOrIgnore;
671        use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
672
673        with_connection(|conn| {
674            let group_id = create_test_group(conn);
675
676            let orphans = vec![
677                OrphanedEnvelope::builder()
678                    .cursor(Cursor::new(10, 1u32))
679                    .depending_on(Cursor::new(9, 1u32))
680                    .payload(vec![1, 2, 3])
681                    .group_id(group_id.clone())
682                    .build()
683                    .unwrap(),
684                OrphanedEnvelope::builder()
685                    .cursor(Cursor::new(20, 1u32))
686                    .depending_on(Cursor::new(19, 1u32))
687                    .payload(vec![4, 5, 6])
688                    .group_id(group_id.clone())
689                    .build()
690                    .unwrap(),
691                OrphanedEnvelope::builder()
692                    .cursor(Cursor::new(30, 1u32))
693                    .depending_on(Cursor::new(29, 1u32))
694                    .payload(vec![7, 8, 9])
695                    .group_id(group_id.clone())
696                    .build()
697                    .unwrap(),
698                OrphanedEnvelope::builder()
699                    .cursor(Cursor::new(10, 10u32))
700                    .depending_on(Cursor::new(1, 1u32))
701                    .payload(vec![1, 2, 3])
702                    .group_id(group_id.clone())
703                    .build()
704                    .unwrap(),
705            ];
706            conn.ice(orphans)?;
707
708            RefreshState {
709                entity_id: group_id.clone(),
710                entity_kind: EntityKind::ApplicationMessage,
711                sequence_id: 20,
712                originator_id: 1,
713            }
714            .store_or_ignore(conn)?;
715
716            let deleted = conn.prune_icebox()?;
717            assert_eq!(
718                deleted, 2,
719                "Should delete entries with sequence_id 10 and 20"
720            );
721
722            // Verify entry 30 remains
723            let mut remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
724                dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
725            })?;
726            remaining.sort_by_key(|e| e.originator_id);
727
728            assert_eq!(remaining.len(), 2, "Should have 2 entries remaining");
729            assert_eq!(remaining[0].sequence_id, 30);
730            assert_eq!(remaining[0].originator_id, 1);
731            assert_eq!(remaining[1].sequence_id, 10);
732            assert_eq!(remaining[1].originator_id, 10);
733        })
734    }
735
736    #[xmtp_common::test(unwrap_try = true)]
737    fn test_prune_icebox_no_cleanup_when_cursor_lower() {
738        use crate::StoreOrIgnore;
739        use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
740
741        with_connection(|conn| {
742            let group_id = create_test_group(conn);
743
744            let orphans = vec![
745                OrphanedEnvelope::builder()
746                    .cursor(Cursor::new(50, 1u32))
747                    .depending_on(Cursor::new(49, 1u32))
748                    .payload(vec![1, 2, 3])
749                    .group_id(group_id.clone())
750                    .build()
751                    .unwrap(),
752                OrphanedEnvelope::builder()
753                    .cursor(Cursor::new(60, 1u32))
754                    .depending_on(Cursor::new(59, 1u32))
755                    .payload(vec![4, 5, 6])
756                    .group_id(group_id.clone())
757                    .build()
758                    .unwrap(),
759            ];
760            conn.ice(orphans)?;
761
762            RefreshState {
763                entity_id: group_id.clone(),
764                entity_kind: EntityKind::ApplicationMessage,
765                sequence_id: 40,
766                originator_id: 1,
767            }
768            .store_or_ignore(conn)?;
769
770            let deleted = conn.prune_icebox()?;
771            assert_eq!(deleted, 0, "Should not delete any entries");
772
773            let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
774                dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
775            })?;
776            assert_eq!(remaining.len(), 2);
777        })
778    }
779
780    #[xmtp_common::test(unwrap_try = true)]
781    fn test_prune_icebox_only_relevant_entity_kinds() {
782        use crate::StoreOrIgnore;
783        use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
784
785        with_connection(|conn| {
786            let group_id = create_test_group(conn);
787
788            let orphans = vec![
789                OrphanedEnvelope::builder()
790                    .cursor(Cursor::new(10, 1u32))
791                    .depending_on(Cursor::new(9, 1u32))
792                    .payload(vec![1, 2, 3])
793                    .group_id(group_id.clone())
794                    .build()
795                    .unwrap(),
796            ];
797            conn.ice(orphans)?;
798
799            RefreshState {
800                entity_id: group_id.clone(),
801                entity_kind: EntityKind::Welcome,
802                sequence_id: 100,
803                originator_id: 1,
804            }
805            .store_or_ignore(conn)?;
806
807            let deleted = conn.prune_icebox()?;
808            assert_eq!(deleted, 0, "Should not delete due to wrong entity_kind");
809
810            let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
811                dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
812            })?;
813            assert_eq!(remaining.len(), 1);
814        })
815    }
816
817    #[xmtp_common::test(unwrap_try = true)]
818    fn test_prune_icebox_dependencies_cascade_deleted() {
819        use crate::StoreOrIgnore;
820        use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
821
822        with_connection(|conn| {
823            let group_id = create_test_group(conn);
824
825            let orphans = vec![
826                OrphanedEnvelope::builder()
827                    .cursor(Cursor::new(10, 1u32))
828                    .depending_on(Cursor::new(9, 1u32))
829                    .payload(vec![1, 2, 3])
830                    .group_id(group_id.clone())
831                    .build()
832                    .unwrap(),
833            ];
834            conn.ice(orphans)?;
835
836            use crate::schema::icebox_dependencies::dsl as dep_dsl;
837            let deps: Vec<IceboxDependency> = conn.raw_query_read(|conn| {
838                icebox_dependencies::table
839                    .filter(dep_dsl::envelope_originator_id.eq(1))
840                    .filter(dep_dsl::envelope_sequence_id.eq(10))
841                    .load(conn)
842            })?;
843            assert_eq!(deps.len(), 1);
844
845            RefreshState {
846                entity_id: group_id.clone(),
847                entity_kind: EntityKind::ApplicationMessage,
848                sequence_id: 10,
849                originator_id: 1,
850            }
851            .store_or_ignore(conn)?;
852
853            let deleted = conn.prune_icebox()?;
854            assert_eq!(deleted, 1, "Should delete the icebox entry");
855
856            let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
857                dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
858            })?;
859            assert_eq!(remaining.len(), 0);
860
861            let deps: Vec<IceboxDependency> = conn.raw_query_read(|conn| {
862                icebox_dependencies::table
863                    .filter(dep_dsl::envelope_originator_id.eq(1))
864                    .filter(dep_dsl::envelope_sequence_id.eq(10))
865                    .load(conn)
866            })?;
867            assert_eq!(deps.len(), 0, "Dependencies should be cascade deleted");
868        })
869    }
870}