xmtp_api_grpc/streams/
non_blocking_request.rs1use crate::streams::NonBlockingWebStream;
5use pin_project_lite::pin_project;
6use prost::bytes::Bytes;
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tonic::{Response, Status};
13
14pin_project! {
15 pub struct NonBlockingStreamRequest<F> {
16 #[pin] inner: F,
17 }
18}
19
20impl<F> NonBlockingStreamRequest<F> {
21 pub fn new(inner: F) -> Self {
22 Self { inner }
23 }
24}
25
26impl<F> Future for NonBlockingStreamRequest<F>
27where
28 F: Future,
29{
30 type Output = F::Output;
31
32 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33 let this = self.as_mut().project();
34 this.inner.poll(cx)
35 }
36}
37
#[cfg(target_arch = "wasm32")]
impl<F> NonBlockingStreamRequest<F>
where
    F: Future + Unpin,
{
    /// Polls the wrapped request exactly once without waiting for completion.
    ///
    /// # Panics
    ///
    /// Logs an error and panics via `unreachable!` if that single poll
    /// already produced the request's output.
    // NOTE(review): presumably the first poll is what kicks off the underlying
    // request on wasm and it is expected to always be pending — confirm.
    async fn establish(&mut self) {
        // `Self` is `Unpin` here because `F: Unpin`, so a plain `Pin::new` is enough.
        let pinned = Pin::new(self);
        let already_ready = futures::future::poll_immediate(pinned).await.is_some();
        if already_ready {
            tracing::error!("Stream ready before established");
            unreachable!("Stream ready before established")
        }
    }
}
54
55#[cfg(not(target_arch = "wasm32"))]
56pub async fn send<F>(
57 this: NonBlockingStreamRequest<F>,
58) -> Result<
59 Response<NonBlockingWebStream<NonBlockingStreamRequest<F>, tonic::Streaming<Bytes>>>,
60 Status,
61>
62where
63 F: Future<Output = Result<Response<tonic::Streaming<Bytes>>, Status>> + Send,
64{
65 let response = this.await?;
66 Ok(response.map(NonBlockingWebStream::started))
67}
68
/// Polls the request once via `establish`, then hands the still-pending
/// request to a [`NonBlockingWebStream`] wrapped in a fresh [`Response`]
/// (wasm32 targets).
///
/// This variant always returns `Ok`; it panics if `establish` finds the
/// request already complete.
#[cfg(target_arch = "wasm32")]
pub async fn send<F>(
    mut this: NonBlockingStreamRequest<F>,
) -> Result<
    Response<NonBlockingWebStream<NonBlockingStreamRequest<F>, tonic::Streaming<Bytes>>>,
    Status,
>
where
    F: Future<Output = Result<Response<tonic::Streaming<Bytes>>, Status>> + Unpin,
{
    this.establish().await;
    Ok(Response::new(NonBlockingWebStream::new(this)))
}
83
/// Conversion that consumes a value and yields the value it contains.
pub trait IntoInner {
    /// The type produced by [`IntoInner::into_inner`].
    type Out;

    /// Consumes `self` and returns the contained value.
    fn into_inner(self) -> Self::Out;
}
89
90impl IntoInner for Response<tonic::Streaming<Bytes>> {
91 type Out = tonic::Streaming<Bytes>;
92 fn into_inner(self) -> Self::Out {
93 Response::into_inner(self)
94 }
95}