xmtp_api_d14n/queries/stream/
ordered.rs

//! Orders a stream using [`Ordered`](crate::protocol::Ordered) according to XMTP XIP-49
2
3use crate::protocol::{
4    CursorStore, Envelope, EnvelopeError, Ordered, OrderedEnvelopeCollection, ResolutionError,
5    TypedNoopResolver,
6};
7use futures::{Stream, TryStream};
8use pin_project_lite::pin_project;
9use std::{
10    error::Error,
11    marker::PhantomData,
12    task::{Poll, ready},
13};
14use xmtp_common::RetryableError;
15use xmtp_proto::{api::ApiClientError, types::TopicCursor};
16
17pin_project! {
18    pub struct OrderedStream<S, Store, T> {
19        #[pin] inner: S,
20        cursor_store: Store,
21        topic_cursor: TopicCursor,
22        _marker: PhantomData<T>
23    }
24}
25
26// this is an error which should never occur,
27// and if it does is a bug in libxmtp
28#[derive(thiserror::Error, Debug)]
29pub enum OrderedStreamError {
30    #[error(transparent)]
31    Resolver(#[from] ResolutionError),
32}
33
34impl<E: Error> From<OrderedStreamError> for ApiClientError<E> {
35    fn from(value: OrderedStreamError) -> Self {
36        ApiClientError::Other(Box::new(value) as _)
37    }
38}
39
40impl RetryableError for OrderedStreamError {
41    fn is_retryable(&self) -> bool {
42        false
43    }
44}
45
46impl From<OrderedStreamError> for EnvelopeError {
47    fn from(value: OrderedStreamError) -> Self {
48        EnvelopeError::DynError(Box::new(value) as _)
49    }
50}
51
52/// Wrap a `TryStream<T>` who's items are [Envelope's](crate::protocol::Envelope) such that
53/// it orders the envelopes according to [XIP-49 Cross-Originator Message Ordering](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering).
54/// If an envelope cannot yet be processed due to missing required dependencies, the streamed
55/// message will be put into a persistent "icebox" until the required dependency is streamed.
56/// This stream implementation will _not_ attempt to do any further dependency resolution
57/// with [`ResolveDependencies`](crate::protocol::ResolveDependencies). there is an implicit
58/// assumption that if an item in the stream is required for processing,
59/// it will at some point be made available in the stream.
60/// This stream instead uses the [`NoopResolver`](crate::protocol::NoopResolver)
61pub fn ordered<S, Store, T>(
62    s: S,
63    cursor_store: Store,
64    initial_topic_cursor: TopicCursor,
65) -> OrderedStream<S, Store, T> {
66    OrderedStream::<S, Store, T> {
67        inner: s,
68        cursor_store,
69        topic_cursor: initial_topic_cursor,
70        _marker: PhantomData,
71    }
72}
73
74impl<S, Store, T> Stream for OrderedStream<S, Store, T>
75where
76    S: TryStream<Ok = Vec<T>>,
77    T: Envelope<'static> + prost::Message + Default + Clone,
78    S::Error: From<EnvelopeError> + From<OrderedStreamError>,
79    Store: CursorStore,
80{
81    type Item = Result<S::Ok, S::Error>;
82
83    fn poll_next(
84        mut self: std::pin::Pin<&mut Self>,
85        cx: &mut std::task::Context<'_>,
86    ) -> Poll<Option<Self::Item>> {
87        let item = match ready!(self.as_mut().project().inner.try_poll_next(cx)) {
88            Some(v) => v,
89            None => return Poll::Ready(None),
90        };
91        let envelopes = item?;
92        tracing::debug!(len = envelopes.len(), "new streamed group messages");
93        if tracing::enabled!(tracing::Level::TRACE) {
94            envelopes.iter().for_each(|e| {
95                if let Ok(cursor) = e.cursor() {
96                    tracing::trace!(
97                        originator_id = cursor.originator_id,
98                        sequence_id = cursor.sequence_id,
99                        "new streamed group message"
100                    );
101                }
102            });
103        }
104        let mut ordering = Ordered::builder()
105            .envelopes(envelopes)
106            .resolver(TypedNoopResolver::<T>::new())
107            .topic_cursor(self.topic_cursor.clone())
108            .store(&self.cursor_store)
109            .build()?;
110        ordering.order_offline().map_err(OrderedStreamError::from)?;
111        let (envelopes, mut new_cursor) = ordering.into_parts();
112        let this = self.as_mut().project();
113        std::mem::swap(this.topic_cursor, &mut new_cursor);
114        Poll::Ready(Some(Ok(envelopes)))
115    }
116}
117
#[cfg(test)]
mod test {
    use super::*;
    use crate::protocol::{InMemoryCursorStore, test::missing_dependencies};
    use futures::{FutureExt, StreamExt, future, stream};
    use proptest::prelude::*;
    use xmtp_proto::api::VectorClock;

    proptest! {
        /// Feeds one generated batch through `ordered` and checks that every
        /// yielded envelope has its dependencies satisfied by the yielded set,
        /// and that the icebox is non-empty when a needed dependency was removed.
        #[xmtp_common::test]
        fn orders_stream_and_ices_missing(
            envelopes in missing_dependencies(10, vec![10, 20, 30])
        ) {
            let store = InMemoryCursorStore::new();
            let batch = envelopes.envelopes.clone();
            let source = stream::once(future::ready(Ok::<_, EnvelopeError>(batch)));
            let ordered_stream = ordered(source, store.clone(), TopicCursor::default());
            futures::pin_mut!(ordered_stream);

            let yielded = ordered_stream
                .next()
                .now_or_never()
                .expect("Stream should yield immediately")
                .expect("Stream should not be empty")
                .expect("Should not error");

            // Fold the cursor of every yielded envelope into a per-topic clock.
            let mut seen = TopicCursor::default();
            for env in &yielded {
                let topic = env.topic().unwrap();
                seen.entry(topic.clone()).or_default().apply(&env.cursor());
            }

            // Each yielded envelope's dependencies must be dominated by that clock.
            for env in &yielded {
                let topic = env.topic().unwrap();
                let clock = seen.get_or_default(&topic);
                prop_assert!(
                    clock.dominates(&env.depends_on()),
                    "Envelope {} should have satisfied dependencies. Topic clock: {}",
                    env, clock
                );
            }

            // Verify that if dependencies are missing, some envelopes are iced
            let depends_on_removed = !envelopes.removed.is_empty()
                && envelopes
                    .envelopes
                    .iter()
                    .any(|e| envelopes.removed.iter().any(|r| e.has_dependency_on(r)));
            if depends_on_removed {
                prop_assert!(!store.icebox().is_empty(),
                    "Expected some envelopes to be iced when dependencies are missing");
            }
        }
    }
}