xmtp_api_d14n/protocol/traits/
cursor_store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use xmtp_common::{MaybeSend, MaybeSync, RetryableError};
4use xmtp_proto::{
5    api::ApiClientError,
6    types::{Cursor, GlobalCursor, OriginatorId, OrphanedEnvelope, Topic, TopicKind},
7};
8
/// Errors a [`CursorStore`] implementation can report.
#[derive(thiserror::Error, Debug)]
pub enum CursorStoreError {
    /// Writing cursors to the persistent store failed.
    #[error("error writing cursors to persistent store")]
    Write,
    /// Reading cursors from the persistent store failed.
    #[error("error reading cursors from persistent store")]
    Read,
    /// The store was handed a topic of a kind it cannot handle.
    #[error("the store cannot handle topic of kind {0}")]
    UnhandledTopicKind(TopicKind),
    /// No dependencies were found for the listed items; the strings are printed verbatim
    /// (debug-formatted) in the error message.
    #[error("no dependencies found for {_0:?}")]
    NoDependenciesFound(Vec<String>),
    /// Any other error, boxed; its retryability is consulted by
    /// `RetryableError::is_retryable` for this type.
    #[error("{0}")]
    Other(Box<dyn RetryableError>),
}
22
23impl CursorStoreError {
24    pub fn other<E: RetryableError + 'static>(e: E) -> Self {
25        CursorStoreError::Other(Box::new(e))
26    }
27}
28
29impl RetryableError for CursorStoreError {
30    fn is_retryable(&self) -> bool {
31        match self {
32            Self::Other(s) => s.is_retryable(),
33            // retries should be an implementation detail
34            _ => false,
35        }
36    }
37}
38
39impl<E: std::error::Error> From<CursorStoreError> for ApiClientError<E> {
40    fn from(value: CursorStoreError) -> Self {
41        ApiClientError::Other(Box::new(value) as Box<_>)
42    }
43}
44
45/// Trait defining how cursors should be stored, updated, and fetched
46/// _NOTE:_, implementations decide retry strategy. the exact implementation of persistence (or lack)
47/// is up to implementors. functions are assumed to be idempotent & atomic.
48pub trait CursorStore: MaybeSend + MaybeSync {
49    // /// Get the last seen cursor per originator
50    // fn last_seen(&self, topic: &Topic) -> Result<GlobalCursor, Self::Error>;
51
52    /// Compute the lowest common cursor across a set of topics.
53    /// For each node_id, uses the **minimum** sequence ID seen across all topics.
54    fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError>;
55
56    /// get the highest sequence id for a topic, regardless of originator
57    fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError>;
58
59    /// Get the latest cursor for each originator
60    fn latest_per_originator(
61        &self,
62        topic: &Topic,
63        originators: &[&OriginatorId],
64    ) -> Result<GlobalCursor, CursorStoreError>;
65
66    fn latest_for_originator(
67        &self,
68        topic: &Topic,
69        originator: &OriginatorId,
70    ) -> Result<Cursor, CursorStoreError> {
71        let sid = self
72            .latest_per_originator(topic, &[originator])?
73            .get(originator);
74        Ok(Cursor::new(sid, *originator))
75    }
76
77    /// Get the latest cursor for multiple topics at once.
78    /// Returns a HashMap mapping each topic to its GlobalCursor.
79    fn latest_for_topics(
80        &self,
81        topics: &mut dyn Iterator<Item = &Topic>,
82    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError>;
83
84    // temp until reliable streams
85    fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError>;
86    /// find dependencies of each locally-stored intent payload hash
87    fn find_message_dependencies(
88        &self,
89        hashes: &[&[u8]],
90    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError>;
91
92    /// ice envelopes that cannot yet be processed
93    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError>;
94
95    /// try to resolve any children that may depend on [`Cursor`]
96    fn resolve_children(
97        &self,
98        cursors: &[Cursor],
99    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError>;
100}
101
102impl<T: CursorStore> CursorStore for Option<T> {
103    fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
104        if let Some(c) = self {
105            c.lowest_common_cursor(topics)
106        } else {
107            NoCursorStore.lowest_common_cursor(topics)
108        }
109    }
110
111    fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
112        if let Some(c) = self {
113            c.latest(topic)
114        } else {
115            NoCursorStore.latest(topic)
116        }
117    }
118
119    fn latest_per_originator(
120        &self,
121        topic: &Topic,
122        originators: &[&OriginatorId],
123    ) -> Result<GlobalCursor, CursorStoreError> {
124        if let Some(c) = self {
125            c.latest_per_originator(topic, originators)
126        } else {
127            NoCursorStore.latest_per_originator(topic, originators)
128        }
129    }
130
131    fn latest_for_topics(
132        &self,
133        topics: &mut dyn Iterator<Item = &Topic>,
134    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
135        if let Some(c) = self {
136            c.latest_for_topics(topics)
137        } else {
138            NoCursorStore.latest_for_topics(topics)
139        }
140    }
141
142    fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
143        if let Some(c) = self {
144            c.lcc_maybe_missing(topic)
145        } else {
146            NoCursorStore.lcc_maybe_missing(topic)
147        }
148    }
149    fn find_message_dependencies(
150        &self,
151        hashes: &[&[u8]],
152    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
153        if let Some(c) = self {
154            c.find_message_dependencies(hashes)
155        } else {
156            NoCursorStore.find_message_dependencies(hashes)
157        }
158    }
159    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
160        if let Some(c) = self {
161            c.ice(orphans)
162        } else {
163            NoCursorStore.ice(orphans)
164        }
165    }
166
167    fn resolve_children(
168        &self,
169        cursors: &[Cursor],
170    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
171        if let Some(c) = self {
172            c.resolve_children(cursors)
173        } else {
174            NoCursorStore.resolve_children(cursors)
175        }
176    }
177}
178
179impl<T: CursorStore + ?Sized> CursorStore for &T {
180    fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
181        (**self).lowest_common_cursor(topics)
182    }
183
184    fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
185        (**self).latest(topic)
186    }
187
188    fn latest_per_originator(
189        &self,
190        topic: &Topic,
191        originators: &[&OriginatorId],
192    ) -> Result<GlobalCursor, CursorStoreError> {
193        (**self).latest_per_originator(topic, originators)
194    }
195
196    fn latest_for_topics(
197        &self,
198        topics: &mut dyn Iterator<Item = &Topic>,
199    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
200        (**self).latest_for_topics(topics)
201    }
202
203    fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
204        (**self).lcc_maybe_missing(topic)
205    }
206
207    fn find_message_dependencies(
208        &self,
209        hashes: &[&[u8]],
210    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
211        (**self).find_message_dependencies(hashes)
212    }
213
214    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
215        (**self).ice(orphans)
216    }
217
218    fn resolve_children(
219        &self,
220        cursors: &[Cursor],
221    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
222        (**self).resolve_children(cursors)
223    }
224}
225
226impl<T: CursorStore + ?Sized> CursorStore for Arc<T> {
227    fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
228        (**self).lowest_common_cursor(topics)
229    }
230
231    fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
232        (**self).latest(topic)
233    }
234
235    fn latest_per_originator(
236        &self,
237        topic: &Topic,
238        originators: &[&OriginatorId],
239    ) -> Result<GlobalCursor, CursorStoreError> {
240        (**self).latest_per_originator(topic, originators)
241    }
242
243    fn latest_for_topics(
244        &self,
245        topics: &mut dyn Iterator<Item = &Topic>,
246    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
247        (**self).latest_for_topics(topics)
248    }
249
250    fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
251        (**self).lcc_maybe_missing(topic)
252    }
253
254    fn find_message_dependencies(
255        &self,
256        hashes: &[&[u8]],
257    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
258        (**self).find_message_dependencies(hashes)
259    }
260
261    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
262        (**self).ice(orphans)
263    }
264
265    fn resolve_children(
266        &self,
267        cursors: &[Cursor],
268    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
269        (**self).resolve_children(cursors)
270    }
271}
272
273impl<T: CursorStore + ?Sized> CursorStore for Box<T> {
274    fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
275        (**self).lowest_common_cursor(topics)
276    }
277
278    fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
279        (**self).latest(topic)
280    }
281
282    fn latest_per_originator(
283        &self,
284        topic: &Topic,
285        originators: &[&OriginatorId],
286    ) -> Result<GlobalCursor, CursorStoreError> {
287        (**self).latest_per_originator(topic, originators)
288    }
289
290    fn latest_for_topics(
291        &self,
292        topics: &mut dyn Iterator<Item = &Topic>,
293    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
294        (**self).latest_for_topics(topics)
295    }
296
297    fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
298        (**self).lcc_maybe_missing(topic)
299    }
300
301    fn find_message_dependencies(
302        &self,
303        hashes: &[&[u8]],
304    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
305        (**self).find_message_dependencies(hashes)
306    }
307
308    fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
309        (**self).ice(orphans)
310    }
311
312    fn resolve_children(
313        &self,
314        cursors: &[Cursor],
315    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
316        (**self).resolve_children(cursors)
317    }
318}
319
/// A cursor store that stores nothing.
///
/// Its `CursorStore` implementation answers every cursor query with a default cursor
/// (i.e. it always returns 0), reports no dependencies and no orphans, and accepts
/// envelopes to ice without keeping them.
#[derive(Debug, Default, Copy, Clone)]
pub struct NoCursorStore;
323
324impl CursorStore for NoCursorStore {
325    fn lowest_common_cursor(&self, _: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
326        Ok(GlobalCursor::default())
327    }
328
329    fn latest(&self, _: &Topic) -> Result<GlobalCursor, CursorStoreError> {
330        Ok(GlobalCursor::default())
331    }
332
333    fn latest_per_originator(
334        &self,
335        _: &Topic,
336        _: &[&OriginatorId],
337    ) -> Result<GlobalCursor, CursorStoreError> {
338        Ok(GlobalCursor::default())
339    }
340
341    fn latest_for_topics(
342        &self,
343        topics: &mut dyn Iterator<Item = &Topic>,
344    ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
345        Ok(HashMap::from_iter(
346            topics.map(|t| (t.clone(), GlobalCursor::default())),
347        ))
348    }
349
350    fn lcc_maybe_missing(&self, _: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
351        Ok(GlobalCursor::default())
352    }
353
354    fn find_message_dependencies(
355        &self,
356        _hashes: &[&[u8]],
357    ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
358        Ok(HashMap::new())
359    }
360
361    fn ice(&self, _orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
362        Ok(())
363    }
364
365    fn resolve_children(
366        &self,
367        _cursors: &[Cursor],
368    ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
369        Ok(Vec::new())
370    }
371}