xmtp_api_d14n/queries/stream/
flattened.rs

//! Convenience type which adapts a stream whose items implement `Paged` so that it yields the inner paged items (`Vec<T>`) instead.
2
3use futures::{Stream, TryStream};
4use pin_project_lite::pin_project;
5use std::task::Poll;
6use xmtp_proto::api_client::Paged;
7
8use crate::protocol::EnvelopeError;
9
pin_project! {
    /// Stream adapter produced by [`flattened`].
    ///
    /// Wraps an inner stream `S`; the `Stream` implementation for this type polls `S` and
    /// replaces every successful `Paged` item with the messages it contains.
    pub struct FlattenedStream<S> {
        // Structurally pinned so the wrapped stream can be polled through `Pin<&mut Self>`.
        #[pin] inner: S,
    }
}
15
16/// Wrap a `TryStream<T>` whos items, TryStream::<T>::Ok, implement
17/// [`Paged`](xmtp_proto::api_client::Paged).
18/// functionally, this means a struct wrapping a Vec<T> will return the inner Vec<T> instead of the
19/// struct
20///
21/// in other words, a Stream of the form `Stream<Item = Foo>`
22/// will instead be `Stream<Item = Vec<Boo>>`
23/// ```ignore
24///     struct Foo {
25///         items: Vec<Boo>
26///     }
27///
28///     impl Paged for Foo {
29///         type Message = Boo;
30///         fn messages(self) -> Vec<Boo> {
31///             self.items
32///         }
33///         /* .. */
34///     }
35/// ```
36///
37pub fn flattened<S>(s: S) -> FlattenedStream<S> {
38    FlattenedStream::<S> { inner: s }
39}
40
41impl<S> Stream for FlattenedStream<S>
42where
43    S: TryStream,
44    S::Ok: Paged,
45    S::Error: From<EnvelopeError>,
46{
47    type Item = Result<Vec<<S::Ok as Paged>::Message>, S::Error>;
48
49    fn poll_next(
50        mut self: std::pin::Pin<&mut Self>,
51        cx: &mut std::task::Context<'_>,
52    ) -> Poll<Option<Self::Item>> {
53        let this = self.as_mut().project();
54        this.inner.try_poll_next(cx).map_ok(Paged::messages)
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use futures::stream;
    use xmtp_proto::xmtp::mls::api::v1::PagingInfo;

    /// Minimal `Paged` implementation whose messages are plain strings.
    #[derive(Debug, Clone)]
    struct TestPagedResponse {
        items: Vec<String>,
    }

    impl Paged for TestPagedResponse {
        type Message = String;

        fn info(&self) -> &Option<PagingInfo> {
            &None
        }

        fn messages(self) -> Vec<Self::Message> {
            self.items
        }
    }

    /// Turns string literals into the owned strings the stream is expected to yield.
    fn owned(batch: &[&str]) -> Vec<String> {
        batch.iter().map(|message| message.to_string()).collect()
    }

    /// Builds one page containing the given messages.
    fn page(batch: &[&str]) -> TestPagedResponse {
        TestPagedResponse {
            items: owned(batch),
        }
    }

    #[xmtp_common::test]
    async fn test_flattened_stream() {
        // The pages fed into the stream, in order; each must come back out as one batch.
        let batches: [&[&str]; 3] = [
            &["message1", "message2"],
            &["message3"],
            &["message4", "message5", "message6"],
        ];

        let pages: Vec<Result<TestPagedResponse, EnvelopeError>> =
            batches.iter().map(|batch| Ok(page(batch))).collect();
        let mut messages = flattened(stream::iter(pages));

        // Every page is unwrapped into exactly its own messages, preserving order.
        for expected in batches {
            let batch = messages.next().await.unwrap().unwrap();
            assert_eq!(batch, owned(expected));
        }

        // Stream should be exhausted
        assert!(messages.next().await.is_none());
    }
}