xmtp_api_d14n/queries/
boxed_streams.rs

1use crate::protocol::XmtpQuery;
2use std::pin::Pin;
3use xmtp_proto::api::HasStats;
4use xmtp_proto::api::IsConnectedCheck;
5use xmtp_proto::api_client::ApiStats;
6use xmtp_proto::api_client::IdentityStats;
7use xmtp_proto::api_client::XmtpMlsClient;
8use xmtp_proto::identity_v1;
9use xmtp_proto::mls_v1;
10use xmtp_proto::prelude::XmtpIdentityClient;
11use xmtp_proto::prelude::XmtpMlsStreams;
12use xmtp_proto::types::InstallationId;
13use xmtp_proto::types::TopicCursor;
14use xmtp_proto::types::WelcomeMessage;
15use xmtp_proto::types::{GroupId, GroupMessage};
16/// Wraps an ApiClient to allow turning
17/// a concretely-typed client into type-erased a [`BoxableXmtpApi`]
18/// allowing for the transformation into a type-erased Api Client
19#[derive(Clone)]
20pub struct BoxedStreamsClient<C> {
21    inner: C,
22}
23
24impl<C> BoxedStreamsClient<C> {
25    pub fn new(inner: C) -> Self {
26        Self { inner }
27    }
28}
29
30#[xmtp_common::async_trait]
31impl<C> XmtpMlsClient for BoxedStreamsClient<C>
32where
33    C: XmtpMlsClient,
34{
35    type Error = <C as XmtpMlsClient>::Error;
36
37    async fn upload_key_package(
38        &self,
39        request: mls_v1::UploadKeyPackageRequest,
40    ) -> Result<(), Self::Error> {
41        self.inner.upload_key_package(request).await
42    }
43
44    async fn fetch_key_packages(
45        &self,
46        request: mls_v1::FetchKeyPackagesRequest,
47    ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
48        self.inner.fetch_key_packages(request).await
49    }
50
51    async fn send_group_messages(
52        &self,
53        request: mls_v1::SendGroupMessagesRequest,
54    ) -> Result<(), Self::Error> {
55        self.inner.send_group_messages(request).await
56    }
57
58    async fn send_welcome_messages(
59        &self,
60        request: mls_v1::SendWelcomeMessagesRequest,
61    ) -> Result<(), Self::Error> {
62        self.inner.send_welcome_messages(request).await
63    }
64    async fn query_group_messages(
65        &self,
66        group_id: GroupId,
67    ) -> Result<Vec<GroupMessage>, Self::Error> {
68        self.inner.query_group_messages(group_id).await
69    }
70
71    async fn query_latest_group_message(
72        &self,
73        group_id: GroupId,
74    ) -> Result<Option<GroupMessage>, Self::Error> {
75        self.inner.query_latest_group_message(group_id).await
76    }
77
78    async fn query_welcome_messages(
79        &self,
80        installation_key: InstallationId,
81    ) -> Result<Vec<WelcomeMessage>, Self::Error> {
82        self.inner.query_welcome_messages(installation_key).await
83    }
84
85    async fn publish_commit_log(
86        &self,
87        request: mls_v1::BatchPublishCommitLogRequest,
88    ) -> Result<(), Self::Error> {
89        self.inner.publish_commit_log(request).await
90    }
91
92    async fn query_commit_log(
93        &self,
94        request: mls_v1::BatchQueryCommitLogRequest,
95    ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
96        self.inner.query_commit_log(request).await
97    }
98
99    async fn get_newest_group_message(
100        &self,
101        request: mls_v1::GetNewestGroupMessageRequest,
102    ) -> Result<Vec<Option<xmtp_proto::types::GroupMessageMetadata>>, Self::Error> {
103        self.inner.get_newest_group_message(request).await
104    }
105}
106
107#[xmtp_common::async_trait]
108impl<C> XmtpIdentityClient for BoxedStreamsClient<C>
109where
110    C: XmtpIdentityClient,
111{
112    type Error = <C as XmtpIdentityClient>::Error;
113
114    async fn publish_identity_update(
115        &self,
116        request: identity_v1::PublishIdentityUpdateRequest,
117    ) -> Result<identity_v1::PublishIdentityUpdateResponse, Self::Error> {
118        self.inner.publish_identity_update(request).await
119    }
120
121    async fn get_identity_updates_v2(
122        &self,
123        request: identity_v1::GetIdentityUpdatesRequest,
124    ) -> Result<identity_v1::GetIdentityUpdatesResponse, Self::Error> {
125        self.inner.get_identity_updates_v2(request).await
126    }
127
128    async fn get_inbox_ids(
129        &self,
130        request: identity_v1::GetInboxIdsRequest,
131    ) -> Result<identity_v1::GetInboxIdsResponse, Self::Error> {
132        self.inner.get_inbox_ids(request).await
133    }
134
135    async fn verify_smart_contract_wallet_signatures(
136        &self,
137        request: identity_v1::VerifySmartContractWalletSignaturesRequest,
138    ) -> Result<identity_v1::VerifySmartContractWalletSignaturesResponse, Self::Error> {
139        self.inner
140            .verify_smart_contract_wallet_signatures(request)
141            .await
142    }
143}
144
145#[xmtp_common::async_trait]
146impl<C> XmtpMlsStreams for BoxedStreamsClient<C>
147where
148    C: XmtpMlsStreams,
149    C::GroupMessageStream: 'static,
150    C::WelcomeMessageStream: 'static,
151{
152    type GroupMessageStream = xmtp_proto::api_client::BoxedGroupS<Self::Error>;
153    type WelcomeMessageStream = xmtp_proto::api_client::BoxedWelcomeS<Self::Error>;
154    type Error = <C as XmtpMlsStreams>::Error;
155
156    async fn subscribe_group_messages(
157        &self,
158        group_ids: &[&GroupId],
159    ) -> Result<Self::GroupMessageStream, Self::Error> {
160        let s = self.inner.subscribe_group_messages(group_ids).await?;
161        Ok(Box::pin(s) as Pin<Box<_>>)
162    }
163
164    async fn subscribe_group_messages_with_cursors(
165        &self,
166        groups_with_cursors: &TopicCursor,
167    ) -> Result<Self::GroupMessageStream, Self::Error> {
168        let s = self
169            .inner
170            .subscribe_group_messages_with_cursors(groups_with_cursors)
171            .await?;
172        Ok(Box::pin(s) as Pin<Box<_>>)
173    }
174
175    async fn subscribe_welcome_messages(
176        &self,
177        installations: &[&InstallationId],
178    ) -> Result<Self::WelcomeMessageStream, Self::Error> {
179        let s = self.inner.subscribe_welcome_messages(installations).await?;
180        Ok(Box::pin(s) as Pin<Box<_>>)
181    }
182}
183
184impl<C> HasStats for BoxedStreamsClient<C>
185where
186    C: HasStats,
187{
188    fn aggregate_stats(&self) -> xmtp_proto::api_client::AggregateStats {
189        self.inner.aggregate_stats()
190    }
191
192    fn mls_stats(&self) -> ApiStats {
193        self.inner.mls_stats()
194    }
195
196    fn identity_stats(&self) -> IdentityStats {
197        self.inner.identity_stats()
198    }
199}
200
201#[xmtp_common::async_trait]
202impl<C> IsConnectedCheck for BoxedStreamsClient<C>
203where
204    C: IsConnectedCheck,
205{
206    async fn is_connected(&self) -> bool {
207        self.inner.is_connected().await
208    }
209}
210
211#[xmtp_common::async_trait]
212impl<C: XmtpQuery> XmtpQuery for BoxedStreamsClient<C> {
213    type Error = <C as XmtpQuery>::Error;
214
215    async fn query_at(
216        &self,
217        topic: xmtp_proto::types::Topic,
218        at: Option<xmtp_proto::types::GlobalCursor>,
219    ) -> Result<crate::protocol::XmtpEnvelope, Self::Error> {
220        <C as XmtpQuery>::query_at(&self.inner, topic, at).await
221    }
222}