xmtp_proto/
traits.rs

1//! Api Client Traits
2
3use crate::{
4    api::{RetryQuery, V3Paged, XmtpStream, combinators::Ignore},
5    api_client::{AggregateStats, ApiStats, IdentityStats},
6};
7use http::{request, uri::PathAndQuery};
8use prost::bytes::Bytes;
9use std::{borrow::Cow, sync::Arc};
10use xmtp_common::{MaybeSend, MaybeSync, Retry};
11
12xmtp_common::if_test! {
13    pub mod mock;
14}
15
16mod boxed_client;
17pub(super) mod combinators;
18mod error;
19mod query;
20pub mod stream;
21mod vector_clock;
22pub use boxed_client::*;
23pub use error::*;
24pub use vector_clock::*;
25
26pub trait HasStats {
27    fn aggregate_stats(&self) -> AggregateStats;
28    fn mls_stats(&self) -> ApiStats;
29    fn identity_stats(&self) -> IdentityStats;
30}
31
32/// provides the necessary information for a backend API call.
33/// Indicates the Output type
34pub trait Endpoint<Specialized = ()>: MaybeSend + MaybeSync {
35    type Output: MaybeSend + MaybeSync;
36    fn grpc_endpoint(&self) -> Cow<'static, str>;
37
38    fn body(&self) -> Result<Bytes, BodyError>;
39}
40
41pub trait EndpointExt<S>: Endpoint<S> {
42    fn ignore_response(self) -> Ignore<Self>
43    where
44        Self: Sized + Endpoint<S>,
45    {
46        combinators::ignore(self)
47    }
48
49    fn v3_paged(self, cursor: Option<u64>) -> V3Paged<Self, <Self as Endpoint<S>>::Output>
50    where
51        Self: Sized + Endpoint<S>,
52    {
53        combinators::v3_paged(self, cursor)
54    }
55
56    fn retry(self) -> RetryQuery<Self>
57    where
58        Self: Sized + Endpoint<S>,
59    {
60        combinators::retry(self)
61    }
62
63    fn retry_with_strategy<St>(self, strategy: Retry<St>) -> RetryQuery<Self, St>
64    where
65        Self: Sized + Endpoint<S>,
66    {
67        combinators::retry_with_strategy(self, strategy)
68    }
69}
70
71impl<S, E> EndpointExt<S> for E where E: Endpoint<S> {}
72
73/// Trait indicating an [`Endpoint`] can be paged
74/// paging will return a limited number of results
75/// per request. a cursor is present indicating
76/// the position in the total list of results
77/// on the backend.
78pub trait Pageable {
79    /// set the cursor for this pageable endpoint
80    fn set_cursor(&mut self, cursor: u64);
81}
82
83/// A client represents how a request body is formed and sent into
84/// a backend. The client is protocol agnostic, a Client may
85/// communicate with a backend over gRPC, JSON-RPC, HTTP-REST, etc.
86/// `http::Response`'s are used in order to maintain a
87/// common data format compatible with a wide variety of backends.
88/// an http response is easily derived from a grpc, jsonrpc or rest api.
89#[xmtp_common::async_trait]
90pub trait Client: MaybeSend + MaybeSync {
91    type Error: std::error::Error + MaybeSend + MaybeSync + 'static;
92
93    type Stream: futures::Stream<Item = Result<Bytes, Self::Error>> + MaybeSend;
94
95    async fn request(
96        &self,
97        request: request::Builder,
98        path: PathAndQuery,
99        body: Bytes,
100    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>>;
101
102    async fn stream(
103        &self,
104        request: request::Builder,
105        path: http::uri::PathAndQuery,
106        body: Bytes,
107    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>>;
108
109    /// start a "fake" stream that does not create a TCP connection and will always be pending
110    fn fake_stream(&self) -> http::Response<Self::Stream>;
111}
112
113#[xmtp_common::async_trait]
114impl<T: MaybeSend + MaybeSync + ?Sized> Client for &T
115where
116    T: Client,
117{
118    type Error = T::Error;
119
120    type Stream = T::Stream;
121
122    async fn request(
123        &self,
124        request: request::Builder,
125        path: PathAndQuery,
126        body: Bytes,
127    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
128        (**self).request(request, path, body).await
129    }
130
131    async fn stream(
132        &self,
133        request: request::Builder,
134        path: http::uri::PathAndQuery,
135        body: Bytes,
136    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
137        (**self).stream(request, path, body).await
138    }
139
140    fn fake_stream(&self) -> http::Response<Self::Stream> {
141        (**self).fake_stream()
142    }
143}
144
145#[xmtp_common::async_trait]
146impl<T: MaybeSend + MaybeSync + ?Sized> Client for Box<T>
147where
148    T: Client,
149{
150    type Error = T::Error;
151
152    type Stream = T::Stream;
153
154    async fn request(
155        &self,
156        request: request::Builder,
157        path: PathAndQuery,
158        body: Bytes,
159    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
160        (**self).request(request, path, body).await
161    }
162
163    async fn stream(
164        &self,
165        request: request::Builder,
166        path: http::uri::PathAndQuery,
167        body: Bytes,
168    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
169        (**self).stream(request, path, body).await
170    }
171
172    fn fake_stream(&self) -> http::Response<Self::Stream> {
173        (**self).fake_stream()
174    }
175}
176
177#[xmtp_common::async_trait]
178impl<T: MaybeSend + MaybeSync + ?Sized> Client for Arc<T>
179where
180    T: Client,
181{
182    type Error = T::Error;
183
184    type Stream = T::Stream;
185
186    async fn request(
187        &self,
188        request: request::Builder,
189        path: PathAndQuery,
190        body: Bytes,
191    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
192        (**self).request(request, path, body).await
193    }
194
195    async fn stream(
196        &self,
197        request: request::Builder,
198        path: PathAndQuery,
199        body: Bytes,
200    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
201        (**self).stream(request, path, body).await
202    }
203
204    /// start a "fake" stream that does not create a TCP connection and will always be pending
205    fn fake_stream(&self) -> http::Response<Self::Stream> {
206        (**self).fake_stream()
207    }
208}
209
210#[xmtp_common::async_trait]
211pub trait IsConnectedCheck: MaybeSend + MaybeSync {
212    /// Check if a client is connected
213    async fn is_connected(&self) -> bool;
214}
215
216/// Queries describe the way an endpoint is called.
217/// these are extensions to the behavior of specific endpoints.
218#[xmtp_common::async_trait]
219pub trait Query<C: Client>: MaybeSend + MaybeSync {
220    type Output: MaybeSend + MaybeSync;
221    async fn query(&mut self, client: &C) -> Result<Self::Output, ApiClientError<C::Error>>;
222}
223
224#[xmtp_common::async_trait]
225pub trait QueryRaw<C: Client>: MaybeSend + MaybeSync {
226    async fn query_raw(&mut self, client: &C) -> Result<bytes::Bytes, ApiClientError<C::Error>>;
227}
228
229/// a companion to the [`Query`] trait, except for streaming calls.
230/// Not every query combinator/extension will apply to both
231/// steams and one-off calls (how do you 'page' a streaming api?),
232/// so these traits are separated.
233#[xmtp_common::async_trait]
234pub trait QueryStream<T, C>
235where
236    C: Client,
237{
238    /// stream items from an endpoint
239    /// [`QueryStreamExt::subscribe`] or [`crate::api::stream_as`] should be used to indicate
240    /// the type of item in the stream.
241    async fn stream(
242        &mut self,
243        client: &C,
244    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>;
245
246    fn fake_stream(&mut self, client: &C) -> XmtpStream<<C as Client>::Stream, T>;
247}
248
249#[xmtp_common::async_trait]
250pub trait QueryStreamExt<T, C: Client> {
251    /// Subscribe to the endpoint, indicating the type of stream item with `R`
252    async fn subscribe(
253        &mut self,
254        client: &C,
255    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>
256    where
257        T: Default + prost::Message + 'static;
258}
259
260#[xmtp_common::async_trait]
261impl<T, C, E> QueryStreamExt<T, C> for E
262where
263    C: Client,
264    E: Endpoint<Output = T>,
265{
266    async fn subscribe(
267        &mut self,
268        client: &C,
269    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>
270    where
271        T: Default + prost::Message + 'static,
272    {
273        self.stream(client).await
274    }
275}
276
277#[cfg(test)]
278mod test {
279    use crate::api::{
280        EndpointExt, Query,
281        mock::{MockNetworkClient, TestEndpoint},
282    };
283
284    // test ensures these combinations can compile
285    #[xmtp_common::test]
286    async fn endpoints_can_be_chained() {
287        let client = MockNetworkClient::new();
288        std::mem::drop(TestEndpoint.ignore_response().retry().query(&client));
289        std::mem::drop(TestEndpoint.retry().ignore_response().query(&client));
290    }
291}