xmtp_proto/traits/
boxed_client.rs

/// A boxed version of [`Client`].
///
/// Expands to `Box<dyn BoxClientT<Err>>`, i.e. an owned trait object whose
/// error type is `Err`.
pub type BoxClient<Err> = Box<dyn BoxClientT<Err>>;
3
/// A type-erased version of [`Client`] in an [`Arc`](std::sync::Arc).
///
/// Expands to `Arc<dyn BoxClientT<Err>>`, i.e. a reference-counted trait
/// object whose error type is `Err`.
pub type ArcClient<Err> = Arc<dyn BoxClientT<Err>>;
6
7use bytes::Bytes;
8use futures::Stream;
9use http::{request, uri::PathAndQuery};
10use std::{pin::Pin, sync::Arc};
11use xmtp_common::{MaybeSend, MaybeSync};
12
13use crate::api::{ApiClientError, IsConnectedCheck};
14
15use super::Client;
16
/// Private wrapper around a client `C`.
///
/// `C` may be unsized, so the wrapped value is stored as the only (and
/// therefore last) field.
struct BoxedClient<C>
where
    C: ?Sized,
{
    /// The wrapped client that all calls are forwarded to.
    inner: C,
}
20
21impl<C> BoxedClient<C> {
22    pub fn new(client: C) -> Self {
23        Self { inner: client }
24    }
25}
26
// Pinned, boxed stream of `Result<Bytes, Err>` items, used as the `Stream`
// type of boxed clients. This variant carries a `Send` bound.
// NOTE(review): presumably `if_native!` compiles its contents only on
// non-wasm targets — confirm against the macro definition.
xmtp_common::if_native! {
    type BoxedStreamT<Err> = Pin<Box<dyn Stream<Item = Result<Bytes, Err>> + Send>>;
}

// Same alias as above but without the `Send` bound.
// NOTE(review): presumably `if_wasm!` compiles its contents only on wasm
// targets — confirm against the macro definition.
xmtp_common::if_wasm! {
    type BoxedStreamT<Err> = Pin<Box<dyn Stream<Item = Result<Bytes, Err>>>>;
}
34
35pub trait BoxClientT<Err>:
36    Client<Error = Err, Stream = BoxedStreamT<Err>> + IsConnectedCheck
37{
38}
39
40impl<T, Err> BoxClientT<Err> for T where
41    T: ?Sized + IsConnectedCheck + Client<Error = Err, Stream = BoxedStreamT<Err>>
42{
43}
44
45#[xmtp_common::async_trait]
46impl<C> Client for BoxedClient<C>
47where
48    C: Client,
49    C::Stream: 'static,
50{
51    type Stream = BoxedStreamT<Self::Error>;
52    type Error = C::Error;
53
54    async fn request(
55        &self,
56        request: request::Builder,
57        path: PathAndQuery,
58        body: Bytes,
59    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
60        self.inner.request(request, path, body).await
61    }
62
63    async fn stream(
64        &self,
65        request: request::Builder,
66        path: http::uri::PathAndQuery,
67        body: Bytes,
68    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
69        let s = self.inner.stream(request, path, body).await?;
70        let s = s.map(|s| Box::pin(s) as Pin<Box<_>>);
71        Ok(s)
72    }
73
74    fn fake_stream(&self) -> http::Response<Self::Stream> {
75        let s = self.inner.fake_stream();
76        s.map(|s| Box::pin(s) as Pin<Box<_>>)
77    }
78}
79
80pub trait ToBoxedClient {
81    type Error: MaybeSend + MaybeSync;
82    fn boxed(self) -> BoxClient<Self::Error>;
83    fn arced(self) -> ArcClient<Self::Error>;
84}
85
86impl<C> ToBoxedClient for C
87where
88    C: Client + IsConnectedCheck + 'static,
89    C::Stream: 'static,
90{
91    type Error = C::Error;
92    fn boxed(self) -> BoxClient<Self::Error> {
93        Box::new(BoxedClient::new(self))
94    }
95    fn arced(self) -> ArcClient<Self::Error> {
96        Arc::new(BoxedClient::new(self))
97    }
98}
99
100#[xmtp_common::async_trait]
101impl<T> IsConnectedCheck for BoxedClient<T>
102where
103    T: ?Sized + IsConnectedCheck,
104{
105    /// Check if a client is connected
106    async fn is_connected(&self) -> bool {
107        self.inner.is_connected().await
108    }
109}