xmtp_db/encrypted_store/
local_commit_log.rs

1use super::{DbConnection, remote_commit_log::CommitResult, schema::local_commit_log::dsl};
2use crate::{ConnectionExt, impl_store, schema::local_commit_log};
3use diesel::{Insertable, Queryable, prelude::*};
4use xmtp_common::snippet::Snippet;
5use xmtp_proto::xmtp::mls::message_contents::PlaintextCommitLogEntry;
6
/// Kind of commit recorded in the local commit log.
///
/// The `Display` implementation renders each variant as its own name.
/// NOTE(review): presumably that string is what ends up in the
/// `commit_type` column of the log structs — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitType {
    GroupCreation,
    BackupRestore,
    Welcome,
    KeyUpdate,
    MetadataUpdate,
    UpdateGroupMembership,
    UpdateAdminList,
    UpdatePermission,
}
17
18impl std::fmt::Display for CommitType {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        let description = match self {
21            CommitType::GroupCreation => "GroupCreation",
22            CommitType::BackupRestore => "BackupRestore",
23            CommitType::Welcome => "Welcome",
24            CommitType::KeyUpdate => "KeyUpdate",
25            CommitType::MetadataUpdate => "MetadataUpdate",
26            CommitType::UpdateGroupMembership => "UpdateGroupMembership",
27            CommitType::UpdateAdminList => "UpdateAdminList",
28            CommitType::UpdatePermission => "UpdatePermission",
29        };
30        write!(f, "{}", description)
31    }
32}
33
/// Insertable row for the `local_commit_log` table.
///
/// Carries the same columns as [`LocalCommitLog`] except `rowid`, which is not
/// supplied on insert (presumably assigned by the database — confirm against
/// the schema).
#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = local_commit_log)]
pub struct NewLocalCommitLog {
    /// Group this entry belongs to; the queries in this module filter on it.
    pub group_id: Vec<u8>,
    /// Entries with a value of 0 are excluded by
    /// `get_local_commit_log_after_cursor` and should not be published to the
    /// remote commit log.
    pub commit_sequence_id: i64,
    pub last_epoch_authenticator: Vec<u8>,
    /// Outcome recorded for the commit.
    pub commit_result: CommitResult,
    pub applied_epoch_number: i64,
    pub applied_epoch_authenticator: Vec<u8>,
    /// Optional error text (presumably set when the commit was not applied — TODO confirm).
    pub error_message: Option<String>,
    pub sender_inbox_id: Option<String>,
    pub sender_installation_id: Option<Vec<u8>>,
    /// Optional free-form commit kind (presumably a rendered [`CommitType`] — TODO confirm).
    pub commit_type: Option<String>,
}
48
49#[derive(Queryable, Clone)]
50#[diesel(table_name = local_commit_log)]
51#[diesel(primary_key(id))]
52pub struct LocalCommitLog {
53    pub rowid: i32,
54    pub group_id: Vec<u8>,
55    pub commit_sequence_id: i64,
56    pub last_epoch_authenticator: Vec<u8>,
57    pub commit_result: CommitResult,
58    pub applied_epoch_number: i64,
59    pub applied_epoch_authenticator: Vec<u8>,
60    pub error_message: Option<String>,
61    pub sender_inbox_id: Option<String>,
62    pub sender_installation_id: Option<Vec<u8>>,
63    pub commit_type: Option<String>,
64}
65
66impl From<&LocalCommitLog> for PlaintextCommitLogEntry {
67    fn from(local_commit_log: &LocalCommitLog) -> Self {
68        PlaintextCommitLogEntry {
69            group_id: local_commit_log.group_id.clone(),
70            commit_sequence_id: local_commit_log.commit_sequence_id as u64,
71            last_epoch_authenticator: local_commit_log.last_epoch_authenticator.clone(),
72            commit_result: local_commit_log.commit_result.into(),
73            applied_epoch_number: local_commit_log.applied_epoch_number as u64,
74            applied_epoch_authenticator: local_commit_log.applied_epoch_authenticator.clone(),
75        }
76    }
77}
78
79impl From<CommitResult> for i32 {
80    fn from(commit_result: CommitResult) -> Self {
81        match commit_result {
82            CommitResult::Success => {
83                xmtp_proto::xmtp::mls::message_contents::CommitResult::Applied as i32
84            }
85            CommitResult::WrongEpoch => {
86                xmtp_proto::xmtp::mls::message_contents::CommitResult::WrongEpoch as i32
87            }
88            CommitResult::Undecryptable => {
89                xmtp_proto::xmtp::mls::message_contents::CommitResult::Undecryptable as i32
90            }
91            CommitResult::Invalid => {
92                xmtp_proto::xmtp::mls::message_contents::CommitResult::Invalid as i32
93            }
94            CommitResult::Unknown => {
95                xmtp_proto::xmtp::mls::message_contents::CommitResult::Unspecified as i32
96            }
97        }
98    }
99}
100
// Hooks `NewLocalCommitLog` up to the `local_commit_log` table through the
// crate's `impl_store!` macro (presumably generating its store/insert impl —
// see the macro definition to confirm).
impl_store!(NewLocalCommitLog, local_commit_log);
102
103impl std::fmt::Debug for LocalCommitLog {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        write!(
106            f,
107            "LocalCommitLog {{ rowid: {:?}, group_id {:?}, commit_sequence_id: {:?}, last_epoch_authenticator: {:?}, commit_result: {:?}, error_message: {:?}, applied_epoch_number: {:?}, applied_epoch_authenticator: {:?}, sender_inbox_id: {:?}, sender_installation_id: {:?}, commit_type: {:?} }}",
108            self.rowid,
109            &self.group_id.snippet(),
110            self.commit_sequence_id,
111            &self.last_epoch_authenticator.snippet(),
112            self.commit_result,
113            self.error_message,
114            self.applied_epoch_number,
115            self.applied_epoch_authenticator.snippet(),
116            self.sender_inbox_id.snippet(),
117            self.sender_installation_id.snippet(),
118            self.commit_type
119        )
120    }
121}
122
/// Sort direction for `QueryLocalCommitLog::get_local_commit_log_after_cursor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalCommitLogOrder {
    /// Lowest `rowid` first.
    AscendingByRowid,
    /// Highest `rowid` first.
    DescendingByRowid,
}
127
/// Read-only queries over the `local_commit_log` table.
pub trait QueryLocalCommitLog {
    /// Returns the commit log entries recorded for `group_id`.
    ///
    /// The `DbConnection` implementation returns them in ascending `rowid` order.
    fn get_group_logs(
        &self,
        group_id: &[u8],
    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;

    /// Returns the entries for `group_id` whose `rowid` is greater than
    /// `after_cursor`, sorted by `rowid` in the direction given by `order_by`.
    ///
    /// Entries with `commit_sequence_id` = 0 should not be published to the
    /// remote commit log; the `DbConnection` implementation filters them out.
    fn get_local_commit_log_after_cursor(
        &self,
        group_id: &[u8],
        after_cursor: i64,
        order_by: LocalCommitLogOrder,
    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;

    /// Returns the entry with the highest `rowid` for `group_id`, or `None`
    /// when the group has no entries.
    fn get_latest_log_for_group(
        &self,
        group_id: &[u8],
    ) -> Result<Option<LocalCommitLog>, crate::ConnectionError>;

    /// Returns the highest `rowid` recorded for `group_id`, or `None` when the
    /// group has no entries.
    fn get_local_commit_log_cursor(
        &self,
        group_id: &[u8],
    ) -> Result<Option<i32>, crate::ConnectionError>;
}
153
154impl<T> QueryLocalCommitLog for &T
155where
156    T: QueryLocalCommitLog,
157{
158    fn get_group_logs(
159        &self,
160        group_id: &[u8],
161    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
162        (**self).get_group_logs(group_id)
163    }
164
165    fn get_local_commit_log_after_cursor(
166        &self,
167        group_id: &[u8],
168        after_cursor: i64,
169        order_by: LocalCommitLogOrder,
170    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
171        (**self).get_local_commit_log_after_cursor(group_id, after_cursor, order_by)
172    }
173
174    fn get_latest_log_for_group(
175        &self,
176        group_id: &[u8],
177    ) -> Result<Option<LocalCommitLog>, crate::ConnectionError> {
178        (**self).get_latest_log_for_group(group_id)
179    }
180
181    fn get_local_commit_log_cursor(
182        &self,
183        group_id: &[u8],
184    ) -> Result<Option<i32>, crate::ConnectionError> {
185        (**self).get_local_commit_log_cursor(group_id)
186    }
187}
188
189impl<C: ConnectionExt> QueryLocalCommitLog for DbConnection<C> {
190    fn get_group_logs(
191        &self,
192        group_id: &[u8],
193    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
194        self.raw_query_read(|db| {
195            dsl::local_commit_log
196                .filter(dsl::group_id.eq(group_id))
197                .order_by(dsl::rowid.asc())
198                .load(db)
199        })
200    }
201
202    // Local commit log entries are sorted by `rowid`
203    // Entries with `commit_sequence_id` = 0 should not be published to the remote commit log
204    fn get_local_commit_log_after_cursor(
205        &self,
206        group_id: &[u8],
207        after_cursor: i64,
208        order: LocalCommitLogOrder,
209    ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
210        // i64 cursor is populated by i32 local_commit_log rowid value, so we should never hit this error
211        if after_cursor > i32::MAX as i64 {
212            return Err(crate::ConnectionError::Database(
213                diesel::result::Error::QueryBuilderError("Cursor value exceeds i32::MAX".into()),
214            ));
215        }
216        let after_cursor = after_cursor as i32;
217
218        let query = dsl::local_commit_log
219            .filter(dsl::group_id.eq(group_id))
220            .filter(dsl::rowid.gt(after_cursor))
221            .filter(dsl::commit_sequence_id.ne(0));
222
223        self.raw_query_read(|db| match order {
224            LocalCommitLogOrder::AscendingByRowid => query.order_by(dsl::rowid.asc()).load(db),
225            LocalCommitLogOrder::DescendingByRowid => query.order_by(dsl::rowid.desc()).load(db),
226        })
227    }
228
229    fn get_latest_log_for_group(
230        &self,
231        group_id: &[u8],
232    ) -> Result<Option<LocalCommitLog>, crate::ConnectionError> {
233        self.raw_query_read(|db| {
234            dsl::local_commit_log
235                .filter(dsl::group_id.eq(group_id))
236                .order_by(dsl::rowid.desc())
237                .limit(1)
238                .first(db)
239                .optional()
240        })
241    }
242
243    fn get_local_commit_log_cursor(
244        &self,
245        group_id: &[u8],
246    ) -> Result<Option<i32>, crate::ConnectionError> {
247        let query = dsl::local_commit_log
248            .filter(dsl::group_id.eq(group_id))
249            .select(dsl::rowid)
250            .order(dsl::rowid.desc())
251            .limit(1);
252
253        self.raw_query_read(|conn| query.first::<i32>(conn).optional())
254    }
255}