xmtp_proto/traits/
query.rs1use bytes::Bytes;
4
5use super::{Client, Endpoint, Query, QueryStream};
6use crate::{
7 ApiEndpoint,
8 api::{QueryRaw, XmtpStream},
9 prelude::ApiClientError,
10};
11
12pub(super) async fn request<C: Client>(
13 client: &C,
14 endpoint: &mut impl Endpoint,
15) -> Result<http::Response<Bytes>, ApiClientError<C::Error>> {
16 let request = http::Request::builder();
17 let endpoint_url = endpoint.grpc_endpoint();
18 let path = http::uri::PathAndQuery::try_from(endpoint_url.as_ref())?;
19 client
20 .request(request, path, endpoint.body()?)
21 .await
22 .map_err(|e| e.endpoint(endpoint_url.into_owned()))
23}
24
25#[xmtp_common::async_trait]
27impl<Q, C> Query<C> for Q
28where
29 Q: Endpoint,
30 C: Client,
31 <Q as Endpoint>::Output: Default + prost::Message + 'static,
32{
33 type Output = <Q as Endpoint>::Output;
34 async fn query(&mut self, client: &C) -> Result<Self::Output, ApiClientError<C::Error>> {
35 let rsp = request(client, self).await?;
36 let value = prost::Message::decode(rsp.into_body())?;
37 Ok(value)
38 }
39}
40
41#[xmtp_common::async_trait]
43impl<E, C> QueryRaw<C> for E
44where
45 E: Endpoint,
46 C: Client,
47{
48 async fn query_raw(&mut self, client: &C) -> Result<bytes::Bytes, ApiClientError<C::Error>> {
49 let rsp = request(client, self).await?;
50 Ok(rsp.into_body())
51 }
52}
53
54#[xmtp_common::async_trait]
56impl<E, T, C> QueryStream<T, C> for E
57where
58 E: Endpoint,
59 C: Client,
60 T: Default + prost::Message + 'static,
61{
62 async fn stream(
63 &mut self,
64 client: &C,
65 ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>> {
66 let request = http::Request::builder();
67 let endpoint = self.grpc_endpoint();
68 let path = http::uri::PathAndQuery::try_from(endpoint.as_ref())?;
69 let rsp = client
70 .stream(request, path, self.body()?)
71 .await
72 .map_err(|e| e.endpoint(endpoint.as_ref().to_owned()))?;
73 let stream = rsp.into_body();
74 let stream = XmtpStream::new(stream, ApiEndpoint::Path(endpoint.into_owned()));
75 Ok(stream)
76 }
77
78 fn fake_stream(&mut self, client: &C) -> XmtpStream<<C as Client>::Stream, T> {
79 let endpoint = self.grpc_endpoint();
80 let rsp = client.fake_stream();
81 let stream = rsp.into_body();
82 XmtpStream::new(stream, ApiEndpoint::Path(endpoint.into_owned()))
83 }
84}