xmtp_api_d14n/queries/stream/
ordered.rs1use crate::protocol::{
4 CursorStore, Envelope, EnvelopeError, Ordered, OrderedEnvelopeCollection, ResolutionError,
5 TypedNoopResolver,
6};
7use futures::{Stream, TryStream};
8use pin_project_lite::pin_project;
9use std::{
10 error::Error,
11 marker::PhantomData,
12 task::{Poll, ready},
13};
14use xmtp_common::RetryableError;
15use xmtp_proto::{api::ApiClientError, types::TopicCursor};
16
17pin_project! {
18 pub struct OrderedStream<S, Store, T> {
19 #[pin] inner: S,
20 cursor_store: Store,
21 topic_cursor: TopicCursor,
22 _marker: PhantomData<T>
23 }
24}
25
26#[derive(thiserror::Error, Debug)]
29pub enum OrderedStreamError {
30 #[error(transparent)]
31 Resolver(#[from] ResolutionError),
32}
33
34impl<E: Error> From<OrderedStreamError> for ApiClientError<E> {
35 fn from(value: OrderedStreamError) -> Self {
36 ApiClientError::Other(Box::new(value) as _)
37 }
38}
39
40impl RetryableError for OrderedStreamError {
41 fn is_retryable(&self) -> bool {
42 false
43 }
44}
45
46impl From<OrderedStreamError> for EnvelopeError {
47 fn from(value: OrderedStreamError) -> Self {
48 EnvelopeError::DynError(Box::new(value) as _)
49 }
50}
51
52pub fn ordered<S, Store, T>(
62 s: S,
63 cursor_store: Store,
64 initial_topic_cursor: TopicCursor,
65) -> OrderedStream<S, Store, T> {
66 OrderedStream::<S, Store, T> {
67 inner: s,
68 cursor_store,
69 topic_cursor: initial_topic_cursor,
70 _marker: PhantomData,
71 }
72}
73
74impl<S, Store, T> Stream for OrderedStream<S, Store, T>
75where
76 S: TryStream<Ok = Vec<T>>,
77 T: Envelope<'static> + prost::Message + Default + Clone,
78 S::Error: From<EnvelopeError> + From<OrderedStreamError>,
79 Store: CursorStore,
80{
81 type Item = Result<S::Ok, S::Error>;
82
83 fn poll_next(
84 mut self: std::pin::Pin<&mut Self>,
85 cx: &mut std::task::Context<'_>,
86 ) -> Poll<Option<Self::Item>> {
87 let item = match ready!(self.as_mut().project().inner.try_poll_next(cx)) {
88 Some(v) => v,
89 None => return Poll::Ready(None),
90 };
91 let envelopes = item?;
92 tracing::debug!(len = envelopes.len(), "new streamed group messages");
93 if tracing::enabled!(tracing::Level::TRACE) {
94 envelopes.iter().for_each(|e| {
95 if let Ok(cursor) = e.cursor() {
96 tracing::trace!(
97 originator_id = cursor.originator_id,
98 sequence_id = cursor.sequence_id,
99 "new streamed group message"
100 );
101 }
102 });
103 }
104 let mut ordering = Ordered::builder()
105 .envelopes(envelopes)
106 .resolver(TypedNoopResolver::<T>::new())
107 .topic_cursor(self.topic_cursor.clone())
108 .store(&self.cursor_store)
109 .build()?;
110 ordering.order_offline().map_err(OrderedStreamError::from)?;
111 let (envelopes, mut new_cursor) = ordering.into_parts();
112 let this = self.as_mut().project();
113 std::mem::swap(this.topic_cursor, &mut new_cursor);
114 Poll::Ready(Some(Ok(envelopes)))
115 }
116}
117
#[cfg(test)]
mod test {
    use super::*;
    use crate::protocol::{InMemoryCursorStore, test::missing_dependencies};
    use futures::{FutureExt, StreamExt, future, stream};
    use proptest::prelude::*;
    use xmtp_proto::api::VectorClock;

    proptest! {
        // Feeds one generated batch through `ordered` and checks that every
        // emitted envelope's dependencies are covered by the clock built from
        // the emitted envelopes, and that the store's icebox is non-empty
        // whenever some envelope depends on a removed one.
        #[xmtp_common::test]
        fn orders_stream_and_ices_missing(
            envelopes in missing_dependencies(10, vec![10, 20, 30])
        ) {
            let store = InMemoryCursorStore::new();
            let batch = envelopes.envelopes.clone();
            let source = stream::once(future::ready(Ok::<_, EnvelopeError>(batch)));
            let ordered_stream = ordered(source, store.clone(), TopicCursor::default());
            futures::pin_mut!(ordered_stream);

            // The source is a single ready item, so one non-blocking poll must yield it.
            let emitted = ordered_stream
                .next()
                .now_or_never()
                .expect("Stream should yield immediately")
                .expect("Stream should not be empty")
                .expect("Should not error");

            // Fold the cursor of every emitted envelope into a per-topic clock.
            let mut seen = TopicCursor::default();
            for env in &emitted {
                let topic = env.topic().unwrap();
                seen.entry(topic.clone()).or_default().apply(&env.cursor());
            }

            // Each emitted envelope's dependencies must be covered by that clock.
            for env in &emitted {
                let topic = env.topic().unwrap();
                let clock = seen.get_or_default(&topic);
                prop_assert!(
                    clock.dominates(&env.depends_on()),
                    "Envelope {} should have satisfied dependencies. Topic clock: {}",
                    env, clock
                );
            }

            // `any` over an empty `removed` list is false, so no separate
            // emptiness check is needed before this scan.
            let depends_on_removed = envelopes
                .envelopes
                .iter()
                .any(|e| envelopes.removed.iter().any(|r| e.has_dependency_on(r)));
            if depends_on_removed {
                prop_assert!(
                    !store.icebox().is_empty(),
                    "Expected some envelopes to be iced when dependencies are missing"
                );
            }
        }
    }
}