openzeppelin_monitor/services/notification/
pool.rs1use crate::services::blockchain::TransientErrorRetryStrategy;
2use crate::services::notification::SmtpConfig;
3use crate::utils::client_storage::ClientStorage;
4use crate::utils::{create_retryable_http_client, RetryConfig};
5use lettre::Tokio1Executor;
6use lettre::{transport::smtp::authentication::Credentials, AsyncSmtpTransport};
7use reqwest::Client as ReqwestClient;
8use reqwest_middleware::ClientWithMiddleware;
9use std::sync::Arc;
10use std::time::Duration;
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14pub enum NotificationPoolError {
15 #[error("Failed to create HTTP client: {0}")]
16 HttpClientBuildError(String),
17
18 #[error("Failed to create SMTP client: {0}")]
19 SmtpClientBuildError(String),
20}
21
22pub struct NotificationClientPool {
28 http_clients: ClientStorage<ClientWithMiddleware>,
29 smtp_clients: ClientStorage<AsyncSmtpTransport<Tokio1Executor>>,
30}
31
32impl NotificationClientPool {
33 pub fn new() -> Self {
34 Self {
35 http_clients: ClientStorage::new(),
36 smtp_clients: ClientStorage::new(),
37 }
38 }
39
40 async fn get_or_create_client<T, F>(
42 &self,
43 key: &str,
44 storage: &ClientStorage<T>,
45 create_fn: F,
46 ) -> Result<Arc<T>, NotificationPoolError>
47 where
48 T: Send + Sync,
49 F: FnOnce() -> Result<T, NotificationPoolError>,
50 {
51 if let Some(client) = storage.clients.read().await.get(key) {
53 return Ok(client.clone());
54 }
55
56 let mut clients = storage.clients.write().await;
58 if let Some(client) = clients.get(key) {
60 return Ok(client.clone());
61 }
62
63 let new_client = create_fn()?;
65 let arc_client = Arc::new(new_client);
66 clients.insert(key.to_string(), arc_client.clone());
67
68 Ok(arc_client)
69 }
70
71 pub async fn get_or_create_http_client(
80 &self,
81 retry_policy: &RetryConfig,
82 ) -> Result<Arc<ClientWithMiddleware>, NotificationPoolError> {
83 let key = format!("{:?}", retry_policy);
84 self.get_or_create_client(&key, &self.http_clients, || {
85 let base_client = ReqwestClient::builder()
86 .pool_max_idle_per_host(10)
87 .pool_idle_timeout(Some(Duration::from_secs(90)))
88 .connect_timeout(Duration::from_secs(10))
89 .build()
90 .map_err(|e| NotificationPoolError::HttpClientBuildError(e.to_string()))?;
91
92 Ok(create_retryable_http_client(
93 retry_policy,
94 base_client,
95 Some(TransientErrorRetryStrategy),
96 ))
97 })
98 .await
99 }
100
101 pub async fn get_or_create_smtp_client(
110 &self,
111 smtp_config: &SmtpConfig,
112 ) -> Result<Arc<AsyncSmtpTransport<Tokio1Executor>>, NotificationPoolError> {
113 let key = format!("{:?}", smtp_config);
114 self.get_or_create_client(&key, &self.smtp_clients, || {
115 let creds =
116 Credentials::new(smtp_config.username.clone(), smtp_config.password.clone());
117 Ok(
118 AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp_config.host)
119 .map_err(|e| NotificationPoolError::SmtpClientBuildError(e.to_string()))?
120 .port(smtp_config.port)
121 .credentials(creds)
122 .build(),
123 )
124 })
125 .await
126 }
127
128 #[cfg(test)]
130 pub async fn get_active_http_client_count(&self) -> usize {
131 self.http_clients.clients.read().await.len()
132 }
133
134 #[cfg(test)]
136 pub async fn get_active_smtp_client_count(&self) -> usize {
137 self.smtp_clients.clients.read().await.len()
138 }
139}
140
141impl Default for NotificationClientPool {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh, empty pool so every test starts from a clean state.
    fn create_pool() -> NotificationClientPool {
        NotificationClientPool::new()
    }

    /// A newly created pool holds no HTTP or SMTP clients.
    #[tokio::test]
    async fn test_pool_init_empty() {
        let pool = create_pool();
        let http_count = pool.get_active_http_client_count().await;
        let smtp_count = pool.get_active_smtp_client_count().await;

        assert_eq!(http_count, 0, "Pool should be empty initially");
        assert_eq!(smtp_count, 0, "Pool should be empty initially");
    }

    /// The first request for an HTTP client succeeds and registers one client.
    #[tokio::test]
    async fn test_pool_get_or_create_http_client() {
        let pool = create_pool();
        let retry_config = RetryConfig::default();
        let client = pool.get_or_create_http_client(&retry_config).await;

        assert!(
            client.is_ok(),
            "Should successfully create or get HTTP client"
        );

        assert_eq!(
            pool.get_active_http_client_count().await,
            1,
            "Pool should have one active HTTP client"
        );
    }

    /// Two requests with the same configuration yield the same shared instance.
    #[tokio::test]
    async fn test_pool_returns_same_client() {
        let pool = create_pool();
        let retry_config = RetryConfig::default();
        let client1 = pool.get_or_create_http_client(&retry_config).await.unwrap();
        let client2 = pool.get_or_create_http_client(&retry_config).await.unwrap();

        assert!(
            Arc::ptr_eq(&client1, &client2),
            "Should return the same client instance"
        );
        assert_eq!(
            pool.get_active_http_client_count().await,
            1,
            "Pool should still have one active HTTP client"
        );
    }

    /// Concurrent requests with the same configuration must all succeed, must
    /// all receive the same shared client, and must leave exactly one client in
    /// the pool (i.e. the client is built once even under contention).
    #[tokio::test]
    async fn test_pool_concurrent_access() {
        let pool = Arc::new(create_pool());
        let retry_config = RetryConfig::default();

        let num_tasks = 10;
        let mut tasks = Vec::new();

        for _ in 0..num_tasks {
            let pool_clone = Arc::clone(&pool);
            let retry_config = retry_config.clone();
            // Each task hands its result back so the instances can be compared
            // once every task has finished.
            tasks.push(tokio::spawn(async move {
                pool_clone.get_or_create_http_client(&retry_config).await
            }));
        }

        let results = futures::future::join_all(tasks).await;

        let clients: Vec<_> = results
            .into_iter()
            .map(|joined| {
                joined
                    .expect("All tasks should complete successfully")
                    .expect("Should successfully create or get HTTP client")
            })
            .collect();

        // Comparing neighbours pairwise is enough to prove all are the same Arc.
        assert!(
            clients
                .windows(2)
                .all(|pair| Arc::ptr_eq(&pair[0], &pair[1])),
            "All concurrent callers should receive the same client instance"
        );

        assert_eq!(
            pool.get_active_http_client_count().await,
            1,
            "Pool should have exactly one HTTP client after concurrent access"
        );
    }

    /// A pool built through `Default` behaves like one built through `new`.
    #[tokio::test]
    async fn test_pool_default() {
        let pool = NotificationClientPool::default();
        let retry_config = RetryConfig::default();

        assert_eq!(
            pool.get_active_http_client_count().await,
            0,
            "Default pool should be empty initially"
        );

        assert_eq!(
            pool.get_active_smtp_client_count().await,
            0,
            "Default pool should be empty initially"
        );

        let client = pool.get_or_create_http_client(&retry_config).await;

        assert!(
            client.is_ok(),
            "Default pool should successfully create or get HTTP client"
        );

        assert_eq!(
            pool.get_active_http_client_count().await,
            1,
            "Default pool should have one active HTTP client"
        );
    }

    /// Distinct retry configurations get distinct HTTP clients, while repeating
    /// a configuration returns the instance created for it earlier.
    #[tokio::test]
    async fn test_pool_returns_different_http_clients_for_different_configs() {
        let pool = create_pool();

        let retry_config_1 = RetryConfig::default();

        // Differs from the default only in `max_retries`.
        let retry_config_2 = RetryConfig {
            max_retries: 5,
            ..Default::default()
        };

        let client1 = pool
            .get_or_create_http_client(&retry_config_1)
            .await
            .unwrap();
        let client2 = pool
            .get_or_create_http_client(&retry_config_2)
            .await
            .unwrap();

        assert!(
            !Arc::ptr_eq(&client1, &client2),
            "Should return different client instances for different configurations"
        );

        assert_eq!(
            pool.get_active_http_client_count().await,
            2,
            "Pool should have two active HTTP clients"
        );

        let client1_again = pool
            .get_or_create_http_client(&retry_config_1)
            .await
            .unwrap();
        assert!(
            Arc::ptr_eq(&client1, &client1_again),
            "Should return the same client instance when called again with the same config"
        );

        assert_eq!(
            pool.get_active_http_client_count().await,
            2,
            "Pool should still have two active HTTP clients after getting an existing one"
        );
    }

    /// Distinct SMTP configurations get distinct transports, while repeating a
    /// configuration returns the instance created for it earlier.
    #[tokio::test]
    async fn test_pool_returns_different_smtp_clients_for_different_configs() {
        let pool = create_pool();

        let smtp_config_1 = SmtpConfig {
            host: "smtp.example.com".to_string(),
            port: 587,
            username: "user1".to_string(),
            password: "pass1".to_string(),
        };

        // Same host and port, different credentials.
        let smtp_config_2 = SmtpConfig {
            host: "smtp.example.com".to_string(),
            port: 587,
            username: "user2".to_string(),
            password: "pass2".to_string(),
        };

        let client1 = pool
            .get_or_create_smtp_client(&smtp_config_1)
            .await
            .unwrap();
        let client2 = pool
            .get_or_create_smtp_client(&smtp_config_2)
            .await
            .unwrap();

        assert!(
            !Arc::ptr_eq(&client1, &client2),
            "Should return different client instances for different configurations"
        );

        assert_eq!(
            pool.get_active_smtp_client_count().await,
            2,
            "Pool should have two active SMTP clients"
        );

        let client1_again = pool
            .get_or_create_smtp_client(&smtp_config_1)
            .await
            .unwrap();

        assert!(
            Arc::ptr_eq(&client1, &client1_again),
            "Should return the same client instance when called again with the same config"
        );

        assert_eq!(
            pool.get_active_smtp_client_count().await,
            2,
            "Pool should still have two active SMTP clients after getting an existing one"
        );
    }
}