xmtp_proto/types/
topic_cursor.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    ops::{Deref, DerefMut},
4};
5
6use crate::{
7    api::VectorClock,
8    types::{GlobalCursor, InstallationId, Topic},
9};
10
11/// A cursor that keeps a [`super::GlobalCursor`] for each topic it has seen.
12#[derive(Default, Debug, PartialEq, Clone)]
13pub struct TopicCursor {
14    inner: HashMap<Topic, GlobalCursor>,
15}
16
17pub type TopicEntry<'a> = Entry<'a, Topic, GlobalCursor>;
18
19impl TopicCursor {
20    /// get the item at [`Topic`] or insert the default
21    pub fn get_or_default(&mut self, topic: &Topic) -> &GlobalCursor {
22        self.inner.entry(topic.clone()).or_default()
23    }
24
25    /// get the [`GlobalCursor`] corresponding to the [`super::TopicKind::GroupMessagesV1`]
26    /// by [`super::GroupId`]
27    pub fn get_group(&self, group_id: impl AsRef<[u8]>) -> GlobalCursor {
28        self.inner
29            .get(&Topic::new_group_message(group_id))
30            .cloned()
31            .unwrap_or_default()
32    }
33
34    /// check if this topic cursor contains [`super::GroupId`]
35    pub fn contains_group(&self, group_id: impl AsRef<[u8]>) -> bool {
36        self.inner.contains_key(&Topic::new_group_message(group_id))
37    }
38
39    /// Computes the Lowest Common Cursor (LCC) across all topics.
40    ///
41    /// For each originator node, takes the minimum sequence ID seen across
42    /// all topics. Returns an empty cursor if this TopicCursor is empty.
43    pub fn lcc(&self) -> GlobalCursor {
44        self.values().fold(GlobalCursor::default(), |mut acc, c| {
45            acc.merge_least(c);
46            acc
47        })
48    }
49
50    /// Computes the Greatest Common Cursor (GCC) across all topics.
51    ///
52    /// For each originator node, takes the maximum sequence ID seen across
53    /// all topics. Returns an empty cursor if this TopicCursor is empty.
54    pub fn gcc(&self) -> GlobalCursor {
55        self.values().fold(GlobalCursor::default(), |mut acc, c| {
56            acc.merge(c);
57            acc
58        })
59    }
60
61    /// consume this topic cursor into a list of its topics
62    pub fn into_topics(self) -> Vec<Topic> {
63        self.inner.into_keys().collect()
64    }
65
66    /// get a [`Vec`] of all [`Topic`] in this cursor
67    /// by cloning.
68    pub fn topics(&self) -> Vec<Topic> {
69        self.inner.keys().cloned().collect()
70    }
71
72    /// entry api for only [super::TopicKind::GroupMessagesV1]
73    pub fn group_entry(&mut self, group_id: impl AsRef<[u8]>) -> TopicEntry<'_> {
74        self.inner.entry(Topic::new_group_message(group_id))
75    }
76
77    /// entry api for only [super::TopicKind::IdentityUpdatesV1]
78    pub fn identity_entry(&mut self, inbox_id: impl AsRef<[u8]>) -> TopicEntry<'_> {
79        self.inner.entry(Topic::new_identity_update(inbox_id))
80    }
81
82    /// entry api for only [super::TopicKind::WelcomeMessagesV1]
83    pub fn welcome_entry(&mut self, installation_id: InstallationId) -> TopicEntry<'_> {
84        self.inner
85            .entry(Topic::new_welcome_message(installation_id))
86    }
87
88    /// entry api for only [super::TopicKind::KeyPackagesV1]
89    pub fn key_package_entry(&mut self, installation_id: InstallationId) -> TopicEntry<'_> {
90        self.inner.entry(Topic::new_key_package(installation_id))
91    }
92
93    /// iterate over all [`super::TopicKind::GroupMessagesV1`]
94    /// topics as a pair of (&[`super::GroupId`], &[`GlobalCursor`])
95    pub fn groups(&self) -> impl Iterator<Item = (&[u8], &GlobalCursor)> {
96        self.inner
97            .iter()
98            .filter_map(|(t, g)| Some((t.group_message_v1()?.identifier(), g)))
99    }
100}
101
102impl From<HashMap<Topic, GlobalCursor>> for TopicCursor {
103    fn from(inner: HashMap<Topic, GlobalCursor>) -> Self {
104        TopicCursor { inner }
105    }
106}
107
108impl FromIterator<(Topic, GlobalCursor)> for TopicCursor {
109    fn from_iter<T: IntoIterator<Item = (Topic, GlobalCursor)>>(iter: T) -> Self {
110        TopicCursor {
111            inner: HashMap::from_iter(iter),
112        }
113    }
114}
115
116impl IntoIterator for TopicCursor {
117    type Item = (Topic, GlobalCursor);
118    type IntoIter = <HashMap<Topic, GlobalCursor> as IntoIterator>::IntoIter;
119
120    fn into_iter(self) -> Self::IntoIter {
121        self.inner.into_iter()
122    }
123}
124
125impl<'a> IntoIterator for &'a TopicCursor {
126    type Item = (&'a Topic, &'a GlobalCursor);
127    type IntoIter = std::collections::hash_map::Iter<'a, Topic, GlobalCursor>;
128    fn into_iter(self) -> Self::IntoIter {
129        self.inner.iter()
130    }
131}
132
133impl<'a> IntoIterator for &'a mut TopicCursor {
134    type Item = (&'a Topic, &'a mut GlobalCursor);
135    type IntoIter = std::collections::hash_map::IterMut<'a, Topic, GlobalCursor>;
136
137    fn into_iter(self) -> Self::IntoIter {
138        self.inner.iter_mut()
139    }
140}
141
142impl Deref for TopicCursor {
143    type Target = HashMap<Topic, GlobalCursor>;
144    fn deref(&self) -> &Self::Target {
145        &self.inner
146    }
147}
148
149impl DerefMut for TopicCursor {
150    fn deref_mut(&mut self) -> &mut Self::Target {
151        &mut self.inner
152    }
153}
154
155impl TopicCursor {
156    pub fn add(&mut self, topic: Topic, cursor: GlobalCursor) {
157        self.inner.insert(topic, cursor);
158    }
159}
160
161impl std::fmt::Display for TopicCursor {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        for (topic, has_seen) in self.inner.iter() {
164            writeln!(f, "{} -> {}", topic, has_seen)?;
165        }
166        Ok(())
167    }
168}