xmtp_api_d14n/queries/
api_stats.rs1use xmtp_proto::api::HasStats;
2use xmtp_proto::api_client::AggregateStats;
3use xmtp_proto::api_client::ApiStats;
4use xmtp_proto::api_client::IdentityStats;
5use xmtp_proto::api_client::XmtpMlsClient;
6use xmtp_proto::identity_v1;
7use xmtp_proto::mls_v1;
8use xmtp_proto::prelude::ApiBuilder;
9use xmtp_proto::prelude::XmtpIdentityClient;
10use xmtp_proto::prelude::XmtpMlsStreams;
11use xmtp_proto::types::InstallationId;
12use xmtp_proto::types::TopicCursor;
13use xmtp_proto::types::WelcomeMessage;
14use xmtp_proto::types::{GroupId, GroupMessage};
15
16use crate::protocol::XmtpQuery;
17
18#[derive(Clone)]
20pub struct TrackedStatsClient<C> {
21 inner: C,
22 stats: ApiStats,
23 identity_stats: IdentityStats,
24}
25
26impl<C> TrackedStatsClient<C> {
27 pub fn new(inner: C) -> Self {
28 Self {
29 inner,
30 stats: Default::default(),
31 identity_stats: Default::default(),
32 }
33 }
34}
35
36#[xmtp_common::async_trait]
37impl<C> XmtpMlsClient for TrackedStatsClient<C>
38where
39 C: XmtpMlsClient,
40{
41 type Error = <C as XmtpMlsClient>::Error;
42
43 async fn upload_key_package(
44 &self,
45 request: mls_v1::UploadKeyPackageRequest,
46 ) -> Result<(), Self::Error> {
47 self.stats.upload_key_package.count_request();
48 self.inner.upload_key_package(request).await
49 }
50
51 async fn fetch_key_packages(
52 &self,
53 request: mls_v1::FetchKeyPackagesRequest,
54 ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
55 self.stats.fetch_key_package.count_request();
56 self.inner.fetch_key_packages(request).await
57 }
58
59 async fn send_group_messages(
60 &self,
61 request: mls_v1::SendGroupMessagesRequest,
62 ) -> Result<(), Self::Error> {
63 self.stats.send_group_messages.count_request();
64 self.inner.send_group_messages(request).await
65 }
66
67 async fn send_welcome_messages(
68 &self,
69 request: mls_v1::SendWelcomeMessagesRequest,
70 ) -> Result<(), Self::Error> {
71 self.stats.send_welcome_messages.count_request();
72 self.inner.send_welcome_messages(request).await
73 }
74 async fn query_group_messages(
75 &self,
76 group_id: GroupId,
77 ) -> Result<Vec<GroupMessage>, Self::Error> {
78 self.stats.query_group_messages.count_request();
79 self.inner.query_group_messages(group_id).await
80 }
81
82 async fn query_latest_group_message(
83 &self,
84 group_id: GroupId,
85 ) -> Result<Option<GroupMessage>, Self::Error> {
86 self.stats.query_group_messages.count_request();
87 self.inner.query_latest_group_message(group_id).await
88 }
89
90 async fn query_welcome_messages(
91 &self,
92 installation_key: InstallationId,
93 ) -> Result<Vec<WelcomeMessage>, Self::Error> {
94 self.stats.query_welcome_messages.count_request();
95 self.inner.query_welcome_messages(installation_key).await
96 }
97
98 async fn publish_commit_log(
99 &self,
100 request: mls_v1::BatchPublishCommitLogRequest,
101 ) -> Result<(), Self::Error> {
102 self.stats.publish_commit_log.count_request();
103 self.inner.publish_commit_log(request).await
104 }
105
106 async fn query_commit_log(
107 &self,
108 request: mls_v1::BatchQueryCommitLogRequest,
109 ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
110 self.stats.query_commit_log.count_request();
111 self.inner.query_commit_log(request).await
112 }
113
114 async fn get_newest_group_message(
115 &self,
116 request: mls_v1::GetNewestGroupMessageRequest,
117 ) -> Result<Vec<Option<xmtp_proto::types::GroupMessageMetadata>>, Self::Error> {
118 self.stats.get_newest_group_message.count_request();
119 self.inner.get_newest_group_message(request).await
120 }
121}
122
123#[xmtp_common::async_trait]
124impl<C> XmtpIdentityClient for TrackedStatsClient<C>
125where
126 C: XmtpIdentityClient,
127{
128 type Error = <C as XmtpIdentityClient>::Error;
129
130 async fn publish_identity_update(
131 &self,
132 request: identity_v1::PublishIdentityUpdateRequest,
133 ) -> Result<identity_v1::PublishIdentityUpdateResponse, Self::Error> {
134 self.identity_stats.publish_identity_update.count_request();
135 self.inner.publish_identity_update(request).await
136 }
137
138 async fn get_identity_updates_v2(
139 &self,
140 request: identity_v1::GetIdentityUpdatesRequest,
141 ) -> Result<identity_v1::GetIdentityUpdatesResponse, Self::Error> {
142 self.identity_stats.get_identity_updates_v2.count_request();
143 self.inner.get_identity_updates_v2(request).await
144 }
145
146 async fn get_inbox_ids(
147 &self,
148 request: identity_v1::GetInboxIdsRequest,
149 ) -> Result<identity_v1::GetInboxIdsResponse, Self::Error> {
150 self.identity_stats.get_inbox_ids.count_request();
151 self.inner.get_inbox_ids(request).await
152 }
153
154 async fn verify_smart_contract_wallet_signatures(
155 &self,
156 request: identity_v1::VerifySmartContractWalletSignaturesRequest,
157 ) -> Result<identity_v1::VerifySmartContractWalletSignaturesResponse, Self::Error> {
158 self.identity_stats
159 .verify_smart_contract_wallet_signature
160 .count_request();
161 self.inner
162 .verify_smart_contract_wallet_signatures(request)
163 .await
164 }
165}
166
167#[xmtp_common::async_trait]
168impl<C> XmtpMlsStreams for TrackedStatsClient<C>
169where
170 C: XmtpMlsStreams,
171{
172 type GroupMessageStream = <C as XmtpMlsStreams>::GroupMessageStream;
173 type WelcomeMessageStream = <C as XmtpMlsStreams>::WelcomeMessageStream;
174 type Error = <C as XmtpMlsStreams>::Error;
175
176 async fn subscribe_group_messages(
177 &self,
178 group_ids: &[&GroupId],
179 ) -> Result<Self::GroupMessageStream, Self::Error> {
180 self.stats.subscribe_messages.count_request();
181 self.inner.subscribe_group_messages(group_ids).await
182 }
183
184 async fn subscribe_group_messages_with_cursors(
185 &self,
186 groups_with_cursors: &TopicCursor,
187 ) -> Result<Self::GroupMessageStream, Self::Error> {
188 self.stats.subscribe_messages.count_request();
189 self.inner
190 .subscribe_group_messages_with_cursors(groups_with_cursors)
191 .await
192 }
193
194 async fn subscribe_welcome_messages(
195 &self,
196 installations: &[&InstallationId],
197 ) -> Result<Self::WelcomeMessageStream, Self::Error> {
198 self.stats.subscribe_welcomes.count_request();
199 self.inner.subscribe_welcome_messages(installations).await
200 }
201}
202
203impl<C> HasStats for TrackedStatsClient<C> {
204 fn aggregate_stats(&self) -> AggregateStats {
205 AggregateStats {
206 identity: self.identity_stats.clone(),
207 mls: self.stats.clone(),
208 }
209 }
210
211 fn mls_stats(&self) -> ApiStats {
212 self.stats.clone()
213 }
214
215 fn identity_stats(&self) -> IdentityStats {
216 self.identity_stats.clone()
217 }
218}
219
/// Builder adapter: holds another builder and, through its `ApiBuilder` impl in this
/// file, wraps that builder's output in a [`TrackedStatsClient`].
#[derive(Clone)]
pub struct StatsBuilder<Builder> {
    /// The underlying builder whose output will be wrapped.
    client: Builder,
}

