1use std::collections::HashSet;
2
3use crate::protocol::{
4 CursorStore, Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolutionError,
5 ResolveDependencies, Resolved, Sort, sort, types::RequiredDependency,
6};
7use derive_builder::Builder;
8use itertools::Itertools;
9use tracing::Level;
10use xmtp_proto::api::VectorClock;
11use xmtp_proto::types::{Cursor, OrphanedEnvelope, TopicCursor};
12
/// A batch of envelopes bundled with everything needed to put them in order.
///
/// Instances are assembled through the `derive_builder`-generated
/// `OrderedBuilder` (see `Ordered::builder`); a failed build is reported as an
/// `EnvelopeError`.
#[derive(Debug, Clone, Builder)]
#[builder(setter(strip_option), build_fn(error = "EnvelopeError"))]
pub struct Ordered<T, R, S> {
    /// Envelopes being ordered. Handed mutably to the timestamp and causal
    /// sorts, and extended with resolved dependencies and with children
    /// recovered from `store`.
    envelopes: Vec<T>,
    /// Source queried for the dependencies that the causal sort reports as
    /// missing.
    resolver: R,
    /// Per-topic clocks handed to the causal sort and consulted to work out
    /// which dependencies are still missing.
    topic_cursor: TopicCursor,
    /// Store asked for children of the current envelopes, and given the
    /// envelopes whose dependencies could not be resolved.
    store: S,
}
25
26impl<T, R, S> Ordered<T, R, S>
27where
28 S: CursorStore,
29 R: ResolveDependencies<ResolvedEnvelope = T>,
30 T: Envelope<'static> + prost::Message + Default,
31{
32 fn required_dependencies(
34 &mut self,
35 missing: &[T],
36 ) -> Result<HashSet<RequiredDependency>, EnvelopeError> {
37 missing
38 .iter()
39 .map(|e| {
40 let dependencies = e.depends_on()?.unwrap_or(Default::default());
41 let topic = e.topic()?;
42 let topic_clock = self.topic_cursor.get_or_default(&topic);
43 let need = topic_clock.missing(&dependencies);
44 let needed_by = e.cursor()?;
45 Ok(need
46 .into_iter()
47 .map(move |c| RequiredDependency::new(topic.clone(), c, needed_by)))
48 })
49 .flatten_ok()
50 .try_collect()
51 }
52
53 fn causal_sort(&mut self) -> Result<Option<Vec<T>>, EnvelopeError> {
55 sort::causal(&mut self.envelopes, &mut self.topic_cursor).sort()
56 }
57
58 fn timestamp_sort(&mut self) -> Result<(), EnvelopeError> {
60 let _ = sort::timestamp(&mut self.envelopes).sort()?;
62 Ok(())
63 }
64
65 fn recover_lost_children(&mut self) -> Result<(), EnvelopeError> {
68 let cursors: Vec<_> = self.envelopes.iter().map(|e| e.cursor()).try_collect()?;
69 let children = self.store.resolve_children(&cursors)?;
70 if !children.is_empty() {
71 tracing::info!("recovered {} children", children.len());
72 if tracing::enabled!(Level::TRACE) {
73 for child in &children {
74 tracing::trace!(
75 "recovered child@{} dependant on parent@{} for group@{}",
76 &child.cursor,
77 &child.depends_on,
78 &child.group_id
79 );
80 }
81 }
82 }
83 let cursors: HashSet<Cursor> = HashSet::from_iter(cursors);
84 let mut envelopes: Vec<T> = children
85 .into_iter()
86 .filter(|o| !cursors.contains(&o.cursor))
88 .map(OrphanedEnvelope::into_payload)
89 .map(T::decode)
90 .try_collect()?;
91 self.envelopes.append(&mut envelopes);
94 Ok(())
95 }
96}
97
98impl<T, R, S> Ordered<T, R, S> {
99 pub fn into_parts(self) -> (Vec<T>, TopicCursor) {
100 (self.envelopes, self.topic_cursor)
101 }
102}
103
104impl<T: Clone, R: Clone, S: Clone> Ordered<T, R, S> {
105 pub fn builder() -> OrderedBuilder<T, R, S> {
106 OrderedBuilder::default()
107 }
108}
109
110#[xmtp_common::async_trait]
111impl<T, R, S> OrderedEnvelopeCollection for Ordered<T, R, S>
112where
113 T: Envelope<'static> + prost::Message + Default,
114 R: ResolveDependencies<ResolvedEnvelope = T>,
115 S: CursorStore,
116{
117 async fn order(&mut self) -> Result<(), ResolutionError> {
124 self.recover_lost_children()?;
125 self.timestamp_sort()?;
126 while let Some(mut missing) = self.causal_sort()? {
127 let needed_envelopes = self.required_dependencies(&missing)?;
128 let Resolved { mut resolved, .. } = self.resolver.resolve(needed_envelopes).await?;
129 if resolved.is_empty() {
130 let orphans = missing
131 .into_iter()
132 .map(|e| e.orphan())
133 .inspect(|orphan| {
134 if let Ok(o) = orphan {
135 tracing::debug!("icing {}", o)
136 }
137 })
138 .try_collect()?;
139 self.store.ice(orphans)?;
140 break;
141 }
142 self.envelopes.append(&mut resolved);
143 self.envelopes.append(&mut missing);
144 self.recover_lost_children()?;
145 }
146 Ok(())
147 }
148
149 fn order_offline(&mut self) -> Result<(), ResolutionError> {
150 self.recover_lost_children()?;
151 self.timestamp_sort()?;
152 if let Some(missing) = self.causal_sort()? {
153 tracing::debug!("icing {} orphans", missing.len());
154 let orphans = missing
155 .into_iter()
156 .map(|e| e.orphan())
157 .inspect(|orphan| {
158 if let Ok(o) = orphan {
159 tracing::debug!("icing {}", o)
160 }
161 })
162 .try_collect()?;
163 self.store.ice(orphans)?;
164 }
165 Ok(())
166 }
167}
168
169#[cfg(test)]
170mod test {
171 use std::sync::Arc;
172
173 use super::*;
174 use crate::protocol::{
175 InMemoryCursorStore,
176 utils::test::{EnvelopesWithMissing, TestEnvelope, missing_dependencies},
177 };
178 use futures::FutureExt;
179 use parking_lot::Mutex;
180 use proptest::{prelude::*, sample::subsequence};
181 use xmtp_common::{DebugDisplay, assert_ok};
182 use xmtp_proto::types::OriginatorId;
183
/// Test resolver backed by in-memory envelope lists, each shared behind an
/// `Arc<Mutex<..>>` so clones of the resolver observe the same state.
#[derive(Clone, Debug)]
struct MockResolver {
    /// Envelopes `resolve` may hand out; an envelope is removed from this list
    /// once it has been returned.
    available: Arc<Mutex<Vec<TestEnvelope>>>,
    /// Envelopes `resolve` never returns unless they are first moved into
    /// `available` (see `EnvelopesWithResolver::make_available`).
    unavailable: Arc<Mutex<Vec<TestEnvelope>>>,
    /// Every envelope `resolve` has returned so far.
    returned: Arc<Mutex<Vec<TestEnvelope>>>,
}
191
192 #[xmtp_common::async_trait]
193 impl ResolveDependencies for MockResolver {
194 type ResolvedEnvelope = TestEnvelope;
195
196 async fn resolve(
197 &self,
198 missing: HashSet<RequiredDependency>,
199 ) -> Result<Resolved<TestEnvelope>, ResolutionError> {
200 let missing_set: HashSet<_> = missing.iter().map(|m| m.cursor).collect();
201 let mut available = self.available.lock();
202 let available_cursors = available
203 .iter()
204 .map(|a| a.cursor())
205 .collect::<HashSet<Cursor>>();
206 let resolved = available
208 .extract_if(.., |env| {
209 let cursor = env.cursor();
210 missing_set.contains(&cursor)
211 })
212 .collect::<Vec<_>>();
213 let mut ret = self.returned.lock();
214 ret.extend(resolved.clone());
215 let unresolved_cursors = missing_set
216 .difference(&available_cursors)
217 .collect::<HashSet<_>>();
218 let unresolved: HashSet<_> = missing
219 .into_iter()
220 .filter(|m| unresolved_cursors.contains(&m.cursor))
221 .collect();
222
223 Ok(Resolved::new(
224 resolved,
225 (!unresolved.is_empty()).then_some(unresolved),
226 ))
227 }
228 }
229
230 #[track_caller]
231 fn assert_topic_cursor_seen(
232 cursor: &TopicCursor,
233 env: &TestEnvelope,
234 message: &str,
235 ) -> Result<(), TestCaseError> {
236 let clock = cursor.get(&env.topic().unwrap()).unwrap();
237 prop_assert!(clock.has_seen(&env.cursor()), "{}", message);
238 Ok(())
239 }
240
241 #[track_caller]
242 fn assert_dependencies_satisfied(
243 env: &TestEnvelope,
244 topic_cursor: &mut TopicCursor,
245 ) -> Result<(), TestCaseError> {
246 let topic = env.topic().unwrap();
247 let clock = topic_cursor.get_or_default(&topic);
248 prop_assert!(
249 clock.dominates(&env.depends_on()),
250 "Envelope {} should have satisfied dependencies. Topic clock: {}",
251 env,
252 clock
253 );
254 Ok(())
255 }
256
257 #[track_caller]
258 fn assert_no_unavailable_deps(
259 env: &TestEnvelope,
260 unavailable: &[TestEnvelope],
261 ) -> Result<(), TestCaseError> {
262 for unavailable_env in unavailable {
263 prop_assert!(
264 !env.has_dependency_on(unavailable_env),
265 "Envelope should not depend on unavailable envelope. Envelope: {}, Unavailable: {}",
266 env,
267 unavailable_env
268 );
269 }
270 Ok(())
271 }
272
273 prop_compose! {
274 pub fn resolvable_dependencies(length: usize, originators: Vec<OriginatorId>)
275 (envelopes in missing_dependencies(length, originators))
276 (available in subsequence(envelopes.removed.clone(), 0..=envelopes.removed.len()), envelopes in Just(envelopes))
277 -> EnvelopesWithResolver {
278 let mut unavailable = envelopes.removed.clone();
279 unavailable.retain(|e| !available.contains(e));
280 EnvelopesWithResolver {
281 missing: envelopes,
282 resolver: MockResolver {
283 available: Arc::new(Mutex::new(available)),
284 unavailable: Arc::new(Mutex::new(unavailable)),
285 returned: Arc::new(Mutex::new(Vec::new()))
286 }
287 }
288 }
289 }
290
/// Test fixture pairing generated envelopes with the resolver that serves
/// (some of) their removed dependencies.
#[derive(Debug, Clone)]
struct EnvelopesWithResolver {
    /// Generated envelopes (`envelopes`) together with the ones taken out of
    /// the set (`removed`).
    missing: EnvelopesWithMissing,
    /// Mock resolver holding the available / unavailable split of the removed
    /// envelopes.
    resolver: MockResolver,
}
296
297 impl EnvelopesWithResolver {
298 fn available(&self) -> Vec<TestEnvelope> {
299 let a = self.resolver.available.lock();
300 a.clone()
301 }
302
303 fn unavailable(&self) -> Vec<TestEnvelope> {
304 let v = self.resolver.unavailable.lock();
305 v.clone()
306 }
307
308 pub fn make_available(&self, idx: usize) -> TestEnvelope {
310 let mut v = self.resolver.unavailable.lock();
311 let mut available = self.resolver.available.lock();
312 let new = v.remove(idx);
313 available.push(new.clone());
314 new
315 }
316
317 pub fn returned(&self) -> Vec<TestEnvelope> {
318 let v = self.resolver.returned.lock();
319 v.clone()
320 }
321
322 fn all_envelopes(&self) -> Vec<TestEnvelope> {
323 self.available()
324 .into_iter()
325 .chain(self.missing.envelopes.clone())
326 .chain(self.returned())
327 .collect()
328 }
329
330 fn find_envelope(&self, cursor: &Cursor) -> Option<TestEnvelope> {
331 self.all_envelopes()
332 .into_iter()
333 .find(|e| e.cursor() == *cursor)
334 }
335
336 fn has_unavailable_in_dependency_chain(&self, env: &TestEnvelope) -> bool {
337 let unavailable = self.unavailable();
338
339 if env.has_dependency_on_any(&unavailable) {
341 return true;
342 }
343
344 let mut to_check = vec![env.depends_on()];
346 while let Some(deps) = to_check.pop() {
347 if deps.is_empty() {
348 continue;
349 }
350
351 for cursor in deps.cursors() {
352 if let Some(dep_env) = self.find_envelope(&cursor) {
353 if dep_env.has_dependency_on_any(&unavailable) {
354 return true;
355 }
356 if !dep_env.depends_on().is_empty() {
357 to_check.push(dep_env.depends_on());
358 }
359 }
360 }
361 }
362 false
363 }
364
365 pub fn only_valid_dependants(&self) -> Vec<TestEnvelope> {
368 self.missing
369 .envelopes
370 .iter()
371 .filter(|env| !self.has_unavailable_in_dependency_chain(env))
372 .cloned()
373 .collect()
374 }
375 }
376
377 proptest! {
// Property: after `order()` runs with a resolver that can only serve some of
// the removed dependencies, every envelope left in the result has its
// dependencies satisfied and none of them depends on an unavailable envelope.
#[xmtp_common::test]
fn orders_with_unresolvable_dependencies(
    envelopes in resolvable_dependencies(10, vec![10, 20, 30])
) {
    // capture the expectations before the fixture is taken apart
    let valid = envelopes.only_valid_dependants();
    let available = envelopes.available();
    let unavailable = envelopes.unavailable();
    let EnvelopesWithResolver {
        missing,
        resolver
    } = envelopes;
    let store = InMemoryCursorStore::new();
    let mut ordered = Ordered::builder()
        .envelopes(missing.envelopes)
        .resolver(resolver)
        .store(store.clone())
        .topic_cursor(TopicCursor::default())
        .build()
        .unwrap();

    // the mock resolver never suspends, so the future must finish on first poll
    ordered.order().now_or_never()
        .expect("Future should complete immediately")
        .unwrap();

    let (result, mut topic_cursor) = ordered.into_parts();

    // every envelope with a fully resolvable dependency chain must be
    // reflected in the topic cursor
    for env in &valid {
        assert_topic_cursor_seen(
            &topic_cursor,
            env,
            &format!("topic cursor {} must have seen {:?}\n\
                ordering_pass: \n{}\n\
                icebox: \n{}\n",
                topic_cursor, env, result.format_enumerated(),store.icebox().format_enumerated()
        ))?;
    }

    // nothing left in the result may have unmet or unavailable dependencies
    for envelope in &result {
        assert_dependencies_satisfied(envelope, &mut topic_cursor)?;
        assert_no_unavailable_deps(envelope, &unavailable)?;
    }
    // an available envelope that some result envelope depends on (and that is
    // not itself blocked by an unavailable one) must have been pulled into
    // the result
    for available_env in &available {
        if available_env.has_dependency_on_any(&unavailable) { continue; }
        if result.iter().all(|e| !e.has_dependency_on(available_env)) { continue; }

        prop_assert!(
            result.iter().any(|e| e == available_env),
            "Result does not contain {}", available_env
        );
        assert_dependencies_satisfied(available_env, &mut topic_cursor)?;
    }
}
436
// Property: envelopes iced during a first ordering pass are recovered by a
// second pass once one of the previously unavailable envelopes becomes
// available and is ordered on its own.
#[xmtp_common::test]
fn orders_with_recovered_children(
    envelopes in resolvable_dependencies(10, vec![10, 20, 30])
) {
    let valid = envelopes.only_valid_dependants();
    let valid_cursors = valid.iter().map(|e| e.cursor()).collect::<HashSet<_>>();
    let unavailable = envelopes.unavailable();
    // borrow rather than move: `envelopes.make_available` is called further down
    let EnvelopesWithResolver {
        ref missing,
        ref resolver
    } = envelopes;

    let topic_cursor = TopicCursor::default();
    let envelopes_to_check = missing.envelopes.clone();
    let store = InMemoryCursorStore::new();
    let mut ordered = Ordered::builder()
        .envelopes(missing.envelopes.clone())
        .resolver(resolver.clone())
        .store(store.clone())
        .topic_cursor(topic_cursor)
        .build()
        .unwrap();

    // first pass: some dependencies may be unavailable
    ordered.order().now_or_never()
        .expect("Future should complete immediately")
        .unwrap();

    let (first_ordering_pass, topic_cursor) = ordered.into_parts();

    let orphan_count = store.orphan_count();

    if !unavailable.is_empty() {
        // does any generated envelope depend directly on an unavailable one?
        let has_dependent_envelopes = envelopes_to_check.iter().any(|e| {
            unavailable.iter().any(|u| e.has_dependency_on(u))
        });

        if has_dependent_envelopes {
            prop_assert!(
                orphan_count > 0,
                "Expected some envelopes to be iced when dependencies are unavailable"
            );
        }
    }

    if !unavailable.is_empty() && orphan_count > 0 {
        // promote the first unavailable envelope and order it by itself,
        // reusing the store and the topic cursor from the first pass
        let newly_available = envelopes.make_available(0);

        let mut ordered = Ordered::builder()
            .envelopes(vec![newly_available.clone()])
            .resolver(resolver.clone())
            .store(store.clone())
            .topic_cursor(topic_cursor)
            .build()
            .unwrap();

        let orphan_count = store.orphan_count();

        // ask the store directly which children it associates with the
        // promoted envelope, before the second pass runs
        let (had_children, returned) = {
            let returned = store.resolve_children(&[newly_available.cursor()]);
            assert_ok!(&returned);
            let returned = returned.unwrap();
            let had_children = !returned.is_empty();
            (had_children, returned)
        };

        // second pass
        ordered.order().now_or_never()
            .expect("Future should complete immediately")
            .unwrap();

        let (second_ordering_pass, mut new_topic_cursor) = ordered.into_parts();

        let had_children = orphan_count > 0 && had_children;
        // pre-render diagnostics used by the failure messages below
        let child_str = returned.format_list();
        let icebox_str = store.icebox().format_enumerated();
        let second_ordering_pass_str = second_ordering_pass.format_enumerated();
        let first_ordering_pass_str = first_ordering_pass.format_enumerated();
        let is_valid = valid_cursors.contains(&newly_available.cursor());
        // children of the promoted envelope that depend only on valid envelopes
        let valid_children = returned.iter().map(TestEnvelope::from).filter(|c| c.only_depends_on(&valid)).collect::<Vec<_>>();
        let num_valid_children = valid_children.len();
        let valid_children_str = valid_children.format_enumerated();
        if had_children && is_valid {
            // the promoted envelope itself plus each valid child
            prop_assert_eq!(1 + num_valid_children, second_ordering_pass.len(),
                "valid orphans should be in envelopes list\n\
                valid_children: \n{}\n\
                icebox: \n{}\n\
                final topic_cursor: \n{}\n\
                first_ordering_pass: \n{}\n\
                newly_available->{}\n\
                second_ordering_pass:\n{}\n\
                ", valid_children_str, icebox_str, new_topic_cursor, first_ordering_pass_str, newly_available, second_ordering_pass_str,
            );
            prop_assert!(
                !second_ordering_pass.is_empty(),
                "Expected children to be recovered when parent becomes available.\n \
                Result length: {} \
                \n newly_available: {} \
                \nicebox:\n{} \
                \nlen: {} \
                \nrecovered_children:\n{}
                \n topic cursor {:?}",
                second_ordering_pass.len(),
                newly_available,
                icebox_str,
                store.orphan_count(),
                child_str,
                new_topic_cursor
            );
        }

        // whatever the second pass produced must have its dependencies met
        for envelope in &second_ordering_pass {
            assert_dependencies_satisfied(envelope, &mut new_topic_cursor)?;
        }
    }
}
564 }
565}