1use super::ConnectionExt;
2use super::schema::conversation_list::dsl::conversation_list;
3use crate::consent_record::ConsentState;
4use crate::group::{ConversationType, GroupMembershipState, GroupQueryArgs, GroupQueryOrderBy};
5use crate::group_message::{ContentType, DeliveryStatus, GroupMessageKind};
6use crate::{DbConnection, StorageError};
7use diesel::dsl::sql;
8use diesel::{
9 BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Queryable, RunQueryDsl, Table,
10};
11use serde::{Deserialize, Serialize};
12
/// A row of `conversation_list`: the conversation's own columns followed by the columns of an
/// associated message.
///
/// The tests in this file expect the message columns to describe the conversation's most recent
/// message. All message columns are optional; the queries in this file fall back to
/// `created_at_ns` when `sent_at_ns` is NULL, which suggests they are NULL for a conversation
/// without any message — TODO confirm against the `conversation_list` definition.
///
/// NOTE: diesel's `Queryable` derive maps result columns to fields by position, so the field
/// order here has to stay in sync with the column order of `conversation_list`.
#[derive(Queryable, Debug, Clone, Deserialize, Serialize)]
#[diesel(table_name = conversation_list)]
#[diesel(primary_key(id))]
pub struct ConversationListItem {
    /// Conversation id bytes. Consent records are joined on `lower(hex(id))`.
    pub id: Vec<u8>,
    /// Creation timestamp (nanoseconds, judging by the `_ns` suffix). Also serves as the
    /// fallback "last activity" value when `sent_at_ns` is NULL.
    pub created_at_ns: i64,
    /// Membership state; matched against `GroupQueryArgs::allowed_states` when that is set.
    pub membership_state: GroupMembershipState,
    /// Presumably when the conversation's installations were last checked — verify against
    /// the schema.
    pub installations_last_checked: i64,
    /// Presumably the inbox id that added this client to the conversation — verify.
    pub added_by_inbox_id: String,
    /// Presumably the sequence id of the welcome that introduced the conversation — verify.
    pub welcome_sequence_id: Option<i64>,
    /// DM grouping key: rows sharing the same `COALESCE(dm_id, id)` are treated as duplicates
    /// of one another unless `include_duplicate_dms` is requested.
    pub dm_id: Option<String>,
    /// Presumably the time of the last key rotation, in nanoseconds — verify.
    pub rotated_at_ns: i64,
    /// Conversation kind. The main list query filters out `ConversationType::virtual_types()`.
    pub conversation_type: ConversationType,
    /// Presumably whether the commit log was detected as forked — verify.
    pub is_commit_log_forked: Option<bool>,

