xmtp_db/encrypted_store/
group.rs

1//! The Group database table. Stored information surrounding group membership and ID's.
2use super::{
3    ConnectionExt, Sqlite,
4    consent_record::ConsentState,
5    db_connection::DbConnection,
6    schema::groups::{self, dsl},
7};
8use crate::NotFound;
9use crate::{DuplicateItem, StorageError, impl_fetch, impl_store, impl_store_or_ignore};
10use derive_builder::{Builder, UninitializedFieldError};
11use diesel::{
12    backend::Backend,
13    deserialize::{self, FromSql, FromSqlRow},
14    dsl::sql,
15    expression::AsExpression,
16    prelude::*,
17    serialize::{self, IsNull, Output, ToSql},
18    sql_types::Integer,
19};
20use serde::{Deserialize, Serialize};
21mod convert;
22mod dms;
23mod version;
24
25pub use dms::QueryDms;
26pub use version::QueryGroupVersion;
27use xmtp_proto::types::Cursor;
28
29pub type ID = Vec<u8>;
30
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Insertable,
    Identifiable,
    Queryable,
    Builder,
    Selectable,
    QueryableByName,
)]
#[diesel(table_name = groups)]
#[diesel(primary_key(id))]
#[diesel(check_for_backend(Sqlite))]
#[builder(
    setter(into),
    build_fn(error = "StorageError", validate = "Self::validate")
)]
#[derive(AsChangeset)]
/// A unique group chat, stored as one row of the `groups` table.
///
/// Instances are normally assembled through [`StoredGroup::builder`]. The
/// generated `build()` runs `StoredGroupBuilder::validate`, which requires
/// `sequence_id` and `originator_id` to be provided together.
pub struct StoredGroup {
    /// Randomly generated ID by group creator
    pub id: Vec<u8>,
    /// Based on timestamp of this welcome message
    pub created_at_ns: i64,
    /// Enum, [`GroupMembershipState`] representing access to the group
    pub membership_state: GroupMembershipState,
    /// Track when the latest, most recent installations were checked.
    /// Defaults to `0` in the builder.
    #[builder(default = "0")]
    pub installations_last_checked: i64,
    /// The inbox_id of who added the user to a group.
    pub added_by_inbox_id: String,
    /// The sequence id of the welcome message.
    /// Combined with `originator_id` to form the group's [`Cursor`].
    #[builder(default = None)]
    pub sequence_id: Option<i64>,
    /// The last time the leaf node encryption key was rotated.
    /// Defaults to `0` in the builder.
    #[builder(default = "0")]
    pub rotated_at_ns: i64,
    /// Enum, [`ConversationType`] signifies the group conversation type which extends to who can access it.
    /// When not set explicitly, the builder derives it from whether `dm_id` was set.
    #[builder(default = "self.default_conversation_type()")]
    pub conversation_type: ConversationType,
    /// The inbox_id of the DM target
    #[builder(default = None)]
    pub dm_id: Option<String>,
    /// Timestamp of when the last message was sent for this group (updated automatically in a trigger)
    #[builder(default = None)]
    pub last_message_ns: Option<i64>,
    /// The time in NS when the messages should be deleted
    #[builder(default = None)]
    pub message_disappear_from_ns: Option<i64>,
    /// How long a message in the group can live in NS
    #[builder(default = None)]
    pub message_disappear_in_ns: Option<i64>,
    /// The version of the protocol that the group is paused for, None is not paused
    #[builder(default = None)]
    pub paused_for_version: Option<String>,
    /// Flag marking the group as possibly forked; `false` by default.
    /// NOTE(review): presumably set via `QueryGroup::mark_group_as_maybe_forked` — confirm.
    #[builder(default = false)]
    pub maybe_forked: bool,
    /// Free-form details stored alongside `maybe_forked`; empty by default.
    #[builder(default = "String::new()")]
    pub fork_details: String,
    /// The Originator Node ID of the WelcomeMessage.
    /// Combined with `sequence_id` to form the group's [`Cursor`].
    #[builder(default = None)]
    pub originator_id: Option<i64>,
    /// Whether the user should publish the commit log for this group
    #[builder(default = false)]
    pub should_publish_commit_log: bool,
    /// The consensus public key of the commit log for this group
    /// Derived from the first entry of the commit log
    #[builder(default = None)]
    pub commit_log_public_key: Option<Vec<u8>>,
    /// Whether the local commit log has diverged from the remote commit log
    /// NULL if the remote commit log is not up to date yet
    #[builder(default = None)]
    pub is_commit_log_forked: Option<bool>,
    /// Whether the pending-remove list is empty
    /// NULL if the pending-remove didn't receive an update yet
    #[builder(default = None)]
    pub has_pending_leave_request: Option<bool>,
    // TODO: store member role?
}
113
114impl StoredGroupBuilder {
115    fn validate(&self) -> Result<(), StorageError> {
116        if self.sequence_id.is_some() && self.originator_id.is_none() {
117            return Err(UninitializedFieldError::new("originator_id").into());
118        }
119        if self.originator_id.is_some() && self.sequence_id.is_none() {
120            return Err(UninitializedFieldError::new("sequence_id").into());
121        }
122        Ok(())
123    }
124}
125impl StoredGroup {
126    pub fn cursor(&self) -> Option<Cursor> {
127        // if a group specifies a sequence_id/originator_id, then it must
128        // specify both sequence id and originator
129        // else DB and Builder error
130        if let Some(sequence_id) = self.sequence_id
131            && let Some(originator) = self.originator_id
132        {
133            return Some(Cursor::new(sequence_id as u64, originator as u32));
134        }
135        None
136    }
137}
138
139impl StoredGroupBuilder {
140    pub fn cursor(&mut self, cursor: Cursor) -> &mut Self {
141        self.originator_id = Some(Some(cursor.originator_id as i64));
142        self.sequence_id = Some(Some(cursor.sequence_id as i64));
143        self
144    }
145}
146
/// A subset of the group table for fetching the commit log public key.
///
/// Derives `Queryable`, so the field order must match the column order of the
/// query that loads it.
#[derive(Queryable)]
#[diesel(table_name = groups)]
pub struct StoredGroupCommitLogPublicKey {
    /// The group's ID.
    pub id: Vec<u8>,
    /// The stored commit log public key, if any.
    pub commit_log_public_key: Option<Vec<u8>>,
}
154
/// A struct for fetching groups that need readd requests with their latest epoch
#[derive(Debug, Clone, Queryable, QueryableByName)]
pub struct StoredGroupForReaddRequest {
    /// ID of the group (binary column).
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub group_id: Vec<u8>,
    /// Latest commit sequence id for the group; nullable in the result set.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
    pub latest_commit_sequence_id: Option<i64>,
}
163
/// A struct for fetching groups that need to respond to readd requests
#[derive(Debug, Clone, Queryable, QueryableByName)]
pub struct StoredGroupForRespondingReadds {
    /// ID of the group (binary column).
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub group_id: Vec<u8>,
    /// The group's `dm_id`, if it has one.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    pub dm_id: Option<String>,
    /// Conversation type, read from an integer column.
    #[diesel(sql_type = diesel::sql_types::Integer)]
    pub conversation_type: ConversationType,
    /// Creation timestamp of the group, in nanoseconds.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub created_at_ns: i64,
}
176
// TODO: Create two more structs that delegate to StoredGroup
// Crate-level macros binding `StoredGroup` to the `groups` table.
// NOTE(review): presumably these generate the fetch (keyed by the `Vec<u8>` id),
// store, and store-or-ignore implementations — confirm against the macro definitions.
impl_fetch!(StoredGroup, groups, Vec<u8>);
impl_store!(StoredGroup, groups);
impl_store_or_ignore!(StoredGroup, groups);
181
182impl StoredGroupBuilder {
183    fn default_conversation_type(&self) -> ConversationType {
184        if self.dm_id.is_some() {
185            ConversationType::Dm
186        } else {
187            ConversationType::Group
188        }
189    }
190}
191
192impl StoredGroup {
193    pub fn builder() -> StoredGroupBuilder {
194        StoredGroupBuilder::default()
195    }
196}
197
/// Sort order applied by `QueryGroup::find_groups`.
#[derive(Debug, Clone, Default)]
pub enum GroupQueryOrderBy {
    /// Order by `created_at_ns`, ascending (the default).
    #[default]
    CreatedAt,
    /// Order by `COALESCE(last_message_ns, created_at_ns)`, descending.
    LastActivity,
}
204
/// Optional filters for group queries.
///
/// The field notes below describe how the `DbConnection` implementation of
/// `QueryGroup::find_groups` interprets each value.
#[derive(Debug, Default, Clone)]
pub struct GroupQueryArgs {
    /// Keep only groups whose `membership_state` is one of these.
    pub allowed_states: Option<Vec<GroupMembershipState>>,
    /// Keep only groups with `created_at_ns` strictly greater than this.
    /// Cannot be combined with `last_activity_after_ns`.
    pub created_after_ns: Option<i64>,
    /// Keep only groups with `created_at_ns` strictly less than this
    /// (`find_groups_by_id_paged` uses less-than-or-equal instead).
    /// Cannot be combined with `last_activity_before_ns`.
    pub created_before_ns: Option<i64>,
    /// Keep only groups whose `COALESCE(last_message_ns, created_at_ns)` is
    /// strictly greater than this.
    pub last_activity_after_ns: Option<i64>,
    /// Keep only groups whose `COALESCE(last_message_ns, created_at_ns)` is
    /// strictly less than this.
    pub last_activity_before_ns: Option<i64>,
    /// Maximum number of rows from the main query. Sync groups appended by
    /// `find_groups` are not counted; `find_groups_by_id_paged` defaults to 100.
    pub limit: Option<i64>,
    /// Keep only groups of this conversation type.
    pub conversation_type: Option<ConversationType>,
    /// Consent states to include. `None` or an empty list falls back to
    /// `Allowed` + `Unknown`.
    pub consent_states: Option<Vec<ConsentState>>,
    /// When `true`, all sync groups are appended to the result.
    pub include_sync_groups: bool,
    /// When `false`, only one group per `dm_id` is returned (the one with the
    /// greatest `(last_message_ns, id)`).
    pub include_duplicate_dms: bool,
    /// Keep only groups whose `should_publish_commit_log` equals this.
    pub should_publish_commit_log: Option<bool>,
    /// Sort order; `None` means [`GroupQueryOrderBy::CreatedAt`].
    pub order_by: Option<GroupQueryOrderBy>,
}
220
221impl AsRef<GroupQueryArgs> for GroupQueryArgs {
222    fn as_ref(&self) -> &GroupQueryArgs {
223        self
224    }
225}
226
227impl GroupQueryArgs {
228    pub fn validate(&self) -> Result<(), crate::ConnectionError> {
229        if self.last_activity_after_ns.is_some() && self.created_after_ns.is_some() {
230            return Err(crate::ConnectionError::InvalidQuery(
231                "last_activity_after_ns and created_after_ns cannot be used together".to_string(),
232            ));
233        }
234
235        if self.last_activity_before_ns.is_some() && self.created_before_ns.is_some() {
236            return Err(crate::ConnectionError::InvalidQuery(
237                "last_activity_before_ns and created_before_ns cannot be used together".to_string(),
238            ));
239        }
240
241        Ok(())
242    }
243}
244
/// Queries and updates against the `groups` table.
pub trait QueryGroup {
    /// Return regular `Purpose::Conversation` groups with additional optional filters
    fn find_groups<A: AsRef<GroupQueryArgs>>(
        &self,
        args: A,
    ) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Return one page of groups, skipping the first `offset` rows.
    ///
    /// The `DbConnection` implementation orders by group id and only honours
    /// the `created_after_ns`, `created_before_ns` and `limit` arguments.
    fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
        &self,
        args: A,
        offset: i64,
    ) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Updates group membership state
    fn update_group_membership<GroupId: AsRef<[u8]>>(
        &self,
        group_id: GroupId,
        state: GroupMembershipState,
    ) -> Result<(), crate::ConnectionError>;

    /// Return every sync group.
    fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Return the sync group with the given ID, if any.
    fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Return the primary sync group, if any (the `DbConnection`
    /// implementation picks the most recently created one).
    fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Return a single group that matches the given ID
    fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Return a single group that matches the given welcome cursor
    /// (sequence id + originator id).
    fn find_group_by_sequence_id(
        &self,
        cursor: Cursor,
    ) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Return the stored `rotated_at_ns` for the group.
    fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

    /// Sets the group's `rotated_at_ns` to the current time.
    fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

    /// Return the last time installations were checked for the group.
    fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

    /// Updates the 'last time checked' we checked for new installations.
    fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

    /// Sets (or clears, with `None`) the group's `message_disappear_from_ns`.
    fn update_message_disappearing_from_ns(
        &self,
        group_id: Vec<u8>,
        from_ns: Option<i64>,
    ) -> Result<(), StorageError>;

    /// Sets (or clears, with `None`) the group's `message_disappear_in_ns`.
    fn update_message_disappearing_in_ns(
        &self,
        group_id: Vec<u8>,
        in_ns: Option<i64>,
    ) -> Result<(), StorageError>;

    fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError>;

    /// Get all the welcome ids turned into groups
    fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError>;

    fn mark_group_as_maybe_forked(
        &self,
        group_id: &[u8],
        fork_details: String,
    ) -> Result<(), StorageError>;

    fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError>;

    fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError>;

    /// Get all conversations that require a remote commit log publish (DMs and groups where user is super admin, excluding sync groups)
    fn get_conversation_ids_for_remote_log_publish(
        &self,
    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

    /// Get all conversations that require a remote commit log download (DMs and groups that are not sync groups)
    fn get_conversation_ids_for_remote_log_download(
        &self,
    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

    /// Get conversation IDs for fork checking (excludes already forked conversations and sync groups)
    fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError>;

    /// Get conversation IDs for conversations that are forked and need readd requests
    fn get_conversation_ids_for_requesting_readds(
        &self,
    ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError>;

    /// Get conversation IDs for conversations that need to respond to readd requests
    fn get_conversation_ids_for_responding_readds(
        &self,
    ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError>;

    /// Return the conversation type of the given group.
    fn get_conversation_type(
        &self,
        group_id: &[u8],
    ) -> Result<ConversationType, crate::ConnectionError>;

    /// Updates the commit log public key for a group
    fn set_group_commit_log_public_key(
        &self,
        group_id: &[u8],
        public_key: &[u8],
    ) -> Result<(), StorageError>;

    /// Updates the is_commit_log_forked status for a group
    fn set_group_commit_log_forked_status(
        &self,
        group_id: &[u8],
        is_forked: Option<bool>,
    ) -> Result<(), StorageError>;

    /// Gets the is_commit_log_forked status for a group
    fn get_group_commit_log_forked_status(
        &self,
        group_id: &[u8],
    ) -> Result<Option<bool>, StorageError>;

    /// Updates the has_pending_leave_request status for a group
    fn set_group_has_pending_leave_request_status(
        &self,
        group_id: &[u8],
        has_pending_leave_request: Option<bool>,
    ) -> Result<(), StorageError>;

    /// Get the IDs of groups that have a pending leave request.
    fn get_groups_have_pending_leave_request(&self)
    -> Result<Vec<Vec<u8>>, crate::ConnectionError>;
}
375
376impl<T> QueryGroup for &T
377where
378    T: QueryGroup,
379{
380    /// Return regular `Purpose::Conversation` groups with additional optional filters
381    fn find_groups<A: AsRef<GroupQueryArgs>>(
382        &self,
383        args: A,
384    ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
385        (**self).find_groups(args)
386    }
387
388    fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
389        &self,
390        args: A,
391        offset: i64,
392    ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
393        (**self).find_groups_by_id_paged(args, offset)
394    }
395
396    /// Updates group membership state
397    fn update_group_membership<GroupId: AsRef<[u8]>>(
398        &self,
399        group_id: GroupId,
400        state: GroupMembershipState,
401    ) -> Result<(), crate::ConnectionError> {
402        (**self).update_group_membership(group_id, state)
403    }
404
405    fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
406        (**self).all_sync_groups()
407    }
408
409    fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
410        (**self).find_sync_group(id)
411    }
412
413    fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError> {
414        (**self).primary_sync_group()
415    }
416
417    /// Return a single group that matches the given ID
418    fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
419        (**self).find_group(id)
420    }
421
422    /// Return a single group that matches the given welcome ID
423    fn find_group_by_sequence_id(
424        &self,
425        cursor: Cursor,
426    ) -> Result<Option<StoredGroup>, crate::ConnectionError> {
427        (**self).find_group_by_sequence_id(cursor)
428    }
429
430    fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
431        (**self).get_rotated_at_ns(group_id)
432    }
433
434    /// Updates the 'last time checked' we checked for new installations.
435    fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
436        (**self).update_rotated_at_ns(group_id)
437    }
438
439    fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
440        (**self).get_installations_time_checked(group_id)
441    }
442
443    /// Updates the 'last time checked' we checked for new installations.
444    fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
445        (**self).update_installations_time_checked(group_id)
446    }
447
448    fn update_message_disappearing_from_ns(
449        &self,
450        group_id: Vec<u8>,
451        from_ns: Option<i64>,
452    ) -> Result<(), StorageError> {
453        (**self).update_message_disappearing_from_ns(group_id, from_ns)
454    }
455
456    fn update_message_disappearing_in_ns(
457        &self,
458        group_id: Vec<u8>,
459        in_ns: Option<i64>,
460    ) -> Result<(), StorageError> {
461        (**self).update_message_disappearing_in_ns(group_id, in_ns)
462    }
463
464    fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
465        (**self).insert_or_replace_group(group)
466    }
467
468    /// Get all the welcome ids turned into groups
469    fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError> {
470        (**self).group_cursors()
471    }
472
473    fn mark_group_as_maybe_forked(
474        &self,
475        group_id: &[u8],
476        fork_details: String,
477    ) -> Result<(), StorageError> {
478        (**self).mark_group_as_maybe_forked(group_id, fork_details)
479    }
480
481    fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError> {
482        (**self).clear_fork_flag_for_group(group_id)
483    }
484
485    fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError> {
486        (**self).has_duplicate_dm(group_id)
487    }
488
489    /// Get conversation IDs for all conversations that require a remote commit log publish (DMs and groups where user is super admin, excluding sync groups)
490    fn get_conversation_ids_for_remote_log_publish(
491        &self,
492    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
493        (**self).get_conversation_ids_for_remote_log_publish()
494    }
495
496    fn get_conversation_ids_for_remote_log_download(
497        &self,
498    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
499        (**self).get_conversation_ids_for_remote_log_download()
500    }
501
502    fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
503        (**self).get_conversation_ids_for_fork_check()
504    }
505
506    fn get_conversation_ids_for_requesting_readds(
507        &self,
508    ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError> {
509        (**self).get_conversation_ids_for_requesting_readds()
510    }
511
512    fn get_conversation_ids_for_responding_readds(
513        &self,
514    ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError> {
515        (**self).get_conversation_ids_for_responding_readds()
516    }
517
518    fn get_conversation_type(
519        &self,
520        group_id: &[u8],
521    ) -> Result<ConversationType, crate::ConnectionError> {
522        (**self).get_conversation_type(group_id)
523    }
524
525    fn set_group_commit_log_public_key(
526        &self,
527        group_id: &[u8],
528        public_key: &[u8],
529    ) -> Result<(), StorageError> {
530        (**self).set_group_commit_log_public_key(group_id, public_key)
531    }
532
533    fn set_group_commit_log_forked_status(
534        &self,
535        group_id: &[u8],
536        is_forked: Option<bool>,
537    ) -> Result<(), StorageError> {
538        (**self).set_group_commit_log_forked_status(group_id, is_forked)
539    }
540
541    fn get_group_commit_log_forked_status(
542        &self,
543        group_id: &[u8],
544    ) -> Result<Option<bool>, StorageError> {
545        (**self).get_group_commit_log_forked_status(group_id)
546    }
547
548    fn set_group_has_pending_leave_request_status(
549        &self,
550        group_id: &[u8],
551        has_pending_leave_request: Option<bool>,
552    ) -> Result<(), StorageError> {
553        (**self).set_group_has_pending_leave_request_status(group_id, has_pending_leave_request)
554    }
555
556    fn get_groups_have_pending_leave_request(
557        &self,
558    ) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
559        (**self).get_groups_have_pending_leave_request()
560    }
561}
562
563impl<C: ConnectionExt> QueryGroup for DbConnection<C> {
564    /// Return regular `Purpose::Conversation` groups with additional optional filters
565    fn find_groups<A: AsRef<GroupQueryArgs>>(
566        &self,
567        args: A,
568    ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
569        use crate::schema::consent_records::dsl as consent_dsl;
570
571        args.as_ref().validate()?;
572
573        let GroupQueryArgs {
574            allowed_states,
575            created_after_ns,
576            created_before_ns,
577            limit,
578            conversation_type,
579            consent_states,
580            include_sync_groups,
581            include_duplicate_dms,
582            last_activity_after_ns,
583            last_activity_before_ns,
584            should_publish_commit_log,
585            order_by,
586        } = args.as_ref();
587
588        let order_expression = match order_by.clone().unwrap_or_default() {
589            GroupQueryOrderBy::CreatedAt => {
590                diesel::dsl::sql::<diesel::sql_types::BigInt>("created_at_ns ASC")
591            }
592            GroupQueryOrderBy::LastActivity => diesel::dsl::sql::<diesel::sql_types::BigInt>(
593                "COALESCE(last_message_ns, created_at_ns) DESC",
594            ),
595        };
596
597        let mut query = dsl::groups
598            .filter(dsl::conversation_type.ne_all(ConversationType::virtual_types()))
599            .order(order_expression)
600            .into_boxed();
601
602        if !include_duplicate_dms {
603            // Fast DM deduplication using EXISTS - avoids expensive window functions
604            // Keep only the latest group for each dm_id (or regular group if not a DM)
605            query = query.filter(sql::<diesel::sql_types::Bool>(
606                "NOT EXISTS (
607                    SELECT 1 FROM groups g2
608                    WHERE COALESCE(g2.dm_id, g2.id) = COALESCE(groups.dm_id, groups.id)
609                    AND (COALESCE(g2.last_message_ns, 0), g2.id) > (COALESCE(groups.last_message_ns, 0), groups.id)
610                )",
611            ));
612        }
613
614        if let Some(limit) = limit {
615            query = query.limit(*limit);
616        }
617
618        if let Some(allowed_states) = allowed_states {
619            query = query.filter(dsl::membership_state.eq_any(allowed_states));
620        }
621
622        // last_activity_after_ns takes precedence over created_after_ns
623        if let Some(last_activity_after_ns) = last_activity_after_ns {
624            // "Activity after" means groups that were either created,
625            // or have sent a message after the specified time.
626            query = query.filter(
627                diesel::dsl::sql::<diesel::sql_types::BigInt>(
628                    "COALESCE(last_message_ns, created_at_ns)",
629                )
630                .gt(last_activity_after_ns),
631            );
632        }
633
634        if let Some(created_after_ns) = created_after_ns {
635            query = query.filter(dsl::created_at_ns.gt(created_after_ns));
636        }
637
638        if let Some(last_activity_before_ns) = last_activity_before_ns {
639            query = query.filter(
640                diesel::dsl::sql::<diesel::sql_types::BigInt>(
641                    "COALESCE(last_message_ns, created_at_ns)",
642                )
643                .lt(last_activity_before_ns),
644            );
645        }
646
647        if let Some(created_before_ns) = created_before_ns {
648            query = query.filter(dsl::created_at_ns.lt(created_before_ns));
649        }
650
651        if let Some(conversation_type) = conversation_type {
652            query = query.filter(dsl::conversation_type.eq(conversation_type));
653        }
654
655        let effective_consent_states = match &consent_states {
656            Some(states) if !states.is_empty() => states.clone(),
657            _ => vec![ConsentState::Allowed, ConsentState::Unknown],
658        };
659
660        let includes_unknown = effective_consent_states.contains(&ConsentState::Unknown);
661        let includes_all = effective_consent_states.len() == 3;
662
663        if let Some(should_publish_commit_log) = should_publish_commit_log {
664            query = query.filter(dsl::should_publish_commit_log.eq(should_publish_commit_log));
665        }
666
667        let filtered_states: Vec<_> = effective_consent_states
668            .iter()
669            .filter(|state| **state != ConsentState::Unknown)
670            .cloned()
671            .collect();
672
673        let mut groups = if includes_all {
674            // No filtering at all
675            self.raw_query_read(|conn| query.load::<StoredGroup>(conn))?
676        } else if includes_unknown {
677            // LEFT JOIN: include Unknown + NULL + filtered states
678            let left_joined_query = query
679                .left_join(consent_dsl::consent_records.on(
680                    sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
681                ))
682                .filter(
683                    consent_dsl::state
684                        .is_null()
685                        .or(consent_dsl::state.eq(ConsentState::Unknown))
686                        .or(consent_dsl::state.eq_any(filtered_states.clone())),
687                )
688                .select(dsl::groups::all_columns());
689
690            self.raw_query_read(|conn| left_joined_query.load::<StoredGroup>(conn))?
691        } else {
692            // INNER JOIN: strict match only to specific states (no Unknown or NULL)
693            let inner_joined_query = query
694                .inner_join(consent_dsl::consent_records.on(
695                    sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
696                ))
697                .filter(consent_dsl::state.eq_any(filtered_states.clone()))
698                .select(dsl::groups::all_columns());
699
700            self.raw_query_read(|conn| inner_joined_query.load::<StoredGroup>(conn))?
701        };
702
703        // Were sync groups explicitly asked for? Was the include_sync_groups flag set to true?
704        // Then query for those separately
705        if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups {
706            let query = dsl::groups.filter(dsl::conversation_type.eq(ConversationType::Sync));
707            let mut sync_groups = self.raw_query_read(|conn| query.load(conn))?;
708            groups.append(&mut sync_groups);
709        }
710
711        Ok(groups)
712    }
713
714    fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
715        &self,
716        args: A,
717        offset: i64,
718    ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
719        let GroupQueryArgs {
720            created_after_ns,
721            created_before_ns,
722            limit,
723            ..
724        } = args.as_ref();
725
726        let mut query = groups::table
727            .filter(groups::conversation_type.ne_all(ConversationType::virtual_types()))
728            .order(groups::id)
729            .into_boxed();
730
731        if let Some(start_ns) = created_after_ns {
732            query = query.filter(groups::created_at_ns.gt(start_ns));
733        }
734        if let Some(end_ns) = created_before_ns {
735            query = query.filter(groups::created_at_ns.le(end_ns));
736        }
737
738        query = query.limit(limit.unwrap_or(100)).offset(offset);
739
740        self.raw_query_read(|conn| query.load::<StoredGroup>(conn))
741    }
742
743    /// Updates group membership state
744    fn update_group_membership<GroupId: AsRef<[u8]>>(
745        &self,
746        group_id: GroupId,
747        state: GroupMembershipState,
748    ) -> Result<(), crate::ConnectionError> {
749        self.raw_query_write(|conn| {
750            diesel::update(dsl::groups.find(group_id.as_ref()))
751                .set(dsl::membership_state.eq(state))
752                .execute(conn)
753        })?;
754
755        Ok(())
756    }
757
758    fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
759        let query = dsl::groups
760            .order(dsl::created_at_ns.desc())
761            .filter(dsl::conversation_type.eq(ConversationType::Sync));
762
763        self.raw_query_read(|conn| query.load(conn))
764    }
765
766    fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
767        let query = dsl::groups
768            .filter(dsl::conversation_type.eq(ConversationType::Sync))
769            .filter(dsl::id.eq(id));
770
771        self.raw_query_read(|conn| query.first(conn).optional())
772    }
773
774    fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError> {
775        let query = dsl::groups
776            .order(dsl::created_at_ns.desc())
777            .filter(dsl::conversation_type.eq(ConversationType::Sync));
778
779        self.raw_query_read(|conn| query.first(conn).optional())
780    }
781
782    /// Return a single group that matches the given ID
783    fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
784        let query = dsl::groups
785            .order(dsl::created_at_ns.asc())
786            .limit(1)
787            .filter(dsl::id.eq(id));
788        let groups = self.raw_query_read(|conn| query.load(conn))?;
789
790        Ok(groups.into_iter().next())
791    }
792
793    /// Return a single group that matches the given welcome ID
794    fn find_group_by_sequence_id(
795        &self,
796        cursor: Cursor,
797    ) -> Result<Option<StoredGroup>, crate::ConnectionError> {
798        let query = dsl::groups
799            .order(dsl::created_at_ns.asc())
800            .filter(dsl::sequence_id.eq(cursor.sequence_id as i64))
801            .filter(dsl::originator_id.eq(cursor.originator_id as i64));
802
803        let groups = self.raw_query_read(|conn| query.load(conn))?;
804
805        if groups.len() > 1 {
806            tracing::warn!(
807                cursor.sequence_id,
808                "More than one group found for welcome_id {}",
809                cursor.sequence_id
810            );
811        }
812        Ok(groups.into_iter().next())
813    }
814
815    fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
816        let last_ts: Option<i64> = self.raw_query_read(|conn| {
817            dsl::groups
818                .find(&group_id)
819                .select(dsl::rotated_at_ns)
820                .first(conn)
821                .optional()
822        })?;
823
824        last_ts.ok_or(StorageError::NotFound(NotFound::InstallationTimeForGroup(
825            group_id,
826        )))
827    }
828
829    /// Updates the 'last time checked' we checked for new installations.
830    fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
831        self.raw_query_write(|conn| {
832            let now = xmtp_common::time::now_ns();
833            diesel::update(dsl::groups.find(&group_id))
834                .set(dsl::rotated_at_ns.eq(now))
835                .execute(conn)
836        })?;
837
838        Ok(())
839    }
840
841    fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
842        let last_ts = self.raw_query_read(|conn| {
843            dsl::groups
844                .find(&group_id)
845                .select(dsl::installations_last_checked)
846                .first(conn)
847                .optional()
848        })?;
849
850        last_ts.ok_or(NotFound::InstallationTimeForGroup(group_id).into())
851    }
852
853    /// Updates the 'last time checked' we checked for new installations.
854    fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
855        self.raw_query_write(|conn| {
856            let now = xmtp_common::time::now_ns();
857            diesel::update(dsl::groups.find(&group_id))
858                .set(dsl::installations_last_checked.eq(now))
859                .execute(conn)
860        })?;
861
862        Ok(())
863    }
864
865    fn update_message_disappearing_from_ns(
866        &self,
867        group_id: Vec<u8>,
868        from_ns: Option<i64>,
869    ) -> Result<(), StorageError> {
870        self.raw_query_write(|conn| {
871            diesel::update(dsl::groups.find(&group_id))
872                .set(dsl::message_disappear_from_ns.eq(from_ns))
873                .execute(conn)
874        })?;
875
876        Ok(())
877    }
878
879    fn update_message_disappearing_in_ns(
880        &self,
881        group_id: Vec<u8>,
882        in_ns: Option<i64>,
883    ) -> Result<(), StorageError> {
884        self.raw_query_write(|conn| {
885            diesel::update(dsl::groups.find(&group_id))
886                .set(dsl::message_disappear_in_ns.eq(in_ns))
887                .execute(conn)
888        })?;
889
890        Ok(())
891    }
892
    /// Insert `group`, or reconcile it with an already-stored row that conflicts with it.
    ///
    /// Behavior, in order:
    /// 1. Attempt an insert that does nothing on conflict. If a row was inserted, it is
    ///    re-read from the database and returned.
    /// 2. Otherwise the existing row for `group.id` is loaded. If its membership state is
    ///    [`GroupMembershipState::Restored`], the stored row is overwritten with `group`.
    /// 3. If the existing row's `sequence_id` equals `group.sequence_id`, a
    ///    [`StorageError::Duplicate`] carrying the existing row's cursor is returned.
    /// 4. Otherwise, when `group.sequence_id` is set and is newer than (or replaces a
    ///    missing) stored `sequence_id`, only the stored `sequence_id` is updated.
    ///    The previously loaded row (with that `sequence_id` patched in) is returned.
    ///
    /// # Errors
    /// Returns [`StorageError::Duplicate`] as described in step 3, or any error produced
    /// by the underlying reads and writes.
    fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
        // `None` here means the insert produced no row, i.e. nothing was inserted
        let maybe_inserted_group: Option<StoredGroup> = self.raw_query_write(|conn| {
            diesel::insert_into(dsl::groups)
                .values(&group)
                .on_conflict_do_nothing()
                .get_result(conn)
                .optional()
        })?;

        if maybe_inserted_group.is_none() {
            let mut existing_group: StoredGroup =
                self.raw_query_read(|conn| dsl::groups.find(&group.id).first(conn))?;
            // A restored group should be overwritten
            // NOTE(review): `existing_group` was loaded before this overwrite, so the value
            // used and returned below still holds the pre-overwrite columns — confirm intended.
            if matches!(
                existing_group.membership_state,
                GroupMembershipState::Restored
            ) {
                self.raw_query_write(|c| {
                    diesel::update(dsl::groups.find(&group.id))
                        .set(&group)
                        .execute(c)
                })?;
            }

            if existing_group.sequence_id == group.sequence_id {
                tracing::info!("Group welcome id already exists");
                // Error so OpenMLS db transaction are rolled back on duplicate welcomes
                Err(StorageError::Duplicate(DuplicateItem::WelcomeId(
                    existing_group.cursor(),
                )))
            } else {
                tracing::info!("Group already exists");
                // If the welcome id is greater than the existing group welcome, update the welcome id
                // on the existing group
                if group.sequence_id.is_some()
                    && (existing_group.sequence_id.is_none()
                        || group.sequence_id > existing_group.sequence_id)
                {
                    self.raw_query_write(|c| {
                        diesel::update(dsl::groups.find(&group.id))
                            .set(dsl::sequence_id.eq(group.sequence_id))
                            .execute(c)
                    })?;
                    // Keep the in-memory copy in sync with the column just written
                    existing_group.sequence_id = group.sequence_id;
                }
                Ok(existing_group)
            }
        } else {
            // Freshly inserted: return the row as it is now stored
            Ok(self.raw_query_read(|c| dsl::groups.find(group.id).first(c))?)
        }
    }
