xmtp_api_grpc/grpc_client/native.rs
1use crate::error::GrpcBuilderError;
2use http::Request;
3use std::time::Duration;
4use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
5use tonic::{body::Body, client::GrpcService};
6use tower::Service;
7
8use std::task::{Context, Poll};
9
10#[derive(Clone, Debug)]
11pub struct NativeGrpcService {
12 inner: Channel,
13}
14
15impl NativeGrpcService {
16 pub fn new(
17 host: String,
18 limit: Option<u64>,
19 is_secure: bool,
20 ) -> Result<Self, GrpcBuilderError> {
21 let channel = match is_secure {
22 true => create_tls_channel(host, limit.unwrap_or(5000))?,
23 false => apply_channel_options(Channel::from_shared(host)?, limit.unwrap_or(5000))
24 .connect_lazy(),
25 };
26
27 Ok(Self { inner: channel })
28 }
29}
30
31impl Service<Request<Body>> for NativeGrpcService {
32 type Response = <Channel as Service<Request<Body>>>::Response;
33 type Error = <Channel as GrpcService<Body>>::Error;
34 type Future = <Channel as GrpcService<Body>>::Future;
35
36 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37 <Channel as Service<Request<Body>>>::poll_ready(&mut self.inner, ctx)
38 }
39
40 fn call(&mut self, request: Request<Body>) -> Self::Future {
41 <Channel as Service<Request<Body>>>::call(&mut self.inner, request)
42 }
43}
44
45pub(crate) fn apply_channel_options(endpoint: Endpoint, limit: u64) -> Endpoint {
46 endpoint
47 // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC.
48 // Functionality: Flow control in HTTP/2 manages how much data can be in flight on the network. Setting the initial connection window size to (1 << 31) - 1 (the maximum possible value for a 32-bit integer, which is 2,147,483,647 bytes) essentially allows the client to receive a very large amount of data from the server before needing to acknowledge receipt and permit more data to be sent. This can be particularly useful in high-latency networks or when transferring large amounts of data.
49 // Impact: Increasing the window size can improve throughput by allowing more data to be in transit at a time, but it may also increase memory usage and can potentially lead to inefficient use of bandwidth if the network is unreliable.
50 .initial_connection_window_size(Some((1 << 31) - 1))
51 // Purpose: Configures whether the client should send keep-alive pings to the server when the connection is idle.
52 // Functionality: When set to true, this option ensures that periodic pings are sent on an idle connection to keep it alive and detect if the server is still responsive.
53 // Impact: This helps maintain active connections, particularly through NATs, load balancers, and other middleboxes that might drop idle connections. It helps ensure that the connection is promptly usable when new requests need to be sent.
54 .keep_alive_while_idle(true)
55 // Purpose: Sets the maximum amount of time the client will wait for a connection to be established.
56 // Functionality: If a connection cannot be established within the specified duration, the attempt is aborted and an error is returned.
57 // Impact: This setting prevents the client from waiting indefinitely for a connection to be established, which is crucial in scenarios where rapid failure detection is necessary to maintain responsiveness or to quickly fallback to alternative services or retry logic.
58 .connect_timeout(Duration::from_secs(10))
59 // Purpose: Configures the TCP keep-alive interval for the socket connection.
60 // Functionality: This setting tells the operating system to send TCP keep-alive probes periodically when no data has been transferred over the connection within the specified interval.
61 // Impact: Similar to the gRPC-level keep-alive, this helps keep the connection alive at the TCP layer and detect broken connections. It's particularly useful for detecting half-open connections and ensuring that resources are not wasted on unresponsive peers.
62 .tcp_keepalive(Some(Duration::from_secs(16)))
63 // Purpose: Sets a maximum duration for the client to wait for a response to a request.
64 // Functionality: If a response is not received within the specified timeout, the request is canceled and an error is returned.
65 // Impact: This is critical for bounding the wait time for operations, which can enhance the predictability and reliability of client interactions by avoiding indefinitely hanging requests.
66 .timeout(Duration::from_secs(120))
67 // Purpose: Specifies how long the client will wait for a response to a keep-alive ping before considering the connection dead.
68 // Functionality: If a ping response is not received within this duration, the connection is presumed to be lost and is closed.
69 // Impact: This setting is crucial for quickly detecting unresponsive connections and freeing up resources associated with them. It ensures that the client has up-to-date information on the status of connections and can react accordingly.
70 .keep_alive_timeout(Duration::from_secs(10))
71 .http2_keep_alive_interval(Duration::from_secs(16))
72 .rate_limit(limit, Duration::from_secs(60))
73}
74
75#[tracing::instrument(level = "trace", skip_all)]
76pub fn create_tls_channel(address: String, limit: u64) -> Result<Channel, GrpcBuilderError> {
77 let channel = apply_channel_options(Channel::from_shared(address)?, limit)
78 .tls_config(ClientTlsConfig::new().with_enabled_roots())?
79 .connect_lazy();
80 Ok(channel)
81}