xmtp_api_grpc/grpc_client/
client.rs

1//! The generic gRPC Client
2//! Generic over a inner "Channel".
3//! The  inner channel must implement a tower service to implicitly
4//! implement the gRPC Service
5
6use crate::{
7    error::{GrpcBuilderError, GrpcError},
8    streams::{EscapableTonicStream, FakeEmptyStream, NonBlockingWebStream},
9};
10use futures::Stream;
11use http::{StatusCode, request, uri::PathAndQuery};
12use http_body_util::StreamBody;
13use pin_project_lite::pin_project;
14use prost::bytes::Bytes;
15use std::{
16    pin::Pin,
17    task::{Context, Poll, ready},
18};
19use tonic::{
20    Response, Status, Streaming,
21    client::Grpc,
22    codec::Codec,
23    metadata::{self, MetadataMap, MetadataValue},
24};
25use xmtp_common::Retry;
26use xmtp_configuration::GRPC_PAYLOAD_LIMIT;
27use xmtp_proto::{
28    api::{ApiClientError, Client, IsConnectedCheck},
29    api_client::{ApiBuilder, NetConnectConfig},
30    codec::TransparentCodec,
31    types::AppVersion,
32};
33
34impl From<GrpcError> for ApiClientError<GrpcError> {
35    fn from(source: GrpcError) -> ApiClientError<GrpcError> {
36        ApiClientError::Client { source }
37    }
38}
39
40/// Private trait to convert type to an HTTP Response
41trait ToHttp {
42    type Body;
43    fn to_http(self) -> http::Response<Self::Body>;
44}
45
46/// Convert a tonic Response to a generic HTTP response
47impl<T> ToHttp for tonic::Response<T> {
48    type Body = T;
49
50    fn to_http(self) -> http::Response<Self::Body> {
51        let (metadata, body, extensions) = self.into_parts();
52        let mut response = http::Response::new(body);
53        if cfg!(target_arch = "wasm32") {
54            *response.version_mut() = http::version::Version::HTTP_11;
55        } else {
56            *response.version_mut() = http::version::Version::HTTP_2;
57        }
58        *response.headers_mut() = metadata.into_headers();
59        *response.extensions_mut() = extensions;
60        response
61    }
62}
63
64#[derive(Clone)]
65pub struct GrpcClient {
66    inner: tonic::client::Grpc<crate::GrpcService>,
67    app_version: MetadataValue<metadata::Ascii>,
68    libxmtp_version: MetadataValue<metadata::Ascii>,
69}
70
71impl GrpcClient {
72    pub fn new(
73        service: crate::GrpcService,
74        app_version: MetadataValue<metadata::Ascii>,
75        libxmtp_version: MetadataValue<metadata::Ascii>,
76    ) -> Self {
77        Self {
78            inner: tonic::client::Grpc::new(service),
79            app_version,
80            libxmtp_version,
81        }
82    }
83
84    /// Builds a tonic request from a body and a generic HTTP Request
85    fn build_tonic_request(
86        &self,
87        request: http::request::Builder,
88        body: Bytes,
89    ) -> Result<tonic::Request<Bytes>, Status> {
90        let request = request
91            .body(body)
92            .map_err(|e| tonic::Status::from_error(Box::new(e)))?;
93        let (parts, body) = request.into_parts();
94        let mut tonic_request = tonic::Request::from_parts(
95            MetadataMap::from_headers(parts.headers),
96            parts.extensions,
97            body,
98        );
99        let metadata = tonic_request.metadata_mut();
100        // must be lowercase otherwise panics
101        metadata.append("x-app-version", self.app_version.clone());
102        metadata.append("x-libxmtp-version", self.libxmtp_version.clone());
103        Ok(tonic_request)
104    }
105
106    async fn wait_for_ready(&self, client: &mut Grpc<crate::GrpcService>) -> Result<(), Status> {
107        client.ready().await.map_err(|e| {
108            tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {e}"))
109        })?;
110        Ok(())
111    }
112}
113
114pin_project! {
115    /// A stream of bytes from a GRPC Network Source
116    pub struct GrpcStream {
117        #[pin] inner: crate::streams::NonBlocking
118    }
119}
120
121impl From<crate::streams::NonBlocking> for GrpcStream {
122    fn from(value: crate::streams::NonBlocking) -> GrpcStream {
123        GrpcStream { inner: value }
124    }
125}
126
127// just a more convenient way to map the stream type to
128// something more customized to the trait, without playing around with getting the
129// generics right on nested futures combinators.
130impl Stream for GrpcStream {
131    type Item = Result<Bytes, GrpcError>;
132
133    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134        let this = self.project();
135        let item = ready!(this.inner.poll_next(cx));
136        Poll::Ready(item.map(|i| i.map_err(GrpcError::from)))
137    }
138}
139
140#[xmtp_common::async_trait]
141impl Client for GrpcClient {
142    type Error = GrpcError;
143    type Stream = GrpcStream;
144
145    async fn request(
146        &self,
147        request: http::request::Builder,
148        path: http::uri::PathAndQuery,
149        body: Bytes,
150    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
151        let client = &mut self.inner.clone();
152        self.wait_for_ready(client).await.map_err(GrpcError::from)?;
153        let request = self
154            .build_tonic_request(request, body)
155            .map_err(GrpcError::from)?;
156        let codec = TransparentCodec::default();
157        let response = client
158            .unary(request, path, codec)
159            .await
160            .map_err(GrpcError::from)?;
161
162        Ok(response.to_http())
163    }
164
165    async fn stream(
166        &self,
167        request: request::Builder,
168        path: PathAndQuery,
169        body: Bytes,
170    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
171        let this = self.clone();
172        // client requires to be moved so it lives long enough for streaming response future.
173        let response = async move {
174            let mut client = this.inner.clone();
175            this.wait_for_ready(&mut client).await?;
176            let request = this.build_tonic_request(request, body)?;
177            let codec = TransparentCodec::default();
178            client.server_streaming(request, path, codec).await
179        };
180        let req = crate::streams::NonBlockingStreamRequest::new(
181            Box::pin(response) as crate::streams::ResponseFuture
182        );
183        let response = crate::streams::send(req).await.map_err(GrpcError::from)?;
184        let response = response.map(|body| GrpcStream {
185            inner: EscapableTonicStream::new(body),
186        });
187        Ok(response.to_http().map(Into::into))
188    }
189
190    // just need to ensure the type is the same as `stream`
191    fn fake_stream(&self) -> http::Response<Self::Stream> {
192        let mut codec = TransparentCodec::default();
193        let s = StreamBody::new(FakeEmptyStream::<Status>::new());
194        let response = Streaming::new_response(codec.decoder(), s, StatusCode::OK, None, None);
195        let response = Response::new(response);
196        let response = response.map(|body| GrpcStream {
197            inner: EscapableTonicStream::new(NonBlockingWebStream::started(body)),
198        });
199        response.to_http()
200    }
201}
202
203#[xmtp_common::async_trait]
204impl IsConnectedCheck for GrpcClient {
205    async fn is_connected(&self) -> bool {
206        self.inner.clone().ready().await.is_ok()
207    }
208}
209
210impl GrpcClient {
211    pub fn builder() -> ClientBuilder {
212        ClientBuilder::default()
213    }
214}
215
216#[derive(Default, Clone)]
217pub struct ClientBuilder {
218    pub host: Option<String>,
219    /// version of the app
220    pub app_version: Option<MetadataValue<metadata::Ascii>>,
221    /// Version of the libxmtp core library
222    pub libxmtp_version: Option<MetadataValue<metadata::Ascii>>,
223    /// Whether or not the channel should use TLS
224    pub tls_channel: bool,
225    /// Rate per minute
226    pub limit: Option<u64>,
227    /// retry strategy for this client
228    pub retry: Option<Retry>,
229}
230
231impl NetConnectConfig for ClientBuilder {
232    fn set_libxmtp_version(&mut self, version: String) -> Result<(), Self::Error> {
233        self.libxmtp_version = Some(MetadataValue::try_from(&version)?);
234        Ok(())
235    }
236
237    fn set_app_version(&mut self, version: AppVersion) -> Result<(), Self::Error> {
238        self.app_version = Some(MetadataValue::try_from(&version)?);
239        Ok(())
240    }
241
242    fn set_host(&mut self, host: String) {
243        self.host = Some(host);
244    }
245
246    fn set_tls(&mut self, tls: bool) {
247        self.tls_channel = tls;
248    }
249
250    fn rate_per_minute(&mut self, limit: u32) {
251        self.limit = Some(limit.into());
252    }
253
254    fn port(&self) -> Result<Option<String>, Self::Error> {
255        if let Some(h) = &self.host {
256            let u = url::Url::parse(h)?;
257            Ok(u.port().map(|u| u.to_string()))
258        } else {
259            Err(GrpcBuilderError::MissingHostUrl)
260        }
261    }
262
263    fn host(&self) -> Option<&str> {
264        self.host.as_deref()
265    }
266
267    fn set_retry(&mut self, retry: xmtp_common::Retry) {
268        self.retry = Some(retry);
269    }
270}
271
272impl ApiBuilder for ClientBuilder {
273    type Output = crate::GrpcClient;
274    type Error = GrpcBuilderError;
275
276    fn build(self) -> Result<Self::Output, Self::Error> {
277        let host = self.host.ok_or(GrpcBuilderError::MissingHostUrl)?;
278        let channel = crate::GrpcService::new(host, self.limit, self.tls_channel)?;
279        Ok(GrpcClient {
280            inner: tonic::client::Grpc::new(channel)
281                .max_decoding_message_size(GRPC_PAYLOAD_LIMIT)
282                .max_encoding_message_size(GRPC_PAYLOAD_LIMIT),
283            app_version: self
284                .app_version
285                .unwrap_or(MetadataValue::try_from("0.0.0")?),
286            libxmtp_version: self.libxmtp_version.unwrap_or(MetadataValue::try_from(
287                env!("CARGO_PKG_VERSION").to_string(),
288            )?),
289        })
290    }
291}
292
293impl GrpcClient {
294    pub fn create(host: &str, is_secure: bool) -> Result<Self, GrpcBuilderError> {
295        let mut builder = Self::builder();
296        builder.set_host(host.to_string());
297        builder.set_tls(is_secure);
298        builder.build()
299    }
300
301    /// Create a grpc client with `app_version` attached
302    pub fn create_with_version(
303        host: &str,
304        is_secure: bool,
305        app_version: AppVersion,
306    ) -> Result<Self, GrpcBuilderError> {
307        let mut builder = Self::builder();
308        builder.set_host(host.to_string());
309        builder.set_tls(is_secure);
310        builder.set_app_version(app_version)?;
311        builder.build()
312    }
313}
314
315#[cfg(test)]
316pub mod tests {
317    use crate::grpc_client::test::DevNodeGoClient;
318    use prost::Message;
319    use xmtp_proto::api_client::ApiBuilder;
320    use xmtp_proto::prelude::{NetConnectConfig, XmtpTestClient};
321    use xmtp_proto::types::AppVersion;
322    use xmtp_proto::xmtp::message_api::v1::PublishRequest;
323
324    #[xmtp_common::test]
325    async fn metadata_test() {
326        let mut client = DevNodeGoClient::create();
327        let app_version = AppVersion::from("test/1.0.0");
328        let libxmtp_version = "0.0.1".to_string();
329        client.set_app_version(app_version.clone()).unwrap();
330        client.set_libxmtp_version(libxmtp_version.clone()).unwrap();
331        let client = client.build().unwrap();
332        let request = client
333            .build_tonic_request(
334                Default::default(),
335                PublishRequest { envelopes: vec![] }.encode_to_vec().into(),
336            )
337            .unwrap();
338
339        assert_eq!(
340            request
341                .metadata()
342                .get("x-app-version")
343                .unwrap()
344                .to_str()
345                .unwrap()
346                .to_string(),
347            app_version
348        );
349        assert_eq!(
350            request
351                .metadata()
352                .get("x-libxmtp-version")
353                .unwrap()
354                .to_str()
355                .unwrap()
356                .to_string(),
357            libxmtp_version
358        );
359    }
360}