1use super::{DbConnection, remote_commit_log::CommitResult, schema::local_commit_log::dsl};
2use crate::{ConnectionExt, impl_store, schema::local_commit_log};
3use diesel::{Insertable, Queryable, prelude::*};
4use xmtp_common::snippet::Snippet;
5use xmtp_proto::xmtp::mls::message_contents::PlaintextCommitLogEntry;
6
/// Category of commit recorded in the local commit log.
///
/// The `Display` implementation renders each variant as its own name
/// (for example `CommitType::KeyUpdate` becomes `"KeyUpdate"`).
// NOTE(review): presumably that rendered string is what ends up in the
// `commit_type` column of `local_commit_log` — confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitType {
    GroupCreation,
    BackupRestore,
    Welcome,
    KeyUpdate,
    MetadataUpdate,
    UpdateGroupMembership,
    UpdateAdminList,
    UpdatePermission,
}
17
18impl std::fmt::Display for CommitType {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 let description = match self {
21 CommitType::GroupCreation => "GroupCreation",
22 CommitType::BackupRestore => "BackupRestore",
23 CommitType::Welcome => "Welcome",
24 CommitType::KeyUpdate => "KeyUpdate",
25 CommitType::MetadataUpdate => "MetadataUpdate",
26 CommitType::UpdateGroupMembership => "UpdateGroupMembership",
27 CommitType::UpdateAdminList => "UpdateAdminList",
28 CommitType::UpdatePermission => "UpdatePermission",
29 };
30 write!(f, "{}", description)
31 }
32}
33
34#[derive(Insertable, Debug, Clone)]
35#[diesel(table_name = local_commit_log)]
36pub struct NewLocalCommitLog {
37 pub group_id: Vec<u8>,
38 pub commit_sequence_id: i64,
39 pub last_epoch_authenticator: Vec<u8>,
40 pub commit_result: CommitResult,
41 pub applied_epoch_number: i64,
42 pub applied_epoch_authenticator: Vec<u8>,
43 pub error_message: Option<String>,
44 pub sender_inbox_id: Option<String>,
45 pub sender_installation_id: Option<Vec<u8>>,
46 pub commit_type: Option<String>,
47}
48
49#[derive(Queryable, Clone)]
50#[diesel(table_name = local_commit_log)]
51#[diesel(primary_key(id))]
52pub struct LocalCommitLog {
53 pub rowid: i32,
54 pub group_id: Vec<u8>,
55 pub commit_sequence_id: i64,
56 pub last_epoch_authenticator: Vec<u8>,
57 pub commit_result: CommitResult,
58 pub applied_epoch_number: i64,
59 pub applied_epoch_authenticator: Vec<u8>,
60 pub error_message: Option<String>,
61 pub sender_inbox_id: Option<String>,
62 pub sender_installation_id: Option<Vec<u8>>,
63 pub commit_type: Option<String>,
64}
65
66impl From<&LocalCommitLog> for PlaintextCommitLogEntry {
67 fn from(local_commit_log: &LocalCommitLog) -> Self {
68 PlaintextCommitLogEntry {
69 group_id: local_commit_log.group_id.clone(),
70 commit_sequence_id: local_commit_log.commit_sequence_id as u64,
71 last_epoch_authenticator: local_commit_log.last_epoch_authenticator.clone(),
72 commit_result: local_commit_log.commit_result.into(),
73 applied_epoch_number: local_commit_log.applied_epoch_number as u64,
74 applied_epoch_authenticator: local_commit_log.applied_epoch_authenticator.clone(),
75 }
76 }
77}
78
79impl From<CommitResult> for i32 {
80 fn from(commit_result: CommitResult) -> Self {
81 match commit_result {
82 CommitResult::Success => {
83 xmtp_proto::xmtp::mls::message_contents::CommitResult::Applied as i32
84 }
85 CommitResult::WrongEpoch => {
86 xmtp_proto::xmtp::mls::message_contents::CommitResult::WrongEpoch as i32
87 }
88 CommitResult::Undecryptable => {
89 xmtp_proto::xmtp::mls::message_contents::CommitResult::Undecryptable as i32
90 }
91 CommitResult::Invalid => {
92 xmtp_proto::xmtp::mls::message_contents::CommitResult::Invalid as i32
93 }
94 CommitResult::Unknown => {
95 xmtp_proto::xmtp::mls::message_contents::CommitResult::Unspecified as i32
96 }
97 }
98 }
99}
100
101impl_store!(NewLocalCommitLog, local_commit_log);
102
103impl std::fmt::Debug for LocalCommitLog {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 write!(
106 f,
107 "LocalCommitLog {{ rowid: {:?}, group_id {:?}, commit_sequence_id: {:?}, last_epoch_authenticator: {:?}, commit_result: {:?}, error_message: {:?}, applied_epoch_number: {:?}, applied_epoch_authenticator: {:?}, sender_inbox_id: {:?}, sender_installation_id: {:?}, commit_type: {:?} }}",
108 self.rowid,
109 &self.group_id.snippet(),
110 self.commit_sequence_id,
111 &self.last_epoch_authenticator.snippet(),
112 self.commit_result,
113 self.error_message,
114 self.applied_epoch_number,
115 self.applied_epoch_authenticator.snippet(),
116 self.sender_inbox_id.snippet(),
117 self.sender_installation_id.snippet(),
118 self.commit_type
119 )
120 }
121}
122
/// Sort direction, by `rowid`, for `get_local_commit_log_after_cursor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalCommitLogOrder {
    /// Lowest `rowid` first.
    AscendingByRowid,
    /// Highest `rowid` first.
    DescendingByRowid,
}
127
128pub trait QueryLocalCommitLog {
129 fn get_group_logs(
130 &self,
131 group_id: &[u8],
132 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;
133
134 fn get_local_commit_log_after_cursor(
137 &self,
138 group_id: &[u8],
139 after_cursor: i64,
140 order_by: LocalCommitLogOrder,
141 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;
142
143 fn get_latest_log_for_group(
144 &self,
145 group_id: &[u8],
146 ) -> Result<Option<LocalCommitLog>, crate::ConnectionError>;
147
148 fn get_local_commit_log_cursor(
149 &self,
150 group_id: &[u8],
151 ) -> Result<Option<i32>, crate::ConnectionError>;
152}
153
154impl<T> QueryLocalCommitLog for &T
155where
156 T: QueryLocalCommitLog,
157{
158 fn get_group_logs(
159 &self,
160 group_id: &[u8],
161 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
162 (**self).get_group_logs(group_id)
163 }
164
165 fn get_local_commit_log_after_cursor(
166 &self,
167 group_id: &[u8],
168 after_cursor: i64,
169 order_by: LocalCommitLogOrder,
170 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
171 (**self).get_local_commit_log_after_cursor(group_id, after_cursor, order_by)
172 }
173
174 fn get_latest_log_for_group(
175 &self,
176 group_id: &[u8],
177 ) -> Result<Option<LocalCommitLog>, crate::ConnectionError> {
178 (**self).get_latest_log_for_group(group_id)
179 }
180
181 fn get_local_commit_log_cursor(
182 &self,
183 group_id: &[u8],
184 ) -> Result<Option<i32>, crate::ConnectionError> {
185 (**self).get_local_commit_log_cursor(group_id)
186 }
187}
188
189impl<C: ConnectionExt> QueryLocalCommitLog for DbConnection<C> {
190 fn get_group_logs(
191 &self,
192 group_id: &[u8],
193 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
194 self.raw_query_read(|db| {
195 dsl::local_commit_log
196 .filter(dsl::group_id.eq(group_id))
197 .order_by(dsl::rowid.asc())
198 .load(db)
199 })
200 }
201
202 fn get_local_commit_log_after_cursor(
205 &self,
206 group_id: &[u8],
207 after_cursor: i64,
208 order: LocalCommitLogOrder,
209 ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError> {
210 if after_cursor > i32::MAX as i64 {
212 return Err(crate::ConnectionError::Database(
213 diesel::result::Error::QueryBuilderError("Cursor value exceeds i32::MAX".into()),
214 ));
215 }
216 let after_cursor = after_cursor as i32;
217
218 let query = dsl::local_commit_log
219 .filter(dsl::group_id.eq(group_id))
220 .filter(dsl::rowid.gt(after_cursor))
221 .filter(dsl::commit_sequence_id.ne(0));
222
223 self.raw_query_read(|db| match order {
224 LocalCommitLogOrder::AscendingByRowid => query.order_by(dsl::rowid.asc()).load(db),
225 LocalCommitLogOrder::DescendingByRowid => query.order_by(dsl::rowid.desc()).load(db),
226 })
227 }
228
229 fn get_latest_log_for_group(
230 &self,
231 group_id: &[u8],
232 ) -> Result<Option<LocalCommitLog>, crate::ConnectionError> {
233 self.raw_query_read(|db| {
234 dsl::local_commit_log
235 .filter(dsl::group_id.eq(group_id))
236 .order_by(dsl::rowid.desc())
237 .limit(1)
238 .first(db)
239 .optional()
240 })
241 }
242
243 fn get_local_commit_log_cursor(
244 &self,
245 group_id: &[u8],
246 ) -> Result<Option<i32>, crate::ConnectionError> {
247 let query = dsl::local_commit_log
248 .filter(dsl::group_id.eq(group_id))
249 .select(dsl::rowid)
250 .order(dsl::rowid.desc())
251 .limit(1);
252
253 self.raw_query_read(|conn| query.first::<i32>(conn).optional())
254 }
255}