// xmtp_api_d14n/queries/d14n/mls.rs
use super::D14nClient;
use crate::d14n::GetNewestEnvelopes;
use crate::d14n::PublishClientEnvelopes;
use crate::d14n::QueryEnvelope;
use crate::protocol::CollectionExtractor;
use crate::protocol::CursorStore;
use crate::protocol::EnvelopeError;
use crate::protocol::GroupMessageExtractor;
use crate::protocol::KeyPackagesExtractor;
use crate::protocol::MessageMetadataExtractor;
use crate::protocol::ProtocolEnvelope;
use crate::protocol::SequencedExtractor;
use crate::protocol::WelcomeMessageExtractor;
use crate::protocol::resolve;
use crate::protocol::traits::Envelope;
use crate::protocol::traits::EnvelopeCollection;
use crate::protocol::traits::Extractor;
use crate::queries::D14nCombinatorExt;
use xmtp_common::RetryableError;
use xmtp_configuration::MAX_PAGE_SIZE;
use xmtp_proto::api;
use xmtp_proto::api::Client;
use xmtp_proto::api::EndpointExt;
use xmtp_proto::api::{ApiClientError, Query};
use xmtp_proto::api_client::XmtpMlsClient;
use xmtp_proto::mls_v1;
use xmtp_proto::mls_v1::BatchQueryCommitLogResponse;
use xmtp_proto::types::GroupId;
use xmtp_proto::types::GroupMessageMetadata;
use xmtp_proto::types::InstallationId;
use xmtp_proto::types::Topic;
use xmtp_proto::types::TopicCursor;
use xmtp_proto::types::TopicKind;
use xmtp_proto::types::WelcomeMessage;
use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope;
use xmtp_proto::xmtp::xmtpv4::message_api::GetNewestEnvelopeResponse;

