xmtp_api_d14n/queries/d14n/
streams.rs1use crate::d14n::SubscribeEnvelopes;
2use crate::protocol::{CursorStore, GroupMessageExtractor, WelcomeMessageExtractor};
3use crate::queries::stream;
4use crate::{FlattenedStream, OrderedStream, TryExtractorStream};
5
6use super::D14nClient;
7use xmtp_common::RetryableError;
8use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
9use xmtp_proto::api_client::{Paged, XmtpMlsStreams};
10use xmtp_proto::types::{GroupId, InstallationId, TopicCursor, TopicKind};
11use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesResponse;
12
13type PagedItem = <SubscribeEnvelopesResponse as Paged>::Message;
14
15type OrderedStreamT<C, Store> = OrderedStream<
16 FlattenedStream<XmtpStream<<C as Client>::Stream, SubscribeEnvelopesResponse>>,
17 Store,
18 PagedItem,
19>;
20
21#[xmtp_common::async_trait]
22impl<C, Store, E> XmtpMlsStreams for D14nClient<C, Store>
23where
24 C: Client<Error = E>,
25 <C as Client>::Stream: 'static,
26 E: RetryableError + 'static,
27 Store: CursorStore + Clone,
28{
29 type Error = ApiClientError<E>;
30
31 type GroupMessageStream = TryExtractorStream<OrderedStreamT<C, Store>, GroupMessageExtractor>;
32
33 type WelcomeMessageStream = TryExtractorStream<
34 XmtpStream<<C as Client>::Stream, SubscribeEnvelopesResponse>,
35 WelcomeMessageExtractor,
36 >;
37
38 async fn subscribe_group_messages(
39 &self,
40 group_ids: &[&GroupId],
41 ) -> Result<Self::GroupMessageStream, Self::Error> {
42 if group_ids.is_empty() {
43 let s = SubscribeEnvelopes::builder()
44 .build()?
45 .fake_stream(&self.client);
46 let s = stream::ordered(
47 stream::flattened(s),
48 self.cursor_store.clone(),
49 TopicCursor::default(),
50 );
51 return Ok(stream::try_extractor(s));
52 }
53 let topics = group_ids
54 .iter()
55 .map(|gid| TopicKind::GroupMessagesV1.create(gid))
56 .collect::<Vec<_>>();
57 let lcc = self
58 .cursor_store
59 .lcc_maybe_missing(&topics.iter().collect::<Vec<_>>())?;
60 let topic_cursor: TopicCursor = self
61 .cursor_store
62 .latest_for_topics(&mut topics.iter())?
63 .into();
64 tracing::debug!("subscribing to messages @cursor={}", lcc);
65 let s = SubscribeEnvelopes::builder()
66 .topics(topics)
67 .last_seen(lcc)
68 .build()?
69 .stream(&self.client)
70 .await?;
71 let s = stream::ordered(
72 stream::flattened(s),
73 self.cursor_store.clone(),
74 topic_cursor,
75 );
76 Ok(stream::try_extractor(s))
77 }
78
79 async fn subscribe_group_messages_with_cursors(
80 &self,
81 topics: &TopicCursor,
82 ) -> Result<Self::GroupMessageStream, Self::Error> {
83 if topics.is_empty() {
84 let s = SubscribeEnvelopes::builder()
85 .build()?
86 .fake_stream(&self.client);
87 let s = stream::ordered(
88 stream::flattened(s),
89 self.cursor_store.clone(),
90 TopicCursor::default(),
91 );
92 return Ok(stream::try_extractor(s));
93 }
94 let lcc = topics.lcc();
96 tracing::debug!(
97 "subscribing to messages with provided cursors @cursor={}",
98 lcc
99 );
100 let s = SubscribeEnvelopes::builder()
101 .topics(topics.topics())
102 .last_seen(lcc)
103 .build()?
104 .stream(&self.client)
105 .await?;
106 let s = stream::ordered(
107 stream::flattened(s),
108 self.cursor_store.clone(),
109 topics.clone(),
110 );
111 Ok(stream::try_extractor(s))
112 }
113
114 async fn subscribe_welcome_messages(
115 &self,
116 installations: &[&InstallationId],
117 ) -> Result<Self::WelcomeMessageStream, Self::Error> {
118 if installations.is_empty() {
119 let s = SubscribeEnvelopes::builder()
120 .build()?
121 .fake_stream(&self.client);
122 return Ok(stream::try_extractor(s));
123 }
124 let topics = installations
125 .iter()
126 .map(|ins| TopicKind::WelcomeMessagesV1.create(ins))
127 .collect::<Vec<_>>();
128 let lcc = self
129 .cursor_store
130 .lowest_common_cursor(&topics.iter().collect::<Vec<_>>())?;
131 let s = SubscribeEnvelopes::builder()
132 .topics(topics)
133 .last_seen(lcc)
134 .build()?
135 .stream(&self.client)
136 .await?;
137 Ok(stream::try_extractor(s))
138 }
139}