1use crate::{
4 ConversionError,
5 api::VectorClock,
6 types::{OriginatorId, SequenceId},
7 xmtp::xmtpv4::envelopes::Cursor,
8};
9use core::fmt;
10use serde::{Deserialize, Serialize};
11use std::{
12 collections::{BTreeMap, HashMap, HashSet},
13 fmt::Write,
14 ops::{Deref, DerefMut},
15};
16use xmtp_configuration::Originators;
17
18#[derive(Default, Debug, Clone, Hash, Serialize, Deserialize, PartialEq, Eq)]
22pub struct GlobalCursor {
23 inner: BTreeMap<OriginatorId, SequenceId>,
24}
25
26impl GlobalCursor {
27 pub fn new(map: BTreeMap<OriginatorId, SequenceId>) -> Self {
29 Self { inner: map }
30 }
31
32 pub fn has_seen(&self, other: &super::Cursor) -> bool {
34 let sid = self.get(&other.originator_id);
35 sid >= other.sequence_id
36 }
37
38 pub fn with_hashmap(map: HashMap<OriginatorId, SequenceId>) -> Self {
40 Self {
41 inner: BTreeMap::from_iter(map),
42 }
43 }
44
45 pub fn cursors(&self) -> impl Iterator<Item = super::Cursor> {
47 self.iter().map(|(k, v)| super::Cursor {
48 originator_id: *k,
49 sequence_id: *v,
50 })
51 }
52
53 pub fn apply(&mut self, cursor: &super::Cursor) {
55 let _ = self
56 .inner
57 .entry(cursor.originator_id)
58 .and_modify(|sid| *sid = (*sid).max(cursor.sequence_id))
59 .or_insert(cursor.sequence_id);
60 }
61
62 pub fn apply_least(&mut self, cursor: &super::Cursor) {
65 let _ = self
66 .inner
67 .entry(cursor.originator_id)
68 .and_modify(|sid| *sid = (*sid).min(cursor.sequence_id))
69 .or_insert(cursor.sequence_id);
70 }
71
72 pub fn get(&self, originator: &OriginatorId) -> SequenceId {
74 self.inner.get(originator).copied().unwrap_or(0)
75 }
76
77 pub fn cursor(&self, originator: &OriginatorId) -> super::Cursor {
79 super::Cursor::new(self.get(originator), *originator)
80 }
81
82 pub fn max(&self) -> SequenceId {
84 self.inner.values().copied().max().unwrap_or(0)
85 }
86
87 pub fn v3_welcome(&self) -> SequenceId {
89 self.inner
90 .get(&(Originators::WELCOME_MESSAGES))
91 .copied()
92 .unwrap_or_default()
93 }
94
95 pub fn v3_message(&self) -> SequenceId {
97 self.inner
98 .get(&(Originators::APPLICATION_MESSAGES))
99 .copied()
100 .unwrap_or_default()
101 }
102
103 pub fn commit(&self) -> SequenceId {
105 self.inner
106 .get(&(Originators::MLS_COMMITS))
107 .copied()
108 .unwrap_or_default()
109 }
110
111 pub fn commit_cursor(&self) -> super::Cursor {
113 let sequence_id = self.get(&(Originators::MLS_COMMITS));
114 super::Cursor::mls_commits(sequence_id)
115 }
116
117 pub fn v3_installations(&self) -> SequenceId {
119 self.inner
120 .get(&(Originators::INSTALLATIONS))
121 .copied()
122 .unwrap_or_default()
123 }
124
125 pub fn inbox_log(&self) -> SequenceId {
127 self.inner
128 .get(&(Originators::INBOX_LOG))
129 .copied()
130 .unwrap_or_default()
131 }
132}
133
134impl fmt::Display for GlobalCursor {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 let mut s = String::new();
137 for (oid, sid) in self.inner.iter() {
138 write!(s, "{}", super::Cursor::new(*sid, *oid))?;
139 }
140 write!(f, "{:25}", s)
141 }
142}
143
144impl FromIterator<(OriginatorId, SequenceId)> for GlobalCursor {
145 fn from_iter<T: IntoIterator<Item = (OriginatorId, SequenceId)>>(iter: T) -> Self {
146 GlobalCursor::new(BTreeMap::from_iter(iter))
147 }
148}
149
150impl From<Cursor> for GlobalCursor {
151 fn from(value: Cursor) -> Self {
152 GlobalCursor {
153 inner: BTreeMap::from_iter(value.node_id_to_sequence_id),
154 }
155 }
156}
157
158impl From<GlobalCursor> for Cursor {
159 fn from(value: GlobalCursor) -> Self {
160 Cursor {
161 node_id_to_sequence_id: HashMap::from_iter(value.inner),
162 }
163 }
164}
165
166impl TryFrom<GlobalCursor> for crate::types::Cursor {
167 type Error = ConversionError;
168
169 fn try_from(value: GlobalCursor) -> Result<Self, Self::Error> {
170 if value.len() > 1 {
171 return Err(ConversionError::InvalidLength {
172 item: std::any::type_name::<GlobalCursor>(),
173 expected: 1,
174 got: value.len(),
175 });
176 }
177 if value.is_empty() {
178 return Err(ConversionError::InvalidLength {
179 item: std::any::type_name::<GlobalCursor>(),
180 expected: 1,
181 got: 0,
182 });
183 }
184
185 let (oid, sid) = value
186 .into_iter()
187 .next()
188 .expect("ensured length is at least one");
189 Ok(super::Cursor::new(sid, oid))
190 }
191}
192
193impl TryFrom<Cursor> for crate::types::Cursor {
194 type Error = ConversionError;
195
196 fn try_from(value: Cursor) -> Result<Self, Self::Error> {
197 let global: GlobalCursor = value.into();
198 global.try_into()
199 }
200}
201
202impl From<crate::types::Cursor> for GlobalCursor {
203 fn from(value: crate::types::Cursor) -> Self {
204 let mut map = BTreeMap::new();
205 map.insert(value.originator_id, value.sequence_id);
206 GlobalCursor { inner: map }
207 }
208}
209
210impl From<BTreeMap<OriginatorId, SequenceId>> for GlobalCursor {
211 fn from(value: BTreeMap<OriginatorId, SequenceId>) -> Self {
212 GlobalCursor { inner: value }
213 }
214}
215
216impl IntoIterator for GlobalCursor {
217 type Item = (OriginatorId, SequenceId);
218 type IntoIter = <BTreeMap<OriginatorId, SequenceId> as IntoIterator>::IntoIter;
219
220 fn into_iter(self) -> Self::IntoIter {
221 self.inner.into_iter()
222 }
223}
224
225impl<'a> IntoIterator for &'a GlobalCursor {
226 type Item = (&'a OriginatorId, &'a SequenceId);
227 type IntoIter = std::collections::btree_map::Iter<'a, OriginatorId, SequenceId>;
228 fn into_iter(self) -> Self::IntoIter {
229 self.inner.iter()
230 }
231}
232
233impl<'a> IntoIterator for &'a mut GlobalCursor {
234 type Item = (&'a OriginatorId, &'a mut SequenceId);
235 type IntoIter = std::collections::btree_map::IterMut<'a, OriginatorId, SequenceId>;
236
237 fn into_iter(self) -> Self::IntoIter {
238 self.inner.iter_mut()
239 }
240}
241
242impl Deref for GlobalCursor {
243 type Target = BTreeMap<OriginatorId, SequenceId>;
244
245 fn deref(&self) -> &Self::Target {
246 &self.inner
247 }
248}
249
250impl DerefMut for GlobalCursor {
251 fn deref_mut(&mut self) -> &mut Self::Target {
252 &mut self.inner
253 }
254}
255
256impl<C: Into<super::Cursor>> Extend<C> for GlobalCursor {
257 fn extend<T: IntoIterator<Item = C>>(&mut self, iter: T) {
258 self.inner.extend(iter.into_iter().map(|c| {
259 let c: super::Cursor = c.into();
260 (c.originator_id, c.sequence_id)
261 }))
262 }
263}
264
265impl From<HashMap<OriginatorId, SequenceId>> for GlobalCursor {
266 fn from(value: HashMap<OriginatorId, SequenceId>) -> Self {
267 GlobalCursor::with_hashmap(value)
268 }
269}
270
271#[derive(Debug, Clone, Copy, PartialEq, Eq)]
272pub enum ClockOrdering {
273 Equal,
274 Ancestor,
275 Descendant,
276 Concurrent,
277}
278
279impl VectorClock for GlobalCursor {
280 fn dominates(&self, other: &Self) -> bool {
281 other.iter().all(|(&node, &seq)| self.get(&node) >= seq)
282 }
283
284 fn missing(&self, other: &Self) -> Vec<super::Cursor> {
286 other
287 .iter()
288 .filter_map(|(&node, &seq)| {
289 (self.get(&node) < seq).then_some(super::Cursor::new(seq, node))
290 })
291 .collect()
292 }
293
294 fn merge(&mut self, other: &Self) {
295 for (&node, &seq) in other {
296 let entry = self.entry(node).or_insert(0);
297 *entry = (*entry).max(seq);
298 }
299 }
300
301 fn merge_least(&mut self, other: &Self) {
302 for (&node, &seq) in other {
303 let entry = self.entry(node).or_insert(seq);
304 *entry = (*entry).min(seq);
305 }
306 }
307
308 fn compare(&self, other: &Self) -> ClockOrdering {
309 let all_nodes: HashSet<_> = self.keys().chain(other.keys()).collect();
310
311 let mut self_greater = false;
312 let mut other_greater = false;
313
314 for node in all_nodes {
315 let a = self.get(node);
316 let b = other.get(node);
317
318 if a > b {
319 self_greater = true;
320 } else if a < b {
321 other_greater = true;
322 }
323 }
324
325 match (self_greater, other_greater) {
326 (false, false) => ClockOrdering::Equal,
327 (true, false) => ClockOrdering::Descendant,
328 (false, true) => ClockOrdering::Ancestor,
329 (true, true) => ClockOrdering::Concurrent,
330 }
331 }
332
333 fn apply(&mut self, cursor: &super::Cursor) {
334 let _ = self
335 .inner
336 .entry(cursor.originator_id)
337 .and_modify(|sid| *sid = (*sid).max(cursor.sequence_id))
338 .or_insert(cursor.sequence_id);
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345
346 #[xmtp_common::test]
347 fn dominates_empty() {
348 let empty = GlobalCursor::default();
349 let mut not_empty = GlobalCursor::default();
350 not_empty.insert(1, 1);
351 assert!(not_empty.dominates(&empty));
352 }
353}