xmtp_api_d14n/endpoints/d14n/
subscribe_envelopes.rs1use derive_builder::Builder;
2use prost::Message;
3use prost::bytes::Bytes;
4use std::borrow::Cow;
5use xmtp_proto::api::{BodyError, Endpoint};
6use xmtp_proto::types::{GlobalCursor, OriginatorId, Topic};
7use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesRequest;
8use xmtp_proto::xmtp::xmtpv4::message_api::{EnvelopesQuery, SubscribeEnvelopesResponse};
9
10#[derive(Debug, Builder, Default, Clone)]
12#[builder(build_fn(error = "BodyError"))]
13pub struct SubscribeEnvelopes {
14 #[builder(setter(each(name = "topic", into)), default)]
15 topics: Vec<Topic>,
16 #[builder(setter(into), default)]
17 last_seen: Option<GlobalCursor>,
18 #[builder(default)]
19 originators: Vec<OriginatorId>,
20}
21
22impl SubscribeEnvelopes {
23 pub fn builder() -> SubscribeEnvelopesBuilder {
24 Default::default()
25 }
26}
27
28impl Endpoint for SubscribeEnvelopes {
29 type Output = SubscribeEnvelopesResponse;
30 fn grpc_endpoint(&self) -> Cow<'static, str> {
31 xmtp_proto::path_and_query::<SubscribeEnvelopesRequest>()
32 }
33
34 fn body(&self) -> Result<Bytes, BodyError> {
35 for topic in &self.topics {
36 tracing::info!("subscribing to {}", topic.clone());
37 }
38 let query = EnvelopesQuery {
39 topics: self.topics.iter().map(Topic::cloned_vec).collect(),
40 last_seen: self.last_seen.clone().map(Into::into),
41 originator_node_ids: self.originators.clone(),
42 };
43 let query = SubscribeEnvelopesRequest { query: Some(query) };
44 Ok(query.encode_to_vec().into())
45 }
46}
47
48#[cfg(test)]
49mod test {
50 use super::*;
51 use xmtp_api_grpc::test::XmtpdClient;
52 use xmtp_proto::{api::QueryStreamExt as _, prelude::*};
53
54 #[xmtp_common::test]
55 fn test_file_descriptor() {
56 use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesRequest;
57 let pnq = xmtp_proto::path_and_query::<SubscribeEnvelopesRequest>();
58 println!("{}", pnq);
59 }
60
61 #[xmtp_common::test]
62 fn test_grpc_endpoint_returns_correct_path() {
63 let endpoint = SubscribeEnvelopes::default();
64 assert_eq!(
65 endpoint.grpc_endpoint(),
66 "/xmtp.xmtpv4.message_api.ReplicationApi/SubscribeEnvelopes"
67 );
68 }
69
70 #[xmtp_common::test]
71 async fn test_subscribe_envelopes() {
72 use crate::d14n::SubscribeEnvelopes;
73
74 let client = XmtpdClient::create();
75 let client = client.build().unwrap();
76
77 let mut endpoint = SubscribeEnvelopes::builder()
78 .topics(vec![])
79 .last_seen(None)
80 .build()
81 .unwrap();
82 let rsp = endpoint
83 .subscribe(&client)
84 .await
85 .inspect_err(|e| tracing::info!("{:?}", e));
86 assert!(rsp.is_ok());
87 }
88}