xmtp_api_d14n/queries/
boxed_streams.rs1use crate::protocol::XmtpQuery;
2use std::pin::Pin;
3use xmtp_proto::api::HasStats;
4use xmtp_proto::api::IsConnectedCheck;
5use xmtp_proto::api_client::ApiStats;
6use xmtp_proto::api_client::IdentityStats;
7use xmtp_proto::api_client::XmtpMlsClient;
8use xmtp_proto::identity_v1;
9use xmtp_proto::mls_v1;
10use xmtp_proto::prelude::XmtpIdentityClient;
11use xmtp_proto::prelude::XmtpMlsStreams;
12use xmtp_proto::types::InstallationId;
13use xmtp_proto::types::TopicCursor;
14use xmtp_proto::types::WelcomeMessage;
15use xmtp_proto::types::{GroupId, GroupMessage};
/// Thin wrapper around an inner API client `C`.
///
/// The wrapper owns the client by value and is `Clone` whenever `C` is.
#[derive(Clone)]
pub struct BoxedStreamsClient<C> {
    /// The wrapped client that every call is forwarded to.
    inner: C,
}

impl<C> BoxedStreamsClient<C> {
    /// Wraps `inner`, taking ownership of it.
    pub fn new(inner: C) -> Self {
        BoxedStreamsClient { inner }
    }
}
29
30#[xmtp_common::async_trait]
31impl<C> XmtpMlsClient for BoxedStreamsClient<C>
32where
33 C: XmtpMlsClient,
34{
35 type Error = <C as XmtpMlsClient>::Error;
36
37 async fn upload_key_package(
38 &self,
39 request: mls_v1::UploadKeyPackageRequest,
40 ) -> Result<(), Self::Error> {
41 self.inner.upload_key_package(request).await
42 }
43
44 async fn fetch_key_packages(
45 &self,
46 request: mls_v1::FetchKeyPackagesRequest,
47 ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
48 self.inner.fetch_key_packages(request).await
49 }
50
51 async fn send_group_messages(
52 &self,
53 request: mls_v1::SendGroupMessagesRequest,
54 ) -> Result<(), Self::Error> {
55 self.inner.send_group_messages(request).await
56 }
57
58 async fn send_welcome_messages(
59 &self,
60 request: mls_v1::SendWelcomeMessagesRequest,
61 ) -> Result<(), Self::Error> {
62 self.inner.send_welcome_messages(request).await
63 }
64 async fn query_group_messages(
65 &self,
66 group_id: GroupId,
67 ) -> Result<Vec<GroupMessage>, Self::Error> {
68 self.inner.query_group_messages(group_id).await
69 }
70
71 async fn query_latest_group_message(
72 &self,
73 group_id: GroupId,
74 ) -> Result<Option<GroupMessage>, Self::Error> {
75 self.inner.query_latest_group_message(group_id).await
76 }
77
78 async fn query_welcome_messages(
79 &self,
80 installation_key: InstallationId,
81 ) -> Result<Vec<WelcomeMessage>, Self::Error> {
82 self.inner.query_welcome_messages(installation_key).await
83 }
84
85 async fn publish_commit_log(
86 &self,
87 request: mls_v1::BatchPublishCommitLogRequest,
88 ) -> Result<(), Self::Error> {
89 self.inner.publish_commit_log(request).await
90 }
91
92 async fn query_commit_log(
93 &self,
94 request: mls_v1::BatchQueryCommitLogRequest,
95 ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
96 self.inner.query_commit_log(request).await
97 }
98
99 async fn get_newest_group_message(
100 &self,
101 request: mls_v1::GetNewestGroupMessageRequest,
102 ) -> Result<Vec<Option<xmtp_proto::types::GroupMessageMetadata>>, Self::Error> {
103 self.inner.get_newest_group_message(request).await
104 }
105}
106
107#[xmtp_common::async_trait]
108impl<C> XmtpIdentityClient for BoxedStreamsClient<C>
109where
110 C: XmtpIdentityClient,
111{
112 type Error = <C as XmtpIdentityClient>::Error;
113
114 async fn publish_identity_update(
115 &self,
116 request: identity_v1::PublishIdentityUpdateRequest,
117 ) -> Result<identity_v1::PublishIdentityUpdateResponse, Self::Error> {
118 self.inner.publish_identity_update(request).await
119 }
120
121 async fn get_identity_updates_v2(
122 &self,
123 request: identity_v1::GetIdentityUpdatesRequest,
124 ) -> Result<identity_v1::GetIdentityUpdatesResponse, Self::Error> {
125 self.inner.get_identity_updates_v2(request).await
126 }
127
128 async fn get_inbox_ids(
129 &self,
130 request: identity_v1::GetInboxIdsRequest,
131 ) -> Result<identity_v1::GetInboxIdsResponse, Self::Error> {
132 self.inner.get_inbox_ids(request).await
133 }
134
135 async fn verify_smart_contract_wallet_signatures(
136 &self,
137 request: identity_v1::VerifySmartContractWalletSignaturesRequest,
138 ) -> Result<identity_v1::VerifySmartContractWalletSignaturesResponse, Self::Error> {
139 self.inner
140 .verify_smart_contract_wallet_signatures(request)
141 .await
142 }
143}
144
145#[xmtp_common::async_trait]
146impl<C> XmtpMlsStreams for BoxedStreamsClient<C>
147where
148 C: XmtpMlsStreams,
149 C::GroupMessageStream: 'static,
150 C::WelcomeMessageStream: 'static,
151{
152 type GroupMessageStream = xmtp_proto::api_client::BoxedGroupS<Self::Error>;
153 type WelcomeMessageStream = xmtp_proto::api_client::BoxedWelcomeS<Self::Error>;
154 type Error = <C as XmtpMlsStreams>::Error;
155
156 async fn subscribe_group_messages(
157 &self,
158 group_ids: &[&GroupId],
159 ) -> Result<Self::GroupMessageStream, Self::Error> {
160 let s = self.inner.subscribe_group_messages(group_ids).await?;
161 Ok(Box::pin(s) as Pin<Box<_>>)
162 }
163
164 async fn subscribe_group_messages_with_cursors(
165 &self,
166 groups_with_cursors: &TopicCursor,
167 ) -> Result<Self::GroupMessageStream, Self::Error> {
168 let s = self
169 .inner
170 .subscribe_group_messages_with_cursors(groups_with_cursors)
171 .await?;
172 Ok(Box::pin(s) as Pin<Box<_>>)
173 }
174
175 async fn subscribe_welcome_messages(
176 &self,
177 installations: &[&InstallationId],
178 ) -> Result<Self::WelcomeMessageStream, Self::Error> {
179 let s = self.inner.subscribe_welcome_messages(installations).await?;
180 Ok(Box::pin(s) as Pin<Box<_>>)
181 }
182}
183
184impl<C> HasStats for BoxedStreamsClient<C>
185where
186 C: HasStats,
187{
188 fn aggregate_stats(&self) -> xmtp_proto::api_client::AggregateStats {
189 self.inner.aggregate_stats()
190 }
191
192 fn mls_stats(&self) -> ApiStats {
193 self.inner.mls_stats()
194 }
195
196 fn identity_stats(&self) -> IdentityStats {
197 self.inner.identity_stats()
198 }
199}
200
201#[xmtp_common::async_trait]
202impl<C> IsConnectedCheck for BoxedStreamsClient<C>
203where
204 C: IsConnectedCheck,
205{
206 async fn is_connected(&self) -> bool {
207 self.inner.is_connected().await
208 }
209}
210
211#[xmtp_common::async_trait]
212impl<C: XmtpQuery> XmtpQuery for BoxedStreamsClient<C> {
213 type Error = <C as XmtpQuery>::Error;
214
215 async fn query_at(
216 &self,
217 topic: xmtp_proto::types::Topic,
218 at: Option<xmtp_proto::types::GlobalCursor>,
219 ) -> Result<crate::protocol::XmtpEnvelope, Self::Error> {
220 <C as XmtpQuery>::query_at(&self.inner, topic, at).await
221 }
222}