xmtp_api_d14n/protocol/resolve/
network_backoff.rs1use std::collections::HashSet;
2
3use crate::{
4 d14n::QueryEnvelope,
5 protocol::{
6 Envelope, ResolutionError, ResolveDependencies, Resolved, types::RequiredDependency,
7 },
8};
9use itertools::Itertools;
10use tracing::warn;
11use xmtp_common::{ExponentialBackoff, RetryableError, Strategy};
12use xmtp_configuration::MAX_PAGE_SIZE;
13use xmtp_proto::{
14 api::{Client, Query, VectorClock},
15 types::{Cursor, GlobalCursor, Topic},
16 xmtp::xmtpv4::envelopes::OriginatorEnvelope,
17};
18
19#[derive(Clone, Debug)]
21pub struct NetworkBackoffResolver<ApiClient> {
22 client: ApiClient,
23 backoff: ExponentialBackoff,
24}
25
26pub fn network_backoff<ApiClient>(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> {
27 NetworkBackoffResolver {
28 client,
29 backoff: ExponentialBackoff::default(),
30 }
31}
32
33#[xmtp_common::async_trait]
34impl<ApiClient> ResolveDependencies for NetworkBackoffResolver<ApiClient>
35where
36 ApiClient: Client,
37 <ApiClient as Client>::Error: RetryableError,
38{
39 type ResolvedEnvelope = OriginatorEnvelope;
40 async fn resolve(
46 &self,
47 mut missing: HashSet<RequiredDependency>,
48 ) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
49 let mut attempts = 0;
50 let time_spent = xmtp_common::time::Instant::now();
51 let mut resolved = Vec::new();
52 while !missing.is_empty() {
53 if let Some(wait_for) = self.backoff.backoff(attempts, time_spent) {
54 tracing::info!("waiting for {:?}", wait_for);
55 xmtp_common::time::sleep(wait_for).await;
56 attempts += 1;
57 } else {
58 missing.iter().for_each(|m| {
59 warn!("dropping dependency {}, could not resolve", m);
60 });
61 break;
62 }
63 let (topics, lcc) = lcc(&missing);
64 let envelopes = QueryEnvelope::builder()
65 .topics(topics)
66 .last_seen(lcc)
67 .limit(MAX_PAGE_SIZE)
68 .build()?
69 .query(&self.client)
70 .await
71 .map_err(ResolutionError::api)?
72 .envelopes;
73 let got = envelopes
74 .iter()
75 .map(|e| e.cursor())
76 .collect::<Result<HashSet<Cursor>, _>>()?;
77 missing.retain(|m| !got.contains(&m.cursor));
78 resolved.extend(envelopes);
79 }
80 Ok(Resolved {
81 resolved,
82 unresolved: (!missing.is_empty()).then_some(missing),
83 })
84 }
85}
86
87fn lcc(missing: &HashSet<RequiredDependency>) -> (Vec<Topic>, GlobalCursor) {
89 let (topics, last_seen): (Vec<_>, Vec<GlobalCursor>) = missing
93 .iter()
94 .into_grouping_map_by(|m| m.topic.clone())
95 .fold(GlobalCursor::default(), |mut acc, _key, val| {
96 acc.apply_least(&val.cursor);
97 acc
98 })
99 .into_iter()
100 .unzip();
101 let last_seen = last_seen
102 .into_iter()
103 .fold(GlobalCursor::default(), |mut acc, clock| {
104 acc.merge_least(&clock);
105 acc
106 });
107 (topics, last_seen)
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use crate::protocol::extractors::test_utils::TestEnvelopeBuilder;
114 use crate::protocol::utils::test;
115 use prost::Message;
116 use xmtp_proto::api::mock::MockNetworkClient;
117 use xmtp_proto::types::TopicKind;
118 use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse;
119
120 #[xmtp_common::test]
121 async fn test_resolve_all_found_immediately() {
122 let mut client = MockNetworkClient::new();
123 let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);
124
125 let missing = test::create_missing_set(
126 topic.clone(),
127 vec![Cursor::new(10, 1u32), Cursor::new(20, 2u32)],
128 vec![Cursor::new(11, 1u32), Cursor::new(21, 2u32)],
129 );
130
131 let envelope1 = TestEnvelopeBuilder::new()
132 .with_originator_node_id(1)
133 .with_originator_sequence_id(10)
134 .build();
135 let envelope2 = TestEnvelopeBuilder::new()
136 .with_originator_node_id(2)
137 .with_originator_sequence_id(20)
138 .build();
139
140 let response = QueryEnvelopesResponse {
141 envelopes: vec![envelope1, envelope2],
142 };
143
144 client.expect_request().returning(move |_, _, _| {
145 let bytes = response.clone().encode_to_vec();
146 Ok(http::Response::new(bytes.into()))
147 });
148
149 let resolver = network_backoff(&client);
150 test::test_resolve_all_found_immediately(&resolver, missing, 2).await;
151 }
152
153 #[xmtp_common::test]
154 async fn test_resolve_partial_resolution() {
155 let mut client = MockNetworkClient::new();
156 let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);
157
158 let missing = test::create_missing_set(
159 topic.clone(),
160 vec![Cursor::new(10, 1u32), Cursor::new(20, 2u32)],
161 vec![Cursor::new(11, 1u32), Cursor::new(21, 2u32)],
162 );
163 let expected_unresolved = test::create_missing_set(
164 topic.clone(),
165 vec![Cursor::new(20, 2u32)],
166 vec![Cursor::new(21, 2u32)],
167 );
168
169 let envelope1 = TestEnvelopeBuilder::new()
171 .with_originator_node_id(1)
172 .with_originator_sequence_id(10)
173 .build();
174
175 client.expect_request().returning(move |_, _, _| {
176 let response = QueryEnvelopesResponse {
177 envelopes: vec![envelope1.clone()],
178 };
179 let bytes = response.encode_to_vec();
180 Ok(http::Response::new(bytes.into()))
181 });
182
183 let resolver = NetworkBackoffResolver {
184 client: &client,
185 backoff: ExponentialBackoff::builder()
187 .total_wait_max(std::time::Duration::from_millis(10))
188 .build(),
189 };
190
191 test::test_resolve_partial_resolution(&resolver, missing, 1, expected_unresolved).await;
192 }
193
194 #[xmtp_common::test]
195 async fn test_resolve_empty_missing_set() {
196 let client = MockNetworkClient::new();
197 let resolver = network_backoff(&client);
198
199 test::test_resolve_empty_missing_set(&resolver).await;
200 }
201}