1use super::{
3 ConnectionExt, Sqlite,
4 consent_record::ConsentState,
5 db_connection::DbConnection,
6 schema::groups::{self, dsl},
7};
8use crate::NotFound;
9use crate::{DuplicateItem, StorageError, impl_fetch, impl_store, impl_store_or_ignore};
10use derive_builder::{Builder, UninitializedFieldError};
11use diesel::{
12 backend::Backend,
13 deserialize::{self, FromSql, FromSqlRow},
14 dsl::sql,
15 expression::AsExpression,
16 prelude::*,
17 serialize::{self, IsNull, Output, ToSql},
18 sql_types::Integer,
19};
20use serde::{Deserialize, Serialize};
21mod convert;
22mod dms;
23mod version;
24
25pub use dms::QueryDms;
26pub use version::QueryGroupVersion;
27use xmtp_proto::types::Cursor;
28
/// Raw byte identifier, an alias for `Vec<u8>`. [`StoredGroup::id`] uses the
/// same representation.
pub type ID = Vec<u8>;
30
/// A row of the `groups` table.
///
/// The derived [`StoredGroupBuilder`] accepts `Into` conversions on every
/// setter and runs `StoredGroupBuilder::validate` when `build()` is called;
/// build failures are reported as [`StorageError`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Insertable,
    Identifiable,
    Queryable,
    Builder,
    Selectable,
    QueryableByName,
)]
#[diesel(table_name = groups)]
#[diesel(primary_key(id))]
#[diesel(check_for_backend(Sqlite))]
#[builder(
    setter(into),
    build_fn(error = "StorageError", validate = "Self::validate")
)]
#[derive(AsChangeset)]
pub struct StoredGroup {
    /// Primary key of the row.
    pub id: Vec<u8>,
    /// Creation timestamp used for ordering and range filters in group
    /// queries. Presumably nanoseconds (per the `_ns` suffix) — confirm.
    pub created_at_ns: i64,
    /// Membership state of the local user in this group. Rows in the
    /// `Restored` state are overwritten by `insert_or_replace_group`.
    pub membership_state: GroupMembershipState,
    /// Set to `xmtp_common::time::now_ns()` by
    /// `update_installations_time_checked`. Builder default: `0`.
    #[builder(default = "0")]
    pub installations_last_checked: i64,
    /// Inbox id recorded as having added this group.
    /// NOTE(review): exact semantics are not visible in this file.
    pub added_by_inbox_id: String,
    /// Sequence half of the group's [`Cursor`]; must be set together with
    /// `originator_id` when using the builder. Builder default: `None`.
    #[builder(default = None)]
    pub sequence_id: Option<i64>,
    /// Set to `xmtp_common::time::now_ns()` by `update_rotated_at_ns`.
    /// Builder default: `0`.
    #[builder(default = "0")]
    pub rotated_at_ns: i64,
    /// Kind of conversation. When not set on the builder it is derived by
    /// `StoredGroupBuilder::default_conversation_type`.
    #[builder(default = "self.default_conversation_type()")]
    pub conversation_type: ConversationType,
    /// Identifier shared by rows that represent the same DM; used to detect
    /// and filter duplicate DMs. Builder default: `None`.
    #[builder(default = None)]
    pub dm_id: Option<String>,
    /// Timestamp of the last message; queries fall back to `created_at_ns`
    /// when this is `NULL`. Builder default: `None`.
    #[builder(default = None)]
    pub last_message_ns: Option<i64>,
    /// Written by `update_message_disappearing_from_ns`. Builder default: `None`.
    #[builder(default = None)]
    pub message_disappear_from_ns: Option<i64>,
    /// Written by `update_message_disappearing_in_ns`. Builder default: `None`.
    #[builder(default = None)]
    pub message_disappear_in_ns: Option<i64>,
    /// NOTE(review): presumably the version the group is paused for; not
    /// read or written in this file. Builder default: `None`.
    #[builder(default = None)]
    pub paused_for_version: Option<String>,
    /// Set by `mark_group_as_maybe_forked`, cleared by
    /// `clear_fork_flag_for_group`. Builder default: `false`.
    #[builder(default = false)]
    pub maybe_forked: bool,
    /// Free-form details stored alongside `maybe_forked`; reset to `""` when
    /// the flag is cleared. Builder default: empty string.
    #[builder(default = "String::new()")]
    pub fork_details: String,
    /// Originator half of the group's [`Cursor`]; must be set together with
    /// `sequence_id` when using the builder. Builder default: `None`.
    #[builder(default = None)]
    pub originator_id: Option<i64>,
    /// Filterable via `GroupQueryArgs::should_publish_commit_log` and used to
    /// select groups for remote commit-log publishing. Builder default: `false`.
    #[builder(default = false)]
    pub should_publish_commit_log: bool,
    /// Written by `set_group_commit_log_public_key`, which only succeeds
    /// while the column is still `NULL`. Builder default: `None`.
    #[builder(default = None)]
    pub commit_log_public_key: Option<Vec<u8>>,
    /// Tri-state fork status written by `set_group_commit_log_forked_status`.
    /// Builder default: `None`.
    #[builder(default = None)]
    pub is_commit_log_forked: Option<bool>,
    /// Written by `set_group_has_pending_leave_request_status`.
    /// Builder default: `None`.
    #[builder(default = None)]
    pub has_pending_leave_request: Option<bool>,
}
113
114impl StoredGroupBuilder {
115 fn validate(&self) -> Result<(), StorageError> {
116 if self.sequence_id.is_some() && self.originator_id.is_none() {
117 return Err(UninitializedFieldError::new("originator_id").into());
118 }
119 if self.originator_id.is_some() && self.sequence_id.is_none() {
120 return Err(UninitializedFieldError::new("sequence_id").into());
121 }
122 Ok(())
123 }
124}
125impl StoredGroup {
126 pub fn cursor(&self) -> Option<Cursor> {
127 if let Some(sequence_id) = self.sequence_id
131 && let Some(originator) = self.originator_id
132 {
133 return Some(Cursor::new(sequence_id as u64, originator as u32));
134 }
135 None
136 }
137}
138
139impl StoredGroupBuilder {
140 pub fn cursor(&mut self, cursor: Cursor) -> &mut Self {
141 self.originator_id = Some(Some(cursor.originator_id as i64));
142 self.sequence_id = Some(Some(cursor.sequence_id as i64));
143 self
144 }
145}
146
/// Two-column projection of the `groups` table: `(id, commit_log_public_key)`.
///
/// Returned by the remote commit-log publish/download queries.
#[derive(Queryable)]
#[diesel(table_name = groups)]
pub struct StoredGroupCommitLogPublicKey {
    /// Group primary key.
    pub id: Vec<u8>,
    /// Stored commit-log public key, or `None` while the column is `NULL`.
    pub commit_log_public_key: Option<Vec<u8>>,
}
154
/// Result row of `get_conversation_ids_for_requesting_readds`: a group id
/// plus the highest `commit_sequence_id` found for it in `remote_commit_log`.
#[derive(Debug, Clone, Queryable, QueryableByName)]
pub struct StoredGroupForReaddRequest {
    /// Group primary key.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub group_id: Vec<u8>,
    /// `MAX(commit_sequence_id)` over the joined log rows; `None` when the
    /// left join produced no log rows for the group.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
    pub latest_commit_sequence_id: Option<i64>,
}
163
/// Result row of `get_conversation_ids_for_responding_readds`: the subset of
/// `groups` columns selected for groups with an outstanding readd request.
#[derive(Debug, Clone, Queryable, QueryableByName)]
pub struct StoredGroupForRespondingReadds {
    /// Group primary key (`groups.id`).
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub group_id: Vec<u8>,
    /// `groups.dm_id`, if any.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    pub dm_id: Option<String>,
    /// `groups.conversation_type`, stored as an integer column.
    #[diesel(sql_type = diesel::sql_types::Integer)]
    pub conversation_type: ConversationType,
    /// `groups.created_at_ns`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub created_at_ns: i64,
}
176
// Crate-level macros (imported at the top of this file) applied to
// `StoredGroup` and the `groups` table.
// NOTE(review): presumably these generate the fetch / store /
// store-or-ignore trait impls, keyed by `Vec<u8>` for fetch — confirm
// against the macro definitions.
impl_fetch!(StoredGroup, groups, Vec<u8>);
impl_store!(StoredGroup, groups);
impl_store_or_ignore!(StoredGroup, groups);
181
182impl StoredGroupBuilder {
183 fn default_conversation_type(&self) -> ConversationType {
184 if self.dm_id.is_some() {
185 ConversationType::Dm
186 } else {
187 ConversationType::Group
188 }
189 }
190}
191
192impl StoredGroup {
193 pub fn builder() -> StoredGroupBuilder {
194 StoredGroupBuilder::default()
195 }
196}
197
/// Sort order applied by `QueryGroup::find_groups`.
#[derive(Debug, Clone, Default)]
pub enum GroupQueryOrderBy {
    /// Order by `created_at_ns` ascending (the default).
    #[default]
    CreatedAt,
    /// Order by `COALESCE(last_message_ns, created_at_ns)` descending.
    LastActivity,
}
204
/// Filter and ordering options for group queries.
///
/// All fields are optional or default to `false`; `Default::default()`
/// applies no explicit filters beyond those the query itself always adds.
#[derive(Debug, Default, Clone)]
pub struct GroupQueryArgs {
    /// Keep only groups whose `membership_state` is one of these.
    pub allowed_states: Option<Vec<GroupMembershipState>>,
    /// Keep only groups with `created_at_ns` strictly greater than this.
    /// Cannot be combined with `last_activity_after_ns`.
    pub created_after_ns: Option<i64>,
    /// Upper bound on `created_at_ns` (strict in `find_groups`, inclusive in
    /// `find_groups_by_id_paged`). Cannot be combined with
    /// `last_activity_before_ns`.
    pub created_before_ns: Option<i64>,
    /// Keep only groups whose `COALESCE(last_message_ns, created_at_ns)` is
    /// strictly greater than this.
    pub last_activity_after_ns: Option<i64>,
    /// Keep only groups whose `COALESCE(last_message_ns, created_at_ns)` is
    /// strictly less than this.
    pub last_activity_before_ns: Option<i64>,
    /// Maximum number of rows returned by the main query. Sync groups that
    /// `find_groups` appends afterwards are not counted against it.
    pub limit: Option<i64>,
    /// Keep only groups of this conversation type.
    pub conversation_type: Option<ConversationType>,
    /// Consent states to include. `None` or an empty list falls back to
    /// `[Allowed, Unknown]` in `find_groups`.
    pub consent_states: Option<Vec<ConsentState>>,
    /// When `true`, `find_groups` appends all sync groups to the result.
    pub include_sync_groups: bool,
    /// When `false`, `find_groups` keeps only one row per DM identity.
    pub include_duplicate_dms: bool,
    /// Keep only groups whose `should_publish_commit_log` equals this value.
    pub should_publish_commit_log: Option<bool>,
    /// Sort order; `None` uses [`GroupQueryOrderBy::default`].
    pub order_by: Option<GroupQueryOrderBy>,
}
220
221impl AsRef<GroupQueryArgs> for GroupQueryArgs {
222 fn as_ref(&self) -> &GroupQueryArgs {
223 self
224 }
225}
226
227impl GroupQueryArgs {
228 pub fn validate(&self) -> Result<(), crate::ConnectionError> {
229 if self.last_activity_after_ns.is_some() && self.created_after_ns.is_some() {
230 return Err(crate::ConnectionError::InvalidQuery(
231 "last_activity_after_ns and created_after_ns cannot be used together".to_string(),
232 ));
233 }
234
235 if self.last_activity_before_ns.is_some() && self.created_before_ns.is_some() {
236 return Err(crate::ConnectionError::InvalidQuery(
237 "last_activity_before_ns and created_before_ns cannot be used together".to_string(),
238 ));
239 }
240
241 Ok(())
242 }
243}
244
/// Read and write operations on the `groups` table.
///
/// Behavioral notes on individual methods describe the `DbConnection`
/// implementation in this file.
pub trait QueryGroup {
    /// Returns groups matching `args`. Virtual conversation types are
    /// excluded from the main query; sync groups are appended when requested.
    fn find_groups<A: AsRef<GroupQueryArgs>>(
        &self,
        args: A,
    ) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Returns one page of non-virtual groups ordered by `id`, starting at
    /// `offset`. Only the creation-time bounds and `limit` of `args` are used.
    fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
        &self,
        args: A,
        offset: i64,
    ) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Sets the `membership_state` of the group with the given id.
    fn update_group_membership<GroupId: AsRef<[u8]>>(
        &self,
        group_id: GroupId,
        state: GroupMembershipState,
    ) -> Result<(), crate::ConnectionError>;

    /// Returns every sync group, newest (`created_at_ns`) first.
    fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError>;

    /// Looks up a sync group by id; `None` if no sync group has that id.
    fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Returns the most recently created sync group, if any.
    fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Looks up a group of any conversation type by id.
    fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Looks up a group by the `(sequence_id, originator_id)` pair of
    /// `cursor`. If several match, the oldest is returned and a warning is
    /// logged.
    fn find_group_by_sequence_id(
        &self,
        cursor: Cursor,
    ) -> Result<Option<StoredGroup>, crate::ConnectionError>;

    /// Returns `rotated_at_ns` for the group, or a `NotFound` error if the
    /// group does not exist.
    fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

    /// Sets `rotated_at_ns` for the group to the current time.
    fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

    /// Returns `installations_last_checked` for the group, or a `NotFound`
    /// error if the group does not exist.
    fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

    /// Sets `installations_last_checked` for the group to the current time.
    fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

    /// Overwrites `message_disappear_from_ns` for the group (`None` clears it).
    fn update_message_disappearing_from_ns(
        &self,
        group_id: Vec<u8>,
        from_ns: Option<i64>,
    ) -> Result<(), StorageError>;

    /// Overwrites `message_disappear_in_ns` for the group (`None` clears it).
    fn update_message_disappearing_in_ns(
        &self,
        group_id: Vec<u8>,
        in_ns: Option<i64>,
    ) -> Result<(), StorageError>;

    /// Inserts `group`, or reconciles it with an existing row with the same
    /// id. Fails with `StorageError::Duplicate` when the existing row already
    /// carries the same `sequence_id`.
    fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError>;

    /// Returns the cursor of every group that has a `sequence_id`.
    fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError>;

    /// Sets `maybe_forked = true` and stores `fork_details` for the group.
    fn mark_group_as_maybe_forked(
        &self,
        group_id: &[u8],
        fork_details: String,
    ) -> Result<(), StorageError>;

    /// Sets `maybe_forked = false` and empties `fork_details` for the group.
    fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError>;

    /// Returns `true` when more than one DM row shares the `dm_id` of the
    /// given group; `false` when the group is missing or has no `dm_id`.
    fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError>;

    /// Returns `(id, commit_log_public_key)` for consent-allowed DMs, plus
    /// consent-allowed groups that have `should_publish_commit_log` set,
    /// ordered by creation time.
    fn get_conversation_ids_for_remote_log_publish(
        &self,
    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

    /// Returns `(id, commit_log_public_key)` for every consent-allowed,
    /// non-virtual conversation.
    fn get_conversation_ids_for_remote_log_download(
        &self,
    ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

    /// Returns ids of non-virtual conversations whose `is_commit_log_forked`
    /// is `NULL` or not `true`.
    fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError>;

    /// Returns non-virtual conversations with `is_commit_log_forked = true`,
    /// each with its latest remote commit-log sequence id.
    fn get_conversation_ids_for_requesting_readds(
        &self,
    ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError>;

    /// Returns the distinct groups that have a readd request not yet
    /// answered at an equal-or-later sequence id.
    fn get_conversation_ids_for_responding_readds(
        &self,
    ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError>;

    /// Returns the `conversation_type` of the group; errors if it is missing.
    fn get_conversation_type(
        &self,
        group_id: &[u8],
    ) -> Result<ConversationType, crate::ConnectionError>;

    /// Stores `public_key` for the group, but only if no key is stored yet.
    /// Fails with `StorageError::Duplicate` when no row was updated.
    fn set_group_commit_log_public_key(
        &self,
        group_id: &[u8],
        public_key: &[u8],
    ) -> Result<(), StorageError>;

    /// Overwrites `is_commit_log_forked` for the group.
    fn set_group_commit_log_forked_status(
        &self,
        group_id: &[u8],
        is_forked: Option<bool>,
    ) -> Result<(), StorageError>;

    /// Reads `is_commit_log_forked` for the group; errors if it is missing.
    fn get_group_commit_log_forked_status(
        &self,
        group_id: &[u8],
    ) -> Result<Option<bool>, StorageError>;

    /// Overwrites `has_pending_leave_request` for the group.
    fn set_group_has_pending_leave_request_status(
        &self,
        group_id: &[u8],
        has_pending_leave_request: Option<bool>,
    ) -> Result<(), StorageError>;

    /// Returns ids of non-sync groups with `has_pending_leave_request = true`.
    fn get_groups_have_pending_leave_request(&self)
    -> Result<Vec<Vec<u8>>, crate::ConnectionError>;
}
375
376impl<T> QueryGroup for &T
377where
378 T: QueryGroup,
379{
380 fn find_groups<A: AsRef<GroupQueryArgs>>(
382 &self,
383 args: A,
384 ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
385 (**self).find_groups(args)
386 }
387
388 fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
389 &self,
390 args: A,
391 offset: i64,
392 ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
393 (**self).find_groups_by_id_paged(args, offset)
394 }
395
396 fn update_group_membership<GroupId: AsRef<[u8]>>(
398 &self,
399 group_id: GroupId,
400 state: GroupMembershipState,
401 ) -> Result<(), crate::ConnectionError> {
402 (**self).update_group_membership(group_id, state)
403 }
404
405 fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
406 (**self).all_sync_groups()
407 }
408
409 fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
410 (**self).find_sync_group(id)
411 }
412
413 fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError> {
414 (**self).primary_sync_group()
415 }
416
417 fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
419 (**self).find_group(id)
420 }
421
422 fn find_group_by_sequence_id(
424 &self,
425 cursor: Cursor,
426 ) -> Result<Option<StoredGroup>, crate::ConnectionError> {
427 (**self).find_group_by_sequence_id(cursor)
428 }
429
430 fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
431 (**self).get_rotated_at_ns(group_id)
432 }
433
434 fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
436 (**self).update_rotated_at_ns(group_id)
437 }
438
439 fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
440 (**self).get_installations_time_checked(group_id)
441 }
442
443 fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
445 (**self).update_installations_time_checked(group_id)
446 }
447
448 fn update_message_disappearing_from_ns(
449 &self,
450 group_id: Vec<u8>,
451 from_ns: Option<i64>,
452 ) -> Result<(), StorageError> {
453 (**self).update_message_disappearing_from_ns(group_id, from_ns)
454 }
455
456 fn update_message_disappearing_in_ns(
457 &self,
458 group_id: Vec<u8>,
459 in_ns: Option<i64>,
460 ) -> Result<(), StorageError> {
461 (**self).update_message_disappearing_in_ns(group_id, in_ns)
462 }
463
464 fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
465 (**self).insert_or_replace_group(group)
466 }
467
468 fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError> {
470 (**self).group_cursors()
471 }
472
473 fn mark_group_as_maybe_forked(
474 &self,
475 group_id: &[u8],
476 fork_details: String,
477 ) -> Result<(), StorageError> {
478 (**self).mark_group_as_maybe_forked(group_id, fork_details)
479 }
480
481 fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError> {
482 (**self).clear_fork_flag_for_group(group_id)
483 }
484
485 fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError> {
486 (**self).has_duplicate_dm(group_id)
487 }
488
489 fn get_conversation_ids_for_remote_log_publish(
491 &self,
492 ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
493 (**self).get_conversation_ids_for_remote_log_publish()
494 }
495
496 fn get_conversation_ids_for_remote_log_download(
497 &self,
498 ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
499 (**self).get_conversation_ids_for_remote_log_download()
500 }
501
502 fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
503 (**self).get_conversation_ids_for_fork_check()
504 }
505
506 fn get_conversation_ids_for_requesting_readds(
507 &self,
508 ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError> {
509 (**self).get_conversation_ids_for_requesting_readds()
510 }
511
512 fn get_conversation_ids_for_responding_readds(
513 &self,
514 ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError> {
515 (**self).get_conversation_ids_for_responding_readds()
516 }
517
518 fn get_conversation_type(
519 &self,
520 group_id: &[u8],
521 ) -> Result<ConversationType, crate::ConnectionError> {
522 (**self).get_conversation_type(group_id)
523 }
524
525 fn set_group_commit_log_public_key(
526 &self,
527 group_id: &[u8],
528 public_key: &[u8],
529 ) -> Result<(), StorageError> {
530 (**self).set_group_commit_log_public_key(group_id, public_key)
531 }
532
533 fn set_group_commit_log_forked_status(
534 &self,
535 group_id: &[u8],
536 is_forked: Option<bool>,
537 ) -> Result<(), StorageError> {
538 (**self).set_group_commit_log_forked_status(group_id, is_forked)
539 }
540
541 fn get_group_commit_log_forked_status(
542 &self,
543 group_id: &[u8],
544 ) -> Result<Option<bool>, StorageError> {
545 (**self).get_group_commit_log_forked_status(group_id)
546 }
547
548 fn set_group_has_pending_leave_request_status(
549 &self,
550 group_id: &[u8],
551 has_pending_leave_request: Option<bool>,
552 ) -> Result<(), StorageError> {
553 (**self).set_group_has_pending_leave_request_status(group_id, has_pending_leave_request)
554 }
555
556 fn get_groups_have_pending_leave_request(
557 &self,
558 ) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
559 (**self).get_groups_have_pending_leave_request()
560 }
561}
562
563impl<C: ConnectionExt> QueryGroup for DbConnection<C> {
564 fn find_groups<A: AsRef<GroupQueryArgs>>(
566 &self,
567 args: A,
568 ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
569 use crate::schema::consent_records::dsl as consent_dsl;
570
571 args.as_ref().validate()?;
572
573 let GroupQueryArgs {
574 allowed_states,
575 created_after_ns,
576 created_before_ns,
577 limit,
578 conversation_type,
579 consent_states,
580 include_sync_groups,
581 include_duplicate_dms,
582 last_activity_after_ns,
583 last_activity_before_ns,
584 should_publish_commit_log,
585 order_by,
586 } = args.as_ref();
587
588 let order_expression = match order_by.clone().unwrap_or_default() {
589 GroupQueryOrderBy::CreatedAt => {
590 diesel::dsl::sql::<diesel::sql_types::BigInt>("created_at_ns ASC")
591 }
592 GroupQueryOrderBy::LastActivity => diesel::dsl::sql::<diesel::sql_types::BigInt>(
593 "COALESCE(last_message_ns, created_at_ns) DESC",
594 ),
595 };
596
597 let mut query = dsl::groups
598 .filter(dsl::conversation_type.ne_all(ConversationType::virtual_types()))
599 .order(order_expression)
600 .into_boxed();
601
602 if !include_duplicate_dms {
603 query = query.filter(sql::<diesel::sql_types::Bool>(
606 "NOT EXISTS (
607 SELECT 1 FROM groups g2
608 WHERE COALESCE(g2.dm_id, g2.id) = COALESCE(groups.dm_id, groups.id)
609 AND (COALESCE(g2.last_message_ns, 0), g2.id) > (COALESCE(groups.last_message_ns, 0), groups.id)
610 )",
611 ));
612 }
613
614 if let Some(limit) = limit {
615 query = query.limit(*limit);
616 }
617
618 if let Some(allowed_states) = allowed_states {
619 query = query.filter(dsl::membership_state.eq_any(allowed_states));
620 }
621
622 if let Some(last_activity_after_ns) = last_activity_after_ns {
624 query = query.filter(
627 diesel::dsl::sql::<diesel::sql_types::BigInt>(
628 "COALESCE(last_message_ns, created_at_ns)",
629 )
630 .gt(last_activity_after_ns),
631 );
632 }
633
634 if let Some(created_after_ns) = created_after_ns {
635 query = query.filter(dsl::created_at_ns.gt(created_after_ns));
636 }
637
638 if let Some(last_activity_before_ns) = last_activity_before_ns {
639 query = query.filter(
640 diesel::dsl::sql::<diesel::sql_types::BigInt>(
641 "COALESCE(last_message_ns, created_at_ns)",
642 )
643 .lt(last_activity_before_ns),
644 );
645 }
646
647 if let Some(created_before_ns) = created_before_ns {
648 query = query.filter(dsl::created_at_ns.lt(created_before_ns));
649 }
650
651 if let Some(conversation_type) = conversation_type {
652 query = query.filter(dsl::conversation_type.eq(conversation_type));
653 }
654
655 let effective_consent_states = match &consent_states {
656 Some(states) if !states.is_empty() => states.clone(),
657 _ => vec![ConsentState::Allowed, ConsentState::Unknown],
658 };
659
660 let includes_unknown = effective_consent_states.contains(&ConsentState::Unknown);
661 let includes_all = effective_consent_states.len() == 3;
662
663 if let Some(should_publish_commit_log) = should_publish_commit_log {
664 query = query.filter(dsl::should_publish_commit_log.eq(should_publish_commit_log));
665 }
666
667 let filtered_states: Vec<_> = effective_consent_states
668 .iter()
669 .filter(|state| **state != ConsentState::Unknown)
670 .cloned()
671 .collect();
672
673 let mut groups = if includes_all {
674 self.raw_query_read(|conn| query.load::<StoredGroup>(conn))?
676 } else if includes_unknown {
677 let left_joined_query = query
679 .left_join(consent_dsl::consent_records.on(
680 sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
681 ))
682 .filter(
683 consent_dsl::state
684 .is_null()
685 .or(consent_dsl::state.eq(ConsentState::Unknown))
686 .or(consent_dsl::state.eq_any(filtered_states.clone())),
687 )
688 .select(dsl::groups::all_columns());
689
690 self.raw_query_read(|conn| left_joined_query.load::<StoredGroup>(conn))?
691 } else {
692 let inner_joined_query = query
694 .inner_join(consent_dsl::consent_records.on(
695 sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
696 ))
697 .filter(consent_dsl::state.eq_any(filtered_states.clone()))
698 .select(dsl::groups::all_columns());
699
700 self.raw_query_read(|conn| inner_joined_query.load::<StoredGroup>(conn))?
701 };
702
703 if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups {
706 let query = dsl::groups.filter(dsl::conversation_type.eq(ConversationType::Sync));
707 let mut sync_groups = self.raw_query_read(|conn| query.load(conn))?;
708 groups.append(&mut sync_groups);
709 }
710
711 Ok(groups)
712 }
713
714 fn find_groups_by_id_paged<A: AsRef<GroupQueryArgs>>(
715 &self,
716 args: A,
717 offset: i64,
718 ) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
719 let GroupQueryArgs {
720 created_after_ns,
721 created_before_ns,
722 limit,
723 ..
724 } = args.as_ref();
725
726 let mut query = groups::table
727 .filter(groups::conversation_type.ne_all(ConversationType::virtual_types()))
728 .order(groups::id)
729 .into_boxed();
730
731 if let Some(start_ns) = created_after_ns {
732 query = query.filter(groups::created_at_ns.gt(start_ns));
733 }
734 if let Some(end_ns) = created_before_ns {
735 query = query.filter(groups::created_at_ns.le(end_ns));
736 }
737
738 query = query.limit(limit.unwrap_or(100)).offset(offset);
739
740 self.raw_query_read(|conn| query.load::<StoredGroup>(conn))
741 }
742
743 fn update_group_membership<GroupId: AsRef<[u8]>>(
745 &self,
746 group_id: GroupId,
747 state: GroupMembershipState,
748 ) -> Result<(), crate::ConnectionError> {
749 self.raw_query_write(|conn| {
750 diesel::update(dsl::groups.find(group_id.as_ref()))
751 .set(dsl::membership_state.eq(state))
752 .execute(conn)
753 })?;
754
755 Ok(())
756 }
757
758 fn all_sync_groups(&self) -> Result<Vec<StoredGroup>, crate::ConnectionError> {
759 let query = dsl::groups
760 .order(dsl::created_at_ns.desc())
761 .filter(dsl::conversation_type.eq(ConversationType::Sync));
762
763 self.raw_query_read(|conn| query.load(conn))
764 }
765
766 fn find_sync_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
767 let query = dsl::groups
768 .filter(dsl::conversation_type.eq(ConversationType::Sync))
769 .filter(dsl::id.eq(id));
770
771 self.raw_query_read(|conn| query.first(conn).optional())
772 }
773
774 fn primary_sync_group(&self) -> Result<Option<StoredGroup>, crate::ConnectionError> {
775 let query = dsl::groups
776 .order(dsl::created_at_ns.desc())
777 .filter(dsl::conversation_type.eq(ConversationType::Sync));
778
779 self.raw_query_read(|conn| query.first(conn).optional())
780 }
781
782 fn find_group(&self, id: &[u8]) -> Result<Option<StoredGroup>, crate::ConnectionError> {
784 let query = dsl::groups
785 .order(dsl::created_at_ns.asc())
786 .limit(1)
787 .filter(dsl::id.eq(id));
788 let groups = self.raw_query_read(|conn| query.load(conn))?;
789
790 Ok(groups.into_iter().next())
791 }
792
793 fn find_group_by_sequence_id(
795 &self,
796 cursor: Cursor,
797 ) -> Result<Option<StoredGroup>, crate::ConnectionError> {
798 let query = dsl::groups
799 .order(dsl::created_at_ns.asc())
800 .filter(dsl::sequence_id.eq(cursor.sequence_id as i64))
801 .filter(dsl::originator_id.eq(cursor.originator_id as i64));
802
803 let groups = self.raw_query_read(|conn| query.load(conn))?;
804
805 if groups.len() > 1 {
806 tracing::warn!(
807 cursor.sequence_id,
808 "More than one group found for welcome_id {}",
809 cursor.sequence_id
810 );
811 }
812 Ok(groups.into_iter().next())
813 }
814
815 fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
816 let last_ts: Option<i64> = self.raw_query_read(|conn| {
817 dsl::groups
818 .find(&group_id)
819 .select(dsl::rotated_at_ns)
820 .first(conn)
821 .optional()
822 })?;
823
824 last_ts.ok_or(StorageError::NotFound(NotFound::InstallationTimeForGroup(
825 group_id,
826 )))
827 }
828
829 fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
831 self.raw_query_write(|conn| {
832 let now = xmtp_common::time::now_ns();
833 diesel::update(dsl::groups.find(&group_id))
834 .set(dsl::rotated_at_ns.eq(now))
835 .execute(conn)
836 })?;
837
838 Ok(())
839 }
840
841 fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError> {
842 let last_ts = self.raw_query_read(|conn| {
843 dsl::groups
844 .find(&group_id)
845 .select(dsl::installations_last_checked)
846 .first(conn)
847 .optional()
848 })?;
849
850 last_ts.ok_or(NotFound::InstallationTimeForGroup(group_id).into())
851 }
852
853 fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError> {
855 self.raw_query_write(|conn| {
856 let now = xmtp_common::time::now_ns();
857 diesel::update(dsl::groups.find(&group_id))
858 .set(dsl::installations_last_checked.eq(now))
859 .execute(conn)
860 })?;
861
862 Ok(())
863 }
864
865 fn update_message_disappearing_from_ns(
866 &self,
867 group_id: Vec<u8>,
868 from_ns: Option<i64>,
869 ) -> Result<(), StorageError> {
870 self.raw_query_write(|conn| {
871 diesel::update(dsl::groups.find(&group_id))
872 .set(dsl::message_disappear_from_ns.eq(from_ns))
873 .execute(conn)
874 })?;
875
876 Ok(())
877 }
878
879 fn update_message_disappearing_in_ns(
880 &self,
881 group_id: Vec<u8>,
882 in_ns: Option<i64>,
883 ) -> Result<(), StorageError> {
884 self.raw_query_write(|conn| {
885 diesel::update(dsl::groups.find(&group_id))
886 .set(dsl::message_disappear_in_ns.eq(in_ns))
887 .execute(conn)
888 })?;
889
890 Ok(())
891 }
892
893 fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
894 let maybe_inserted_group: Option<StoredGroup> = self.raw_query_write(|conn| {
895 diesel::insert_into(dsl::groups)
896 .values(&group)
897 .on_conflict_do_nothing()
898 .get_result(conn)
899 .optional()
900 })?;
901
902 if maybe_inserted_group.is_none() {
903 let mut existing_group: StoredGroup =
904 self.raw_query_read(|conn| dsl::groups.find(&group.id).first(conn))?;
905 if matches!(
907 existing_group.membership_state,
908 GroupMembershipState::Restored
909 ) {
910 self.raw_query_write(|c| {
911 diesel::update(dsl::groups.find(&group.id))
912 .set(&group)
913 .execute(c)
914 })?;
915 }
916
917 if existing_group.sequence_id == group.sequence_id {
918 tracing::info!("Group welcome id already exists");
919 Err(StorageError::Duplicate(DuplicateItem::WelcomeId(
921 existing_group.cursor(),
922 )))
923 } else {
924 tracing::info!("Group already exists");
925 if group.sequence_id.is_some()
928 && (existing_group.sequence_id.is_none()
929 || group.sequence_id > existing_group.sequence_id)
930 {
931 self.raw_query_write(|c| {
932 diesel::update(dsl::groups.find(&group.id))
933 .set(dsl::sequence_id.eq(group.sequence_id))
934 .execute(c)
935 })?;
936 existing_group.sequence_id = group.sequence_id;
937 }
938 Ok(existing_group)
939 }
940 } else {
941 Ok(self.raw_query_read(|c| dsl::groups.find(group.id).first(c))?)
942 }
943 }
944
945 fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError> {
947 self.raw_query_read(|conn| {
948 Ok(dsl::groups
949 .filter(dsl::sequence_id.is_not_null())
950 .select((dsl::sequence_id, dsl::originator_id))
951 .load::<(Option<i64>, Option<i64>)>(conn)?
952 .into_iter()
953 .map(|(seq, orig)| {
954 Cursor::new(
955 seq.expect("Filtered for not null") as u64,
956 orig.expect("if seq is not null, originator must not be null") as u32,
957 )
958 })
959 .collect())
960 })
961 }
962
963 fn mark_group_as_maybe_forked(
964 &self,
965 group_id: &[u8],
966 fork_details: String,
967 ) -> Result<(), StorageError> {
968 self.raw_query_write(|conn| {
969 diesel::update(dsl::groups.find(&group_id))
970 .set((
971 dsl::maybe_forked.eq(true),
972 dsl::fork_details.eq(fork_details),
973 ))
974 .execute(conn)
975 })?;
976
977 Ok(())
978 }
979
980 fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError> {
981 self.raw_query_write(|conn| {
982 diesel::update(dsl::groups.find(&group_id))
983 .set((dsl::maybe_forked.eq(false), dsl::fork_details.eq("")))
984 .execute(conn)
985 })?;
986 Ok(())
987 }
988
989 fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError> {
990 self.raw_query_read(|conn| {
991 let dm_id: Option<String> = dsl::groups
992 .filter(dsl::id.eq(group_id))
993 .select(dsl::dm_id)
994 .first::<Option<String>>(conn)
995 .optional()?
996 .flatten();
997
998 if let Some(dm_id) = dm_id {
999 let count: i64 = dsl::groups
1000 .filter(dsl::conversation_type.eq(ConversationType::Dm))
1001 .filter(dsl::dm_id.eq(dm_id))
1002 .count()
1003 .get_result(conn)?;
1004
1005 Ok(count > 1)
1006 } else {
1007 Ok(false)
1008 }
1009 })
1010 }
1011
1012 fn get_conversation_ids_for_remote_log_publish(
1015 &self,
1016 ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
1017 use crate::schema::consent_records::dsl as consent_dsl;
1018
1019 let query = dsl::groups
1020 .filter(
1021 dsl::conversation_type
1022 .eq(ConversationType::Dm)
1023 .or(dsl::conversation_type
1024 .eq(ConversationType::Group)
1025 .and(dsl::should_publish_commit_log.eq(true))),
1026 )
1027 .inner_join(consent_dsl::consent_records.on(
1028 sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
1029 ))
1030 .filter(consent_dsl::state.eq(ConsentState::Allowed))
1031 .select((dsl::id, dsl::commit_log_public_key))
1032 .order(dsl::created_at_ns.asc());
1033
1034 self.raw_query_read(|conn| query.load::<StoredGroupCommitLogPublicKey>(conn))
1035 }
1036
1037 fn get_conversation_ids_for_remote_log_download(
1039 &self,
1040 ) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError> {
1041 use crate::schema::consent_records::dsl as consent_dsl;
1042
1043 let query = dsl::groups
1044 .filter(dsl::conversation_type.ne_all(ConversationType::virtual_types()))
1045 .inner_join(consent_dsl::consent_records.on(
1046 sql::<diesel::sql_types::Text>("lower(hex(groups.id))").eq(consent_dsl::entity),
1047 ))
1048 .filter(consent_dsl::state.eq(ConsentState::Allowed))
1049 .select((dsl::id, dsl::commit_log_public_key));
1050
1051 self.raw_query_read(|conn| query.load::<StoredGroupCommitLogPublicKey>(conn))
1052 }
1053
1054 fn get_conversation_ids_for_fork_check(&self) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
1056 let query = dsl::groups
1057 .filter(
1058 dsl::conversation_type
1059 .ne_all(ConversationType::virtual_types())
1060 .and(
1061 dsl::is_commit_log_forked
1062 .is_null()
1063 .or(dsl::is_commit_log_forked.ne(Some(true))),
1064 ),
1065 )
1066 .select(dsl::id);
1067
1068 self.raw_query_read(|conn| query.load::<Vec<u8>>(conn))
1069 }
1070
1071 fn get_conversation_ids_for_requesting_readds(
1072 &self,
1073 ) -> Result<Vec<StoredGroupForReaddRequest>, crate::ConnectionError> {
1074 use super::schema::{groups::dsl as groups_dsl, remote_commit_log::dsl as rcl_dsl};
1075 use diesel::dsl::max;
1076
1077 self.raw_query_read(|conn| {
1078 groups_dsl::groups
1079 .left_join(rcl_dsl::remote_commit_log.on(groups_dsl::id.eq(rcl_dsl::group_id)))
1080 .filter(
1081 groups_dsl::conversation_type
1082 .ne_all(ConversationType::virtual_types())
1083 .and(groups_dsl::is_commit_log_forked.eq(true)),
1084 )
1085 .group_by(groups_dsl::id)
1086 .select((groups_dsl::id, max(rcl_dsl::commit_sequence_id).nullable()))
1087 .load::<StoredGroupForReaddRequest>(conn)
1088 })
1089 }
1090
1091 fn get_conversation_ids_for_responding_readds(
1092 &self,
1093 ) -> Result<Vec<StoredGroupForRespondingReadds>, crate::ConnectionError> {
1094 use super::schema::{groups::dsl as groups_dsl, readd_status::dsl as readd_dsl};
1095 use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
1096
1097 self.raw_query_read(|conn| {
1098 readd_dsl::readd_status
1099 .inner_join(groups_dsl::groups.on(readd_dsl::group_id.eq(groups_dsl::id)))
1100 .filter(readd_dsl::requested_at_sequence_id.is_not_null())
1101 .filter(
1102 readd_dsl::requested_at_sequence_id
1103 .ge(readd_dsl::responded_at_sequence_id)
1104 .or(readd_dsl::responded_at_sequence_id.is_null()),
1105 )
1106 .select((
1107 groups_dsl::id,
1108 groups_dsl::dm_id,
1109 groups_dsl::conversation_type,
1110 groups_dsl::created_at_ns,
1111 ))
1112 .distinct()
1113 .load::<StoredGroupForRespondingReadds>(conn)
1114 })
1115 }
1116
1117 fn get_conversation_type(
1118 &self,
1119 group_id: &[u8],
1120 ) -> Result<ConversationType, crate::ConnectionError> {
1121 let query = dsl::groups
1122 .filter(dsl::id.eq(group_id))
1123 .select(dsl::conversation_type);
1124 let conversation_type = self.raw_query_read(|conn| query.first(conn))?;
1125 Ok(conversation_type)
1126 }
1127
1128 fn set_group_commit_log_public_key(
1129 &self,
1130 group_id: &[u8],
1131 public_key: &[u8],
1132 ) -> Result<(), StorageError> {
1133 use crate::schema::groups::dsl;
1134 let num_updated = self.raw_query_write(|conn| {
1135 diesel::update(dsl::groups)
1136 .filter(
1137 dsl::id
1138 .eq(group_id)
1139 .and(dsl::commit_log_public_key.is_null()),
1140 )
1141 .set(dsl::commit_log_public_key.eq(public_key))
1142 .execute(conn)
1143 })?;
1144 if num_updated == 0 {
1145 return Err(StorageError::Duplicate(DuplicateItem::CommitLogPublicKey(
1146 group_id.to_vec(),
1147 )));
1148 }
1149 Ok(())
1150 }
1151
1152 fn set_group_commit_log_forked_status(
1153 &self,
1154 group_id: &[u8],
1155 is_forked: Option<bool>,
1156 ) -> Result<(), StorageError> {
1157 use crate::schema::groups::dsl;
1158 self.raw_query_write(|conn| {
1159 diesel::update(dsl::groups.find(group_id))
1160 .set(dsl::is_commit_log_forked.eq(is_forked))
1161 .execute(conn)
1162 })?;
1163 Ok(())
1164 }
1165
1166 fn get_group_commit_log_forked_status(
1167 &self,
1168 group_id: &[u8],
1169 ) -> Result<Option<bool>, StorageError> {
1170 use crate::schema::groups::dsl;
1171 self.raw_query_read(|conn| {
1172 dsl::groups
1173 .find(group_id)
1174 .select(dsl::is_commit_log_forked)
1175 .first::<Option<bool>>(conn)
1176 })
1177 .map_err(StorageError::from)
1178 }
1179
1180 fn set_group_has_pending_leave_request_status(
1181 &self,
1182 group_id: &[u8],
1183 has_pending_leave_request: Option<bool>,
1184 ) -> Result<(), StorageError> {
1185 use crate::schema::groups::dsl;
1186 self.raw_query_write(|conn| {
1187 diesel::update(dsl::groups.find(group_id))
1188 .set(dsl::has_pending_leave_request.eq(has_pending_leave_request))
1189 .execute(conn)
1190 })?;
1191 Ok(())
1192 }
1193
1194 fn get_groups_have_pending_leave_request(
1195 &self,
1196 ) -> Result<Vec<Vec<u8>>, crate::ConnectionError> {
1197 let query = dsl::groups
1198 .filter(
1199 dsl::conversation_type
1200 .ne(ConversationType::Sync)
1201 .and(dsl::has_pending_leave_request.eq(Some(true))),
1202 )
1203 .select(dsl::id);
1204
1205 self.raw_query_read(|conn| query.load::<Vec<u8>>(conn))
1206 }
1207}
1208
/// Membership state recorded for a stored group.
///
/// Persisted as an SQLite `INTEGER`. The explicit discriminants are the values written to
/// and read from the database by the `ToSql`/`FromSql` impls, so changing them would change
/// how existing rows are decoded.
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Integer)]
pub enum GroupMembershipState {
    /// Stored as `1`. This is the state the test helpers fall back to when none is given.
    Allowed = 1,
    /// Stored as `2`.
    Rejected = 2,
    /// Stored as `3`.
    Pending = 3,
    /// Stored as `4`.
    /// NOTE(review): presumably a group recovered from a backup or sync — confirm.
    Restored = 4,
    /// Stored as `5`.
    /// NOTE(review): presumably a removal that was requested but not yet applied — confirm.
    PendingRemove = 5,
}
1225
1226impl ToSql<Integer, Sqlite> for GroupMembershipState
1227where
1228 i32: ToSql<Integer, Sqlite>,
1229{
1230 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
1231 out.set_value(*self as i32);
1232 Ok(IsNull::No)
1233 }
1234}
1235
1236impl FromSql<Integer, Sqlite> for GroupMembershipState
1237where
1238 i32: FromSql<Integer, Sqlite>,
1239{
1240 fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
1241 match i32::from_sql(bytes)? {
1242 1 => Ok(GroupMembershipState::Allowed),
1243 2 => Ok(GroupMembershipState::Rejected),
1244 3 => Ok(GroupMembershipState::Pending),
1245 4 => Ok(GroupMembershipState::Restored),
1246 5 => Ok(GroupMembershipState::PendingRemove),
1247 x => Err(format!("Unrecognized variant {}", x).into()),
1248 }
1249 }
1250}
1251
/// Kind of conversation a stored group represents.
///
/// Persisted as an SQLite `INTEGER`. The explicit discriminants are the values written to
/// and read from the database by the `ToSql`/`FromSql` impls, so changing them would change
/// how existing rows are decoded.
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Integer)]
pub enum ConversationType {
    /// Stored as `1`; displayed as `group`.
    Group = 1,
    /// Stored as `2`; displayed as `dm`.
    Dm = 2,
    /// Stored as `3`; displayed as `sync`. Reported as virtual by `is_virtual`.
    Sync = 3,
    /// Stored as `4`; displayed as `oneshot`. Reported as virtual by `is_virtual`.
    Oneshot = 4,
}
1261
1262impl ConversationType {
1263 pub fn virtual_types() -> Vec<ConversationType> {
1264 vec![ConversationType::Sync, ConversationType::Oneshot]
1265 }
1266
1267 pub fn is_virtual(&self) -> bool {
1268 match self {
1270 ConversationType::Group => false,
1271 ConversationType::Dm => false,
1272 ConversationType::Sync => true,
1273 ConversationType::Oneshot => true,
1274 }
1275 }
1276}
1277
1278impl ToSql<Integer, Sqlite> for ConversationType
1279where
1280 i32: ToSql<Integer, Sqlite>,
1281{
1282 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
1283 out.set_value(*self as i32);
1284 Ok(IsNull::No)
1285 }
1286}
1287
1288impl FromSql<Integer, Sqlite> for ConversationType
1289where
1290 i32: FromSql<Integer, Sqlite>,
1291{
1292 fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
1293 match i32::from_sql(bytes)? {
1294 1 => Ok(ConversationType::Group),
1295 2 => Ok(ConversationType::Dm),
1296 3 => Ok(ConversationType::Sync),
1297 4 => Ok(ConversationType::Oneshot),
1298 x => Err(format!("Unrecognized variant {}", x).into()),
1299 }
1300 }
1301}
1302
1303impl std::fmt::Display for ConversationType {
1304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1305 use ConversationType::*;
1306 match self {
1307 Group => write!(f, "group"),
1308 Dm => write!(f, "dm"),
1309 Sync => write!(f, "sync"),
1310 Oneshot => write!(f, "oneshot"),
1311 }
1312 }
1313}
1314
/// Helpers for DM ids of the form `dm:<inbox_id>:<inbox_id>`.
pub trait DmIdExt {
    /// Returns the member of the DM id that is not `id`.
    ///
    /// If `id` equals the first member, the second member is returned; otherwise the
    /// first member is returned (this includes the case where `id` matches neither).
    fn other_inbox_id(&self, id: &str) -> String;
}

