xmtp_api_d14n/queries/
client_bundle.rs

1use std::{error::Error, sync::Arc, time::Duration};
2
3use crate::{
4    AuthCallback, AuthHandle, MessageBackendBuilderError, MiddlewareBuilder, ReadWriteClient,
5    ReadonlyClient,
6};
7use derive_builder::Builder;
8use http::{request, uri::PathAndQuery};
9use prost::bytes::Bytes;
10use xmtp_api_grpc::{GrpcClient, error::GrpcError};
11use xmtp_common::{MaybeSend, MaybeSync};
12use xmtp_configuration::{MULTI_NODE_TIMEOUT_MS, PAYER_WRITE_FILTER};
13use xmtp_proto::{
14    api::{ApiClientError, ArcClient, Client, IsConnectedCheck, ToBoxedClient},
15    prelude::{ApiBuilder, NetConnectConfig},
16    types::AppVersion,
17};
18
19#[derive(Clone, Copy, Debug)]
20#[non_exhaustive]
21pub enum ClientKind {
22    D14n,
23    V3,
24    Hybrid,
25}
26impl std::fmt::Display for ClientKind {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        use ClientKind::*;
29        match self {
30            D14n => write!(f, "D14n"),
31            V3 => write!(f, "V3"),
32            Hybrid => write!(f, "Hybrid"),
33        }
34    }
35}
36
37pub struct ClientBundle<Err> {
38    client: ArcClient<Err>,
39    kind: ClientKind,
40}
41
42impl<Err> Clone for ClientBundle<Err> {
43    fn clone(&self) -> Self {
44        Self {
45            client: self.client.clone(),
46            kind: self.kind,
47        }
48    }
49}
50
51impl ClientBundle<()> {
52    pub fn builder() -> ClientBundleBuilder {
53        ClientBundleBuilder::default()
54    }
55}
56
57#[xmtp_common::async_trait]
58impl<Err> Client for ClientBundle<Err>
59where
60    Err: Error + MaybeSend + MaybeSync + 'static,
61{
62    type Error = Err;
63
64    type Stream = <ArcClient<Err> as Client>::Stream;
65
66    async fn request(
67        &self,
68        request: request::Builder,
69        path: PathAndQuery,
70        body: Bytes,
71    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
72        self.client.request(request, path, body).await
73    }
74
75    async fn stream(
76        &self,
77        request: request::Builder,
78        path: PathAndQuery,
79        body: Bytes,
80    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
81        self.client.stream(request, path, body).await
82    }
83
84    fn fake_stream(&self) -> http::Response<Self::Stream> {
85        self.client.fake_stream()
86    }
87}
88
89#[xmtp_common::async_trait]
90impl<Err> IsConnectedCheck for ClientBundle<Err> {
91    async fn is_connected(&self) -> bool {
92        self.client.is_connected().await
93    }
94}
95
96impl<Err> ClientBundle<Err> {
97    pub fn new(client: ArcClient<Err>, kind: ClientKind) -> Self {
98        Self { client, kind }
99    }
100
101    /// create a d14n client bundle
102    pub fn d14n(client: ArcClient<Err>) -> Self {
103        Self {
104            client,
105            kind: ClientKind::D14n,
106        }
107    }
108
109    /// Create a v3 client bundle
110    pub fn v3(client: ArcClient<Err>) -> Self {
111        Self {
112            client,
113            kind: ClientKind::V3,
114        }
115    }
116
117    /// Create a hybrid client
118    pub fn hybrid(client: ArcClient<Err>) -> Self {
119        Self {
120            client,
121            kind: ClientKind::Hybrid,
122        }
123    }
124
125    pub fn kind(&self) -> &ClientKind {
126        &self.kind
127    }
128}
129
130// we aren't using any of the generated build fns by derive_builder here
131// instead we are just using it to generate the setters on the impl for us.
132#[derive(Builder, Clone)]
133#[builder(public, name = "ClientBundleBuilder", build_fn(skip))]
134struct __ClientBundleBuilder {
135    #[builder(setter(into))]
136    app_version: AppVersion,
137    #[builder(setter(into))]
138    v3_host: String,
139    #[builder(setter(into))]
140    gateway_host: String,
141    #[builder(setter(into))]
142    auth_callback: Arc<dyn AuthCallback>,
143    #[builder(setter(into))]
144    auth_handle: AuthHandle,
145    is_secure: bool,
146    readonly: bool,
147}
148
149impl ClientBundleBuilder {
150    pub fn maybe_gateway_host<U: Into<String>>(&mut self, host: Option<U>) -> &mut Self {
151        self.gateway_host = host.map(Into::into);
152        self
153    }
154
155    pub fn maybe_auth_callback(&mut self, callback: Option<Arc<dyn AuthCallback>>) -> &mut Self {
156        self.auth_callback = callback;
157        self
158    }
159
160    pub fn maybe_auth_handle(&mut self, handle: Option<AuthHandle>) -> &mut Self {
161        self.auth_handle = handle;
162        self
163    }
164
165    pub fn build(&mut self) -> Result<ClientBundle<GrpcError>, MessageBackendBuilderError> {
166        let Self {
167            v3_host,
168            gateway_host,
169            app_version,
170            auth_callback,
171            auth_handle,
172            is_secure,
173            readonly,
174        } = self.clone();
175        let v3_host = v3_host.ok_or(MessageBackendBuilderError::MissingV3Host)?;
176        let is_secure = is_secure.unwrap_or_default();
177        let readonly = readonly.unwrap_or_default();
178
179        // implicitly use a d14n client
180        if let Some(gateway) = gateway_host {
181            let mut gateway_client_builder = GrpcClient::builder();
182            gateway_client_builder.set_host(gateway.to_string());
183            gateway_client_builder.set_tls(is_secure);
184
185            if let Some(version) = app_version {
186                gateway_client_builder.set_app_version(version)?;
187            }
188
189            let mut multi_node = crate::middleware::MultiNodeClientBuilder::default();
190            multi_node.set_timeout(Duration::from_millis(MULTI_NODE_TIMEOUT_MS))?;
191            multi_node.set_gateway_builder(gateway_client_builder.clone())?;
192            let mut template = GrpcClient::builder();
193            template.set_tls(is_secure);
194            multi_node.set_node_client_builder(template)?;
195
196            let gateway_client = gateway_client_builder.build()?;
197            let multi_node = multi_node.build()?;
198
199            let client = if auth_callback.is_some() || auth_handle.is_some() {
200                let auth = crate::AuthMiddleware::new(gateway_client, auth_callback, auth_handle);
201                let client = ReadWriteClient::builder()
202                    .read(multi_node)
203                    .write(auth)
204                    .filter(PAYER_WRITE_FILTER)
205                    .build()?;
206                if readonly {
207                    ReadonlyClient::builder().inner(client).build()?.arced()
208                } else {
209                    client.arced()
210                }
211            } else {
212                let client = ReadWriteClient::builder()
213                    .read(multi_node)
214                    .write(gateway_client)
215                    .filter(PAYER_WRITE_FILTER)
216                    .build()?;
217                if readonly {
218                    ReadonlyClient::builder().inner(client).build()?.arced()
219                } else {
220                    client.arced()
221                }
222            };
223
224            Ok(ClientBundle::d14n(client))
225        } else {
226            let mut v3_client = GrpcClient::builder();
227            v3_client.set_host(v3_host.to_string());
228            v3_client.set_tls(is_secure);
229            if let Some(ref version) = app_version {
230                v3_client.set_app_version(version.clone())?;
231            }
232
233            let v3_client = v3_client.build()?;
234            let client = v3_client.arced();
235            Ok(ClientBundle::v3(client))
236        }
237    }
238}