impl<Builder> StatsBuilder<Builder> {
    /// Creates a `StatsBuilder` around an existing builder.
    pub fn new(client: Builder) -> Self {
        StatsBuilder { client }
    }
}
230
231impl<C> TrackedStatsClient<C> {
232 pub fn builder<T: Default>() -> StatsBuilder<T> {
233 StatsBuilder::new(T::default())
234 }
235}
236
237impl<Builder> ApiBuilder for StatsBuilder<Builder>
238where
239 Builder: ApiBuilder,
240{
241 type Output = TrackedStatsClient<<Builder as ApiBuilder>::Output>;
242
243 type Error = <Builder as ApiBuilder>::Error;
244
245 fn build(self) -> Result<Self::Output, Self::Error> {
246 Ok(TrackedStatsClient::new(<Builder as ApiBuilder>::build(
247 self.client,
248 )?))
249 }
250}
251
252#[xmtp_common::async_trait]
253impl<C: XmtpQuery> XmtpQuery for TrackedStatsClient<C> {
254 type Error = <C as XmtpQuery>::Error;
255
256 async fn query_at(
257 &self,
258 topic: xmtp_proto::types::Topic,
259 at: Option<xmtp_proto::types::GlobalCursor>,
260 ) -> Result<crate::protocol::XmtpEnvelope, Self::Error> {
261 <C as XmtpQuery>::query_at(&self.inner, topic, at).await
262 }
263}
264
// Test-support glue, compiled for unit tests and for the `test-utils` feature.
// It implements the test-client traits for `TrackedStatsClient<C>` by deferring to `C`.
#[cfg(any(test, feature = "test-utils"))]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    use crate::XmtpTestClientExt;
    use xmtp_proto::api_client::{ToxicProxies, ToxicTestClient};
    use xmtp_proto::prelude::XmtpTestClient;

    impl<C> XmtpTestClientExt for TrackedStatsClient<C>
    where
        C: XmtpTestClientExt,
    {
        /// Asks `C` for a builder configured with `store` and wraps it in a `StatsBuilder`.
        fn with_cursor_store(
            store: std::sync::Arc<dyn crate::protocol::CursorStore>,
        ) -> <Self as XmtpTestClient>::Builder {
            StatsBuilder::new(<C as XmtpTestClientExt>::with_cursor_store(store))
        }
    }

    impl<C> XmtpTestClient for TrackedStatsClient<C>
    where
        C: XmtpTestClient,
    {
        /// The wrapped client's test builder, adapted to produce a stats-tracking client.
        type Builder = StatsBuilder<C::Builder>;

        /// Wraps the builder returned by `C::create()` in a `StatsBuilder`.
        fn create() -> Self::Builder {
            StatsBuilder {
                client: <C as XmtpTestClient>::create(),
            }
        }
    }

    #[xmtp_common::async_trait]
    impl<C> ToxicTestClient for TrackedStatsClient<C>
    where
        C: ToxicTestClient,
    {
        /// Returns the proxies reported by the wrapped client type.
        async fn proxies() -> ToxicProxies {
            let proxies = <C as ToxicTestClient>::proxies().await;
            proxies
        }
    }
}