    // ---- Columns of the associated message (see the struct-level note) ----
    /// Id of the associated message, if any.
    pub message_id: Option<Vec<u8>>,
    /// Message payload; decrypted, going by the column name.
    pub decrypted_message_bytes: Option<Vec<u8>>,
    /// Send time of the associated message. `COALESCE(sent_at_ns, created_at_ns)` is what the
    /// list query treats as the conversation's last-activity time.
    pub sent_at_ns: Option<i64>,
    /// Kind of the associated message.
    pub kind: Option<GroupMessageKind>,
    /// Presumably the installation that sent the message — verify.
    pub sender_installation_id: Option<Vec<u8>>,
    /// Presumably the inbox that sent the message — verify.
    pub sender_inbox_id: Option<String>,
    /// Delivery status of the associated message.
    pub delivery_status: Option<DeliveryStatus>,
    /// Content type of the associated message.
    pub content_type: Option<ContentType>,
    /// Presumably the major version of the message's content type — verify.
    pub version_major: Option<i32>,
    /// Presumably the minor version of the message's content type — verify.
    pub version_minor: Option<i32>,
    /// Presumably the authority of the message's content type — verify.
    pub authority_id: Option<String>,
    /// Presumably the message's sequence id on its originator — verify.
    pub sequence_id: Option<i64>,
    /// Presumably the id of the node that originated the message — verify.
    pub originator_id: Option<i64>,
}
65
66pub trait QueryConversationList {
67 fn fetch_conversation_list<A: AsRef<GroupQueryArgs>>(
68 &self,
69 args: A,
70 ) -> Result<Vec<ConversationListItem>, StorageError>;
71}
72
73impl<T> QueryConversationList for &T
74where
75 T: QueryConversationList,
76{
77 fn fetch_conversation_list<A: AsRef<GroupQueryArgs>>(
78 &self,
79 args: A,
80 ) -> Result<Vec<ConversationListItem>, StorageError> {
81 (**self).fetch_conversation_list(args)
82 }
83}
84
85impl<C: ConnectionExt> QueryConversationList for DbConnection<C> {
86 fn fetch_conversation_list<A: AsRef<GroupQueryArgs>>(
87 &self,
88 args: A,
89 ) -> Result<Vec<ConversationListItem>, StorageError> {
90 use crate::schema::consent_records::dsl as consent_dsl;
91 use crate::schema::conversation_list::dsl as conversation_list_dsl;
92
93 args.as_ref().validate()?;
94
95 let GroupQueryArgs {
96 allowed_states,
97 created_after_ns,
98 created_before_ns,
99 limit,
100 conversation_type,
101 consent_states,
102 include_sync_groups,
103 include_duplicate_dms,
104 last_activity_after_ns,
105 last_activity_before_ns,
106 order_by,
107 ..
108 } = args.as_ref();
109
110 let order_expression = match order_by.clone().unwrap_or_default() {
111 GroupQueryOrderBy::CreatedAt => {
112 diesel::dsl::sql::<diesel::sql_types::BigInt>("created_at_ns DESC")
113 }
114 GroupQueryOrderBy::LastActivity => diesel::dsl::sql::<diesel::sql_types::BigInt>(
115 "COALESCE(sent_at_ns, created_at_ns) DESC",
116 ),
117 };
118
119 let mut query = conversation_list
120 .select(conversation_list::all_columns())
121 .filter(
122 conversation_list_dsl::conversation_type.ne_all(ConversationType::virtual_types()),
123 )
124 .order(order_expression)
125 .into_boxed();
126
127 if !include_duplicate_dms {
128 query = query.filter(sql::<diesel::sql_types::Bool>(
131 "NOT EXISTS (
132 SELECT 1 FROM groups g2
133 WHERE COALESCE(g2.dm_id, g2.id) = COALESCE(conversation_list.dm_id, conversation_list.id)
134 AND (COALESCE(g2.last_message_ns, 0) > COALESCE((
135 SELECT g1.last_message_ns FROM groups g1 WHERE g1.id = conversation_list.id
136 ), 0)
137 OR (COALESCE(g2.last_message_ns, 0) = COALESCE((
138 SELECT g1.last_message_ns FROM groups g1 WHERE g1.id = conversation_list.id
139 ), 0) AND g2.id > conversation_list.id))
140 )",
141 ));
142 }
143
144 if let Some(limit) = limit {
145 query = query.limit(*limit);
146 }
147
148 if let Some(allowed_states) = allowed_states {
149 query = query.filter(conversation_list_dsl::membership_state.eq_any(allowed_states));
150 }
151
152 if let Some(last_activity_after_ns) = last_activity_after_ns {
154 query = query.filter(
157 diesel::dsl::sql::<diesel::sql_types::BigInt>(
158 "COALESCE(sent_at_ns, created_at_ns)",
159 )
160 .gt(last_activity_after_ns),
161 );
162 }
163
164 if let Some(created_after_ns) = created_after_ns {
165 query = query.filter(conversation_list_dsl::created_at_ns.gt(created_after_ns));
166 }
167
168 if let Some(last_activity_before_ns) = last_activity_before_ns {
169 query = query.filter(
170 diesel::dsl::sql::<diesel::sql_types::BigInt>(
171 "COALESCE(sent_at_ns, created_at_ns)",
172 )
173 .lt(last_activity_before_ns),
174 );
175 }
176
177 if let Some(created_before_ns) = created_before_ns {
178 query = query.filter(conversation_list_dsl::created_at_ns.lt(created_before_ns));
179 }
180
181 if let Some(conversation_type) = conversation_type {
182 query = query.filter(conversation_list_dsl::conversation_type.eq(conversation_type));
183 }
184
185 let effective_consent_states = match &consent_states {
186 Some(states) if !states.is_empty() => states.clone(),
187 _ => vec![ConsentState::Allowed, ConsentState::Unknown],
188 };
189
190 let includes_unknown = effective_consent_states.contains(&ConsentState::Unknown);
191 let includes_all = effective_consent_states.len() == 3;
192
193 let filtered_states: Vec<_> = effective_consent_states
194 .iter()
195 .filter(|state| **state != ConsentState::Unknown)
196 .cloned()
197 .collect();
198
199 let mut conversations = if includes_all {
200 self.raw_query_read(|conn| query.load::<ConversationListItem>(conn))?
202 } else if includes_unknown {
203 let left_joined_query = query
205 .left_join(
206 consent_dsl::consent_records.on(sql::<diesel::sql_types::Text>(
207 "lower(hex(conversation_list.id))",
208 )
209 .eq(consent_dsl::entity)),
210 )
211 .filter(
212 consent_dsl::state
213 .is_null()
214 .or(consent_dsl::state.eq(ConsentState::Unknown))
215 .or(consent_dsl::state.eq_any(filtered_states.clone())),
216 )
217 .select(conversation_list::all_columns());
218
219 self.raw_query_read(|conn| left_joined_query.load::<ConversationListItem>(conn))?
220 } else {
221 let inner_joined_query = query
223 .inner_join(
224 consent_dsl::consent_records.on(sql::<diesel::sql_types::Text>(
225 "lower(hex(conversation_list.id))",
226 )
227 .eq(consent_dsl::entity)),
228 )
229 .filter(consent_dsl::state.eq_any(filtered_states.clone()))
230 .select(conversation_list::all_columns());
231
232 self.raw_query_read(|conn| inner_joined_query.load::<ConversationListItem>(conn))?
233 };
234
235 if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups {
238 let query = conversation_list_dsl::conversation_list
239 .filter(conversation_list_dsl::conversation_type.eq(ConversationType::Sync));
240 let mut sync_groups = self.raw_query_read(|conn| query.load(conn))?;
241 conversations.append(&mut sync_groups);
242 }
243
244 Ok(conversations)
245 }
246}
247
#[cfg(test)]
pub(crate) mod tests {
    use crate::Store;
    use crate::consent_record::{ConsentState, ConsentType};
    use crate::group::tests::{
        generate_consent_record, generate_dm, generate_group, generate_group_with_created_at,
    };
    use crate::group::{GroupMembershipState, GroupQueryArgs, GroupQueryOrderBy};
    use crate::group_message::ContentType;
    use crate::group_message::tests::generate_message;
    use crate::prelude::*;
    use crate::test_utils::with_connection;

