xmtp_api_d14n/queries/v3/
streams.rs1use crate::protocol::CursorStore;
2use crate::{V3Client, v3::*};
3use xmtp_api_grpc::streams::{TryFromItem, try_from_stream};
4use xmtp_common::RetryableError;
5use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
6use xmtp_proto::api_client::XmtpMlsStreams;
7use xmtp_proto::mls_v1::subscribe_group_messages_request::Filter as GroupSubscribeFilter;
8use xmtp_proto::mls_v1::subscribe_welcome_messages_request::Filter as WelcomeSubscribeFilter;
9use xmtp_proto::types::{
10 GroupId, GroupMessage, InstallationId, TopicCursor, TopicKind, WelcomeMessage,
11};
12
13#[xmtp_common::async_trait]
14impl<C, Store, E> XmtpMlsStreams for V3Client<C, Store>
15where
16 C: Client<Error = E>,
17 E: RetryableError + 'static,
18 Store: CursorStore,
19{
20 type GroupMessageStream =
21 TryFromItem<XmtpStream<<C as Client>::Stream, V3ProtoGroupMessage>, GroupMessage>;
22
23 type WelcomeMessageStream =
24 TryFromItem<XmtpStream<<C as Client>::Stream, V3ProtoWelcomeMessage>, WelcomeMessage>;
25
26 type Error = ApiClientError<E>;
27
28 async fn subscribe_group_messages(
29 &self,
30 group_ids: &[&GroupId],
31 ) -> Result<Self::GroupMessageStream, Self::Error> {
32 let topics = group_ids
33 .iter()
34 .map(|gid| TopicKind::GroupMessagesV1.create(gid))
35 .collect::<Vec<_>>();
36 let cursors = self.cursor_store.latest_for_topics(&mut topics.iter())?;
37
38 let mut filters = vec![];
39 for topic in &topics {
40 let cursor = cursors.get(topic).cloned().unwrap_or_default().max();
41 tracing::debug!("subscribing to {topic} @ {cursor}");
42 filters.push(GroupSubscribeFilter {
43 group_id: topic.identifier().to_vec(),
44 id_cursor: cursor,
45 })
46 }
47
48 Ok(try_from_stream(
49 SubscribeGroupMessages::builder()
50 .filters(filters)
51 .build()?
52 .stream(&self.client)
53 .await?,
54 ))
55 }
56
57 async fn subscribe_group_messages_with_cursors(
58 &self,
59 topics: &TopicCursor,
60 ) -> Result<Self::GroupMessageStream, Self::Error> {
61 let mut filters = vec![];
62 for (group_id, cursor) in topics.groups() {
63 let id_cursor = cursor.max();
64 tracing::debug!(
65 "subscribing to group {} @ cursor {}",
66 hex::encode(group_id),
67 id_cursor
68 );
69 filters.push(GroupSubscribeFilter {
70 group_id: group_id.to_vec(),
71 id_cursor,
72 })
73 }
74
75 Ok(try_from_stream(
76 SubscribeGroupMessages::builder()
77 .filters(filters)
78 .build()?
79 .stream(&self.client)
80 .await?,
81 ))
82 }
83
84 async fn subscribe_welcome_messages(
85 &self,
86 installations: &[&InstallationId],
87 ) -> Result<Self::WelcomeMessageStream, Self::Error> {
88 let topics = installations
89 .iter()
90 .map(|id| TopicKind::WelcomeMessagesV1.create(id))
91 .collect::<Vec<_>>();
92 let cursors = self.cursor_store.latest_for_topics(&mut topics.iter())?;
93
94 let mut filters = vec![];
95 for topic in &topics {
96 let id_cursor = cursors.get(topic).cloned().unwrap_or_default().v3_welcome();
97 filters.push(WelcomeSubscribeFilter {
98 installation_key: topic.identifier().to_vec(),
99 id_cursor,
100 })
101 }
102
103 Ok(try_from_stream(
104 SubscribeWelcomeMessages::builder()
105 .filters(filters)
106 .build()?
107 .stream(&self.client)
108 .await?,
109 ))
110 }
111}