xmtp_api_d14n/queries/
api_stats.rs

1use xmtp_proto::api::HasStats;
2use xmtp_proto::api_client::AggregateStats;
3use xmtp_proto::api_client::ApiStats;
4use xmtp_proto::api_client::IdentityStats;
5use xmtp_proto::api_client::XmtpMlsClient;
6use xmtp_proto::identity_v1;
7use xmtp_proto::mls_v1;
8use xmtp_proto::prelude::ApiBuilder;
9use xmtp_proto::prelude::XmtpIdentityClient;
10use xmtp_proto::prelude::XmtpMlsStreams;
11use xmtp_proto::types::InstallationId;
12use xmtp_proto::types::TopicCursor;
13use xmtp_proto::types::WelcomeMessage;
14use xmtp_proto::types::{GroupId, GroupMessage};
15
16use crate::protocol::XmtpQuery;
17
18/// Wraps an ApiClient that tracks stats of each api call
19#[derive(Clone)]
20pub struct TrackedStatsClient<C> {
21    inner: C,
22    stats: ApiStats,
23    identity_stats: IdentityStats,
24}
25
26impl<C> TrackedStatsClient<C> {
27    pub fn new(inner: C) -> Self {
28        Self {
29            inner,
30            stats: Default::default(),
31            identity_stats: Default::default(),
32        }
33    }
34}
35
36#[xmtp_common::async_trait]
37impl<C> XmtpMlsClient for TrackedStatsClient<C>
38where
39    C: XmtpMlsClient,
40{
41    type Error = <C as XmtpMlsClient>::Error;
42
43    async fn upload_key_package(
44        &self,
45        request: mls_v1::UploadKeyPackageRequest,
46    ) -> Result<(), Self::Error> {
47        self.stats.upload_key_package.count_request();
48        self.inner.upload_key_package(request).await
49    }
50
51    async fn fetch_key_packages(
52        &self,
53        request: mls_v1::FetchKeyPackagesRequest,
54    ) -> Result<mls_v1::FetchKeyPackagesResponse, Self::Error> {
55        self.stats.fetch_key_package.count_request();
56        self.inner.fetch_key_packages(request).await
57    }
58
59    async fn send_group_messages(
60        &self,
61        request: mls_v1::SendGroupMessagesRequest,
62    ) -> Result<(), Self::Error> {
63        self.stats.send_group_messages.count_request();
64        self.inner.send_group_messages(request).await
65    }
66
67    async fn send_welcome_messages(
68        &self,
69        request: mls_v1::SendWelcomeMessagesRequest,
70    ) -> Result<(), Self::Error> {
71        self.stats.send_welcome_messages.count_request();
72        self.inner.send_welcome_messages(request).await
73    }
74    async fn query_group_messages(
75        &self,
76        group_id: GroupId,
77    ) -> Result<Vec<GroupMessage>, Self::Error> {
78        self.stats.query_group_messages.count_request();
79        self.inner.query_group_messages(group_id).await
80    }
81
82    async fn query_latest_group_message(
83        &self,
84        group_id: GroupId,
85    ) -> Result<Option<GroupMessage>, Self::Error> {
86        self.stats.query_group_messages.count_request();
87        self.inner.query_latest_group_message(group_id).await
88    }
89
90    async fn query_welcome_messages(
91        &self,
92        installation_key: InstallationId,
93    ) -> Result<Vec<WelcomeMessage>, Self::Error> {
94        self.stats.query_welcome_messages.count_request();
95        self.inner.query_welcome_messages(installation_key).await
96    }
97
98    async fn publish_commit_log(
99        &self,
100        request: mls_v1::BatchPublishCommitLogRequest,
101    ) -> Result<(), Self::Error> {
102        self.stats.publish_commit_log.count_request();
103        self.inner.publish_commit_log(request).await
104    }
105
106    async fn query_commit_log(
107        &self,
108        request: mls_v1::BatchQueryCommitLogRequest,
109    ) -> Result<mls_v1::BatchQueryCommitLogResponse, Self::Error> {
110        self.stats.query_commit_log.count_request();
111        self.inner.query_commit_log(request).await
112    }
113
114    async fn get_newest_group_message(
115        &self,
116        request: mls_v1::GetNewestGroupMessageRequest,
117    ) -> Result<Vec<Option<xmtp_proto::types::GroupMessageMetadata>>, Self::Error> {
118        self.stats.get_newest_group_message.count_request();
119        self.inner.get_newest_group_message(request).await
120    }
121}
122
123#[xmtp_common::async_trait]
124impl<C> XmtpIdentityClient for TrackedStatsClient<C>
125where
126    C: XmtpIdentityClient,
127{
128    type Error = <C as XmtpIdentityClient>::Error;
129
130    async fn publish_identity_update(
131        &self,
132        request: identity_v1::PublishIdentityUpdateRequest,
133    ) -> Result<identity_v1::PublishIdentityUpdateResponse, Self::Error> {
134        self.identity_stats.publish_identity_update.count_request();
135        self.inner.publish_identity_update(request).await
136    }
137
138    async fn get_identity_updates_v2(
139        &self,
140        request: identity_v1::GetIdentityUpdatesRequest,
141    ) -> Result<identity_v1::GetIdentityUpdatesResponse, Self::Error> {
142        self.identity_stats.get_identity_updates_v2.count_request();
143        self.inner.get_identity_updates_v2(request).await
144    }
145
146    async fn get_inbox_ids(
147        &self,
148        request: identity_v1::GetInboxIdsRequest,
149    ) -> Result<identity_v1::GetInboxIdsResponse, Self::Error> {
150        self.identity_stats.get_inbox_ids.count_request();
151        self.inner.get_inbox_ids(request).await
152    }
153
154    async fn verify_smart_contract_wallet_signatures(
155        &self,
156        request: identity_v1::VerifySmartContractWalletSignaturesRequest,
157    ) -> Result<identity_v1::VerifySmartContractWalletSignaturesResponse, Self::Error> {
158        self.identity_stats
159            .verify_smart_contract_wallet_signature
160            .count_request();
161        self.inner
162            .verify_smart_contract_wallet_signatures(request)
163            .await
164    }
165}
166
167#[xmtp_common::async_trait]
168impl<C> XmtpMlsStreams for TrackedStatsClient<C>
169where
170    C: XmtpMlsStreams,
171{
172    type GroupMessageStream = <C as XmtpMlsStreams>::GroupMessageStream;
173    type WelcomeMessageStream = <C as XmtpMlsStreams>::WelcomeMessageStream;
174    type Error = <C as XmtpMlsStreams>::Error;
175
176    async fn subscribe_group_messages(
177        &self,
178        group_ids: &[&GroupId],
179    ) -> Result<Self::GroupMessageStream, Self::Error> {
180        self.stats.subscribe_messages.count_request();
181        self.inner.subscribe_group_messages(group_ids).await
182    }
183
184    async fn subscribe_group_messages_with_cursors(
185        &self,
186        groups_with_cursors: &TopicCursor,
187    ) -> Result<Self::GroupMessageStream, Self::Error> {
188        self.stats.subscribe_messages.count_request();
189        self.inner
190            .subscribe_group_messages_with_cursors(groups_with_cursors)
191            .await
192    }
193
194    async fn subscribe_welcome_messages(
195        &self,
196        installations: &[&InstallationId],
197    ) -> Result<Self::WelcomeMessageStream, Self::Error> {
198        self.stats.subscribe_welcomes.count_request();
199        self.inner.subscribe_welcome_messages(installations).await
200    }
201}
202
203impl<C> HasStats for TrackedStatsClient<C> {
204    fn aggregate_stats(&self) -> AggregateStats {
205        AggregateStats {
206            identity: self.identity_stats.clone(),
207            mls: self.stats.clone(),
208        }
209    }
210
211    fn mls_stats(&self) -> ApiStats {
212        self.stats.clone()
213    }
214
215    fn identity_stats(&self) -> IdentityStats {
216        self.identity_stats.clone()
217    }
218}
219
220#[derive(Clone)]
221pub struct StatsBuilder<Builder> {
222    client: Builder,
223}
224
225impl<Builder> StatsBuilder<Builder> {
226    pub fn new(client: Builder) -> Self {
227        Self { client }
228    }
229}
230
231impl<C> TrackedStatsClient<C> {
232    pub fn builder<T: Default>() -> StatsBuilder<T> {
233        StatsBuilder::new(T::default())
234    }
235}
236
237impl<Builder> ApiBuilder for StatsBuilder<Builder>
238where
239    Builder: ApiBuilder,
240{
241    type Output = TrackedStatsClient<<Builder as ApiBuilder>::Output>;
242
243    type Error = <Builder as ApiBuilder>::Error;
244
245    fn build(self) -> Result<Self::Output, Self::Error> {
246        Ok(TrackedStatsClient::new(<Builder as ApiBuilder>::build(
247            self.client,
248        )?))
249    }
250}
251
252#[xmtp_common::async_trait]
253impl<C: XmtpQuery> XmtpQuery for TrackedStatsClient<C> {
254    type Error = <C as XmtpQuery>::Error;
255
256    async fn query_at(
257        &self,
258        topic: xmtp_proto::types::Topic,
259        at: Option<xmtp_proto::types::GlobalCursor>,
260    ) -> Result<crate::protocol::XmtpEnvelope, Self::Error> {
261        <C as XmtpQuery>::query_at(&self.inner, topic, at).await
262    }
263}
264
265#[cfg(any(test, feature = "test-utils"))]
266mod tests {
267    #![allow(clippy::unwrap_used)]
268    use crate::XmtpTestClientExt;
269
270    use super::*;
271    use xmtp_proto::api_client::ToxicProxies;
272    use xmtp_proto::api_client::ToxicTestClient;
273    use xmtp_proto::prelude::XmtpTestClient;
274
275    impl<C> XmtpTestClientExt for TrackedStatsClient<C>
276    where
277        C: XmtpTestClientExt,
278    {
279        fn with_cursor_store(
280            store: std::sync::Arc<dyn crate::protocol::CursorStore>,
281        ) -> <Self as XmtpTestClient>::Builder {
282            StatsBuilder {
283                client: <C as XmtpTestClientExt>::with_cursor_store(store),
284            }
285        }
286    }
287    impl<C> XmtpTestClient for TrackedStatsClient<C>
288    where
289        C: XmtpTestClient,
290    {
291        type Builder = StatsBuilder<C::Builder>;
292        fn create() -> Self::Builder {
293            StatsBuilder::new(<C as XmtpTestClient>::create())
294        }
295    }
296
297    #[xmtp_common::async_trait]
298    impl<C> ToxicTestClient for TrackedStatsClient<C>
299    where
300        C: ToxicTestClient,
301    {
302        async fn proxies() -> ToxicProxies {
303            <C as ToxicTestClient>::proxies().await
304        }
305    }
306}