38#[xmtp_common::async_trait]
39impl<C, Store, E> XmtpMlsClient for D14nClient<C, Store>
40where
41 E: RetryableError + 'static,
42 C: Client<Error = E>,
43 ApiClientError<E>: From<ApiClientError<<C as xmtp_proto::api::Client>::Error>> + 'static,
44 Store: CursorStore,
45{
46 type Error = ApiClientError<E>;
47
48 #[tracing::instrument(level = "trace", skip_all)]
49 async fn upload_key_package(
50 &self,
51 request: mls_v1::UploadKeyPackageRequest,
52 ) -> Result<(), Self::Error> {
53 let envelopes = request.client_envelope()?;
54 api::ignore(
55 PublishClientEnvelopes::builder()
56 .envelope(envelopes)
57 .build()?,
58 )
59 .query(&self.client)
60 .await?;
61
62 Ok::<_, Self::Error>(())
63 }
64
65 #[tracing::instrument(level = "trace", skip_all)]
66 async fn fetch_key_packages(
67 &self,
68 request: mls_v1::FetchKeyPackagesRequest,
69 ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
70 let topics = request
71 .installation_keys
72 .iter()
73 .map(Topic::new_key_package)
74 .map(Into::into)
75 .collect();
76
77 let result: GetNewestEnvelopeResponse = GetNewestEnvelopes::builder()
78 .topics(topics)
79 .build()?
80 .query(&self.client)
81 .await?;
82 tracing::info!("got {} envelopes", result.results.len());
83 let extractor = CollectionExtractor::new(result.results, KeyPackagesExtractor::new());
84 let key_packages = extractor.get()?;
85 Ok(mls_v1::FetchKeyPackagesResponse { key_packages })
86 }
87
88 #[tracing::instrument(level = "trace", skip_all)]
89 async fn send_group_messages(
90 &self,
91 request: mls_v1::SendGroupMessagesRequest,
92 ) -> Result<(), Self::Error> {
93 let hashes = request.messages.sha256_hashes()?;
94 let mut dependencies = self.cursor_store.find_message_dependencies(
95 hashes
96 .iter()
97 .map(AsRef::as_ref)
98 .collect::<Vec<_>>()
99 .as_slice(),
100 )?;
101 let mut envelopes: Vec<ClientEnvelope> = request.messages.client_envelopes()?;
102 envelopes.iter_mut().try_for_each(|envelope| {
103 let data = envelope.sha256_hash()?;
104 let dependency = dependencies.remove(&data);
105 let mut aad = envelope.aad.clone().unwrap_or_default();
106 aad.depends_on = dependency.map(Into::into);
107 envelope.aad = Some(aad);
108 Ok(())
109 })?;
110
111 PublishClientEnvelopes::builder()
112 .envelopes(envelopes)
113 .build()?
114 .ignore_response()
115 .query(&self.client)
116 .await?;
117
118 Ok(())
119 }
120
121 #[tracing::instrument(level = "debug", skip_all)]
122 async fn send_welcome_messages(
123 &self,
124 request: mls_v1::SendWelcomeMessagesRequest,
125 ) -> Result<(), Self::Error> {
126 let envelopes = request.messages.client_envelopes()?;
127
128 api::ignore(
129 PublishClientEnvelopes::builder()
130 .envelopes(envelopes)
131 .build()?,
132 )
133 .query(&self.client)
134 .await?;
135 Ok(())
136 }
137
138 #[tracing::instrument(level = "debug", skip(self))]
139 async fn query_group_messages(
140 &self,
141 group_id: GroupId,
142 ) -> Result<Vec<xmtp_proto::types::GroupMessage>, Self::Error> {
143 let topic = TopicKind::GroupMessagesV1.create(&group_id);
144 let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?;
145 tracing::debug!(%topic, %lcc, "querying messages");
146 let mut topic_cursor = TopicCursor::default();
147 topic_cursor.insert(topic.clone(), lcc.clone());
148 let resolver = resolve::network_backoff(&self.client);
149 let response = QueryEnvelope::builder()
150 .topic(topic)
151 .last_seen(lcc)
152 .limit(MAX_PAGE_SIZE)
153 .build()?
154 .ordered(resolver, topic_cursor, &self.cursor_store)
155 .query(&self.client)
156 .await?;
157
158 let messages = SequencedExtractor::builder()
159 .envelopes(response)
160 .build::<GroupMessageExtractor>()
161 .get()?;
162 Ok(messages
163 .into_iter()
164 .map(|i| i.map_err(EnvelopeError::from))
165 .collect::<Result<_, _>>()?)
166 }
167
168 #[tracing::instrument(level = "trace", skip_all)]
169 async fn query_latest_group_message(
170 &self,
171 group_id: GroupId,
172 ) -> Result<Option<xmtp_proto::types::GroupMessage>, Self::Error> {
173 let response: GetNewestEnvelopeResponse = GetNewestEnvelopes::builder()
174 .topic(Topic::new_group_message(group_id))
175 .build()?
176 .query(&self.client)
177 .await?;
178 let mut extractor = GroupMessageExtractor::default();
180 if response.results.is_empty() {
181 return Ok(None);
182 }
183 response
184 .results
185 .into_iter()
186 .next()
187 .as_ref()
188 .accept(&mut extractor)?;
189 Ok(Some(extractor.get().map_err(EnvelopeError::from)?))
190 }
191
192 #[tracing::instrument(level = "info", skip(self))]
193 async fn query_welcome_messages(
194 &self,
195 installation_key: InstallationId,
196 ) -> Result<Vec<WelcomeMessage>, Self::Error> {
197 let topic = TopicKind::WelcomeMessagesV1.create(installation_key);
198 let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?;
199 tracing::info!("querying welcomes @{:?}", lcc);
200 let response = QueryEnvelope::builder()
201 .topic(topic)
202 .last_seen(lcc)
203 .limit(MAX_PAGE_SIZE)
204 .build()?
205 .query(&self.client)
206 .await?;
207
208 let messages = SequencedExtractor::builder()
209 .envelopes(response.envelopes)
210 .build::<WelcomeMessageExtractor>()
211 .get()?;
212 Ok(messages
213 .into_iter()
214 .map(|i| i.map_err(EnvelopeError::from))
215 .collect::<Result<_, _>>()?)
216 }
217
218 #[tracing::instrument(level = "debug", skip_all)]
219 async fn publish_commit_log(
220 &self,
221 _request: mls_v1::BatchPublishCommitLogRequest,
222 ) -> Result<(), Self::Error> {
223 Ok(())
224 }
225
226 async fn query_commit_log(
227 &self,
228 _request: mls_v1::BatchQueryCommitLogRequest,
229 ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
230 tracing::debug!("commit log disabled for d14n");
231 Ok(BatchQueryCommitLogResponse { responses: vec![] })
232 }
233
234 async fn get_newest_group_message(
235 &self,
236 request: mls_v1::GetNewestGroupMessageRequest,
237 ) -> Result<Vec<Option<GroupMessageMetadata>>, Self::Error> {
238 let topics: Vec<Vec<u8>> = request
239 .group_ids
240 .into_iter()
241 .map(Topic::new_group_message)
242 .map(Into::into)
243 .collect();
244
245 let response = GetNewestEnvelopes::builder()
246 .topics(topics)
247 .build()?
248 .query(&self.client)
249 .await?;
250
251 let extractor = CollectionExtractor::new(response.results, MessageMetadataExtractor::new());
252 let responses = extractor.get()?;
253
254 Ok(responses)
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        protocol::traits::{EnvelopeVisitor, Extractor},
        queries::d14n::test::{TestCursorStore, group_message_request},
    };
    use futures::FutureExt;
    use proptest::prelude::*;
    use prost::Message;
    use xmtp_proto::xmtp::xmtpv4::message_api::get_newest_envelope_response;
    use xmtp_proto::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest;

    /// Visiting a response with no originator envelope succeeds and yields
    /// exactly one `None` entry.
    #[xmtp_common::test]
    fn test_group_message_response_extractor_with_empty_envelope() {
        let empty = get_newest_envelope_response::Response {
            originator_envelope: None,
        };
        let mut metadata = MessageMetadataExtractor::new();

        let visited = metadata.visit_newest_envelope_response(&empty);
        assert!(
            visited.is_ok(),
            "Extractor should handle empty response without error"
        );

        let extracted = metadata.get();
        assert_eq!(extracted.len(), 1, "Should create exactly one response");
        assert!(
            extracted[0].is_none(),
            "Should have no group message for empty envelope"
        );
    }

    /// An extractor built with `new()` or `Default::default()` starts out with
    /// no responses.
    #[xmtp_common::test]
    fn test_group_message_response_extractor_builder_pattern() {
        let from_new = MessageMetadataExtractor::new().get();
        assert_eq!(from_new.len(), 0, "New extractor should have no responses");

        let defaulted: MessageMetadataExtractor = Default::default();
        let from_default = defaulted.get();
        assert_eq!(
            from_default.len(),
            0,
            "Default extractor should have no responses"
        );
    }

    proptest! {
        // Every envelope published by `send_group_messages` must carry an AAD
        // whose `depends_on` is populated from the cursor store dependencies.
        #[xmtp_common::test]
        fn test_send_group_messages_with_dependencies(generated in group_message_request(15)) {
            // Copy of the request whose encoding serves as the mock response body.
            let echoed = generated.request.clone();

            let mut client = D14nClient::new_mock_with_store(TestCursorStore::default());
            client.cursor_store.dependencies = generated.dependencies;
            client
                .client
                .expect_request()
                .times(1)
                .returning(move |_, _, mut body| {
                    let body: PublishClientEnvelopesRequest =
                        prost::Message::decode(&mut body).unwrap();
                    for e in body.envelopes {
                        assert!(e.aad.is_some());
                        let aad = e.aad.unwrap();
                        assert!(aad.depends_on.is_some());
                    }
                    Ok(http::Response::new(echoed.encode_to_vec().into()))
                });

            // The mocked client resolves immediately, so a single poll must
            // complete the future.
            let outcome = client.send_group_messages(generated.request).now_or_never();
            outcome.unwrap().unwrap();
        }
    }
}