xmtp_api_d14n/protocol/
order.rs

1use std::collections::HashSet;
2
3use crate::protocol::{
4    CursorStore, Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolutionError,
5    ResolveDependencies, Resolved, Sort, sort, types::RequiredDependency,
6};
7use derive_builder::Builder;
8use itertools::Itertools;
9use tracing::Level;
10use xmtp_proto::api::VectorClock;
11use xmtp_proto::types::{Cursor, OrphanedEnvelope, TopicCursor};
12
13/// Order dependencies of `Self` according to [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering)
14/// If dependencies are missing, this ordering will try to resolve them
15/// and re-apply resolved dependencies to the front of the envelope list
16/// construct this strategy with [`Ordered::builder`]
17#[derive(Debug, Clone, Builder)]
18#[builder(setter(strip_option), build_fn(error = "EnvelopeError"))]
19pub struct Ordered<T, R, S> {
20    envelopes: Vec<T>,
21    resolver: R,
22    topic_cursor: TopicCursor,
23    store: S,
24}
25
26impl<T, R, S> Ordered<T, R, S>
27where
28    S: CursorStore,
29    R: ResolveDependencies<ResolvedEnvelope = T>,
30    T: Envelope<'static> + prost::Message + Default,
31{
32    /// get the missing dependencies in the form of a [`RequiredDependency`]
33    fn required_dependencies(
34        &mut self,
35        missing: &[T],
36    ) -> Result<HashSet<RequiredDependency>, EnvelopeError> {
37        missing
38            .iter()
39            .map(|e| {
40                let dependencies = e.depends_on()?.unwrap_or(Default::default());
41                let topic = e.topic()?;
42                let topic_clock = self.topic_cursor.get_or_default(&topic);
43                let need = topic_clock.missing(&dependencies);
44                let needed_by = e.cursor()?;
45                Ok(need
46                    .into_iter()
47                    .map(move |c| RequiredDependency::new(topic.clone(), c, needed_by)))
48            })
49            .flatten_ok()
50            .try_collect()
51    }
52
53    // convenient internal proxy to causal sorting
54    fn causal_sort(&mut self) -> Result<Option<Vec<T>>, EnvelopeError> {
55        sort::causal(&mut self.envelopes, &mut self.topic_cursor).sort()
56    }
57
58    // convenient internal proxy to timestamp sort
59    fn timestamp_sort(&mut self) -> Result<(), EnvelopeError> {
60        // timestamp sort never returns missing envelopes
61        let _ = sort::timestamp(&mut self.envelopes).sort()?;
62        Ok(())
63    }
64
65    /// try to find any lost children and re-apply them to the
66    /// end of the envelopes list before any resolution occurs
67    fn recover_lost_children(&mut self) -> Result<(), EnvelopeError> {
68        let cursors: Vec<_> = self.envelopes.iter().map(|e| e.cursor()).try_collect()?;
69        let children = self.store.resolve_children(&cursors)?;
70        if !children.is_empty() {
71            tracing::info!("recovered {} children", children.len());
72            if tracing::enabled!(Level::TRACE) {
73                for child in &children {
74                    tracing::trace!(
75                        "recovered child@{} dependant on parent@{} for group@{}",
76                        &child.cursor,
77                        &child.depends_on,
78                        &child.group_id
79                    );
80                }
81            }
82        }
83        let cursors: HashSet<Cursor> = HashSet::from_iter(cursors);
84        let mut envelopes: Vec<T> = children
85            .into_iter()
86            // ensure we don't re-add duplicates from the db
87            .filter(|o| !cursors.contains(&o.cursor))
88            .map(OrphanedEnvelope::into_payload)
89            .map(T::decode)
90            .try_collect()?;
91        // ensure we append them to the list so that the sorting
92        // adds the parent envelopes to the topic cursor before the orphans
93        self.envelopes.append(&mut envelopes);
94        Ok(())
95    }
96}
97
98impl<T, R, S> Ordered<T, R, S> {
99    pub fn into_parts(self) -> (Vec<T>, TopicCursor) {
100        (self.envelopes, self.topic_cursor)
101    }
102}
103
104impl<T: Clone, R: Clone, S: Clone> Ordered<T, R, S> {
105    pub fn builder() -> OrderedBuilder<T, R, S> {
106        OrderedBuilder::default()
107    }
108}
109
110#[xmtp_common::async_trait]
111impl<T, R, S> OrderedEnvelopeCollection for Ordered<T, R, S>
112where
113    T: Envelope<'static> + prost::Message + Default,
114    R: ResolveDependencies<ResolvedEnvelope = T>,
115    S: CursorStore,
116{
117    // NOTE:
118    // In the case where a child has multiple dependants, and one is still missing:
119    // 1.) child is recovered
120    // 2.) child is added to "missing"
121    // 3.) resolution of missing is attempted
122    // 4.) child re-iced if resolution failed
123    async fn order(&mut self) -> Result<(), ResolutionError> {
124        self.recover_lost_children()?;
125        self.timestamp_sort()?;
126        while let Some(mut missing) = self.causal_sort()? {
127            let needed_envelopes = self.required_dependencies(&missing)?;
128            let Resolved { mut resolved, .. } = self.resolver.resolve(needed_envelopes).await?;
129            if resolved.is_empty() {
130                let orphans = missing
131                    .into_iter()
132                    .map(|e| e.orphan())
133                    .inspect(|orphan| {
134                        if let Ok(o) = orphan {
135                            tracing::debug!("icing {}", o)
136                        }
137                    })
138                    .try_collect()?;
139                self.store.ice(orphans)?;
140                break;
141            }
142            self.envelopes.append(&mut resolved);
143            self.envelopes.append(&mut missing);
144            self.recover_lost_children()?;
145        }
146        Ok(())
147    }
148
149    fn order_offline(&mut self) -> Result<(), ResolutionError> {
150        self.recover_lost_children()?;
151        self.timestamp_sort()?;
152        if let Some(missing) = self.causal_sort()? {
153            tracing::debug!("icing {} orphans", missing.len());
154            let orphans = missing
155                .into_iter()
156                .map(|e| e.orphan())
157                .inspect(|orphan| {
158                    if let Ok(o) = orphan {
159                        tracing::debug!("icing {}", o)
160                    }
161                })
162                .try_collect()?;
163            self.store.ice(orphans)?;
164        }
165        Ok(())
166    }
167}
168
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use super::*;
    use crate::protocol::{
        InMemoryCursorStore,
        utils::test::{EnvelopesWithMissing, TestEnvelope, missing_dependencies},
    };
    use futures::FutureExt;
    use parking_lot::Mutex;
    use proptest::{prelude::*, sample::subsequence};
    use xmtp_common::{DebugDisplay, assert_ok};
    use xmtp_proto::types::OriginatorId;

    // Simple mock resolver that holds available envelopes to resolve
    #[derive(Clone, Debug)]
    struct MockResolver {
        // envelopes the resolver can still hand out; drained as they are resolved
        available: Arc<Mutex<Vec<TestEnvelope>>>,
        // envelopes the resolver can never provide
        unavailable: Arc<Mutex<Vec<TestEnvelope>>>,
        // every envelope handed out so far
        returned: Arc<Mutex<Vec<TestEnvelope>>>,
    }

    #[xmtp_common::async_trait]
    impl ResolveDependencies for MockResolver {
        type ResolvedEnvelope = TestEnvelope;

        async fn resolve(
            &self,
            missing: HashSet<RequiredDependency>,
        ) -> Result<Resolved<TestEnvelope>, ResolutionError> {
            let missing_set: HashSet<_> = missing.iter().map(|m| m.cursor).collect();
            let mut available = self.available.lock();
            let available_cursors = available
                .iter()
                .map(|a| a.cursor())
                .collect::<HashSet<Cursor>>();
            // Return envelopes that match the missing set
            let resolved = available
                .extract_if(.., |env| {
                    let cursor = env.cursor();
                    missing_set.contains(&cursor)
                })
                .collect::<Vec<_>>();
            let mut ret = self.returned.lock();
            ret.extend(resolved.clone());
            let unresolved_cursors = missing_set
                .difference(&available_cursors)
                .collect::<HashSet<_>>();
            let unresolved: HashSet<_> = missing
                .into_iter()
                .filter(|m| unresolved_cursors.contains(&m.cursor))
                .collect();

            Ok(Resolved::new(
                resolved,
                (!unresolved.is_empty()).then_some(unresolved),
            ))
        }
    }

    /// Fail the property unless the clock for `env`'s topic has seen `env`.
    #[track_caller]
    fn assert_topic_cursor_seen(
        cursor: &TopicCursor,
        env: &TestEnvelope,
        message: &str,
    ) -> Result<(), TestCaseError> {
        let clock = cursor.get(&env.topic().unwrap()).unwrap();
        prop_assert!(clock.has_seen(&env.cursor()), "{}", message);
        Ok(())
    }

    /// Fail the property unless `env`'s topic clock dominates `env`'s dependencies.
    #[track_caller]
    fn assert_dependencies_satisfied(
        env: &TestEnvelope,
        topic_cursor: &mut TopicCursor,
    ) -> Result<(), TestCaseError> {
        let topic = env.topic().unwrap();
        let clock = topic_cursor.get_or_default(&topic);
        prop_assert!(
            clock.dominates(&env.depends_on()),
            "Envelope {} should have satisfied dependencies. Topic clock: {}",
            env,
            clock
        );
        Ok(())
    }

    /// Fail the property if `env` directly depends on any envelope in `unavailable`.
    #[track_caller]
    fn assert_no_unavailable_deps(
        env: &TestEnvelope,
        unavailable: &[TestEnvelope],
    ) -> Result<(), TestCaseError> {
        for unavailable_env in unavailable {
            prop_assert!(
                !env.has_dependency_on(unavailable_env),
                "Envelope should not depend on unavailable envelope. Envelope: {}, Unavailable: {}",
                env,
                unavailable_env
            );
        }
        Ok(())
    }

    prop_compose! {
        pub fn resolvable_dependencies(length: usize, originators: Vec<OriginatorId>)
            (envelopes in missing_dependencies(length, originators))
                (available in subsequence(envelopes.removed.clone(), 0..=envelopes.removed.len()), envelopes in Just(envelopes))
        -> EnvelopesWithResolver {
            let mut unavailable = envelopes.removed.clone();
            unavailable.retain(|e| !available.contains(e));
            EnvelopesWithResolver {
                missing: envelopes,
                resolver: MockResolver {
                    available: Arc::new(Mutex::new(available)),
                    unavailable: Arc::new(Mutex::new(unavailable)),
                    returned: Arc::new(Mutex::new(Vec::new()))
                }
            }
        }
    }

    #[derive(Debug, Clone)]
    struct EnvelopesWithResolver {
        missing: EnvelopesWithMissing,
        resolver: MockResolver,
    }

    impl EnvelopesWithResolver {
        fn available(&self) -> Vec<TestEnvelope> {
            let a = self.resolver.available.lock();
            a.clone()
        }

        fn unavailable(&self) -> Vec<TestEnvelope> {
            let v = self.resolver.unavailable.lock();
            v.clone()
        }

        /// make a dependency at `idx` in `unavailable` available
        pub fn make_available(&self, idx: usize) -> TestEnvelope {
            let mut v = self.resolver.unavailable.lock();
            let mut available = self.resolver.available.lock();
            let new = v.remove(idx);
            available.push(new.clone());
            new
        }

        pub fn returned(&self) -> Vec<TestEnvelope> {
            let v = self.resolver.returned.lock();
            v.clone()
        }

        fn all_envelopes(&self) -> Vec<TestEnvelope> {
            self.available()
                .into_iter()
                .chain(self.missing.envelopes.clone())
                .chain(self.returned())
                .collect()
        }

        fn find_envelope(&self, cursor: &Cursor) -> Option<TestEnvelope> {
            self.all_envelopes()
                .into_iter()
                .find(|e| e.cursor() == *cursor)
        }

        /// Whether `env` depends, directly or transitively, on an envelope the
        /// resolver can never provide.
        fn has_unavailable_in_dependency_chain(&self, env: &TestEnvelope) -> bool {
            let unavailable = self.unavailable();

            // Check immediate dependencies
            if env.has_dependency_on_any(&unavailable) {
                return true;
            }

            // Check transitive dependencies. Ancestors reachable through several
            // paths are only examined once.
            let mut visited: HashSet<Cursor> = HashSet::new();
            let mut to_check = vec![env.depends_on()];
            while let Some(deps) = to_check.pop() {
                if deps.is_empty() {
                    continue;
                }

                for cursor in deps.cursors() {
                    let Some(dep_env) = self.find_envelope(&cursor) else {
                        continue;
                    };
                    if !visited.insert(dep_env.cursor()) {
                        continue;
                    }
                    if dep_env.has_dependency_on_any(&unavailable) {
                        return true;
                    }
                    if !dep_env.depends_on().is_empty() {
                        to_check.push(dep_env.depends_on());
                    }
                }
            }
            false
        }

        // get only dependencies that can be validly depended on
        // (none of the dependencies dependants are unavailable)
        pub fn only_valid_dependants(&self) -> Vec<TestEnvelope> {
            self.missing
                .envelopes
                .iter()
                .filter(|env| !self.has_unavailable_in_dependency_chain(env))
                .cloned()
                .collect()
        }
    }

    proptest! {
        #[xmtp_common::test]
        fn orders_with_unresolvable_dependencies(
            envelopes in resolvable_dependencies(10, vec![10, 20, 30])
        ) {
            let valid = envelopes.only_valid_dependants();
            let available = envelopes.available();
            let unavailable = envelopes.unavailable();
            let EnvelopesWithResolver {
                missing,
                resolver
            } = envelopes;
            let store = InMemoryCursorStore::new();
            let mut ordered = Ordered::builder()
                .envelopes(missing.envelopes)
                .resolver(resolver)
                .store(store.clone())
                .topic_cursor(TopicCursor::default())
                .build()
                .unwrap();

            // Perform ordering - some dependencies cannot be resolved
            ordered.order().now_or_never()
                .expect("Future should complete immediately")
                .unwrap();

            let (result, mut topic_cursor) = ordered.into_parts();

            // Verify all valid envelopes are seen by topic cursor
            for env in &valid {
                assert_topic_cursor_seen(
                    &topic_cursor,
                    env,
                    &format!("topic cursor {} must have seen {:?}\n\
                        ordering_pass: \n{}\n\
                        icebox: \n{}\n",
                        topic_cursor, env, result.format_enumerated(),store.icebox().format_enumerated()
                    ))?;
            }

            // Check that all envelopes in result have satisfied dependencies
            for envelope in &result {
                assert_dependencies_satisfied(envelope, &mut topic_cursor)?;
                // Envelopes with satisfied dependencies shouldn't depend on unavailable ones
                assert_no_unavailable_deps(envelope, &unavailable)?;
            }
            // Verify that envelopes which were made available are in the result
            // (unless they themselves depend on unavailable envelopes or aren't needed)
            for available_env in &available {
                if available_env.has_dependency_on_any(&unavailable) { continue; }
                if result.iter().all(|e| !e.has_dependency_on(available_env)) { continue; }

                prop_assert!(
                    result.iter().any(|e| e == available_env),
                    "Result does not contain {}", available_env
                );
                assert_dependencies_satisfied(available_env, &mut topic_cursor)?;
            }
        }

        #[xmtp_common::test]
        fn orders_with_recovered_children(
            envelopes in resolvable_dependencies(10, vec![10, 20, 30])
        ) {
            let valid = envelopes.only_valid_dependants();
            let valid_cursors = valid.iter().map(|e| e.cursor()).collect::<HashSet<_>>();
            let unavailable = envelopes.unavailable();
            let EnvelopesWithResolver {
                ref missing,
                ref resolver
            } = envelopes;

            let topic_cursor = TopicCursor::default();
            let envelopes_to_check = missing.envelopes.clone();
            let store = InMemoryCursorStore::new();
            let mut ordered = Ordered::builder()
                .envelopes(missing.envelopes.clone())
                .resolver(resolver.clone())
                .store(store.clone())
                .topic_cursor(topic_cursor)
                .build()
                .unwrap();

            // Perform ordering - some dependencies cannot be resolved, so children get iced
            ordered.order().now_or_never()
                .expect("Future should complete immediately")
                .unwrap();

            let (first_ordering_pass, topic_cursor) = ordered.into_parts();

            // Verify that the store has some orphaned envelopes
            let orphan_count = store.orphan_count();

            // If there were unavailable dependencies and envelopes depending on them,
            // we should have some orphans
            if !unavailable.is_empty() {
                let has_dependent_envelopes = envelopes_to_check.iter().any(|e| {
                    unavailable.iter().any(|u| e.has_dependency_on(u))
                });

                if has_dependent_envelopes {
                    prop_assert!(
                        orphan_count > 0,
                        "Expected some envelopes to be iced when dependencies are unavailable"
                    );
                }
            }

            // Now simulate a scenario where one of the unavailable envelopes becomes available
            // and we do another ordering pass - the children should be recovered
            if !unavailable.is_empty() && orphan_count > 0 {
                // make the first unavailable envelope available again
                let newly_available = envelopes.make_available(0);

                // Create a new ordered instance with the newly available envelope
                let mut ordered = Ordered::builder()
                    .envelopes(vec![newly_available.clone()])
                    .resolver(resolver.clone())
                    .store(store.clone())
                    .topic_cursor(topic_cursor)
                    .build()
                    .unwrap();

                // Snapshot the orphan count before the second ordering pass
                let orphan_count = store.orphan_count();

                // Look up which iced orphans are children of the newly available
                // envelope; the second pass should recover them
                let (had_children, returned) = {
                    let returned = store.resolve_children(&[newly_available.cursor()]);
                    assert_ok!(&returned);
                    let returned = returned.unwrap();
                    // Check if any orphan was a child of the newly available envelope
                    let had_children = !returned.is_empty();
                    (had_children, returned)
                };

                // Perform ordering again - this should recover children
                ordered.order().now_or_never()
                    .expect("Future should complete immediately")
                    .unwrap();

                let (second_ordering_pass, mut new_topic_cursor) = ordered.into_parts();

                let had_children = orphan_count > 0 && had_children;
                let child_str = returned.format_list();
                let icebox_str = store.icebox().format_enumerated();
                let second_ordering_pass_str = second_ordering_pass.format_enumerated();
                let first_ordering_pass_str = first_ordering_pass.format_enumerated();
                let is_valid = valid_cursors.contains(&newly_available.cursor());
                let valid_children = returned.iter().map(TestEnvelope::from).filter(|c| c.only_depends_on(&valid)).collect::<Vec<_>>();
                let num_valid_children = valid_children.len();
                let valid_children_str = valid_children.format_enumerated();
                if had_children && is_valid {
                    prop_assert_eq!(1 + num_valid_children, second_ordering_pass.len(),
                        "valid orphans should be in envelopes list\n\
                        valid_children: \n{}\n\
                        icebox: \n{}\n\
                        final topic_cursor: \n{}\n\
                        first_ordering_pass: \n{}\n\
                        newly_available->{}\n\
                        second_ordering_pass:\n{}\n\
                        ", valid_children_str, icebox_str, new_topic_cursor, first_ordering_pass_str, newly_available, second_ordering_pass_str,
                    );
                    prop_assert!(
                        !second_ordering_pass.is_empty(),
                        "Expected children to be recovered when parent becomes available.\n \
                         Result length: {} \
                         \n newly_available: {} \
                         \nicebox:\n{} \
                         \nlen: {} \
                          \nrecovered_children:\n{}
                          \n topic cursor {:?}",
                        second_ordering_pass.len(),
                        newly_available,
                        icebox_str,
                        store.orphan_count(),
                        child_str,
                        new_topic_cursor
                    );
                }

                // Verify that all envelopes in the result have satisfied dependencies
                for envelope in &second_ordering_pass {
                    assert_dependencies_satisfied(envelope, &mut new_topic_cursor)?;
                }
            }
        }
    }
}