xmtp_api_d14n/protocol/sort/
causal.rs1use xmtp_proto::types::{Topic, TopicCursor};
2use xmtp_proto::{api::VectorClock, types::GlobalCursor};
3
4use crate::protocol::{ApplyCursor, Envelope, EnvelopeError, Sort};
5
6pub struct CausalSort<'a, E> {
7 envelopes: &'a mut Vec<E>,
8 topic_cursor: &'a mut TopicCursor,
9}
10
11struct Missed<E> {
14 envelope: E,
15 depends_on: GlobalCursor,
16 topic: Topic,
17}
18
19impl<E> Missed<E> {
20 pub fn new(envelope: E, depends_on: GlobalCursor, topic: Topic) -> Self {
21 Self {
22 envelope,
23 depends_on,
24 topic,
25 }
26 }
27
28 pub fn into_envelope(self) -> E {
29 self.envelope
30 }
31}
32
33impl<'b, 'a: 'b, E: Envelope<'a>> CausalSort<'b, E> {
34 fn recover_newly_valid(&mut self, missed: &mut Vec<Missed<E>>) -> Vec<E> {
39 missed
40 .extract_if(.., |m| {
41 let clock = self.topic_cursor.get_or_default(&m.topic);
42 clock.dominates(&m.depends_on)
43 })
44 .map(move |m| m.envelope)
45 .collect()
46 }
47}
48
49impl<'b, 'a: 'b, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'b, E> {
50 fn sort(mut self) -> Result<Option<Vec<E>>, EnvelopeError> {
51 let mut i = 0;
52 let mut missed = Vec::new();
53 while i < self.envelopes.len() {
54 let env = &mut self.envelopes[i];
55 let topic = env.topic()?;
56 let last_seen = env.depends_on()?.unwrap_or(Default::default());
57 let vector_clock = self.topic_cursor.get_or_default(&topic);
58 if vector_clock.dominates(&last_seen) {
59 self.topic_cursor.apply(env)?;
60 let newly_valid = self.recover_newly_valid(&mut missed);
61 i += 1;
62 self.envelopes.splice(i..i, newly_valid.into_iter());
63 } else {
64 let missed_envelope = self.envelopes.remove(i);
65 missed.push(Missed::new(missed_envelope, last_seen, topic));
66 }
67 }
68 Ok((!missed.is_empty()).then_some(missed.into_iter().map(Missed::into_envelope).collect()))
69 }
70}
71
72pub fn causal<'b, 'a: 'b, E: Envelope<'a>>(
95 envelopes: &'b mut Vec<E>,
96 topic_cursor: &'b mut TopicCursor,
97) -> impl Sort<Vec<E>> + use<'a, 'b, E> {
98 CausalSort {
99 envelopes,
100 topic_cursor,
101 }
102}
103
#[cfg(test)]
mod tests {
    use crate::protocol::sort;
    use crate::protocol::utils::test::{
        EnvelopesWithMissing, TestEnvelope, depends_on_one, missing_dependencies,
        sorted_dependencies,
    };
    use proptest::prelude::*;

    use super::*;

    /// Renders the cursor of every envelope as a string, for use in assertion
    /// failure messages.
    fn cursors(envelopes: &[TestEnvelope]) -> Vec<String> {
        envelopes.iter().map(|e| e.cursor().to_string()).collect()
    }

    /// Checks the outcome of a causal sort.
    ///
    /// * every envelope in `missing` must depend on at least one envelope
    ///   that is itself in `missing` or `removed`;
    /// * no envelope in `sorted` may depend on an envelope in `missing` or
    ///   `removed`.
    fn assert_sorted(sorted: &[TestEnvelope], missing: &[TestEnvelope], removed: &[TestEnvelope]) {
        let mut missing_and_removed = removed.to_vec();
        missing_and_removed.extend_from_slice(missing);
        for envelope in missing {
            assert!(
                depends_on_one(envelope, missing_and_removed.as_slice()),
                "{envelope} has no dependency that is missing. missing & removed: {:?}",
                cursors(&missing_and_removed)
            );
        }
        for envelope in sorted {
            assert!(
                !depends_on_one(envelope, missing_and_removed.as_slice()),
                "{envelope} depends on a missing or removed dependency in \nremoved: {:?}, \nmissing: {:?} but it was marked as sorted,\n sorted {:?}",
                cursors(removed),
                cursors(missing),
                cursors(sorted),
            );
        }
    }

    proptest! {
        // Sorting a batch generated by `missing_dependencies`, checked against
        // the `removed` set the generator reports.
        #[xmtp_common::test]
        fn causal_sort(envelopes in missing_dependencies(10, vec![10, 20, 30, 40])) {
            let mut topic_cursor = TopicCursor::default();
            let EnvelopesWithMissing { mut envelopes, removed, .. } = envelopes;
            // `None` from the sorter means nothing was left missing.
            let missing = sort::causal(&mut envelopes, &mut topic_cursor)
                .sort()?
                .unwrap_or_default();
            assert_sorted(&envelopes, &missing, &removed)
        }

        // Sorting a shuffled batch generated by `sorted_dependencies`, checked
        // with an empty `removed` set.
        #[xmtp_common::test]
        fn reapplies_within_array(mut envelopes in sorted_dependencies(10, vec![10, 20, 30, 40]).prop_shuffle()) {
            let mut topic_cursor = TopicCursor::default();
            let missing = sort::causal(&mut envelopes, &mut topic_cursor)
                .sort()?
                .unwrap_or_default();
            assert_sorted(&envelopes, &missing, &[])
        }
    }
}