xmtp_db/
lib.rs

1#![warn(clippy::unwrap_used)]
2
3pub mod encrypted_store;
4mod errors;
5pub mod serialization;
6pub use serialization::*;
7pub mod sql_key_store;
8mod traits;
9pub use traits::*;
10pub mod xmtp_openmls_provider;
11pub use xmtp_openmls_provider::*;
12#[cfg(any(feature = "test-utils", test))]
13pub mod mock;
14
15#[cfg(any(test, feature = "test-utils"))]
16pub mod test_utils;
17#[cfg(any(test, feature = "test-utils"))]
18pub use test_utils::*;
19
20pub use diesel;
21pub use encrypted_store::*;
22pub use errors::*;
23pub use xmtp_proto as proto;
24
25use diesel::connection::SimpleConnection;
26
27use crate::sql_key_store::SqlKeyStore;
28
29/// The default platform-specific store
30pub type DefaultStore = EncryptedMessageStore<database::DefaultDatabase>;
31pub type DefaultDbConnection = <DefaultStore as XmtpDb>::DbQuery;
32pub type DefaultMlsStore = SqlKeyStore<<DefaultStore as XmtpDb>::DbQuery>;
33
34pub mod prelude {
35    pub use super::ReadOnly;
36    pub use super::association_state::QueryAssociationStateCache;
37    pub use super::consent_record::QueryConsentRecord;
38    pub use super::conversation_list::QueryConversationList;
39    pub use super::group::QueryDms;
40    pub use super::group::QueryGroup;
41    pub use super::group::QueryGroupVersion;
42    pub use super::group_intent::QueryGroupIntent;
43    pub use super::group_message::QueryGroupMessage;
44    pub use super::icebox::QueryIcebox;
45    pub use super::identity::QueryIdentity;
46    pub use super::identity_cache::QueryIdentityCache;
47    pub use super::identity_update::QueryIdentityUpdates;
48    pub use super::key_package_history::QueryKeyPackageHistory;
49    pub use super::key_store_entry::QueryKeyStoreEntry;
50    pub use super::local_commit_log::QueryLocalCommitLog;
51    pub use super::migrations::QueryMigrations;
52    pub use super::pragmas::Pragmas;
53    pub use super::processed_device_sync_messages::QueryDeviceSyncMessages;
54    pub use super::readd_status::QueryReaddStatus;
55    pub use super::refresh_state::QueryRefreshState;
56    pub use super::remote_commit_log::QueryRemoteCommitLog;
57    pub use super::tasks::QueryTasks;
58    pub use super::traits::*;
59}
60
61pub trait ReadOnly {
62    #[allow(unused)]
63    fn enable_readonly(&self) -> Result<(), StorageError>;
64
65    #[allow(unused)]
66    fn disable_readonly(&self) -> Result<(), StorageError>;
67}
68
69impl<C: ConnectionExt> ReadOnly for DbConnection<C> {
70    #[allow(unused)]
71    fn enable_readonly(&self) -> Result<(), StorageError> {
72        self.raw_query_write(|conn| conn.batch_execute("PRAGMA query_only = ON;"))?;
73        Ok(())
74    }
75
76    #[allow(unused)]
77    fn disable_readonly(&self) -> Result<(), StorageError> {
78        self.raw_query_write(|conn| conn.batch_execute("PRAGMA query_only = OFF;"))?;
79        Ok(())
80    }
81}
82
83impl<T> ReadOnly for &T
84where
85    T: ReadOnly,
86{
87    #[allow(unused)]
88    fn enable_readonly(&self) -> Result<(), StorageError> {
89        (**self).enable_readonly()
90    }
91
92    #[allow(unused)]
93    fn disable_readonly(&self) -> Result<(), StorageError> {
94        (**self).disable_readonly()
95    }
96}
97
/// Hook for one-time SQLite initialisation.
///
/// This is a no-op on every target (wasm32 included). It stays `async` so existing
/// `init_sqlite().await` call sites keep compiling unchanged.
pub async fn init_sqlite() {}

