xmtp_api_grpc/grpc_client/
client.rs1use crate::{
7 error::{GrpcBuilderError, GrpcError},
8 streams::{EscapableTonicStream, FakeEmptyStream, NonBlockingWebStream},
9};
10use futures::Stream;
11use http::{StatusCode, request, uri::PathAndQuery};
12use http_body_util::StreamBody;
13use pin_project_lite::pin_project;
14use prost::bytes::Bytes;
15use std::{
16 pin::Pin,
17 task::{Context, Poll, ready},
18};
19use tonic::{
20 Response, Status, Streaming,
21 client::Grpc,
22 codec::Codec,
23 metadata::{self, MetadataMap, MetadataValue},
24};
25use xmtp_common::Retry;
26use xmtp_configuration::GRPC_PAYLOAD_LIMIT;
27use xmtp_proto::{
28 api::{ApiClientError, Client, IsConnectedCheck},
29 api_client::{ApiBuilder, NetConnectConfig},
30 codec::TransparentCodec,
31 types::AppVersion,
32};
33
34impl From<GrpcError> for ApiClientError<GrpcError> {
35 fn from(source: GrpcError) -> ApiClientError<GrpcError> {
36 ApiClientError::Client { source }
37 }
38}
39
40trait ToHttp {
42 type Body;
43 fn to_http(self) -> http::Response<Self::Body>;
44}
45
46impl<T> ToHttp for tonic::Response<T> {
48 type Body = T;
49
50 fn to_http(self) -> http::Response<Self::Body> {
51 let (metadata, body, extensions) = self.into_parts();
52 let mut response = http::Response::new(body);
53 if cfg!(target_arch = "wasm32") {
54 *response.version_mut() = http::version::Version::HTTP_11;
55 } else {
56 *response.version_mut() = http::version::Version::HTTP_2;
57 }
58 *response.headers_mut() = metadata.into_headers();
59 *response.extensions_mut() = extensions;
60 response
61 }
62}
63
64#[derive(Clone)]
65pub struct GrpcClient {
66 inner: tonic::client::Grpc<crate::GrpcService>,
67 app_version: MetadataValue<metadata::Ascii>,
68 libxmtp_version: MetadataValue<metadata::Ascii>,
69}
70
71impl GrpcClient {
72 pub fn new(
73 service: crate::GrpcService,
74 app_version: MetadataValue<metadata::Ascii>,
75 libxmtp_version: MetadataValue<metadata::Ascii>,
76 ) -> Self {
77 Self {
78 inner: tonic::client::Grpc::new(service),
79 app_version,
80 libxmtp_version,
81 }
82 }
83
84 fn build_tonic_request(
86 &self,
87 request: http::request::Builder,
88 body: Bytes,
89 ) -> Result<tonic::Request<Bytes>, Status> {
90 let request = request
91 .body(body)
92 .map_err(|e| tonic::Status::from_error(Box::new(e)))?;
93 let (parts, body) = request.into_parts();
94 let mut tonic_request = tonic::Request::from_parts(
95 MetadataMap::from_headers(parts.headers),
96 parts.extensions,
97 body,
98 );
99 let metadata = tonic_request.metadata_mut();
100 metadata.append("x-app-version", self.app_version.clone());
102 metadata.append("x-libxmtp-version", self.libxmtp_version.clone());
103 Ok(tonic_request)
104 }
105
106 async fn wait_for_ready(&self, client: &mut Grpc<crate::GrpcService>) -> Result<(), Status> {
107 client.ready().await.map_err(|e| {
108 tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {e}"))
109 })?;
110 Ok(())
111 }
112}
113
114pin_project! {
115 pub struct GrpcStream {
117 #[pin] inner: crate::streams::NonBlocking
118 }
119}
120
121impl From<crate::streams::NonBlocking> for GrpcStream {
122 fn from(value: crate::streams::NonBlocking) -> GrpcStream {
123 GrpcStream { inner: value }
124 }
125}
126
127impl Stream for GrpcStream {
131 type Item = Result<Bytes, GrpcError>;
132
133 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134 let this = self.project();
135 let item = ready!(this.inner.poll_next(cx));
136 Poll::Ready(item.map(|i| i.map_err(GrpcError::from)))
137 }
138}
139
140#[xmtp_common::async_trait]
141impl Client for GrpcClient {
142 type Error = GrpcError;
143 type Stream = GrpcStream;
144
145 async fn request(
146 &self,
147 request: http::request::Builder,
148 path: http::uri::PathAndQuery,
149 body: Bytes,
150 ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
151 let client = &mut self.inner.clone();
152 self.wait_for_ready(client).await.map_err(GrpcError::from)?;
153 let request = self
154 .build_tonic_request(request, body)
155 .map_err(GrpcError::from)?;
156 let codec = TransparentCodec::default();
157 let response = client
158 .unary(request, path, codec)
159 .await
160 .map_err(GrpcError::from)?;
161
162 Ok(response.to_http())
163 }
164
165 async fn stream(
166 &self,
167 request: request::Builder,
168 path: PathAndQuery,
169 body: Bytes,
170 ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
171 let this = self.clone();
172 let response = async move {
174 let mut client = this.inner.clone();
175 this.wait_for_ready(&mut client).await?;
176 let request = this.build_tonic_request(request, body)?;
177 let codec = TransparentCodec::default();
178 client.server_streaming(request, path, codec).await
179 };
180 let req = crate::streams::NonBlockingStreamRequest::new(
181 Box::pin(response) as crate::streams::ResponseFuture
182 );
183 let response = crate::streams::send(req).await.map_err(GrpcError::from)?;
184 let response = response.map(|body| GrpcStream {
185 inner: EscapableTonicStream::new(body),
186 });
187 Ok(response.to_http().map(Into::into))
188 }
189
190 fn fake_stream(&self) -> http::Response<Self::Stream> {
192 let mut codec = TransparentCodec::default();
193 let s = StreamBody::new(FakeEmptyStream::<Status>::new());
194 let response = Streaming::new_response(codec.decoder(), s, StatusCode::OK, None, None);
195 let response = Response::new(response);
196 let response = response.map(|body| GrpcStream {
197 inner: EscapableTonicStream::new(NonBlockingWebStream::started(body)),
198 });
199 response.to_http()
200 }
201}
202
203#[xmtp_common::async_trait]
204impl IsConnectedCheck for GrpcClient {
205 async fn is_connected(&self) -> bool {
206 self.inner.clone().ready().await.is_ok()
207 }
208}
209
210impl GrpcClient {
211 pub fn builder() -> ClientBuilder {
212 ClientBuilder::default()
213 }
214}
215
216#[derive(Default, Clone)]
217pub struct ClientBuilder {
218 pub host: Option<String>,
219 pub app_version: Option<MetadataValue<metadata::Ascii>>,
221 pub libxmtp_version: Option<MetadataValue<metadata::Ascii>>,
223 pub tls_channel: bool,
225 pub limit: Option<u64>,
227 pub retry: Option<Retry>,
229}
230
231impl NetConnectConfig for ClientBuilder {
232 fn set_libxmtp_version(&mut self, version: String) -> Result<(), Self::Error> {
233 self.libxmtp_version = Some(MetadataValue::try_from(&version)?);
234 Ok(())
235 }
236
237 fn set_app_version(&mut self, version: AppVersion) -> Result<(), Self::Error> {
238 self.app_version = Some(MetadataValue::try_from(&version)?);
239 Ok(())
240 }
241
242 fn set_host(&mut self, host: String) {
243 self.host = Some(host);
244 }
245
246 fn set_tls(&mut self, tls: bool) {
247 self.tls_channel = tls;
248 }
249
250 fn rate_per_minute(&mut self, limit: u32) {
251 self.limit = Some(limit.into());
252 }
253
254 fn port(&self) -> Result<Option<String>, Self::Error> {
255 if let Some(h) = &self.host {
256 let u = url::Url::parse(h)?;
257 Ok(u.port().map(|u| u.to_string()))
258 } else {
259 Err(GrpcBuilderError::MissingHostUrl)
260 }
261 }
262
263 fn host(&self) -> Option<&str> {
264 self.host.as_deref()
265 }
266
267 fn set_retry(&mut self, retry: xmtp_common::Retry) {
268 self.retry = Some(retry);
269 }
270}
271
272impl ApiBuilder for ClientBuilder {
273 type Output = crate::GrpcClient;
274 type Error = GrpcBuilderError;
275
276 fn build(self) -> Result<Self::Output, Self::Error> {
277 let host = self.host.ok_or(GrpcBuilderError::MissingHostUrl)?;
278 let channel = crate::GrpcService::new(host, self.limit, self.tls_channel)?;
279 Ok(GrpcClient {
280 inner: tonic::client::Grpc::new(channel)
281 .max_decoding_message_size(GRPC_PAYLOAD_LIMIT)
282 .max_encoding_message_size(GRPC_PAYLOAD_LIMIT),
283 app_version: self
284 .app_version
285 .unwrap_or(MetadataValue::try_from("0.0.0")?),
286 libxmtp_version: self.libxmtp_version.unwrap_or(MetadataValue::try_from(
287 env!("CARGO_PKG_VERSION").to_string(),
288 )?),
289 })
290 }
291}
292
293impl GrpcClient {
294 pub fn create(host: &str, is_secure: bool) -> Result<Self, GrpcBuilderError> {
295 let mut builder = Self::builder();
296 builder.set_host(host.to_string());
297 builder.set_tls(is_secure);
298 builder.build()
299 }
300
301 pub fn create_with_version(
303 host: &str,
304 is_secure: bool,
305 app_version: AppVersion,
306 ) -> Result<Self, GrpcBuilderError> {
307 let mut builder = Self::builder();
308 builder.set_host(host.to_string());
309 builder.set_tls(is_secure);
310 builder.set_app_version(app_version)?;
311 builder.build()
312 }
313}
314
#[cfg(test)]
pub mod tests {
    use crate::grpc_client::test::DevNodeGoClient;
    use prost::Message;
    use xmtp_proto::api_client::ApiBuilder;
    use xmtp_proto::prelude::{NetConnectConfig, XmtpTestClient};
    use xmtp_proto::types::AppVersion;
    use xmtp_proto::xmtp::message_api::v1::PublishRequest;

    /// Versions set on the builder must show up as metadata on built requests.
    #[xmtp_common::test]
    async fn metadata_test() {
        let app_version = AppVersion::from("test/1.0.0");
        let libxmtp_version = "0.0.1".to_string();

        let mut builder = DevNodeGoClient::create();
        builder.set_app_version(app_version.clone()).unwrap();
        builder
            .set_libxmtp_version(libxmtp_version.clone())
            .unwrap();
        let client = builder.build().unwrap();

        let payload = PublishRequest { envelopes: vec![] }.encode_to_vec();
        let request = client
            .build_tonic_request(Default::default(), payload.into())
            .unwrap();

        // Reads one metadata entry back as an owned string.
        let header = |name: &str| {
            request
                .metadata()
                .get(name)
                .unwrap()
                .to_str()
                .unwrap()
                .to_string()
        };

        assert_eq!(header("x-app-version"), app_version);
        assert_eq!(header("x-libxmtp-version"), libxmtp_version);
    }
}