    // A group holding several messages is listed once, paired with its newest message.
    #[xmtp_common::test]
    fn test_single_group_multiple_messages() {
        with_connection(|conn| {
            let group = generate_group(None);
            group.store(conn).unwrap();

            // Four text messages, sent at 1000, 2000, 3000 and 4000.
            for step in 1..5 {
                let message = generate_message(
                    None,
                    Some(&group.id),
                    Some(step * 1000),
                    Some(ContentType::Text),
                    None,
                    None,
                );
                message.store(conn).unwrap();
            }

            let listed = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();
            assert_eq!(listed.len(), 1, "Should return one group");

            let only_row = &listed[0];
            assert_eq!(
                only_row.id, group.id,
                "Returned group ID should match the created group"
            );
            assert_eq!(
                only_row.sent_at_ns.unwrap(),
                4000,
                "Last message should be the most recent one"
            );
        })
    }

    // Three groups created at different times, one of which has a message, come back in the
    // expected order.
    #[xmtp_common::test]
    fn test_three_groups_specific_ordering() {
        with_connection(|conn| {
            let group_a = generate_group_with_created_at(None, 5000);
            let group_b = generate_group_with_created_at(None, 2000);
            let group_c = generate_group_with_created_at(None, 1000);
            for group in [&group_a, &group_b, &group_c] {
                group.store(conn).unwrap();
            }

            // Only group B receives a message, sent at 3000.
            let message = generate_message(None, Some(&group_b.id), Some(3000), None, None, None);
            message.store(conn).unwrap();

            let listed = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();

            assert_eq!(listed.len(), 3, "Should return all three groups");
            assert_eq!(
                listed[0].id, group_a.id,
                "Group created after the last message should come first"
            );
            assert_eq!(
                listed[1].id, group_b.id,
                "Group with the last message should come second"
            );
            assert_eq!(
                listed[2].id, group_c.id,
                "Group created before the last message with no messages should come last"
            );
        })
    }

