1use std::collections::HashMap;
2use std::sync::Arc;
3use xmtp_common::{MaybeSend, MaybeSync, RetryableError};
4use xmtp_proto::{
5 api::ApiClientError,
6 types::{Cursor, GlobalCursor, OriginatorId, OrphanedEnvelope, Topic, TopicKind},
7};
8
9#[derive(thiserror::Error, Debug)]
10pub enum CursorStoreError {
11 #[error("error writing cursors to persistent store")]
12 Write,
13 #[error("error reading cursors from persistent store")]
14 Read,
15 #[error("the store cannot handle topic of kind {0}")]
16 UnhandledTopicKind(TopicKind),
17 #[error("no dependencies found for {_0:?}")]
18 NoDependenciesFound(Vec<String>),
19 #[error("{0}")]
20 Other(Box<dyn RetryableError>),
21}
22
23impl CursorStoreError {
24 pub fn other<E: RetryableError + 'static>(e: E) -> Self {
25 CursorStoreError::Other(Box::new(e))
26 }
27}
28
29impl RetryableError for CursorStoreError {
30 fn is_retryable(&self) -> bool {
31 match self {
32 Self::Other(s) => s.is_retryable(),
33 _ => false,
35 }
36 }
37}
38
39impl<E: std::error::Error> From<CursorStoreError> for ApiClientError<E> {
40 fn from(value: CursorStoreError) -> Self {
41 ApiClientError::Other(Box::new(value) as Box<_>)
42 }
43}
44
45pub trait CursorStore: MaybeSend + MaybeSync {
49 fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError>;
55
56 fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError>;
58
59 fn latest_per_originator(
61 &self,
62 topic: &Topic,
63 originators: &[&OriginatorId],
64 ) -> Result<GlobalCursor, CursorStoreError>;
65
66 fn latest_for_originator(
67 &self,
68 topic: &Topic,
69 originator: &OriginatorId,
70 ) -> Result<Cursor, CursorStoreError> {
71 let sid = self
72 .latest_per_originator(topic, &[originator])?
73 .get(originator);
74 Ok(Cursor::new(sid, *originator))
75 }
76
77 fn latest_for_topics(
80 &self,
81 topics: &mut dyn Iterator<Item = &Topic>,
82 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError>;
83
84 fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError>;
86 fn find_message_dependencies(
88 &self,
89 hashes: &[&[u8]],
90 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError>;
91
92 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError>;
94
95 fn resolve_children(
97 &self,
98 cursors: &[Cursor],
99 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError>;
100}
101
102impl<T: CursorStore> CursorStore for Option<T> {
103 fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
104 if let Some(c) = self {
105 c.lowest_common_cursor(topics)
106 } else {
107 NoCursorStore.lowest_common_cursor(topics)
108 }
109 }
110
111 fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
112 if let Some(c) = self {
113 c.latest(topic)
114 } else {
115 NoCursorStore.latest(topic)
116 }
117 }
118
119 fn latest_per_originator(
120 &self,
121 topic: &Topic,
122 originators: &[&OriginatorId],
123 ) -> Result<GlobalCursor, CursorStoreError> {
124 if let Some(c) = self {
125 c.latest_per_originator(topic, originators)
126 } else {
127 NoCursorStore.latest_per_originator(topic, originators)
128 }
129 }
130
131 fn latest_for_topics(
132 &self,
133 topics: &mut dyn Iterator<Item = &Topic>,
134 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
135 if let Some(c) = self {
136 c.latest_for_topics(topics)
137 } else {
138 NoCursorStore.latest_for_topics(topics)
139 }
140 }
141
142 fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
143 if let Some(c) = self {
144 c.lcc_maybe_missing(topic)
145 } else {
146 NoCursorStore.lcc_maybe_missing(topic)
147 }
148 }
149 fn find_message_dependencies(
150 &self,
151 hashes: &[&[u8]],
152 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
153 if let Some(c) = self {
154 c.find_message_dependencies(hashes)
155 } else {
156 NoCursorStore.find_message_dependencies(hashes)
157 }
158 }
159 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
160 if let Some(c) = self {
161 c.ice(orphans)
162 } else {
163 NoCursorStore.ice(orphans)
164 }
165 }
166
167 fn resolve_children(
168 &self,
169 cursors: &[Cursor],
170 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
171 if let Some(c) = self {
172 c.resolve_children(cursors)
173 } else {
174 NoCursorStore.resolve_children(cursors)
175 }
176 }
177}
178
179impl<T: CursorStore + ?Sized> CursorStore for &T {
180 fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
181 (**self).lowest_common_cursor(topics)
182 }
183
184 fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
185 (**self).latest(topic)
186 }
187
188 fn latest_per_originator(
189 &self,
190 topic: &Topic,
191 originators: &[&OriginatorId],
192 ) -> Result<GlobalCursor, CursorStoreError> {
193 (**self).latest_per_originator(topic, originators)
194 }
195
196 fn latest_for_topics(
197 &self,
198 topics: &mut dyn Iterator<Item = &Topic>,
199 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
200 (**self).latest_for_topics(topics)
201 }
202
203 fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
204 (**self).lcc_maybe_missing(topic)
205 }
206
207 fn find_message_dependencies(
208 &self,
209 hashes: &[&[u8]],
210 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
211 (**self).find_message_dependencies(hashes)
212 }
213
214 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
215 (**self).ice(orphans)
216 }
217
218 fn resolve_children(
219 &self,
220 cursors: &[Cursor],
221 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
222 (**self).resolve_children(cursors)
223 }
224}
225
226impl<T: CursorStore + ?Sized> CursorStore for Arc<T> {
227 fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
228 (**self).lowest_common_cursor(topics)
229 }
230
231 fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
232 (**self).latest(topic)
233 }
234
235 fn latest_per_originator(
236 &self,
237 topic: &Topic,
238 originators: &[&OriginatorId],
239 ) -> Result<GlobalCursor, CursorStoreError> {
240 (**self).latest_per_originator(topic, originators)
241 }
242
243 fn latest_for_topics(
244 &self,
245 topics: &mut dyn Iterator<Item = &Topic>,
246 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
247 (**self).latest_for_topics(topics)
248 }
249
250 fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
251 (**self).lcc_maybe_missing(topic)
252 }
253
254 fn find_message_dependencies(
255 &self,
256 hashes: &[&[u8]],
257 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
258 (**self).find_message_dependencies(hashes)
259 }
260
261 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
262 (**self).ice(orphans)
263 }
264
265 fn resolve_children(
266 &self,
267 cursors: &[Cursor],
268 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
269 (**self).resolve_children(cursors)
270 }
271}
272
273impl<T: CursorStore + ?Sized> CursorStore for Box<T> {
274 fn lowest_common_cursor(&self, topics: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
275 (**self).lowest_common_cursor(topics)
276 }
277
278 fn latest(&self, topic: &Topic) -> Result<GlobalCursor, CursorStoreError> {
279 (**self).latest(topic)
280 }
281
282 fn latest_per_originator(
283 &self,
284 topic: &Topic,
285 originators: &[&OriginatorId],
286 ) -> Result<GlobalCursor, CursorStoreError> {
287 (**self).latest_per_originator(topic, originators)
288 }
289
290 fn latest_for_topics(
291 &self,
292 topics: &mut dyn Iterator<Item = &Topic>,
293 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
294 (**self).latest_for_topics(topics)
295 }
296
297 fn lcc_maybe_missing(&self, topic: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
298 (**self).lcc_maybe_missing(topic)
299 }
300
301 fn find_message_dependencies(
302 &self,
303 hashes: &[&[u8]],
304 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
305 (**self).find_message_dependencies(hashes)
306 }
307
308 fn ice(&self, orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
309 (**self).ice(orphans)
310 }
311
312 fn resolve_children(
313 &self,
314 cursors: &[Cursor],
315 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
316 (**self).resolve_children(cursors)
317 }
318}
319
/// Zero-sized placeholder store; it is `Copy`, `Clone` and `Default`.
#[derive(Clone, Copy, Default)]
pub struct NoCursorStore;
323
324impl CursorStore for NoCursorStore {
325 fn lowest_common_cursor(&self, _: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
326 Ok(GlobalCursor::default())
327 }
328
329 fn latest(&self, _: &Topic) -> Result<GlobalCursor, CursorStoreError> {
330 Ok(GlobalCursor::default())
331 }
332
333 fn latest_per_originator(
334 &self,
335 _: &Topic,
336 _: &[&OriginatorId],
337 ) -> Result<GlobalCursor, CursorStoreError> {
338 Ok(GlobalCursor::default())
339 }
340
341 fn latest_for_topics(
342 &self,
343 topics: &mut dyn Iterator<Item = &Topic>,
344 ) -> Result<HashMap<Topic, GlobalCursor>, CursorStoreError> {
345 Ok(HashMap::from_iter(
346 topics.map(|t| (t.clone(), GlobalCursor::default())),
347 ))
348 }
349
350 fn lcc_maybe_missing(&self, _: &[&Topic]) -> Result<GlobalCursor, CursorStoreError> {
351 Ok(GlobalCursor::default())
352 }
353
354 fn find_message_dependencies(
355 &self,
356 _hashes: &[&[u8]],
357 ) -> Result<HashMap<Vec<u8>, Cursor>, CursorStoreError> {
358 Ok(HashMap::new())
359 }
360
361 fn ice(&self, _orphans: Vec<OrphanedEnvelope>) -> Result<(), CursorStoreError> {
362 Ok(())
363 }
364
365 fn resolve_children(
366 &self,
367 _cursors: &[Cursor],
368 ) -> Result<Vec<OrphanedEnvelope>, CursorStoreError> {
369 Ok(Vec::new())
370 }
371}