xmtp_api_d14n/queries/v3/
xmtp_query.rs

1use crate::{
2    V3Client,
3    protocol::{CursorStore, XmtpEnvelope, XmtpQuery},
4    v3::{FetchKeyPackages, GetIdentityUpdatesV2, QueryGroupMessages, QueryWelcomeMessages},
5};
6use xmtp_common::RetryableError;
7use xmtp_configuration::MAX_PAGE_SIZE;
8use xmtp_proto::identity_v1::{
9    get_identity_updates_request, get_identity_updates_response::IdentityUpdateLog,
10};
11use xmtp_proto::{
12    api::{ApiClientError, Client, EndpointExt, Query},
13    mls_v1::{PagingInfo, SortDirection},
14    types::{GlobalCursor, Topic, TopicKind},
15};
16
17#[xmtp_common::async_trait]
18impl<C, Store, E> XmtpQuery for V3Client<C, Store>
19where
20    C: Client<Error = E>,
21    E: RetryableError + 'static,
22    ApiClientError<E>: From<ApiClientError<<C as Client>::Error>>,
23    Store: CursorStore,
24{
25    type Error = ApiClientError<E>;
26    async fn query_at(
27        &self,
28        topic: Topic,
29        at: Option<GlobalCursor>,
30    ) -> Result<XmtpEnvelope, Self::Error> {
31        use TopicKind::*;
32        match topic.kind() {
33            GroupMessagesV1 => {
34                let id_cursor = at.map(|c| c.v3_message()).unwrap_or(0);
35                let result = QueryGroupMessages::builder()
36                    .group_id(topic.identifier())
37                    .paging_info(PagingInfo {
38                        direction: SortDirection::Ascending as i32,
39                        limit: MAX_PAGE_SIZE,
40                        id_cursor,
41                    })
42                    .build()?
43                    .v3_paged(Some(id_cursor))
44                    .query(&self.client)
45                    .await?;
46                Ok(XmtpEnvelope::new(result))
47            }
48            WelcomeMessagesV1 => {
49                let id_cursor = at.map(|c| c.v3_welcome()).unwrap_or(0);
50                let result = QueryWelcomeMessages::builder()
51                    .installation_key(topic.identifier())
52                    .paging_info(PagingInfo {
53                        direction: SortDirection::Ascending as i32,
54                        limit: MAX_PAGE_SIZE,
55                        id_cursor,
56                    })
57                    .build()?
58                    .v3_paged(Some(id_cursor))
59                    .query(&self.client)
60                    .await?;
61                Ok(XmtpEnvelope::new(result))
62            }
63            IdentityUpdatesV1 => {
64                let result = GetIdentityUpdatesV2::builder()
65                    .request(get_identity_updates_request::Request {
66                        inbox_id: hex::encode(topic.identifier()),
67                        sequence_id: at.map(|c| c.inbox_log()).unwrap_or(0),
68                    })
69                    .build()?
70                    .query(&self.client)
71                    .await?;
72                let updates: Vec<IdentityUpdateLog> = result
73                    .responses
74                    .into_iter()
75                    .flat_map(|r| r.updates)
76                    .collect();
77                Ok(XmtpEnvelope::new(updates))
78            }
79            KeyPackagesV1 => {
80                let result = FetchKeyPackages::builder()
81                    .installation_key(topic.identifier().to_vec())
82                    .build()?
83                    .query(&self.client)
84                    .await?;
85                Ok(XmtpEnvelope::new(result.key_packages))
86            }
87            _ => unreachable!(),
88        }
89    }
90}