xmtp_api_grpc/streams/
non_blocking_request.rs

1//! Non Blocking Request
2//! Fully awaits server_streaming request on native
3//! polls server_streaming request once on wasm (requiring Unpin)
4use crate::streams::NonBlockingWebStream;
5use pin_project_lite::pin_project;
6use prost::bytes::Bytes;
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tonic::{Response, Status};
13
14pin_project! {
15    pub struct NonBlockingStreamRequest<F> {
16        #[pin] inner: F,
17    }
18}
19
20impl<F> NonBlockingStreamRequest<F> {
21    pub fn new(inner: F) -> Self {
22        Self { inner }
23    }
24}
25
26impl<F> Future for NonBlockingStreamRequest<F>
27where
28    F: Future,
29{
30    type Output = F::Output;
31
32    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33        let this = self.as_mut().project();
34        this.inner.poll(cx)
35    }
36}
37
38#[cfg(target_arch = "wasm32")]
39impl<F> NonBlockingStreamRequest<F>
40where
41    F: Future + Unpin,
42{
43    async fn establish(&mut self) {
44        // we need to poll the future once to progress the future state &
45        // actually send the request in the first place.
46        // since the request has not even been sent yet, this should always be pending.
47        let mut this = Pin::new(self);
48        if futures::future::poll_immediate(&mut this).await.is_some() {
49            tracing::error!("Stream ready before established");
50            unreachable!("Stream ready before established")
51        }
52    }
53}
54
55#[cfg(not(target_arch = "wasm32"))]
56pub async fn send<F>(
57    this: NonBlockingStreamRequest<F>,
58) -> Result<
59    Response<NonBlockingWebStream<NonBlockingStreamRequest<F>, tonic::Streaming<Bytes>>>,
60    Status,
61>
62where
63    F: Future<Output = Result<Response<tonic::Streaming<Bytes>>, Status>> + Send,
64{
65    let response = this.await?;
66    Ok(response.map(NonBlockingWebStream::started))
67}
68
69#[cfg(target_arch = "wasm32")]
70pub async fn send<F>(
71    mut this: NonBlockingStreamRequest<F>,
72) -> Result<
73    Response<NonBlockingWebStream<NonBlockingStreamRequest<F>, tonic::Streaming<Bytes>>>,
74    Status,
75>
76where
77    F: Future<Output = Result<Response<tonic::Streaming<Bytes>>, Status>> + Unpin,
78{
79    this.establish().await;
80    let body = NonBlockingWebStream::new(this);
81    Ok(Response::new(body))
82}
83
84// we cant use From<> because of orphan rules
85pub trait IntoInner {
86    type Out;
87    fn into_inner(self) -> Self::Out;
88}
89
90impl IntoInner for Response<tonic::Streaming<Bytes>> {
91    type Out = tonic::Streaming<Bytes>;
92    fn into_inner(self) -> Self::Out {
93        Response::into_inner(self)
94    }
95}