xmtp_api_d14n/queries/d14n/
mls.rs

1use super::D14nClient;
2use crate::d14n::GetNewestEnvelopes;
3use crate::d14n::PublishClientEnvelopes;
4use crate::d14n::QueryEnvelope;
5use crate::protocol::CollectionExtractor;
6use crate::protocol::CursorStore;
7use crate::protocol::EnvelopeError;
8use crate::protocol::GroupMessageExtractor;
9use crate::protocol::KeyPackagesExtractor;
10use crate::protocol::MessageMetadataExtractor;
11use crate::protocol::ProtocolEnvelope;
12use crate::protocol::SequencedExtractor;
13use crate::protocol::WelcomeMessageExtractor;
14use crate::protocol::resolve;
15use crate::protocol::traits::Envelope;
16use crate::protocol::traits::EnvelopeCollection;
17use crate::protocol::traits::Extractor;
18use crate::queries::D14nCombinatorExt;
19use xmtp_common::RetryableError;
20use xmtp_configuration::MAX_PAGE_SIZE;
21use xmtp_proto::api;
22use xmtp_proto::api::Client;
23use xmtp_proto::api::EndpointExt;
24use xmtp_proto::api::{ApiClientError, Query};
25use xmtp_proto::api_client::XmtpMlsClient;
26use xmtp_proto::mls_v1;
27use xmtp_proto::mls_v1::BatchQueryCommitLogResponse;
28use xmtp_proto::types::GroupId;
29use xmtp_proto::types::GroupMessageMetadata;
30use xmtp_proto::types::InstallationId;
31use xmtp_proto::types::Topic;
32use xmtp_proto::types::TopicCursor;
33use xmtp_proto::types::TopicKind;
34use xmtp_proto::types::WelcomeMessage;
35use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope;
36use xmtp_proto::xmtp::xmtpv4::message_api::GetNewestEnvelopeResponse;
37
38#[xmtp_common::async_trait]
39impl<C, Store, E> XmtpMlsClient for D14nClient<C, Store>
40where
41    E: RetryableError + 'static,
42    C: Client<Error = E>,
43    ApiClientError<E>: From<ApiClientError<<C as xmtp_proto::api::Client>::Error>> + 'static,
44    Store: CursorStore,
45{
46    type Error = ApiClientError<E>;
47
48    #[tracing::instrument(level = "trace", skip_all)]
49    async fn upload_key_package(
50        &self,
51        request: mls_v1::UploadKeyPackageRequest,
52    ) -> Result<(), Self::Error> {
53        let envelopes = request.client_envelope()?;
54        api::ignore(
55            PublishClientEnvelopes::builder()
56                .envelope(envelopes)
57                .build()?,
58        )
59        .query(&self.client)
60        .await?;
61
62        Ok::<_, Self::Error>(())
63    }
64
65    #[tracing::instrument(level = "trace", skip_all)]
66    async fn fetch_key_packages(
67        &self,
68        request: mls_v1::FetchKeyPackagesRequest,
69    ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
70        let topics = request
71            .installation_keys
72            .iter()
73            .map(Topic::new_key_package)
74            .map(Into::into)
75            .collect();
76
77        let result: GetNewestEnvelopeResponse = GetNewestEnvelopes::builder()
78            .topics(topics)
79            .build()?
80            .query(&self.client)
81            .await?;
82        tracing::info!("got {} envelopes", result.results.len());
83        let extractor = CollectionExtractor::new(result.results, KeyPackagesExtractor::new());
84        let key_packages = extractor.get()?;
85        Ok(mls_v1::FetchKeyPackagesResponse { key_packages })
86    }
87
88    #[tracing::instrument(level = "trace", skip_all)]
89    async fn send_group_messages(
90        &self,
91        request: mls_v1::SendGroupMessagesRequest,
92    ) -> Result<(), Self::Error> {
93        let hashes = request.messages.sha256_hashes()?;
94        let mut dependencies = self.cursor_store.find_message_dependencies(
95            hashes
96                .iter()
97                .map(AsRef::as_ref)
98                .collect::<Vec<_>>()
99                .as_slice(),
100        )?;
101        let mut envelopes: Vec<ClientEnvelope> = request.messages.client_envelopes()?;
102        envelopes.iter_mut().try_for_each(|envelope| {
103            let data = envelope.sha256_hash()?;
104            let dependency = dependencies.remove(&data);
105            let mut aad = envelope.aad.clone().unwrap_or_default();
106            aad.depends_on = dependency.map(Into::into);
107            envelope.aad = Some(aad);
108            Ok(())
109        })?;
110
111        PublishClientEnvelopes::builder()
112            .envelopes(envelopes)
113            .build()?
114            .ignore_response()
115            .query(&self.client)
116            .await?;
117
118        Ok(())
119    }
120
121    #[tracing::instrument(level = "debug", skip_all)]
122    async fn send_welcome_messages(
123        &self,
124        request: mls_v1::SendWelcomeMessagesRequest,
125    ) -> Result<(), Self::Error> {
126        let envelopes = request.messages.client_envelopes()?;
127
128        api::ignore(
129            PublishClientEnvelopes::builder()
130                .envelopes(envelopes)
131                .build()?,
132        )
133        .query(&self.client)
134        .await?;
135        Ok(())
136    }
137
138    #[tracing::instrument(level = "debug", skip(self))]
139    async fn query_group_messages(
140        &self,
141        group_id: GroupId,
142    ) -> Result<Vec<xmtp_proto::types::GroupMessage>, Self::Error> {
143        let topic = TopicKind::GroupMessagesV1.create(&group_id);
144        let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?;
145        tracing::debug!(%topic, %lcc, "querying messages");
146        let mut topic_cursor = TopicCursor::default();
147        topic_cursor.insert(topic.clone(), lcc.clone());
148        let resolver = resolve::network_backoff(&self.client);
149        let response = QueryEnvelope::builder()
150            .topic(topic)
151            .last_seen(lcc)
152            .limit(MAX_PAGE_SIZE)
153            .build()?
154            .ordered(resolver, topic_cursor, &self.cursor_store)
155            .query(&self.client)
156            .await?;
157
158        let messages = SequencedExtractor::builder()
159            .envelopes(response)
160            .build::<GroupMessageExtractor>()
161            .get()?;
162        Ok(messages
163            .into_iter()
164            .map(|i| i.map_err(EnvelopeError::from))
165            .collect::<Result<_, _>>()?)
166    }
167
168    #[tracing::instrument(level = "trace", skip_all)]
169    async fn query_latest_group_message(
170        &self,
171        group_id: GroupId,
172    ) -> Result<Option<xmtp_proto::types::GroupMessage>, Self::Error> {
173        let response: GetNewestEnvelopeResponse = GetNewestEnvelopes::builder()
174            .topic(Topic::new_group_message(group_id))
175            .build()?
176            .query(&self.client)
177            .await?;
178        // expect at most a single message
179        let mut extractor = GroupMessageExtractor::default();
180        if response.results.is_empty() {
181            return Ok(None);
182        }
183        response
184            .results
185            .into_iter()
186            .next()
187            .as_ref()
188            .accept(&mut extractor)?;
189        Ok(Some(extractor.get().map_err(EnvelopeError::from)?))
190    }
191
192    #[tracing::instrument(level = "info", skip(self))]
193    async fn query_welcome_messages(
194        &self,
195        installation_key: InstallationId,
196    ) -> Result<Vec<WelcomeMessage>, Self::Error> {
197        let topic = TopicKind::WelcomeMessagesV1.create(installation_key);
198        let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?;
199        tracing::info!("querying welcomes @{:?}", lcc);
200        let response = QueryEnvelope::builder()
201            .topic(topic)
202            .last_seen(lcc)
203            .limit(MAX_PAGE_SIZE)
204            .build()?
205            .query(&self.client)
206            .await?;
207
208        let messages = SequencedExtractor::builder()
209            .envelopes(response.envelopes)
210            .build::<WelcomeMessageExtractor>()
211            .get()?;
212        Ok(messages
213            .into_iter()
214            .map(|i| i.map_err(EnvelopeError::from))
215            .collect::<Result<_, _>>()?)
216    }
217
218    #[tracing::instrument(level = "debug", skip_all)]
219    async fn publish_commit_log(
220        &self,
221        _request: mls_v1::BatchPublishCommitLogRequest,
222    ) -> Result<(), Self::Error> {
223        Ok(())
224    }
225
226    async fn query_commit_log(
227        &self,
228        _request: mls_v1::BatchQueryCommitLogRequest,
229    ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
230        tracing::debug!("commit log disabled for d14n");
231        Ok(BatchQueryCommitLogResponse { responses: vec![] })
232    }
233
234    async fn get_newest_group_message(
235        &self,
236        request: mls_v1::GetNewestGroupMessageRequest,
237    ) -> Result<Vec<Option<GroupMessageMetadata>>, Self::Error> {
238        let topics: Vec<Vec<u8>> = request
239            .group_ids
240            .into_iter()
241            .map(Topic::new_group_message)
242            .map(Into::into)
243            .collect();
244
245        let response = GetNewestEnvelopes::builder()
246            .topics(topics)
247            .build()?
248            .query(&self.client)
249            .await?;
250
251        let extractor = CollectionExtractor::new(response.results, MessageMetadataExtractor::new());
252        let responses = extractor.get()?;
253
254        Ok(responses)
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        protocol::traits::{EnvelopeVisitor, Extractor},
        queries::d14n::test::{TestCursorStore, group_message_request},
    };
    use futures::FutureExt;
    use proptest::prelude::*;
    use prost::Message;
    use xmtp_proto::xmtp::xmtpv4::message_api::get_newest_envelope_response;
    use xmtp_proto::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest;

    /// A newest-envelope response without an originator envelope must yield a
    /// single `None` entry rather than an error.
    #[xmtp_common::test]
    fn test_group_message_response_extractor_with_empty_envelope() {
        let response = get_newest_envelope_response::Response {
            originator_envelope: None,
        };

        let mut extractor = MessageMetadataExtractor::new();

        // Test that the extractor handles empty responses gracefully
        let result = extractor.visit_newest_envelope_response(&response);
        assert!(
            result.is_ok(),
            "Extractor should handle empty response without error"
        );

        let responses = extractor.get();
        assert_eq!(responses.len(), 1, "Should create exactly one response");

        let extracted_response = &responses[0];
        assert!(
            extracted_response.is_none(),
            "Should have no group message for empty envelope"
        );
    }

    /// Extractors built via `new()` and via `Default` both start out empty.
    #[xmtp_common::test]
    fn test_group_message_response_extractor_builder_pattern() {
        // Test that the extractor can be built and used
        let extractor = MessageMetadataExtractor::new();
        let responses = extractor.get();
        assert_eq!(responses.len(), 0, "New extractor should have no responses");

        // Test default construction
        let extractor2: MessageMetadataExtractor = Default::default();
        let responses2 = extractor2.get();
        assert_eq!(
            responses2.len(),
            0,
            "Default extractor should have no responses"
        );
    }

    proptest! {
        // Every envelope published by `send_group_messages` must carry AAD
        // with `depends_on` populated from the cursor store's dependencies.
        #[xmtp_common::test]
        fn test_send_group_messages_with_dependencies(generated in group_message_request(15)) {
            let mut client = D14nClient::new_mock_with_store(TestCursorStore::default());
            client.cursor_store.dependencies = generated.dependencies;
            let request = generated.request.clone();
            client.client.expect_request().times(1).returning(move |_,_, mut body| {
                let body: PublishClientEnvelopesRequest = prost::Message::decode(&mut body).unwrap();
                for envelope in body.envelopes {
                    let aad = envelope
                        .aad
                        .expect("published envelope should carry AAD");
                    assert!(
                        aad.depends_on.is_some(),
                        "published envelope AAD should have depends_on set"
                    );
                }
                // The response body is not inspected by the client call under
                // test; any encodable payload works here.
                Ok(http::Response::new(request.encode_to_vec().into()))
            });

            client.send_group_messages(generated.request).now_or_never().unwrap().unwrap();
        }
    }
}