1use super::ConnectionExt;
2use super::group::ConversationType;
3use super::schema::groups;
4use super::{
5 Sqlite,
6 db_connection::DbConnection,
7 schema::{
8 group_messages::{self, dsl},
9 groups::dsl as groups_dsl,
10 },
11};
12use crate::impl_fetch;
13use derive_builder::Builder;
14use diesel::{
15 backend::Backend,
16 deserialize::{self, FromSql, FromSqlRow},
17 dsl::sql as diesel_sql,
18 expression::AsExpression,
19 prelude::*,
20 serialize::{self, IsNull, Output, ToSql},
21 sql_types::Integer,
22};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use xmtp_common::{NS_IN_DAY, time::now_ns};
26use xmtp_content_types::{
27 actions, attachment, delete_message, group_updated, intent, leave_request, markdown,
28 membership_change, multi_remote_attachment, reaction, read_receipt, remote_attachment, reply,
29 text, transaction_reference, wallet_send_calls,
30};
31use xmtp_proto::types::Cursor;
32
33mod convert;
34#[cfg(test)]
35pub mod messages_newer_than_tests;
36#[cfg(test)]
37pub mod tests;
38
/// A row of the `group_messages` table as read back from SQLite.
///
/// `Queryable` maps result columns to fields by position, so the field order
/// here has to stay in step with the column order of `schema::group_messages`.
#[derive(
    Debug, Clone, Serialize, Deserialize, Queryable, Selectable, Identifiable, Eq, PartialEq,
)]
#[diesel(table_name = group_messages)]
#[diesel(primary_key(id))]
#[diesel(check_for_backend(Sqlite))]
pub struct StoredGroupMessage {
    /// Primary key of the message.
    pub id: Vec<u8>,
    /// Id of the group (conversation) the message belongs to.
    pub group_id: Vec<u8>,
    /// Message payload; per the name, the bytes after decryption.
    pub decrypted_message_bytes: Vec<u8>,
    /// Send time of the message (nanoseconds, per the `_ns` suffix).
    pub sent_at_ns: i64,
    /// Whether this is an application message or a membership change.
    pub kind: GroupMessageKind,
    /// Id of the installation that sent the message.
    pub sender_installation_id: Vec<u8>,
    /// Inbox id of the sender.
    pub sender_inbox_id: String,
    /// Publication state of the message (unpublished / published / failed).
    pub delivery_status: DeliveryStatus,
    /// Content type of the payload, stored as an integer code.
    pub content_type: ContentType,
    /// Major version of the content type.
    pub version_major: i32,
    /// Minor version of the content type.
    pub version_minor: i32,
    /// Authority id of the content type.
    pub authority_id: String,
    /// Id of another message this one refers to (e.g. the target of a reaction), if any.
    pub reference_id: Option<Vec<u8>>,
    /// Originator component of the message cursor (see [`StoredGroupMessage::cursor`]).
    pub originator_id: i64,
    /// Sequence component of the message cursor (see [`StoredGroupMessage::cursor`]).
    pub sequence_id: i64,
    /// Insertion time of the row. `NewStoredGroupMessage` has no such field, so it is
    /// never supplied by inserts from this module — presumably a database default fills it.
    pub inserted_at_ns: i64,
    /// Expiry time; `None` means the message never expires. Queries in this module
    /// hide rows whose expiry is not later than the current time.
    pub expire_at_ns: Option<i64>,
    /// Push-notification flag (semantics are defined by callers; not used in this module's queries).
    pub should_push: bool,
}
85
86impl StoredGroupMessage {
87 pub fn cursor(&self) -> Cursor {
88 Cursor::new(self.sequence_id as u64, self.originator_id as u32)
89 }
90}
91
/// Insert-side shape of a `group_messages` row.
///
/// Mirrors [`StoredGroupMessage`] field for field except `inserted_at_ns`, which is
/// deliberately absent so the insert statement never writes that column.
#[derive(Debug, Clone, Insertable)]
#[diesel(table_name = group_messages)]
struct NewStoredGroupMessage {
    pub id: Vec<u8>,
    pub group_id: Vec<u8>,
    pub decrypted_message_bytes: Vec<u8>,
    pub sent_at_ns: i64,
    pub kind: GroupMessageKind,
    pub sender_installation_id: Vec<u8>,
    pub sender_inbox_id: String,
    pub delivery_status: DeliveryStatus,
    pub content_type: ContentType,
    pub version_major: i32,
    pub version_minor: i32,
    pub authority_id: String,
    pub reference_id: Option<Vec<u8>>,
    pub originator_id: i64,
    pub sequence_id: i64,
    pub expire_at_ns: Option<i64>,
    pub should_push: bool,
}
115
116impl From<&StoredGroupMessage> for NewStoredGroupMessage {
117 fn from(msg: &StoredGroupMessage) -> Self {
118 Self {
119 id: msg.id.clone(),
120 group_id: msg.group_id.clone(),
121 decrypted_message_bytes: msg.decrypted_message_bytes.clone(),
122 sent_at_ns: msg.sent_at_ns,
123 kind: msg.kind,
124 sender_installation_id: msg.sender_installation_id.clone(),
125 sender_inbox_id: msg.sender_inbox_id.clone(),
126 delivery_status: msg.delivery_status,
127 content_type: msg.content_type,
128 version_major: msg.version_major,
129 version_minor: msg.version_minor,
130 authority_id: msg.authority_id.clone(),
131 reference_id: msg.reference_id.clone(),
132 originator_id: msg.originator_id,
133 sequence_id: msg.sequence_id,
134 expire_at_ns: msg.expire_at_ns,
135 should_push: msg.should_push,
136 }
137 }
138}
139
/// A message paired with the messages that reference it.
pub struct StoredGroupMessageWithReactions {
    /// The primary message.
    pub message: StoredGroupMessage,
    /// Messages whose `reference_id` equals `message.id`.
    pub reactions: Vec<StoredGroupMessage>,
}
145
/// Ordering direction for message queries; defaults to ascending.
#[derive(Clone, Debug, PartialEq, Default)]
pub enum SortDirection {
    /// Smallest sort key first (the default).
    #[default]
    Ascending,
    /// Largest sort key first.
    Descending,
}
152
/// Column used as the primary sort key for message queries; defaults to `SentAt`.
#[derive(Clone, Debug, PartialEq, Default)]
pub enum SortBy {
    /// Order by `sent_at_ns` (the default).
    #[default]
    SentAt,
    /// Order by `inserted_at_ns`.
    InsertedAt,
}
159
/// Kind of a group message, persisted as an SQLite `Integer`.
///
/// The explicit discriminants are the stored values; changing them would
/// re-interpret rows that are already in the database.
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
#[diesel(sql_type = Integer)]
pub enum GroupMessageKind {
    /// Application message, stored as `1`.
    Application = 1,
    /// Membership change message, stored as `2`.
    MembershipChange = 2,
}
167
168impl ToSql<Integer, Sqlite> for GroupMessageKind
169where
170 i32: ToSql<Integer, Sqlite>,
171{
172 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
173 out.set_value(*self as i32);
174 Ok(IsNull::No)
175 }
176}
177
178impl FromSql<Integer, Sqlite> for GroupMessageKind
179where
180 i32: FromSql<Integer, Sqlite>,
181{
182 fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
183 match i32::from_sql(bytes)? {
184 1 => Ok(GroupMessageKind::Application),
185 2 => Ok(GroupMessageKind::MembershipChange),
186 x => Err(format!("Unrecognized variant {}", x).into()),
187 }
188 }
189}
190
/// Classification of values by whether the thing they describe may be deleted.
pub trait Deletable {
    /// Returns `true` when the value is considered deletable.
    fn is_deletable(&self) -> bool;
}
196
197impl Deletable for GroupMessageKind {
198 fn is_deletable(&self) -> bool {
199 match self {
200 GroupMessageKind::Application => true,
202 GroupMessageKind::MembershipChange => false,
204 }
205 }
206}
207
/// Content type of a stored message, persisted as an SQLite `Integer`.
///
/// The explicit discriminants are the stored values and must stay in sync with
/// the `FromSql` mapping for this type; `Unknown` (0) is what `From<String>`
/// yields for a type id that matches no known codec.
#[repr(i32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, FromSqlRow, AsExpression)]
#[diesel(sql_type = diesel::sql_types::Integer)]
pub enum ContentType {
    Unknown = 0,
    Text = 1,
    GroupMembershipChange = 2,
    GroupUpdated = 3,
    Reaction = 4,
    ReadReceipt = 5,
    Reply = 6,
    Attachment = 7,
    RemoteAttachment = 8,
    TransactionReference = 9,
    WalletSendCalls = 10,
    LeaveRequest = 11,
    Markdown = 12,
    Actions = 13,
    Intent = 14,
    MultiRemoteAttachment = 15,
    DeleteMessage = 16,
}
231
232impl ContentType {
233 pub fn all() -> Vec<ContentType> {
234 vec![
235 ContentType::Unknown,
236 ContentType::Text,
237 ContentType::GroupMembershipChange,
238 ContentType::GroupUpdated,
239 ContentType::Reaction,
240 ContentType::ReadReceipt,
241 ContentType::Reply,
242 ContentType::Attachment,
243 ContentType::RemoteAttachment,
244 ContentType::TransactionReference,
245 ContentType::WalletSendCalls,
246 ContentType::LeaveRequest,
247 ContentType::Markdown,
248 ContentType::Actions,
249 ContentType::Intent,
250 ContentType::MultiRemoteAttachment,
251 ContentType::DeleteMessage,
252 ]
253 }
254}
255
256impl Deletable for ContentType {
257 fn is_deletable(&self) -> bool {
258 match self {
259 ContentType::GroupMembershipChange
260 | ContentType::GroupUpdated
261 | ContentType::LeaveRequest
262 | ContentType::Reaction
263 | ContentType::ReadReceipt
264 | ContentType::Actions
265 | ContentType::Intent
266 | ContentType::DeleteMessage
267 |ContentType::Unknown => false,
269
270 ContentType::Text
271 | ContentType::Markdown
272 | ContentType::Reply
273 | ContentType::Attachment
274 | ContentType::RemoteAttachment
275 | ContentType::TransactionReference
276 | ContentType::MultiRemoteAttachment
277 | ContentType::WalletSendCalls => true,
278 }
279 }
280}
281
282impl std::fmt::Display for ContentType {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 let as_string = match self {
285 Self::Unknown => "unknown",
286 Self::Text => text::TextCodec::TYPE_ID,
287 Self::Markdown => markdown::MarkdownCodec::TYPE_ID,
288 Self::GroupMembershipChange => membership_change::GroupMembershipChangeCodec::TYPE_ID,
289 Self::GroupUpdated => group_updated::GroupUpdatedCodec::TYPE_ID,
290 Self::Reaction => reaction::ReactionCodec::TYPE_ID,
291 Self::ReadReceipt => read_receipt::ReadReceiptCodec::TYPE_ID,
292 Self::Attachment => attachment::AttachmentCodec::TYPE_ID,
293 Self::RemoteAttachment => remote_attachment::RemoteAttachmentCodec::TYPE_ID,
294 Self::Reply => reply::ReplyCodec::TYPE_ID,
295 Self::TransactionReference => transaction_reference::TransactionReferenceCodec::TYPE_ID,
296 Self::WalletSendCalls => wallet_send_calls::WalletSendCallsCodec::TYPE_ID,
297 Self::LeaveRequest => leave_request::LeaveRequestCodec::TYPE_ID,
298 Self::Actions => actions::ActionsCodec::TYPE_ID,
299 Self::Intent => intent::IntentCodec::TYPE_ID,
300 Self::MultiRemoteAttachment => {
301 multi_remote_attachment::MultiRemoteAttachmentCodec::TYPE_ID
302 }
303 Self::DeleteMessage => delete_message::DeleteMessageCodec::TYPE_ID,
304 };
305
306 write!(f, "{}", as_string)
307 }
308}
309
310impl From<String> for ContentType {
311 fn from(type_id: String) -> Self {
312 match type_id.as_str() {
313 text::TextCodec::TYPE_ID => Self::Text,
314 markdown::MarkdownCodec::TYPE_ID => Self::Markdown,
315 membership_change::GroupMembershipChangeCodec::TYPE_ID => Self::GroupMembershipChange,
316 group_updated::GroupUpdatedCodec::TYPE_ID => Self::GroupUpdated,
317 reaction::ReactionCodec::TYPE_ID => Self::Reaction,
318 read_receipt::ReadReceiptCodec::TYPE_ID => Self::ReadReceipt,
319 reply::ReplyCodec::TYPE_ID => Self::Reply,
320 attachment::AttachmentCodec::TYPE_ID => Self::Attachment,
321 remote_attachment::RemoteAttachmentCodec::TYPE_ID => Self::RemoteAttachment,
322 transaction_reference::TransactionReferenceCodec::TYPE_ID => Self::TransactionReference,
323 wallet_send_calls::WalletSendCallsCodec::TYPE_ID => Self::WalletSendCalls,
324 leave_request::LeaveRequestCodec::TYPE_ID => Self::LeaveRequest,
325 actions::ActionsCodec::TYPE_ID => Self::Actions,
326 intent::IntentCodec::TYPE_ID => Self::Intent,
327 multi_remote_attachment::MultiRemoteAttachmentCodec::TYPE_ID => {
328 Self::MultiRemoteAttachment
329 }
330 delete_message::DeleteMessageCodec::TYPE_ID => Self::DeleteMessage,
331 _ => Self::Unknown,
332 }
333 }
334}
335
336impl ToSql<Integer, Sqlite> for ContentType
337where
338 i32: ToSql<Integer, Sqlite>,
339{
340 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
341 out.set_value(*self as i32);
342 Ok(IsNull::No)
343 }
344}
345
346impl FromSql<Integer, Sqlite> for ContentType
347where
348 i32: FromSql<Integer, Sqlite>,
349{
350 fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
351 match i32::from_sql(bytes)? {
352 0 => Ok(ContentType::Unknown),
353 1 => Ok(ContentType::Text),
354 2 => Ok(ContentType::GroupMembershipChange),
355 3 => Ok(ContentType::GroupUpdated),
356 4 => Ok(ContentType::Reaction),
357 5 => Ok(ContentType::ReadReceipt),
358 6 => Ok(ContentType::Reply),
359 7 => Ok(ContentType::Attachment),
360 8 => Ok(ContentType::RemoteAttachment),
361 9 => Ok(ContentType::TransactionReference),
362 10 => Ok(ContentType::WalletSendCalls),
363 11 => Ok(ContentType::LeaveRequest),
364 12 => Ok(ContentType::Markdown),
365 13 => Ok(ContentType::Actions),
366 14 => Ok(ContentType::Intent),
367 15 => Ok(ContentType::MultiRemoteAttachment),
368 16 => Ok(ContentType::DeleteMessage),
369 x => Err(format!("Unrecognized variant {}", x).into()),
370 }
371 }
372}
373
/// Delivery state of a message, persisted as an SQLite `Integer`.
///
/// The explicit discriminants are the stored values.
#[repr(i32)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, FromSqlRow, AsExpression)]
#[diesel(sql_type = Integer)]
pub enum DeliveryStatus {
    /// Not (yet) published, stored as `1`.
    Unpublished = 1,
    /// Published, stored as `2`.
    Published = 2,
    /// Publishing failed, stored as `3`.
    Failed = 3,
}
382
383impl ToSql<Integer, Sqlite> for DeliveryStatus
384where
385 i32: ToSql<Integer, Sqlite>,
386{
387 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
388 out.set_value(*self as i32);
389 Ok(IsNull::No)
390 }
391}
392
393impl FromSql<Integer, Sqlite> for DeliveryStatus
394where
395 i32: FromSql<Integer, Sqlite>,
396{
397 fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
398 match i32::from_sql(bytes)? {
399 1 => Ok(DeliveryStatus::Unpublished),
400 2 => Ok(DeliveryStatus::Published),
401 3 => Ok(DeliveryStatus::Failed),
402 x => Err(format!("Unrecognized variant {}", x).into()),
403 }
404 }
405}
406
// Expands `crate::impl_fetch` for `StoredGroupMessage` on the `group_messages`
// table with a `Vec<u8>` key — presumably the by-id fetch impl; see the macro
// definition for the exact expansion.
impl_fetch!(StoredGroupMessage, group_messages, Vec<u8>);
408
409impl<C> crate::Store<C> for StoredGroupMessage
411where
412 C: crate::ConnectionExt,
413{
414 type Output = ();
415 fn store(&self, into: &C) -> Result<(), crate::StorageError> {
416 let new_msg = NewStoredGroupMessage::from(self);
417 into.raw_query_write::<_, _>(|conn| {
418 diesel::insert_into(group_messages::table)
419 .values(&new_msg)
420 .execute(conn)
421 .map(|_| ())
422 })
423 .map_err(Into::into)
424 }
425}
426
427impl<C> crate::StoreOrIgnore<C> for StoredGroupMessage
429where
430 C: crate::ConnectionExt,
431{
432 type Output = ();
433
434 fn store_or_ignore(&self, into: &C) -> Result<(), crate::StorageError> {
435 let new_msg = NewStoredGroupMessage::from(self);
436 into.raw_query_write(|conn| {
437 diesel::insert_or_ignore_into(group_messages::table)
438 .values(&new_msg)
439 .execute(conn)
440 .map(|_| ())
441 })
442 .map_err(Into::into)
443 }
444}
445
/// Optional filters, ordering and paging for message queries.
///
/// Every field defaults to "no constraint". Build with [`MsgQueryArgs::builder`]
/// or `MsgQueryArgs::default()`.
#[derive(Default, Clone, Builder, Debug)]
#[builder(setter(into))]
pub struct MsgQueryArgs {
    /// Only messages with `sent_at_ns` strictly greater than this value.
    #[builder(default = None)]
    pub sent_after_ns: Option<i64>,
    /// Upper bound on `sent_at_ns`. Exclusive in `apply_message_filters!`;
    /// note that `group_messages_paged` treats it as inclusive (`<=`).
    #[builder(default = None)]
    pub sent_before_ns: Option<i64>,
    /// Only messages of this kind.
    #[builder(default = None)]
    pub kind: Option<GroupMessageKind>,
    /// Only messages with this delivery status.
    #[builder(default = None)]
    pub delivery_status: Option<DeliveryStatus>,
    /// Maximum number of rows to return (`group_messages_paged` falls back to 100).
    #[builder(default = None)]
    pub limit: Option<i64>,
    /// Sort direction; `None` means `SortDirection::default()` (ascending).
    #[builder(default = None)]
    pub direction: Option<SortDirection>,
    /// Only messages whose content type is in this list.
    #[builder(default = None)]
    pub content_types: Option<Vec<ContentType>>,
    /// Drop messages whose content type is in this list.
    #[builder(default = None)]
    pub exclude_content_types: Option<Vec<ContentType>>,
    /// Drop messages sent by any of these inbox ids.
    #[builder(default = None)]
    pub exclude_sender_inbox_ids: Option<Vec<String>>,
    /// Sort column; `None` means `SortBy::default()` (`SentAt`).
    #[builder(default = None)]
    pub sort_by: Option<SortBy>,
    /// Only messages with `inserted_at_ns` strictly greater than this value.
    #[builder(default = None)]
    pub inserted_after_ns: Option<i64>,
    /// Only messages with `inserted_at_ns` strictly less than this value.
    #[builder(default = None)]
    pub inserted_before_ns: Option<i64>,
    /// When `true`, only messages without an expiry are returned. In the code
    /// visible here this flag is read by `group_messages_paged` only.
    #[builder(default = false)]
    pub exclude_disappearing: bool,
}
476
477impl MsgQueryArgs {
478 pub fn builder() -> MsgQueryArgsBuilder {
479 MsgQueryArgsBuilder::default()
480 }
481}
482
/// Options for querying the messages that reference a set of messages.
#[derive(Default, Clone, Builder)]
pub struct RelationQuery {
    /// Only relations whose content type is in this list.
    #[builder(default = None)]
    pub content_types: Option<Vec<ContentType>>,
    /// Maximum number of relation rows to load. `get_inbound_relations` applies
    /// it to the query as a whole, not per referenced message.
    #[builder(default = None)]
    pub limit: Option<i64>,
    /// Ordering of relations by `sent_at_ns`; ascending unless set otherwise.
    #[builder(default = SortDirection::Ascending)]
    pub direction: SortDirection,
}
492
493impl RelationQuery {
494 pub fn builder() -> RelationQueryBuilder {
495 RelationQueryBuilder::default()
496 }
497}
498
/// Referenced message id -> the messages that point at it via `reference_id`.
pub type InboundRelations = HashMap<Vec<u8>, Vec<StoredGroupMessage>>;
/// Message id -> the message itself, for messages looked up because something references them.
pub type OutboundRelations = HashMap<Vec<u8>, StoredGroupMessage>;
/// Referenced message id -> number of messages that point at it.
pub type RelationCounts = HashMap<Vec<u8>, usize>;
502
503pub struct MessagesWithRelations {
504 pub messages: Vec<StoredGroupMessage>,
505 pub outbound_relations: HashMap<Vec<u8>, StoredGroupMessage>,
507 pub inbound_relations: HashMap<Vec<u8>, Vec<StoredGroupMessage>>,
509}
510
/// Sender inbox id -> greatest `sent_at_ns` among that sender's matching messages.
pub type LatestMessageTimeBySender = HashMap<String, i64>;
512
/// Read and write operations on the `group_messages` table.
///
/// Implemented for `DbConnection<C>` and, by forwarding, for `&T` of any implementor.
pub trait QueryGroupMessage {
    /// Loads the messages of one group, filtered, ordered and limited per `args`.
    fn get_group_messages(
        &self,
        group_id: &[u8],
        args: &MsgQueryArgs,
    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;

    /// Counts the messages of one group that match the filters in `args`.
    fn count_group_messages(
        &self,
        group_id: &[u8],
        args: &MsgQueryArgs,
    ) -> Result<i64, crate::ConnectionError>;

    /// Loads one page of messages across groups, starting at row `offset`.
    fn group_messages_paged(
        &self,
        args: &MsgQueryArgs,
        offset: i64,
    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;

    /// Loads the messages of one group together with the messages that reference each of them.
    fn get_group_messages_with_reactions(
        &self,
        group_id: &[u8],
        args: &MsgQueryArgs,
    ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError>;

    /// Loads the messages in `group_id` whose `reference_id` is one of `message_ids`,
    /// grouped by the id they reference.
    fn get_inbound_relations(
        &self,
        group_id: &[u8],
        message_ids: &[&[u8]],
        relation_query: RelationQuery,
    ) -> Result<InboundRelations, crate::ConnectionError>;

    /// Loads the messages in `group_id` whose own id is one of `message_ids`, keyed by id.
    fn get_outbound_relations(
        &self,
        group_id: &[u8],
        message_ids: &[&[u8]],
    ) -> Result<OutboundRelations, crate::ConnectionError>;

    /// Counts, per referenced id, the messages in `group_id` whose `reference_id`
    /// is one of `message_ids`.
    fn get_inbound_relation_counts(
        &self,
        group_id: &[u8],
        message_ids: &[&[u8]],
        relation_query: RelationQuery,
    ) -> Result<RelationCounts, crate::ConnectionError>;

    /// Looks up a single message by id; `Ok(None)` when no such row exists.
    fn get_group_message<MessageId: AsRef<[u8]>>(
        &self,
        id: MessageId,
    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;

    /// Returns, per sender inbox id, the latest `sent_at_ns` among the group's
    /// messages whose content type is in `allowed_content_types`.
    fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
        &self,
        group_id: GroupId,
        allowed_content_types: &[ContentType],
    ) -> Result<LatestMessageTimeBySender, crate::ConnectionError>;

    /// Same lookup as `get_group_message`, but executed through the write connection.
    fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
        &self,
        id: MessageId,
    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;

    /// Looks up a message of `group_id` by `timestamp`.
    // NOTE(review): exact matching rule lives in the implementation — confirm there.
    fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
        &self,
        group_id: GroupId,
        timestamp: i64,
    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;

    /// Looks up a message of `group_id` by its cursor.
    // NOTE(review): the parameter is named `sequence_id` here but `cursor` in the impls.
    fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
        &self,
        group_id: GroupId,
        sequence_id: Cursor,
    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;

    /// Marks the message `msg_id` as published; returns a `usize`, presumably the
    /// number of rows updated (confirm against the implementation).
    fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
        &self,
        msg_id: &MessageId,
        timestamp: u64,
        cursor: Cursor,
        message_expire_at_ns: Option<i64>,
    ) -> Result<usize, crate::ConnectionError>;

    /// Marks the message `msg_id` as failed; returns a `usize`, presumably the
    /// number of rows updated (confirm against the implementation).
    fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
        &self,
        msg_id: &MessageId,
    ) -> Result<usize, crate::ConnectionError>;

    /// Deletes expired messages and returns the rows involved.
    // NOTE(review): expiry criterion is defined by the implementation — confirm there.
    fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;

    /// Deletes the message with the given id; returns a `usize`, presumably the
    /// number of rows deleted.
    fn delete_message_by_id<MessageId: AsRef<[u8]>>(
        &self,
        message_id: MessageId,
    ) -> Result<usize, crate::ConnectionError>;

    /// Given a cursor position per group id, returns the cursors of messages
    /// newer than those positions.
    // NOTE(review): "newer" semantics are defined by the implementation — confirm there.
    fn messages_newer_than(
        &self,
        cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
    ) -> Result<Vec<Cursor>, crate::ConnectionError>;

    /// Clears messages, optionally restricted to `group_ids` and/or by
    /// `retention_days`; returns a `usize`, presumably the number of rows affected.
    // NOTE(review): meaning of `None` for either argument is defined by the implementation.
    fn clear_messages(
        &self,
        group_ids: Option<&[Vec<u8>]>,
        retention_days: Option<u32>,
    ) -> Result<usize, crate::ConnectionError>;
}
630
631impl<T> QueryGroupMessage for &T
632where
633 T: QueryGroupMessage,
634{
635 fn get_group_messages(
637 &self,
638 group_id: &[u8],
639 args: &MsgQueryArgs,
640 ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
641 (**self).get_group_messages(group_id, args)
642 }
643
644 fn count_group_messages(
646 &self,
647 group_id: &[u8],
648 args: &MsgQueryArgs,
649 ) -> Result<i64, crate::ConnectionError> {
650 (**self).count_group_messages(group_id, args)
651 }
652
653 fn group_messages_paged(
654 &self,
655 args: &MsgQueryArgs,
656 offset: i64,
657 ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
658 (**self).group_messages_paged(args, offset)
659 }
660
661 fn get_group_messages_with_reactions(
663 &self,
664 group_id: &[u8],
665 args: &MsgQueryArgs,
666 ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError> {
667 (**self).get_group_messages_with_reactions(group_id, args)
668 }
669
670 fn get_inbound_relations(
671 &self,
672 group_id: &[u8],
673 message_ids: &[&[u8]],
674 relation_query: RelationQuery,
675 ) -> Result<InboundRelations, crate::ConnectionError> {
676 (**self).get_inbound_relations(group_id, message_ids, relation_query)
677 }
678
679 fn get_outbound_relations(
680 &self,
681 group_id: &[u8],
682 message_ids: &[&[u8]],
683 ) -> Result<OutboundRelations, crate::ConnectionError> {
684 (**self).get_outbound_relations(group_id, message_ids)
685 }
686
687 fn get_inbound_relation_counts(
688 &self,
689 group_id: &[u8],
690 message_ids: &[&[u8]],
691 relation_query: RelationQuery,
692 ) -> Result<RelationCounts, crate::ConnectionError> {
693 (**self).get_inbound_relation_counts(group_id, message_ids, relation_query)
694 }
695
696 fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
697 &self,
698 group_id: GroupId,
699 allowed_content_types: &[ContentType],
700 ) -> Result<LatestMessageTimeBySender, crate::ConnectionError> {
701 (**self).get_latest_message_times_by_sender(group_id, allowed_content_types)
702 }
703
704 fn get_group_message<MessageId: AsRef<[u8]>>(
706 &self,
707 id: MessageId,
708 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
709 (**self).get_group_message(id)
710 }
711
712 fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
714 &self,
715 id: MessageId,
716 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
717 (**self).write_conn_get_group_message(id)
718 }
719
720 fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
721 &self,
722 group_id: GroupId,
723 timestamp: i64,
724 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
725 (**self).get_group_message_by_timestamp(group_id, timestamp)
726 }
727
728 fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
729 &self,
730 group_id: GroupId,
731 cursor: Cursor,
732 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
733 (**self).get_group_message_by_cursor(group_id, cursor)
734 }
735
736 fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
737 &self,
738 msg_id: &MessageId,
739 timestamp: u64,
740 cursor: Cursor,
741 message_expire_at_ns: Option<i64>,
742 ) -> Result<usize, crate::ConnectionError> {
743 (**self).set_delivery_status_to_published(msg_id, timestamp, cursor, message_expire_at_ns)
744 }
745
746 fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
747 &self,
748 msg_id: &MessageId,
749 ) -> Result<usize, crate::ConnectionError> {
750 (**self).set_delivery_status_to_failed(msg_id)
751 }
752
753 fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
754 (**self).delete_expired_messages()
755 }
756
757 fn delete_message_by_id<MessageId: AsRef<[u8]>>(
758 &self,
759 message_id: MessageId,
760 ) -> Result<usize, crate::ConnectionError> {
761 (**self).delete_message_by_id(message_id)
762 }
763
764 fn messages_newer_than(
765 &self,
766 cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
767 ) -> Result<Vec<Cursor>, crate::ConnectionError> {
768 (**self).messages_newer_than(cursors_by_group)
769 }
770
771 fn clear_messages(
772 &self,
773 group_ids: Option<&[Vec<u8>]>,
774 retention_days: Option<u32>,
775 ) -> Result<usize, crate::ConnectionError> {
776 (**self).clear_messages(group_ids, retention_days)
777 }
778}
779
// Applies the optional `MsgQueryArgs` filters to a boxed `group_messages` query
// and evaluates to the filtered query.
//
// `$query` is a boxed query over `group_messages`; `$args` is anything whose
// fields can be read like `MsgQueryArgs` (by value or by reference).
// `dsl` and `now_ns` are resolved where the macro is invoked, so the caller must
// have `group_messages::dsl` (as `dsl`) and `now_ns` in scope.
//
// Sorting, limiting and `exclude_disappearing` are NOT handled here.
macro_rules! apply_message_filters {
    ($query:expr, $args:expr) => {{
        let mut query = $query;

        // Both time bounds on `sent_at_ns` are exclusive.
        if let Some(sent_after) = $args.sent_after_ns {
            query = query.filter(dsl::sent_at_ns.gt(sent_after));
        }

        if let Some(sent_before) = $args.sent_before_ns {
            query = query.filter(dsl::sent_at_ns.lt(sent_before));
        }

        if let Some(kind) = $args.kind {
            query = query.filter(dsl::kind.eq(kind));
        }

        if let Some(status) = $args.delivery_status {
            query = query.filter(dsl::delivery_status.eq(status));
        }

        // Allow-list and deny-list of content types can be combined.
        if let Some(content_types) = &$args.content_types {
            query = query.filter(dsl::content_type.eq_any(content_types));
        }

        if let Some(exclude_content_types) = &$args.exclude_content_types {
            query = query.filter(dsl::content_type.ne_all(exclude_content_types));
        }

        if let Some(exclude_sender_inbox_ids) = &$args.exclude_sender_inbox_ids {
            query = query.filter(dsl::sender_inbox_id.ne_all(exclude_sender_inbox_ids));
        }

        // Both bounds on `inserted_at_ns` are exclusive as well.
        if let Some(inserted_after_ns) = $args.inserted_after_ns {
            query = query.filter(dsl::inserted_at_ns.gt(inserted_after_ns));
        }

        if let Some(inserted_before_ns) = $args.inserted_before_ns {
            query = query.filter(dsl::inserted_at_ns.lt(inserted_before_ns));
        }

        // Unconditional: hide messages whose expiry has passed. Rows without an
        // expiry are always kept.
        let current_time = now_ns();
        query = query.filter(
            dsl::expire_at_ns
                .is_null()
                .or(dsl::expire_at_ns.gt(current_time)),
        );

        query
    }};
}
832
833impl<C: ConnectionExt> QueryGroupMessage for DbConnection<C> {
834 fn get_group_messages(
836 &self,
837 group_id: &[u8],
838 args: &MsgQueryArgs,
839 ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
840 use crate::schema::group_messages::dsl;
841
842 let mut query = dsl::group_messages
844 .filter(group_id_filter(group_id))
845 .into_boxed();
846
847 query = apply_message_filters!(query, args);
849
850 query = match (
852 args.sort_by.clone().unwrap_or_default(),
853 args.direction.clone().unwrap_or_default(),
854 ) {
855 (SortBy::SentAt, SortDirection::Ascending) => {
856 query.order((dsl::sent_at_ns.asc(), diesel_sql::<Integer>("rowid").asc()))
857 }
858 (SortBy::SentAt, SortDirection::Descending) => query.order((
859 dsl::sent_at_ns.desc(),
860 diesel_sql::<Integer>("rowid").desc(),
861 )),
862 (SortBy::InsertedAt, SortDirection::Ascending) => query.order((
863 dsl::inserted_at_ns.asc(),
864 diesel_sql::<Integer>("rowid").asc(),
865 )),
866 (SortBy::InsertedAt, SortDirection::Descending) => query.order((
867 dsl::inserted_at_ns.desc(),
868 diesel_sql::<Integer>("rowid").desc(),
869 )),
870 };
871
872 if let Some(limit) = args.limit {
873 query = query.limit(limit);
874 }
875
876 self.raw_query_read(|conn| query.load::<StoredGroupMessage>(conn))
877 }
878
879 fn count_group_messages(
881 &self,
882 group_id: &[u8],
883 args: &MsgQueryArgs,
884 ) -> Result<i64, crate::ConnectionError> {
885 use crate::schema::{group_messages::dsl, groups::dsl as groups_dsl};
886
887 let is_dm = self.raw_query_read(|conn| {
889 groups_dsl::groups
890 .filter(groups_dsl::id.eq(group_id))
891 .select(groups_dsl::conversation_type)
892 .first::<ConversationType>(conn)
893 })? == ConversationType::Dm;
894
895 let include_group_updated = args
896 .content_types
897 .as_ref()
898 .map(|types| types.contains(&ContentType::GroupUpdated))
899 .unwrap_or(false);
900
901 let mut query = dsl::group_messages
903 .filter(group_id_filter(group_id))
904 .into_boxed();
905
906 if is_dm && !include_group_updated {
913 query = query.filter(dsl::content_type.ne(ContentType::GroupUpdated));
914 }
915
916 query = apply_message_filters!(query, args);
918
919 let count =
920 self.raw_query_read(|conn| query.select(diesel::dsl::count_star()).first::<i64>(conn))?;
921
922 Ok(count)
923 }
924
925 fn group_messages_paged(
926 &self,
927 args: &MsgQueryArgs,
928 offset: i64,
929 ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
930 let MsgQueryArgs {
931 sent_after_ns,
932 sent_before_ns,
933 limit,
934 exclude_disappearing,
935 ..
936 } = args;
937
938 let mut query = group_messages::table
939 .left_join(groups::table)
940 .filter(groups::conversation_type.ne_all(ConversationType::virtual_types()))
941 .filter(group_messages::kind.eq(GroupMessageKind::Application))
942 .order_by(group_messages::id)
943 .into_boxed();
944
945 if let Some(start_ns) = sent_after_ns {
946 query = query.filter(group_messages::sent_at_ns.gt(start_ns));
947 }
948 if let Some(end_ns) = sent_before_ns {
949 query = query.filter(group_messages::sent_at_ns.le(end_ns));
950 }
951 if *exclude_disappearing {
952 query = query.filter(group_messages::expire_at_ns.is_null());
953 } else {
954 let current_time = now_ns();
956 query = query.filter(
957 group_messages::expire_at_ns
958 .is_null()
959 .or(group_messages::expire_at_ns.gt(current_time)),
960 );
961 }
962
963 query = query.limit(limit.unwrap_or(100)).offset(offset);
964
965 self.raw_query_read(|conn| {
966 query
967 .select(group_messages::all_columns)
968 .load::<StoredGroupMessage>(conn)
969 })
970 }
971
972 fn get_group_messages_with_reactions(
974 &self,
975 group_id: &[u8],
976 args: &MsgQueryArgs,
977 ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError> {
978 let mut modified_args = args.clone();
980 let content_types = match modified_args.content_types.clone() {
982 Some(content_types) => {
983 let mut content_types = content_types.clone();
984 content_types.retain(|content_type| *content_type != ContentType::Reaction);
985 Some(content_types)
986 }
987 None => Some(vec![
988 ContentType::Text,
989 ContentType::GroupMembershipChange,
990 ContentType::GroupUpdated,
991 ContentType::ReadReceipt,
992 ContentType::Reply,
993 ContentType::Attachment,
994 ContentType::RemoteAttachment,
995 ContentType::TransactionReference,
996 ContentType::Unknown,
997 ]),
998 };
999
1000 modified_args.content_types = content_types;
1001 let messages = self.get_group_messages(group_id, &modified_args)?;
1002
1003 let message_ids: Vec<&[u8]> = messages.iter().map(|m| m.id.as_slice()).collect();
1005
1006 let mut reactions_query = dsl::group_messages
1007 .filter(group_id_filter(group_id))
1008 .filter(dsl::reference_id.is_not_null())
1009 .filter(dsl::reference_id.eq_any(message_ids))
1010 .into_boxed();
1011
1012 reactions_query = match args.direction.as_ref().unwrap_or(&SortDirection::Ascending) {
1014 SortDirection::Ascending => reactions_query.order(dsl::sent_at_ns.asc()),
1015 SortDirection::Descending => reactions_query.order(dsl::sent_at_ns.desc()),
1016 };
1017
1018 let reactions: Vec<StoredGroupMessage> =
1019 self.raw_query_read(|conn| reactions_query.load::<StoredGroupMessage>(conn))?;
1020
1021 let mut reactions_by_reference: HashMap<Vec<u8>, Vec<StoredGroupMessage>> = HashMap::new();
1023
1024 for reaction in reactions {
1025 if let Some(reference_id) = &reaction.reference_id {
1026 reactions_by_reference
1027 .entry(reference_id.clone())
1028 .or_default()
1029 .push(reaction);
1030 }
1031 }
1032
1033 let messages_with_reactions: Vec<StoredGroupMessageWithReactions> = messages
1035 .into_iter()
1036 .map(|message| {
1037 let message_clone = message.clone();
1038 StoredGroupMessageWithReactions {
1039 message,
1040 reactions: reactions_by_reference
1041 .remove(&message_clone.id)
1042 .unwrap_or_default(),
1043 }
1044 })
1045 .collect();
1046
1047 Ok(messages_with_reactions)
1048 }
1049
1050 fn get_inbound_relations(
1051 &self,
1052 group_id: &[u8],
1053 message_ids: &[&[u8]],
1054 relation_query: RelationQuery,
1055 ) -> Result<InboundRelations, crate::ConnectionError> {
1056 let mut inbound_relations: HashMap<Vec<u8>, Vec<StoredGroupMessage>> = HashMap::new();
1057
1058 let mut inbound_relations_query = dsl::group_messages
1059 .filter(group_id_filter(group_id))
1060 .filter(dsl::reference_id.is_not_null())
1061 .filter(dsl::reference_id.eq_any(message_ids))
1062 .into_boxed();
1063
1064 if relation_query.direction == SortDirection::Descending {
1065 inbound_relations_query = inbound_relations_query.order(dsl::sent_at_ns.desc());
1066 } else {
1067 inbound_relations_query = inbound_relations_query.order(dsl::sent_at_ns.asc());
1068 }
1069
1070 if let Some(content_types) = relation_query.content_types {
1071 inbound_relations_query =
1072 inbound_relations_query.filter(dsl::content_type.eq_any(content_types));
1073 }
1074
1075 if let Some(limit) = relation_query.limit {
1076 inbound_relations_query = inbound_relations_query.limit(limit);
1077 }
1078
1079 let raw_inbound_relations: Vec<StoredGroupMessage> =
1080 self.raw_query_read(|conn| inbound_relations_query.load::<StoredGroupMessage>(conn))?;
1081
1082 for inbound_reference in raw_inbound_relations {
1083 if let Some(reference_id) = &inbound_reference.reference_id {
1084 inbound_relations
1085 .entry(reference_id.clone())
1086 .or_default()
1087 .push(inbound_reference);
1088 }
1089 }
1090
1091 Ok(inbound_relations)
1092 }
1093
1094 fn get_outbound_relations(
1095 &self,
1096 group_id: &[u8],
1097 reference_ids: &[&[u8]],
1098 ) -> Result<OutboundRelations, crate::ConnectionError> {
1099 let outbound_references_query = dsl::group_messages
1100 .filter(group_id_filter(group_id))
1101 .filter(dsl::id.eq_any(reference_ids))
1102 .into_boxed();
1103
1104 let raw_outbound_references: Vec<StoredGroupMessage> =
1105 self.raw_query_read(|conn| outbound_references_query.load::<StoredGroupMessage>(conn))?;
1106
1107 Ok(raw_outbound_references
1108 .into_iter()
1109 .map(|outbound| (outbound.id.clone(), outbound))
1110 .collect())
1111 }
1112
1113 fn get_inbound_relation_counts(
1114 &self,
1115 group_id: &[u8],
1116 message_ids: &[&[u8]],
1117 relation_query: RelationQuery,
1118 ) -> Result<RelationCounts, crate::ConnectionError> {
1119 let mut count_query = dsl::group_messages
1120 .filter(group_id_filter(group_id))
1121 .filter(dsl::reference_id.is_not_null())
1122 .filter(dsl::reference_id.eq_any(message_ids))
1123 .group_by(dsl::reference_id)
1124 .select((dsl::reference_id, diesel::dsl::count_star()))
1125 .into_boxed();
1126
1127 if let Some(content_types) = relation_query.content_types {
1128 count_query = count_query.filter(dsl::content_type.eq_any(content_types));
1129 }
1130
1131 let raw_counts: Vec<(Option<Vec<u8>>, i64)> =
1132 self.raw_query_read(|conn| count_query.load(conn))?;
1133
1134 Ok(raw_counts
1135 .into_iter()
1136 .filter_map(|(reference_id, count)| reference_id.map(|id| (id, count as usize)))
1137 .collect())
1138 }
1139
1140 fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
1141 &self,
1142 group_id: GroupId,
1143 allowed_content_types: &[ContentType],
1144 ) -> Result<LatestMessageTimeBySender, crate::ConnectionError> {
1145 let query = dsl::group_messages
1146 .filter(group_id_filter(group_id.as_ref()))
1147 .filter(dsl::content_type.eq_any(allowed_content_types))
1148 .group_by(dsl::sender_inbox_id)
1149 .select((dsl::sender_inbox_id, diesel::dsl::max(dsl::sent_at_ns)))
1150 .into_boxed();
1151
1152 let raw_results: Vec<(String, Option<i64>)> =
1153 self.raw_query_read(|conn| query.load(conn))?;
1154
1155 Ok(raw_results
1156 .into_iter()
1157 .filter_map(|(sender_inbox_id, max_sent_at_ns)| {
1158 max_sent_at_ns.map(|sent_at_ns| (sender_inbox_id, sent_at_ns))
1159 })
1160 .collect())
1161 }
1162
1163 fn get_group_message<MessageId: AsRef<[u8]>>(
1165 &self,
1166 id: MessageId,
1167 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1168 self.raw_query_read(|conn| {
1169 dsl::group_messages
1170 .filter(dsl::id.eq(id.as_ref()))
1171 .first::<StoredGroupMessage>(conn)
1172 .optional()
1173 })
1174 }
1175
1176 fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
1178 &self,
1179 id: MessageId,
1180 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1181 self.raw_query_write(|conn| {
1182 dsl::group_messages
1183 .filter(dsl::id.eq(id.as_ref()))
1184 .first::<StoredGroupMessage>(conn)
1185 .optional()
1186 })
1187 }
1188
1189 fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
1190 &self,
1191 group_id: GroupId,
1192 timestamp: i64,
1193 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1194 self.raw_query_read(|conn| {
1195 dsl::group_messages
1196 .filter(dsl::group_id.eq(group_id.as_ref()))
1197 .filter(dsl::sent_at_ns.eq(×tamp))
1198 .first::<StoredGroupMessage>(conn)
1199 .optional()
1200 })
1201 }
1202
1203 fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
1204 &self,
1205 group_id: GroupId,
1206 cursor: Cursor,
1207 ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1208 self.raw_query_read(|conn| {
1209 dsl::group_messages
1210 .filter(dsl::group_id.eq(group_id.as_ref()))
1211 .filter(dsl::sequence_id.eq(cursor.sequence_id as i64))
1212 .filter(dsl::originator_id.eq(cursor.originator_id as i64))
1213 .first::<StoredGroupMessage>(conn)
1214 .optional()
1215 })
1216 }
1217
1218 fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
1219 &self,
1220 msg_id: &MessageId,
1221 timestamp: u64,
1222 cursor: Cursor,
1223 message_expire_at_ns: Option<i64>,
1224 ) -> Result<usize, crate::ConnectionError> {
1225 tracing::info!(
1226 "Message [{}] published with cursor = {}",
1227 hex::encode(msg_id),
1228 cursor
1229 );
1230 self.raw_query_write(|conn| {
1231 diesel::update(dsl::group_messages)
1232 .filter(dsl::id.eq(msg_id.as_ref()))
1233 .set((
1234 dsl::delivery_status.eq(DeliveryStatus::Published),
1235 dsl::sent_at_ns.eq(timestamp as i64),
1236 dsl::sequence_id.eq(cursor.sequence_id as i64),
1237 dsl::originator_id.eq(cursor.originator_id as i64),
1238 dsl::expire_at_ns.eq(message_expire_at_ns),
1239 ))
1240 .execute(conn)
1241 })
1242 }
1243
1244 fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
1245 &self,
1246 msg_id: &MessageId,
1247 ) -> Result<usize, crate::ConnectionError> {
1248 self.raw_query_write(|conn| {
1249 diesel::update(dsl::group_messages)
1250 .filter(dsl::id.eq(msg_id.as_ref()))
1251 .set((dsl::delivery_status.eq(DeliveryStatus::Failed),))
1252 .execute(conn)
1253 })
1254 }
1255
1256 fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
1257 self.raw_query_write(|conn| {
1258 use diesel::prelude::*;
1259 let now = now_ns();
1260
1261 diesel::delete(
1262 dsl::group_messages
1263 .filter(dsl::delivery_status.eq(DeliveryStatus::Published))
1264 .filter(dsl::kind.eq(GroupMessageKind::Application))
1265 .filter(dsl::expire_at_ns.is_not_null())
1266 .filter(dsl::expire_at_ns.le(now)),
1267 )
1268 .returning(StoredGroupMessage::as_returning())
1269 .load::<StoredGroupMessage>(conn)
1270 })
1271 }
1272
1273 fn delete_message_by_id<MessageId: AsRef<[u8]>>(
1274 &self,
1275 message_id: MessageId,
1276 ) -> Result<usize, crate::ConnectionError> {
1277 self.raw_query_write(|conn| {
1278 use diesel::prelude::*;
1279 diesel::delete(dsl::group_messages.filter(dsl::id.eq(message_id.as_ref())))
1280 .execute(conn)
1281 })
1282 }
1283
1284 fn messages_newer_than(
1285 &self,
1286 cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
1287 ) -> Result<Vec<Cursor>, crate::ConnectionError> {
1288 use diesel::BoolExpressionMethods;
1289 use diesel::ExpressionMethods;
1290 use diesel::prelude::*;
1291
1292 let mut all_cursors = Vec::new();
1293
1294 let groups: Vec<_> = cursors_by_group.iter().collect();
1296
1297 for batch in groups.chunks(100) {
1299 let mut batch_filter = Box::new(dsl::group_id.eq(&[] as &[u8]))
1302 as Box<
1303 dyn BoxableExpression<
1304 group_messages::table,
1305 Sqlite,
1306 SqlType = diesel::sql_types::Bool,
1307 >,
1308 >;
1309
1310 for (group_id, global_cursor) in batch {
1311 if global_cursor.is_empty() {
1312 batch_filter = Box::new(batch_filter.or(dsl::group_id.eq(group_id.as_slice())));
1314 } else {
1315 let known_originators: Vec<i64> =
1317 global_cursor.keys().map(|k| *k as i64).collect();
1318
1319 let mut originator_filter = Box::new(dsl::originator_id.eq(-1i64))
1321 as Box<
1322 dyn BoxableExpression<
1323 group_messages::table,
1324 Sqlite,
1325 SqlType = diesel::sql_types::Bool,
1326 >,
1327 >;
1328
1329 for (orig_id, seq_id) in global_cursor.iter() {
1331 originator_filter = Box::new(
1332 originator_filter.or(dsl::originator_id
1333 .eq(*orig_id as i64)
1334 .and(dsl::sequence_id.gt(*seq_id as i64))),
1335 );
1336 }
1337
1338 originator_filter = Box::new(
1340 originator_filter.or(dsl::originator_id.ne_all(known_originators)),
1341 );
1342
1343 batch_filter = Box::new(
1345 batch_filter
1346 .or(dsl::group_id.eq(group_id.as_slice()).and(originator_filter)),
1347 );
1348 }
1349 }
1350
1351 let messages: Vec<(i64, i64)> = self.raw_query_read(|conn| {
1353 dsl::group_messages
1354 .select((dsl::originator_id, dsl::sequence_id))
1355 .filter(batch_filter)
1356 .load(conn)
1357 })?;
1358
1359 for (originator_id, sequence_id) in messages {
1360 all_cursors.push(Cursor::new(sequence_id as u64, originator_id as u32));
1361 }
1362 }
1363
1364 Ok(all_cursors)
1365 }
1366
1367 fn clear_messages(
1368 &self,
1369 group_ids: Option<&[Vec<u8>]>,
1370 retention_days: Option<u32>,
1371 ) -> Result<usize, crate::ConnectionError> {
1372 let mut query = diesel::delete(dsl::group_messages).into_boxed();
1373
1374 if let Some(group_ids) = group_ids {
1375 query = query.filter(dsl::group_id.eq_any(group_ids));
1376 }
1377
1378 if let Some(days) = retention_days {
1379 let limit = now_ns().saturating_sub(NS_IN_DAY.saturating_mul(i64::from(days)));
1380 query = query.filter(dsl::sent_at_ns.lt(limit));
1381 }
1382
1383 self.raw_query_write(|conn| query.execute(conn))
1384 }
1385}
1386
1387fn group_id_filter(
1388 group_id: &[u8],
1389) -> impl diesel::expression::BoxableExpression<
1390 group_messages::table,
1391 diesel::sqlite::Sqlite,
1392 SqlType = diesel::sql_types::Bool,
1393> + diesel::expression::NonAggregate {
1394 dsl::group_id.eq_any(
1395 groups_dsl::groups
1396 .filter(
1397 groups_dsl::id.eq(group_id).or(groups_dsl::dm_id.eq_any(
1398 groups_dsl::groups
1399 .select(groups_dsl::dm_id)
1400 .filter(groups_dsl::id.eq(group_id))
1401 .into_boxed(),
1402 )),
1403 )
1404 .select(groups_dsl::id),
1405 )
1406}