1use crate::{
4 api::{RetryQuery, V3Paged, XmtpStream, combinators::Ignore},
5 api_client::{AggregateStats, ApiStats, IdentityStats},
6};
7use http::{request, uri::PathAndQuery};
8use prost::bytes::Bytes;
9use std::{borrow::Cow, sync::Arc};
10use xmtp_common::{MaybeSend, MaybeSync, Retry};
11
12xmtp_common::if_test! {
13 pub mod mock;
14}
15
16mod boxed_client;
17pub(super) mod combinators;
18mod error;
19mod query;
20pub mod stream;
21mod vector_clock;
22pub use boxed_client::*;
23pub use error::*;
24pub use vector_clock::*;
25
/// Read access to the statistics tracked by an implementor.
///
/// Each method takes `&self` and returns one of the stats types imported
/// from `crate::api_client`.
pub trait HasStats {
    /// Returns the [`AggregateStats`] for this value.
    ///
    /// NOTE(review): presumably a combined view of the MLS and identity
    /// stats below — confirm against `AggregateStats`.
    fn aggregate_stats(&self) -> AggregateStats;
    /// Returns the [`ApiStats`] recorded for the MLS API.
    fn mls_stats(&self) -> ApiStats;
    /// Returns the [`IdentityStats`] recorded for the identity API.
    fn identity_stats(&self) -> IdentityStats;
}
31
/// A single API endpoint: it names the gRPC path to call and produces the
/// bytes of the request body.
///
/// The `Specialized` parameter defaults to `()`.
/// NOTE(review): it looks like a marker that lets one type implement
/// `Endpoint` more than once — confirm against the implementors.
pub trait Endpoint<Specialized = ()>: MaybeSend + MaybeSync {
    /// The type associated with this endpoint's result.
    ///
    /// NOTE(review): presumably the decoded response message — confirm.
    type Output: MaybeSend + MaybeSync;
    /// Returns the gRPC endpoint for this request, either as a static
    /// string or as an owned one.
    fn grpc_endpoint(&self) -> Cow<'static, str>;

    /// Builds the request body as [`Bytes`].
    ///
    /// # Errors
    /// Returns a [`BodyError`] when the body cannot be produced.
    fn body(&self) -> Result<Bytes, BodyError>;
}
40
/// Combinator methods layered on top of [`Endpoint`].
///
/// Every method consumes the endpoint and passes it to the function of the
/// matching name in the `combinators` module, returning the wrapper type that
/// function produces.
pub trait EndpointExt<S>: Endpoint<S> {
    /// Wraps this endpoint in [`Ignore`] via `combinators::ignore`.
    ///
    /// NOTE(review): presumably the wrapper discards the response payload —
    /// confirm in `combinators`.
    fn ignore_response(self) -> Ignore<Self>
    where
        Self: Sized + Endpoint<S>,
    {
        combinators::ignore(self)
    }

    /// Wraps this endpoint in [`V3Paged`] via `combinators::v3_paged`,
    /// forwarding the optional `cursor` unchanged.
    ///
    /// NOTE(review): `cursor` is presumably the position paging starts
    /// from — confirm in `combinators`.
    fn v3_paged(self, cursor: Option<u64>) -> V3Paged<Self, <Self as Endpoint<S>>::Output>
    where
        Self: Sized + Endpoint<S>,
    {
        combinators::v3_paged(self, cursor)
    }

    /// Wraps this endpoint in [`RetryQuery`] via `combinators::retry`.
    ///
    /// NOTE(review): presumably uses a default retry strategy — confirm in
    /// `combinators`.
    fn retry(self) -> RetryQuery<Self>
    where
        Self: Sized + Endpoint<S>,
    {
        combinators::retry(self)
    }

