1#![warn(clippy::unwrap_used)]
2
3pub mod encrypted_store;
4mod errors;
5pub mod serialization;
6pub use serialization::*;
7pub mod sql_key_store;
8mod traits;
9pub use traits::*;
10pub mod xmtp_openmls_provider;
11pub use xmtp_openmls_provider::*;
12#[cfg(any(feature = "test-utils", test))]
13pub mod mock;
14
15#[cfg(any(test, feature = "test-utils"))]
16pub mod test_utils;
17#[cfg(any(test, feature = "test-utils"))]
18pub use test_utils::*;
19
20pub use diesel;
21pub use encrypted_store::*;
22pub use errors::*;
23pub use xmtp_proto as proto;
24
25use diesel::connection::SimpleConnection;
26
27use crate::sql_key_store::SqlKeyStore;
28
29pub type DefaultStore = EncryptedMessageStore<database::DefaultDatabase>;
31pub type DefaultDbConnection = <DefaultStore as XmtpDb>::DbQuery;
32pub type DefaultMlsStore = SqlKeyStore<<DefaultStore as XmtpDb>::DbQuery>;
33
34pub mod prelude {
35 pub use super::ReadOnly;
36 pub use super::association_state::QueryAssociationStateCache;
37 pub use super::consent_record::QueryConsentRecord;
38 pub use super::conversation_list::QueryConversationList;
39 pub use super::group::QueryDms;
40 pub use super::group::QueryGroup;
41 pub use super::group::QueryGroupVersion;
42 pub use super::group_intent::QueryGroupIntent;
43 pub use super::group_message::QueryGroupMessage;
44 pub use super::icebox::QueryIcebox;
45 pub use super::identity::QueryIdentity;
46 pub use super::identity_cache::QueryIdentityCache;
47 pub use super::identity_update::QueryIdentityUpdates;
48 pub use super::key_package_history::QueryKeyPackageHistory;
49 pub use super::key_store_entry::QueryKeyStoreEntry;
50 pub use super::local_commit_log::QueryLocalCommitLog;
51 pub use super::migrations::QueryMigrations;
52 pub use super::pragmas::Pragmas;
53 pub use super::processed_device_sync_messages::QueryDeviceSyncMessages;
54 pub use super::readd_status::QueryReaddStatus;
55 pub use super::refresh_state::QueryRefreshState;
56 pub use super::remote_commit_log::QueryRemoteCommitLog;
57 pub use super::tasks::QueryTasks;
58 pub use super::traits::*;
59}
60
61pub trait ReadOnly {
62 #[allow(unused)]
63 fn enable_readonly(&self) -> Result<(), StorageError>;
64
65 #[allow(unused)]
66 fn disable_readonly(&self) -> Result<(), StorageError>;
67}
68
69impl<C: ConnectionExt> ReadOnly for DbConnection<C> {
70 #[allow(unused)]
71 fn enable_readonly(&self) -> Result<(), StorageError> {
72 self.raw_query_write(|conn| conn.batch_execute("PRAGMA query_only = ON;"))?;
73 Ok(())
74 }
75
76 #[allow(unused)]
77 fn disable_readonly(&self) -> Result<(), StorageError> {
78 self.raw_query_write(|conn| conn.batch_execute("PRAGMA query_only = OFF;"))?;
79 Ok(())
80 }
81}
82
83impl<T> ReadOnly for &T
84where
85 T: ReadOnly,
86{
87 #[allow(unused)]
88 fn enable_readonly(&self) -> Result<(), StorageError> {
89 (**self).enable_readonly()
90 }
91
92 #[allow(unused)]
93 fn disable_readonly(&self) -> Result<(), StorageError> {
94 (**self).disable_readonly()
95 }
96}
97
98#[cfg(target_arch = "wasm32")]
99pub async fn init_sqlite() {
100 }
102#[cfg_attr(not(target_arch = "wasm32"), ctor::ctor)]
103#[cfg(all(test, not(target_arch = "wasm32")))]
104fn test_setup() {
105 xmtp_common::logger();
106}
107
108#[cfg(not(target_arch = "wasm32"))]
109pub async fn init_sqlite() {}
110
111#[cfg(any(test, feature = "test-utils"))]
112pub mod test_util {
113 #![allow(clippy::unwrap_used)]
114
115 use crate::group_message::{ContentType, GroupMessageKind, StoredGroupMessage};
116
117 use super::*;
118 use ascii_table::AsciiTable;
119 use diesel::{
120 ExpressionMethods, RunQueryDsl, connection::LoadConnection, deserialize::FromSqlRow,
121 sql_query,
122 };
123
124 impl<C: ConnectionExt> DbConnection<C> {
125 pub fn register_triggers(&self) {
127 tracing::info!("Registering triggers");
128 let queries = vec![
129 r#"
130 CREATE TABLE test_metadata (
131 intents_created INT DEFAULT 0,
132 intents_published INT DEFAULT 0,
133 intents_deleted INT DEFAULT 0,
134 intents_processed INT DEFAULT 0,
135 rowid integer PRIMARY KEY CHECK (rowid = 1) -- There can only be one meta
136 );
137 "#,
138 r#"
139 -- Create a table to store history of deleted intent payload hashes
140 CREATE TABLE deleted_intents_history (
141 id INTEGER PRIMARY KEY AUTOINCREMENT,
142 intent_id INTEGER NOT NULL,
143 payload_hash BLOB,
144 deleted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
145 );
146 "#,
147 r#"
148 -- Create a table to store history of key package rotation timestamps
149 CREATE TABLE key_package_rotation_history (
150 id INTEGER PRIMARY KEY AUTOINCREMENT,
151 next_key_package_rotation_ns BIGINT,
152 updated_at BIGINT NOT NULL
153 );
154 "#,
155 r#"
156 -- Modify the deletion trigger to record payload hash history
157 CREATE TRIGGER intents_deleted_tracking AFTER DELETE ON group_intents
158 FOR EACH ROW
159 BEGIN
160 -- Update the counter in test_metadata
161 UPDATE test_metadata SET intents_deleted = intents_deleted + 1;
162 -- Insert the deleted intent's information into history table
163 INSERT INTO deleted_intents_history (intent_id, payload_hash)
164 VALUES (OLD.id, OLD.payload_hash);
165 END;
166 "#,
167 r#"CREATE TRIGGER intents_created_tracking AFTER INSERT on group_intents
168 BEGIN
169 UPDATE test_metadata SET intents_created = intents_created + 1;
170 END;"#,
171 r#"CREATE TRIGGER intents_published_tracking AFTER UPDATE OF state ON group_intents
172 FOR EACH ROW
173 WHEN NEW.state = 2 AND OLD.state !=2
174 BEGIN
175 UPDATE test_metadata SET intents_published = intents_published + 1;
176 END;"#,
177 r#"CREATE TRIGGER intents_processed_tracking AFTER UPDATE OF state ON group_intents
178 FOR EACH ROW
179 WHEN NEW.state = 5
180 BEGIN
181 UPDATE test_metadata SET intents_processed = intents_processed + 1;
182 END;"#,
183 r#"
184 CREATE TRIGGER track_key_package_rotation AFTER UPDATE OF next_key_package_rotation_ns ON identity
185 FOR EACH ROW
186 WHEN OLD.next_key_package_rotation_ns IS NOT NEW.next_key_package_rotation_ns
187 BEGIN
188 INSERT INTO key_package_rotation_history (next_key_package_rotation_ns, updated_at)
189 VALUES (NEW.next_key_package_rotation_ns, (strftime('%s', 'now') || substr(strftime('%f', 'now'), 4)) * 1);
190 END;
191 "#,
192 r#"INSERT INTO test_metadata (
193 intents_created,
194 intents_deleted,
195 intents_published,
196 intents_processed
197 ) VALUES (0, 0, 0, 0);"#,
198 ];
199 for query in queries {
200 let query = diesel::sql_query(query);
201 let _ = self.raw_query_write(|conn| query.execute(conn)).unwrap();
202 }
203 }
204
205 pub fn disable_memory_security(&self) {
207 let query = r#"PRAGMA cipher_memory_security = OFF"#;
208 let query = diesel::sql_query(query);
209 let _ = self.raw_query_read(|c| query.clone().execute(c)).unwrap();
210 let _ = self.raw_query_write(|c| query.execute(c)).unwrap();
211 }
212
213 pub fn intents_published(&self) -> i32 {
214 self.raw_query_read(|conn| {
215 let mut row = conn
216 .load(sql_query(
217 "SELECT intents_published FROM test_metadata WHERE rowid = 1",
218 ))
219 .unwrap();
220 let row = row.next().unwrap().unwrap();
221 Ok(
222 <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
223 .unwrap(),
224 )
225 })
226 .unwrap()
227 }
228
229 pub fn intents_processed(&self) -> i32 {
230 self.raw_query_read(|conn| {
231 let mut row = conn
232 .load(sql_query(
233 "SELECT intents_processed FROM test_metadata WHERE rowid = 1",
234 ))
235 .unwrap();
236 let row = row.next().unwrap().unwrap();
237 Ok(
238 <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
239 .unwrap(),
240 )
241 })
242 .unwrap()
243 }
244
245 pub fn intents_deleted(&self) -> i32 {
246 self.raw_query_read(|conn| {
247 let mut row = conn
248 .load(sql_query("SELECT intents_deleted FROM test_metadata"))
249 .unwrap();
250 let row = row.next().unwrap().unwrap();
251 Ok(
252 <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
253 .unwrap(),
254 )
255 })
256 .unwrap()
257 }
258
259 pub fn intent_payloads_deleted(&self) -> Vec<Vec<u8>> {
260 let mut hashes = vec![];
261 self.raw_query_read(|conn| {
262 let row = conn
263 .load(sql_query(
264 "SELECT payload_hash FROM deleted_intents_history",
265 ))
266 .unwrap();
267 for r in row {
268 hashes.push(
269 <Vec<u8> as FromSqlRow<diesel::sql_types::Binary, _>>::build_from_row(
270 &r.unwrap(),
271 )
272 .unwrap(),
273 );
274 }
275 Ok(())
276 })
277 .unwrap();
278 hashes
279 }
280
281 pub fn intents_created(&self) -> i32 {
282 self.raw_query_read(|conn| {
283 let mut row = conn
284 .load(sql_query("SELECT intents_created FROM test_metadata"))
285 .unwrap();
286 let row = row.next().unwrap().unwrap();
287 Ok(
288 <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
289 .unwrap(),
290 )
291 })
292 .unwrap()
293 }
294
295 pub fn missing_messages(&self, sequence_ids: &[u64]) -> Vec<StoredGroupMessage> {
296 use crate::schema::group_messages::{self, dsl};
297 use diesel::QueryDsl;
298 let sequence_ids: Vec<i64> = sequence_ids.iter().copied().map(|id| id as i64).collect();
299 let query = dsl::group_messages
300 .filter(dsl::sequence_id.is_not_null())
301 .filter(group_messages::sequence_id.ne_all(sequence_ids))
302 .filter(group_messages::kind.eq(GroupMessageKind::Application))
303 .order(group_messages::sequence_id.asc());
304
305 self.raw_query_read(|conn| query.load(conn)).unwrap()
306 }
307
308 pub fn key_package_rotation_history(&self) -> Vec<(i64, i64)> {
309 let mut history = vec![];
310 self.raw_query_read(|conn| {
311 let rows = conn
312 .load(sql_query(
313 "SELECT next_key_package_rotation_ns, updated_at FROM key_package_rotation_history ORDER BY id ASC",
314 ))
315 .unwrap();
316 for row in rows {
317 let row = row.unwrap();
318 let rotation_ns = <i64 as FromSqlRow<diesel::sql_types::BigInt, _>>::build_from_row(&row)
319 .unwrap();
320 let updated_at = <i64 as FromSqlRow<diesel::sql_types::BigInt, _>>::build_from_row(&row)
321 .unwrap();
322 history.push((rotation_ns, updated_at));
323 }
324 Ok(())
325 })
326 .unwrap();
327 history
328 }
329
330 pub fn print_db(&self) {
333 let proto_content_type_header = hex::decode(
338 "0a240a08786d74702e6f726712166170706c69636174696f6e2f782d70726f746f627566",
339 )
340 .unwrap();
341 let format_msg = |m: &StoredGroupMessage| -> String {
342 if m.kind == GroupMessageKind::MembershipChange {
343 return "transcript".to_string();
344 }
345 if m.content_type != ContentType::Unknown {
346 return "encoded message".to_string();
347 }
348
349 if m.decrypted_message_bytes
350 .starts_with(&proto_content_type_header)
351 {
352 return "unknown encoded type".to_string();
353 }
354
355 match String::from_utf8(m.decrypted_message_bytes.clone()) {
356 Ok(s) => s,
357 Err(_) => "unknown encoded type".to_string(),
358 }
359 };
360 let mut t = AsciiTable::default();
361
362 println!("\n=== group_messages ===");
363 let msgs: Vec<crate::group_message::StoredGroupMessage> = self
364 .raw_query_read(|c| crate::schema::group_messages::table.load(c))
365 .unwrap_or_default();
366 t.column(0).set_header("id");
367 t.column(1).set_header("group_id");
368 t.column(2).set_header("sent_at");
369 t.column(3).set_header("kind");
370 t.column(4).set_header("sender_inbox_id");
371 t.column(5).set_header("delivery_status");
372 t.column(6).set_header("content_type");
373 t.column(7).set_header("originator_id");
374 t.column(8).set_header("sequence_id");
375 t.column(9).set_header("message");
376 let rows: Vec<Vec<String>> = msgs
377 .iter()
378 .map(|m| {
379 vec![
380 hex::encode(&m.id)[..16].to_string(),
381 hex::encode(&m.group_id)[..16].to_string(),
382 m.sent_at_ns.to_string(),
383 format!("{:?}", m.kind),
384 m.sender_inbox_id.clone(),
385 format!("{:?}", m.delivery_status),
386 m.content_type.to_string(),
387 m.originator_id.to_string(),
388 m.sequence_id.to_string(),
389 format_msg(m),
390 ]
391 })
392 .collect();
393 if rows.is_empty() {
394 println!("(empty)");
395 } else {
396 t.println(rows);
397 }
398
399 let mut t = AsciiTable::default();
400 println!("\n=== refresh_state ===");
401 let states: Vec<crate::refresh_state::RefreshState> = self
402 .raw_query_read(|c| crate::schema::refresh_state::table.load(c))
403 .unwrap_or_default();
404 t.column(0).set_header("entity_id");
405 t.column(1).set_header("entity_kind");
406 t.column(2).set_header("originator_id");
407 t.column(3).set_header("sequence_id");
408 let rows: Vec<Vec<String>> = states
409 .iter()
410 .map(|s| {
411 vec![
412 hex::encode(&s.entity_id)[..16.min(s.entity_id.len() * 2)].to_string(),
413 format!("{:?}", s.entity_kind),
414 s.originator_id.to_string(),
415 s.sequence_id.to_string(),
416 ]
417 })
418 .collect();
419 if rows.is_empty() {
420 println!("(empty)");
421 } else {
422 t.println(rows);
423 }
424
425 let mut t = AsciiTable::default();
426 t.column(0).set_header("originator_id");
427 t.column(1).set_header("sequence_id");
428 t.column(2).set_header("group_id");
429 t.column(3).set_header("envelope_payload");
430 println!("\n=== icebox ===");
431 let ice: Vec<crate::icebox::Icebox> = self
432 .raw_query_read(|c| crate::schema::icebox::table.load(c))
433 .unwrap_or_default();
434 let rows: Vec<Vec<String>> = ice
435 .iter()
436 .map(|i| {
437 vec![
438 i.originator_id.to_string(),
439 i.sequence_id.to_string(),
440 hex::encode(&i.group_id)[..16].to_string(),
441 hex::encode(&i.envelope_payload)[..20.min(i.envelope_payload.len() * 2)]
442 .to_string(),
443 ]
444 })
445 .collect();
446 if rows.is_empty() {
447 println!("(empty)");
448 } else {
449 t.println(rows);
450 }
451 }
452 }
453}