xmtp_api_d14n/protocol/sort/
causal.rs

1use xmtp_proto::types::{Topic, TopicCursor};
2use xmtp_proto::{api::VectorClock, types::GlobalCursor};
3
4use crate::protocol::{ApplyCursor, Envelope, EnvelopeError, Sort};
5
6pub struct CausalSort<'a, E> {
7    envelopes: &'a mut Vec<E>,
8    topic_cursor: &'a mut TopicCursor,
9}
10
11// store temporary info about the envelope
12// so that we do not need to re-deserialize
13struct Missed<E> {
14    envelope: E,
15    depends_on: GlobalCursor,
16    topic: Topic,
17}
18
19impl<E> Missed<E> {
20    pub fn new(envelope: E, depends_on: GlobalCursor, topic: Topic) -> Self {
21        Self {
22            envelope,
23            depends_on,
24            topic,
25        }
26    }
27
28    pub fn into_envelope(self) -> E {
29        self.envelope
30    }
31}
32
33impl<'b, 'a: 'b, E: Envelope<'a>> CausalSort<'b, E> {
34    // check if any of the dependencies of envelopes in `other` are
35    // satisfied by any envelopes in `self.envelopes`
36    // this lets us resolve dependencies internally
37    // for deeply-nested sets of dependencies.
38    fn recover_newly_valid(&mut self, missed: &mut Vec<Missed<E>>) -> Vec<E> {
39        missed
40            .extract_if(.., |m| {
41                let clock = self.topic_cursor.get_or_default(&m.topic);
42                clock.dominates(&m.depends_on)
43            })
44            .map(move |m| m.envelope)
45            .collect()
46    }
47}
48
49impl<'b, 'a: 'b, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'b, E> {
50    fn sort(mut self) -> Result<Option<Vec<E>>, EnvelopeError> {
51        let mut i = 0;
52        let mut missed = Vec::new();
53        while i < self.envelopes.len() {
54            let env = &mut self.envelopes[i];
55            let topic = env.topic()?;
56            let last_seen = env.depends_on()?.unwrap_or(Default::default());
57            let vector_clock = self.topic_cursor.get_or_default(&topic);
58            if vector_clock.dominates(&last_seen) {
59                self.topic_cursor.apply(env)?;
60                let newly_valid = self.recover_newly_valid(&mut missed);
61                i += 1;
62                self.envelopes.splice(i..i, newly_valid.into_iter());
63            } else {
64                let missed_envelope = self.envelopes.remove(i);
65                missed.push(Missed::new(missed_envelope, last_seen, topic));
66            }
67        }
68        Ok((!missed.is_empty()).then_some(missed.into_iter().map(Missed::into_envelope).collect()))
69    }
70}
71
72/// Sorts Envelopes Causally in-place
73/// All envelopes part of `Self` will be sorted. Envelopes with missing
74/// dependencies will be returned. `TopicCursor` will be updated to reflect
75/// causally-verified envelopes.
76/// [XIP Definition](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#causal-ordering)
77/// A causal sort orders envelopes by their dependencies.
78/// A `dependency` is defined as an envelope which must be processed before any
79/// dependant envelopes.
80/// ```text
81/// Unsorted (arrival order):        Sorted (causal order):
82///
83///   [E3] ──depends on──> [E1]         [E1] ───→ [E2] ───→ [E3]
84///   [E1] (no deps)                     ↑         ↑
85///   [E2] ──depends on──> [E1]          │         │
86///                                   (no deps) (needs E1)
87///
88/// E1 must be processed first, then E2, then E3
89/// ```
90/// # Arguments
91/// * `envelopes`: the [`Envelope`]'s being sorted
92/// * `topic_cursor`: the cursor position of all known topics
93///
94pub fn causal<'b, 'a: 'b, E: Envelope<'a>>(
95    envelopes: &'b mut Vec<E>,
96    topic_cursor: &'b mut TopicCursor,
97) -> impl Sort<Vec<E>> + use<'a, 'b, E> {
98    CausalSort {
99        envelopes,
100        topic_cursor,
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use crate::protocol::sort;
107    use crate::protocol::utils::test::{
108        EnvelopesWithMissing, TestEnvelope, depends_on_one, missing_dependencies,
109        sorted_dependencies,
110    };
111    use proptest::prelude::*;
112
113    use super::*;
114    fn assert_sorted(sorted: &[TestEnvelope], missing: &[TestEnvelope], removed: &[TestEnvelope]) {
115        let mut missing_and_removed = removed.to_vec();
116        missing_and_removed.extend(missing.to_vec().iter().cloned());
117        for envelope in missing {
118            assert!(
119                // verify the missing envelope has a dependency on a removed or a different missing
120                // envelope
121                depends_on_one(envelope, missing_and_removed.as_slice()),
122                "{envelope} has no dependency that is missing. missing & removed: {:?}",
123                missing_and_removed
124                    .iter()
125                    .map(|e| e.cursor().to_string())
126                    .collect::<Vec<_>>()
127            );
128        }
129        // ensure the ones that are sorted do not depend on any that are removed
130        for envelope in sorted {
131            assert!(
132                !depends_on_one(envelope, missing_and_removed.as_slice()),
133                "{envelope} depends on a missing or removed dependency in \nremoved: {:?}, \nmissing: {:?} but it was marked as sorted,\n sorted {:?}",
134                removed
135                    .iter()
136                    .map(|e| e.cursor().to_string())
137                    .collect::<Vec<_>>(),
138                missing
139                    .iter()
140                    .map(|e| e.cursor().to_string())
141                    .collect::<Vec<_>>(),
142                sorted
143                    .iter()
144                    .map(|e| e.cursor().to_string())
145                    .collect::<Vec<_>>(),
146            );
147        }
148    }
149
150    proptest! {
151        #[xmtp_common::test]
152        fn causal_sort(envelopes in missing_dependencies(10, vec![10, 20, 30, 40])) {
153            let mut topic_cursor = TopicCursor::default();
154            let EnvelopesWithMissing { mut envelopes, removed, .. } = envelopes;
155            let mut missing = vec![];
156            if let Some(m) = sort::causal(&mut envelopes, &mut topic_cursor).sort()? {
157                missing = m.to_vec();
158            }
159            assert_sorted(&envelopes, &missing, &removed)
160        }
161
162        #[xmtp_common::test]
163        fn reapplies_within_array(mut envelopes in sorted_dependencies(10, vec![10, 20, 30, 40]).prop_shuffle()) {
164            let mut topic_cursor = TopicCursor::default();
165            let mut missing = vec![];
166            if let Some(m) = sort::causal(&mut envelopes, &mut topic_cursor).sort()? {
167                missing = m.to_vec();
168            }
169            assert_sorted(&envelopes, &missing, &[])
170        }
171    }
172}