xmtp_proto/types/
global_cursor.rs

//! A global cursor is a type of cursor representing a view of our position across all originators
//! in the network.
3use crate::{
4    ConversionError,
5    api::VectorClock,
6    types::{OriginatorId, SequenceId},
7    xmtp::xmtpv4::envelopes::Cursor,
8};
9use core::fmt;
10use serde::{Deserialize, Serialize};
11use std::{
12    collections::{BTreeMap, HashMap, HashSet},
13    fmt::Write,
14    ops::{Deref, DerefMut},
15};
16use xmtp_configuration::Originators;
17
18/// a cursor backed by a [`BTreeMap`].
19/// represents the position across many nodes in the network
20/// a.k.a vector clock
21#[derive(Default, Debug, Clone, Hash, Serialize, Deserialize, PartialEq, Eq)]
22pub struct GlobalCursor {
23    inner: BTreeMap<OriginatorId, SequenceId>,
24}
25
26impl GlobalCursor {
27    /// Construct a new cursor from a BTreeMap
28    pub fn new(map: BTreeMap<OriginatorId, SequenceId>) -> Self {
29        Self { inner: map }
30    }
31
32    /// check if this cursor has seen `other`
33    pub fn has_seen(&self, other: &super::Cursor) -> bool {
34        let sid = self.get(&other.originator_id);
35        sid >= other.sequence_id
36    }
37
38    /// creates a from a [`HashMap`], internally converting to a [`BTreeMap`]
39    pub fn with_hashmap(map: HashMap<OriginatorId, SequenceId>) -> Self {
40        Self {
41            inner: BTreeMap::from_iter(map),
42        }
43    }
44
45    /// iterate over all K/V pairs as a [`super::Cursor`]
46    pub fn cursors(&self) -> impl Iterator<Item = super::Cursor> {
47        self.iter().map(|(k, v)| super::Cursor {
48            originator_id: *k,
49            sequence_id: *v,
50        })
51    }
52
53    /// Apply a singular cursor to 'Self'
54    pub fn apply(&mut self, cursor: &super::Cursor) {
55        let _ = self
56            .inner
57            .entry(cursor.originator_id)
58            .and_modify(|sid| *sid = (*sid).max(cursor.sequence_id))
59            .or_insert(cursor.sequence_id);
60    }
61
62    /// apply a cursor to `Self`, and take the lowest value of SequenceId between
63    /// `Self` and [Cursor](super::Cursor)
64    pub fn apply_least(&mut self, cursor: &super::Cursor) {
65        let _ = self
66            .inner
67            .entry(cursor.originator_id)
68            .and_modify(|sid| *sid = (*sid).min(cursor.sequence_id))
69            .or_insert(cursor.sequence_id);
70    }
71
72    /// Get the maximum sequence id for [`crate::xmtpv4::Originator`]
73    pub fn get(&self, originator: &OriginatorId) -> SequenceId {
74        self.inner.get(originator).copied().unwrap_or(0)
75    }
76
77    /// get the full [`super::Cursor`] that belongs to this [`OriginatorId``
78    pub fn cursor(&self, originator: &OriginatorId) -> super::Cursor {
79        super::Cursor::new(self.get(originator), *originator)
80    }
81
82    /// Get the max sequence id across all originator ids
83    pub fn max(&self) -> SequenceId {
84        self.inner.values().copied().max().unwrap_or(0)
85    }
86
87    /// get latest sequence id for the v3 welcome message originator
88    pub fn v3_welcome(&self) -> SequenceId {
89        self.inner
90            .get(&(Originators::WELCOME_MESSAGES))
91            .copied()
92            .unwrap_or_default()
93    }
94
95    /// get latest sequence id for v3 application message originator
96    pub fn v3_message(&self) -> SequenceId {
97        self.inner
98            .get(&(Originators::APPLICATION_MESSAGES))
99            .copied()
100            .unwrap_or_default()
101    }
102
103    /// get the latest sequence id for the mls commit originator (v3/d14n)
104    pub fn commit(&self) -> SequenceId {
105        self.inner
106            .get(&(Originators::MLS_COMMITS))
107            .copied()
108            .unwrap_or_default()
109    }
110
111    /// get the latest sequence id for the mls commit originator (v3/d14n)
112    pub fn commit_cursor(&self) -> super::Cursor {
113        let sequence_id = self.get(&(Originators::MLS_COMMITS));
114        super::Cursor::mls_commits(sequence_id)
115    }
116
117    /// get the latest sequence_id for the installation/key package originator
118    pub fn v3_installations(&self) -> SequenceId {
119        self.inner
120            .get(&(Originators::INSTALLATIONS))
121            .copied()
122            .unwrap_or_default()
123    }
124
125    /// get the latest sequence id for the inbox log originator (v3/d14n)
126    pub fn inbox_log(&self) -> SequenceId {
127        self.inner
128            .get(&(Originators::INBOX_LOG))
129            .copied()
130            .unwrap_or_default()
131    }
132}
133
134impl fmt::Display for GlobalCursor {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        let mut s = String::new();
137        for (oid, sid) in self.inner.iter() {
138            write!(s, "{}", super::Cursor::new(*sid, *oid))?;
139        }
140        write!(f, "{:25}", s)
141    }
142}
143
144impl FromIterator<(OriginatorId, SequenceId)> for GlobalCursor {
145    fn from_iter<T: IntoIterator<Item = (OriginatorId, SequenceId)>>(iter: T) -> Self {
146        GlobalCursor::new(BTreeMap::from_iter(iter))
147    }
148}
149
150impl From<Cursor> for GlobalCursor {
151    fn from(value: Cursor) -> Self {
152        GlobalCursor {
153            inner: BTreeMap::from_iter(value.node_id_to_sequence_id),
154        }
155    }
156}
157
158impl From<GlobalCursor> for Cursor {
159    fn from(value: GlobalCursor) -> Self {
160        Cursor {
161            node_id_to_sequence_id: HashMap::from_iter(value.inner),
162        }
163    }
164}
165
166impl TryFrom<GlobalCursor> for crate::types::Cursor {
167    type Error = ConversionError;
168
169    fn try_from(value: GlobalCursor) -> Result<Self, Self::Error> {
170        if value.len() > 1 {
171            return Err(ConversionError::InvalidLength {
172                item: std::any::type_name::<GlobalCursor>(),
173                expected: 1,
174                got: value.len(),
175            });
176        }
177        if value.is_empty() {
178            return Err(ConversionError::InvalidLength {
179                item: std::any::type_name::<GlobalCursor>(),
180                expected: 1,
181                got: 0,
182            });
183        }
184
185        let (oid, sid) = value
186            .into_iter()
187            .next()
188            .expect("ensured length is at least one");
189        Ok(super::Cursor::new(sid, oid))
190    }
191}
192
193impl TryFrom<Cursor> for crate::types::Cursor {
194    type Error = ConversionError;
195
196    fn try_from(value: Cursor) -> Result<Self, Self::Error> {
197        let global: GlobalCursor = value.into();
198        global.try_into()
199    }
200}
201
202impl From<crate::types::Cursor> for GlobalCursor {
203    fn from(value: crate::types::Cursor) -> Self {
204        let mut map = BTreeMap::new();
205        map.insert(value.originator_id, value.sequence_id);
206        GlobalCursor { inner: map }
207    }
208}
209
210impl From<BTreeMap<OriginatorId, SequenceId>> for GlobalCursor {
211    fn from(value: BTreeMap<OriginatorId, SequenceId>) -> Self {
212        GlobalCursor { inner: value }
213    }
214}
215
216impl IntoIterator for GlobalCursor {
217    type Item = (OriginatorId, SequenceId);
218    type IntoIter = <BTreeMap<OriginatorId, SequenceId> as IntoIterator>::IntoIter;
219
220    fn into_iter(self) -> Self::IntoIter {
221        self.inner.into_iter()
222    }
223}
224
225impl<'a> IntoIterator for &'a GlobalCursor {
226    type Item = (&'a OriginatorId, &'a SequenceId);
227    type IntoIter = std::collections::btree_map::Iter<'a, OriginatorId, SequenceId>;
228    fn into_iter(self) -> Self::IntoIter {
229        self.inner.iter()
230    }
231}
232
233impl<'a> IntoIterator for &'a mut GlobalCursor {
234    type Item = (&'a OriginatorId, &'a mut SequenceId);
235    type IntoIter = std::collections::btree_map::IterMut<'a, OriginatorId, SequenceId>;
236
237    fn into_iter(self) -> Self::IntoIter {
238        self.inner.iter_mut()
239    }
240}
241
242impl Deref for GlobalCursor {
243    type Target = BTreeMap<OriginatorId, SequenceId>;
244
245    fn deref(&self) -> &Self::Target {
246        &self.inner
247    }
248}
249
250impl DerefMut for GlobalCursor {
251    fn deref_mut(&mut self) -> &mut Self::Target {
252        &mut self.inner
253    }
254}
255
256impl<C: Into<super::Cursor>> Extend<C> for GlobalCursor {
257    fn extend<T: IntoIterator<Item = C>>(&mut self, iter: T) {
258        self.inner.extend(iter.into_iter().map(|c| {
259            let c: super::Cursor = c.into();
260            (c.originator_id, c.sequence_id)
261        }))
262    }
263}
264
265impl From<HashMap<OriginatorId, SequenceId>> for GlobalCursor {
266    fn from(value: HashMap<OriginatorId, SequenceId>) -> Self {
267        GlobalCursor::with_hashmap(value)
268    }
269}
270
/// Result of comparing two vector clocks, as produced by `VectorClock::compare`
/// (`self.compare(other)`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ClockOrdering {
    /// Neither clock is ahead of the other on any originator.
    Equal,
    /// `other` is ahead on at least one originator and `self` is ahead on none.
    Ancestor,
    /// `self` is ahead on at least one originator and `other` is ahead on none.
    Descendant,
    /// Each clock is ahead of the other on at least one originator.
    Concurrent,
}
278
279impl VectorClock for GlobalCursor {
280    fn dominates(&self, other: &Self) -> bool {
281        other.iter().all(|(&node, &seq)| self.get(&node) >= seq)
282    }
283
284    /// gets all updates in `other` that are not seen by `self`.
285    fn missing(&self, other: &Self) -> Vec<super::Cursor> {
286        other
287            .iter()
288            .filter_map(|(&node, &seq)| {
289                (self.get(&node) < seq).then_some(super::Cursor::new(seq, node))
290            })
291            .collect()
292    }
293
294    fn merge(&mut self, other: &Self) {
295        for (&node, &seq) in other {
296            let entry = self.entry(node).or_insert(0);
297            *entry = (*entry).max(seq);
298        }
299    }
300
301    fn merge_least(&mut self, other: &Self) {
302        for (&node, &seq) in other {
303            let entry = self.entry(node).or_insert(seq);
304            *entry = (*entry).min(seq);
305        }
306    }
307
308    fn compare(&self, other: &Self) -> ClockOrdering {
309        let all_nodes: HashSet<_> = self.keys().chain(other.keys()).collect();
310
311        let mut self_greater = false;
312        let mut other_greater = false;
313
314        for node in all_nodes {
315            let a = self.get(node);
316            let b = other.get(node);
317
318            if a > b {
319                self_greater = true;
320            } else if a < b {
321                other_greater = true;
322            }
323        }
324
325        match (self_greater, other_greater) {
326            (false, false) => ClockOrdering::Equal,
327            (true, false) => ClockOrdering::Descendant,
328            (false, true) => ClockOrdering::Ancestor,
329            (true, true) => ClockOrdering::Concurrent,
330        }
331    }
332
333    fn apply(&mut self, cursor: &super::Cursor) {
334        let _ = self
335            .inner
336            .entry(cursor.originator_id)
337            .and_modify(|sid| *sid = (*sid).max(cursor.sequence_id))
338            .or_insert(cursor.sequence_id);
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// A cursor with one entry dominates an empty cursor.
    #[xmtp_common::test]
    fn dominates_empty() {
        let empty = GlobalCursor::default();
        let not_empty = GlobalCursor::from_iter([(1, 1)]);
        assert!(not_empty.dominates(&empty));
    }
}
353}