1use std::collections::HashSet;
2
3use diesel::prelude::*;
4
5use super::{
6 DbConnection,
7 schema::readd_status::{self},
8};
9use crate::{ConnectionExt, impl_store};
10
/// One row of the `readd_status` table, identified by the composite primary key
/// `(group_id, installation_id)`.
///
/// NOTE(review): judging by the field names, the two sequence ids record when an
/// installation last asked to be re-added to a group and when that request was last
/// answered — confirm against the callers.
#[derive(Identifiable, Queryable, Selectable, Insertable, Debug, Clone, PartialEq, Eq)]
#[diesel(table_name = readd_status)]
#[diesel(primary_key(group_id, installation_id))]
pub struct ReaddStatus {
    /// First half of the primary key: the group this row belongs to.
    pub group_id: Vec<u8>,
    /// Second half of the primary key: the installation this row describes.
    pub installation_id: Vec<u8>,
    /// Request sequence id. `update_requested_at_sequence_id` only ever raises it;
    /// `None` means no request has been recorded.
    pub requested_at_sequence_id: Option<i64>,
    /// Response sequence id. `update_responded_at_sequence_id` only ever raises it;
    /// `None` means no response has been recorded.
    pub responded_at_sequence_id: Option<i64>,
}

// NOTE(review): presumably generates the `Store` implementation that the tests in this
// file rely on when calling `.store(conn)` — confirm in the macro definition.
impl_store!(ReaddStatus, readd_status);
22
/// Queries and mutations over the `readd_status` table.
///
/// Every method reports database failures as [`crate::ConnectionError`].
pub trait QueryReaddStatus {
    /// Looks up the row for `(group_id, installation_id)`.
    ///
    /// Returns `Ok(None)` when no such row exists.
    fn get_readd_status(
        &self,
        group_id: &[u8],
        installation_id: &[u8],
    ) -> Result<Option<ReaddStatus>, crate::ConnectionError>;

    /// Reports whether the installation has a readd request that is still outstanding.
    ///
    /// Returns `true` when a row exists whose request sequence id is set and is at least
    /// as large as its response sequence id; equal ids still count as awaiting. For
    /// non-negative sequence ids, a row with a request and no response counts as awaiting.
    /// Returns `false` when there is no row or no request has been recorded.
    fn is_awaiting_readd(
        &self,
        group_id: &[u8],
        installation_id: &[u8],
    ) -> Result<bool, crate::ConnectionError>;

    /// Records `sequence_id` as the request sequence id for `(group_id, installation_id)`.
    ///
    /// Inserts a new row (with no response sequence id) when none exists. An existing
    /// request sequence id is only replaced when it is unset or smaller than
    /// `sequence_id`; the response sequence id of an existing row is left untouched.
    fn update_requested_at_sequence_id(
        &self,
        group_id: &[u8],
        installation_id: &[u8],
        sequence_id: i64,
    ) -> Result<(), crate::ConnectionError>;

    /// Records `sequence_id` as the response sequence id for `(group_id, installation_id)`.
    ///
    /// Inserts a new row (with no request sequence id) when none exists. An existing
    /// response sequence id is only replaced when it is unset or smaller than
    /// `sequence_id`; the request sequence id of an existing row is left untouched.
    fn update_responded_at_sequence_id(
        &self,
        group_id: &[u8],
        installation_id: &[u8],
        sequence_id: i64,
    ) -> Result<(), crate::ConnectionError>;

    /// Deletes every row of `group_id` except the one belonging to `self_installation_id`.
    ///
    /// Rows of other groups are not affected.
    fn delete_other_readd_statuses(
        &self,
        group_id: &[u8],
        self_installation_id: &[u8],
    ) -> Result<(), crate::ConnectionError>;

    /// Deletes the rows of `group_id` whose installation id is contained in
    /// `installation_ids`.
    fn delete_readd_statuses(
        &self,
        group_id: &[u8],
        installation_ids: HashSet<Vec<u8>>,
    ) -> Result<(), crate::ConnectionError>;