    /// Wraps this endpoint in [`RetryQuery`] via
    /// `combinators::retry_with_strategy`, using the caller-supplied
    /// `strategy`.
    fn retry_with_strategy<St>(self, strategy: Retry<St>) -> RetryQuery<Self, St>
    where
        Self: Sized + Endpoint<S>,
    {
        combinators::retry_with_strategy(self, strategy)
    }
}
70
71impl<S, E> EndpointExt<S> for E where E: Endpoint<S> {}
72
/// Implemented by values that carry a numeric cursor which can be updated in
/// place.
pub trait Pageable {
    /// Sets the cursor on this value to `cursor`.
    ///
    /// NOTE(review): presumably the position the next page is requested
    /// from — confirm against the implementors.
    fn set_cursor(&mut self, cursor: u64);
}
82
/// A transport that takes a request builder, a path and a body, and returns
/// either a complete response or a stream of response chunks.
///
/// This file also forwards the trait through `&T`, `Box<T>` and `Arc<T>`.
#[xmtp_common::async_trait]
pub trait Client: MaybeSend + MaybeSync {
    /// The transport-level error type; it is wrapped in [`ApiClientError`]
    /// by the fallible methods below.
    type Error: std::error::Error + MaybeSend + MaybeSync + 'static;

    /// The stream type returned by [`Client::stream`]; it yields `Bytes`
    /// chunks or a transport error.
    type Stream: futures::Stream<Item = Result<Bytes, Self::Error>> + MaybeSend;

