// xmtp_db/encrypted_store/group_message.rs

1use super::ConnectionExt;
2use super::group::ConversationType;
3use super::schema::groups;
4use super::{
5    Sqlite,
6    db_connection::DbConnection,
7    schema::{
8        group_messages::{self, dsl},
9        groups::dsl as groups_dsl,
10    },
11};
12use crate::impl_fetch;
13use derive_builder::Builder;
14use diesel::{
15    backend::Backend,
16    deserialize::{self, FromSql, FromSqlRow},
17    dsl::sql as diesel_sql,
18    expression::AsExpression,
19    prelude::*,
20    serialize::{self, IsNull, Output, ToSql},
21    sql_types::Integer,
22};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use xmtp_common::{NS_IN_DAY, time::now_ns};
26use xmtp_content_types::{
27    actions, attachment, delete_message, group_updated, intent, leave_request, markdown,
28    membership_change, multi_remote_attachment, reaction, read_receipt, remote_attachment, reply,
29    text, transaction_reference, wallet_send_calls,
30};
31use xmtp_proto::types::Cursor;
32
33mod convert;
34#[cfg(test)]
35pub mod messages_newer_than_tests;
36#[cfg(test)]
37pub mod tests;
38
39#[derive(
40    Debug, Clone, Serialize, Deserialize, Queryable, Selectable, Identifiable, Eq, PartialEq,
41)]
42#[diesel(table_name = group_messages)]
43#[diesel(primary_key(id))]
44#[diesel(check_for_backend(Sqlite))]
45/// Successfully processed messages to be returned to the User.
46pub struct StoredGroupMessage {
47    /// Id of the message.
48    pub id: Vec<u8>,
49    /// Id of the group this message is tied to.
50    pub group_id: Vec<u8>,
51    /// Contents of message after decryption.
52    pub decrypted_message_bytes: Vec<u8>,
53    /// Time in nanoseconds the message was sent.
54    pub sent_at_ns: i64,
55    /// Group Message Kind Enum: 1 = Application, 2 = MembershipChange
56    pub kind: GroupMessageKind,
57    /// The ID of the App Installation this message was sent from.
58    pub sender_installation_id: Vec<u8>,
59    /// The Inbox ID of the Sender
60    pub sender_inbox_id: String,
61    /// We optimistically store messages before sending.
62    pub delivery_status: DeliveryStatus,
63    /// The Content Type of the message
64    pub content_type: ContentType,
65    /// The content type version major
66    pub version_major: i32,
67    /// The content type version minor
68    pub version_minor: i32,
69    /// The ID of the authority defining the content type
70    pub authority_id: String,
71    /// The ID of a referenced message
72    pub reference_id: Option<Vec<u8>>,
73    /// The Originator Node ID
74    pub originator_id: i64,
75    /// The Message SequenceId
76    pub sequence_id: i64,
77    /// Time in nanoseconds the message was inserted into the database
78    /// This field is automatically set by the database
79    pub inserted_at_ns: i64,
80    /// Timestamp (in NS) after which the message must be deleted
81    pub expire_at_ns: Option<i64>,
82    /// Whether to send a push notification when publishing this message
83    pub should_push: bool,
84}
85
86impl StoredGroupMessage {
87    pub fn cursor(&self) -> Cursor {
88        Cursor::new(self.sequence_id as u64, self.originator_id as u32)
89    }
90}
91
92// Separate Insertable struct that excludes inserted_at_ns to let the database set it
93#[derive(Debug, Clone, Insertable)]
94#[diesel(table_name = group_messages)]
95struct NewStoredGroupMessage {
96    pub id: Vec<u8>,
97    pub group_id: Vec<u8>,
98    pub decrypted_message_bytes: Vec<u8>,
99    pub sent_at_ns: i64,
100    pub kind: GroupMessageKind,
101    pub sender_installation_id: Vec<u8>,
102    pub sender_inbox_id: String,
103    pub delivery_status: DeliveryStatus,
104    pub content_type: ContentType,
105    pub version_major: i32,
106    pub version_minor: i32,
107    pub authority_id: String,
108    pub reference_id: Option<Vec<u8>>,
109    pub originator_id: i64,
110    pub sequence_id: i64,
111    // inserted_at_ns is NOT included - let database set it
112    pub expire_at_ns: Option<i64>,
113    pub should_push: bool,
114}
115
116impl From<&StoredGroupMessage> for NewStoredGroupMessage {
117    fn from(msg: &StoredGroupMessage) -> Self {
118        Self {
119            id: msg.id.clone(),
120            group_id: msg.group_id.clone(),
121            decrypted_message_bytes: msg.decrypted_message_bytes.clone(),
122            sent_at_ns: msg.sent_at_ns,
123            kind: msg.kind,
124            sender_installation_id: msg.sender_installation_id.clone(),
125            sender_inbox_id: msg.sender_inbox_id.clone(),
126            delivery_status: msg.delivery_status,
127            content_type: msg.content_type,
128            version_major: msg.version_major,
129            version_minor: msg.version_minor,
130            authority_id: msg.authority_id.clone(),
131            reference_id: msg.reference_id.clone(),
132            originator_id: msg.originator_id,
133            sequence_id: msg.sequence_id,
134            expire_at_ns: msg.expire_at_ns,
135            should_push: msg.should_push,
136        }
137    }
138}
139
140pub struct StoredGroupMessageWithReactions {
141    pub message: StoredGroupMessage,
142    // Messages who's reference_id matches this message's id
143    pub reactions: Vec<StoredGroupMessage>,
144}
145
/// Direction in which query results are ordered. Defaults to [`SortDirection::Ascending`].
#[derive(Debug, Clone, Default, PartialEq)]
pub enum SortDirection {
    /// Smallest sort key first (the default).
    #[default]
    Ascending,
    /// Largest sort key first.
    Descending,
}
152
/// Timestamp column used to order query results. Defaults to [`SortBy::SentAt`].
#[derive(Debug, Clone, Default, PartialEq)]
pub enum SortBy {
    /// Order by the time the message was sent (the default).
    #[default]
    SentAt,
    /// Order by the time the message was inserted into the database.
    InsertedAt,
}
159
160#[repr(i32)]
161#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, AsExpression, FromSqlRow)]
162#[diesel(sql_type = Integer)]
163pub enum GroupMessageKind {
164    Application = 1,
165    MembershipChange = 2,
166}
167
168impl ToSql<Integer, Sqlite> for GroupMessageKind
169where
170    i32: ToSql<Integer, Sqlite>,
171{
172    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
173        out.set_value(*self as i32);
174        Ok(IsNull::No)
175    }
176}
177
178impl FromSql<Integer, Sqlite> for GroupMessageKind
179where
180    i32: FromSql<Integer, Sqlite>,
181{
182    fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
183        match i32::from_sql(bytes)? {
184            1 => Ok(GroupMessageKind::Application),
185            2 => Ok(GroupMessageKind::MembershipChange),
186            x => Err(format!("Unrecognized variant {}", x).into()),
187        }
188    }
189}
190
/// Implemented by types that can tell whether a message may be deleted by users.
pub trait Deletable {
    /// Returns `true` when users are allowed to delete this message.
    fn is_deletable(&self) -> bool;
}
196
197impl Deletable for GroupMessageKind {
198    fn is_deletable(&self) -> bool {
199        match self {
200            // Application messages are deletable
201            GroupMessageKind::Application => true,
202            // Membership changes are transcript messages - not deletable
203            GroupMessageKind::MembershipChange => false,
204        }
205    }
206}
207
208//Legacy content types found at https://github.com/xmtp/xmtp-js/tree/main/content-types
209#[repr(i32)]
210#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, FromSqlRow, AsExpression)]
211#[diesel(sql_type = diesel::sql_types::Integer)]
212pub enum ContentType {
213    Unknown = 0,
214    Text = 1,
215    GroupMembershipChange = 2,
216    GroupUpdated = 3,
217    Reaction = 4,
218    ReadReceipt = 5,
219    Reply = 6,
220    Attachment = 7,
221    RemoteAttachment = 8,
222    TransactionReference = 9,
223    WalletSendCalls = 10,
224    LeaveRequest = 11,
225    Markdown = 12,
226    Actions = 13,
227    Intent = 14,
228    MultiRemoteAttachment = 15,
229    DeleteMessage = 16,
230}
231
232impl ContentType {
233    pub fn all() -> Vec<ContentType> {
234        vec![
235            ContentType::Unknown,
236            ContentType::Text,
237            ContentType::GroupMembershipChange,
238            ContentType::GroupUpdated,
239            ContentType::Reaction,
240            ContentType::ReadReceipt,
241            ContentType::Reply,
242            ContentType::Attachment,
243            ContentType::RemoteAttachment,
244            ContentType::TransactionReference,
245            ContentType::WalletSendCalls,
246            ContentType::LeaveRequest,
247            ContentType::Markdown,
248            ContentType::Actions,
249            ContentType::Intent,
250            ContentType::MultiRemoteAttachment,
251            ContentType::DeleteMessage,
252        ]
253    }
254}
255
256impl Deletable for ContentType {
257    fn is_deletable(&self) -> bool {
258        match self {
259            ContentType::GroupMembershipChange
260            | ContentType::GroupUpdated
261            | ContentType::LeaveRequest
262            | ContentType::Reaction
263            | ContentType::ReadReceipt
264            | ContentType::Actions
265            | ContentType::Intent
266            | ContentType::DeleteMessage
267            // Unknown content types default to non-deletable for safety
268            |ContentType::Unknown => false,
269
270            ContentType::Text
271            | ContentType::Markdown
272            | ContentType::Reply
273            | ContentType::Attachment
274            | ContentType::RemoteAttachment
275            | ContentType::TransactionReference
276            | ContentType::MultiRemoteAttachment
277            | ContentType::WalletSendCalls => true,
278        }
279    }
280}
281
282impl std::fmt::Display for ContentType {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        let as_string = match self {
285            Self::Unknown => "unknown",
286            Self::Text => text::TextCodec::TYPE_ID,
287            Self::Markdown => markdown::MarkdownCodec::TYPE_ID,
288            Self::GroupMembershipChange => membership_change::GroupMembershipChangeCodec::TYPE_ID,
289            Self::GroupUpdated => group_updated::GroupUpdatedCodec::TYPE_ID,
290            Self::Reaction => reaction::ReactionCodec::TYPE_ID,
291            Self::ReadReceipt => read_receipt::ReadReceiptCodec::TYPE_ID,
292            Self::Attachment => attachment::AttachmentCodec::TYPE_ID,
293            Self::RemoteAttachment => remote_attachment::RemoteAttachmentCodec::TYPE_ID,
294            Self::Reply => reply::ReplyCodec::TYPE_ID,
295            Self::TransactionReference => transaction_reference::TransactionReferenceCodec::TYPE_ID,
296            Self::WalletSendCalls => wallet_send_calls::WalletSendCallsCodec::TYPE_ID,
297            Self::LeaveRequest => leave_request::LeaveRequestCodec::TYPE_ID,
298            Self::Actions => actions::ActionsCodec::TYPE_ID,
299            Self::Intent => intent::IntentCodec::TYPE_ID,
300            Self::MultiRemoteAttachment => {
301                multi_remote_attachment::MultiRemoteAttachmentCodec::TYPE_ID
302            }
303            Self::DeleteMessage => delete_message::DeleteMessageCodec::TYPE_ID,
304        };
305
306        write!(f, "{}", as_string)
307    }
308}
309
310impl From<String> for ContentType {
311    fn from(type_id: String) -> Self {
312        match type_id.as_str() {
313            text::TextCodec::TYPE_ID => Self::Text,
314            markdown::MarkdownCodec::TYPE_ID => Self::Markdown,
315            membership_change::GroupMembershipChangeCodec::TYPE_ID => Self::GroupMembershipChange,
316            group_updated::GroupUpdatedCodec::TYPE_ID => Self::GroupUpdated,
317            reaction::ReactionCodec::TYPE_ID => Self::Reaction,
318            read_receipt::ReadReceiptCodec::TYPE_ID => Self::ReadReceipt,
319            reply::ReplyCodec::TYPE_ID => Self::Reply,
320            attachment::AttachmentCodec::TYPE_ID => Self::Attachment,
321            remote_attachment::RemoteAttachmentCodec::TYPE_ID => Self::RemoteAttachment,
322            transaction_reference::TransactionReferenceCodec::TYPE_ID => Self::TransactionReference,
323            wallet_send_calls::WalletSendCallsCodec::TYPE_ID => Self::WalletSendCalls,
324            leave_request::LeaveRequestCodec::TYPE_ID => Self::LeaveRequest,
325            actions::ActionsCodec::TYPE_ID => Self::Actions,
326            intent::IntentCodec::TYPE_ID => Self::Intent,
327            multi_remote_attachment::MultiRemoteAttachmentCodec::TYPE_ID => {
328                Self::MultiRemoteAttachment
329            }
330            delete_message::DeleteMessageCodec::TYPE_ID => Self::DeleteMessage,
331            _ => Self::Unknown,
332        }
333    }
334}
335
336impl ToSql<Integer, Sqlite> for ContentType
337where
338    i32: ToSql<Integer, Sqlite>,
339{
340    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
341        out.set_value(*self as i32);
342        Ok(IsNull::No)
343    }
344}
345
346impl FromSql<Integer, Sqlite> for ContentType
347where
348    i32: FromSql<Integer, Sqlite>,
349{
350    fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
351        match i32::from_sql(bytes)? {
352            0 => Ok(ContentType::Unknown),
353            1 => Ok(ContentType::Text),
354            2 => Ok(ContentType::GroupMembershipChange),
355            3 => Ok(ContentType::GroupUpdated),
356            4 => Ok(ContentType::Reaction),
357            5 => Ok(ContentType::ReadReceipt),
358            6 => Ok(ContentType::Reply),
359            7 => Ok(ContentType::Attachment),
360            8 => Ok(ContentType::RemoteAttachment),
361            9 => Ok(ContentType::TransactionReference),
362            10 => Ok(ContentType::WalletSendCalls),
363            11 => Ok(ContentType::LeaveRequest),
364            12 => Ok(ContentType::Markdown),
365            13 => Ok(ContentType::Actions),
366            14 => Ok(ContentType::Intent),
367            15 => Ok(ContentType::MultiRemoteAttachment),
368            16 => Ok(ContentType::DeleteMessage),
369            x => Err(format!("Unrecognized variant {}", x).into()),
370        }
371    }
372}
373
374#[repr(i32)]
375#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, FromSqlRow, AsExpression)]
376#[diesel(sql_type = Integer)]
377pub enum DeliveryStatus {
378    Unpublished = 1,
379    Published = 2,
380    Failed = 3,
381}
382
383impl ToSql<Integer, Sqlite> for DeliveryStatus
384where
385    i32: ToSql<Integer, Sqlite>,
386{
387    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
388        out.set_value(*self as i32);
389        Ok(IsNull::No)
390    }
391}
392
393impl FromSql<Integer, Sqlite> for DeliveryStatus
394where
395    i32: FromSql<Integer, Sqlite>,
396{
397    fn from_sql(bytes: <Sqlite as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
398        match i32::from_sql(bytes)? {
399            1 => Ok(DeliveryStatus::Unpublished),
400            2 => Ok(DeliveryStatus::Published),
401            3 => Ok(DeliveryStatus::Failed),
402            x => Err(format!("Unrecognized variant {}", x).into()),
403        }
404    }
405}
406
407impl_fetch!(StoredGroupMessage, group_messages, Vec<u8>);
408
409// Custom store implementation that uses NewStoredGroupMessage to exclude inserted_at_ns
410impl<C> crate::Store<C> for StoredGroupMessage
411where
412    C: crate::ConnectionExt,
413{
414    type Output = ();
415    fn store(&self, into: &C) -> Result<(), crate::StorageError> {
416        let new_msg = NewStoredGroupMessage::from(self);
417        into.raw_query_write::<_, _>(|conn| {
418            diesel::insert_into(group_messages::table)
419                .values(&new_msg)
420                .execute(conn)
421                .map(|_| ())
422        })
423        .map_err(Into::into)
424    }
425}
426
427// Custom store_or_ignore implementation that uses NewStoredGroupMessage
428impl<C> crate::StoreOrIgnore<C> for StoredGroupMessage
429where
430    C: crate::ConnectionExt,
431{
432    type Output = ();
433
434    fn store_or_ignore(&self, into: &C) -> Result<(), crate::StorageError> {
435        let new_msg = NewStoredGroupMessage::from(self);
436        into.raw_query_write(|conn| {
437            diesel::insert_or_ignore_into(group_messages::table)
438                .values(&new_msg)
439                .execute(conn)
440                .map(|_| ())
441        })
442        .map_err(Into::into)
443    }
444}
445
446#[derive(Default, Clone, Builder, Debug)]
447#[builder(setter(into))]
448pub struct MsgQueryArgs {
449    #[builder(default = None)]
450    pub sent_after_ns: Option<i64>,
451    #[builder(default = None)]
452    pub sent_before_ns: Option<i64>,
453    #[builder(default = None)]
454    pub kind: Option<GroupMessageKind>,
455    #[builder(default = None)]
456    pub delivery_status: Option<DeliveryStatus>,
457    #[builder(default = None)]
458    pub limit: Option<i64>,
459    #[builder(default = None)]
460    pub direction: Option<SortDirection>,
461    #[builder(default = None)]
462    pub content_types: Option<Vec<ContentType>>,
463    #[builder(default = None)]
464    pub exclude_content_types: Option<Vec<ContentType>>,
465    #[builder(default = None)]
466    pub exclude_sender_inbox_ids: Option<Vec<String>>,
467    #[builder(default = None)]
468    pub sort_by: Option<SortBy>,
469    #[builder(default = None)]
470    pub inserted_after_ns: Option<i64>,
471    #[builder(default = None)]
472    pub inserted_before_ns: Option<i64>,
473    #[builder(default = false)]
474    pub exclude_disappearing: bool,
475}
476
477impl MsgQueryArgs {
478    pub fn builder() -> MsgQueryArgsBuilder {
479        MsgQueryArgsBuilder::default()
480    }
481}
482
483#[derive(Default, Clone, Builder)]
484pub struct RelationQuery {
485    #[builder(default = None)]
486    pub content_types: Option<Vec<ContentType>>,
487    #[builder(default = None)]
488    pub limit: Option<i64>,
489    #[builder(default = SortDirection::Ascending)]
490    pub direction: SortDirection,
491}
492
493impl RelationQuery {
494    pub fn builder() -> RelationQueryBuilder {
495        RelationQueryBuilder::default()
496    }
497}
498
499pub type InboundRelations = HashMap<Vec<u8>, Vec<StoredGroupMessage>>;
500pub type OutboundRelations = HashMap<Vec<u8>, StoredGroupMessage>;
501pub type RelationCounts = HashMap<Vec<u8>, usize>;
502
503pub struct MessagesWithRelations {
504    pub messages: Vec<StoredGroupMessage>,
505    /// Messages referenced by any item in the `messages` vector, keyed by their ID
506    pub outbound_relations: HashMap<Vec<u8>, StoredGroupMessage>,
507    /// Messages that reference any item in the `messages` vector, grouped by the reference_id
508    pub inbound_relations: HashMap<Vec<u8>, Vec<StoredGroupMessage>>,
509}
510
/// Map from a string key to an `i64` timestamp.
/// NOTE(review): presumably sender inbox id → latest message time in nanoseconds — confirm.
pub type LatestMessageTimeBySender = HashMap<String, i64>;
512
513pub trait QueryGroupMessage {
514    /// Query for group messages
515    fn get_group_messages(
516        &self,
517        group_id: &[u8],
518        args: &MsgQueryArgs,
519    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;
520
521    /// Count group messages matching the given criteria
522    fn count_group_messages(
523        &self,
524        group_id: &[u8],
525        args: &MsgQueryArgs,
526    ) -> Result<i64, crate::ConnectionError>;
527
528    fn group_messages_paged(
529        &self,
530        args: &MsgQueryArgs,
531        offset: i64,
532    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;
533
534    /// Query for group messages with their reactions
535    fn get_group_messages_with_reactions(
536        &self,
537        group_id: &[u8],
538        args: &MsgQueryArgs,
539    ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError>;
540
541    fn get_inbound_relations(
542        &self,
543        group_id: &[u8],
544        message_ids: &[&[u8]],
545        relation_query: RelationQuery,
546    ) -> Result<InboundRelations, crate::ConnectionError>;
547
548    fn get_outbound_relations(
549        &self,
550        group_id: &[u8],
551        message_ids: &[&[u8]],
552    ) -> Result<OutboundRelations, crate::ConnectionError>;
553
554    fn get_inbound_relation_counts(
555        &self,
556        group_id: &[u8],
557        message_ids: &[&[u8]],
558        relation_query: RelationQuery,
559    ) -> Result<RelationCounts, crate::ConnectionError>;
560
561    /// Get a particular group message
562    fn get_group_message<MessageId: AsRef<[u8]>>(
563        &self,
564        id: MessageId,
565    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;
566
567    fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
568        &self,
569        group_id: GroupId,
570        allowed_content_types: &[ContentType],
571    ) -> Result<LatestMessageTimeBySender, crate::ConnectionError>;
572
573    /// Get a particular group message using the write connection
574    fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
575        &self,
576        id: MessageId,
577    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;
578
579    fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
580        &self,
581        group_id: GroupId,
582        timestamp: i64,
583    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;
584
585    fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
586        &self,
587        group_id: GroupId,
588        sequence_id: Cursor,
589    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError>;
590
591    fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
592        &self,
593        msg_id: &MessageId,
594        timestamp: u64,
595        cursor: Cursor,
596        message_expire_at_ns: Option<i64>,
597    ) -> Result<usize, crate::ConnectionError>;
598
599    fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
600        &self,
601        msg_id: &MessageId,
602    ) -> Result<usize, crate::ConnectionError>;
603
604    fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;
605
606    fn delete_message_by_id<MessageId: AsRef<[u8]>>(
607        &self,
608        message_id: MessageId,
609    ) -> Result<usize, crate::ConnectionError>;
610
611    fn messages_newer_than(
612        &self,
613        cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
614    ) -> Result<Vec<Cursor>, crate::ConnectionError>;
615
616    /// Clear messages from the database with optional filtering.
617    ///
618    /// # Arguments
619    /// * `group_ids` - If provided, only delete messages in these groups. If None, delete from all groups.
620    /// * `retention_days` - If provided, only delete messages older than this many days. If None, delete all matching messages.
621    ///
622    /// # Returns
623    /// The number of messages deleted.
624    fn clear_messages(
625        &self,
626        group_ids: Option<&[Vec<u8>]>,
627        retention_days: Option<u32>,
628    ) -> Result<usize, crate::ConnectionError>;
629}
630
631impl<T> QueryGroupMessage for &T
632where
633    T: QueryGroupMessage,
634{
635    /// Query for group messages
636    fn get_group_messages(
637        &self,
638        group_id: &[u8],
639        args: &MsgQueryArgs,
640    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
641        (**self).get_group_messages(group_id, args)
642    }
643
644    /// Count group messages matching the given criteria
645    fn count_group_messages(
646        &self,
647        group_id: &[u8],
648        args: &MsgQueryArgs,
649    ) -> Result<i64, crate::ConnectionError> {
650        (**self).count_group_messages(group_id, args)
651    }
652
653    fn group_messages_paged(
654        &self,
655        args: &MsgQueryArgs,
656        offset: i64,
657    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
658        (**self).group_messages_paged(args, offset)
659    }
660
661    /// Query for group messages with their reactions
662    fn get_group_messages_with_reactions(
663        &self,
664        group_id: &[u8],
665        args: &MsgQueryArgs,
666    ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError> {
667        (**self).get_group_messages_with_reactions(group_id, args)
668    }
669
670    fn get_inbound_relations(
671        &self,
672        group_id: &[u8],
673        message_ids: &[&[u8]],
674        relation_query: RelationQuery,
675    ) -> Result<InboundRelations, crate::ConnectionError> {
676        (**self).get_inbound_relations(group_id, message_ids, relation_query)
677    }
678
679    fn get_outbound_relations(
680        &self,
681        group_id: &[u8],
682        message_ids: &[&[u8]],
683    ) -> Result<OutboundRelations, crate::ConnectionError> {
684        (**self).get_outbound_relations(group_id, message_ids)
685    }
686
687    fn get_inbound_relation_counts(
688        &self,
689        group_id: &[u8],
690        message_ids: &[&[u8]],
691        relation_query: RelationQuery,
692    ) -> Result<RelationCounts, crate::ConnectionError> {
693        (**self).get_inbound_relation_counts(group_id, message_ids, relation_query)
694    }
695
696    fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
697        &self,
698        group_id: GroupId,
699        allowed_content_types: &[ContentType],
700    ) -> Result<LatestMessageTimeBySender, crate::ConnectionError> {
701        (**self).get_latest_message_times_by_sender(group_id, allowed_content_types)
702    }
703
704    /// Get a particular group message
705    fn get_group_message<MessageId: AsRef<[u8]>>(
706        &self,
707        id: MessageId,
708    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
709        (**self).get_group_message(id)
710    }
711
712    /// Get a particular group message using the write connection
713    fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
714        &self,
715        id: MessageId,
716    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
717        (**self).write_conn_get_group_message(id)
718    }
719
720    fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
721        &self,
722        group_id: GroupId,
723        timestamp: i64,
724    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
725        (**self).get_group_message_by_timestamp(group_id, timestamp)
726    }
727
728    fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
729        &self,
730        group_id: GroupId,
731        cursor: Cursor,
732    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
733        (**self).get_group_message_by_cursor(group_id, cursor)
734    }
735
736    fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
737        &self,
738        msg_id: &MessageId,
739        timestamp: u64,
740        cursor: Cursor,
741        message_expire_at_ns: Option<i64>,
742    ) -> Result<usize, crate::ConnectionError> {
743        (**self).set_delivery_status_to_published(msg_id, timestamp, cursor, message_expire_at_ns)
744    }
745
746    fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
747        &self,
748        msg_id: &MessageId,
749    ) -> Result<usize, crate::ConnectionError> {
750        (**self).set_delivery_status_to_failed(msg_id)
751    }
752
753    fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
754        (**self).delete_expired_messages()
755    }
756
757    fn delete_message_by_id<MessageId: AsRef<[u8]>>(
758        &self,
759        message_id: MessageId,
760    ) -> Result<usize, crate::ConnectionError> {
761        (**self).delete_message_by_id(message_id)
762    }
763
764    fn messages_newer_than(
765        &self,
766        cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
767    ) -> Result<Vec<Cursor>, crate::ConnectionError> {
768        (**self).messages_newer_than(cursors_by_group)
769    }
770
771    fn clear_messages(
772        &self,
773        group_ids: Option<&[Vec<u8>]>,
774        retention_days: Option<u32>,
775    ) -> Result<usize, crate::ConnectionError> {
776        (**self).clear_messages(group_ids, retention_days)
777    }
778}
779
// Applies the filters shared by the message queries to a boxed query.
//
// `$query` is the boxed query to narrow and `$args` an expression exposing the
// `MsgQueryArgs` fields; the macro evaluates to the narrowed query. Every filter is only
// added when its argument is set, except the expiry filter, which is always added.
macro_rules! apply_message_filters {
    ($query:expr, $args:expr) => {{
        let mut filtered = $query;

        // Send-time window; both bounds are exclusive.
        if let Some(lower) = $args.sent_after_ns {
            filtered = filtered.filter(dsl::sent_at_ns.gt(lower));
        }
        if let Some(upper) = $args.sent_before_ns {
            filtered = filtered.filter(dsl::sent_at_ns.lt(upper));
        }

        // Exact matches on kind and delivery status.
        if let Some(wanted_kind) = $args.kind {
            filtered = filtered.filter(dsl::kind.eq(wanted_kind));
        }
        if let Some(wanted_status) = $args.delivery_status {
            filtered = filtered.filter(dsl::delivery_status.eq(wanted_status));
        }

        // Content type allow-list and deny-list.
        if let Some(allowed) = &$args.content_types {
            filtered = filtered.filter(dsl::content_type.eq_any(allowed));
        }
        if let Some(denied) = &$args.exclude_content_types {
            filtered = filtered.filter(dsl::content_type.ne_all(denied));
        }

        // Senders to leave out.
        if let Some(denied_senders) = &$args.exclude_sender_inbox_ids {
            filtered = filtered.filter(dsl::sender_inbox_id.ne_all(denied_senders));
        }

        // Insert-time window; both bounds are exclusive.
        if let Some(lower) = $args.inserted_after_ns {
            filtered = filtered.filter(dsl::inserted_at_ns.gt(lower));
        }
        if let Some(upper) = $args.inserted_before_ns {
            filtered = filtered.filter(dsl::inserted_at_ns.lt(upper));
        }

        // Always drop expired messages: keep rows without an expiry, or whose expiry is
        // strictly later than the current time.
        let now = now_ns();
        filtered.filter(dsl::expire_at_ns.is_null().or(dsl::expire_at_ns.gt(now)))
    }};
}
832
833impl<C: ConnectionExt> QueryGroupMessage for DbConnection<C> {
834    /// Query for group messages
835    fn get_group_messages(
836        &self,
837        group_id: &[u8],
838        args: &MsgQueryArgs,
839    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
840        use crate::schema::group_messages::dsl;
841
842        // Start with base query
843        let mut query = dsl::group_messages
844            .filter(group_id_filter(group_id))
845            .into_boxed();
846
847        // Apply common filters using macro
848        query = apply_message_filters!(query, args);
849
850        // Apply ordering with a rowid tie-break to ensure indexes get used when sorting.
851        query = match (
852            args.sort_by.clone().unwrap_or_default(),
853            args.direction.clone().unwrap_or_default(),
854        ) {
855            (SortBy::SentAt, SortDirection::Ascending) => {
856                query.order((dsl::sent_at_ns.asc(), diesel_sql::<Integer>("rowid").asc()))
857            }
858            (SortBy::SentAt, SortDirection::Descending) => query.order((
859                dsl::sent_at_ns.desc(),
860                diesel_sql::<Integer>("rowid").desc(),
861            )),
862            (SortBy::InsertedAt, SortDirection::Ascending) => query.order((
863                dsl::inserted_at_ns.asc(),
864                diesel_sql::<Integer>("rowid").asc(),
865            )),
866            (SortBy::InsertedAt, SortDirection::Descending) => query.order((
867                dsl::inserted_at_ns.desc(),
868                diesel_sql::<Integer>("rowid").desc(),
869            )),
870        };
871
872        if let Some(limit) = args.limit {
873            query = query.limit(limit);
874        }
875
876        self.raw_query_read(|conn| query.load::<StoredGroupMessage>(conn))
877    }
878
879    /// Count group messages matching the given criteria
880    fn count_group_messages(
881        &self,
882        group_id: &[u8],
883        args: &MsgQueryArgs,
884    ) -> Result<i64, crate::ConnectionError> {
885        use crate::schema::{group_messages::dsl, groups::dsl as groups_dsl};
886
887        // Check if this is a DM group
888        let is_dm = self.raw_query_read(|conn| {
889            groups_dsl::groups
890                .filter(groups_dsl::id.eq(group_id))
891                .select(groups_dsl::conversation_type)
892                .first::<ConversationType>(conn)
893        })? == ConversationType::Dm;
894
895        let include_group_updated = args
896            .content_types
897            .as_ref()
898            .map(|types| types.contains(&ContentType::GroupUpdated))
899            .unwrap_or(false);
900
901        // Start with base query
902        let mut query = dsl::group_messages
903            .filter(group_id_filter(group_id))
904            .into_boxed();
905
906        // For DM groups, exclude GroupUpdated messages unless specifically requested
907        // In find_group_messages we do some post-query deduplication to return the first GroupUpdated
908        // message but not the subsequent ones. That's not really an option here, so instead we are excluding
909        // them altogether.
910        //
911        // Ideally we would prevent the duplicate GroupUpdated messages from being inserted in the first place.
912        if is_dm && !include_group_updated {
913            query = query.filter(dsl::content_type.ne(ContentType::GroupUpdated));
914        }
915
916        // Apply common filters using macro
917        query = apply_message_filters!(query, args);
918
919        let count =
920            self.raw_query_read(|conn| query.select(diesel::dsl::count_star()).first::<i64>(conn))?;
921
922        Ok(count)
923    }
924
925    fn group_messages_paged(
926        &self,
927        args: &MsgQueryArgs,
928        offset: i64,
929    ) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
930        let MsgQueryArgs {
931            sent_after_ns,
932            sent_before_ns,
933            limit,
934            exclude_disappearing,
935            ..
936        } = args;
937
938        let mut query = group_messages::table
939            .left_join(groups::table)
940            .filter(groups::conversation_type.ne_all(ConversationType::virtual_types()))
941            .filter(group_messages::kind.eq(GroupMessageKind::Application))
942            .order_by(group_messages::id)
943            .into_boxed();
944
945        if let Some(start_ns) = sent_after_ns {
946            query = query.filter(group_messages::sent_at_ns.gt(start_ns));
947        }
948        if let Some(end_ns) = sent_before_ns {
949            query = query.filter(group_messages::sent_at_ns.le(end_ns));
950        }
951        if *exclude_disappearing {
952            query = query.filter(group_messages::expire_at_ns.is_null());
953        } else {
954            // Always exclude expired messages (expire_at_ns < now)
955            let current_time = now_ns();
956            query = query.filter(
957                group_messages::expire_at_ns
958                    .is_null()
959                    .or(group_messages::expire_at_ns.gt(current_time)),
960            );
961        }
962
963        query = query.limit(limit.unwrap_or(100)).offset(offset);
964
965        self.raw_query_read(|conn| {
966            query
967                .select(group_messages::all_columns)
968                .load::<StoredGroupMessage>(conn)
969        })
970    }
971
972    /// Query for group messages with their reactions
973    fn get_group_messages_with_reactions(
974        &self,
975        group_id: &[u8],
976        args: &MsgQueryArgs,
977    ) -> Result<Vec<StoredGroupMessageWithReactions>, crate::ConnectionError> {
978        // First get all the main messages
979        let mut modified_args = args.clone();
980        // filter out reactions from the main query so we don't get them twice
981        let content_types = match modified_args.content_types.clone() {
982            Some(content_types) => {
983                let mut content_types = content_types.clone();
984                content_types.retain(|content_type| *content_type != ContentType::Reaction);
985                Some(content_types)
986            }
987            None => Some(vec![
988                ContentType::Text,
989                ContentType::GroupMembershipChange,
990                ContentType::GroupUpdated,
991                ContentType::ReadReceipt,
992                ContentType::Reply,
993                ContentType::Attachment,
994                ContentType::RemoteAttachment,
995                ContentType::TransactionReference,
996                ContentType::Unknown,
997            ]),
998        };
999
1000        modified_args.content_types = content_types;
1001        let messages = self.get_group_messages(group_id, &modified_args)?;
1002
1003        // Then get all reactions for these messages in a single query
1004        let message_ids: Vec<&[u8]> = messages.iter().map(|m| m.id.as_slice()).collect();
1005
1006        let mut reactions_query = dsl::group_messages
1007            .filter(group_id_filter(group_id))
1008            .filter(dsl::reference_id.is_not_null())
1009            .filter(dsl::reference_id.eq_any(message_ids))
1010            .into_boxed();
1011
1012        // Apply the same sorting as the main messages
1013        reactions_query = match args.direction.as_ref().unwrap_or(&SortDirection::Ascending) {
1014            SortDirection::Ascending => reactions_query.order(dsl::sent_at_ns.asc()),
1015            SortDirection::Descending => reactions_query.order(dsl::sent_at_ns.desc()),
1016        };
1017
1018        let reactions: Vec<StoredGroupMessage> =
1019            self.raw_query_read(|conn| reactions_query.load::<StoredGroupMessage>(conn))?;
1020
1021        // Group reactions by parent message id
1022        let mut reactions_by_reference: HashMap<Vec<u8>, Vec<StoredGroupMessage>> = HashMap::new();
1023
1024        for reaction in reactions {
1025            if let Some(reference_id) = &reaction.reference_id {
1026                reactions_by_reference
1027                    .entry(reference_id.clone())
1028                    .or_default()
1029                    .push(reaction);
1030            }
1031        }
1032
1033        // Combine messages with their reactions
1034        let messages_with_reactions: Vec<StoredGroupMessageWithReactions> = messages
1035            .into_iter()
1036            .map(|message| {
1037                let message_clone = message.clone();
1038                StoredGroupMessageWithReactions {
1039                    message,
1040                    reactions: reactions_by_reference
1041                        .remove(&message_clone.id)
1042                        .unwrap_or_default(),
1043                }
1044            })
1045            .collect();
1046
1047        Ok(messages_with_reactions)
1048    }
1049
1050    fn get_inbound_relations(
1051        &self,
1052        group_id: &[u8],
1053        message_ids: &[&[u8]],
1054        relation_query: RelationQuery,
1055    ) -> Result<InboundRelations, crate::ConnectionError> {
1056        let mut inbound_relations: HashMap<Vec<u8>, Vec<StoredGroupMessage>> = HashMap::new();
1057
1058        let mut inbound_relations_query = dsl::group_messages
1059            .filter(group_id_filter(group_id))
1060            .filter(dsl::reference_id.is_not_null())
1061            .filter(dsl::reference_id.eq_any(message_ids))
1062            .into_boxed();
1063
1064        if relation_query.direction == SortDirection::Descending {
1065            inbound_relations_query = inbound_relations_query.order(dsl::sent_at_ns.desc());
1066        } else {
1067            inbound_relations_query = inbound_relations_query.order(dsl::sent_at_ns.asc());
1068        }
1069
1070        if let Some(content_types) = relation_query.content_types {
1071            inbound_relations_query =
1072                inbound_relations_query.filter(dsl::content_type.eq_any(content_types));
1073        }
1074
1075        if let Some(limit) = relation_query.limit {
1076            inbound_relations_query = inbound_relations_query.limit(limit);
1077        }
1078
1079        let raw_inbound_relations: Vec<StoredGroupMessage> =
1080            self.raw_query_read(|conn| inbound_relations_query.load::<StoredGroupMessage>(conn))?;
1081
1082        for inbound_reference in raw_inbound_relations {
1083            if let Some(reference_id) = &inbound_reference.reference_id {
1084                inbound_relations
1085                    .entry(reference_id.clone())
1086                    .or_default()
1087                    .push(inbound_reference);
1088            }
1089        }
1090
1091        Ok(inbound_relations)
1092    }
1093
1094    fn get_outbound_relations(
1095        &self,
1096        group_id: &[u8],
1097        reference_ids: &[&[u8]],
1098    ) -> Result<OutboundRelations, crate::ConnectionError> {
1099        let outbound_references_query = dsl::group_messages
1100            .filter(group_id_filter(group_id))
1101            .filter(dsl::id.eq_any(reference_ids))
1102            .into_boxed();
1103
1104        let raw_outbound_references: Vec<StoredGroupMessage> =
1105            self.raw_query_read(|conn| outbound_references_query.load::<StoredGroupMessage>(conn))?;
1106
1107        Ok(raw_outbound_references
1108            .into_iter()
1109            .map(|outbound| (outbound.id.clone(), outbound))
1110            .collect())
1111    }
1112
1113    fn get_inbound_relation_counts(
1114        &self,
1115        group_id: &[u8],
1116        message_ids: &[&[u8]],
1117        relation_query: RelationQuery,
1118    ) -> Result<RelationCounts, crate::ConnectionError> {
1119        let mut count_query = dsl::group_messages
1120            .filter(group_id_filter(group_id))
1121            .filter(dsl::reference_id.is_not_null())
1122            .filter(dsl::reference_id.eq_any(message_ids))
1123            .group_by(dsl::reference_id)
1124            .select((dsl::reference_id, diesel::dsl::count_star()))
1125            .into_boxed();
1126
1127        if let Some(content_types) = relation_query.content_types {
1128            count_query = count_query.filter(dsl::content_type.eq_any(content_types));
1129        }
1130
1131        let raw_counts: Vec<(Option<Vec<u8>>, i64)> =
1132            self.raw_query_read(|conn| count_query.load(conn))?;
1133
1134        Ok(raw_counts
1135            .into_iter()
1136            .filter_map(|(reference_id, count)| reference_id.map(|id| (id, count as usize)))
1137            .collect())
1138    }
1139
1140    fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
1141        &self,
1142        group_id: GroupId,
1143        allowed_content_types: &[ContentType],
1144    ) -> Result<LatestMessageTimeBySender, crate::ConnectionError> {
1145        let query = dsl::group_messages
1146            .filter(group_id_filter(group_id.as_ref()))
1147            .filter(dsl::content_type.eq_any(allowed_content_types))
1148            .group_by(dsl::sender_inbox_id)
1149            .select((dsl::sender_inbox_id, diesel::dsl::max(dsl::sent_at_ns)))
1150            .into_boxed();
1151
1152        let raw_results: Vec<(String, Option<i64>)> =
1153            self.raw_query_read(|conn| query.load(conn))?;
1154
1155        Ok(raw_results
1156            .into_iter()
1157            .filter_map(|(sender_inbox_id, max_sent_at_ns)| {
1158                max_sent_at_ns.map(|sent_at_ns| (sender_inbox_id, sent_at_ns))
1159            })
1160            .collect())
1161    }
1162
1163    /// Get a particular group message
1164    fn get_group_message<MessageId: AsRef<[u8]>>(
1165        &self,
1166        id: MessageId,
1167    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1168        self.raw_query_read(|conn| {
1169            dsl::group_messages
1170                .filter(dsl::id.eq(id.as_ref()))
1171                .first::<StoredGroupMessage>(conn)
1172                .optional()
1173        })
1174    }
1175
1176    /// Get a particular group message using the write connection
1177    fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
1178        &self,
1179        id: MessageId,
1180    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1181        self.raw_query_write(|conn| {
1182            dsl::group_messages
1183                .filter(dsl::id.eq(id.as_ref()))
1184                .first::<StoredGroupMessage>(conn)
1185                .optional()
1186        })
1187    }
1188
1189    fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
1190        &self,
1191        group_id: GroupId,
1192        timestamp: i64,
1193    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1194        self.raw_query_read(|conn| {
1195            dsl::group_messages
1196                .filter(dsl::group_id.eq(group_id.as_ref()))
1197                .filter(dsl::sent_at_ns.eq(&timestamp))
1198                .first::<StoredGroupMessage>(conn)
1199                .optional()
1200        })
1201    }
1202
1203    fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
1204        &self,
1205        group_id: GroupId,
1206        cursor: Cursor,
1207    ) -> Result<Option<StoredGroupMessage>, crate::ConnectionError> {
1208        self.raw_query_read(|conn| {
1209            dsl::group_messages
1210                .filter(dsl::group_id.eq(group_id.as_ref()))
1211                .filter(dsl::sequence_id.eq(cursor.sequence_id as i64))
1212                .filter(dsl::originator_id.eq(cursor.originator_id as i64))
1213                .first::<StoredGroupMessage>(conn)
1214                .optional()
1215        })
1216    }
1217
1218    fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
1219        &self,
1220        msg_id: &MessageId,
1221        timestamp: u64,
1222        cursor: Cursor,
1223        message_expire_at_ns: Option<i64>,
1224    ) -> Result<usize, crate::ConnectionError> {
1225        tracing::info!(
1226            "Message [{}] published with cursor = {}",
1227            hex::encode(msg_id),
1228            cursor
1229        );
1230        self.raw_query_write(|conn| {
1231            diesel::update(dsl::group_messages)
1232                .filter(dsl::id.eq(msg_id.as_ref()))
1233                .set((
1234                    dsl::delivery_status.eq(DeliveryStatus::Published),
1235                    dsl::sent_at_ns.eq(timestamp as i64),
1236                    dsl::sequence_id.eq(cursor.sequence_id as i64),
1237                    dsl::originator_id.eq(cursor.originator_id as i64),
1238                    dsl::expire_at_ns.eq(message_expire_at_ns),
1239                ))
1240                .execute(conn)
1241        })
1242    }
1243
1244    fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
1245        &self,
1246        msg_id: &MessageId,
1247    ) -> Result<usize, crate::ConnectionError> {
1248        self.raw_query_write(|conn| {
1249            diesel::update(dsl::group_messages)
1250                .filter(dsl::id.eq(msg_id.as_ref()))
1251                .set((dsl::delivery_status.eq(DeliveryStatus::Failed),))
1252                .execute(conn)
1253        })
1254    }
1255
1256    fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError> {
1257        self.raw_query_write(|conn| {
1258            use diesel::prelude::*;
1259            let now = now_ns();
1260
1261            diesel::delete(
1262                dsl::group_messages
1263                    .filter(dsl::delivery_status.eq(DeliveryStatus::Published))
1264                    .filter(dsl::kind.eq(GroupMessageKind::Application))
1265                    .filter(dsl::expire_at_ns.is_not_null())
1266                    .filter(dsl::expire_at_ns.le(now)),
1267            )
1268            .returning(StoredGroupMessage::as_returning())
1269            .load::<StoredGroupMessage>(conn)
1270        })
1271    }
1272
1273    fn delete_message_by_id<MessageId: AsRef<[u8]>>(
1274        &self,
1275        message_id: MessageId,
1276    ) -> Result<usize, crate::ConnectionError> {
1277        self.raw_query_write(|conn| {
1278            use diesel::prelude::*;
1279            diesel::delete(dsl::group_messages.filter(dsl::id.eq(message_id.as_ref())))
1280                .execute(conn)
1281        })
1282    }
1283
1284    fn messages_newer_than(
1285        &self,
1286        cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
1287    ) -> Result<Vec<Cursor>, crate::ConnectionError> {
1288        use diesel::BoolExpressionMethods;
1289        use diesel::ExpressionMethods;
1290        use diesel::prelude::*;
1291
1292        let mut all_cursors = Vec::new();
1293
1294        // Convert the HashMap into a Vec for batching
1295        let groups: Vec<_> = cursors_by_group.iter().collect();
1296
1297        // Process groups in batches of 100
1298        for batch in groups.chunks(100) {
1299            // Build the WHERE clause using Diesel's query builder
1300            // Start with a false condition that we'll OR with real conditions
1301            let mut batch_filter = Box::new(dsl::group_id.eq(&[] as &[u8]))
1302                as Box<
1303                    dyn BoxableExpression<
1304                            group_messages::table,
1305                            Sqlite,
1306                            SqlType = diesel::sql_types::Bool,
1307                        >,
1308                >;
1309
1310            for (group_id, global_cursor) in batch {
1311                if global_cursor.is_empty() {
1312                    // No cursor for this group - include all messages
1313                    batch_filter = Box::new(batch_filter.or(dsl::group_id.eq(group_id.as_slice())));
1314                } else {
1315                    // Build condition for this group: group_id matches AND (originator conditions)
1316                    let known_originators: Vec<i64> =
1317                        global_cursor.keys().map(|k| *k as i64).collect();
1318
1319                    // Start with false condition for originator checks
1320                    let mut originator_filter = Box::new(dsl::originator_id.eq(-1i64))
1321                        as Box<
1322                            dyn BoxableExpression<
1323                                    group_messages::table,
1324                                    Sqlite,
1325                                    SqlType = diesel::sql_types::Bool,
1326                                >,
1327                        >;
1328
1329                    // For each known originator, add: originator_id = X AND sequence_id > Y
1330                    for (orig_id, seq_id) in global_cursor.iter() {
1331                        originator_filter = Box::new(
1332                            originator_filter.or(dsl::originator_id
1333                                .eq(*orig_id as i64)
1334                                .and(dsl::sequence_id.gt(*seq_id as i64))),
1335                        );
1336                    }
1337
1338                    // Also include messages from unknown originators
1339                    originator_filter = Box::new(
1340                        originator_filter.or(dsl::originator_id.ne_all(known_originators)),
1341                    );
1342
1343                    // Combine: this group AND (originator conditions)
1344                    batch_filter = Box::new(
1345                        batch_filter
1346                            .or(dsl::group_id.eq(group_id.as_slice()).and(originator_filter)),
1347                    );
1348                }
1349            }
1350
1351            // Execute the query
1352            let messages: Vec<(i64, i64)> = self.raw_query_read(|conn| {
1353                dsl::group_messages
1354                    .select((dsl::originator_id, dsl::sequence_id))
1355                    .filter(batch_filter)
1356                    .load(conn)
1357            })?;
1358
1359            for (originator_id, sequence_id) in messages {
1360                all_cursors.push(Cursor::new(sequence_id as u64, originator_id as u32));
1361            }
1362        }
1363
1364        Ok(all_cursors)
1365    }
1366
1367    fn clear_messages(
1368        &self,
1369        group_ids: Option<&[Vec<u8>]>,
1370        retention_days: Option<u32>,
1371    ) -> Result<usize, crate::ConnectionError> {
1372        let mut query = diesel::delete(dsl::group_messages).into_boxed();
1373
1374        if let Some(group_ids) = group_ids {
1375            query = query.filter(dsl::group_id.eq_any(group_ids));
1376        }
1377
1378        if let Some(days) = retention_days {
1379            let limit = now_ns().saturating_sub(NS_IN_DAY.saturating_mul(i64::from(days)));
1380            query = query.filter(dsl::sent_at_ns.lt(limit));
1381        }
1382
1383        self.raw_query_write(|conn| query.execute(conn))
1384    }
1385}
1386
1387fn group_id_filter(
1388    group_id: &[u8],
1389) -> impl diesel::expression::BoxableExpression<
1390    group_messages::table,
1391    diesel::sqlite::Sqlite,
1392    SqlType = diesel::sql_types::Bool,
1393> + diesel::expression::NonAggregate {
1394    dsl::group_id.eq_any(
1395        groups_dsl::groups
1396            .filter(
1397                groups_dsl::id.eq(group_id).or(groups_dsl::dm_id.eq_any(
1398                    groups_dsl::groups
1399                        .select(groups_dsl::dm_id)
1400                        .filter(groups_dsl::id.eq(group_id))
1401                        .into_boxed(),
1402                )),
1403            )
1404            .select(groups_dsl::id),
1405    )
1406}