1use crate::protocol::{CursorStore, CursorStoreError};
2use parking_lot::Mutex;
3use std::collections::{HashMap, HashSet};
4use std::fmt;
5use std::sync::Arc;
6use xmtp_proto::api::VectorClock;
7use xmtp_proto::types::{Cursor, GlobalCursor, OriginatorId, OrphanedEnvelope, Topic};
8
9#[derive(Default, Clone)]
10pub struct InMemoryCursorStore {
11 topics: HashMap<Topic, GlobalCursor>,
12 icebox: Arc<Mutex<HashSet<OrphanedEnvelope>>>,
13}
14
15impl InMemoryCursorStore {
16 pub fn new() -> Self {
17 Self {
18 topics: HashMap::new(),
19 icebox: Arc::new(Mutex::new(HashSet::new())),
20 }
21 }
22
23 pub fn received(&mut self, topic: Topic, new_clock: &GlobalCursor) {
25 let current = self.topics.entry(topic).or_default();
26 current.merge(new_clock);
27 }
28
29 pub fn get_latest(&self, topic: &Topic) -> Option<&GlobalCursor> {
31 self.topics.get(topic)
32 }
33
34 pub fn lowest_common_cursor(&self, topics: &[&Topic]) -> GlobalCursor {
37 let mut min_clock = GlobalCursor::default();
38
39 for topic in topics {
40 if let Some(cursor) = self.get_latest(topic) {
41 for (&node_id, &seq_id) in cursor {
42 min_clock
43 .entry(node_id)
44 .and_modify(|existing| *existing = (*existing).min(seq_id))
45 .or_insert(seq_id);
46 }
47 }
48 }
49 min_clock
50 }
51
52 #[cfg(test)]
54 pub fn orphan_count(&self) -> usize {
55 self.icebox.lock().len()
56 }
57
58 #[cfg(test)]
59 pub fn icebox(&self) -> Vec<OrphanedEnvelope> {
60 let icebox = self.icebox.lock();
61 Vec::from_iter(icebox.clone())
62 }
63}
64
65impl CursorStore for InMemoryCursorStore {
66 fn lowest_common_cursor(
67 &self,
68 topics: &[&Topic],
69 ) -> Result<xmtp_proto::types::GlobalCursor, crate::protocol::CursorStoreError> {
70 Ok(self.lowest_common_cursor(topics))
71 }
72
73 fn lcc_maybe_missing(
74 &self,
75 topics: &[&Topic],
76 ) -> Result<GlobalCursor, super::CursorStoreError> {
77 Ok(self.lowest_common_cursor(topics))
78 }
79
80 fn latest(
81 &self,
82 topic: &xmtp_proto::types::Topic,
83 ) -> Result<GlobalCursor, crate::protocol::CursorStoreError> {
84 Ok(self
85 .get_latest(topic)
86 .cloned()
87 .unwrap_or_else(GlobalCursor::default))
88 }
89
90 fn latest_per_originator(
91 &self,
92 topic: &xmtp_proto::types::Topic,
93 originators: &[&OriginatorId],
94 ) -> Result<GlobalCursor, crate::protocol::CursorStoreError> {
95 Ok(self
96 .get_latest(topic)
97 .unwrap_or(&Default::default())
98 .iter()
99 .filter(|(k, _)| originators.contains(k))
100 .map(|(&k, &v)| (k, v))
101 .collect())
102 }
103
104 fn latest_for_topics(
105 &self,
106 topics: &mut dyn Iterator<Item = &Topic>,
107 ) -> Result<HashMap<Topic, GlobalCursor>, super::CursorStoreError> {
108 Ok(topics
109 .map(|topic| (topic.clone(), self.latest(topic).unwrap_or_default()))
110 .collect())
111 }
112
113 fn find_message_dependencies(
114 &self,
115 hash: &[&[u8]],
116 ) -> Result<HashMap<Vec<u8>, Cursor>, super::CursorStoreError> {
117 Err(CursorStoreError::NoDependenciesFound(
119 hash.iter().map(hex::encode).collect(),
120 ))
121 }
122
123 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
124 let mut icebox = self.icebox.lock();
125 (*icebox).extend(orphans);
126 Ok(())
127 }
128
129 fn resolve_children(
130 &self,
131 cursors: &[Cursor],
132 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
133 let icebox = self.icebox.lock();
134 Ok(Vec::from_iter(resolve_children_inner(cursors, &icebox)))
135 }
136}
137
138fn resolve_children_inner(
139 cursors: &[Cursor],
140 icebox: &HashSet<OrphanedEnvelope>,
141) -> HashSet<OrphanedEnvelope> {
142 let mut children: HashSet<OrphanedEnvelope> =
143 cursors.iter().fold(HashSet::new(), |mut acc, cursor| {
144 let children = icebox
146 .iter()
147 .filter(|o| o.is_child_of(cursor))
148 .cloned()
149 .collect::<HashSet<_>>();
150 acc.extend(children);
151 acc
152 });
153 let cursors = children.iter().fold(Vec::new(), |mut acc, c| {
155 if !c.depends_on.is_empty() {
156 acc.push(c.cursor);
157 }
158 acc
159 });
160 if !cursors.is_empty() {
161 let v = resolve_children_inner(&cursors, icebox);
162 children.extend(v);
163 }
164 children
165}
166
167impl fmt::Debug for InMemoryCursorStore {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 let mut entries = f.debug_map();
170
171 for (topic, cursor) in &self.topics {
172 let topic_hex = hex::encode(topic);
174 entries.entry(&topic_hex, cursor);
175 }
176
177 entries.finish()
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Topic`] from the raw bytes of `name`.
    fn topic(name: &str) -> Topic {
        Topic::from_bytes(name.as_bytes())
    }

    /// Builds a [`GlobalCursor`] from `(node id, sequence id)` pairs.
    fn cursor_with(kvs: &[(u32, u64)]) -> GlobalCursor {
        GlobalCursor::new(kvs.iter().copied().collect())
    }

    /// A cursor passed to `received` is readable back through `get_latest`.
    #[xmtp_common::test]
    fn test_processed_and_get_latest() {
        let chat = topic("chat/abc");
        let mut store = InMemoryCursorStore::new();

        store.received(chat.clone(), &cursor_with(&[(1, 10), (2, 5)]));

        let stored = store.get_latest(&chat).unwrap();
        assert_eq!(stored.get(&1), 10);
        assert_eq!(stored.get(&2), 5);
    }

    /// A second `received` on the same topic merges into the stored cursor.
    #[xmtp_common::test]
    fn test_merge_on_processed() {
        let merged_topic = topic("chat/merge");
        let first = cursor_with(&[(1, 10), (2, 5)]);
        let second = cursor_with(&[(1, 12), (2, 3), (3, 7)]);
        let mut store = InMemoryCursorStore::new();

        store.received(merged_topic.clone(), &first);
        store.received(merged_topic.clone(), &second);

        let stored = store.get_latest(&merged_topic).unwrap();
        assert_eq!(stored.get(&1), 12);
        assert_eq!(stored.get(&2), 5);
        assert_eq!(stored.get(&3), 7);
    }

    /// Looking up a topic that was never recorded yields `None`.
    #[xmtp_common::test]
    fn test_get_latest_nonexistent_topic() {
        let store = InMemoryCursorStore::new();

        let lookup = store.get_latest(&topic("does/not/exist"));

        assert!(lookup.is_none());
    }

    /// Cursors recorded for different topics are stored separately.
    #[xmtp_common::test]
    fn test_independent_topics() {
        let (first_topic, second_topic) = (topic("a"), topic("b"));
        let mut store = InMemoryCursorStore::new();

        store.received(first_topic.clone(), &cursor_with(&[(1, 1)]));
        store.received(second_topic.clone(), &cursor_with(&[(2, 2)]));

        let first = store.get_latest(&first_topic).unwrap();
        let second = store.get_latest(&second_topic).unwrap();
        assert_eq!(first.get(&1), 1);
        assert_eq!(second.get(&2), 2);
    }

    /// `received` on an empty store creates the topic entry.
    #[xmtp_common::test]
    fn test_merge_into_empty_store_creates_topic() {
        let fresh = topic("new/topic");
        let mut store = InMemoryCursorStore::new();

        store.received(fresh.clone(), &cursor_with(&[(5, 9)]));

        let stored = store.get_latest(&fresh).unwrap();
        assert_eq!(stored.get(&5), 9);
    }

    /// Each node gets the minimum over the topics that contain it.
    #[xmtp_common::test]
    fn test_lcc_normal_case() {
        let mut store = InMemoryCursorStore::new();
        store.received(topic("a"), &cursor_with(&[(1, 10), (2, 20)]));
        store.received(topic("b"), &cursor_with(&[(1, 15), (2, 12), (3, 9)]));
        store.received(topic("c"), &cursor_with(&[(1, 8), (3, 11)]));

        let lowest = store.lowest_common_cursor(&[&topic("a"), &topic("b"), &topic("c")]);

        // Node 1: min(10, 15, 8)
        assert_eq!(lowest.get(&1), 8);
        // Node 2: min(20, 12)
        assert_eq!(lowest.get(&2), 12);
        // Node 3: min(9, 11)
        assert_eq!(lowest.get(&3), 9);
    }

    /// A requested topic with no stored cursor does not affect the result.
    #[xmtp_common::test]
    fn test_lcc_with_missing_topic() {
        let mut store = InMemoryCursorStore::new();
        store.received(topic("a"), &cursor_with(&[(1, 10)]));
        store.received(topic("b"), &cursor_with(&[(1, 5)]));

        let lowest =
            store.lowest_common_cursor(&[&topic("a"), &topic("b"), &topic("not-found")]);

        // Node 1: min(10, 5)
        assert_eq!(lowest.get(&1), 5);
    }

    /// A zero sequence id takes part in the minimum like any other value.
    #[xmtp_common::test]
    fn test_lcc_with_zero_values() {
        let mut store = InMemoryCursorStore::new();
        store.received(topic("x"), &cursor_with(&[(1, 0), (2, 4)]));
        store.received(topic("y"), &cursor_with(&[(1, 3), (2, 0)]));

        let lowest = store.lowest_common_cursor(&[&topic("x"), &topic("y")]);

        assert_eq!(lowest.get(&1), 0);
        assert_eq!(lowest.get(&2), 0);
    }

    /// A node seen in only one topic keeps the value from that topic.
    #[xmtp_common::test]
    fn test_lcc_with_unseen_nodes() {
        let mut store = InMemoryCursorStore::new();
        store.received(topic("a"), &cursor_with(&[(1, 5)]));
        store.received(topic("b"), &cursor_with(&[(2, 7)]));

        let lowest = store.lowest_common_cursor(&[&topic("a"), &topic("b")]);

        assert_eq!(lowest.get(&1), 5);
        assert_eq!(lowest.get(&2), 7);
    }

    /// With nothing recorded, the lowest common cursor is empty.
    #[xmtp_common::test]
    fn test_lcc_with_no_cursors() {
        let store = InMemoryCursorStore::new();

        let lowest = store.lowest_common_cursor(&[&topic("a"), &topic("b")]);

        assert!(lowest.is_empty());
    }
}