impl DmIdExt for String {
    /// Splits the DM id on its separators instead of slicing by `id.len()`.
    ///
    /// Members are compared as whole components, so the result is correct when the two
    /// inbox ids differ in length or when one is a prefix of the other. Malformed input
    /// never panics: a missing `dm:` prefix is tolerated, and a missing separator is
    /// treated as a single member with an empty counterpart.
    fn other_inbox_id(&self, id: &str) -> String {
        let members = self.strip_prefix("dm:").unwrap_or(self.as_str());
        let (first, second) = members.split_once(':').unwrap_or((members, ""));

        let other = if first == id { second } else { first };
        other.to_string()
    }
}
1335
1336#[cfg(test)]
1337pub(crate) mod tests {
1338 #[cfg(target_arch = "wasm32")]
1339 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);
1340
1341 pub use super::dms::tests::*;
1342 use super::*;
1343
1344 use crate::{
1345 Fetch, Store,
1346 consent_record::{ConsentType, StoredConsentRecord},
1347 readd_status::ReaddStatus,
1348 schema::groups::dsl::groups,
1349 test_utils::{with_connection, with_connection_async},
1350 };
1351 use xmtp_common::{assert_ok, rand_vec, time::now_ns};
1352 use xmtp_configuration::Originators;
1353
1354 pub fn generate_group(state: Option<GroupMembershipState>) -> StoredGroup {
1356 generate_group_with_created_at(state, now_ns())
1358 }
1359
1360 pub fn generate_group_with_created_at(
1361 state: Option<GroupMembershipState>,
1362 created_at_ns: i64,
1363 ) -> StoredGroup {
1364 let id = rand_vec::<24>();
1365 let membership_state = state.unwrap_or(GroupMembershipState::Allowed);
1366 StoredGroup::builder()
1367 .id(id)
1368 .created_at_ns(created_at_ns)
1369 .membership_state(membership_state)
1370 .added_by_inbox_id("placeholder_address")
1371 .build()
1372 .unwrap()
1373 }
1374
1375 pub fn generate_group_with_welcome(
1377 state: Option<GroupMembershipState>,
1378 welcome_id: Option<i64>,
1379 ) -> StoredGroup {
1380 let id = rand_vec::<24>();
1381 let created_at_ns = now_ns();
1382 let membership_state = state.unwrap_or(GroupMembershipState::Allowed);
1383 StoredGroup::builder()
1384 .id(id)
1385 .created_at_ns(created_at_ns)
1386 .membership_state(membership_state)
1387 .added_by_inbox_id("placeholder_address")
1388 .sequence_id(welcome_id.unwrap_or(xmtp_common::rand_i64()))
1389 .originator_id(Originators::WELCOME_MESSAGES as i64)
1390 .conversation_type(ConversationType::Group)
1391 .build()
1392 .unwrap()
1393 }
1394
1395 pub fn generate_consent_record(
1397 entity_type: ConsentType,
1398 state: ConsentState,
1399 entity: String,
1400 ) -> StoredConsentRecord {
1401 StoredConsentRecord {
1402 entity_type,
1403 state,
1404 entity,
1405 consented_at_ns: now_ns(),
1406 }
1407 }
1408
1409 #[xmtp_common::test]
1410 fn test_it_stores_group() {
1411 with_connection(|conn| {
1412 let test_group = generate_group(None);
1413
1414 test_group.store(conn).unwrap();
1415 assert_eq!(
1416 conn.raw_query_read(|raw_conn| groups.first::<StoredGroup>(raw_conn))
1417 .unwrap(),
1418 test_group
1419 );
1420 })
1421 }
1422
1423 #[xmtp_common::test]
1424 fn test_it_fetches_group() {
1425 with_connection(|conn| {
1426 let test_group = generate_group(None);
1427
1428 conn.raw_query_write(|raw_conn| {
1429 diesel::insert_into(groups)
1430 .values(test_group.clone())
1431 .execute(raw_conn)
1432 })
1433 .unwrap();
1434
1435 let fetched_group: Option<StoredGroup> = conn.fetch(&test_group.id).unwrap();
1436 assert_eq!(fetched_group, Some(test_group));
1437 })
1438 }
1439
1440 #[xmtp_common::test]
1441 fn test_it_updates_group_membership_state() {
1442 with_connection(|conn| {
1443 let test_group = generate_group(Some(GroupMembershipState::Pending));
1444
1445 test_group.store(conn).unwrap();
1446 conn.update_group_membership(&test_group.id, GroupMembershipState::Rejected)
1447 .unwrap();
1448
1449 let updated_group: StoredGroup = conn.fetch(&test_group.id).ok().flatten().unwrap();
1450 assert_eq!(
1451 updated_group,
1452 StoredGroup {
1453 membership_state: GroupMembershipState::Rejected,
1454 ..test_group
1455 }
1456 );
1457 })
1458 }
1459
1460 #[xmtp_common::test]
1461 async fn test_find_groups() {
1462 let wait_in_wasm = async || {
1463 if cfg!(target_arch = "wasm32") {
1468 xmtp_common::time::sleep(std::time::Duration::from_millis(1)).await;
1469 }
1470 };
1471 with_connection_async(|conn| async move {
1472 let test_group_1 = generate_group(Some(GroupMembershipState::Pending));
1473 test_group_1.store(&conn).unwrap();
1474 wait_in_wasm().await;
1475 let test_group_2 = generate_group(Some(GroupMembershipState::Allowed));
1476 test_group_2.store(&conn).unwrap();
1477 wait_in_wasm().await;
1478 let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed));
1479 test_group_3.store(&conn).unwrap();
1480
1481 let other_inbox_id = test_group_3
1482 .dm_id
1483 .unwrap()
1484 .other_inbox_id("placeholder_inbox_id_1");
1485
1486 let all_results = conn
1487 .find_groups(GroupQueryArgs {
1488 conversation_type: Some(ConversationType::Group),
1489 ..Default::default()
1490 })
1491 .unwrap();
1492 assert_eq!(all_results.len(), 2);
1493
1494 let pending_results = conn
1495 .find_groups(GroupQueryArgs {
1496 allowed_states: Some(vec![GroupMembershipState::Pending]),
1497 conversation_type: Some(ConversationType::Group),
1498 ..Default::default()
1499 })
1500 .unwrap();
1501 assert_eq!(pending_results[0].id, test_group_1.id);
1502 assert_eq!(pending_results.len(), 1);
1503
1504 let results_with_limit = conn
1506 .find_groups(GroupQueryArgs {
1507 conversation_type: Some(ConversationType::Group),
1508 limit: Some(1),
1509 ..Default::default()
1510 })
1511 .unwrap();
1512 assert_eq!(results_with_limit.len(), 1);
1513 assert_eq!(results_with_limit[0].id, test_group_1.id);
1514
1515 let results_with_created_at_ns_after = conn
1516 .find_groups(GroupQueryArgs {
1517 conversation_type: Some(ConversationType::Group),
1518 limit: Some(1),
1519 created_after_ns: Some(test_group_1.created_at_ns),
1520 ..Default::default()
1521 })
1522 .unwrap();
1523 assert_eq!(results_with_created_at_ns_after.len(), 1);
1524 assert_eq!(results_with_created_at_ns_after[0].id, test_group_2.id);
1525
1526 let synced_groups = conn.primary_sync_group().unwrap();
1528 assert!(synced_groups.is_none());
1529
1530 let dm_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
1532 assert_eq!(dm_results.len(), 3);
1533 assert_eq!(dm_results[2].id, test_group_3.id);
1534
1535 let dm_result = conn
1537 .find_active_dm_group(format!("dm:placeholder_inbox_id_1:{}", &other_inbox_id))
1538 .unwrap();
1539 assert!(dm_result.is_some());
1540
1541 let dm_results = conn
1543 .find_groups(GroupQueryArgs {
1544 conversation_type: Some(ConversationType::Dm),
1545 ..Default::default()
1546 })
1547 .unwrap();
1548 assert_eq!(dm_results.len(), 1);
1549 assert_eq!(dm_results[0].id, test_group_3.id);
1550 })
1551 .await
1552 }
1553
1554 #[xmtp_common::test]
1555 async fn test_installations_last_checked_is_updated() {
1556 with_connection_async(|conn| async move {
1557 let test_group = generate_group(None);
1558 test_group.store(&conn).unwrap();
1559
1560 assert_eq!(test_group.installations_last_checked, 0);
1562
1563 if cfg!(target_arch = "wasm32") {
1564 xmtp_common::time::sleep(std::time::Duration::from_millis(1)).await;
1567 }
1568 let result = conn.update_installations_time_checked(test_group.id.clone());
1571 assert_ok!(result);
1572
1573 let fetched_group: StoredGroup = conn.fetch(&test_group.id).ok().flatten().unwrap();
1575 assert_ne!(fetched_group.installations_last_checked, 0);
1576 assert!(fetched_group.created_at_ns < fetched_group.installations_last_checked);
1577 })
1578 .await
1579 }
1580
1581 #[xmtp_common::test]
1582 fn test_new_group_has_correct_purpose() {
1583 with_connection(|conn| {
1584 let test_group = generate_group(None);
1585
1586 conn.raw_query_write(|raw_conn| {
1587 diesel::insert_into(groups)
1588 .values(test_group.clone())
1589 .execute(raw_conn)
1590 })
1591 .unwrap();
1592
1593 let fetched_group: Option<StoredGroup> = conn.fetch(&test_group.id).unwrap();
1594 assert_eq!(fetched_group, Some(test_group));
1595 let conversation_type = fetched_group.unwrap().conversation_type;
1596 assert_eq!(conversation_type, ConversationType::Group);
1597 })
1598 }
1599
    /// Checks how `find_groups` filters by consent state.
    ///
    /// Fixture: two groups and two DMs. Group 1 and DM 3 have `Allowed` consent, group 2
    /// has `Denied` consent, and DM 4 has no consent record at all.
    #[xmtp_common::test]
    fn test_find_groups_by_consent_state() {
        with_connection(|conn| {
            let test_group_1 = generate_group(Some(GroupMembershipState::Allowed));
            test_group_1.store(conn).unwrap();
            let test_group_2 = generate_group(Some(GroupMembershipState::Allowed));
            test_group_2.store(conn).unwrap();
            let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed));
            test_group_3.store(conn).unwrap();
            let test_group_4 = generate_dm(Some(GroupMembershipState::Allowed));
            test_group_4.store(conn).unwrap();

            // Consent records are keyed by the hex-encoded group id.
            let test_group_1_consent = generate_consent_record(
                ConsentType::ConversationId,
                ConsentState::Allowed,
                hex::encode(test_group_1.id.clone()),
            );
            test_group_1_consent.store(conn).unwrap();
            let test_group_2_consent = generate_consent_record(
                ConsentType::ConversationId,
                ConsentState::Denied,
                hex::encode(test_group_2.id.clone()),
            );
            test_group_2_consent.store(conn).unwrap();
            let test_group_3_consent = generate_consent_record(
                ConsentType::ConversationId,
                ConsentState::Allowed,
                hex::encode(test_group_3.id.clone()),
            );
            test_group_3_consent.store(conn).unwrap();
            // test_group_4 deliberately gets no consent record.

            // Asking for every consent state returns all four conversations.
            let all_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![
                        ConsentState::Allowed,
                        ConsentState::Unknown,
                        ConsentState::Denied,
                    ]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(all_results.len(), 4);

            // The default query leaves out the denied group.
            let default_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
            assert_eq!(default_results.len(), 3);

            // Only group 1 and DM 3 are explicitly allowed.
            let allowed_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![ConsentState::Allowed]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(allowed_results.len(), 2);

            // Allowed plus Unknown adds DM 4, which has no consent record.
            let allowed_unknown_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![ConsentState::Allowed, ConsentState::Unknown]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(allowed_unknown_results.len(), 3);

            // Only group 2 is denied.
            let denied_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![ConsentState::Denied]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(denied_results.len(), 1);
            assert_eq!(denied_results[0].id, test_group_2.id);

            // A conversation without a consent record is matched by `Unknown`.
            let unknown_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![ConsentState::Unknown]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(unknown_results.len(), 1);
            assert_eq!(unknown_results[0].id, test_group_4.id);

            // An empty list of consent states returns the same count as the default query.
            let empty_array_results = conn
                .find_groups(GroupQueryArgs {
                    consent_states: Some(vec![]),
                    ..Default::default()
                })
                .unwrap();
            assert_eq!(empty_array_results.len(), 3);
        })
    }
