xmtp_api_d14n/queries/v3/
xmtp_query.rs1use crate::{
2 V3Client,
3 protocol::{CursorStore, XmtpEnvelope, XmtpQuery},
4 v3::{FetchKeyPackages, GetIdentityUpdatesV2, QueryGroupMessages, QueryWelcomeMessages},
5};
6use xmtp_common::RetryableError;
7use xmtp_configuration::MAX_PAGE_SIZE;
8use xmtp_proto::identity_v1::{
9 get_identity_updates_request, get_identity_updates_response::IdentityUpdateLog,
10};
11use xmtp_proto::{
12 api::{ApiClientError, Client, EndpointExt, Query},
13 mls_v1::{PagingInfo, SortDirection},
14 types::{GlobalCursor, Topic, TopicKind},
15};
16
17#[xmtp_common::async_trait]
18impl<C, Store, E> XmtpQuery for V3Client<C, Store>
19where
20 C: Client<Error = E>,
21 E: RetryableError + 'static,
22 ApiClientError<E>: From<ApiClientError<<C as Client>::Error>>,
23 Store: CursorStore,
24{
25 type Error = ApiClientError<E>;
26 async fn query_at(
27 &self,
28 topic: Topic,
29 at: Option<GlobalCursor>,
30 ) -> Result<XmtpEnvelope, Self::Error> {
31 use TopicKind::*;
32 match topic.kind() {
33 GroupMessagesV1 => {
34 let id_cursor = at.map(|c| c.v3_message()).unwrap_or(0);
35 let result = QueryGroupMessages::builder()
36 .group_id(topic.identifier())
37 .paging_info(PagingInfo {
38 direction: SortDirection::Ascending as i32,
39 limit: MAX_PAGE_SIZE,
40 id_cursor,
41 })
42 .build()?
43 .v3_paged(Some(id_cursor))
44 .query(&self.client)
45 .await?;
46 Ok(XmtpEnvelope::new(result))
47 }
48 WelcomeMessagesV1 => {
49 let id_cursor = at.map(|c| c.v3_welcome()).unwrap_or(0);
50 let result = QueryWelcomeMessages::builder()
51 .installation_key(topic.identifier())
52 .paging_info(PagingInfo {
53 direction: SortDirection::Ascending as i32,
54 limit: MAX_PAGE_SIZE,
55 id_cursor,
56 })
57 .build()?
58 .v3_paged(Some(id_cursor))
59 .query(&self.client)
60 .await?;
61 Ok(XmtpEnvelope::new(result))
62 }
63 IdentityUpdatesV1 => {
64 let result = GetIdentityUpdatesV2::builder()
65 .request(get_identity_updates_request::Request {
66 inbox_id: hex::encode(topic.identifier()),
67 sequence_id: at.map(|c| c.inbox_log()).unwrap_or(0),
68 })
69 .build()?
70 .query(&self.client)
71 .await?;
72 let updates: Vec<IdentityUpdateLog> = result
73 .responses
74 .into_iter()
75 .flat_map(|r| r.updates)
76 .collect();
77 Ok(XmtpEnvelope::new(updates))
78 }
79 KeyPackagesV1 => {
80 let result = FetchKeyPackages::builder()
81 .installation_key(topic.identifier().to_vec())
82 .build()?
83 .query(&self.client)
84 .await?;
85 Ok(XmtpEnvelope::new(result.key_packages))
86 }
87 _ => unreachable!(),
88 }
89 }
90}