xmtp_api_d14n/queries/d14n/
streams.rs

1use crate::d14n::SubscribeEnvelopes;
2use crate::protocol::{CursorStore, GroupMessageExtractor, WelcomeMessageExtractor};
3use crate::queries::stream;
4use crate::{FlattenedStream, OrderedStream, TryExtractorStream};
5
6use super::D14nClient;
7use xmtp_common::RetryableError;
8use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
9use xmtp_proto::api_client::{Paged, XmtpMlsStreams};
10use xmtp_proto::types::{GroupId, InstallationId, TopicCursor, TopicKind};
11use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesResponse;
12
/// The `Message` associated type of the [`Paged`] implementation for
/// [`SubscribeEnvelopesResponse`]; used below as the item parameter of
/// `OrderedStreamT`.
13type PagedItem = <SubscribeEnvelopesResponse as Paged>::Message;
14
/// Shorthand for the stream stack underneath `GroupMessageStream`.
///
/// From the inside out: the client's own stream type (`<C as Client>::Stream`)
/// wrapped in [`XmtpStream`] typed with [`SubscribeEnvelopesResponse`], wrapped
/// in [`FlattenedStream`], wrapped in [`OrderedStream`]. The `OrderedStream`
/// layer is additionally parameterized by `Store` and by `PagedItem`.
15type OrderedStreamT<C, Store> = OrderedStream<
16    FlattenedStream<XmtpStream<<C as Client>::Stream, SubscribeEnvelopesResponse>>,
17    Store,
18    PagedItem,
19>;
20
21#[xmtp_common::async_trait]
22impl<C, Store, E> XmtpMlsStreams for D14nClient<C, Store>
23where
24    C: Client<Error = E>,
25    <C as Client>::Stream: 'static,
26    E: RetryableError + 'static,
27    Store: CursorStore + Clone,
28{
29    type Error = ApiClientError<E>;
30
31    type GroupMessageStream = TryExtractorStream<OrderedStreamT<C, Store>, GroupMessageExtractor>;
32
33    type WelcomeMessageStream = TryExtractorStream<
34        XmtpStream<<C as Client>::Stream, SubscribeEnvelopesResponse>,
35        WelcomeMessageExtractor,
36    >;
37
38    async fn subscribe_group_messages(
39        &self,
40        group_ids: &[&GroupId],
41    ) -> Result<Self::GroupMessageStream, Self::Error> {
42        if group_ids.is_empty() {
43            let s = SubscribeEnvelopes::builder()
44                .build()?
45                .fake_stream(&self.client);
46            let s = stream::ordered(
47                stream::flattened(s),
48                self.cursor_store.clone(),
49                TopicCursor::default(),
50            );
51            return Ok(stream::try_extractor(s));
52        }
53        let topics = group_ids
54            .iter()
55            .map(|gid| TopicKind::GroupMessagesV1.create(gid))
56            .collect::<Vec<_>>();
57        let lcc = self
58            .cursor_store
59            .lcc_maybe_missing(&topics.iter().collect::<Vec<_>>())?;
60        let topic_cursor: TopicCursor = self
61            .cursor_store
62            .latest_for_topics(&mut topics.iter())?
63            .into();
64        tracing::debug!("subscribing to messages @cursor={}", lcc);
65        let s = SubscribeEnvelopes::builder()
66            .topics(topics)
67            .last_seen(lcc)
68            .build()?
69            .stream(&self.client)
70            .await?;
71        let s = stream::ordered(
72            stream::flattened(s),
73            self.cursor_store.clone(),
74            topic_cursor,
75        );
76        Ok(stream::try_extractor(s))
77    }
78
79    async fn subscribe_group_messages_with_cursors(
80        &self,
81        topics: &TopicCursor,
82    ) -> Result<Self::GroupMessageStream, Self::Error> {
83        if topics.is_empty() {
84            let s = SubscribeEnvelopes::builder()
85                .build()?
86                .fake_stream(&self.client);
87            let s = stream::ordered(
88                stream::flattened(s),
89                self.cursor_store.clone(),
90                TopicCursor::default(),
91            );
92            return Ok(stream::try_extractor(s));
93        }
94        // Compute the lowest common cursor from the provided cursors
95        let lcc = topics.lcc();
96        tracing::debug!(
97            "subscribing to messages with provided cursors @cursor={}",
98            lcc
99        );
100        let s = SubscribeEnvelopes::builder()
101            .topics(topics.topics())
102            .last_seen(lcc)
103            .build()?
104            .stream(&self.client)
105            .await?;
106        let s = stream::ordered(
107            stream::flattened(s),
108            self.cursor_store.clone(),
109            topics.clone(),
110        );
111        Ok(stream::try_extractor(s))
112    }
113
114    async fn subscribe_welcome_messages(
115        &self,
116        installations: &[&InstallationId],
117    ) -> Result<Self::WelcomeMessageStream, Self::Error> {
118        if installations.is_empty() {
119            let s = SubscribeEnvelopes::builder()
120                .build()?
121                .fake_stream(&self.client);
122            return Ok(stream::try_extractor(s));
123        }
124        let topics = installations
125            .iter()
126            .map(|ins| TopicKind::WelcomeMessagesV1.create(ins))
127            .collect::<Vec<_>>();
128        let lcc = self
129            .cursor_store
130            .lowest_common_cursor(&topics.iter().collect::<Vec<_>>())?;
131        let s = SubscribeEnvelopes::builder()
132            .topics(topics)
133            .last_seen(lcc)
134            .build()?
135            .stream(&self.client)
136            .await?;
137        Ok(stream::try_extractor(s))
138    }
139}