xmtp_api_d14n/queries/
client_bundle.rs1use std::{error::Error, sync::Arc, time::Duration};
2
3use crate::{
4 AuthCallback, AuthHandle, MessageBackendBuilderError, MiddlewareBuilder, ReadWriteClient,
5 ReadonlyClient,
6};
7use derive_builder::Builder;
8use http::{request, uri::PathAndQuery};
9use prost::bytes::Bytes;
10use xmtp_api_grpc::{GrpcClient, error::GrpcError};
11use xmtp_common::{MaybeSend, MaybeSync};
12use xmtp_configuration::{MULTI_NODE_TIMEOUT_MS, PAYER_WRITE_FILTER};
13use xmtp_proto::{
14 api::{ApiClientError, ArcClient, Client, IsConnectedCheck, ToBoxedClient},
15 prelude::{ApiBuilder, NetConnectConfig},
16 types::AppVersion,
17};
18
19#[derive(Clone, Copy, Debug)]
20#[non_exhaustive]
21pub enum ClientKind {
22 D14n,
23 V3,
24 Hybrid,
25}
26impl std::fmt::Display for ClientKind {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 use ClientKind::*;
29 match self {
30 D14n => write!(f, "D14n"),
31 V3 => write!(f, "V3"),
32 Hybrid => write!(f, "Hybrid"),
33 }
34 }
35}
36
37pub struct ClientBundle<Err> {
38 client: ArcClient<Err>,
39 kind: ClientKind,
40}
41
42impl<Err> Clone for ClientBundle<Err> {
43 fn clone(&self) -> Self {
44 Self {
45 client: self.client.clone(),
46 kind: self.kind,
47 }
48 }
49}
50
51impl ClientBundle<()> {
52 pub fn builder() -> ClientBundleBuilder {
53 ClientBundleBuilder::default()
54 }
55}
56
57#[xmtp_common::async_trait]
58impl<Err> Client for ClientBundle<Err>
59where
60 Err: Error + MaybeSend + MaybeSync + 'static,
61{
62 type Error = Err;
63
64 type Stream = <ArcClient<Err> as Client>::Stream;
65
66 async fn request(
67 &self,
68 request: request::Builder,
69 path: PathAndQuery,
70 body: Bytes,
71 ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
72 self.client.request(request, path, body).await
73 }
74
75 async fn stream(
76 &self,
77 request: request::Builder,
78 path: PathAndQuery,
79 body: Bytes,
80 ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
81 self.client.stream(request, path, body).await
82 }
83
84 fn fake_stream(&self) -> http::Response<Self::Stream> {
85 self.client.fake_stream()
86 }
87}
88
89#[xmtp_common::async_trait]
90impl<Err> IsConnectedCheck for ClientBundle<Err> {
91 async fn is_connected(&self) -> bool {
92 self.client.is_connected().await
93 }
94}
95
96impl<Err> ClientBundle<Err> {
97 pub fn new(client: ArcClient<Err>, kind: ClientKind) -> Self {
98 Self { client, kind }
99 }
100
101 pub fn d14n(client: ArcClient<Err>) -> Self {
103 Self {
104 client,
105 kind: ClientKind::D14n,
106 }
107 }
108
109 pub fn v3(client: ArcClient<Err>) -> Self {
111 Self {
112 client,
113 kind: ClientKind::V3,
114 }
115 }
116
117 pub fn hybrid(client: ArcClient<Err>) -> Self {
119 Self {
120 client,
121 kind: ClientKind::Hybrid,
122 }
123 }
124
125 pub fn kind(&self) -> &ClientKind {
126 &self.kind
127 }
128}
129
130#[derive(Builder, Clone)]
133#[builder(public, name = "ClientBundleBuilder", build_fn(skip))]
134struct __ClientBundleBuilder {
135 #[builder(setter(into))]
136 app_version: AppVersion,
137 #[builder(setter(into))]
138 v3_host: String,
139 #[builder(setter(into))]
140 gateway_host: String,
141 #[builder(setter(into))]
142 auth_callback: Arc<dyn AuthCallback>,
143 #[builder(setter(into))]
144 auth_handle: AuthHandle,
145 is_secure: bool,
146 readonly: bool,
147}
148
149impl ClientBundleBuilder {
150 pub fn maybe_gateway_host<U: Into<String>>(&mut self, host: Option<U>) -> &mut Self {
151 self.gateway_host = host.map(Into::into);
152 self
153 }
154
155 pub fn maybe_auth_callback(&mut self, callback: Option<Arc<dyn AuthCallback>>) -> &mut Self {
156 self.auth_callback = callback;
157 self
158 }
159
160 pub fn maybe_auth_handle(&mut self, handle: Option<AuthHandle>) -> &mut Self {
161 self.auth_handle = handle;
162 self
163 }
164
165 pub fn build(&mut self) -> Result<ClientBundle<GrpcError>, MessageBackendBuilderError> {
166 let Self {
167 v3_host,
168 gateway_host,
169 app_version,
170 auth_callback,
171 auth_handle,
172 is_secure,
173 readonly,
174 } = self.clone();
175 let v3_host = v3_host.ok_or(MessageBackendBuilderError::MissingV3Host)?;
176 let is_secure = is_secure.unwrap_or_default();
177 let readonly = readonly.unwrap_or_default();
178
179 if let Some(gateway) = gateway_host {
181 let mut gateway_client_builder = GrpcClient::builder();
182 gateway_client_builder.set_host(gateway.to_string());
183 gateway_client_builder.set_tls(is_secure);
184
185 if let Some(version) = app_version {
186 gateway_client_builder.set_app_version(version)?;
187 }
188
189 let mut multi_node = crate::middleware::MultiNodeClientBuilder::default();
190 multi_node.set_timeout(Duration::from_millis(MULTI_NODE_TIMEOUT_MS))?;
191 multi_node.set_gateway_builder(gateway_client_builder.clone())?;
192 let mut template = GrpcClient::builder();
193 template.set_tls(is_secure);
194 multi_node.set_node_client_builder(template)?;
195
196 let gateway_client = gateway_client_builder.build()?;
197 let multi_node = multi_node.build()?;
198
199 let client = if auth_callback.is_some() || auth_handle.is_some() {
200 let auth = crate::AuthMiddleware::new(gateway_client, auth_callback, auth_handle);
201 let client = ReadWriteClient::builder()
202 .read(multi_node)
203 .write(auth)
204 .filter(PAYER_WRITE_FILTER)
205 .build()?;
206 if readonly {
207 ReadonlyClient::builder().inner(client).build()?.arced()
208 } else {
209 client.arced()
210 }
211 } else {
212 let client = ReadWriteClient::builder()
213 .read(multi_node)
214 .write(gateway_client)
215 .filter(PAYER_WRITE_FILTER)
216 .build()?;
217 if readonly {
218 ReadonlyClient::builder().inner(client).build()?.arced()
219 } else {
220 client.arced()
221 }
222 };
223
224 Ok(ClientBundle::d14n(client))
225 } else {
226 let mut v3_client = GrpcClient::builder();
227 v3_client.set_host(v3_host.to_string());
228 v3_client.set_tls(is_secure);
229 if let Some(ref version) = app_version {
230 v3_client.set_app_version(version.clone())?;
231 }
232
233 let v3_client = v3_client.build()?;
234 let client = v3_client.arced();
235 Ok(ClientBundle::v3(client))
236 }
237 }
238}