xmtp_api_grpc/streams/
escapable.rs1use std::{
2 io::ErrorKind,
3 pin::Pin,
4 task::{Poll, ready},
5};
6
7use futures::{Stream, TryStream, stream::FusedStream};
8use pin_project_lite::pin_project;
9use std::error::Error;
10use tonic::Status;
11
12pin_project! {
13 pub struct EscapableTonicStream<S> {
25 #[pin] inner: S,
26 is_broken: bool
27 }
28}
29
30impl<S> EscapableTonicStream<S> {
31 pub fn new(inner: S) -> Self {
32 Self {
33 inner,
34 is_broken: false,
35 }
36 }
37}
38
39fn maybe_extract_io_err(err: &Status) -> Option<&std::io::Error> {
40 if let Some(source) = err.source()
41 && let Some(hyper_err) = source.downcast_ref::<hyper::Error>()
43 && let Some(hyper_source) = hyper_err.source()
44 && let Some(s) = hyper_source.downcast_ref::<h2::Error>()
45 {
46 return s.get_io();
47 }
48 None
49}
50
51impl<S> Stream for EscapableTonicStream<S>
52where
53 S: TryStream<Error = Status>,
54{
55 type Item = Result<S::Ok, Status>;
56
57 fn poll_next(
58 mut self: Pin<&mut Self>,
59 cx: &mut std::task::Context<'_>,
60 ) -> std::task::Poll<Option<Self::Item>> {
61 if self.is_broken {
64 return Poll::Ready(None);
65 }
66 let mut this = self.as_mut().project();
67 let item = ready!(this.inner.as_mut().try_poll_next(cx));
68 match item {
69 Some(Err(e)) => {
70 tracing::error!("error in tonic stream {}", e);
71 if let Some(io) = maybe_extract_io_err(&e)
73 && io.kind() == ErrorKind::BrokenPipe
74 {
75 *this.is_broken = true;
76 cx.waker().wake_by_ref();
78 }
79 Poll::Ready(Some(Err(e)))
80 }
81 i => Poll::Ready(i),
82 }
83 }
84}
85
86impl<S> FusedStream for EscapableTonicStream<S>
87where
88 S: TryStream<Error = Status>,
89 S::Error: Into<Status>,
90{
91 fn is_terminated(&self) -> bool {
92 self.is_broken
93 }
94}