944
945    /// Get all the welcome ids turned into groups
946    fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError> {
947        self.raw_query_read(|conn| {
948            Ok(dsl::groups
949                .filter(dsl::sequence_id.is_not_null())
950                .select((dsl::sequence_id, dsl::originator_id))
951                .load::<(Option<i64>, Option<i64>)>(conn)?
952                .into_iter()
953                .map(|(seq, orig)| {
954                    Cursor::new(
955                        seq.expect("Filtered for not null") as u64,
956                        orig.expect("if seq is not null, originator must not be null") as u32,
957                    )
958                })
959                .collect())
960        })
961    }
962
963    fn mark_group_as_maybe_forked(
964        &self,
965        group_id: &[u8],
966        fork_details: String,
967    ) -> Result<(), StorageError> {
968        self.raw_query_write(|conn| {
969            diesel::update(dsl::groups.find(&group_id))
970                .set((
971                    dsl::maybe_forked.eq(true),
972                    dsl::fork_details.eq(fork_details),
973                ))
974                .execute(conn)
975        })?;
976
977        Ok(())
978    }
979
980    fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError> {
981        self.raw_query_write(|conn| {
982            diesel::update(dsl::groups.find(&group_id))
983                .set((dsl::maybe_forked.eq(false), dsl::fork_details.eq("")))
984                .execute(conn)
985        })?;
986        Ok(())
987    }
988
989    fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError> {
990        self.raw_query_read(|conn| {
991            let dm_id: Option<String> = dsl::groups
992                .filter(dsl::id.eq(group_id))
993                .select(dsl::dm_id)
994                .first::<Option<String>>(conn)
995                .optional()?
996                .flatten();
997
998            if let Some(dm_id) = dm_id {
999                let count: i64 = dsl::groups
1000                    .filter(dsl::conversation_type.eq(ConversationType::Dm))
1001                    .filter(dsl::dm_id.eq(dm_id))
1002                    .count()
1003                    .get_result(conn)?;
1004
1005                Ok(count > 1)
1006            } else {
1007                Ok(false)
1008            }
1009        })
1010    }
1011
1012    /// Get conversation IDs for all conversations that require a remote commit log publish
1013    /// (DMs and groups where user is super admin, excluding sync groups and rejected groups)
1014    fn get_conversation_ids_for_remote_log_publish(
1015        &self,
1016    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
1017        use crate::schema::consent_records::dsl as consent_dsl;
1018
1019        let query = dsl::groups
1020            .filter(
1021                dsl::conversation_type
1022                    .eq(ConversationType::Dm)
1023                    .or(dsl::conversation_type
1024                        .eq(ConversationType::Group)
1025                        .and(dsl::should_publish_commit_log.eq(true))),
1026            )
1027            .inner_join(consent_dsl::consent_records.on(
1028                sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
1029            ))
1030            .filter(consent_dsl::state.eq(ConsentState::Allowed))
1031            .select((dsl::id, dsl::commit_log_public_key))
1032            .order(dsl::created_at_ns.asc());
1033
1034        self.raw_query_read(|conn| query.load::<StoredGroupCommitLogPublicKey>(conn))
1035    }
1036
1037    // All dms and groups that are not sync groups and have consent state Allowed
1038    fn get_conversation_ids_for_remote_log_download(
1039        &self,
1040    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
1041        use crate::schema::consent_records::dsl as consent_dsl;
1042
1043        let query = dsl::groups
1044            .filter(dsl::conversation_type.ne_all(ConversationType::virtual_types()))
1045            .inner_join(consent_dsl::consent_records.on(
1046                sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
1047            ))
1048            .filter(consent_dsl::state.eq(ConsentState::Allowed))
1049            .select((dsl::id, dsl::commit_log_public_key));
1050
1051        self.raw_query_read(|conn| query.load::<StoredGroupCommitLogPublicKey>(conn))
1052    }
1053
1054    // Get conversation IDs for fork checking (excludes already forked conversations and sync groups)
1055    fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
1056        let query = dsl::groups
1057            .filter(
1058                dsl::conversation_type
1059                    .ne_all(ConversationType::virtual_types())
1060                    .and(
1061                        dsl::is_commit_log_forked
1062                            .is_null()
1063                            .or(dsl::is_commit_log_forked.ne(Some(true))),
1064                    ),
1065            )
1066            .select(dsl::id);
1067
1068        self.raw_query_read(|conn| query.load::<Vec<u8>>(conn))
1069    }
1070
1071    fn get_conversation_ids_for_requesting_readds(
1072        &self,
1073    ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError> {
1074        use super::schema::{groups::dsl as groups_dsl, remote_commit_log::dsl as rcl_dsl};
1075        use diesel::dsl::max;
1076
1077        self.raw_query_read(|conn| {
1078            groups_dsl::groups
1079                .left_join(rcl_dsl::remote_commit_log.on(groups_dsl::id.eq(rcl_dsl::group_id)))
1080                .filter(
1081                    groups_dsl::conversation_type
1082                        .ne_all(ConversationType::virtual_types())
1083                        .and(groups_dsl::is_commit_log_forked.eq(true)),
1084                )
1085                .group_by(groups_dsl::id)
1086                .select((groups_dsl::id, max(rcl_dsl::commit_sequence_id).nullable()))
1087                .load::<StoredGroupForReaddRequest>(conn)
1088        })
1089    }
1090
1091    fn get_conversation_ids_for_responding_readds(
1092        &self,
1093    ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError> {
1094        use super::schema::{groups::dsl as groups_dsl, readd_status::dsl as readd_dsl};
1095        use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
1096
1097        self.raw_query_read(|conn| {
1098            readd_dsl::readd_status
1099                .inner_join(groups_dsl::groups.on(readd_dsl::group_id.eq(groups_dsl::id)))
1100                .filter(readd_dsl::requested_at_sequence_id.is_not_null())
1101                .filter(
1102                    readd_dsl::requested_at_sequence_id
1103                        .ge(readd_dsl::responded_at_sequence_id)
1104                        .or(readd_dsl::responded_at_sequence_id.is_null()),
1105                )
1106                .select((
1107                    groups_dsl::id,
1108                    groups_dsl::dm_id,
1109                    groups_dsl::conversation_type,
1110                    groups_dsl::created_at_ns,
1111                ))
1112                .distinct()
1113                .load::<StoredGroupForRespondingReadds>(conn)
1114        })
1115    }
1116
1117    fn get_conversation_type(
1118        &self,
1119        group_id: &[u8],
1120    ) -> Result<ConversationType, crate::ConnectionError> {
1121        let query = dsl::groups
1122            .filter(dsl::id.eq(group_id))
1123            .select(dsl::conversation_type);
1124        let conversation_type = self.raw_query_read(|conn| query.first(conn))?;
1125        Ok(conversation_type)
1126    }
1127
1128    fn set_group_commit_log_public_key(
1129        &self,
1130        group_id: &[u8],
1131        public_key: &[u8],
1132    ) -> Result<(), StorageError> {
1133        use crate::schema::groups::dsl;
1134        let num_updated = self.raw_query_write(|conn| {
1135            diesel::update(dsl::groups)
1136                .filter(
1137                    dsl::id
1138                        .eq(group_id)
1139                        .and(dsl::commit_log_public_key.is_null()),
1140                )
1141                .set(dsl::commit_log_public_key.eq(public_key))
1142                .execute(conn)
1143        })?;
1144        if num_updated == 0 {
1145            return Err(StorageError::Duplicate(DuplicateItem::CommitLogPublicKey(
1146                group_id.to_vec(),
1147            )));
1148        }
1149        Ok(())
1150    }
1151
1152    fn set_group_commit_log_forked_status(
1153        &self,
1154        group_id: &[u8],
1155        is_forked: Option<bool>,
1156    ) -> Result<(), StorageError> {
1157        use crate::schema::groups::dsl;
1158        self.raw_query_write(|conn| {
1159            diesel::update(dsl::groups.find(group_id))
1160                .set(dsl::is_commit_log_forked.eq(is_forked))
1161                .execute(conn)
1162        })?;
1163        Ok(())
1164    }
1165
1166    fn get_group_commit_log_forked_status(
1167        &self,
1168        group_id: &[u8],
1169    ) -> Result<Option<bool>, StorageError> {
1170        use crate::schema::groups::dsl;
1171        self.raw_query_read(|conn| {
1172            dsl::groups
1173                .find(group_id)
1174                .select(dsl::is_commit_log_forked)
1175                .first::<Option<bool>>(conn)
1176        })
1177        .map_err(StorageError::from)
1178    }
1179
1180    fn set_group_has_pending_leave_request_status(
1181        &self,
1182        group_id: &[u8],
1183        has_pending_leave_request: Option<bool>,
1184    ) -> Result<(), StorageError> {
1185        use crate::schema::groups::dsl;
1186        self.raw_query_write(|conn| {
1187            diesel::update(dsl::groups.find(group_id))
1188                .set(dsl::has_pending_leave_request.eq(has_pending_leave_request))
1189                .execute(conn)
1190        })?;
1191        Ok(())
1192    }
1193
1194    fn get_groups_have_pending_leave_request(
1195        &self,
1196    ) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
1197        let query = dsl::groups
1198            .filter(
1199                dsl::conversation_type
1200                    .ne(ConversationType::Sync)
1201                    .and(dsl::has_pending_leave_request.eq(Some(true))),
1202            )
1203            .select(dsl::id);
1204
1205        self.raw_query_read(|conn| query.load::<Vec<u8>>(conn))
1206    }
1207}
1208
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Integer)]
/// Status of membership in a group, once a user sends a request to join.
///
/// Persisted as a SQL integer: the explicit discriminant of each variant is the value
/// written to and read back from the database, so existing values must not be renumbered.
pub enum GroupMembershipState {
    /// User is allowed to interact with this Group
    Allowed = 1,
    /// User has been Rejected from this Group
    Rejected = 2,
    /// User is Pending acceptance to the Group
    Pending = 3,
    /// Group has been restored from an archive, but is not active yet.
    Restored = 4,
    /// User is pending removal from the Group
    PendingRemove = 5,
}
1225
1226impl ToSql<Integer, Sqlite> for GroupMembershipState
1227where
1228    i32: ToSql<Integer, Sqlite>,
1229{
1230    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
1231        out.set_value(*self as i32);
1232        Ok(IsNull::No)
1233    }
1234}
1235
1236impl FromSql<Integer, Sqlite> for GroupMembershipState
1237where
1238    i32: FromSql<Integer, Sqlite>,
1239{
1240    fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
1241        match i32::from_sql(bytes)? {
1242            1 => Ok(GroupMembershipState::Allowed),
1243            2 => Ok(GroupMembershipState::Rejected),
1244            3 => Ok(GroupMembershipState::Pending),
1245            4 => Ok(GroupMembershipState::Restored),
1246            5 => Ok(GroupMembershipState::PendingRemove),
1247            x => Err(format!("Unrecognized variant {}", x).into()),
1248        }
1249    }
1250}
1251
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Integer)]
/// The kind of conversation a stored group row represents.
///
/// Persisted as a SQL integer: the explicit discriminant of each variant is the value
/// written to and read back from the database, so existing values must not be renumbered.
pub enum ConversationType {
    /// Displayed as `group`
    Group = 1,
    /// Displayed as `dm`
    Dm = 2,
    /// Displayed as `sync`; treated as a virtual type
    Sync = 3,
    /// Displayed as `oneshot`; treated as a virtual type
    Oneshot = 4,
}
1261
1262impl ConversationType {
1263    pub fn virtual_types() -> Vec<ConversationType> {
1264        vec![ConversationType::Sync, ConversationType::Oneshot]
1265    }
1266
1267    pub fn is_virtual(&self) -> bool {
1268        // Use match to force exhaustive pattern matching
1269        match self {
1270            ConversationType::Group => false,
1271            ConversationType::Dm => false,
1272            ConversationType::Sync => true,
1273            ConversationType::Oneshot => true,
1274        }
1275    }
1276}
1277
1278impl ToSql<Integer, Sqlite> for ConversationType
1279where
1280    i32: ToSql<Integer, Sqlite>,
1281{
1282    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
1283        out.set_value(*self as i32);
1284        Ok(IsNull::No)
1285    }
1286}
1287
1288impl FromSql<Integer, Sqlite> for ConversationType
1289where
1290    i32: FromSql<Integer, Sqlite>,
1291{
1292    fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
1293        match i32::from_sql(bytes)? {
1294            1 => Ok(ConversationType::Group),
1295            2 => Ok(ConversationType::Dm),
1296            3 => Ok(ConversationType::Sync),
1297            4 => Ok(ConversationType::Oneshot),
1298            x => Err(format!("Unrecognized variant {}", x).into()),
1299        }
1300    }
1301}
1302
1303impl std::fmt::Display for ConversationType {
1304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1305        use ConversationType::*;
1306        match self {
1307            Group => write!(f, "group"),
1308            Dm => write!(f, "dm"),
1309            Sync => write!(f, "sync"),
1310            Oneshot => write!(f, "oneshot"),
1311        }
1312    }
1313}
1314
/// Helpers for DM identifiers of the form `dm:<inbox_id>:<inbox_id>`.
pub trait DmIdExt {
    /// Given one participant's inbox id, return the other participant's inbox id.
    fn other_inbox_id(&self, id: &str) -> String;
}

