xmtp_api_d14n/queries/v3/
streams.rs

1use crate::protocol::CursorStore;
2use crate::{V3Client, v3::*};
3use xmtp_api_grpc::streams::{TryFromItem, try_from_stream};
4use xmtp_common::RetryableError;
5use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
6use xmtp_proto::api_client::XmtpMlsStreams;
7use xmtp_proto::mls_v1::subscribe_group_messages_request::Filter as GroupSubscribeFilter;
8use xmtp_proto::mls_v1::subscribe_welcome_messages_request::Filter as WelcomeSubscribeFilter;
9use xmtp_proto::types::{
10    GroupId, GroupMessage, InstallationId, TopicCursor, TopicKind, WelcomeMessage,
11};
12
13#[xmtp_common::async_trait]
14impl<C, Store, E> XmtpMlsStreams for V3Client<C, Store>
15where
16    C: Client<Error = E>,
17    E: RetryableError + 'static,
18    Store: CursorStore,
19{
20    type GroupMessageStream =
21        TryFromItem<XmtpStream<<C as Client>::Stream, V3ProtoGroupMessage>, GroupMessage>;
22
23    type WelcomeMessageStream =
24        TryFromItem<XmtpStream<<C as Client>::Stream, V3ProtoWelcomeMessage>, WelcomeMessage>;
25
26    type Error = ApiClientError<E>;
27
28    async fn subscribe_group_messages(
29        &self,
30        group_ids: &[&GroupId],
31    ) -> Result<Self::GroupMessageStream, Self::Error> {
32        let topics = group_ids
33            .iter()
34            .map(|gid| TopicKind::GroupMessagesV1.create(gid))
35            .collect::<Vec<_>>();
36        let cursors = self.cursor_store.latest_for_topics(&mut topics.iter())?;
37
38        let mut filters = vec![];
39        for topic in &topics {
40            let cursor = cursors.get(topic).cloned().unwrap_or_default().max();
41            tracing::debug!("subscribing to {topic} @ {cursor}");
42            filters.push(GroupSubscribeFilter {
43                group_id: topic.identifier().to_vec(),
44                id_cursor: cursor,
45            })
46        }
47
48        Ok(try_from_stream(
49            SubscribeGroupMessages::builder()
50                .filters(filters)
51                .build()?
52                .stream(&self.client)
53                .await?,
54        ))
55    }
56
57    async fn subscribe_group_messages_with_cursors(
58        &self,
59        topics: &TopicCursor,
60    ) -> Result<Self::GroupMessageStream, Self::Error> {
61        let mut filters = vec![];
62        for (group_id, cursor) in topics.groups() {
63            let id_cursor = cursor.max();
64            tracing::debug!(
65                "subscribing to group {} @ cursor {}",
66                hex::encode(group_id),
67                id_cursor
68            );
69            filters.push(GroupSubscribeFilter {
70                group_id: group_id.to_vec(),
71                id_cursor,
72            })
73        }
74
75        Ok(try_from_stream(
76            SubscribeGroupMessages::builder()
77                .filters(filters)
78                .build()?
79                .stream(&self.client)
80                .await?,
81        ))
82    }
83
84    async fn subscribe_welcome_messages(
85        &self,
86        installations: &[&InstallationId],
87    ) -> Result<Self::WelcomeMessageStream, Self::Error> {
88        let topics = installations
89            .iter()
90            .map(|id| TopicKind::WelcomeMessagesV1.create(id))
91            .collect::<Vec<_>>();
92        let cursors = self.cursor_store.latest_for_topics(&mut topics.iter())?;
93
94        let mut filters = vec![];
95        for topic in &topics {
96            let id_cursor = cursors.get(topic).cloned().unwrap_or_default().v3_welcome();
97            filters.push(WelcomeSubscribeFilter {
98                installation_key: topic.identifier().to_vec(),
99                id_cursor,
100            })
101        }
102
103        Ok(try_from_stream(
104            SubscribeWelcomeMessages::builder()
105                .filters(filters)
106                .build()?
107                .stream(&self.client)
108                .await?,
109        ))
110    }
111}