xmtp_api_d14n/queries/d14n/
xmtp_query.rs

1use xmtp_common::RetryableError;
2use xmtp_configuration::MAX_PAGE_SIZE;
3use xmtp_proto::{
4    api::{ApiClientError, Client, Query},
5    types::{GlobalCursor, Topic},
6    xmtp::xmtpv4::envelopes::OriginatorEnvelope,
7};
8
9use crate::{
10    D14nClient,
11    d14n::QueryEnvelope,
12    protocol::{CursorStore, Sort, XmtpEnvelope, XmtpQuery, sort},
13};
14
15#[xmtp_common::async_trait]
16impl<C, Store, E> XmtpQuery for D14nClient<C, Store>
17where
18    C: Client<Error = E>,
19    ApiClientError<E>: From<ApiClientError<<C as Client>::Error>>,
20    E: RetryableError + 'static,
21    Store: CursorStore,
22{
23    type Error = ApiClientError<E>;
24
25    async fn query_at(
26        &self,
27        topic: Topic,
28        at: Option<GlobalCursor>,
29    ) -> Result<XmtpEnvelope, Self::Error> {
30        let mut envelopes: Vec<OriginatorEnvelope> = QueryEnvelope::builder()
31            .topic(topic)
32            .last_seen(at.unwrap_or_default())
33            .limit(MAX_PAGE_SIZE)
34            .build()?
35            .query(&self.client)
36            .await?
37            .envelopes;
38        // sort the envelopes by their originator timestamp
39        sort::timestamp(&mut envelopes).sort()?;
40        Ok(XmtpEnvelope::new(envelopes))
41    }
42}