xmtp_db/encrypted_store/
readd_status.rs

1use std::collections::HashSet;
2
3use diesel::prelude::*;
4
5use super::{
6    DbConnection,
7    schema::readd_status::{self},
8};
9use crate::{ConnectionExt, impl_store};
10
11#[derive(Identifiable, Queryable, Selectable, Insertable, Debug, Clone, PartialEq, Eq)]
12#[diesel(table_name = readd_status)]
13#[diesel(primary_key(group_id, installation_id))]
14pub struct ReaddStatus {
15    pub group_id: Vec<u8>,
16    pub installation_id: Vec<u8>,
17    pub requested_at_sequence_id: Option<i64>,
18    pub responded_at_sequence_id: Option<i64>,
19}
20
21impl_store!(ReaddStatus, readd_status);
22
23pub trait QueryReaddStatus {
24    fn get_readd_status(
25        &self,
26        group_id: &[u8],
27        installation_id: &[u8],
28    ) -> Result<Option<ReaddStatus>, crate::ConnectionError>;
29
30    fn is_awaiting_readd(
31        &self,
32        group_id: &[u8],
33        installation_id: &[u8],
34    ) -> Result<bool, crate::ConnectionError>;
35
36    /// Update the requested_at_sequence_id for a given group_id and installation_id,
37    /// provided it is higher than the current value.
38    /// Inserts the row if it doesn't exist.
39    fn update_requested_at_sequence_id(
40        &self,
41        group_id: &[u8],
42        installation_id: &[u8],
43        sequence_id: i64,
44    ) -> Result<(), crate::ConnectionError>;
45
46    /// Update the responded_at_sequence_id for a given group_id and installation_id,
47    /// provided it is higher than the current value.
48    /// Inserts the row if it doesn't exist.
49    fn update_responded_at_sequence_id(
50        &self,
51        group_id: &[u8],
52        installation_id: &[u8],
53        sequence_id: i64,
54    ) -> Result<(), crate::ConnectionError>;
55
56    fn delete_other_readd_statuses(
57        &self,
58        group_id: &[u8],
59        self_installation_id: &[u8],
60    ) -> Result<(), crate::ConnectionError>;
61
62    fn delete_readd_statuses(
63        &self,
64        group_id: &[u8],
65        installation_ids: HashSet<Vec<u8>>,
66    ) -> Result<(), crate::ConnectionError>;
67
68    fn get_readds_awaiting_response(
69        &self,
70        group_id: &[u8],
71        self_installation_id: &[u8],
72    ) -> Result<Vec<ReaddStatus>, crate::ConnectionError>;
73}
74
75impl<C: ConnectionExt> QueryReaddStatus for DbConnection<C> {
76    fn get_readd_status(
77        &self,
78        group_id: &[u8],
79        installation_id: &[u8],
80    ) -> Result<Option<ReaddStatus>, crate::ConnectionError> {
81        use super::schema::readd_status::dsl as readd_dsl;
82        use diesel::QueryDsl;
83
84        self.raw_query_read(|conn| {
85            readd_dsl::readd_status
86                .filter(readd_dsl::group_id.eq(group_id))
87                .filter(readd_dsl::installation_id.eq(installation_id))
88                .first::<ReaddStatus>(conn)
89                .optional()
90        })
91    }
92
93    fn is_awaiting_readd(
94        &self,
95        group_id: &[u8],
96        installation_id: &[u8],
97    ) -> Result<bool, crate::ConnectionError> {
98        let readd_status = self.get_readd_status(group_id, installation_id)?;
99        if let Some(readd_status) = readd_status
100            && let Some(requested_at) = readd_status.requested_at_sequence_id
101            && requested_at >= readd_status.responded_at_sequence_id.unwrap_or(0)
102        {
103            return Ok(true);
104        }
105        Ok(false)
106    }
107
108    fn update_requested_at_sequence_id(
109        &self,
110        group_id: &[u8],
111        installation_id: &[u8],
112        sequence_id: i64,
113    ) -> Result<(), crate::ConnectionError> {
114        use super::schema::readd_status::dsl as readd_dsl;
115        use diesel::query_dsl::methods::FilterDsl;
116
117        let new_status = super::readd_status::ReaddStatus {
118            group_id: group_id.to_vec(),
119            installation_id: installation_id.to_vec(),
120            requested_at_sequence_id: Some(sequence_id),
121            responded_at_sequence_id: None,
122        };
123
124        self.raw_query_write(|conn| {
125            diesel::insert_into(readd_dsl::readd_status)
126                .values(&new_status)
127                .on_conflict((readd_dsl::group_id, readd_dsl::installation_id))
128                .do_update()
129                .set(readd_dsl::requested_at_sequence_id.eq(sequence_id))
130                .filter(
131                    readd_dsl::requested_at_sequence_id
132                        .is_null()
133                        .or(readd_dsl::requested_at_sequence_id.lt(sequence_id)),
134                )
135                .execute(conn)
136        })?;
137
138        Ok(())
139    }
140
141    fn update_responded_at_sequence_id(
142        &self,
143        group_id: &[u8],
144        installation_id: &[u8],
145        sequence_id: i64,
146    ) -> Result<(), crate::ConnectionError> {
147        use super::schema::readd_status::dsl as readd_dsl;
148        use diesel::query_dsl::methods::FilterDsl;
149
150        let new_status = super::readd_status::ReaddStatus {
151            group_id: group_id.to_vec(),
152            installation_id: installation_id.to_vec(),
153            requested_at_sequence_id: None,
154            responded_at_sequence_id: Some(sequence_id),
155        };
156
157        self.raw_query_write(|conn| {
158            diesel::insert_into(readd_dsl::readd_status)
159                .values(&new_status)
160                .on_conflict((readd_dsl::group_id, readd_dsl::installation_id))
161                .do_update()
162                .set(readd_dsl::responded_at_sequence_id.eq(sequence_id))
163                .filter(
164                    readd_dsl::responded_at_sequence_id
165                        .is_null()
166                        .or(readd_dsl::responded_at_sequence_id.lt(sequence_id)),
167                )
168                .execute(conn)
169        })?;
170
171        Ok(())
172    }
173
174    fn delete_other_readd_statuses(
175        &self,
176        group_id: &[u8],
177        self_installation_id: &[u8],
178    ) -> Result<(), crate::ConnectionError> {
179        use super::schema::readd_status::dsl as readd_dsl;
180        use diesel::{ExpressionMethods, QueryDsl};
181
182        self.raw_query_write(|conn| {
183            diesel::delete(
184                readd_dsl::readd_status
185                    .filter(readd_dsl::group_id.eq(group_id))
186                    .filter(readd_dsl::installation_id.ne(self_installation_id)),
187            )
188            .execute(conn)?;
189            Ok(())
190        })
191    }
192
193    fn delete_readd_statuses(
194        &self,
195        group_id: &[u8],
196        installation_ids: HashSet<Vec<u8>>,
197    ) -> Result<(), crate::ConnectionError> {
198        use super::schema::readd_status::dsl as readd_dsl;
199        use diesel::{ExpressionMethods, QueryDsl};
200
201        self.raw_query_write(|conn| {
202            diesel::delete(
203                readd_dsl::readd_status
204                    .filter(readd_dsl::group_id.eq(group_id))
205                    .filter(readd_dsl::installation_id.eq_any(installation_ids)),
206            )
207            .execute(conn)?;
208            Ok(())
209        })
210    }
211
212    fn get_readds_awaiting_response(
213        &self,
214        group_id: &[u8],
215        self_installation_id: &[u8],
216    ) -> Result<Vec<ReaddStatus>, crate::ConnectionError> {
217        use super::schema::readd_status::dsl as readd_dsl;
218        use diesel::{ExpressionMethods, QueryDsl};
219
220        self.raw_query_read(|conn| {
221            readd_dsl::readd_status
222                .filter(readd_dsl::group_id.eq(group_id))
223                .filter(readd_dsl::installation_id.ne(self_installation_id))
224                .filter(readd_dsl::requested_at_sequence_id.is_not_null())
225                .filter(
226                    readd_dsl::requested_at_sequence_id
227                        .ge(readd_dsl::responded_at_sequence_id)
228                        .or(readd_dsl::responded_at_sequence_id.is_null()),
229                )
230                .load::<ReaddStatus>(conn)
231        })
232    }
233}
234
235impl<T> QueryReaddStatus for &T
236where
237    T: QueryReaddStatus,
238{
239    fn get_readd_status(
240        &self,
241        group_id: &[u8],
242        installation_id: &[u8],
243    ) -> Result<Option<ReaddStatus>, crate::ConnectionError> {
244        (**self).get_readd_status(group_id, installation_id)
245    }
246
247    fn is_awaiting_readd(
248        &self,
249        group_id: &[u8],
250        installation_id: &[u8],
251    ) -> Result<bool, crate::ConnectionError> {
252        (**self).is_awaiting_readd(group_id, installation_id)
253    }
254
255    fn update_requested_at_sequence_id(
256        &self,
257        group_id: &[u8],
258        installation_id: &[u8],
259        sequence_id: i64,
260    ) -> Result<(), crate::ConnectionError> {
261        (**self).update_requested_at_sequence_id(group_id, installation_id, sequence_id)
262    }
263
264    fn update_responded_at_sequence_id(
265        &self,
266        group_id: &[u8],
267        installation_id: &[u8],
268        sequence_id: i64,
269    ) -> Result<(), crate::ConnectionError> {
270        (**self).update_responded_at_sequence_id(group_id, installation_id, sequence_id)
271    }
272
273    fn delete_other_readd_statuses(
274        &self,
275        group_id: &[u8],
276        self_installation_id: &[u8],
277    ) -> Result<(), crate::ConnectionError> {
278        (**self).delete_other_readd_statuses(group_id, self_installation_id)
279    }
280
281    fn delete_readd_statuses(
282        &self,
283        group_id: &[u8],
284        installation_ids: HashSet<Vec<u8>>,
285    ) -> Result<(), crate::ConnectionError> {
286        (**self).delete_readd_statuses(group_id, installation_ids)
287    }
288
289    fn get_readds_awaiting_response(
290        &self,
291        group_id: &[u8],
292        self_installation_id: &[u8],
293    ) -> Result<Vec<ReaddStatus>, crate::ConnectionError> {
294        (**self).get_readds_awaiting_response(group_id, self_installation_id)
295    }
296}
297
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::{Store, test_utils::with_connection};

    /// Builds a `ReaddStatus` value for the given key and sequence IDs.
    fn new_status(
        group_id: &[u8],
        installation_id: &[u8],
        requested_at_sequence_id: Option<i64>,
        responded_at_sequence_id: Option<i64>,
    ) -> ReaddStatus {
        ReaddStatus {
            group_id: group_id.to_vec(),
            installation_id: installation_id.to_vec(),
            requested_at_sequence_id,
            responded_at_sequence_id,
        }
    }

    #[xmtp_common::test]
    fn test_get_readd_status_not_found() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            let result = conn.get_readd_status(&group_id, &installation_id).unwrap();
            assert!(result.is_none());
        })
    }

    #[xmtp_common::test]
    fn test_store_and_get_readd_status() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Store the status
            new_status(&group_id, &installation_id, Some(100), Some(50))
                .store(conn)
                .unwrap();

            // Retrieve it
            let retrieved_status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("stored status should be found");
            assert_eq!(retrieved_status.requested_at_sequence_id, Some(100));
            assert_eq!(retrieved_status.responded_at_sequence_id, Some(50));
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_creates_new() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];
            let sequence_id = 100;

            // Update on non-existing record should create it
            conn.update_requested_at_sequence_id(&group_id, &installation_id, sequence_id)
                .unwrap();

            // Verify it was created
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should have been created");
            assert_eq!(status.requested_at_sequence_id, Some(sequence_id));
            assert_eq!(status.responded_at_sequence_id, None);
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_updates_existing() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create initial status
            new_status(&group_id, &installation_id, Some(50), Some(25))
                .store(conn)
                .unwrap();

            // Update with higher sequence_id
            conn.update_requested_at_sequence_id(&group_id, &installation_id, 100)
                .unwrap();

            // Verify it was updated
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(100));
            assert_eq!(status.responded_at_sequence_id, Some(25)); // This is preserved by the UPDATE
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_only_updates_if_higher() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create initial status with high sequence_id
            new_status(&group_id, &installation_id, Some(100), Some(50))
                .store(conn)
                .unwrap();

            // Try to update with lower sequence_id - this should be ignored
            conn.update_requested_at_sequence_id(&group_id, &installation_id, 75)
                .unwrap();

            // Verify it was NOT updated (lower sequence_id should be ignored due to filter)
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(100)); // Should remain unchanged
            assert_eq!(status.responded_at_sequence_id, Some(50)); // Should remain unchanged
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_updates_from_null() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create initial status with null requested_at_sequence_id
            new_status(&group_id, &installation_id, None, Some(25))
                .store(conn)
                .unwrap();

            // Update with any sequence_id (should work since current is null)
            conn.update_requested_at_sequence_id(&group_id, &installation_id, 50)
                .unwrap();

            // Verify it was updated
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(50));
            assert_eq!(status.responded_at_sequence_id, Some(25)); // This is preserved by the UPDATE
        })
    }

    // Synchronous like every other test in this module: the body awaits nothing.
    #[xmtp_common::test]
    fn test_update_responded_at_sequence_id_creates_new() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];
            let sequence_id = 100;

            // Update on non-existing record should create it
            conn.update_responded_at_sequence_id(&group_id, &installation_id, sequence_id)
                .unwrap();

            // Verify it was created
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should have been created");
            assert_eq!(status.responded_at_sequence_id, Some(sequence_id));
            assert_eq!(status.requested_at_sequence_id, None);
        })
    }

    #[xmtp_common::test]
    fn test_update_responded_at_sequence_id_only_updates_if_higher() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create initial status with high responded_at_sequence_id
            new_status(&group_id, &installation_id, Some(50), Some(100))
                .store(conn)
                .unwrap();

            // Try to update with lower sequence_id - this should be ignored
            conn.update_responded_at_sequence_id(&group_id, &installation_id, 75)
                .unwrap();

            // Verify it was NOT updated (lower sequence_id should be ignored due to filter)
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.responded_at_sequence_id, Some(100)); // Should remain unchanged
            assert_eq!(status.requested_at_sequence_id, Some(50)); // Should remain unchanged

            // Now update with a higher sequence_id - this should work
            conn.update_responded_at_sequence_id(&group_id, &installation_id, 125)
                .unwrap();

            // Verify it was updated
            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.responded_at_sequence_id, Some(125)); // Should be updated
            assert_eq!(status.requested_at_sequence_id, Some(50)); // Should remain unchanged
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_status() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Should return false when no readd status exists
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_request() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create a readd status without a requested_at_sequence_id
            new_status(&group_id, &installation_id, None, Some(5))
                .store(conn)
                .unwrap();

            // Should return false when no request has been made
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_request_pending() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create a readd status with requested_at > responded_at
            new_status(&group_id, &installation_id, Some(10), Some(5))
                .store(conn)
                .unwrap();

            // Should return true when request is pending
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_request_fulfilled() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create a readd status with requested_at < responded_at
            new_status(&group_id, &installation_id, Some(5), Some(10))
                .store(conn)
                .unwrap();

            // Should return false when request has been fulfilled
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_equal_sequence_ids() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create a readd status with requested_at == responded_at
            new_status(&group_id, &installation_id, Some(10), Some(10))
                .store(conn)
                .unwrap();

            // Should return true when sequence IDs are equal.
            // The response to a readd request will always add a commit, which increases the sequence ID.
            // It is possible that a readd request is subsequently issued at the same sequence ID.
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_responded_at() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            // Create a readd status with requested_at but no responded_at
            new_status(&group_id, &installation_id, Some(5), None)
                .store(conn)
                .unwrap();

            // Should return true when a request exists and no response is recorded
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_delete_other_readd_statuses() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let keep_installation_id = vec![10, 11, 12];
            let delete_installation_id_1 = vec![20, 21, 22];
            let delete_installation_id_2 = vec![30, 31, 32];

            // Create readd statuses for the same group with different installation IDs
            new_status(&group_id, &keep_installation_id, Some(10), Some(5))
                .store(conn)
                .unwrap();
            new_status(&group_id, &delete_installation_id_1, Some(15), Some(8))
                .store(conn)
                .unwrap();
            new_status(&group_id, &delete_installation_id_2, Some(20), None)
                .store(conn)
                .unwrap();

            // Create a status for a different group (should not be affected)
            new_status(&[4, 5, 6], &[40, 41, 42], Some(25), Some(12))
                .store(conn)
                .unwrap();

            // Delete other readd statuses for the group
            conn.delete_other_readd_statuses(&group_id, &keep_installation_id)
                .unwrap();

            // Verify the status we wanted to keep is still there
            let kept_status = conn
                .get_readd_status(&group_id, &keep_installation_id)
                .unwrap();
            assert!(kept_status.is_some());

            // Verify the other statuses in the same group were deleted
            let deleted_status_1 = conn
                .get_readd_status(&group_id, &delete_installation_id_1)
                .unwrap();
            assert!(deleted_status_1.is_none());

            let deleted_status_2 = conn
                .get_readd_status(&group_id, &delete_installation_id_2)
                .unwrap();
            assert!(deleted_status_2.is_none());

            // Verify the status in the different group was not affected
            let different_group_check = conn.get_readd_status(&[4, 5, 6], &[40, 41, 42]).unwrap();
            assert!(different_group_check.is_some());
        })
    }

    #[xmtp_common::test]
    fn test_delete_readd_statuses() {
        with_connection(|conn| {
            let group_id: Vec<u8> = vec![1, 2, 3];
            let other_group_id: Vec<u8> = vec![4, 5, 6];
            let keep_installation_id: Vec<u8> = vec![10, 11, 12];
            let delete_installation_id_1: Vec<u8> = vec![20, 21, 22];
            let delete_installation_id_2: Vec<u8> = vec![30, 31, 32];

            new_status(&group_id, &keep_installation_id, Some(10), Some(5))
                .store(conn)
                .unwrap();
            new_status(&group_id, &delete_installation_id_1, Some(15), Some(8))
                .store(conn)
                .unwrap();
            new_status(&group_id, &delete_installation_id_2, Some(20), None)
                .store(conn)
                .unwrap();

            // Same installation ID in a different group (should not be affected)
            new_status(&other_group_id, &delete_installation_id_1, Some(25), Some(12))
                .store(conn)
                .unwrap();

            // An empty set must not delete anything
            conn.delete_readd_statuses(&group_id, HashSet::new())
                .unwrap();
            assert!(
                conn.get_readd_status(&group_id, &delete_installation_id_1)
                    .unwrap()
                    .is_some()
            );
            assert!(
                conn.get_readd_status(&group_id, &delete_installation_id_2)
                    .unwrap()
                    .is_some()
            );

            // Delete exactly the listed installations
            let to_delete = HashSet::from([
                delete_installation_id_1.clone(),
                delete_installation_id_2.clone(),
            ]);
            conn.delete_readd_statuses(&group_id, to_delete).unwrap();

            // Listed rows of the group are gone
            assert!(
                conn.get_readd_status(&group_id, &delete_installation_id_1)
                    .unwrap()
                    .is_none()
            );
            assert!(
                conn.get_readd_status(&group_id, &delete_installation_id_2)
                    .unwrap()
                    .is_none()
            );

            // Unlisted row of the group is kept
            assert!(
                conn.get_readd_status(&group_id, &keep_installation_id)
                    .unwrap()
                    .is_some()
            );

            // Row of the other group is kept even though its installation ID was listed
            assert!(
                conn.get_readd_status(&other_group_id, &delete_installation_id_1)
                    .unwrap()
                    .is_some()
            );
        })
    }

    #[xmtp_common::test]
    fn test_get_readds_awaiting_response() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let self_installation_id = vec![10, 11, 12];
            let other_installation_id_1 = vec![20, 21, 22];
            let other_installation_id_2 = vec![30, 31, 32];

            // Create readd statuses with various states

            // Case 1: Pending readd from other installation (should be included)
            new_status(&group_id, &other_installation_id_1, Some(10), Some(5))
                .store(conn)
                .unwrap();

            // Case 2: Pending readd from other installation with null responded_at (should be included)
            new_status(&group_id, &other_installation_id_2, Some(15), None)
                .store(conn)
                .unwrap();

            // Case 3: Not pending readd from other installation (should be excluded)
            new_status(&group_id, &[40, 41, 42], Some(8), Some(12))
                .store(conn)
                .unwrap();

            // Case 4: Pending readd from self installation (should be excluded)
            new_status(&group_id, &self_installation_id, Some(20), Some(10))
                .store(conn)
                .unwrap();

            // Case 5: No requested_at_sequence_id (should be excluded)
            new_status(&group_id, &[50, 51, 52], None, Some(5))
                .store(conn)
                .unwrap();

            // Case 6: Different group (should be excluded)
            new_status(&[4, 5, 6], &[60, 61, 62], Some(25), Some(15))
                .store(conn)
                .unwrap();

            // Call the method under test
            let result = conn
                .get_readds_awaiting_response(&group_id, &self_installation_id)
                .unwrap();

            // Should return 2 pending readds from other installations
            assert_eq!(result.len(), 2);

            // Verify the correct statuses are returned
            let returned_installations: Vec<Vec<u8>> =
                result.iter().map(|r| r.installation_id.clone()).collect();
            assert!(returned_installations.contains(&other_installation_id_1));
            assert!(returned_installations.contains(&other_installation_id_2));

            // Verify the details of the returned statuses
            for status in result {
                assert_eq!(status.group_id, group_id);
                assert_ne!(status.installation_id, self_installation_id);
                assert!(status.requested_at_sequence_id.is_some());

                // Check that the awaiting response condition is met
                let requested_at = status.requested_at_sequence_id.unwrap();
                let responded_at = status.responded_at_sequence_id.unwrap_or(0);
                assert!(requested_at >= responded_at);
            }
        })
    }
}
786}