    /// Returns the rows of `group_id`, excluding `self_installation_id`, that have a
    /// request sequence id set and either no response sequence id or one that is not
    /// greater than the request sequence id.
    fn get_readds_awaiting_response(
        &self,
        group_id: &[u8],
        self_installation_id: &[u8],
    ) -> Result<Vec<ReaddStatus>, crate::ConnectionError>;
}
74
75impl<C: ConnectionExt> QueryReaddStatus for DbConnection<C> {
76 fn get_readd_status(
77 &self,
78 group_id: &[u8],
79 installation_id: &[u8],
80 ) -> Result<Option<ReaddStatus>, crate::ConnectionError> {
81 use super::schema::readd_status::dsl as readd_dsl;
82 use diesel::QueryDsl;
83
84 self.raw_query_read(|conn| {
85 readd_dsl::readd_status
86 .filter(readd_dsl::group_id.eq(group_id))
87 .filter(readd_dsl::installation_id.eq(installation_id))
88 .first::<ReaddStatus>(conn)
89 .optional()
90 })
91 }
92
93 fn is_awaiting_readd(
94 &self,
95 group_id: &[u8],
96 installation_id: &[u8],
97 ) -> Result<bool, crate::ConnectionError> {
98 let readd_status = self.get_readd_status(group_id, installation_id)?;
99 if let Some(readd_status) = readd_status
100 && let Some(requested_at) = readd_status.requested_at_sequence_id
101 && requested_at >= readd_status.responded_at_sequence_id.unwrap_or(0)
102 {
103 return Ok(true);
104 }
105 Ok(false)
106 }
107
108 fn update_requested_at_sequence_id(
109 &self,
110 group_id: &[u8],
111 installation_id: &[u8],
112 sequence_id: i64,
113 ) -> Result<(), crate::ConnectionError> {
114 use super::schema::readd_status::dsl as readd_dsl;
115 use diesel::query_dsl::methods::FilterDsl;
116
117 let new_status = super::readd_status::ReaddStatus {
118 group_id: group_id.to_vec(),
119 installation_id: installation_id.to_vec(),
120 requested_at_sequence_id: Some(sequence_id),
121 responded_at_sequence_id: None,
122 };
123
124 self.raw_query_write(|conn| {
125 diesel::insert_into(readd_dsl::readd_status)
126 .values(&new_status)
127 .on_conflict((readd_dsl::group_id, readd_dsl::installation_id))
128 .do_update()
129 .set(readd_dsl::requested_at_sequence_id.eq(sequence_id))
130 .filter(
131 readd_dsl::requested_at_sequence_id
132 .is_null()
133 .or(readd_dsl::requested_at_sequence_id.lt(sequence_id)),
134 )
135 .execute(conn)
136 })?;
137
138 Ok(())
139 }
140
141 fn update_responded_at_sequence_id(
142 &self,
143 group_id: &[u8],
144 installation_id: &[u8],
145 sequence_id: i64,
146 ) -> Result<(), crate::ConnectionError> {
147 use super::schema::readd_status::dsl as readd_dsl;
148 use diesel::query_dsl::methods::FilterDsl;
149
150 let new_status = super::readd_status::ReaddStatus {
151 group_id: group_id.to_vec(),
152 installation_id: installation_id.to_vec(),
153 requested_at_sequence_id: None,
154 responded_at_sequence_id: Some(sequence_id),
155 };
156
157 self.raw_query_write(|conn| {
158 diesel::insert_into(readd_dsl::readd_status)
159 .values(&new_status)
160 .on_conflict((readd_dsl::group_id, readd_dsl::installation_id))
161 .do_update()
162 .set(readd_dsl::responded_at_sequence_id.eq(sequence_id))
163 .filter(
164 readd_dsl::responded_at_sequence_id
165 .is_null()
166 .or(readd_dsl::responded_at_sequence_id.lt(sequence_id)),
167 )
168 .execute(conn)
169 })?;
170
171 Ok(())
172 }
173
174 fn delete_other_readd_statuses(
175 &self,
176 group_id: &[u8],
177 self_installation_id: &[u8],
178 ) -> Result<(), crate::ConnectionError> {
179 use super::schema::readd_status::dsl as readd_dsl;
180 use diesel::{ExpressionMethods, QueryDsl};
181
182 self.raw_query_write(|conn| {
183 diesel::delete(
184 readd_dsl::readd_status
185 .filter(readd_dsl::group_id.eq(group_id))
186 .filter(readd_dsl::installation_id.ne(self_installation_id)),
187 )
188 .execute(conn)?;
189 Ok(())
190 })
191 }
192
193 fn delete_readd_statuses(
194 &self,
195 group_id: &[u8],
196 installation_ids: HashSet<Vec<u8>>,
197 ) -> Result<(), crate::ConnectionError> {
198 use super::schema::readd_status::dsl as readd_dsl;
199 use diesel::{ExpressionMethods, QueryDsl};
200
201 self.raw_query_write(|conn| {
202 diesel::delete(
203 readd_dsl::readd_status
204 .filter(readd_dsl::group_id.eq(group_id))
205 .filter(readd_dsl::installation_id.eq_any(installation_ids)),
206 )
207 .execute(conn)?;
208 Ok(())
209 })
210 }
211
212 fn get_readds_awaiting_response(
213 &self,
214 group_id: &[u8],
215 self_installation_id: &[u8],
216 ) -> Result<Vec<ReaddStatus>, crate::ConnectionError> {
217 use super::schema::readd_status::dsl as readd_dsl;
218 use diesel::{ExpressionMethods, QueryDsl};
219
220 self.raw_query_read(|conn| {
221 readd_dsl::readd_status
222 .filter(readd_dsl::group_id.eq(group_id))
223 .filter(readd_dsl::installation_id.ne(self_installation_id))
224 .filter(readd_dsl::requested_at_sequence_id.is_not_null())
225 .filter(
226 readd_dsl::requested_at_sequence_id
227 .ge(readd_dsl::responded_at_sequence_id)
228 .or(readd_dsl::responded_at_sequence_id.is_null()),
229 )
230 .load::<ReaddStatus>(conn)
231 })
232 }
233}
234
235impl<T> QueryReaddStatus for &T
236where
237 T: QueryReaddStatus,
238{
239 fn get_readd_status(
240 &self,
241 group_id: &[u8],
242 installation_id: &[u8],
243 ) -> Result<Option<ReaddStatus>, crate::ConnectionError> {
244 (**self).get_readd_status(group_id, installation_id)
245 }
246
247 fn is_awaiting_readd(
248 &self,
249 group_id: &[u8],
250 installation_id: &[u8],
251 ) -> Result<bool, crate::ConnectionError> {
252 (**self).is_awaiting_readd(group_id, installation_id)
253 }
254
255 fn update_requested_at_sequence_id(
256 &self,
257 group_id: &[u8],
258 installation_id: &[u8],
259 sequence_id: i64,
260 ) -> Result<(), crate::ConnectionError> {
261 (**self).update_requested_at_sequence_id(group_id, installation_id, sequence_id)
262 }
263
264 fn update_responded_at_sequence_id(
265 &self,
266 group_id: &[u8],
267 installation_id: &[u8],
268 sequence_id: i64,
269 ) -> Result<(), crate::ConnectionError> {
270 (**self).update_responded_at_sequence_id(group_id, installation_id, sequence_id)
271 }
272
273 fn delete_other_readd_statuses(
274 &self,
275 group_id: &[u8],
276 self_installation_id: &[u8],
277 ) -> Result<(), crate::ConnectionError> {
278 (**self).delete_other_readd_statuses(group_id, self_installation_id)
279 }
280
281 fn delete_readd_statuses(
282 &self,
283 group_id: &[u8],
284 installation_ids: HashSet<Vec<u8>>,
285 ) -> Result<(), crate::ConnectionError> {
286 (**self).delete_readd_statuses(group_id, installation_ids)
287 }
288
289 fn get_readds_awaiting_response(
290 &self,
291 group_id: &[u8],
292 self_installation_id: &[u8],
293 ) -> Result<Vec<ReaddStatus>, crate::ConnectionError> {
294 (**self).get_readds_awaiting_response(group_id, self_installation_id)
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Store, test_utils::with_connection};

    /// Persists one `ReaddStatus` row and panics if the write fails.
    ///
    /// Written as a macro rather than a function so that it works with whatever
    /// connection type `with_connection` hands to the test closure.
    macro_rules! store_status {
        ($conn:expr, $group_id:expr, $installation_id:expr, $requested:expr, $responded:expr $(,)?) => {{
            ReaddStatus {
                group_id: $group_id.to_vec(),
                installation_id: $installation_id.to_vec(),
                requested_at_sequence_id: $requested,
                responded_at_sequence_id: $responded,
            }
            .store($conn)
            .unwrap();
        }};
    }

    #[xmtp_common::test]
    fn test_get_readd_status_not_found() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            let result = conn.get_readd_status(&group_id, &installation_id).unwrap();
            assert!(result.is_none());
        })
    }

    #[xmtp_common::test]
    fn test_store_and_get_readd_status() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(100), Some(50));

            let retrieved = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("stored status should be retrievable");
            assert_eq!(retrieved.requested_at_sequence_id, Some(100));
            assert_eq!(retrieved.responded_at_sequence_id, Some(50));
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_creates_new() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];
            let sequence_id = 100;

            conn.update_requested_at_sequence_id(&group_id, &installation_id, sequence_id)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("update should have inserted a row");
            assert_eq!(status.requested_at_sequence_id, Some(sequence_id));
            assert_eq!(status.responded_at_sequence_id, None);
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_updates_existing() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(50), Some(25));

            conn.update_requested_at_sequence_id(&group_id, &installation_id, 100)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(100));
            // The response column must be left untouched.
            assert_eq!(status.responded_at_sequence_id, Some(25));
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_only_updates_if_higher() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(100), Some(50));

            // A lower sequence id must not overwrite the stored one.
            conn.update_requested_at_sequence_id(&group_id, &installation_id, 75)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(100));
            assert_eq!(status.responded_at_sequence_id, Some(50));
        })
    }

    #[xmtp_common::test]
    fn test_update_requested_at_sequence_id_updates_from_null() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, None, Some(25));

            conn.update_requested_at_sequence_id(&group_id, &installation_id, 50)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.requested_at_sequence_id, Some(50));
            // The response column must be left untouched.
            assert_eq!(status.responded_at_sequence_id, Some(25));
        })
    }

    #[xmtp_common::test]
    fn test_update_responded_at_sequence_id_creates_new() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];
            let sequence_id = 100;

            conn.update_responded_at_sequence_id(&group_id, &installation_id, sequence_id)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("update should have inserted a row");
            assert_eq!(status.responded_at_sequence_id, Some(sequence_id));
            assert_eq!(status.requested_at_sequence_id, None);
        })
    }

    #[xmtp_common::test]
    fn test_update_responded_at_sequence_id_only_updates_if_higher() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(50), Some(100));

            // A lower sequence id must not overwrite the stored one.
            conn.update_responded_at_sequence_id(&group_id, &installation_id, 75)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.responded_at_sequence_id, Some(100));
            assert_eq!(status.requested_at_sequence_id, Some(50));

            // A higher sequence id replaces it.
            conn.update_responded_at_sequence_id(&group_id, &installation_id, 125)
                .unwrap();

            let status = conn
                .get_readd_status(&group_id, &installation_id)
                .unwrap()
                .expect("row should still exist");
            assert_eq!(status.responded_at_sequence_id, Some(125));
            assert_eq!(status.requested_at_sequence_id, Some(50));
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_status() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_request() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, None, Some(5));

            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_request_pending() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(10), Some(5));

            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_request_fulfilled() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(5), Some(10));

            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(!result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_equal_sequence_ids() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(10), Some(10));

            // Equal request and response ids still count as awaiting.
            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_is_awaiting_readd_no_responded_at() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let installation_id = vec![4, 5, 6];

            store_status!(conn, group_id, installation_id, Some(5), None);

            let result = conn.is_awaiting_readd(&group_id, &installation_id).unwrap();
            assert!(result);
        })
    }

    #[xmtp_common::test]
    fn test_delete_other_readd_statuses() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let keep_installation_id = vec![10, 11, 12];
            let delete_installation_id_1 = vec![20, 21, 22];
            let delete_installation_id_2 = vec![30, 31, 32];
            let other_group_id = vec![4, 5, 6];
            let other_group_installation_id = vec![40, 41, 42];

            store_status!(conn, group_id, keep_installation_id, Some(10), Some(5));
            store_status!(conn, group_id, delete_installation_id_1, Some(15), Some(8));
            store_status!(conn, group_id, delete_installation_id_2, Some(20), None);
            // A row in another group must survive the delete.
            store_status!(
                conn,
                other_group_id,
                other_group_installation_id,
                Some(25),
                Some(12),
            );

            conn.delete_other_readd_statuses(&group_id, &keep_installation_id)
                .unwrap();

            let kept_status = conn
                .get_readd_status(&group_id, &keep_installation_id)
                .unwrap();
            assert!(kept_status.is_some());

            let deleted_status_1 = conn
                .get_readd_status(&group_id, &delete_installation_id_1)
                .unwrap();
            assert!(deleted_status_1.is_none());

            let deleted_status_2 = conn
                .get_readd_status(&group_id, &delete_installation_id_2)
                .unwrap();
            assert!(deleted_status_2.is_none());

            let different_group_check = conn
                .get_readd_status(&other_group_id, &other_group_installation_id)
                .unwrap();
            assert!(different_group_check.is_some());
        })
    }

    #[xmtp_common::test]
    fn test_delete_readd_statuses() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let other_group_id = vec![4, 5, 6];
            let keep_installation_id = vec![10, 11, 12];
            let delete_installation_id_1 = vec![20, 21, 22];
            let delete_installation_id_2 = vec![30, 31, 32];

            store_status!(conn, group_id, keep_installation_id, Some(10), Some(5));
            store_status!(conn, group_id, delete_installation_id_1, Some(15), Some(8));
            store_status!(conn, group_id, delete_installation_id_2, Some(20), None);
            // Same installation id in another group: must survive the delete.
            store_status!(
                conn,
                other_group_id,
                delete_installation_id_1,
                Some(25),
                Some(12),
            );

            let targets = std::collections::HashSet::from([
                delete_installation_id_1.clone(),
                delete_installation_id_2.clone(),
            ]);
            conn.delete_readd_statuses(&group_id, targets).unwrap();

            let kept_status = conn
                .get_readd_status(&group_id, &keep_installation_id)
                .unwrap();
            assert!(kept_status.is_some());

            let deleted_status_1 = conn
                .get_readd_status(&group_id, &delete_installation_id_1)
                .unwrap();
            assert!(deleted_status_1.is_none());

            let deleted_status_2 = conn
                .get_readd_status(&group_id, &delete_installation_id_2)
                .unwrap();
            assert!(deleted_status_2.is_none());

            let different_group_check = conn
                .get_readd_status(&other_group_id, &delete_installation_id_1)
                .unwrap();
            assert!(different_group_check.is_some());
        })
    }

    #[xmtp_common::test]
    fn test_get_readds_awaiting_response() {
        with_connection(|conn| {
            let group_id = vec![1, 2, 3];
            let self_installation_id = vec![10, 11, 12];
            let other_installation_id_1 = vec![20, 21, 22];
            let other_installation_id_2 = vec![30, 31, 32];
            let fulfilled_installation_id = vec![40, 41, 42];
            let no_request_installation_id = vec![50, 51, 52];
            let other_group_id = vec![4, 5, 6];
            let other_group_installation_id = vec![60, 61, 62];

            // Pending: request newer than response.
            store_status!(conn, group_id, other_installation_id_1, Some(10), Some(5));
            // Pending: request with no response at all.
            store_status!(conn, group_id, other_installation_id_2, Some(15), None);
            // Fulfilled: response newer than request.
            store_status!(conn, group_id, fulfilled_installation_id, Some(8), Some(12));
            // Pending, but belongs to the caller and must be excluded.
            store_status!(conn, group_id, self_installation_id, Some(20), Some(10));
            // No request recorded.
            store_status!(conn, group_id, no_request_installation_id, None, Some(5));
            // Pending, but in a different group.
            store_status!(
                conn,
                other_group_id,
                other_group_installation_id,
                Some(25),
                Some(15),
            );

            let result = conn
                .get_readds_awaiting_response(&group_id, &self_installation_id)
                .unwrap();

            assert_eq!(result.len(), 2);

            let returned_installations: Vec<Vec<u8>> =
                result.iter().map(|r| r.installation_id.clone()).collect();
            assert!(returned_installations.contains(&other_installation_id_1));
            assert!(returned_installations.contains(&other_installation_id_2));

            for status in result {
                assert_eq!(status.group_id, group_id);
                assert_ne!(status.installation_id, self_installation_id);
                assert!(status.requested_at_sequence_id.is_some());

                let requested_at = status.requested_at_sequence_id.unwrap();
                let responded_at = status.responded_at_sequence_id.unwrap_or(0);
                assert!(requested_at >= responded_at);
            }
        })
    }
}