xmtp_api_d14n/protocol/extractors/
aggregate.rs

1//! Aggregate extractors extract data from collections of envelopes.
2
3use super::{EnvelopeCollection, EnvelopeError, Extractor, ProtocolEnvelope};
4use crate::protocol::traits::EnvelopeVisitor;
5use std::marker::PhantomData;
6
/// Runs a single extractor over every envelope in a collection.
///
/// The same extractor instance visits each envelope in turn, so only its
/// final, aggregated output is returned; per-envelope data is not kept apart.
/// This is useful when only the payload of a series of envelopes matters.
/// Use [`SequencedExtractor`] when one output per envelope is needed, e.g. to
/// preserve SequenceID/OriginatorID.
pub struct CollectionExtractor<Envelopes, E> {
    /// Collection of envelopes to visit.
    envelopes: Envelopes,
    /// Extractor that accumulates data across all envelopes.
    extractor: E,
}
17
18impl<Envelopes, Extractor> CollectionExtractor<Envelopes, Extractor> {
19    pub fn new(envelopes: Envelopes, extractor: Extractor) -> Self {
20        Self {
21            envelopes,
22            extractor,
23        }
24    }
25}
26
27impl<'a, Envelopes, E> Extractor for CollectionExtractor<Envelopes, E>
28where
29    Envelopes: EnvelopeCollection<'a> + IntoIterator,
30    <Envelopes as IntoIterator>::Item: ProtocolEnvelope<'a>,
31    E: Extractor + EnvelopeVisitor<'a>,
32    EnvelopeError: From<<E as EnvelopeVisitor<'a>>::Error>,
33{
34    type Output = Result<<E as Extractor>::Output, EnvelopeError>;
35
36    fn get(mut self) -> Self::Output {
37        for envelope in self.envelopes.into_iter() {
38            envelope.accept(&mut self.extractor)?;
39        }
40        Ok(self.extractor.get())
41    }
42}
43
/// Build a [`SequencedExtractor`]
pub struct SequencedExtractorBuilder<Envelope> {
    /// Envelopes handed over to the extractor when it is built.
    envelopes: Vec<Envelope>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add an
// `Envelope: Default` bound, although an empty `Vec` needs no such thing.
impl<Envelope> Default for SequencedExtractorBuilder<Envelope> {
    /// A builder holding no envelopes.
    fn default() -> Self {
        Self {
            envelopes: Vec::new(),
        }
    }
}
49
50impl<Envelope> SequencedExtractorBuilder<Envelope> {
51    pub fn envelopes<E>(self, envelopes: Vec<E>) -> SequencedExtractorBuilder<E> {
52        SequencedExtractorBuilder::<E> { envelopes }
53    }
54
55    pub fn build<Extractor>(self) -> SequencedExtractor<Envelope, Extractor> {
56        SequencedExtractor {
57            envelopes: self.envelopes,
58            _marker: PhantomData,
59        }
60    }
61}
62
/// Extracts data from a sequence of envelopes one envelope at a time,
/// so per-envelope data such as the Sequence ID is preserved.
// TODO: Could probably act on a generic of impl Iterator
// but could be a later improvement
pub struct SequencedExtractor<Envelope, E> {
    /// Envelopes to extract from, in order.
    envelopes: Vec<Envelope>,
    /// Records the extractor type `E` without storing an extractor value.
    _marker: PhantomData<E>,
}
71
72impl SequencedExtractor<(), ()> {
73    pub fn builder() -> SequencedExtractorBuilder<()> {
74        SequencedExtractorBuilder::default()
75    }
76}
77
78impl<'a, Envelope, E> Extractor for SequencedExtractor<Envelope, E>
79where
80    E: Extractor + EnvelopeVisitor<'a> + Default,
81    Envelope: ProtocolEnvelope<'a>,
82    EnvelopeError: From<<E as EnvelopeVisitor<'a>>::Error>,
83{
84    type Output = Result<Vec<<E as Extractor>::Output>, EnvelopeError>;
85
86    fn get(self) -> Self::Output {
87        let mut out = Vec::with_capacity(self.envelopes.len());
88        for envelope in self.envelopes.into_iter() {
89            let mut extractor = E::default();
90            envelope.accept(&mut extractor)?;
91            out.push(extractor.get());
92        }
93        Ok(out)
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::extractors::key_packages::KeyPackagesExtractor;
    use crate::protocol::extractors::test_utils::*;
    use crate::protocol::extractors::topics::TopicExtractor;
    use xmtp_proto::xmtp::xmtpv4::envelopes::OriginatorEnvelope;

    /// Mock key package bytes for tests (not a real, deserializable key package).
    fn create_test_key_package() -> Vec<u8> {
        xmtp_common::rand_vec::<32>()
    }

    #[xmtp_common::test]
    fn test_collection_extractor_single_envelope() {
        let expected = create_test_key_package();
        let envelopes = vec![
            TestEnvelopeBuilder::new()
                .with_key_package_custom(expected.clone())
                .build(),
        ];

        let extracted = CollectionExtractor::new(envelopes, KeyPackagesExtractor::new())
            .get()
            .unwrap();

        assert_eq!(extracted.len(), 1);
        assert_eq!(extracted[0].key_package_tls_serialized, expected);
    }

    #[xmtp_common::test]
    fn test_collection_extractor_multiple_envelopes() {
        let packages: Vec<Vec<u8>> = (0..3).map(|_| create_test_key_package()).collect();
        let envelopes: Vec<_> = packages
            .iter()
            .map(|package| {
                TestEnvelopeBuilder::new()
                    .with_key_package_custom(package.clone())
                    .build()
            })
            .collect();

        let extracted = CollectionExtractor::new(envelopes, KeyPackagesExtractor::new())
            .get()
            .unwrap();

        // All payloads end up in one aggregated output, in envelope order.
        assert_eq!(extracted.len(), 3);
        for (index, expected) in packages.iter().enumerate() {
            assert_eq!(extracted[index].key_package_tls_serialized, *expected);
        }
    }

    #[xmtp_common::test]
    fn test_collection_extractor_empty() {
        let envelopes = Vec::<OriginatorEnvelope>::new();

        let extracted = CollectionExtractor::new(envelopes, KeyPackagesExtractor::new())
            .get()
            .unwrap();

        assert_eq!(extracted.len(), 0);
    }

    #[xmtp_common::test]
    fn test_sequenced_extractor_single_envelope() {
        let expected = create_test_key_package();
        let envelopes = vec![
            TestEnvelopeBuilder::new()
                .with_key_package_custom(expected.clone())
                .build(),
        ];

        let extracted = SequencedExtractor::builder()
            .envelopes(envelopes)
            .build::<KeyPackagesExtractor>()
            .get()
            .unwrap();

        assert_eq!(extracted.len(), 1);
        assert_eq!(extracted[0].len(), 1);
        assert_eq!(extracted[0][0].key_package_tls_serialized, expected);
    }

    #[xmtp_common::test]
    fn test_sequenced_extractor_multiple_envelopes() {
        let packages: Vec<Vec<u8>> = (0..2).map(|_| create_test_key_package()).collect();
        let envelopes: Vec<_> = packages
            .iter()
            .map(|package| {
                TestEnvelopeBuilder::new()
                    .with_key_package_custom(package.clone())
                    .build()
            })
            .collect();

        let extracted = SequencedExtractor::builder()
            .envelopes(envelopes)
            .build::<KeyPackagesExtractor>()
            .get()
            .unwrap();

        // One output per envelope, each holding exactly that envelope's payload.
        assert_eq!(extracted.len(), 2);
        for (index, expected) in packages.iter().enumerate() {
            assert_eq!(extracted[index].len(), 1);
            assert_eq!(extracted[index][0].key_package_tls_serialized, *expected);
        }
    }

    #[xmtp_common::test]
    fn test_sequenced_extractor_with_topic_extractor() {
        let envelopes: Vec<_> = (0..2)
            .map(|_| {
                TestEnvelopeBuilder::new()
                    .with_key_package_custom(create_test_key_package())
                    .build()
            })
            .collect();

        // The SequencedExtractor will fail early when processing the first envelope
        // because mock key package data can't be deserialized
        let outcome = SequencedExtractor::builder()
            .envelopes(envelopes)
            .build::<TopicExtractor>()
            .get();

        assert!(outcome.is_err());
    }

    #[xmtp_common::test]
    fn test_sequenced_extractor_empty() {
        let envelopes = Vec::<OriginatorEnvelope>::new();

        let extracted = SequencedExtractor::builder()
            .envelopes(envelopes)
            .build::<KeyPackagesExtractor>()
            .get()
            .unwrap();

        assert_eq!(extracted.len(), 0);
    }
}