1use crate::StorageError;
2use crate::association_state::QueryAssociationStateCache;
3use crate::group::ConversationType;
4use crate::group::StoredGroupCommitLogPublicKey;
5use crate::group_message::StoredGroupMessage;
6use crate::local_commit_log::{LocalCommitLog, LocalCommitLogOrder};
7use crate::remote_commit_log::{RemoteCommitLog, RemoteCommitLogOrder};
8use std::collections::HashMap;
9use std::sync::Arc;
10use xmtp_proto::types::{Cursor, GlobalCursor, OrphanedEnvelope, Topic};
11use xmtp_proto::xmtp::identity::associations::AssociationState as AssociationStateProto;
12
13use crate::SqliteConnection;
14use crate::prelude::*;
15use mockall::mock;
16use parking_lot::Mutex;
17
18use crate::pending_remove::QueryPendingRemove;
19use crate::{ConnectionError, ConnectionExt};
20
/// Short alias for [`MockDbQuery`], the mock type generated by the `mock!` invocation in this file.
pub type MockDb = MockDbQuery;
22
/// Cloneable connection handle that keeps a single [`SqliteConnection`] behind an
/// `Arc<Mutex<_>>`, so every clone refers to the same underlying connection.
#[derive(Clone)]
pub struct MockConnection {
    /// The shared connection; it is locked for the duration of each raw query.
    inner: Arc<Mutex<SqliteConnection>>,
}
27
28impl std::fmt::Debug for MockConnection {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "MockConnection")
31 }
32}
33
34impl AsRef<MockConnection> for MockConnection {
35 fn as_ref(&self) -> &MockConnection {
36 self
37 }
38}
39
40impl ConnectionExt for MockConnection {
42 fn raw_query_read<T, F>(&self, fun: F) -> Result<T, crate::ConnectionError>
43 where
44 F: FnOnce(&mut SqliteConnection) -> Result<T, diesel::result::Error>,
45 Self: Sized,
46 {
47 let mut conn = self.inner.lock();
48 fun(&mut conn).map_err(ConnectionError::from)
49 }
50
51 fn raw_query_write<T, F>(&self, fun: F) -> Result<T, crate::ConnectionError>
52 where
53 F: FnOnce(&mut SqliteConnection) -> Result<T, diesel::result::Error>,
54 Self: Sized,
55 {
56 let mut conn = self.inner.lock();
57 fun(&mut conn).map_err(ConnectionError::from)
58 }
59
60 fn disconnect(&self) -> Result<(), ConnectionError> {
61 Ok(())
62 }
63
64 fn reconnect(&self) -> Result<(), ConnectionError> {
65 Ok(())
66 }
67}
68
// Generates `MockDbQuery` (aliased in this file as `MockDb`): a mockall mock that
// implements each of the query traits listed below. The struct itself declares no
// inherent methods; everything comes from the `impl Trait for DbQuery` sections.
//
// Methods annotated with `#[mockall::concretize]` have generic parameters; the
// attribute tells mockall to treat those generic arguments as trait objects when
// expectations are created.
mock! {
    pub DbQuery {

    }

    // Mocked `ReadOnly` methods.
    impl ReadOnly for DbQuery {
        fn enable_readonly(&self) -> Result<(), StorageError>;
        fn disable_readonly(&self) -> Result<(), StorageError>;
    }

    // Mocked `QueryConsentRecord` methods; all of them report `ConnectionError`.
    impl QueryConsentRecord for DbQuery {
        fn get_consent_record(
            &self,
            entity: String,
            entity_type: crate::consent_record::ConsentType,
        ) -> Result<Option<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;

        fn consent_records(
            &self,
        ) -> Result<Vec<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;

        fn consent_records_paged(
            &self,
            limit: i64,
            offset: i64,
        ) -> Result<Vec<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;

        fn insert_newer_consent_record(
            &self,
            record: crate::consent_record::StoredConsentRecord,
        ) -> Result<bool, crate::ConnectionError>;

        fn insert_or_replace_consent_records(
            &self,
            records: &[crate::consent_record::StoredConsentRecord],
        ) -> Result<Vec<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;

        fn maybe_insert_consent_record_return_existing(
            &self,
            record: &crate::consent_record::StoredConsentRecord,
        ) -> Result<Option<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;

        fn find_consent_by_dm_id(
            &self,
            dm_id: &str,
        ) -> Result<Vec<crate::consent_record::StoredConsentRecord>, crate::ConnectionError>;
    }

    // Mocked `QueryConversationList` methods.
    impl QueryConversationList for DbQuery {
        #[mockall::concretize]
        fn fetch_conversation_list<A: AsRef<crate::group::GroupQueryArgs>>(
            &self,
            args: A,
        ) -> Result<Vec<crate::conversation_list::ConversationListItem>, StorageError>;
    }

    // Mocked `QueryDms` methods.
    impl QueryDms for DbQuery {
        fn fetch_stitched(
            &self,
            key: &[u8],
        ) -> Result<Option<crate::group::StoredGroup>, ConnectionError>;

        #[mockall::concretize]
        fn find_active_dm_group<M>(
            &self,
            members: M,
        ) -> Result<Option<crate::group::StoredGroup>, ConnectionError>
        where
            M: std::fmt::Display;

        fn other_dms(&self, group_id: &[u8])
            -> Result<Vec<crate::group::StoredGroup>, ConnectionError>;
    }

    // Mocked `QueryGroup` methods. Note the mix of error types: some methods report
    // `ConnectionError`, others `StorageError`.
    impl QueryGroup for DbQuery {
        #[mockall::concretize]
        fn find_groups<A: AsRef<crate::group::GroupQueryArgs>>(
            &self,
            args: A,
        ) -> Result<Vec<crate::group::StoredGroup>, crate::ConnectionError>;

        #[mockall::concretize]
        fn find_groups_by_id_paged<A: AsRef<crate::group::GroupQueryArgs>>(
            &self,
            args: A,
            offset: i64,
        ) -> Result<Vec<crate::group::StoredGroup>, crate::ConnectionError>;

        #[mockall::concretize]
        fn update_group_membership<GroupId: AsRef<[u8]>>(
            &self,
            group_id: GroupId,
            state: crate::group::GroupMembershipState,
        ) -> Result<(), crate::ConnectionError>;

        fn all_sync_groups(&self) -> Result<Vec<crate::group::StoredGroup>, crate::ConnectionError>;

        fn find_sync_group(
            &self,
            id: &[u8],
        ) -> Result<Option<crate::group::StoredGroup>, crate::ConnectionError>;

        fn primary_sync_group(
            &self,
        ) -> Result<Option<crate::group::StoredGroup>, crate::ConnectionError>;

        fn find_group(
            &self,
            id: &[u8],
        ) -> Result<Option<crate::group::StoredGroup>, crate::ConnectionError>;

        fn find_group_by_sequence_id(
            &self,
            cursor: Cursor,
        ) -> Result<Option<crate::group::StoredGroup>, crate::ConnectionError>;

        fn get_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

        fn update_rotated_at_ns(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

        fn get_installations_time_checked(&self, group_id: Vec<u8>) -> Result<i64, StorageError>;

        fn update_installations_time_checked(&self, group_id: Vec<u8>) -> Result<(), StorageError>;

        fn update_message_disappearing_from_ns(
            &self,
            group_id: Vec<u8>,
            from_ns: Option<i64>,
        ) -> Result<(), StorageError>;

        fn update_message_disappearing_in_ns(
            &self,
            group_id: Vec<u8>,
            in_ns: Option<i64>,
        ) -> Result<(), StorageError>;

        fn insert_or_replace_group(
            &self,
            group: crate::group::StoredGroup,
        ) -> Result<crate::group::StoredGroup, StorageError>;

        fn group_cursors(&self) -> Result<Vec<Cursor>, crate::ConnectionError>;

        fn mark_group_as_maybe_forked(
            &self,
            group_id: &[u8],
            fork_details: String,
        ) -> Result<(), StorageError>;

        fn clear_fork_flag_for_group(&self, group_id: &[u8]) -> Result<(), crate::ConnectionError>;

        fn has_duplicate_dm(&self, group_id: &[u8]) -> Result<bool, crate::ConnectionError>;

        fn get_conversation_ids_for_remote_log_publish(&self) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

        fn get_conversation_ids_for_remote_log_download(&self) -> Result<Vec<StoredGroupCommitLogPublicKey>, crate::ConnectionError>;

        fn get_conversation_ids_for_fork_check(
            &self,
        ) -> Result<Vec<Vec<u8>>, crate::ConnectionError>;

        fn get_conversation_ids_for_requesting_readds(
            &self,
        ) -> Result<Vec<crate::encrypted_store::group::StoredGroupForReaddRequest>, crate::ConnectionError>;

        fn get_conversation_ids_for_responding_readds(
            &self,
        ) -> Result<Vec<crate::encrypted_store::group::StoredGroupForRespondingReadds>, crate::ConnectionError>;

        fn get_conversation_type(&self, group_id: &[u8]) -> Result<ConversationType, crate::ConnectionError>;

        fn set_group_commit_log_public_key(
            &self,
            group_id: &[u8],
            public_key: &[u8],
        ) -> Result<(), StorageError>;

        fn set_group_commit_log_forked_status(
            &self,
            group_id: &[u8],
            is_forked: Option<bool>,
        ) -> Result<(), StorageError>;

        fn get_group_commit_log_forked_status(
            &self,
            group_id: &[u8],
        ) -> Result<Option<bool>, StorageError>;

        fn set_group_has_pending_leave_request_status(
            &self,
            group_id: &[u8],
            has_pending_leave_request: Option<bool>,
        ) -> Result<(), StorageError>;
        fn get_groups_have_pending_leave_request(
            &self,
        ) -> Result<Vec<Vec<u8>>, crate::ConnectionError>;
    }

    // Mocked `QueryGroupVersion` methods.
    impl QueryGroupVersion for DbQuery {
        fn set_group_paused(&self, group_id: &[u8], min_version: &str) -> Result<(), StorageError>;

        fn unpause_group(&self, group_id: &[u8]) -> Result<(), StorageError>;

        fn get_group_paused_version(&self, group_id: &[u8]) -> Result<Option<String>, StorageError>;
    }

    // Mocked `QueryGroupIntent` methods.
    impl QueryGroupIntent for DbQuery {
        fn insert_group_intent(
            &self,
            to_save: crate::group_intent::NewGroupIntent,
        ) -> Result<crate::group_intent::StoredGroupIntent, crate::ConnectionError>;

        #[mockall::concretize]
        fn find_group_intents<Id: AsRef<[u8]>>(
            &self,
            group_id: Id,
            allowed_states: Option<Vec<crate::group_intent::IntentState>>,
            allowed_kinds: Option<Vec<crate::group_intent::IntentKind>>,
        ) -> Result<Vec<crate::group_intent::StoredGroupIntent>, crate::ConnectionError>;

        fn set_group_intent_published(
            &self,
            intent_id: crate::group_intent::ID,
            payload_hash: &[u8],
            post_commit_data: Option<Vec<u8>>,
            staged_commit: Option<Vec<u8>>,
            published_in_epoch: i64,
        ) -> Result<(), StorageError>;

        fn set_group_intent_committed(
            &self,
            intent_id: crate::group_intent::ID,
            cursor: Cursor,
        ) -> Result<(), StorageError>;

        fn set_group_intent_processed(
            &self,
            intent_id: crate::group_intent::ID,
        ) -> Result<(), StorageError>;

        fn set_group_intent_to_publish(
            &self,
            intent_id: crate::group_intent::ID,
        ) -> Result<(), StorageError>;

        fn set_group_intent_error(
            &self,
            intent_id: crate::group_intent::ID,
        ) -> Result<(), StorageError>;

        fn find_group_intent_by_payload_hash(
            &self,
            payload_hash: &[u8],
        ) -> Result<Option<crate::group_intent::StoredGroupIntent>, StorageError>;

        #[mockall::concretize]
        fn find_dependant_commits<P: AsRef<[u8]>>(
            &self,
            payload_hashes: &[P],
        ) -> Result<HashMap<crate::group_intent::PayloadHash, crate::group_intent::IntentDependency>, StorageError>;

        fn increment_intent_publish_attempt_count(
            &self,
            intent_id: crate::group_intent::ID,
        ) -> Result<(), StorageError>;

        fn set_group_intent_error_and_fail_msg(
            &self,
            intent: &crate::group_intent::StoredGroupIntent,
            msg_id: Option<Vec<u8>>,
        ) -> Result<(), StorageError>;
    }

    // Mocked `QueryReaddStatus` methods; all of them report `ConnectionError`.
    impl QueryReaddStatus for DbQuery {
        fn get_readd_status(
            &self,
            group_id: &[u8],
            installation_id: &[u8],
        ) -> Result<Option<crate::readd_status::ReaddStatus>, crate::ConnectionError>;

        fn is_awaiting_readd(
            &self,
            group_id: &[u8],
            installation_id: &[u8],
        ) -> Result<bool, crate::ConnectionError>;

        fn update_requested_at_sequence_id(
            &self,
            group_id: &[u8],
            installation_id: &[u8],
            sequence_id: i64,
        ) -> Result<(), crate::ConnectionError>;

        fn update_responded_at_sequence_id(
            &self,
            group_id: &[u8],
            installation_id: &[u8],
            sequence_id: i64,
        ) -> Result<(), crate::ConnectionError>;

        fn delete_other_readd_statuses(
            &self,
            group_id: &[u8],
            self_installation_id: &[u8],
        ) -> Result<(), crate::ConnectionError>;

        fn delete_readd_statuses(
            &self,
            group_id: &[u8],
            installation_ids: std::collections::HashSet<Vec<u8> > ,
        ) -> Result<(), crate::ConnectionError>;

        fn get_readds_awaiting_response(
            &self,
            group_id: &[u8],
            self_installation_id: &[u8],
        ) -> Result<Vec<crate::readd_status::ReaddStatus>, crate::ConnectionError>;
    }

    // Mocked `QueryGroupMessage` methods; all of them report `ConnectionError`.
    impl QueryGroupMessage for DbQuery {
        fn get_group_messages(
            &self,
            group_id: &[u8],
            args: &crate::group_message::MsgQueryArgs,
        ) -> Result<Vec<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        fn count_group_messages(
            &self,
            group_id: &[u8],
            args: &crate::group_message::MsgQueryArgs,
        ) -> Result<i64, crate::ConnectionError>;

        fn group_messages_paged(
            &self,
            args: &crate::group_message::MsgQueryArgs,
            offset: i64,
        ) -> Result<Vec<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        fn get_group_messages_with_reactions(
            &self,
            group_id: &[u8],
            args: &crate::group_message::MsgQueryArgs,
        ) -> Result<Vec<crate::group_message::StoredGroupMessageWithReactions>, crate::ConnectionError>;

        fn get_inbound_relations<'a>(
            &self,
            group_id: &'a [u8],
            message_ids: &'a [&'a [u8]],
            relation_query: crate::group_message::RelationQuery,
        ) -> Result<crate::group_message::InboundRelations, crate::ConnectionError>;

        fn get_outbound_relations<'a>(
            &self,
            group_id: &'a [u8],
            message_ids: &'a [&'a [u8]],
        ) -> Result<crate::group_message::OutboundRelations, crate::ConnectionError>;

        fn get_inbound_relation_counts<'a>(
            &self,
            group_id: &'a [u8],
            message_ids: &'a [&'a [u8]],
            relation_query: crate::group_message::RelationQuery,
        ) -> Result<crate::group_message::RelationCounts, crate::ConnectionError>;

        #[mockall::concretize]
        fn get_group_message<MessageId: AsRef<[u8]>>(
            &self,
            id: MessageId,
        ) -> Result<Option<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        #[mockall::concretize]
        fn write_conn_get_group_message<MessageId: AsRef<[u8]>>(
            &self,
            id: MessageId,
        ) -> Result<Option<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        #[mockall::concretize]
        fn get_group_message_by_timestamp<GroupId: AsRef<[u8]>>(
            &self,
            group_id: GroupId,
            timestamp: i64,
        ) -> Result<Option<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        #[mockall::concretize]
        fn get_group_message_by_cursor<GroupId: AsRef<[u8]>>(
            &self,
            group_id: GroupId,
            sequence_id: Cursor,
        ) -> Result<Option<crate::group_message::StoredGroupMessage>, crate::ConnectionError>;

        #[mockall::concretize]
        fn set_delivery_status_to_published<MessageId: AsRef<[u8]>>(
            &self,
            msg_id: &MessageId,
            timestamp: u64,
            cursor: Cursor,
            message_expire_at_ns: Option<i64>
        ) -> Result<usize, crate::ConnectionError>;

        #[mockall::concretize]
        fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
            &self,
            msg_id: &MessageId,
        ) -> Result<usize, crate::ConnectionError>;

        fn delete_expired_messages(&self) -> Result<Vec<StoredGroupMessage>, crate::ConnectionError>;

        #[mockall::concretize]
        fn delete_message_by_id<MessageId: AsRef<[u8]>>(
            &self,
            message_id: MessageId,
        ) -> Result<usize, crate::ConnectionError>;

        #[mockall::concretize]
        fn get_latest_message_times_by_sender<GroupId: AsRef<[u8]>>(
            &self,
            group_id: GroupId,
            allowed_content_types: &[crate::group_message::ContentType],
        ) -> Result<crate::group_message::LatestMessageTimeBySender, crate::ConnectionError>;

        fn messages_newer_than(
            &self,
            cursors_by_group: &HashMap<Vec<u8>, xmtp_proto::types::GlobalCursor>,
        ) -> Result<Vec<Cursor>, crate::ConnectionError>;

        fn clear_messages<'a>(
            &self,
            group_ids: Option<&'a [Vec<u8>]>,
            retention_days: Option<u32>,
        ) -> Result<usize, crate::ConnectionError>;
    }

    // Mocked `QueryIdentity` methods.
    impl QueryIdentity for DbQuery {
        fn queue_key_package_rotation(&self) -> Result<(), StorageError>;

        fn reset_key_package_rotation_queue(&self, rotation_interval: i64) -> Result<(), StorageError>;

        fn is_identity_needs_rotation(&self) -> Result<bool, StorageError>;
    }

    // Mocked `QueryIdentityCache` methods. Both are generic over the identifier type
    // and therefore concretized.
    impl QueryIdentityCache for DbQuery {
        #[mockall::concretize]
        fn fetch_cached_inbox_ids<T>(
            &self,
            identifiers: &[T],
        ) -> Result<std::collections::HashMap<String, String>, StorageError>
        where
            T: std::fmt::Display,
            for<'a> &'a T: Into<crate::identity_cache::StoredIdentityKind>;

        #[mockall::concretize]
        fn cache_inbox_id<T, S>(
            &self,
            identifier: &T,
            inbox_id: S,
        ) -> Result<(), StorageError>
        where
            T: std::fmt::Display,
            S: ToString,
            for<'a> &'a T: Into<crate::identity_cache::StoredIdentityKind>;
    }

    // Mocked `QueryKeyPackageHistory` methods.
    impl QueryKeyPackageHistory for DbQuery {
        fn store_key_package_history_entry(
            &self,
            key_package_hash_ref: Vec<u8>,
            post_quantum_public_key: Option<Vec<u8>>,
        ) -> Result<crate::key_package_history::StoredKeyPackageHistoryEntry, StorageError>;

        fn find_key_package_history_entry_by_hash_ref(
            &self,
            hash_ref: Vec<u8>,
        ) -> Result<crate::key_package_history::StoredKeyPackageHistoryEntry, StorageError>;

        fn find_key_package_history_entries_before_id(
            &self,
            id: i32,
        ) -> Result<Vec<crate::key_package_history::StoredKeyPackageHistoryEntry>, StorageError>;

        fn mark_key_package_before_id_to_be_deleted(&self, id: i32) -> Result<(), StorageError>;

        fn get_expired_key_packages(
            &self,
        ) -> Result<Vec<crate::key_package_history::StoredKeyPackageHistoryEntry>, StorageError>;

        fn delete_key_package_history_up_to_id(&self, id: i32) -> Result<(), StorageError>;

        fn delete_key_package_entry_with_id(&self, id: i32) -> Result<(), StorageError>;
    }

    // Mocked `QueryKeyStoreEntry` methods.
    impl QueryKeyStoreEntry for DbQuery {
        fn insert_or_update_key_store_entry(
            &self,
            key: Vec<u8>,
            value: Vec<u8>,
        ) -> Result<(), StorageError>;
    }

    // Mocked `QueryDeviceSyncMessages` methods.
    impl QueryDeviceSyncMessages for DbQuery {
        fn unprocessed_sync_group_messages(
            &self,
        ) -> Result<Vec<crate::group_message::StoredGroupMessage>, StorageError>;

        fn sync_group_messages_paged(
            &self,
            offset: i64,
            limit: i64,
        ) -> Result<Vec<crate::group_message::StoredGroupMessage>, StorageError>;

        fn mark_device_sync_msg_as_processed(
            &self,
            message_id: &[u8],
        ) -> Result<(), StorageError>;

        fn increment_device_sync_msg_attempt(
            &self,
            message_id: &[u8],
            max_attempts: i32,
        ) -> Result<i32, StorageError>;
    }

    // Mocked `QueryRefreshState` methods.
    impl QueryRefreshState for DbQuery {
        #[mockall::concretize]
        fn get_refresh_state<EntityId: AsRef<[u8]>>(
            &self,
            entity_id: EntityId,
            entity_kind: crate::refresh_state::EntityKind,
            originator_id: u32,
        ) -> Result<Option<crate::refresh_state::RefreshState>, StorageError>;

        #[mockall::concretize]
        fn get_last_cursor_for_originators<Id: AsRef<[u8]>>(
            &self,
            id: Id,
            entity_kind: crate::refresh_state::EntityKind,
            originator_id: &[u32]
        ) -> Result<Vec<Cursor>, StorageError>;

        #[mockall::concretize]
        fn get_last_cursor_for_ids<Id: AsRef<[u8]>>(
            &self,
            ids: &[Id],
            entities: &[crate::refresh_state::EntityKind],
        ) -> Result<std::collections::HashMap<Vec<u8>, GlobalCursor>, StorageError>;

        #[mockall::concretize]
        fn update_cursor<Id: AsRef<[u8]>>(
            &self,
            entity_id: Id,
            entity_kind: crate::refresh_state::EntityKind,
            cursor: xmtp_proto::types::Cursor
        ) -> Result<bool, StorageError>;

        // NOTE(review): this signature has no generic parameters, so `concretize`
        // looks redundant here - confirm against the trait before removing it.
        #[mockall::concretize]
        fn get_remote_log_cursors(
            &self,
            conversation_ids: &[&Vec<u8>],
        ) -> Result<HashMap<Vec<u8>, Cursor>, crate::ConnectionError>;

        // NOTE(review): no generic parameters here either; `concretize` may be
        // unnecessary - confirm.
        #[mockall::concretize]
        fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, StorageError>;

        #[mockall::concretize]
        fn latest_cursor_for_id<Id: AsRef<[u8]>>(
            &self,
            entity: Id,
            entities: &[crate::refresh_state::EntityKind],
            originators: Option<&[&xmtp_proto::types::OriginatorId]>
        ) -> Result<xmtp_proto::types::GlobalCursor, StorageError>;

        #[mockall::concretize]
        fn latest_cursor_combined<Id: AsRef<[u8]>>(
            &self,
            entity_id: Id,
            entities: &[crate::refresh_state::EntityKind],
            originators: Option<&[&xmtp_proto::types::OriginatorId]>,
        ) -> Result<GlobalCursor, StorageError>;

        // NOTE(review): no generic parameters; `concretize` may be unnecessary - confirm.
        #[mockall::concretize]
        fn lowest_common_cursor_combined(&self, topics: &[&Topic]) -> Result<GlobalCursor, StorageError>;
    }

    // Mocked `QueryIdentityUpdates` methods; all of them report `ConnectionError`.
    impl QueryIdentityUpdates for DbQuery {
        #[mockall::concretize]
        fn get_identity_updates<InboxId: AsRef<str>>(
            &self,
            inbox_id: InboxId,
            from_sequence_id: Option<i64>,
            to_sequence_id: Option<i64>,
        ) -> Result<Vec<crate::identity_update::StoredIdentityUpdate>, crate::ConnectionError>;

        fn insert_or_ignore_identity_updates(
            &self,
            updates: &[crate::identity_update::StoredIdentityUpdate],
        ) -> Result<(), crate::ConnectionError>;

        fn get_latest_sequence_id_for_inbox(
            &self,
            inbox_id: &str,
        ) -> Result<i64, crate::ConnectionError>;

        fn get_latest_sequence_id<'a>(
            &'a self,
            inbox_ids: &'a [&'a str],
        ) -> Result<std::collections::HashMap<String, i64>, crate::ConnectionError>;

        fn count_inbox_updates<'a>(
            &'a self,
            inbox_ids: &'a [&'a str],
        ) -> Result<std::collections::HashMap<String, i64>, crate::ConnectionError>;
    }

    // Mocked `QueryLocalCommitLog` methods.
    impl QueryLocalCommitLog for DbQuery {
        fn get_group_logs(
            &self,
            group_id: &[u8],
        ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;

        fn get_local_commit_log_after_cursor(
            &self,
            group_id: &[u8],
            after_cursor: i64,
            order_by: LocalCommitLogOrder,
        ) -> Result<Vec<LocalCommitLog>, crate::ConnectionError>;

        fn get_latest_log_for_group(
            &self,
            group_id: &[u8],
        ) -> Result<Option<LocalCommitLog>, crate::ConnectionError>;

        fn get_local_commit_log_cursor(
            &self,
            group_id: &[u8],
        ) -> Result<Option<i32>, crate::ConnectionError>;
    }

    // Mocked `QueryRemoteCommitLog` methods.
    impl QueryRemoteCommitLog for DbQuery {
        fn get_latest_remote_log_for_group(&self, group_id: &[u8]) -> Result<Option<RemoteCommitLog>, crate::ConnectionError>;

        fn get_remote_commit_log_after_cursor(
            &self,
            group_id: &[u8],
            after_cursor: i64,
            order_by: RemoteCommitLogOrder,
        ) -> Result<Vec<RemoteCommitLog>, crate::ConnectionError>;

    }

    // Mocked `QueryAssociationStateCache` methods.
    impl QueryAssociationStateCache for DbQuery {
        fn write_to_cache(
            &self,
            inbox_id: String,
            sequence_id: i64,
            state: AssociationStateProto,
        ) -> Result<(), StorageError>;

        #[mockall::concretize]
        fn read_from_cache<A: AsRef<str>>(
            &self,
            inbox_id: A,
            sequence_id: i64,
        ) -> Result<Option<AssociationStateProto>, StorageError>;


        // NOTE(review): no generic parameters; `concretize` may be unnecessary - confirm.
        #[mockall::concretize]
        fn batch_read_from_cache(
            &self,
            identifiers: Vec<(String, i64)>,
        ) -> Result<Vec<AssociationStateProto>, StorageError>;
    }

    // Mocked `QueryTasks` methods.
    impl QueryTasks for DbQuery {
        fn create_task(&self, task: crate::tasks::NewTask) -> Result<crate::tasks::Task, StorageError>;

        fn get_tasks(&self) -> Result<Vec<crate::tasks::Task>, StorageError>;

        fn get_next_task(&self) -> Result<Option<crate::tasks::Task>, StorageError>;

        fn update_task(
            &self,
            id: i32,
            attempts: i32,
            last_attempted_at_ns: i64,
            next_attempt_at_ns: i64,
        ) -> Result<crate::tasks::Task, StorageError>;

        fn delete_task(&self, id: i32) -> Result<bool, StorageError>;
    }

    // Mocked `Pragmas` methods.
    impl Pragmas for DbQuery {
        fn busy_timeout(
            &self,
        ) -> Result<i32, crate::ConnectionError>;
        #[mockall::concretize]
        fn set_sqlcipher_log<S: AsRef<str>>(
            &self,
            level: S
        ) -> Result<(), crate::ConnectionError>;
    }

    // Mocked `QueryPendingRemove` methods.
    impl QueryPendingRemove for DbQuery{
        fn get_pending_remove_users(
            &self,
            group_id: &[u8],
        ) -> Result<Vec<String>, crate::ConnectionError>;
        fn delete_pending_remove_users(
            &self,
            group_id: &[u8],
            inbox_ids: Vec<String>,
        ) -> Result<usize, crate::ConnectionError>;
        fn get_user_pending_remove_status(&self,
            group_id: &[u8],
            inbox_id: &str,
        ) -> Result<bool, crate::ConnectionError>;
    }

    // Mocked `QueryIcebox` methods.
    impl QueryIcebox for DbQuery {
        fn past_dependents(
            &self,
            cursors: &[xmtp_proto::types::Cursor],
        ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;

        fn future_dependents(
            &self,
            cursors: &[xmtp_proto::types::Cursor],
        ) -> Result<Vec<OrphanedEnvelope>, crate::ConnectionError>;

        fn ice(
            &self,
            orphans: Vec<OrphanedEnvelope>,
        ) -> Result<usize, crate::ConnectionError>;

        fn prune_icebox(&self) -> Result<usize, crate::ConnectionError>;
    }

    // Mocked `QueryMigrations` methods (trait referenced by full path).
    impl crate::migrations::QueryMigrations for DbQuery {
        fn applied_migrations(&self) -> Result<Vec<String>, crate::ConnectionError>;

        fn available_migrations(&self) -> Result<Vec<String>, crate::ConnectionError>;

        fn rollback_to_version<'a>(
            &self,
            version: &'a str,
        ) -> Result<Vec<String>, crate::ConnectionError>;

        fn run_migration<'a>(
            &self,
            name: &'a str,
        ) -> Result<(), crate::ConnectionError>;

        fn revert_migration<'a>(
            &self,
            name: &'a str,
        ) -> Result<(), crate::ConnectionError>;

        fn run_pending_migrations(&self) -> Result<Vec<String>, crate::ConnectionError>;
    }
    // Mocked `QueryMessageDeletion` methods (trait referenced by full path).
    impl crate::message_deletion::QueryMessageDeletion for DbQuery {
        fn get_message_deletion(
            &self,
            _id: &[u8],
        ) -> Result<Option<crate::message_deletion::StoredMessageDeletion>, crate::ConnectionError>;

        fn get_deletion_by_deleted_message_id(
            &self,
            _deleted_message_id: &[u8],
        ) -> Result<Option<crate::message_deletion::StoredMessageDeletion>, crate::ConnectionError>;

        fn get_deletions_for_messages(
            &self,
            _message_ids: Vec<Vec<u8>>,
        ) -> Result<Vec<crate::message_deletion::StoredMessageDeletion>, crate::ConnectionError>;

        fn get_group_deletions(
            &self,
            _group_id: &[u8],
        ) -> Result<Vec<crate::message_deletion::StoredMessageDeletion>, crate::ConnectionError>;

        fn is_message_deleted(
            &self,
            _message_id: &[u8],
        ) -> Result<bool, crate::ConnectionError>;
    }

}
856
857impl ConnectionExt for MockDbQuery {
858 fn raw_query_read<T, F>(&self, _fun: F) -> Result<T, crate::ConnectionError>
859 where
860 F: FnOnce(&mut SqliteConnection) -> Result<T, diesel::result::Error>,
861 Self: Sized,
862 {
863 todo!()
864 }
865
866 fn raw_query_write<T, F>(&self, _fun: F) -> Result<T, crate::ConnectionError>
867 where
868 F: FnOnce(&mut SqliteConnection) -> Result<T, diesel::result::Error>,
869 Self: Sized,
870 {
871 tracing::warn!("unhandled mock raw_query_write");
873 unsafe {
874 let uninit = std::mem::MaybeUninit::<T>::uninit();
875 Ok(uninit.assume_init())
876 }
877 }
878
879 fn disconnect(&self) -> Result<(), ConnectionError> {
880 todo!()
881 }
882
883 fn reconnect(&self) -> Result<(), ConnectionError> {
884 todo!()
885 }
886}
887
/// Declares [`MockConnection`] as the connection type for the mock.
///
/// The conversion itself is not implemented yet.
impl IntoConnection for MockDbQuery {
    type Connection = MockConnection;

    /// Not implemented for the mock.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn into_connection(self) -> Self::Connection {
        todo!()
    }
}