xmtp_api_d14n/endpoints/d14n/
subscribe_envelopes.rs

1use derive_builder::Builder;
2use prost::Message;
3use prost::bytes::Bytes;
4use std::borrow::Cow;
5use xmtp_proto::api::{BodyError, Endpoint};
6use xmtp_proto::types::{GlobalCursor, OriginatorId, Topic};
7use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesRequest;
8use xmtp_proto::xmtp::xmtpv4::message_api::{EnvelopesQuery, SubscribeEnvelopesResponse};
9
10/// Query a single thing
11#[derive(Debug, Builder, Default, Clone)]
12#[builder(build_fn(error = "BodyError"))]
13pub struct SubscribeEnvelopes {
14    #[builder(setter(each(name = "topic", into)), default)]
15    topics: Vec<Topic>,
16    #[builder(setter(into), default)]
17    last_seen: Option<GlobalCursor>,
18    #[builder(default)]
19    originators: Vec<OriginatorId>,
20}
21
22impl SubscribeEnvelopes {
23    pub fn builder() -> SubscribeEnvelopesBuilder {
24        Default::default()
25    }
26}
27
28impl Endpoint for SubscribeEnvelopes {
29    type Output = SubscribeEnvelopesResponse;
30    fn grpc_endpoint(&self) -> Cow<'static, str> {
31        xmtp_proto::path_and_query::<SubscribeEnvelopesRequest>()
32    }
33
34    fn body(&self) -> Result<Bytes, BodyError> {
35        for topic in &self.topics {
36            tracing::info!("subscribing to {}", topic.clone());
37        }
38        let query = EnvelopesQuery {
39            topics: self.topics.iter().map(Topic::cloned_vec).collect(),
40            last_seen: self.last_seen.clone().map(Into::into),
41            originator_node_ids: self.originators.clone(),
42        };
43        let query = SubscribeEnvelopesRequest { query: Some(query) };
44        Ok(query.encode_to_vec().into())
45    }
46}
47
#[cfg(test)]
mod test {
    use super::*;
    use xmtp_api_grpc::test::XmtpdClient;
    use xmtp_proto::{api::QueryStreamExt as _, prelude::*};

    /// Resolves the path for the request type, prints it for inspection and
    /// checks that something was actually produced.
    #[xmtp_common::test]
    fn test_file_descriptor() {
        use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesRequest;
        let pnq = xmtp_proto::path_and_query::<SubscribeEnvelopesRequest>();
        println!("{}", pnq);
        assert!(!pnq.is_empty(), "path_and_query returned an empty path");
    }

    /// The endpoint must report the ReplicationApi `SubscribeEnvelopes` path.
    #[xmtp_common::test]
    fn test_grpc_endpoint_returns_correct_path() {
        let endpoint = SubscribeEnvelopes::default();
        assert_eq!(
            endpoint.grpc_endpoint(),
            "/xmtp.xmtpv4.message_api.ReplicationApi/SubscribeEnvelopes"
        );
    }

    /// Opens a subscription with no topics and no cursor against the test
    /// client and expects the call to succeed.
    #[xmtp_common::test]
    async fn test_subscribe_envelopes() {
        use crate::d14n::SubscribeEnvelopes;

        let client = XmtpdClient::create();
        let client = client.build().unwrap();

        let mut endpoint = SubscribeEnvelopes::builder()
            .topics(vec![])
            .last_seen(None)
            .build()
            .unwrap();
        let rsp = endpoint
            .subscribe(&client)
            .await
            .inspect_err(|e| tracing::info!("{:?}", e));
        // Surface the error in the failure message; the tracing event above
        // is only visible when a subscriber is installed.
        assert!(rsp.is_ok(), "subscribe failed: {:?}", rsp.as_ref().err());
    }
}