xmtp_api_d14n/queries/stream/
flattened.rs1use futures::{Stream, TryStream};
4use pin_project_lite::pin_project;
5use std::task::Poll;
6use xmtp_proto::api_client::Paged;
7
8use crate::protocol::EnvelopeError;
9
10pin_project! {
11 pub struct FlattenedStream<S> {
12 #[pin] inner: S,
13 }
14}
15
16pub fn flattened<S>(s: S) -> FlattenedStream<S> {
38 FlattenedStream::<S> { inner: s }
39}
40
41impl<S> Stream for FlattenedStream<S>
42where
43 S: TryStream,
44 S::Ok: Paged,
45 S::Error: From<EnvelopeError>,
46{
47 type Item = Result<Vec<<S::Ok as Paged>::Message>, S::Error>;
48
49 fn poll_next(
50 mut self: std::pin::Pin<&mut Self>,
51 cx: &mut std::task::Context<'_>,
52 ) -> Poll<Option<Self::Item>> {
53 let this = self.as_mut().project();
54 this.inner.try_poll_next(cx).map_ok(Paged::messages)
55 }
56}
57
58#[cfg(test)]
59mod tests {
60 use super::*;
61 use futures::StreamExt;
62 use futures::stream;
63 use xmtp_proto::xmtp::mls::api::v1::PagingInfo;
64
65 #[derive(Debug, Clone)]
66 struct TestPagedResponse {
67 items: Vec<String>,
68 }
69
70 impl Paged for TestPagedResponse {
71 type Message = String;
72
73 fn info(&self) -> &Option<PagingInfo> {
74 &None
75 }
76
77 fn messages(self) -> Vec<Self::Message> {
78 self.items
79 }
80 }
81
82 #[xmtp_common::test]
83 async fn test_flattened_stream() {
84 let test_data: Vec<Result<TestPagedResponse, EnvelopeError>> = vec![
86 Ok(TestPagedResponse {
87 items: vec!["message1".to_string(), "message2".to_string()],
88 }),
89 Ok(TestPagedResponse {
90 items: vec!["message3".to_string()],
91 }),
92 Ok(TestPagedResponse {
93 items: vec![
94 "message4".to_string(),
95 "message5".to_string(),
96 "message6".to_string(),
97 ],
98 }),
99 ];
100
101 let source_stream = stream::iter(test_data);
102 let mut flattened = flattened(source_stream);
103
104 let result1 = flattened.next().await.unwrap().unwrap();
106 assert_eq!(
107 result1,
108 vec!["message1".to_string(), "message2".to_string()]
109 );
110
111 let result2 = flattened.next().await.unwrap().unwrap();
113 assert_eq!(result2, vec!["message3".to_string()]);
114
115 let result3 = flattened.next().await.unwrap().unwrap();
117 assert_eq!(
118 result3,
119 vec![
120 "message4".to_string(),
121 "message5".to_string(),
122 "message6".to_string()
123 ]
124 );
125
126 assert!(flattened.next().await.is_none());
128 }
129}