xmtp_api_d14n/protocol/resolve/
network_backoff.rs

1use std::collections::HashSet;
2
3use crate::{
4    d14n::QueryEnvelope,
5    protocol::{
6        Envelope, ResolutionError, ResolveDependencies, Resolved, types::RequiredDependency,
7    },
8};
9use itertools::Itertools;
10use tracing::warn;
11use xmtp_common::{ExponentialBackoff, RetryableError, Strategy};
12use xmtp_configuration::MAX_PAGE_SIZE;
13use xmtp_proto::{
14    api::{Client, Query, VectorClock},
15    types::{Cursor, GlobalCursor, Topic},
16    xmtp::xmtpv4::envelopes::OriginatorEnvelope,
17};
18
19/// try resolve d14n dependencies based on a backoff strategy
20#[derive(Clone, Debug)]
21pub struct NetworkBackoffResolver<ApiClient> {
22    client: ApiClient,
23    backoff: ExponentialBackoff,
24}
25
26pub fn network_backoff<ApiClient>(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> {
27    NetworkBackoffResolver {
28        client,
29        backoff: ExponentialBackoff::default(),
30    }
31}
32
33#[xmtp_common::async_trait]
34impl<ApiClient> ResolveDependencies for NetworkBackoffResolver<ApiClient>
35where
36    ApiClient: Client,
37    <ApiClient as Client>::Error: RetryableError,
38{
39    type ResolvedEnvelope = OriginatorEnvelope;
40    /// Resolve dependencies, starting with a list of dependencies. Should try to resolve
41    /// all dependents after `dependency`, if `Dependency` is missing as well.
42    /// * Once resolved, these dependencies may have missing dependencies of their own.
43    /// # Returns
44    /// * `HashSet<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
45    async fn resolve(
46        &self,
47        mut missing: HashSet<RequiredDependency>,
48    ) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
49        let mut attempts = 0;
50        let time_spent = xmtp_common::time::Instant::now();
51        let mut resolved = Vec::new();
52        while !missing.is_empty() {
53            if let Some(wait_for) = self.backoff.backoff(attempts, time_spent) {
54                tracing::info!("waiting for {:?}", wait_for);
55                xmtp_common::time::sleep(wait_for).await;
56                attempts += 1;
57            } else {
58                missing.iter().for_each(|m| {
59                    warn!("dropping dependency {}, could not resolve", m);
60                });
61                break;
62            }
63            let (topics, lcc) = lcc(&missing);
64            let envelopes = QueryEnvelope::builder()
65                .topics(topics)
66                .last_seen(lcc)
67                .limit(MAX_PAGE_SIZE)
68                .build()?
69                .query(&self.client)
70                .await
71                .map_err(ResolutionError::api)?
72                .envelopes;
73            let got = envelopes
74                .iter()
75                .map(|e| e.cursor())
76                .collect::<Result<HashSet<Cursor>, _>>()?;
77            missing.retain(|m| !got.contains(&m.cursor));
78            resolved.extend(envelopes);
79        }
80        Ok(Resolved {
81            resolved,
82            unresolved: (!missing.is_empty()).then_some(missing),
83        })
84    }
85}
86
87/// Get the LCC and topics from a list of missing envelopes
88fn lcc(missing: &HashSet<RequiredDependency>) -> (Vec<Topic>, GlobalCursor) {
89    // get the lcc by first getting lowest Cursor
90    // per topic, then merging the global cursor of every topic into
91    // one.
92    let (topics, last_seen): (Vec<_>, Vec<GlobalCursor>) = missing
93        .iter()
94        .into_grouping_map_by(|m| m.topic.clone())
95        .fold(GlobalCursor::default(), |mut acc, _key, val| {
96            acc.apply_least(&val.cursor);
97            acc
98        })
99        .into_iter()
100        .unzip();
101    let last_seen = last_seen
102        .into_iter()
103        .fold(GlobalCursor::default(), |mut acc, clock| {
104            acc.merge_least(&clock);
105            acc
106        });
107    (topics, last_seen)
108}
109
110#[cfg(test)]
111mod tests {
112    use super::*;
113    use crate::protocol::extractors::test_utils::TestEnvelopeBuilder;
114    use crate::protocol::utils::test;
115    use prost::Message;
116    use xmtp_proto::api::mock::MockNetworkClient;
117    use xmtp_proto::types::TopicKind;
118    use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse;
119
120    #[xmtp_common::test]
121    async fn test_resolve_all_found_immediately() {
122        let mut client = MockNetworkClient::new();
123        let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);
124
125        let missing = test::create_missing_set(
126            topic.clone(),
127            vec![Cursor::new(10, 1u32), Cursor::new(20, 2u32)],
128            vec![Cursor::new(11, 1u32), Cursor::new(21, 2u32)],
129        );
130
131        let envelope1 = TestEnvelopeBuilder::new()
132            .with_originator_node_id(1)
133            .with_originator_sequence_id(10)
134            .build();
135        let envelope2 = TestEnvelopeBuilder::new()
136            .with_originator_node_id(2)
137            .with_originator_sequence_id(20)
138            .build();
139
140        let response = QueryEnvelopesResponse {
141            envelopes: vec![envelope1, envelope2],
142        };
143
144        client.expect_request().returning(move |_, _, _| {
145            let bytes = response.clone().encode_to_vec();
146            Ok(http::Response::new(bytes.into()))
147        });
148
149        let resolver = network_backoff(&client);
150        test::test_resolve_all_found_immediately(&resolver, missing, 2).await;
151    }
152
153    #[xmtp_common::test]
154    async fn test_resolve_partial_resolution() {
155        let mut client = MockNetworkClient::new();
156        let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);
157
158        let missing = test::create_missing_set(
159            topic.clone(),
160            vec![Cursor::new(10, 1u32), Cursor::new(20, 2u32)],
161            vec![Cursor::new(11, 1u32), Cursor::new(21, 2u32)],
162        );
163        let expected_unresolved = test::create_missing_set(
164            topic.clone(),
165            vec![Cursor::new(20, 2u32)],
166            vec![Cursor::new(21, 2u32)],
167        );
168
169        // Only return one of the two requested envelopes
170        let envelope1 = TestEnvelopeBuilder::new()
171            .with_originator_node_id(1)
172            .with_originator_sequence_id(10)
173            .build();
174
175        client.expect_request().returning(move |_, _, _| {
176            let response = QueryEnvelopesResponse {
177                envelopes: vec![envelope1.clone()],
178            };
179            let bytes = response.encode_to_vec();
180            Ok(http::Response::new(bytes.into()))
181        });
182
183        let resolver = NetworkBackoffResolver {
184            client: &client,
185            // Use a backoff with very short timeout for testing
186            backoff: ExponentialBackoff::builder()
187                .total_wait_max(std::time::Duration::from_millis(10))
188                .build(),
189        };
190
191        test::test_resolve_partial_resolution(&resolver, missing, 1, expected_unresolved).await;
192    }
193
194    #[xmtp_common::test]
195    async fn test_resolve_empty_missing_set() {
196        let client = MockNetworkClient::new();
197        let resolver = network_backoff(&client);
198
199        test::test_resolve_empty_missing_set(&resolver).await;
200    }
201}