1use super::{ConnectionExt, db_connection::DbConnection};
2use crate::icebox::types::{IceboxOrphans, IceboxWithDep};
3use crate::schema::icebox::dsl;
4use crate::schema::icebox_dependencies;
5use crate::{impl_store, schema::icebox};
6use diesel::prelude::*;
7use itertools::Itertools;
8use serde::{Deserialize, Serialize};
9use xmtp_proto::types::{
10 Cursor, OriginatorId, OrphanedEnvelope, OrphanedEnvelopeBuilder, SequenceId,
11};
12
13mod types;
14
15#[derive(
16 Debug,
17 Clone,
18 Serialize,
19 Deserialize,
20 Insertable,
21 Identifiable,
22 Queryable,
23 Eq,
24 PartialEq,
25 QueryableByName,
26)]
27#[diesel(table_name = icebox)]
28#[diesel(primary_key(originator_id, sequence_id))]
29#[diesel(belongs_to(crate::group::StoredGroup, foreign_key = group_id))]
30pub struct Icebox {
31 pub originator_id: i64,
32 pub sequence_id: i64,
33 pub group_id: Vec<u8>,
34 pub envelope_payload: Vec<u8>,
35}
36
37impl_store!(Icebox, icebox);
38
39#[derive(
40 Debug,
41 Clone,
42 Serialize,
43 Deserialize,
44 Insertable,
45 Identifiable,
46 Queryable,
47 Eq,
48 PartialEq,
49 QueryableByName,
50)]
51#[diesel(table_name = icebox_dependencies)]
52#[diesel(primary_key(
53 envelope_originator_id,
54 envelope_sequence_id,
55 dependency_originator_id,
56 dependency_sequence_id
57))]
58pub struct IceboxDependency {
59 pub envelope_originator_id: i64,
60 pub envelope_sequence_id: i64,
61 pub dependency_originator_id: i64,
62 pub dependency_sequence_id: i64,
63}
64
65impl_store!(IceboxDependency, icebox_dependencies);
66
/// Queries and maintenance operations over the `icebox` and
/// `icebox_dependencies` tables.
///
/// The behavior described on each method is that of the `DbConnection`
/// implementation in this file.
pub trait QueryIcebox {
    /// Returns iced envelopes found by walking dependency edges backwards from
    /// `cursors`: the start envelopes themselves when they are iced, plus every
    /// iced envelope they directly or transitively depend on.
    ///
    /// Only envelopes that have at least one recorded dependency are returned.
    fn past_dependents(
        &self,
        cursors: &[Cursor],
    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;

    /// Returns iced envelopes that directly or transitively depend on any of
    /// `cursors`. The start cursors themselves are not included unless they
    /// are reached as dependents of another start cursor.
    fn future_dependents(
        &self,
        cursors: &[Cursor],
    ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;

    /// Stores `orphans` together with their dependency edges and returns the
    /// number of rows that were inserted.
    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError>;

    /// Deletes iced envelopes that the stored refresh state has already moved
    /// past and returns the number of deleted icebox rows.
    fn prune_icebox(&self) -> Result<usize, crate::ConnectionError>;
}
95
96impl<T> QueryIcebox for &T
97where
98 T: QueryIcebox,
99{
100 fn past_dependents(
101 &self,
102 cursors: &[Cursor],
103 ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
104 (**self).past_dependents(cursors)
105 }
106
107 fn future_dependents(
108 &self,
109 cursors: &[Cursor],
110 ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
111 (**self).future_dependents(cursors)
112 }
113
114 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError> {
115 (**self).ice(orphans)
116 }
117
118 fn prune_icebox(&self) -> Result<usize, crate::ConnectionError> {
119 (**self).prune_icebox()
120 }
121}
122
123impl<C: ConnectionExt> DbConnection<C> {
124 fn do_icebox_query(
125 &self,
126 query_str: String,
127 ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
128 self.raw_query_read(|conn| {
129 diesel::sql_query(query_str)
130 .load_iter::<IceboxWithDep, _>(conn)?
131 .process_results(|iter| {
132 iter.into_grouping_map_by(|row| (row.originator_id, row.sequence_id))
144 .fold_with(
145 |_key, row| {
146 let mut builder = OrphanedEnvelopeBuilder::default();
147 let group_id = unsafe { row.group_id() };
151 let payload = unsafe { row.envelope_payload() };
152 builder
153 .cursor(Cursor::new(
154 row.sequence_id as SequenceId,
155 row.originator_id as OriginatorId,
156 ))
157 .payload(payload)
158 .group_id(group_id);
159 builder
160 },
161 |mut acc, _key, row| {
162 acc.depending_on(Cursor::new(
163 row.dependency_sequence_id as SequenceId,
164 row.dependency_originator_id as OriginatorId,
165 ));
166 acc
167 },
168 )
169 .into_values()
170 .map(|v| v.build())
171 .try_collect()
172 .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e) as _))
173 })?
174 })
175 }
176}
177
178impl<C: ConnectionExt> QueryIcebox for DbConnection<C> {
179 fn past_dependents(
180 &self,
181 cursors: &[Cursor],
182 ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
183 if cursors.is_empty() {
184 return Ok(Vec::new());
185 }
186
187 let values_clause = cursors
188 .iter()
189 .map(|c| format!("({}, {})", c.originator_id, c.sequence_id))
190 .join(", ");
191
192 let query_str = format!(
193 r#"
194 WITH RECURSIVE
195 start_cursors(originator_id, sequence_id) AS (
196 VALUES {}
197 ),
198 dependency_chain AS (
199 -- Base case: Start with the specified envelopes if they exist
200 SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
201 FROM icebox i
202 JOIN start_cursors sc ON i.originator_id = sc.originator_id
203 AND i.sequence_id = sc.sequence_id
204
205 UNION
206
207 -- OR start with their immediate dependencies if they don't
208 SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
209 FROM icebox i
210 JOIN icebox_dependencies d ON i.originator_id = d.dependency_originator_id
211 AND i.sequence_id = d.dependency_sequence_id
212 JOIN start_cursors sc ON d.envelope_originator_id = sc.originator_id
213 AND d.envelope_sequence_id = sc.sequence_id
214
215 UNION ALL
216
217 -- Recursive case: Continue traversing the dependency chain
218 SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
219 FROM icebox i
220 JOIN icebox_dependencies d ON i.originator_id = d.dependency_originator_id
221 AND i.sequence_id = d.dependency_sequence_id
222 JOIN dependency_chain dc ON d.envelope_originator_id = dc.originator_id
223 AND d.envelope_sequence_id = dc.sequence_id
224 )
225 SELECT
226 dc.originator_id,
227 dc.sequence_id,
228 dc.group_id,
229 dc.envelope_payload,
230 d.dependency_originator_id,
231 d.dependency_sequence_id
232 FROM (SELECT DISTINCT * FROM dependency_chain) dc
233 INNER JOIN icebox_dependencies d
234 ON dc.originator_id = d.envelope_originator_id
235 AND dc.sequence_id = d.envelope_sequence_id
236 ORDER BY dc.originator_id DESC, dc.sequence_id DESC
237 "#,
238 values_clause
239 );
240
241 self.do_icebox_query(query_str)
242 }
243
244 fn future_dependents(
245 &self,
246 cursors: &[Cursor],
247 ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError> {
248 if cursors.is_empty() {
249 return Ok(Vec::new());
250 }
251
252 let values_clause = cursors
254 .iter()
255 .map(|c| format!("({}, {})", c.originator_id, c.sequence_id))
256 .join(", ");
257
258 let query_str = format!(
259 r#"
260 WITH RECURSIVE
261 start_cursors(originator_id, sequence_id) AS (
262 VALUES {}
263 ),
264 dependency_chain AS (
265 -- Base case: Find all immediate dependents from any starting cursor
266 SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
267 FROM icebox i
268 JOIN icebox_dependencies d ON i.originator_id = d.envelope_originator_id
269 AND i.sequence_id = d.envelope_sequence_id
270 JOIN start_cursors sc ON d.dependency_originator_id = sc.originator_id
271 AND d.dependency_sequence_id = sc.sequence_id
272
273 UNION ALL
274
275 -- Recursive case: Continue traversing the dependent chain
276 SELECT i.originator_id, i.sequence_id, i.group_id, i.envelope_payload
277 FROM icebox i
278 JOIN icebox_dependencies d ON i.originator_id = d.envelope_originator_id
279 AND i.sequence_id = d.envelope_sequence_id
280 JOIN dependency_chain dc ON d.dependency_originator_id = dc.originator_id
281 AND d.dependency_sequence_id = dc.sequence_id
282 )
283 SELECT
284 dc.originator_id,
285 dc.sequence_id,
286 dc.group_id,
287 dc.envelope_payload,
288 d.dependency_originator_id,
289 d.dependency_sequence_id
290 FROM dependency_chain dc
291 INNER JOIN icebox_dependencies d
292 ON dc.originator_id = d.envelope_originator_id
293 AND dc.sequence_id = d.envelope_sequence_id
294 "#,
295 values_clause
296 );
297
298 self.do_icebox_query(query_str)
299 }
300
301 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<usize, crate::ConnectionError> {
302 if orphans.is_empty() {
303 return Ok(0);
304 }
305 self.raw_query_write(|conn| {
306 conn.transaction::<_, diesel::result::Error, _>(|conn| {
307 let mut total = 0;
308
309 for orphan in &orphans {
310 let inserted = diesel::insert_into(dsl::icebox)
311 .values(Icebox::from(orphan.clone()))
312 .on_conflict_do_nothing()
313 .execute(conn)?;
314 total += inserted;
315 }
316
317 let dependencies = orphans.iter().flat_map(|o| o.deps()).collect::<Vec<_>>();
318 for dep in dependencies {
319 let inserted = diesel::insert_into(icebox_dependencies::table)
320 .values(dep)
321 .on_conflict_do_nothing()
322 .execute(conn)?;
323 total += inserted;
324 }
325
326 Ok(total)
327 })
328 })
329 }
330
331 fn prune_icebox(&self) -> Result<usize, crate::ConnectionError> {
332 use super::refresh_state::EntityKind;
333 use super::schema::{icebox, refresh_state};
334
335 self.raw_query_write(|conn| {
336 diesel::delete(
337 icebox::table.filter(diesel::dsl::exists(
338 refresh_state::table
339 .filter(refresh_state::entity_id.eq(icebox::group_id))
340 .filter(
341 refresh_state::originator_id
342 .cast::<diesel::sql_types::BigInt>()
343 .eq(icebox::originator_id),
344 )
345 .filter(refresh_state::sequence_id.ge(icebox::sequence_id))
346 .filter(
347 refresh_state::entity_kind.eq_any(&[
348 EntityKind::ApplicationMessage,
349 EntityKind::CommitMessage,
350 ]),
351 ),
352 )),
353 )
354 .execute(conn)
355 })
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use xmtp_proto::types::Cursor;
362
363 use crate::Store;
364 use crate::group::{ConversationType, GroupMembershipState, StoredGroup};
365 use crate::with_connection;
366
367 use super::*;
368
369 fn create_test_group(conn: &impl crate::DbQuery) -> Vec<u8> {
370 let group_id = xmtp_common::rand_vec::<24>();
371 let group = StoredGroup {
372 id: group_id.clone(),
373 created_at_ns: 0,
374 membership_state: GroupMembershipState::Allowed,
375 installations_last_checked: 0,
376 added_by_inbox_id: "test".to_string(),
377 sequence_id: None,
378 rotated_at_ns: 0,
379 conversation_type: ConversationType::Group,
380 dm_id: None,
381 last_message_ns: None,
382 message_disappear_from_ns: None,
383 message_disappear_in_ns: None,
384 paused_for_version: None,
385 maybe_forked: false,
386 fork_details: "{}".to_string(),
387 originator_id: None,
388 should_publish_commit_log: false,
389 commit_log_public_key: None,
390 is_commit_log_forked: None,
391 has_pending_leave_request: None,
392 };
393 group.store(conn).unwrap();
394 group_id
395 }
396
397 fn iced(group_id: Vec<u8>) -> Vec<OrphanedEnvelope> {
398 vec![
399 OrphanedEnvelope::builder()
400 .cursor(Cursor::new(41, 1u32))
401 .depending_on(Cursor::new(40, 1u32))
402 .payload(vec![1, 2, 3])
403 .group_id(group_id.clone())
404 .build()
405 .unwrap(),
406 OrphanedEnvelope::builder()
407 .cursor(Cursor::new(40, 1u32))
408 .depending_on(Cursor::new(39, 2u32))
409 .payload(vec![1, 2, 3])
410 .group_id(group_id.clone())
411 .build()
412 .unwrap(),
413 OrphanedEnvelope::builder()
414 .cursor(Cursor::new(39, 2u32))
415 .depending_on(Cursor::new(38, 2u32))
416 .payload(vec![1, 2, 3])
417 .group_id(group_id)
418 .build()
419 .unwrap(),
420 ]
421 }
422
423 #[xmtp_common::test(unwrap_try = true)]
424 fn icebox_dependency_chain() {
425 with_connection(|conn| {
426 let group_id = create_test_group(conn);
427 let orphans = iced(group_id);
428
429 conn.ice(orphans.clone())?;
431
432 let dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
433 assert_eq!(dep_chain.len(), 3);
434
435 assert_eq!(orphans[0].depends_on[&1], 40);
436 assert_eq!(orphans[1].depends_on[&2], 39);
437 assert_eq!(orphans[2].depends_on[&2], 38);
438
439 let mut dep_chain = conn.future_dependents(&[Cursor::new(39, 2u32)])?;
440 dep_chain.sort_by_key(|d| d.cursor.sequence_id);
441 assert_eq!(dep_chain.len(), 2);
442 assert_eq!(dep_chain[0].cursor.sequence_id, 40);
443 assert_eq!(dep_chain[0].cursor.originator_id, 1);
444 assert_eq!(dep_chain[0].depends_on[&2], 39);
445
446 assert_eq!(dep_chain[1].cursor.sequence_id, 41);
447 assert_eq!(dep_chain[1].cursor.originator_id, 1);
448 assert_eq!(dep_chain[1].depends_on[&1], 40);
449 })
450 }
451
452 #[xmtp_common::test(unwrap_try = true)]
453 fn test_icebox_wrong_originator() {
454 with_connection(|conn| {
455 let group_id = create_test_group(conn);
456 let mut orphans = iced(group_id.clone());
458 orphans[2] = OrphanedEnvelope::builder()
460 .cursor(Cursor::new(39, 1u32))
461 .depending_on(Cursor::new(38, 1u32))
462 .payload(vec![1, 2, 3])
463 .group_id(group_id)
464 .build()
465 .unwrap();
466
467 conn.ice(orphans)?;
468
469 let mut dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
470 dep_chain.sort_by_key(|d| d.cursor.sequence_id);
471 assert_eq!(dep_chain.len(), 2);
475 assert_eq!(dep_chain[0].depends_on[&2], 39);
476 assert_eq!(dep_chain[1].depends_on[&1], 40);
477
478 let dep_chain = conn.future_dependents(&[Cursor::new(39, 1u32)])?;
481 assert_eq!(dep_chain.len(), 0);
482 })
483 }
484
485 #[xmtp_common::test(unwrap_try = true)]
486 fn test_icebox_wrong_sequence() {
487 with_connection(|conn| {
488 let group_id = create_test_group(conn);
489 let mut orphans = iced(group_id.clone());
491 orphans[2] = OrphanedEnvelope::builder()
493 .cursor(Cursor::new(100, 2u32))
494 .depending_on(Cursor::new(38, 2u32))
495 .payload(vec![1, 2, 3])
496 .group_id(group_id)
497 .build()
498 .unwrap();
499
500 conn.ice(orphans)?;
501
502 let mut dep_chain = conn.past_dependents(&[Cursor::new(41, 1u32)])?;
503 dep_chain.sort_by_key(|d| d.cursor.sequence_id);
504
505 assert_eq!(dep_chain.len(), 2);
509 assert_eq!(dep_chain[0].depends_on[&2], 39);
510 assert_eq!(dep_chain[1].depends_on[&1], 40);
511 let dep_chain = conn.future_dependents(&[Cursor::new(100, 2u32)])?;
514 assert_eq!(dep_chain.len(), 0);
515 })
516 }
517
518 #[xmtp_common::test(unwrap_try = true)]
520 fn test_icebox_multiple_dependencies() {
521 with_connection(|conn| {
522 let group_id = create_test_group(conn);
523 let orphans = vec![
525 OrphanedEnvelope::builder()
526 .cursor(Cursor::new(1, 100u32))
527 .depending_on(Cursor::new(10, 0u32))
528 .payload(vec![1; 5])
529 .group_id(group_id.clone())
530 .build()
531 .unwrap(),
532 OrphanedEnvelope::builder()
533 .cursor(Cursor::new(2, 100u32))
534 .depending_on(Cursor::new(10, 0u32))
535 .payload(vec![1; 5])
536 .group_id(group_id)
537 .build()
538 .unwrap(),
539 ];
540
541 let result = conn.ice(orphans);
542 assert!(result.is_ok());
543
544 let mut got = conn.future_dependents(&[Cursor::new(10, 0u32)])?;
545 got.sort_by_key(|d| d.cursor.sequence_id);
546 assert_eq!(got.len(), 2);
547 assert_eq!(got[0].cursor.sequence_id, 1);
548 assert_eq!(got[0].cursor.originator_id, 100);
549 assert_eq!(got[1].cursor.sequence_id, 2);
550 assert_eq!(got[1].cursor.originator_id, 100);
551
552 for envelope in &got {
554 assert_eq!(envelope.depends_on[&0], 10);
555 }
556 })
557 }
558
559 #[xmtp_common::test(unwrap_try = true)]
561 fn test_icebox_chain() {
562 with_connection(|conn| {
563 let group_id = create_test_group(conn);
564 let orphans = vec![
566 OrphanedEnvelope::builder()
567 .cursor(Cursor::new(1, 100u32))
568 .depending_on(Cursor::new(3, 0u32))
569 .payload(vec![1])
570 .group_id(group_id.clone())
571 .build()
572 .unwrap(),
573 OrphanedEnvelope::builder()
574 .cursor(Cursor::new(2, 100u32))
575 .depending_on(Cursor::new(3, 0u32))
576 .payload(vec![1])
577 .group_id(group_id.clone())
578 .build()
579 .unwrap(),
580 OrphanedEnvelope::builder()
581 .cursor(Cursor::new(3, 0u32))
582 .depending_on(Cursor::new(2, 0u32))
583 .payload(vec![1])
584 .group_id(group_id)
585 .build()
586 .unwrap(),
587 ];
588
589 let result = conn.ice(orphans);
590 assert!(result.is_ok());
591
592 let mut got = conn.future_dependents(&[Cursor::new(2, 0u32)])?;
593 got.sort_by_key(|i| i.cursor.sequence_id);
594 assert_eq!(got.len(), 3);
595
596 assert_eq!(got[0].cursor.sequence_id, 1);
597 assert_eq!(got[0].cursor.originator_id, 100);
598 assert_eq!(got[1].cursor.sequence_id, 2);
599 assert_eq!(got[1].cursor.originator_id, 100);
600 assert_eq!(got[2].cursor.sequence_id, 3);
601 assert_eq!(got[2].cursor.originator_id, 0);
602 })
603 }
604
605 #[xmtp_common::test(unwrap_try = true)]
606 fn test_future_dependents_multiple_cursors() {
607 with_connection(|conn| {
608 let group_id = create_test_group(conn);
609 let orphans = iced(group_id);
610
611 conn.ice(orphans)?;
613
614 let cursors = vec![Cursor::new(39, 2u32), Cursor::new(40, 1u32)];
616
617 let mut result = conn.future_dependents(&cursors)?;
618 result.sort_by_key(|d| d.cursor.sequence_id);
619
620 assert_eq!(result.len(), 2);
625 assert_eq!(result[0].cursor.sequence_id, 40);
626 assert_eq!(result[0].cursor.originator_id, 1);
627 assert_eq!(result[1].cursor.sequence_id, 41);
628 assert_eq!(result[1].cursor.originator_id, 1);
629
630 assert_eq!(result[0].depends_on[&2], 39);
632 assert_eq!(result[1].depends_on[&1], 40);
633 })
634 }
635
636 #[xmtp_common::test(unwrap_try = true)]
637 fn test_future_dependents_empty() {
638 with_connection(|conn| {
639 let result = conn.future_dependents(&[])?;
641 assert_eq!(result.len(), 0);
642 })
643 }
644
645 #[xmtp_common::test(unwrap_try = true)]
646 fn test_querying_dependencies_in_middle_works() {
647 with_connection(|conn| {
648 let group_id = create_test_group(conn);
649 let orphans = iced(group_id);
650
651 conn.ice(orphans.clone())?;
652
653 let mut result = conn.past_dependents(&[Cursor::new(40, 1u32)])?;
654 assert_eq!(result.len(), 2);
655 result.sort_by_key(|d| d.cursor.originator_id);
656 assert_eq!(result[0].cursor, Cursor::new(40, 1u32));
657 assert_eq!(result[0].depends_on, Cursor::new(39, 2u32).into());
658 assert_eq!(result[1].cursor, Cursor::new(39, 2u32));
659 assert_eq!(result[1].depends_on, Cursor::new(38, 2u32).into());
660
661 let result = conn.future_dependents(&[Cursor::new(40, 1u32)])?;
662 assert_eq!(result.len(), 1);
663 assert_eq!(result[0].cursor, Cursor::new(41, 1u32));
664 assert_eq!(result[0].depends_on, Cursor::new(40, 1u32).into());
665 })
666 }
667
668 #[xmtp_common::test(unwrap_try = true)]
669 fn test_prune_icebox() {
670 use crate::StoreOrIgnore;
671 use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
672
673 with_connection(|conn| {
674 let group_id = create_test_group(conn);
675
676 let orphans = vec![
677 OrphanedEnvelope::builder()
678 .cursor(Cursor::new(10, 1u32))
679 .depending_on(Cursor::new(9, 1u32))
680 .payload(vec![1, 2, 3])
681 .group_id(group_id.clone())
682 .build()
683 .unwrap(),
684 OrphanedEnvelope::builder()
685 .cursor(Cursor::new(20, 1u32))
686 .depending_on(Cursor::new(19, 1u32))
687 .payload(vec![4, 5, 6])
688 .group_id(group_id.clone())
689 .build()
690 .unwrap(),
691 OrphanedEnvelope::builder()
692 .cursor(Cursor::new(30, 1u32))
693 .depending_on(Cursor::new(29, 1u32))
694 .payload(vec![7, 8, 9])
695 .group_id(group_id.clone())
696 .build()
697 .unwrap(),
698 OrphanedEnvelope::builder()
699 .cursor(Cursor::new(10, 10u32))
700 .depending_on(Cursor::new(1, 1u32))
701 .payload(vec![1, 2, 3])
702 .group_id(group_id.clone())
703 .build()
704 .unwrap(),
705 ];
706 conn.ice(orphans)?;
707
708 RefreshState {
709 entity_id: group_id.clone(),
710 entity_kind: EntityKind::ApplicationMessage,
711 sequence_id: 20,
712 originator_id: 1,
713 }
714 .store_or_ignore(conn)?;
715
716 let deleted = conn.prune_icebox()?;
717 assert_eq!(
718 deleted, 2,
719 "Should delete entries with sequence_id 10 and 20"
720 );
721
722 let mut remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
724 dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
725 })?;
726 remaining.sort_by_key(|e| e.originator_id);
727
728 assert_eq!(remaining.len(), 2, "Should have 2 entries remaining");
729 assert_eq!(remaining[0].sequence_id, 30);
730 assert_eq!(remaining[0].originator_id, 1);
731 assert_eq!(remaining[1].sequence_id, 10);
732 assert_eq!(remaining[1].originator_id, 10);
733 })
734 }
735
736 #[xmtp_common::test(unwrap_try = true)]
737 fn test_prune_icebox_no_cleanup_when_cursor_lower() {
738 use crate::StoreOrIgnore;
739 use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
740
741 with_connection(|conn| {
742 let group_id = create_test_group(conn);
743
744 let orphans = vec![
745 OrphanedEnvelope::builder()
746 .cursor(Cursor::new(50, 1u32))
747 .depending_on(Cursor::new(49, 1u32))
748 .payload(vec![1, 2, 3])
749 .group_id(group_id.clone())
750 .build()
751 .unwrap(),
752 OrphanedEnvelope::builder()
753 .cursor(Cursor::new(60, 1u32))
754 .depending_on(Cursor::new(59, 1u32))
755 .payload(vec![4, 5, 6])
756 .group_id(group_id.clone())
757 .build()
758 .unwrap(),
759 ];
760 conn.ice(orphans)?;
761
762 RefreshState {
763 entity_id: group_id.clone(),
764 entity_kind: EntityKind::ApplicationMessage,
765 sequence_id: 40,
766 originator_id: 1,
767 }
768 .store_or_ignore(conn)?;
769
770 let deleted = conn.prune_icebox()?;
771 assert_eq!(deleted, 0, "Should not delete any entries");
772
773 let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
774 dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
775 })?;
776 assert_eq!(remaining.len(), 2);
777 })
778 }
779
780 #[xmtp_common::test(unwrap_try = true)]
781 fn test_prune_icebox_only_relevant_entity_kinds() {
782 use crate::StoreOrIgnore;
783 use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
784
785 with_connection(|conn| {
786 let group_id = create_test_group(conn);
787
788 let orphans = vec![
789 OrphanedEnvelope::builder()
790 .cursor(Cursor::new(10, 1u32))
791 .depending_on(Cursor::new(9, 1u32))
792 .payload(vec![1, 2, 3])
793 .group_id(group_id.clone())
794 .build()
795 .unwrap(),
796 ];
797 conn.ice(orphans)?;
798
799 RefreshState {
800 entity_id: group_id.clone(),
801 entity_kind: EntityKind::Welcome,
802 sequence_id: 100,
803 originator_id: 1,
804 }
805 .store_or_ignore(conn)?;
806
807 let deleted = conn.prune_icebox()?;
808 assert_eq!(deleted, 0, "Should not delete due to wrong entity_kind");
809
810 let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
811 dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
812 })?;
813 assert_eq!(remaining.len(), 1);
814 })
815 }
816
817 #[xmtp_common::test(unwrap_try = true)]
818 fn test_prune_icebox_dependencies_cascade_deleted() {
819 use crate::StoreOrIgnore;
820 use crate::encrypted_store::refresh_state::{EntityKind, RefreshState};
821
822 with_connection(|conn| {
823 let group_id = create_test_group(conn);
824
825 let orphans = vec![
826 OrphanedEnvelope::builder()
827 .cursor(Cursor::new(10, 1u32))
828 .depending_on(Cursor::new(9, 1u32))
829 .payload(vec![1, 2, 3])
830 .group_id(group_id.clone())
831 .build()
832 .unwrap(),
833 ];
834 conn.ice(orphans)?;
835
836 use crate::schema::icebox_dependencies::dsl as dep_dsl;
837 let deps: Vec<IceboxDependency> = conn.raw_query_read(|conn| {
838 icebox_dependencies::table
839 .filter(dep_dsl::envelope_originator_id.eq(1))
840 .filter(dep_dsl::envelope_sequence_id.eq(10))
841 .load(conn)
842 })?;
843 assert_eq!(deps.len(), 1);
844
845 RefreshState {
846 entity_id: group_id.clone(),
847 entity_kind: EntityKind::ApplicationMessage,
848 sequence_id: 10,
849 originator_id: 1,
850 }
851 .store_or_ignore(conn)?;
852
853 let deleted = conn.prune_icebox()?;
854 assert_eq!(deleted, 1, "Should delete the icebox entry");
855
856 let remaining: Vec<Icebox> = conn.raw_query_read(|conn| {
857 dsl::icebox.filter(dsl::group_id.eq(&group_id)).load(conn)
858 })?;
859 assert_eq!(remaining.len(), 0);
860
861 let deps: Vec<IceboxDependency> = conn.raw_query_read(|conn| {
862 icebox_dependencies::table
863 .filter(dep_dsl::envelope_originator_id.eq(1))
864 .filter(dep_dsl::envelope_sequence_id.eq(10))
865 .load(conn)
866 })?;
867 assert_eq!(deps.len(), 0, "Dependencies should be cascade deleted");
868 })
869 }
870}