xmtp_api_d14n/protocol/
in_memory_cursor_store.rs

1use crate::protocol::{CursorStore, CursorStoreError};
2use parking_lot::Mutex;
3use std::collections::{HashMap, HashSet};
4use std::fmt;
5use std::sync::Arc;
6use xmtp_proto::api::VectorClock;
7use xmtp_proto::types::{Cursor, GlobalCursor, OriginatorId, OrphanedEnvelope, Topic};
8
9#[derive(Default, Clone)]
10pub struct InMemoryCursorStore {
11    topics: HashMap<Topic, GlobalCursor>,
12    icebox: Arc<Mutex<HashSet<OrphanedEnvelope>>>,
13}
14
15impl InMemoryCursorStore {
16    pub fn new() -> Self {
17        Self {
18            topics: HashMap::new(),
19            icebox: Arc::new(Mutex::new(HashSet::new())),
20        }
21    }
22
23    /// Record that a message for this topic with the given clock was received
24    pub fn received(&mut self, topic: Topic, new_clock: &GlobalCursor) {
25        let current = self.topics.entry(topic).or_default();
26        current.merge(new_clock);
27    }
28
29    /// Get the current vector clock for this topic
30    pub fn get_latest(&self, topic: &Topic) -> Option<&GlobalCursor> {
31        self.topics.get(topic)
32    }
33
34    /// Compute the lowest common cursor across a set of topics.
35    /// For each node_id, uses the **minimum** sequence ID seen across all topics.
36    pub fn lowest_common_cursor(&self, topics: &[&Topic]) -> GlobalCursor {
37        let mut min_clock = GlobalCursor::default();
38
39        for topic in topics {
40            if let Some(cursor) = self.get_latest(topic) {
41                for (&node_id, &seq_id) in cursor {
42                    min_clock
43                        .entry(node_id)
44                        .and_modify(|existing| *existing = (*existing).min(seq_id))
45                        .or_insert(seq_id);
46                }
47            }
48        }
49        min_clock
50    }
51
52    /// Get the number of orphaned envelopes currently in the icebox
53    #[cfg(test)]
54    pub fn orphan_count(&self) -> usize {
55        self.icebox.lock().len()
56    }
57
58    #[cfg(test)]
59    pub fn icebox(&self) -> Vec<OrphanedEnvelope> {
60        let icebox = self.icebox.lock();
61        Vec::from_iter(icebox.clone())
62    }
63}
64
65impl CursorStore for InMemoryCursorStore {
66    fn lowest_common_cursor(
67        &self,
68        topics: &[&Topic],
69    ) -> Result<xmtp_proto::types::GlobalCursor, crate::protocol::CursorStoreError> {
70        Ok(self.lowest_common_cursor(topics))
71    }
72
73    fn lcc_maybe_missing(
74        &self,
75        topics: &[&Topic],
76    ) -> Result<GlobalCursor, super::CursorStoreError> {
77        Ok(self.lowest_common_cursor(topics))
78    }
79
80    fn latest(
81        &self,
82        topic: &xmtp_proto::types::Topic,
83    ) -> Result<GlobalCursor, crate::protocol::CursorStoreError> {
84        Ok(self
85            .get_latest(topic)
86            .cloned()
87            .unwrap_or_else(GlobalCursor::default))
88    }
89
90    fn latest_per_originator(
91        &self,
92        topic: &xmtp_proto::types::Topic,
93        originators: &[&OriginatorId],
94    ) -> Result<GlobalCursor, crate::protocol::CursorStoreError> {
95        Ok(self
96            .get_latest(topic)
97            .unwrap_or(&Default::default())
98            .iter()
99            .filter(|(k, _)| originators.contains(k))
100            .map(|(&k, &v)| (k, v))
101            .collect())
102    }
103
104    fn latest_for_topics(
105        &self,
106        topics: &mut dyn Iterator<Item = &Topic>,
107    ) -> Result<HashMap<Topic, GlobalCursor>, super::CursorStoreError> {
108        Ok(topics
109            .map(|topic| (topic.clone(), self.latest(topic).unwrap_or_default()))
110            .collect())
111    }
112
113    fn find_message_dependencies(
114        &self,
115        hash: &[&[u8]],
116    ) -> Result<HashMap<Vec<u8>, Cursor>, super::CursorStoreError> {
117        // in mem does not keep track of deps/commits
118        Err(CursorStoreError::NoDependenciesFound(
119            hash.iter().map(hex::encode).collect(),
120        ))
121    }
122
123    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
124        let mut icebox = self.icebox.lock();
125        (*icebox).extend(orphans);
126        Ok(())
127    }
128
129    fn resolve_children(
130        &self,
131        cursors: &[Cursor],
132    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
133        let icebox = self.icebox.lock();
134        Ok(Vec::from_iter(resolve_children_inner(cursors, &icebox)))
135    }
136}
137
138fn resolve_children_inner(
139    cursors: &[Cursor],
140    icebox: &HashSet<OrphanedEnvelope>,
141) -> HashSet<OrphanedEnvelope> {
142    let mut children: HashSet<OrphanedEnvelope> =
143        cursors.iter().fold(HashSet::new(), |mut acc, cursor| {
144            // extract if item in an icebox is child of the cursor
145            let children = icebox
146                .iter()
147                .filter(|o| o.is_child_of(cursor))
148                .cloned()
149                .collect::<HashSet<_>>();
150            acc.extend(children);
151            acc
152        });
153    // recursively work through deps
154    let cursors = children.iter().fold(Vec::new(), |mut acc, c| {
155        if !c.depends_on.is_empty() {
156            acc.push(c.cursor);
157        }
158        acc
159    });
160    if !cursors.is_empty() {
161        let v = resolve_children_inner(&cursors, icebox);
162        children.extend(v);
163    }
164    children
165}
166
167impl fmt::Debug for InMemoryCursorStore {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        let mut entries = f.debug_map();
170
171        for (topic, cursor) in &self.topics {
172            // display topic as hex for readability
173            let topic_hex = hex::encode(topic);
174            entries.entry(&topic_hex, cursor);
175        }
176
177        entries.finish()
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a topic from a readable name.
    fn topic(name: &str) -> Topic {
        Topic::from_bytes(name.as_bytes())
    }

    /// Build a cursor from `(node id, sequence id)` pairs.
    fn cursor_with(kvs: &[(u32, u64)]) -> GlobalCursor {
        GlobalCursor::new(kvs.iter().copied().collect())
    }

    // --- received / get_latest -------------------------------------------

    #[xmtp_common::test]
    fn test_processed_and_get_latest() {
        let mut store = InMemoryCursorStore::new();
        let chat = topic("chat/abc");
        let clock = cursor_with(&[(1, 10), (2, 5)]);

        store.received(chat.clone(), &clock);

        let latest = store.get_latest(&chat).unwrap();
        assert_eq!(latest.get(&1), 10);
        assert_eq!(latest.get(&2), 5);
    }

    #[xmtp_common::test]
    fn test_merge_on_processed() {
        let mut store = InMemoryCursorStore::new();
        let chat = topic("chat/merge");

        let first = cursor_with(&[(1, 10), (2, 5)]);
        let second = cursor_with(&[(1, 12), (2, 3), (3, 7)]);
        for clock in [&first, &second] {
            store.received(chat.clone(), clock);
        }

        // Per node, the larger sequence id wins after merging.
        let latest = store.get_latest(&chat).unwrap();
        assert_eq!(latest.get(&1), 12);
        assert_eq!(latest.get(&2), 5);
        assert_eq!(latest.get(&3), 7);
    }

    #[xmtp_common::test]
    fn test_get_latest_nonexistent_topic() {
        let store = InMemoryCursorStore::new();
        let missing_topic = topic("does/not/exist");

        assert!(store.get_latest(&missing_topic).is_none());
    }

    #[xmtp_common::test]
    fn test_independent_topics() {
        let mut store = InMemoryCursorStore::new();
        let (topic_a, topic_b) = (topic("a"), topic("b"));

        store.received(topic_a.clone(), &cursor_with(&[(1, 1)]));
        store.received(topic_b.clone(), &cursor_with(&[(2, 2)]));

        let latest_a = store.get_latest(&topic_a).unwrap();
        let latest_b = store.get_latest(&topic_b).unwrap();

        assert_eq!(latest_a.get(&1), 1);
        assert_eq!(latest_b.get(&2), 2);
    }

    #[xmtp_common::test]
    fn test_merge_into_empty_store_creates_topic() {
        let mut store = InMemoryCursorStore::new();
        let fresh = topic("new/topic");
        let clock = cursor_with(&[(5, 9)]);

        store.received(fresh.clone(), &clock);

        let stored = store.get_latest(&fresh).unwrap();
        assert_eq!(stored.get(&5), 9);
    }

    // --- lowest_common_cursor --------------------------------------------

    #[xmtp_common::test]
    fn test_lcc_normal_case() {
        let mut store = InMemoryCursorStore::new();

        store.received(topic("a"), &cursor_with(&[(1, 10), (2, 20)]));
        store.received(topic("b"), &cursor_with(&[(1, 15), (2, 12), (3, 9)]));
        store.received(topic("c"), &cursor_with(&[(1, 8), (3, 11)]));

        let (a, b, c) = (topic("a"), topic("b"), topic("c"));
        let lcc = store.lowest_common_cursor(&[&a, &b, &c]);

        assert_eq!(lcc.get(&1), 8); // min(10, 15, 8)
        assert_eq!(lcc.get(&2), 12); // min(20, 12)
        assert_eq!(lcc.get(&3), 9); // min(9, 11)
    }

    #[xmtp_common::test]
    fn test_lcc_with_missing_topic() {
        let mut store = InMemoryCursorStore::new();

        store.received(topic("a"), &cursor_with(&[(1, 10)]));
        store.received(topic("b"), &cursor_with(&[(1, 5)]));

        let (a, b, unknown) = (topic("a"), topic("b"), topic("not-found"));
        let lcc = store.lowest_common_cursor(&[&a, &b, &unknown]);

        assert_eq!(lcc.get(&1), 5); // min(10, 5)
    }

    #[xmtp_common::test]
    fn test_lcc_with_zero_values() {
        let mut store = InMemoryCursorStore::new();

        store.received(topic("x"), &cursor_with(&[(1, 0), (2, 4)]));
        store.received(topic("y"), &cursor_with(&[(1, 3), (2, 0)]));

        let (x, y) = (topic("x"), topic("y"));
        let lcc = store.lowest_common_cursor(&[&x, &y]);

        assert_eq!(lcc.get(&1), 0);
        assert_eq!(lcc.get(&2), 0);
    }

    #[xmtp_common::test]
    fn test_lcc_with_unseen_nodes() {
        let mut store = InMemoryCursorStore::new();

        store.received(topic("a"), &cursor_with(&[(1, 5)]));
        store.received(topic("b"), &cursor_with(&[(2, 7)]));

        let (a, b) = (topic("a"), topic("b"));
        let lcc = store.lowest_common_cursor(&[&a, &b]);

        // A node seen by only one topic keeps that topic's value.
        assert_eq!(lcc.get(&1), 5);
        assert_eq!(lcc.get(&2), 7);
    }

    #[xmtp_common::test]
    fn test_lcc_with_no_cursors() {
        let store = InMemoryCursorStore::new();

        let (a, b) = (topic("a"), topic("b"));
        let result = store.lowest_common_cursor(&[&a, &b]);

        assert!(result.is_empty());
    }
}
320}