xmtp_api_grpc/streams/
escapable.rs

1use std::{
2    io::ErrorKind,
3    pin::Pin,
4    task::{Poll, ready},
5};
6
7use futures::{Stream, TryStream, stream::FusedStream};
8use pin_project_lite::pin_project;
9use std::error::Error;
10use tonic::Status;
11
12pin_project! {
13    /// Wraps a tonic stream which exits once it encounters
14    /// an unrecoverable HTTP Error.
15    /// This wrapper does not try to differentiate between
16    /// transient HTTP Errors unrecoverable HTTP errors.
17    /// Once an error is encountered, the stream will yield the item
18    /// with the error, and then end the stream.
19    /// the stream is ended by returning Poll::Ready(None).
20    ///
21    /// These errors are treated as unrecoverable:
22    ///   - ErrorKind::BrokenPipe
23    ///     - BrokenPipe results from the HTTP/2 KeepAlive interval being exceeded
24    pub struct EscapableTonicStream<S> {
25        #[pin] inner: S,
26        is_broken: bool
27    }
28}
29
30impl<S> EscapableTonicStream<S> {
31    pub fn new(inner: S) -> Self {
32        Self {
33            inner,
34            is_broken: false,
35        }
36    }
37}
38
39fn maybe_extract_io_err(err: &Status) -> Option<&std::io::Error> {
40    if let Some(source) = err.source()
41        //try to downcast to hyper error
42    && let Some(hyper_err) = source.downcast_ref::<hyper::Error>()
43    && let Some(hyper_source) = hyper_err.source()
44    && let Some(s) = hyper_source.downcast_ref::<h2::Error>()
45    {
46        return s.get_io();
47    }
48    None
49}
50
51impl<S> Stream for EscapableTonicStream<S>
52where
53    S: TryStream<Error = Status>,
54{
55    type Item = Result<S::Ok, Status>;
56
57    fn poll_next(
58        mut self: Pin<&mut Self>,
59        cx: &mut std::task::Context<'_>,
60    ) -> std::task::Poll<Option<Self::Item>> {
61        // if we are broken, do not attempt to poll
62        // the inner stream anymore
63        if self.is_broken {
64            return Poll::Ready(None);
65        }
66        let mut this = self.as_mut().project();
67        let item = ready!(this.inner.as_mut().try_poll_next(cx));
68        match item {
69            Some(Err(e)) => {
70                tracing::error!("error in tonic stream {}", e);
71                // if the error is broken pipe, abort stream
72                if let Some(io) = maybe_extract_io_err(&e)
73                    && io.kind() == ErrorKind::BrokenPipe
74                {
75                    *this.is_broken = true;
76                    // register the next item (end of stream) with the executor
77                    cx.waker().wake_by_ref();
78                }
79                Poll::Ready(Some(Err(e)))
80            }
81            i => Poll::Ready(i),
82        }
83    }
84}
85
86impl<S> FusedStream for EscapableTonicStream<S>
87where
88    S: TryStream<Error = Status>,
89    S::Error: Into<Status>,
90{
91    fn is_terminated(&self) -> bool {
92        self.is_broken
93    }
94}