    /// Sends a request built from `request`, `path` and `body`, and returns
    /// the response with its body as [`Bytes`].
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping `Self::Error` on failure.
    async fn request(
        &self,
        request: request::Builder,
        path: PathAndQuery,
        body: Bytes,
    ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>>;

    /// Sends a request built from `request`, `path` and `body`, and returns
    /// a response whose body is a `Self::Stream`.
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping `Self::Error` on failure.
    async fn stream(
        &self,
        request: request::Builder,
        path: http::uri::PathAndQuery,
        body: Bytes,
    ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>>;

    /// Produces a `Self::Stream` response synchronously, without any request
    /// input.
    ///
    /// NOTE(review): presumably a placeholder stream that performs no
    /// network I/O — confirm against the implementors.
    fn fake_stream(&self) -> http::Response<Self::Stream>;
}
112
113#[xmtp_common::async_trait]
114impl<T: MaybeSend + MaybeSync + ?Sized> Client for &T
115where
116 T: Client,
117{
118 type Error = T::Error;
119
120 type Stream = T::Stream;
121
122 async fn request(
123 &self,
124 request: request::Builder,
125 path: PathAndQuery,
126 body: Bytes,
127 ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
128 (**self).request(request, path, body).await
129 }
130
131 async fn stream(
132 &self,
133 request: request::Builder,
134 path: http::uri::PathAndQuery,
135 body: Bytes,
136 ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
137 (**self).stream(request, path, body).await
138 }
139
140 fn fake_stream(&self) -> http::Response<Self::Stream> {
141 (**self).fake_stream()
142 }
143}
144
145#[xmtp_common::async_trait]
146impl<T: MaybeSend + MaybeSync + ?Sized> Client for Box<T>
147where
148 T: Client,
149{
150 type Error = T::Error;
151
152 type Stream = T::Stream;
153
154 async fn request(
155 &self,
156 request: request::Builder,
157 path: PathAndQuery,
158 body: Bytes,
159 ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
160 (**self).request(request, path, body).await
161 }
162
163 async fn stream(
164 &self,
165 request: request::Builder,
166 path: http::uri::PathAndQuery,
167 body: Bytes,
168 ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
169 (**self).stream(request, path, body).await
170 }
171
172 fn fake_stream(&self) -> http::Response<Self::Stream> {
173 (**self).fake_stream()
174 }
175}
176
177#[xmtp_common::async_trait]
178impl<T: MaybeSend + MaybeSync + ?Sized> Client for Arc<T>
179where
180 T: Client,
181{
182 type Error = T::Error;
183
184 type Stream = T::Stream;
185
186 async fn request(
187 &self,
188 request: request::Builder,
189 path: PathAndQuery,
190 body: Bytes,
191 ) -> Result<http::Response<Bytes>, ApiClientError<Self::Error>> {
192 (**self).request(request, path, body).await
193 }
194
195 async fn stream(
196 &self,
197 request: request::Builder,
198 path: PathAndQuery,
199 body: Bytes,
200 ) -> Result<http::Response<Self::Stream>, ApiClientError<Self::Error>> {
201 (**self).stream(request, path, body).await
202 }
203
204 fn fake_stream(&self) -> http::Response<Self::Stream> {
206 (**self).fake_stream()
207 }
208}
209
/// An asynchronous connectivity probe.
#[xmtp_common::async_trait]
pub trait IsConnectedCheck: MaybeSend + MaybeSync {
    /// Reports whether the implementor currently considers itself connected.
    ///
    /// NOTE(review): what "connected" means is defined by each implementor
    /// and is not visible from this trait — confirm against the implementors.
    async fn is_connected(&self) -> bool;
}
215
/// Something that can be executed against a [`Client`] of type `C` to
/// produce a typed result.
#[xmtp_common::async_trait]
pub trait Query<C: Client>: MaybeSend + MaybeSync {
    /// The value produced by a successful query.
    type Output: MaybeSend + MaybeSync;
    /// Runs the query using `client`.
    ///
    /// Takes `&mut self`, so implementors may update their own state while
    /// running.
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping the client's error type.
    async fn query(&mut self, client: &C) -> Result<Self::Output, ApiClientError<C::Error>>;
}
223
/// Like a query, but the result is returned as raw bytes rather than a typed
/// output.
#[xmtp_common::async_trait]
pub trait QueryRaw<C: Client>: MaybeSend + MaybeSync {
    /// Runs the query using `client` and returns the result as
    /// `bytes::Bytes`.
    ///
    /// NOTE(review): presumably the undecoded response body — confirm
    /// against the implementors.
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping the client's error type.
    async fn query_raw(&mut self, client: &C) -> Result<bytes::Bytes, ApiClientError<C::Error>>;
}
228
/// Something that can open a stream of `T` items against a [`Client`] of
/// type `C`.
#[xmtp_common::async_trait]
pub trait QueryStream<T, C>
where
    C: Client,
{
    /// Opens a stream using `client` and returns it as an [`XmtpStream`]
    /// over the client's stream type.
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping the client's error type.
    async fn stream(
        &mut self,
        client: &C,
    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>;

    /// Synchronously builds an [`XmtpStream`] of the same type as
    /// [`QueryStream::stream`], without a fallible result.
    ///
    /// NOTE(review): presumably backed by `Client::fake_stream` — confirm
    /// against the implementors.
    fn fake_stream(&mut self, client: &C) -> XmtpStream<<C as Client>::Stream, T>;
}
248
/// Extension trait that adds `subscribe` for streaming `T` items from a
/// [`Client`] of type `C`.
#[xmtp_common::async_trait]
pub trait QueryStreamExt<T, C: Client> {
    /// Opens a subscription using `client` and returns it as an
    /// [`XmtpStream`] of `T`.
    ///
    /// Only callable when `T` is a `'static` protobuf message that
    /// implements `Default`.
    ///
    /// # Errors
    /// Returns an [`ApiClientError`] wrapping the client's error type.
    async fn subscribe(
        &mut self,
        client: &C,
    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>
    where
        T: Default + prost::Message + 'static;
}
259
/// Blanket implementation of `QueryStreamExt` for every [`Endpoint`] whose
/// `Output` is `T`.
#[xmtp_common::async_trait]
impl<T, C, E> QueryStreamExt<T, C> for E
where
    C: Client,
    E: Endpoint<Output = T>,
{
    /// Forwards to `self.stream(client)` and returns its result unchanged.
    ///
    /// NOTE(review): `stream` is not declared on `Endpoint`; it presumably
    /// resolves to `QueryStream::stream` through an impl outside this
    /// view — confirm.
    async fn subscribe(
        &mut self,
        client: &C,
    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>>
    where
        T: Default + prost::Message + 'static,
    {
        self.stream(client).await
    }
}
276
#[cfg(test)]
mod test {
    use crate::api::{
        EndpointExt, Query,
        mock::{MockNetworkClient, TestEndpoint},
    };

    /// The `EndpointExt` combinators compose in either order and the result
    /// can still be queried. The query futures are dropped without being
    /// awaited, so this is effectively a type-level check.
    #[xmtp_common::test]
    async fn endpoints_can_be_chained() {
        let client = MockNetworkClient::new();

        // ignore, then retry
        let mut ignore_then_retry = TestEndpoint.ignore_response().retry();
        drop(ignore_then_retry.query(&client));

        // retry, then ignore
        let mut retry_then_ignore = TestEndpoint.retry().ignore_response();
        drop(retry_then_ignore.query(&client));
    }
}