    // Storing a newer message changes which message the list reports for the group.
    #[xmtp_common::test]
    fn test_group_with_newer_message_update() {
        with_connection(|conn| {
            let group = generate_group(None);
            group.store(conn).unwrap();

            let first_message = generate_message(
                None,
                Some(&group.id),
                Some(1000),
                Some(ContentType::Text),
                None,
                None,
            );
            first_message.store(conn).unwrap();

            let before_update = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();
            assert_eq!(before_update.len(), 1, "Should return one group");
            assert_eq!(
                before_update[0].sent_at_ns.unwrap(),
                1000,
                "Last message should match the first message"
            );

            let second_message = generate_message(
                None,
                Some(&group.id),
                Some(2000),
                Some(ContentType::Text),
                None,
                None,
            );
            second_message.store(conn).unwrap();

            let after_update = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();
            assert_eq!(
                after_update[0].sent_at_ns.unwrap(),
                2000,
                "Last message should now match the second (newest) message"
            );
        })
    }

    // Exercises every combination of consent filters against four conversations:
    // allowed group, denied group, allowed DM, and a DM without a consent record.
    #[xmtp_common::test]
    fn test_find_conversations_by_consent_state() {
        with_connection(|conn| {
            let allowed_group = generate_group(Some(GroupMembershipState::Allowed));
            allowed_group.store(conn).unwrap();
            let denied_group = generate_group(Some(GroupMembershipState::Allowed));
            denied_group.store(conn).unwrap();
            let allowed_dm = generate_dm(Some(GroupMembershipState::Allowed));
            allowed_dm.store(conn).unwrap();
            let unrecorded_dm = generate_dm(Some(GroupMembershipState::Allowed));
            unrecorded_dm.store(conn).unwrap();

            // The fourth conversation deliberately gets no consent record.
            for (conversation_id, state) in [
                (&allowed_group.id, ConsentState::Allowed),
                (&denied_group.id, ConsentState::Denied),
                (&allowed_dm.id, ConsentState::Allowed),
            ] {
                let consent = generate_consent_record(
                    ConsentType::ConversationId,
                    state,
                    hex::encode(conversation_id),
                );
                consent.store(conn).unwrap();
            }

            let fetch_with_consent = |states: Vec<ConsentState>| {
                conn.fetch_conversation_list(GroupQueryArgs {
                    consent_states: Some(states),
                    ..Default::default()
                })
                .unwrap()
            };

            let all_results = fetch_with_consent(vec![
                ConsentState::Allowed,
                ConsentState::Unknown,
                ConsentState::Denied,
            ]);
            assert_eq!(all_results.len(), 4);

            let default_results = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();
            assert_eq!(default_results.len(), 3);

            let allowed_results = fetch_with_consent(vec![ConsentState::Allowed]);
            assert_eq!(allowed_results.len(), 2);

            let allowed_unknown_results =
                fetch_with_consent(vec![ConsentState::Allowed, ConsentState::Unknown]);
            assert_eq!(allowed_unknown_results.len(), 3);

            let denied_results = fetch_with_consent(vec![ConsentState::Denied]);
            assert_eq!(denied_results.len(), 1);
            assert_eq!(denied_results[0].id, denied_group.id);

            let unknown_results = fetch_with_consent(vec![ConsentState::Unknown]);
            assert_eq!(unknown_results.len(), 1);
            assert_eq!(unknown_results[0].id, unrecorded_dm.id);

            // An empty filter behaves like the default.
            let empty_array_results = fetch_with_consent(vec![]);
            assert_eq!(empty_array_results.len(), 3);
        })
    }

