xmtp_api_d14n/queries/v3/
mls.rs

1use crate::protocol::{
2    CollectionExtractor, CursorStore, MessageMetadataExtractor, ProtocolEnvelope,
3    V3WelcomeMessageExtractor,
4};
5use crate::protocol::{SequencedExtractor, V3GroupMessageExtractor, traits::Extractor};
6use crate::{V3Client, v3::*};
7use xmtp_common::RetryableError;
8use xmtp_configuration::{MAX_PAGE_SIZE, Originators};
9use xmtp_proto::api::{self, ApiClientError, Client, Query};
10use xmtp_proto::api_client::XmtpMlsClient;
11use xmtp_proto::mls_v1::{self, GroupMessage as ProtoGroupMessage, PagingInfo, SortDirection};
12use xmtp_proto::types::{GroupId, GroupMessageMetadata, InstallationId, TopicKind, WelcomeMessage};
13
14#[xmtp_common::async_trait]
15impl<C, Store, E> XmtpMlsClient for V3Client<C, Store>
16where
17    E: RetryableError + 'static,
18    C: Client<Error = E>,
19    ApiClientError<E>: From<ApiClientError<<C as Client>::Error>> + 'static,
20    Store: CursorStore,
21{
22    type Error = ApiClientError<E>;
23
24    async fn upload_key_package(
25        &self,
26        request: mls_v1::UploadKeyPackageRequest,
27    ) -> Result<(), Self::Error> {
28        UploadKeyPackage::builder()
29            .key_package(request.key_package)
30            .is_inbox_id_credential(request.is_inbox_id_credential)
31            .build()?
32            .query(&self.client)
33            .await
34    }
35    async fn fetch_key_packages(
36        &self,
37        request: mls_v1::FetchKeyPackagesRequest,
38    ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
39        FetchKeyPackages::builder()
40            .installation_keys(request.installation_keys)
41            .build()?
42            .query(&self.client)
43            .await
44    }
45    async fn send_group_messages(
46        &self,
47        request: mls_v1::SendGroupMessagesRequest,
48    ) -> Result<(), Self::Error> {
49        SendGroupMessages::builder()
50            .messages(request.messages)
51            .build()?
52            .query(&self.client)
53            .await
54    }
55    async fn send_welcome_messages(
56        &self,
57        request: mls_v1::SendWelcomeMessagesRequest,
58    ) -> Result<(), Self::Error> {
59        SendWelcomeMessages::builder()
60            .messages(request.messages)
61            .build()?
62            .query(&self.client)
63            .await
64    }
65    async fn query_group_messages(
66        &self,
67        group_id: GroupId,
68    ) -> Result<Vec<xmtp_proto::types::GroupMessage>, Self::Error> {
69        let topic = &TopicKind::GroupMessagesV1.create(&group_id);
70        let cursor = self
71            .cursor_store
72            .latest_per_originator(
73                topic,
74                &[
75                    &Originators::APPLICATION_MESSAGES,
76                    &Originators::MLS_COMMITS,
77                ],
78            )?
79            .max();
80        let endpoint = QueryGroupMessages::builder()
81            .group_id(group_id.to_vec())
82            .paging_info(PagingInfo {
83                limit: MAX_PAGE_SIZE,
84                direction: SortDirection::Ascending as i32,
85                id_cursor: cursor,
86            })
87            .build()?;
88        let messages = api::v3_paged(api::retry(endpoint), Some(cursor))
89            .query(&self.client)
90            .await?;
91        let messages = SequencedExtractor::builder()
92            .envelopes(messages)
93            .build::<V3GroupMessageExtractor>()
94            .get()?;
95        Ok(messages
96            .into_iter()
97            .collect::<Result<Vec<Option<_>>, _>>()?
98            .into_iter()
99            .flatten()
100            .collect())
101    }
102
103    async fn query_latest_group_message(
104        &self,
105        group_id: GroupId,
106    ) -> Result<Option<xmtp_proto::types::GroupMessage>, Self::Error> {
107        let endpoint = QueryGroupMessages::builder()
108            .group_id(group_id.to_vec())
109            .paging_info(PagingInfo {
110                limit: 1,
111                direction: SortDirection::Descending as i32,
112                id_cursor: 0,
113            })
114            .build()?;
115        let message: Option<ProtoGroupMessage> = api::retry(endpoint)
116            .query(&self.client)
117            .await?
118            .messages
119            .into_iter()
120            .next();
121        let mut extractor = V3GroupMessageExtractor::default();
122        message.as_ref().accept(&mut extractor)?;
123        Ok(extractor.get()?)
124    }
125
126    async fn query_welcome_messages(
127        &self,
128        installation_key: InstallationId,
129    ) -> Result<Vec<WelcomeMessage>, Self::Error> {
130        let topic = &TopicKind::WelcomeMessagesV1.create(installation_key);
131        let id_cursor = self
132            .cursor_store
133            .latest_for_originator(topic, &Originators::WELCOME_MESSAGES)?
134            .sequence_id;
135        let endpoint = QueryWelcomeMessages::builder()
136            .installation_key(installation_key)
137            .paging_info(PagingInfo {
138                limit: MAX_PAGE_SIZE,
139                direction: SortDirection::Ascending as i32,
140                id_cursor,
141            })
142            .build()?;
143        let messages = api::v3_paged(api::retry(endpoint), Some(id_cursor))
144            .query(&self.client)
145            .await?;
146        let messages = SequencedExtractor::builder()
147            .envelopes(messages)
148            .build::<V3WelcomeMessageExtractor>()
149            .get()?;
150        Ok(messages.into_iter().collect::<Result<_, _>>()?)
151    }
152
153    async fn publish_commit_log(
154        &self,
155        request: mls_v1::BatchPublishCommitLogRequest,
156    ) -> Result<(), Self::Error> {
157        PublishCommitLog::builder()
158            .commit_log_entries(request.requests)
159            .build()?
160            .query(&self.client)
161            .await
162    }
163
164    async fn query_commit_log(
165        &self,
166        request: mls_v1::BatchQueryCommitLogRequest,
167    ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
168        QueryCommitLog::builder()
169            .query_log_requests(request.requests)
170            .build()?
171            .query(&self.client)
172            .await
173    }
174
175    async fn get_newest_group_message(
176        &self,
177        request: mls_v1::GetNewestGroupMessageRequest,
178    ) -> Result<Vec<Option<GroupMessageMetadata>>, Self::Error> {
179        let responses = GetNewestGroupMessage::builder()
180            .group_ids(request.group_ids)
181            .build()?
182            .query(&self.client)
183            .await?;
184
185        let extractor =
186            CollectionExtractor::new(responses.responses, MessageMetadataExtractor::new());
187        let responses = extractor.get()?;
188
189        Ok(responses)
190    }
191}