1689
1690 #[xmtp_common::test]
1691 fn test_get_sequence_ids() {
1692 with_connection(|conn| {
1693 let mls_groups = [
1694 generate_group_with_welcome(None, Some(30)),
1695 generate_group(None),
1696 generate_group(None),
1697 generate_group_with_welcome(None, Some(10)),
1698 ];
1699 for g in mls_groups.iter() {
1700 g.store(conn).unwrap();
1701 }
1702 assert_eq!(
1703 vec![30, 10],
1704 conn.group_cursors()
1705 .unwrap()
1706 .into_iter()
1707 .map(|c| c.sequence_id)
1708 .collect::<Vec<u64>>()
1709 );
1710 })
1711 }
1712
1713 #[xmtp_common::test]
1714 fn test_find_group_default_excludes_denied() {
1715 with_connection(|conn| {
1716 let allowed_group = generate_group(Some(GroupMembershipState::Allowed));
1718 allowed_group.store(conn).unwrap();
1719
1720 let denied_group = generate_group(Some(GroupMembershipState::Allowed));
1721 denied_group.store(conn).unwrap();
1722
1723 let unknown_group = generate_group(Some(GroupMembershipState::Allowed));
1724 unknown_group.store(conn).unwrap();
1725
1726 let allowed_consent = generate_consent_record(
1728 ConsentType::ConversationId,
1729 ConsentState::Allowed,
1730 hex::encode(allowed_group.id.clone()),
1731 );
1732 allowed_consent.store(conn).unwrap();
1733
1734 let denied_consent = generate_consent_record(
1735 ConsentType::ConversationId,
1736 ConsentState::Denied,
1737 hex::encode(denied_group.id.clone()),
1738 );
1739 denied_consent.store(conn).unwrap();
1740
1741 let default_results = conn.find_groups(GroupQueryArgs::default()).unwrap();
1743
1744 assert_eq!(default_results.len(), 2);
1746 let returned_ids: Vec<_> = default_results.iter().map(|g| &g.id).collect();
1747 assert!(returned_ids.contains(&&allowed_group.id));
1748 assert!(returned_ids.contains(&&unknown_group.id));
1749 assert!(!returned_ids.contains(&&denied_group.id));
1750 })
1751 }
1752
    /// Checks which groups `get_conversation_ids_for_remote_log_publish` selects.
    ///
    /// Fixture:
    /// - group1: publish flag set, no public key, `Allowed` consent
    /// - group2: publish flag set, public key present, no consent record
    /// - group3: publish flag set, public key present, `Allowed` consent
    /// - group4: publish flag cleared, no consent record
    ///
    /// Only group1 and group3 are expected back.
    #[xmtp_common::test(unwrap_try = true)]
    fn test_get_conversation_ids_for_remote_log_publish() {
        with_connection(|conn| {
            let mut group1 = generate_group(None);
            let mut group2 = generate_group(None);
            let mut group3 = generate_group(None);
            let mut group4 = generate_group(None);
            // group1: published, no key yet, consent allowed.
            group1.should_publish_commit_log = true;
            group1.commit_log_public_key = None;
            generate_consent_record(
                ConsentType::ConversationId,
                ConsentState::Allowed,
                hex::encode(group1.id.clone()),
            )
            .store(conn)?;
            // group2: published with a key, but no consent record is stored for it.
            group2.should_publish_commit_log = true;
            group2.commit_log_public_key = Some(rand_vec::<32>());

            // group3: published with a key, consent allowed.
            group3.should_publish_commit_log = true;
            group3.commit_log_public_key = Some(rand_vec::<32>());
            generate_consent_record(
                ConsentType::ConversationId,
                ConsentState::Allowed,
                hex::encode(group3.id.clone()),
            )
            .store(conn)?;
            // group4: not flagged for publishing.
            group4.should_publish_commit_log = false;
            group1.store(conn)?;
            group2.store(conn)?;
            group3.store(conn)?;
            group4.store(conn)?;

            let commit_log_keys = conn.get_conversation_ids_for_remote_log_publish().unwrap();
            assert_eq!(commit_log_keys.len(), 2);
            // NOTE(review): the positional checks rely on the query returning group1 before
            // group3; the ordering clause is not visible here — confirm.
            assert_eq!(commit_log_keys[0].id, group1.id);
            assert_eq!(commit_log_keys[1].id, group3.id);
            // The stored public key is returned as-is, including its absence.
            assert_eq!(commit_log_keys[0].commit_log_public_key, None);
            assert_eq!(
                commit_log_keys[1].commit_log_public_key,
                group3.commit_log_public_key
            );
        })
    }
