xmtp_proto/traits/
query.rs

1//! Implementation of the Query trait for all Endpoints
2
3use bytes::Bytes;
4
5use super::{Client, Endpoint, Query, QueryStream};
6use crate::{
7    ApiEndpoint,
8    api::{QueryRaw, XmtpStream},
9    prelude::ApiClientError,
10};
11
12pub(super) async fn request<C: Client>(
13    client: &C,
14    endpoint: &mut impl Endpoint,
15) -> Result<http::Response<Bytes>, ApiClientError<C::Error>> {
16    let request = http::Request::builder();
17    let endpoint_url = endpoint.grpc_endpoint();
18    let path = http::uri::PathAndQuery::try_from(endpoint_url.as_ref())?;
19    client
20        .request(request, path, endpoint.body()?)
21        .await
22        .map_err(|e| e.endpoint(endpoint_url.into_owned()))
23}
24
25// blanket Query implementation for a bare Endpoint
26#[xmtp_common::async_trait]
27impl<Q, C> Query<C> for Q
28where
29    Q: Endpoint,
30    C: Client,
31    <Q as Endpoint>::Output: Default + prost::Message + 'static,
32{
33    type Output = <Q as Endpoint>::Output;
34    async fn query(&mut self, client: &C) -> Result<Self::Output, ApiClientError<C::Error>> {
35        let rsp = request(client, self).await?;
36        let value = prost::Message::decode(rsp.into_body())?;
37        Ok(value)
38    }
39}
40
41// blanket QueryRaw implementation for a bare Endpoint
42#[xmtp_common::async_trait]
43impl<E, C> QueryRaw<C> for E
44where
45    E: Endpoint,
46    C: Client,
47{
48    async fn query_raw(&mut self, client: &C) -> Result<bytes::Bytes, ApiClientError<C::Error>> {
49        let rsp = request(client, self).await?;
50        Ok(rsp.into_body())
51    }
52}
53
54// blanket Query implementation for a bare Endpoint
55#[xmtp_common::async_trait]
56impl<E, T, C> QueryStream<T, C> for E
57where
58    E: Endpoint,
59    C: Client,
60    T: Default + prost::Message + 'static,
61{
62    async fn stream(
63        &mut self,
64        client: &C,
65    ) -> Result<XmtpStream<<C as Client>::Stream, T>, ApiClientError<C::Error>> {
66        let request = http::Request::builder();
67        let endpoint = self.grpc_endpoint();
68        let path = http::uri::PathAndQuery::try_from(endpoint.as_ref())?;
69        let rsp = client
70            .stream(request, path, self.body()?)
71            .await
72            .map_err(|e| e.endpoint(endpoint.as_ref().to_owned()))?;
73        let stream = rsp.into_body();
74        let stream = XmtpStream::new(stream, ApiEndpoint::Path(endpoint.into_owned()));
75        Ok(stream)
76    }
77
78    fn fake_stream(&mut self, client: &C) -> XmtpStream<<C as Client>::Stream, T> {
79        let endpoint = self.grpc_endpoint();
80        let rsp = client.fake_stream();
81        let stream = rsp.into_body();
82        XmtpStream::new(stream, ApiEndpoint::Path(endpoint.into_owned()))
83    }
84}