impl DmIdExt for String {
    /// Returns the inbox id contained in this DM id that is not `id`.
    ///
    /// The two inbox ids are located by the `:` separator, so they may have different
    /// lengths. Malformed input never panics:
    /// - if `id` matches neither half, the first half is returned;
    /// - if there is no `:` separator, everything after the optional `dm:` prefix is
    ///   returned.
    fn other_inbox_id(&self, id: &str) -> String {
        // drop the "dm:"
        let dm_id = self.strip_prefix("dm:").unwrap_or(self.as_str());

        // My id is the first half: the other id is whatever follows "<id>:"
        if let Some(other) = dm_id
            .strip_prefix(id)
            .and_then(|rest| rest.strip_prefix(':'))
        {
            return other.to_string();
        }

        // My id is the second half: the other id is whatever precedes ":<id>"
        if let Some(other) = dm_id
            .strip_suffix(id)
            .and_then(|rest| rest.strip_suffix(':'))
        {
            return other.to_string();
        }

        // `id` is not part of this DM id; fall back to the first half
        dm_id
            .split_once(':')
            .map_or(dm_id, |(first, _second)| first)
            .to_string()
    }
}
1335
1336#[cfg(test)]
1337pub(crate) mod tests {
1338    #[cfg(target_arch = "wasm32")]
1339    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);
1340
1341    pub use super::dms::tests::*;
1342    use super::*;
1343
1344    use crate::{
1345        Fetch, Store,
1346        consent_record::{ConsentType, StoredConsentRecord},
1347        readd_status::ReaddStatus,
1348        schema::groups::dsl::groups,
1349        test_utils::{with_connection, with_connection_async},
1350    };
1351    use xmtp_common::{assert_ok, rand_vec, time::now_ns};
1352    use xmtp_configuration::Originators;
1353
1354    /// Generate a test group
1355    pub fn generate_group(state: Option<GroupMembershipState>) -> StoredGroup {
1356        // Default behavior: Use `now_ns()` as the creation time
1357        generate_group_with_created_at(state, now_ns())
1358    }
1359
1360    pub fn generate_group_with_created_at(
1361        state: Option<GroupMembershipState>,
1362        created_at_ns: i64,
1363    ) -> StoredGroup {
1364        let id = rand_vec::<24>();
1365        let membership_state = state.unwrap_or(GroupMembershipState::Allowed);
1366        StoredGroup::builder()
1367            .id(id)
1368            .created_at_ns(created_at_ns)
1369            .membership_state(membership_state)
1370            .added_by_inbox_id("placeholder_address")
1371            .build()
1372            .unwrap()
1373    }
1374
1375    /// Generate a test group with welcome
1376    pub fn generate_group_with_welcome(
1377        state: Option<GroupMembershipState>,
1378        welcome_id: Option<i64>,
1379    ) -> StoredGroup {
1380        let id = rand_vec::<24>();
1381        let created_at_ns = now_ns();
1382        let membership_state = state.unwrap_or(GroupMembershipState::Allowed);
1383        StoredGroup::builder()
1384            .id(id)
1385            .created_at_ns(created_at_ns)
1386            .membership_state(membership_state)
1387            .added_by_inbox_id("placeholder_address")
1388            .sequence_id(welcome_id.unwrap_or(xmtp_common::rand_i64()))
1389            .originator_id(Originators::WELCOME_MESSAGES as i64)
1390            .conversation_type(ConversationType::Group)
1391            .build()
1392            .unwrap()
1393    }
1394
1395    /// Generate a test consent
1396    pub fn generate_consent_record(
1397        entity_type: ConsentType,
1398        state: ConsentState,
1399        entity: String,
1400    ) -> StoredConsentRecord {
1401        StoredConsentRecord {
1402            entity_type,
1403            state,
1404            entity,
1405            consented_at_ns: now_ns(),
1406        }
1407    }
1408
1409    #[xmtp_common::test]
1410    fn test_it_stores_group() {
1411        with_connection(|conn| {
1412            let test_group = generate_group(None);
1413
1414            test_group.store(conn).unwrap();
1415            assert_eq!(
1416                conn.raw_query_read(|raw_conn| groups.first::<StoredGroup>(raw_conn))
1417                    .unwrap(),
1418                test_group
1419            );
1420        })
1421    }
1422
1423    #[xmtp_common::test]
1424    fn test_it_fetches_group() {
1425        with_connection(|conn| {
1426            let test_group = generate_group(None);
1427
1428            conn.raw_query_write(|raw_conn| {
1429                diesel::insert_into(groups)
1430                    .values(test_group.clone())
1431                    .execute(raw_conn)
1432            })
1433            .unwrap();
1434
1435            let fetched_group: Option<StoredGroup> = conn.fetch(&test_group.id).unwrap();
1436            assert_eq!(fetched_group, Some(test_group));
1437        })
1438    }
1439
1440    #[xmtp_common::test]
1441    fn test_it_updates_group_membership_state() {
1442        with_connection(|conn| {
1443            let test_group = generate_group(Some(GroupMembershipState::Pending));
1444
1445            test_group.store(conn).unwrap();
1446            conn.update_group_membership(&test_group.id, GroupMembershipState::Rejected)
1447                .unwrap();
1448
1449            let updated_group: StoredGroup = conn.fetch(&test_group.id).ok().flatten().unwrap();
1450            assert_eq!(
1451                updated_group,
1452                StoredGroup {
1453                    membership_state: GroupMembershipState::Rejected,
1454                    ..test_group
1455                }
1456            );
1457        })
1458    }
1459
    /// Exercises `find_groups` filtering (conversation type, membership state, limit,
    /// `created_after_ns`) and DM lookup against two groups and one DM stored in order.
    ///
    /// Several assertions index into the results by position, so they depend on the
    /// groups having distinct creation times in insertion order.
    #[xmtp_common::test]
    async fn test_find_groups() {
        let wait_in_wasm = async || {
            // web has current time resolution only to millisecond,
            // which is too slow for this test to pass and the timestamps to be different
            // force generated groups to be created at different times

            if cfg!(target_arch = "wasm32") {
                xmtp_common::time::sleep(std::time::Duration::from_millis(1)).await;
            }
        };
        with_connection_async(|conn| async move {
            // Fixture: pending group, allowed group, allowed DM — stored in that order
            let test_group_1 = generate_group(Some(GroupMembershipState::Pending));
            test_group_1.store(&conn).unwrap();
            wait_in_wasm().await;
            let test_group_2 = generate_group(Some(GroupMembershipState::Allowed));
            test_group_2.store(&conn).unwrap();
            wait_in_wasm().await;
            let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed));
            test_group_3.store(&conn).unwrap();

            let other_inbox_id = test_group_3
                .dm_id
                .unwrap()
                .other_inbox_id("placeholder_inbox_id_1");

            // Only the two non-DM groups match the Group conversation type
            let all_results = conn
                .find_groups(GroupQueryArgs {
                    conversation_type: Some(ConversationType::Group),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(all_results.len(), 2);

            // Filtering by membership state returns only the pending group
            let pending_results = conn
                .find_groups(GroupQueryArgs {
                    allowed_states: Some(vec![GroupMembershipState::Pending]),
                    conversation_type: Some(ConversationType::Group),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(pending_results[0].id, test_group_1.id);
            assert_eq!(pending_results.len(), 1);

            // Limit: only the first group is returned
            let results_with_limit = conn
                .find_groups(GroupQueryArgs {
                    conversation_type: Some(ConversationType::Group),
                    limit: Some(1),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(results_with_limit.len(), 1);
            assert_eq!(results_with_limit[0].id, test_group_1.id);

            // created_after_ns set to the first group's creation time skips that group
            let results_with_created_at_ns_after = conn
                .find_groups(GroupQueryArgs {
                    conversation_type: Some(ConversationType::Group),
                    limit: Some(1),
                    created_after_ns: Some(test_group_1.created_at_ns),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(results_with_created_at_ns_after.len(), 1);
            assert_eq!(results_with_created_at_ns_after[0].id, test_group_2.id);

            // No sync group was stored, so no primary sync group should be found
            let synced_groups = conn.primary_sync_group().unwrap();
            assert!(synced_groups.is_none());

            // test that dm groups are included
            let dm_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
            assert_eq!(dm_results.len(), 3);
            assert_eq!(dm_results[2].id, test_group_3.id);

            // test find_active_dm_group
            let dm_result = conn
                .find_active_dm_group(format!("dm:placeholder_inbox_id_1:{}", &other_inbox_id))
                .unwrap();
            assert!(dm_result.is_some());

            // test only dms are returned
            let dm_results = conn
                .find_groups(GroupQueryArgs {
                    conversation_type: Some(ConversationType::Dm),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(dm_results.len(), 1);
            assert_eq!(dm_results[0].id, test_group_3.id);
        })
        .await
    }
1553
1554    #[xmtp_common::test]
1555    async fn test_installations_last_checked_is_updated() {
1556        with_connection_async(|conn| async move {
1557            let test_group = generate_group(None);
1558            test_group.store(&conn).unwrap();
1559
1560            // Check that the installations update has not been performed, yet
1561            assert_eq!(test_group.installations_last_checked, 0);
1562
1563            if cfg!(target_arch = "wasm32") {
1564                // web has current time resolution only to millisecond,
1565                // which is too slow for this test to pass and the timestamps to be different
1566                xmtp_common::time::sleep(std::time::Duration::from_millis(1)).await;
1567            }
1568            // Check that some event occurred which triggers an installation list update.
1569            // Here we invoke that event directly
1570            let result = conn.update_installations_time_checked(test_group.id.clone());
1571            assert_ok!(result);
1572
1573            // Check that the latest installation list timestamp has been updated
1574            let fetched_group: StoredGroup = conn.fetch(&test_group.id).ok().flatten().unwrap();
1575            assert_ne!(fetched_group.installations_last_checked, 0);
1576            assert!(fetched_group.created_at_ns < fetched_group.installations_last_checked);
1577        })
1578        .await
1579    }
1580
1581    #[xmtp_common::test]
1582    fn test_new_group_has_correct_purpose() {
1583        with_connection(|conn| {
1584            let test_group = generate_group(None);
1585
1586            conn.raw_query_write(|raw_conn| {
1587                diesel::insert_into(groups)
1588                    .values(test_group.clone())
1589                    .execute(raw_conn)
1590            })
1591            .unwrap();
1592
1593            let fetched_group: Option<StoredGroup> = conn.fetch(&test_group.id).unwrap();
1594            assert_eq!(fetched_group, Some(test_group));
1595            let conversation_type = fetched_group.unwrap().conversation_type;
1596            assert_eq!(conversation_type, ConversationType::Group);
1597        })
1598    }
1599
1600    #[xmtp_common::test]
1601    fn test_find_groups_by_consent_state() {
1602        with_connection(|conn| {
1603            let test_group_1 = generate_group(Some(GroupMembershipState::Allowed));
1604            test_group_1.store(conn).unwrap();
1605            let test_group_2 = generate_group(Some(GroupMembershipState::Allowed));
1606            test_group_2.store(conn).unwrap();
1607            let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed));
1608            test_group_3.store(conn).unwrap();
1609            let test_group_4 = generate_dm(Some(GroupMembershipState::Allowed));
1610            test_group_4.store(conn).unwrap();
1611
1612            let test_group_1_consent = generate_consent_record(
1613                ConsentType::ConversationId,
1614                ConsentState::Allowed,
1615                hex::encode(test_group_1.id.clone()),
1616            );
1617            test_group_1_consent.store(conn).unwrap();
1618            let test_group_2_consent = generate_consent_record(
1619                ConsentType::ConversationId,
1620                ConsentState::Denied,
1621                hex::encode(test_group_2.id.clone()),
1622            );
1623            test_group_2_consent.store(conn).unwrap();
1624            let test_group_3_consent = generate_consent_record(
1625                ConsentType::ConversationId,
1626                ConsentState::Allowed,
1627                hex::encode(test_group_3.id.clone()),
1628            );
1629            test_group_3_consent.store(conn).unwrap();
1630
1631            let all_results = conn
1632                .find_groups(GroupQueryArgs {
1633                    consent_states: Some(vec![
1634                        ConsentState::Allowed,
1635                        ConsentState::Unknown,
1636                        ConsentState::Denied,
1637                    ]),
1638                    ..Default::default()
1639                })
1640                .unwrap();
1641            assert_eq!(all_results.len(), 4);
1642
1643            let default_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
1644            assert_eq!(default_results.len(), 3);
1645
1646            let allowed_results = conn
1647                .find_groups(GroupQueryArgs {
1648                    consent_states: Some(vec![ConsentState::Allowed]),
1649                    ..Default::default()
1650                })
1651                .unwrap();
1652            assert_eq!(allowed_results.len(), 2);
1653
1654            let allowed_unknown_results = conn
1655                .find_groups(GroupQueryArgs {
1656                    consent_states: Some(vec![ConsentState::Allowed, ConsentState::Unknown]),
1657                    ..Default::default()
1658                })
1659                .unwrap();
1660            assert_eq!(allowed_unknown_results.len(), 3);
1661
1662            let denied_results = conn
1663                .find_groups(GroupQueryArgs {
1664                    consent_states: Some(vec![ConsentState::Denied]),
1665                    ..Default::default()
1666                })
1667                .unwrap();
1668            assert_eq!(denied_results.len(), 1);
1669            assert_eq!(denied_results[0].id, test_group_2.id);
1670
1671            let unknown_results = conn
1672                .find_groups(GroupQueryArgs {
1673                    consent_states: Some(vec![ConsentState::Unknown]),
1674                    ..Default::default()
1675                })
1676                .unwrap();
1677            assert_eq!(unknown_results.len(), 1);
1678            assert_eq!(unknown_results[0].id, test_group_4.id);
1679
1680            let empty_array_results = conn
1681                .find_groups(GroupQueryArgs {
1682                    consent_states: Some(vec![]),
1683                    ..Default::default()
1684                })
1685                .unwrap();
1686            assert_eq!(empty_array_results.len(), 3);
1687        })
1688    }
1689
1690    #[xmtp_common::test]
1691    fn test_get_sequence_ids() {
1692        with_connection(|conn| {
1693            let mls_groups = [
1694                generate_group_with_welcome(None, Some(30)),
1695                generate_group(None),
1696                generate_group(None),
1697                generate_group_with_welcome(None, Some(10)),
1698            ];
1699            for g in mls_groups.iter() {
1700                g.store(conn).unwrap();
1701            }
1702            assert_eq!(
1703                vec![30, 10],
1704                conn.group_cursors()
1705                    .unwrap()
1706                    .into_iter()
1707                    .map(|c| c.sequence_id)
1708                    .collect::<Vec<u64>>()
1709            );
1710        })
1711    }
1712
1713    #[xmtp_common::test]
1714    fn test_find_group_default_excludes_denied() {
1715        with_connection(|conn| {
1716            // Create three groups: one allowed, one denied, one unknown (no consent)
1717            let allowed_group = generate_group(Some(GroupMembershipState::Allowed));
1718            allowed_group.store(conn).unwrap();
1719
1720            let denied_group = generate_group(Some(GroupMembershipState::Allowed));
1721            denied_group.store(conn).unwrap();
1722
1723            let unknown_group = generate_group(Some(GroupMembershipState::Allowed));
1724            unknown_group.store(conn).unwrap();
1725
1726            // Create consent records for allowed and denied; leave unknown_group without one
1727            let allowed_consent = generate_consent_record(
1728                ConsentType::ConversationId,
1729                ConsentState::Allowed,
1730                hex::encode(allowed_group.id.clone()),
1731            );
1732            allowed_consent.store(conn).unwrap();
1733
1734            let denied_consent = generate_consent_record(
1735                ConsentType::ConversationId,
1736                ConsentState::Denied,
1737                hex::encode(denied_group.id.clone()),
1738            );
1739            denied_consent.store(conn).unwrap();
1740
1741            // Query using default args (no consent_states specified)
1742            let default_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
1743
1744            // Expect to include only: allowed_group and unknown_group (2 total)
1745            assert_eq!(default_results.len(), 2);
1746            let returned_ids: Vec<_> = default_results.iter().map(|g| &g.id).collect();
1747            assert!(returned_ids.contains(&&allowed_group.id));
1748            assert!(returned_ids.contains(&&unknown_group.id));
1749            assert!(!returned_ids.contains(&&denied_group.id));
1750        })
1751    }
1752
1753    #[xmtp_common::test(unwrap_try = true)]
1754    fn test_get_conversation_ids_for_remote_log_publish() {
1755        with_connection(|conn| {
1756            let mut group1 = generate_group(None);
1757            let mut group2 = generate_group(None);
1758            let mut group3 = generate_group(None);
1759            let mut group4 = generate_group(None);
1760            group1.should_publish_commit_log = true;
1761            group1.commit_log_public_key = None;
1762            generate_consent_record(
1763                ConsentType::ConversationId,
1764                ConsentState::Allowed,
1765                hex::encode(group1.id.clone()),
1766            )
1767            .store(conn)?;
1768            group2.should_publish_commit_log = true;
1769            group2.commit_log_public_key = Some(rand_vec::<32>());
1770
1771            group3.should_publish_commit_log = true;
1772            group3.commit_log_public_key = Some(rand_vec::<32>());
1773            generate_consent_record(
1774                ConsentType::ConversationId,
1775                ConsentState::Allowed,
1776                hex::encode(group3.id.clone()),
1777            )
1778            .store(conn)?;
1779            group4.should_publish_commit_log = false;
1780            group1.store(conn)?;
1781            group2.store(conn)?;
1782            group3.store(conn)?;
1783            group4.store(conn)?;
1784
1785            let commit_log_keys = conn.get_conversation_ids_for_remote_log_publish().unwrap();
1786            assert_eq!(commit_log_keys.len(), 2);
1787            assert_eq!(commit_log_keys[0].id, group1.id);
1788            assert_eq!(commit_log_keys[1].id, group3.id);
1789            assert_eq!(commit_log_keys[0].commit_log_public_key, None);
1790            assert_eq!(
1791                commit_log_keys[1].commit_log_public_key,
1792                group3.commit_log_public_key
1793            );
1794        })
1795    }
1796
1797    #[xmtp_common::test]
1798    fn test_get_conversation_ids_for_remote_log_publish_with_consent() {
1799        with_connection(|conn| {
1800            // Create groups: one with Allowed consent, one with Denied consent, one with no consent
1801            let mut allowed_group = generate_group(None);
1802            allowed_group.should_publish_commit_log = true;
1803            allowed_group.store(conn).unwrap();
1804
1805            let mut denied_group = generate_group(None);
1806            denied_group.should_publish_commit_log = true;
1807            denied_group.store(conn).unwrap();
1808
1809            let mut no_consent_group = generate_group(None);
1810            no_consent_group.should_publish_commit_log = true;
1811            no_consent_group.store(conn).unwrap();
1812
1813            // Create consent records
1814            let allowed_consent = generate_consent_record(
1815                ConsentType::ConversationId,
1816                ConsentState::Allowed,
1817                hex::encode(allowed_group.id.clone()),
1818            );
1819            allowed_consent.store(conn).unwrap();
1820
1821            let denied_consent = generate_consent_record(
1822                ConsentType::ConversationId,
1823                ConsentState::Denied,
1824                hex::encode(denied_group.id.clone()),
1825            );
1826            denied_consent.store(conn).unwrap();
1827
1828            // Function should only return groups with Allowed consent state
1829            let commit_log_keys = conn.get_conversation_ids_for_remote_log_publish().unwrap();
1830            assert_eq!(commit_log_keys.len(), 1);
1831            assert_eq!(commit_log_keys[0].id, allowed_group.id);
1832        })
1833    }
1834
1835    #[xmtp_common::test]
1836    fn test_get_conversation_ids_for_remote_log_download_with_consent() {
1837        with_connection(|conn| {
1838            // Create groups: one with Allowed consent, one with Denied consent, one with no consent
1839            let allowed_group = generate_group(None);
1840            allowed_group.store(conn).unwrap();
1841
1842            let denied_group = generate_group(None);
1843            denied_group.store(conn).unwrap();
1844
1845            let no_consent_group = generate_group(None);
1846            no_consent_group.store(conn).unwrap();
1847
1848            // Create a sync group (should be excluded regardless of consent)
1849            let mut sync_group = generate_group(None);
1850            sync_group.conversation_type = ConversationType::Sync;
1851            sync_group.store(conn).unwrap();
1852            let sync_consent = generate_consent_record(
1853                ConsentType::ConversationId,
1854                ConsentState::Allowed,
1855                hex::encode(sync_group.id.clone()),
1856            );
1857            sync_consent.store(conn).unwrap();
1858
1859            // Create consent records
1860            let allowed_consent = generate_consent_record(
1861                ConsentType::ConversationId,
1862                ConsentState::Allowed,
1863                hex::encode(allowed_group.id.clone()),
1864            );
1865            allowed_consent.store(conn).unwrap();
1866
1867            let denied_consent = generate_consent_record(
1868                ConsentType::ConversationId,
1869                ConsentState::Denied,
1870                hex::encode(denied_group.id.clone()),
1871            );
1872            denied_consent.store(conn).unwrap();
1873
1874            // Function should only return groups with Allowed consent state, excluding sync groups
1875            let conversation_ids = conn.get_conversation_ids_for_remote_log_download().unwrap();
1876            assert_eq!(conversation_ids.len(), 1);
1877            assert_eq!(conversation_ids[0].id, allowed_group.id);
1878        })
1879    }
1880
1881    #[xmtp_common::test]
1882    fn test_get_conversation_ids_for_responding_readds() {
1883        with_connection(|conn| {
1884            // Create test groups
1885            let group_id_1 = vec![1, 2, 3];
1886            let group_id_2 = vec![4, 5, 6];
1887            let group_id_3 = vec![7, 8, 9];
1888
1889            let group1 = StoredGroup::builder()
1890                .id(group_id_1.clone())
1891                .created_at_ns(1000)
1892                .membership_state(GroupMembershipState::Allowed)
1893                .added_by_inbox_id("placeholder_address")
1894                .build()
1895                .unwrap();
1896            group1.store(conn).unwrap();
1897
1898            let group2 = StoredGroup::builder()
1899                .id(group_id_2.clone())
1900                .created_at_ns(2000)
1901                .membership_state(GroupMembershipState::Allowed)
1902                .added_by_inbox_id("placeholder_address")
1903                .build()
1904                .unwrap();
1905            group2.store(conn).unwrap();
1906
1907            let group3 = StoredGroup::builder()
1908                .id(group_id_3.clone())
1909                .created_at_ns(3000)
1910                .membership_state(GroupMembershipState::Allowed)
1911                .added_by_inbox_id("placeholder_address")
1912                .build()
1913                .unwrap();
1914            group3.store(conn).unwrap();
1915
1916            // Create readd status entries with various test cases
1917            let test_cases = vec![
1918                // Case 1: Pending readd (requested_at > responded_at)
1919                ReaddStatus {
1920                    group_id: group_id_1.clone(),
1921                    installation_id: vec![1],
1922                    requested_at_sequence_id: Some(10),
1923                    responded_at_sequence_id: Some(5),
1924                },
1925                // Case 2: Pending readd (responded_at is None)
1926                ReaddStatus {
1927                    group_id: group_id_1.clone(),
1928                    installation_id: vec![2],
1929                    requested_at_sequence_id: Some(8),
1930                    responded_at_sequence_id: None,
1931                },
1932                // Case 4: Not pending (requested_at < responded_at)
1933                ReaddStatus {
1934                    group_id: group_id_2.clone(),
1935                    installation_id: vec![4],
1936                    requested_at_sequence_id: Some(12),
1937                    responded_at_sequence_id: Some(15),
1938                },
1939                // Case 5: Not pending (requested_at is None)
1940                ReaddStatus {
1941                    group_id: group_id_2.clone(),
1942                    installation_id: vec![5],
1943                    requested_at_sequence_id: None,
1944                    responded_at_sequence_id: Some(20),
1945                },
1946                // Case 6: Pending readd (requested_at == responded_at, should be pending)
1947                ReaddStatus {
1948                    group_id: group_id_3.clone(),
1949                    installation_id: vec![6],
1950                    requested_at_sequence_id: Some(25),
1951                    responded_at_sequence_id: Some(25),
1952                },
1953            ];
1954
1955            // Store all test cases
1956            for status in test_cases {
1957                status.store(conn).unwrap();
1958            }
1959
1960            // Call the method under test
1961            let result = conn.get_conversation_ids_for_responding_readds().unwrap();
1962
1963            // Should return groups 1 and 3 (both have pending readd requests)
1964            // Group 2 has no pending readds
1965            assert_eq!(result.len(), 2);
1966
1967            // Results should be sorted by group_id (since we used distinct())
1968            let mut result_group_ids: Vec<Vec<u8>> =
1969                result.iter().map(|r| r.group_id.clone()).collect();
1970            result_group_ids.sort();
1971
1972            assert_eq!(result_group_ids[0], group_id_1);
1973            assert_eq!(result_group_ids[1], group_id_3);
1974
1975            // Check that the correct metadata is returned
1976            let group1_result = result.iter().find(|r| r.group_id == group_id_1).unwrap();
1977            assert_eq!(group1_result.dm_id, None);
1978            assert_eq!(group1_result.conversation_type, ConversationType::Group);
1979            assert_eq!(group1_result.created_at_ns, 1000);
1980
1981            let group3_result = result.iter().find(|r| r.group_id == group_id_3).unwrap();
1982            assert_eq!(group3_result.dm_id, None);
1983            assert_eq!(group3_result.conversation_type, ConversationType::Group);
1984            assert_eq!(group3_result.created_at_ns, 3000);
1985        })
1986    }
1987}