1796
1797 #[xmtp_common::test]
1798 fn test_get_conversation_ids_for_remote_log_publish_with_consent() {
1799 with_connection(|conn| {
1800 let mut allowed_group = generate_group(None);
1802 allowed_group.should_publish_commit_log = true;
1803 allowed_group.store(conn).unwrap();
1804
1805 let mut denied_group = generate_group(None);
1806 denied_group.should_publish_commit_log = true;
1807 denied_group.store(conn).unwrap();
1808
1809 let mut no_consent_group = generate_group(None);
1810 no_consent_group.should_publish_commit_log = true;
1811 no_consent_group.store(conn).unwrap();
1812
1813 let allowed_consent = generate_consent_record(
1815 ConsentType::ConversationId,
1816 ConsentState::Allowed,
1817 hex::encode(allowed_group.id.clone()),
1818 );
1819 allowed_consent.store(conn).unwrap();
1820
1821 let denied_consent = generate_consent_record(
1822 ConsentType::ConversationId,
1823 ConsentState::Denied,
1824 hex::encode(denied_group.id.clone()),
1825 );
1826 denied_consent.store(conn).unwrap();
1827
1828 let commit_log_keys = conn.get_conversation_ids_for_remote_log_publish().unwrap();
1830 assert_eq!(commit_log_keys.len(), 1);
1831 assert_eq!(commit_log_keys[0].id, allowed_group.id);
1832 })
1833 }
1834
1835 #[xmtp_common::test]
1836 fn test_get_conversation_ids_for_remote_log_download_with_consent() {
1837 with_connection(|conn| {
1838 let allowed_group = generate_group(None);
1840 allowed_group.store(conn).unwrap();
1841
1842 let denied_group = generate_group(None);
1843 denied_group.store(conn).unwrap();
1844
1845 let no_consent_group = generate_group(None);
1846 no_consent_group.store(conn).unwrap();
1847
1848 let mut sync_group = generate_group(None);
1850 sync_group.conversation_type = ConversationType::Sync;
1851 sync_group.store(conn).unwrap();
1852 let sync_consent = generate_consent_record(
1853 ConsentType::ConversationId,
1854 ConsentState::Allowed,
1855 hex::encode(sync_group.id.clone()),
1856 );
1857 sync_consent.store(conn).unwrap();
1858
1859 let allowed_consent = generate_consent_record(
1861 ConsentType::ConversationId,
1862 ConsentState::Allowed,
1863 hex::encode(allowed_group.id.clone()),
1864 );
1865 allowed_consent.store(conn).unwrap();
1866
1867 let denied_consent = generate_consent_record(
1868 ConsentType::ConversationId,
1869 ConsentState::Denied,
1870 hex::encode(denied_group.id.clone()),
1871 );
1872 denied_consent.store(conn).unwrap();
1873
1874 let conversation_ids = conn.get_conversation_ids_for_remote_log_download().unwrap();
1876 assert_eq!(conversation_ids.len(), 1);
1877 assert_eq!(conversation_ids[0].id, allowed_group.id);
1878 })
1879 }
1880
    /// Checks which groups `get_conversation_ids_for_responding_readds` reports.
    ///
    /// Three groups are stored together with five `readd_status` rows. Groups 1 and 3 each
    /// have a request that is not yet answered; group 2 has none.
    #[xmtp_common::test]
    fn test_get_conversation_ids_for_responding_readds() {
        with_connection(|conn| {
            // Fixed ids keep the sorted comparison below deterministic.
            let group_id_1 = vec![1, 2, 3];
            let group_id_2 = vec![4, 5, 6];
            let group_id_3 = vec![7, 8, 9];

            let group1 = StoredGroup::builder()
                .id(group_id_1.clone())
                .created_at_ns(1000)
                .membership_state(GroupMembershipState::Allowed)
                .added_by_inbox_id("placeholder_address")
                .build()
                .unwrap();
            group1.store(conn).unwrap();

            let group2 = StoredGroup::builder()
                .id(group_id_2.clone())
                .created_at_ns(2000)
                .membership_state(GroupMembershipState::Allowed)
                .added_by_inbox_id("placeholder_address")
                .build()
                .unwrap();
            group2.store(conn).unwrap();

            let group3 = StoredGroup::builder()
                .id(group_id_3.clone())
                .created_at_ns(3000)
                .membership_state(GroupMembershipState::Allowed)
                .added_by_inbox_id("placeholder_address")
                .build()
                .unwrap();
            group3.store(conn).unwrap();

            let test_cases = vec![
                // Group 1: requested (10) >= responded (5), so it matches.
                ReaddStatus {
                    group_id: group_id_1.clone(),
                    installation_id: vec![1],
                    requested_at_sequence_id: Some(10),
                    responded_at_sequence_id: Some(5),
                },
                // Group 1 again: requested with no response, so it matches too.
                // Both rows must collapse into a single result for the group.
                ReaddStatus {
                    group_id: group_id_1.clone(),
                    installation_id: vec![2],
                    requested_at_sequence_id: Some(8),
                    responded_at_sequence_id: None,
                },
                // Group 2: requested (12) < responded (15), so it does not match.
                ReaddStatus {
                    group_id: group_id_2.clone(),
                    installation_id: vec![4],
                    requested_at_sequence_id: Some(12),
                    responded_at_sequence_id: Some(15),
                },
                // Group 2: never requested, so it does not match.
                ReaddStatus {
                    group_id: group_id_2.clone(),
                    installation_id: vec![5],
                    requested_at_sequence_id: None,
                    responded_at_sequence_id: Some(20),
                },
                // Group 3: requested (25) == responded (25), which matches because the
                // comparison is `>=`.
                ReaddStatus {
                    group_id: group_id_3.clone(),
                    installation_id: vec![6],
                    requested_at_sequence_id: Some(25),
                    responded_at_sequence_id: Some(25),
                },
            ];

            for status in test_cases {
                status.store(conn).unwrap();
            }

            let result = conn.get_conversation_ids_for_responding_readds().unwrap();

            // Groups 1 and 3 only, with group 1 reported once despite two matching rows.
            assert_eq!(result.len(), 2);

            // Sort the ids so the check does not depend on the query's row order.
            let mut result_group_ids: Vec<Vec<u8>> =
                result.iter().map(|r| r.group_id.clone()).collect();
            result_group_ids.sort();

            assert_eq!(result_group_ids[0], group_id_1);
            assert_eq!(result_group_ids[1], group_id_3);

            // The remaining columns come from the joined `groups` row.
            let group1_result = result.iter().find(|r| r.group_id == group_id_1).unwrap();
            assert_eq!(group1_result.dm_id, None);
            assert_eq!(group1_result.conversation_type, ConversationType::Group);
            assert_eq!(group1_result.created_at_ns, 1000);

            let group3_result = result.iter().find(|r| r.group_id == group_id_3).unwrap();
            assert_eq!(group3_result.dm_id, None);
            assert_eq!(group3_result.conversation_type, ConversationType::Group);
            assert_eq!(group3_result.created_at_ns, 3000);
        })
    }
1987}