xmtp_api_d14n/queries/v3/
mls.rs1use crate::protocol::{
2 CollectionExtractor, CursorStore, MessageMetadataExtractor, ProtocolEnvelope,
3 V3WelcomeMessageExtractor,
4};
5use crate::protocol::{SequencedExtractor, V3GroupMessageExtractor, traits::Extractor};
6use crate::{V3Client, v3::*};
7use xmtp_common::RetryableError;
8use xmtp_configuration::{MAX_PAGE_SIZE, Originators};
9use xmtp_proto::api::{self, ApiClientError, Client, Query};
10use xmtp_proto::api_client::XmtpMlsClient;
11use xmtp_proto::mls_v1::{self, GroupMessage as ProtoGroupMessage, PagingInfo, SortDirection};
12use xmtp_proto::types::{GroupId, GroupMessageMetadata, InstallationId, TopicKind, WelcomeMessage};
13
14#[xmtp_common::async_trait]
15impl<C, Store, E> XmtpMlsClient for V3Client<C, Store>
16where
17 E: RetryableError + 'static,
18 C: Client<Error = E>,
19 ApiClientError<E>: From<ApiClientError<<C as Client>::Error>> + 'static,
20 Store: CursorStore,
21{
22 type Error = ApiClientError<E>;
23
24 async fn upload_key_package(
25 &self,
26 request: mls_v1::UploadKeyPackageRequest,
27 ) -> Result<(), Self::Error> {
28 UploadKeyPackage::builder()
29 .key_package(request.key_package)
30 .is_inbox_id_credential(request.is_inbox_id_credential)
31 .build()?
32 .query(&self.client)
33 .await
34 }
35 async fn fetch_key_packages(
36 &self,
37 request: mls_v1::FetchKeyPackagesRequest,
38 ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
39 FetchKeyPackages::builder()
40 .installation_keys(request.installation_keys)
41 .build()?
42 .query(&self.client)
43 .await
44 }
45 async fn send_group_messages(
46 &self,
47 request: mls_v1::SendGroupMessagesRequest,
48 ) -> Result<(), Self::Error> {
49 SendGroupMessages::builder()
50 .messages(request.messages)
51 .build()?
52 .query(&self.client)
53 .await
54 }
55 async fn send_welcome_messages(
56 &self,
57 request: mls_v1::SendWelcomeMessagesRequest,
58 ) -> Result<(), Self::Error> {
59 SendWelcomeMessages::builder()
60 .messages(request.messages)
61 .build()?
62 .query(&self.client)
63 .await
64 }
65 async fn query_group_messages(
66 &self,
67 group_id: GroupId,
68 ) -> Result<Vec<xmtp_proto::types::GroupMessage>, Self::Error> {
69 let topic = &TopicKind::GroupMessagesV1.create(&group_id);
70 let cursor = self
71 .cursor_store
72 .latest_per_originator(
73 topic,
74 &[
75 &Originators::APPLICATION_MESSAGES,
76 &Originators::MLS_COMMITS,
77 ],
78 )?
79 .max();
80 let endpoint = QueryGroupMessages::builder()
81 .group_id(group_id.to_vec())
82 .paging_info(PagingInfo {
83 limit: MAX_PAGE_SIZE,
84 direction: SortDirection::Ascending as i32,
85 id_cursor: cursor,
86 })
87 .build()?;
88 let messages = api::v3_paged(api::retry(endpoint), Some(cursor))
89 .query(&self.client)
90 .await?;
91 let messages = SequencedExtractor::builder()
92 .envelopes(messages)
93 .build::<V3GroupMessageExtractor>()
94 .get()?;
95 Ok(messages
96 .into_iter()
97 .collect::<Result<Vec<Option<_>>, _>>()?
98 .into_iter()
99 .flatten()
100 .collect())
101 }
102
103 async fn query_latest_group_message(
104 &self,
105 group_id: GroupId,
106 ) -> Result<Option<xmtp_proto::types::GroupMessage>, Self::Error> {
107 let endpoint = QueryGroupMessages::builder()
108 .group_id(group_id.to_vec())
109 .paging_info(PagingInfo {
110 limit: 1,
111 direction: SortDirection::Descending as i32,
112 id_cursor: 0,
113 })
114 .build()?;
115 let message: Option<ProtoGroupMessage> = api::retry(endpoint)
116 .query(&self.client)
117 .await?
118 .messages
119 .into_iter()
120 .next();
121 let mut extractor = V3GroupMessageExtractor::default();
122 message.as_ref().accept(&mut extractor)?;
123 Ok(extractor.get()?)
124 }
125
126 async fn query_welcome_messages(
127 &self,
128 installation_key: InstallationId,
129 ) -> Result<Vec<WelcomeMessage>, Self::Error> {
130 let topic = &TopicKind::WelcomeMessagesV1.create(installation_key);
131 let id_cursor = self
132 .cursor_store
133 .latest_for_originator(topic, &Originators::WELCOME_MESSAGES)?
134 .sequence_id;
135 let endpoint = QueryWelcomeMessages::builder()
136 .installation_key(installation_key)
137 .paging_info(PagingInfo {
138 limit: MAX_PAGE_SIZE,
139 direction: SortDirection::Ascending as i32,
140 id_cursor,
141 })
142 .build()?;
143 let messages = api::v3_paged(api::retry(endpoint), Some(id_cursor))
144 .query(&self.client)
145 .await?;
146 let messages = SequencedExtractor::builder()
147 .envelopes(messages)
148 .build::<V3WelcomeMessageExtractor>()
149 .get()?;
150 Ok(messages.into_iter().collect::<Result<_, _>>()?)
151 }
152
153 async fn publish_commit_log(
154 &self,
155 request: mls_v1::BatchPublishCommitLogRequest,
156 ) -> Result<(), Self::Error> {
157 PublishCommitLog::builder()
158 .commit_log_entries(request.requests)
159 .build()?
160 .query(&self.client)
161 .await
162 }
163
164 async fn query_commit_log(
165 &self,
166 request: mls_v1::BatchQueryCommitLogRequest,
167 ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
168 QueryCommitLog::builder()
169 .query_log_requests(request.requests)
170 .build()?
171 .query(&self.client)
172 .await
173 }
174
175 async fn get_newest_group_message(
176 &self,
177 request: mls_v1::GetNewestGroupMessageRequest,
178 ) -> Result<Vec<Option<GroupMessageMetadata>>, Self::Error> {
179 let responses = GetNewestGroupMessage::builder()
180 .group_ids(request.group_ids)
181 .build()?
182 .query(&self.client)
183 .await?;
184
185 let extractor =
186 CollectionExtractor::new(responses.responses, MessageMetadataExtractor::new());
187 let responses = extractor.get()?;
188
189 Ok(responses)
190 }
191}