    // With default arguments, allowed and unrecorded conversations are listed; denied ones
    // are not.
    #[xmtp_common::test]
    fn test_find_conversations_default_excludes_denied() {
        with_connection(|conn| {
            let allowed_group = generate_group(Some(GroupMembershipState::Allowed));
            allowed_group.store(conn).unwrap();

            let denied_group = generate_group(Some(GroupMembershipState::Allowed));
            denied_group.store(conn).unwrap();

            let unknown_group = generate_group(Some(GroupMembershipState::Allowed));
            unknown_group.store(conn).unwrap();

            for (conversation_id, state) in [
                (&allowed_group.id, ConsentState::Allowed),
                (&denied_group.id, ConsentState::Denied),
            ] {
                let consent = generate_consent_record(
                    ConsentType::ConversationId,
                    state,
                    hex::encode(conversation_id),
                );
                consent.store(conn).unwrap();
            }

            let default_results = conn
                .fetch_conversation_list(GroupQueryArgs::default())
                .unwrap();

            assert_eq!(default_results.len(), 2);
            assert!(default_results.iter().any(|row| row.id == allowed_group.id));
            assert!(default_results.iter().any(|row| row.id == unknown_group.id));
            assert!(default_results.iter().all(|row| row.id != denied_group.id));
        })
    }

    // A message with an unknown content type still shows up as the conversation's message.
    #[xmtp_common::test(unwrap_try = true)]
    fn test_unknown_content_type_is_present() {
        with_connection(|conn| {
            let dm = generate_dm(None);
            dm.store(conn)?;

            let unknown_message = generate_message(
                None,
                Some(&dm.id),
                Some(5000),
                Some(ContentType::Unknown),
                None,
                None,
            );
            unknown_message.store(conn)?;

            let listed = conn.fetch_conversation_list(GroupQueryArgs::default())?;

            assert!(listed[0].message_id.is_some());
        })
    }

    // `last_activity_after_ns` keeps conversations whose last activity is after the threshold.
    #[xmtp_common::test]
    fn test_last_activity_after_ns_filter() {
        with_connection(|conn| {
            let group1 = generate_group_with_created_at(None, 1000);
            let group2 = generate_group_with_created_at(None, 2000);
            let group3 = generate_group_with_created_at(None, 3000);
            for group in [&group1, &group2, &group3] {
                group.store(conn).unwrap();
            }

            // group1's last activity becomes 5000 and group2's 4000; group3 has no message.
            let message1 = generate_message(
                None,
                Some(&group1.id),
                Some(5000),
                Some(ContentType::Text),
                None,
                None,
            );
            message1.store(conn).unwrap();

            let message2 = generate_message(
                None,
                Some(&group2.id),
                Some(4000),
                Some(ContentType::Text),
                None,
                None,
            );
            message2.store(conn).unwrap();

            let fetch_active_after = |threshold_ns: i64| {
                conn.fetch_conversation_list(GroupQueryArgs {
                    last_activity_after_ns: Some(threshold_ns),
                    ..Default::default()
                })
                .unwrap()
            };

            let results = fetch_active_after(3500);
            assert_eq!(
                results.len(),
                2,
                "Should return groups with activity after 3500"
            );
            assert!(
                results.iter().any(|row| row.id == group1.id),
                "Should include group1 (message at 5000)"
            );
            assert!(
                results.iter().any(|row| row.id == group2.id),
                "Should include group2 (message at 4000)"
            );
            assert!(
                results.iter().all(|row| row.id != group3.id),
                "Should not include group3 (created at 3000)"
            );

            let results = fetch_active_after(4500);
            assert_eq!(results.len(), 1, "Should return only group1");
            assert_eq!(results[0].id, group1.id, "Should be group1");

            let results = fetch_active_after(2500);
            assert_eq!(results.len(), 3, "Should return all groups");
        })
    }