/// Calls `xmtp_common::logger()` for native test builds.
///
/// Registered through `ctor::ctor`, so it is not called explicitly from this file.
#[cfg_attr(not(target_arch = "wasm32"), ctor::ctor)]
#[cfg(all(test, not(target_arch = "wasm32")))]
fn test_setup() {
    xmtp_common::logger();
}
110
111#[cfg(any(test, feature = "test-utils"))]
112pub mod test_util {
113    #![allow(clippy::unwrap_used)]
114
115    use crate::group_message::{ContentType, GroupMessageKind, StoredGroupMessage};
116
117    use super::*;
118    use ascii_table::AsciiTable;
119    use diesel::{
120        ExpressionMethods, RunQueryDsl, connection::LoadConnection, deserialize::FromSqlRow,
121        sql_query,
122    };
123
124    impl<C: ConnectionExt> DbConnection<C> {
125        /// Create a new table and register triggers for tracking column updates
126        pub fn register_triggers(&self) {
127            tracing::info!("Registering triggers");
128            let queries = vec![
129                r#"
130                 CREATE TABLE test_metadata (
131                     intents_created INT DEFAULT 0,
132                     intents_published INT DEFAULT 0,
133                     intents_deleted INT DEFAULT 0,
134                     intents_processed INT DEFAULT 0,
135                     rowid integer PRIMARY KEY CHECK (rowid = 1) -- There can only be one meta
136                 );
137                 "#,
138                r#"
139                 -- Create a table to store history of deleted intent payload hashes
140                 CREATE TABLE deleted_intents_history (
141                     id INTEGER PRIMARY KEY AUTOINCREMENT,
142                     intent_id INTEGER NOT NULL,
143                     payload_hash BLOB,
144                     deleted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
145                 );
146                 "#,
147                r#"
148                 -- Create a table to store history of key package rotation timestamps
149                 CREATE TABLE key_package_rotation_history (
150                     id INTEGER PRIMARY KEY AUTOINCREMENT,
151                     next_key_package_rotation_ns BIGINT,
152                     updated_at BIGINT NOT NULL
153                 );
154                 "#,
155                r#"
156                 -- Modify the deletion trigger to record payload hash history
157                 CREATE TRIGGER intents_deleted_tracking AFTER DELETE ON group_intents
158                 FOR EACH ROW
159                 BEGIN
160                     -- Update the counter in test_metadata
161                     UPDATE test_metadata SET intents_deleted = intents_deleted + 1;
162                     -- Insert the deleted intent's information into history table
163                     INSERT INTO deleted_intents_history (intent_id, payload_hash)
164                     VALUES (OLD.id, OLD.payload_hash);
165                 END;
166                 "#,
167                r#"CREATE TRIGGER intents_created_tracking AFTER INSERT on group_intents
168                 BEGIN
169                     UPDATE test_metadata SET intents_created = intents_created + 1;
170                 END;"#,
171                r#"CREATE TRIGGER intents_published_tracking AFTER UPDATE OF state ON group_intents
172                 FOR EACH ROW
173                 WHEN NEW.state = 2 AND OLD.state !=2
174                 BEGIN
175                     UPDATE test_metadata SET intents_published = intents_published + 1;
176                 END;"#,
177                r#"CREATE TRIGGER intents_processed_tracking AFTER UPDATE OF state ON group_intents
178                 FOR EACH ROW
179                 WHEN NEW.state = 5
180                 BEGIN
181                     UPDATE test_metadata SET intents_processed = intents_processed + 1;
182                 END;"#,
183                r#"
184                 CREATE TRIGGER track_key_package_rotation AFTER UPDATE OF next_key_package_rotation_ns ON identity
185                 FOR EACH ROW
186                 WHEN OLD.next_key_package_rotation_ns IS NOT NEW.next_key_package_rotation_ns
187                 BEGIN
188                     INSERT INTO key_package_rotation_history (next_key_package_rotation_ns, updated_at)
189                     VALUES (NEW.next_key_package_rotation_ns, (strftime('%s', 'now') || substr(strftime('%f', 'now'), 4)) * 1);
190                 END;
191                 "#,
192                r#"INSERT INTO test_metadata (
193                     intents_created,
194                     intents_deleted,
195                     intents_published,
196                     intents_processed
197                 ) VALUES (0, 0, 0, 0);"#,
198            ];
199            for query in queries {
200                let query = diesel::sql_query(query);
201                let _ = self.raw_query_write(|conn| query.execute(conn)).unwrap();
202            }
203        }
204
205        /// Disable sqlcipher memory security
206        pub fn disable_memory_security(&self) {
207            let query = r#"PRAGMA cipher_memory_security = OFF"#;
208            let query = diesel::sql_query(query);
209            let _ = self.raw_query_read(|c| query.clone().execute(c)).unwrap();
210            let _ = self.raw_query_write(|c| query.execute(c)).unwrap();
211        }
212
213        pub fn intents_published(&self) -> i32 {
214            self.raw_query_read(|conn| {
215                let mut row = conn
216                    .load(sql_query(
217                        "SELECT intents_published FROM test_metadata WHERE rowid = 1",
218                    ))
219                    .unwrap();
220                let row = row.next().unwrap().unwrap();
221                Ok(
222                    <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
223                        .unwrap(),
224                )
225            })
226            .unwrap()
227        }
228
229        pub fn intents_processed(&self) -> i32 {
230            self.raw_query_read(|conn| {
231                let mut row = conn
232                    .load(sql_query(
233                        "SELECT intents_processed FROM test_metadata WHERE rowid = 1",
234                    ))
235                    .unwrap();
236                let row = row.next().unwrap().unwrap();
237                Ok(
238                    <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
239                        .unwrap(),
240                )
241            })
242            .unwrap()
243        }
244
245        pub fn intents_deleted(&self) -> i32 {
246            self.raw_query_read(|conn| {
247                let mut row = conn
248                    .load(sql_query("SELECT intents_deleted FROM test_metadata"))
249                    .unwrap();
250                let row = row.next().unwrap().unwrap();
251                Ok(
252                    <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
253                        .unwrap(),
254                )
255            })
256            .unwrap()
257        }
258
259        pub fn intent_payloads_deleted(&self) -> Vec<Vec<u8>> {
260            let mut hashes = vec![];
261            self.raw_query_read(|conn| {
262                let row = conn
263                    .load(sql_query(
264                        "SELECT payload_hash FROM deleted_intents_history",
265                    ))
266                    .unwrap();
267                for r in row {
268                    hashes.push(
269                        <Vec<u8> as FromSqlRow<diesel::sql_types::Binary, _>>::build_from_row(
270                            &r.unwrap(),
271                        )
272                        .unwrap(),
273                    );
274                }
275                Ok(())
276            })
277            .unwrap();
278            hashes
279        }
280
281        pub fn intents_created(&self) -> i32 {
282            self.raw_query_read(|conn| {
283                let mut row = conn
284                    .load(sql_query("SELECT intents_created FROM test_metadata"))
285                    .unwrap();
286                let row = row.next().unwrap().unwrap();
287                Ok(
288                    <i32 as FromSqlRow<diesel::sql_types::Integer, _>>::build_from_row(&row)
289                        .unwrap(),
290                )
291            })
292            .unwrap()
293        }
294
295        pub fn missing_messages(&self, sequence_ids: &[u64]) -> Vec<StoredGroupMessage> {
296            use crate::schema::group_messages::{self, dsl};
297            use diesel::QueryDsl;
298            let sequence_ids: Vec<i64> = sequence_ids.iter().copied().map(|id| id as i64).collect();
299            let query = dsl::group_messages
300                .filter(dsl::sequence_id.is_not_null())
301                .filter(group_messages::sequence_id.ne_all(sequence_ids))
302                .filter(group_messages::kind.eq(GroupMessageKind::Application))
303                .order(group_messages::sequence_id.asc());
304
305            self.raw_query_read(|conn| query.load(conn)).unwrap()
306        }
307
308        pub fn key_package_rotation_history(&self) -> Vec<(i64, i64)> {
309            let mut history = vec![];
310            self.raw_query_read(|conn| {
311                 let rows = conn
312                     .load(sql_query(
313                         "SELECT next_key_package_rotation_ns, updated_at FROM key_package_rotation_history ORDER BY id ASC",
314                     ))
315                     .unwrap();
316                 for row in rows {
317                     let row = row.unwrap();
318                     let rotation_ns = <i64 as FromSqlRow<diesel::sql_types::BigInt, _>>::build_from_row(&row)
319                         .unwrap();
320                     let updated_at = <i64 as FromSqlRow<diesel::sql_types::BigInt, _>>::build_from_row(&row)
321                         .unwrap();
322                     history.push((rotation_ns, updated_at));
323                 }
324                 Ok(())
325             })
326             .unwrap();
327            history
328        }
329
330        /// print refresh state, group messages, and icebox tables of the database to stdout in
331        /// column format.
332        pub fn print_db(&self) {
333            // matches
334            // <CR>$xmtp.org application/x-protobuf
335            // can see actual format with hex -> ascii converter
336            // this allows us to ignore noise from protobuf encoded bytes in the message field
337            let proto_content_type_header = hex::decode(
338                "0a240a08786d74702e6f726712166170706c69636174696f6e2f782d70726f746f627566",
339            )
340            .unwrap();
341            let format_msg = |m: &StoredGroupMessage| -> String {
342                if m.kind == GroupMessageKind::MembershipChange {
343                    return "transcript".to_string();
344                }
345                if m.content_type != ContentType::Unknown {
346                    return "encoded message".to_string();
347                }
348
349                if m.decrypted_message_bytes
350                    .starts_with(&proto_content_type_header)
351                {
352                    return "unknown encoded type".to_string();
353                }
354
355                match String::from_utf8(m.decrypted_message_bytes.clone()) {
356                    Ok(s) => s,
357                    Err(_) => "unknown encoded type".to_string(),
358                }
359            };
360            let mut t = AsciiTable::default();
361
362            println!("\n=== group_messages ===");
363            let msgs: Vec<crate::group_message::StoredGroupMessage> = self
364                .raw_query_read(|c| crate::schema::group_messages::table.load(c))
365                .unwrap_or_default();
366            t.column(0).set_header("id");
367            t.column(1).set_header("group_id");
368            t.column(2).set_header("sent_at");
369            t.column(3).set_header("kind");
370            t.column(4).set_header("sender_inbox_id");
371            t.column(5).set_header("delivery_status");
372            t.column(6).set_header("content_type");
373            t.column(7).set_header("originator_id");
374            t.column(8).set_header("sequence_id");
375            t.column(9).set_header("message");
376            let rows: Vec<Vec<String>> = msgs
377                .iter()
378                .map(|m| {
379                    vec![
380                        hex::encode(&m.id)[..16].to_string(),
381                        hex::encode(&m.group_id)[..16].to_string(),
382                        m.sent_at_ns.to_string(),
383                        format!("{:?}", m.kind),
384                        m.sender_inbox_id.clone(),
385                        format!("{:?}", m.delivery_status),
386                        m.content_type.to_string(),
387                        m.originator_id.to_string(),
388                        m.sequence_id.to_string(),
389                        format_msg(m),
390                    ]
391                })
392                .collect();
393            if rows.is_empty() {
394                println!("(empty)");
395            } else {
396                t.println(rows);
397            }
398
399            let mut t = AsciiTable::default();
400            println!("\n=== refresh_state ===");
401            let states: Vec<crate::refresh_state::RefreshState> = self
402                .raw_query_read(|c| crate::schema::refresh_state::table.load(c))
403                .unwrap_or_default();
404            t.column(0).set_header("entity_id");
405            t.column(1).set_header("entity_kind");
406            t.column(2).set_header("originator_id");
407            t.column(3).set_header("sequence_id");
408            let rows: Vec<Vec<String>> = states
409                .iter()
410                .map(|s| {
411                    vec![
412                        hex::encode(&s.entity_id)[..16.min(s.entity_id.len() * 2)].to_string(),
413                        format!("{:?}", s.entity_kind),
414                        s.originator_id.to_string(),
415                        s.sequence_id.to_string(),
416                    ]
417                })
418                .collect();
419            if rows.is_empty() {
420                println!("(empty)");
421            } else {
422                t.println(rows);
423            }
424
425            let mut t = AsciiTable::default();
426            t.column(0).set_header("originator_id");
427            t.column(1).set_header("sequence_id");
428            t.column(2).set_header("group_id");
429            t.column(3).set_header("envelope_payload");
430            println!("\n=== icebox ===");
431            let ice: Vec<crate::icebox::Icebox> = self
432                .raw_query_read(|c| crate::schema::icebox::table.load(c))
433                .unwrap_or_default();
434            let rows: Vec<Vec<String>> = ice
435                .iter()
436                .map(|i| {
437                    vec![
438                        i.originator_id.to_string(),
439                        i.sequence_id.to_string(),
440                        hex::encode(&i.group_id)[..16].to_string(),
441                        hex::encode(&i.envelope_payload)[..20.min(i.envelope_payload.len() * 2)]
442                            .to_string(),
443                    ]
444                })
445                .collect();
446            if rows.is_empty() {
447                println!("(empty)");
448            } else {
449                t.println(rows);
450            }
451        }
452    }
453}