    // `last_activity_before_ns` keeps conversations whose last activity is before the
    // threshold.
    #[xmtp_common::test]
    fn test_last_activity_before_ns_filter() {
        with_connection(|conn| {
            let group1 = generate_group_with_created_at(None, 1000);
            let group2 = generate_group_with_created_at(None, 2000);
            let group3 = generate_group_with_created_at(None, 3000);
            for group in [&group1, &group2, &group3] {
                group.store(conn).unwrap();
            }

            // group1's last activity becomes 5000 and group2's 4000; group3 has no message.
            let message1 = generate_message(
                None,
                Some(&group1.id),
                Some(5000),
                Some(ContentType::Text),
                None,
                None,
            );
            message1.store(conn).unwrap();

            let message2 = generate_message(
                None,
                Some(&group2.id),
                Some(4000),
                Some(ContentType::Text),
                None,
                None,
            );
            message2.store(conn).unwrap();

            let fetch_active_before = |threshold_ns: i64| {
                conn.fetch_conversation_list(GroupQueryArgs {
                    last_activity_before_ns: Some(threshold_ns),
                    ..Default::default()
                })
                .unwrap()
            };

            let results = fetch_active_before(4500);
            assert_eq!(
                results.len(),
                2,
                "Should return groups with activity before 4500"
            );
            assert!(
                results.iter().all(|row| row.id != group1.id),
                "Should not include group1 (message at 5000)"
            );
            assert!(
                results.iter().any(|row| row.id == group2.id),
                "Should include group2 (message at 4000)"
            );
            assert!(
                results.iter().any(|row| row.id == group3.id),
                "Should include group3 (created at 3000)"
            );

            let results = fetch_active_before(3500);
            assert_eq!(results.len(), 1, "Should return only group3");
            assert_eq!(results[0].id, group3.id, "Should be group3");

            let results = fetch_active_before(5500);
            assert_eq!(results.len(), 3, "Should return all groups");
        })
    }

    // The activity filter, `limit` and last-activity ordering work together.
    #[xmtp_common::test]
    fn test_activity_filters_combined_with_limit() {
        with_connection(|conn| {
            // Five groups whose messages were sent at 100_000, 99_000, ..., 96_000.
            for index in 0..5 {
                let group = generate_group_with_created_at(None, (index + 1) * 1000);
                group.store(conn).unwrap();

                let message = generate_message(
                    None,
                    Some(&group.id),
                    Some((100 - index) * 1000),
                    Some(ContentType::Text),
                    None,
                    None,
                );
                message.store(conn).unwrap();
            }

            let args = GroupQueryArgs {
                last_activity_after_ns: Some(96_000),
                limit: Some(2),
                order_by: Some(GroupQueryOrderBy::LastActivity),
                ..Default::default()
            };
            let results = conn.fetch_conversation_list(args).unwrap();
            assert_eq!(results.len(), 2, "Should return 2 groups due to limit");

            assert_eq!(
                results[0].sent_at_ns.unwrap(),
                100_000,
                "First should be most recent"
            );
            assert_eq!(
                results[1].sent_at_ns.unwrap(),
                99_000,
                "Second should be second most recent"
            );
        })
    }
}