openzeppelin_monitor/services/notification/
pool.rs

1use crate::services::blockchain::TransientErrorRetryStrategy;
2use crate::services::notification::SmtpConfig;
3use crate::utils::client_storage::ClientStorage;
4use crate::utils::{create_retryable_http_client, RetryConfig};
5use lettre::Tokio1Executor;
6use lettre::{transport::smtp::authentication::Credentials, AsyncSmtpTransport};
7use reqwest::Client as ReqwestClient;
8use reqwest_middleware::ClientWithMiddleware;
9use std::sync::Arc;
10use std::time::Duration;
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14pub enum NotificationPoolError {
15	#[error("Failed to create HTTP client: {0}")]
16	HttpClientBuildError(String),
17
18	#[error("Failed to create SMTP client: {0}")]
19	SmtpClientBuildError(String),
20}
21
22/// Notification client pool that manages HTTP and SMTP clients for sending notifications.
23///
24/// Provides a thread-safe way to access and create HTTP and SMTP clients
25/// for sending notifications. It uses a `ClientStorage` to hold the clients,
26/// allowing for efficient reuse and management of HTTP and SMTP connections.
27pub struct NotificationClientPool {
28	http_clients: ClientStorage<ClientWithMiddleware>,
29	smtp_clients: ClientStorage<AsyncSmtpTransport<Tokio1Executor>>,
30}
31
32impl NotificationClientPool {
33	pub fn new() -> Self {
34		Self {
35			http_clients: ClientStorage::new(),
36			smtp_clients: ClientStorage::new(),
37		}
38	}
39
40	/// A private, generic method to handle the core logic of getting or creating a client.
41	async fn get_or_create_client<T, F>(
42		&self,
43		key: &str,
44		storage: &ClientStorage<T>,
45		create_fn: F,
46	) -> Result<Arc<T>, NotificationPoolError>
47	where
48		T: Send + Sync,
49		F: FnOnce() -> Result<T, NotificationPoolError>,
50	{
51		// 1. Fast path (read lock)
52		if let Some(client) = storage.clients.read().await.get(key) {
53			return Ok(client.clone());
54		}
55
56		// 2. Slow path (write lock)
57		let mut clients = storage.clients.write().await;
58		// 3. Double-check
59		if let Some(client) = clients.get(key) {
60			return Ok(client.clone());
61		}
62
63		// 4. Create and insert
64		let new_client = create_fn()?;
65		let arc_client = Arc::new(new_client);
66		clients.insert(key.to_string(), arc_client.clone());
67
68		Ok(arc_client)
69	}
70
71	/// Get or create an HTTP client with retry capabilities.
72	///
73	/// # Arguments
74	/// * `retry_policy` - Configuration for HTTP retry policy
75	/// # Returns
76	/// * `Result<Arc<ClientWithMiddleware>, NotificationPoolError>` - The HTTP client
77	///   wrapped in an `Arc` for shared ownership, or an error if client creation
78	///   fails.
79	pub async fn get_or_create_http_client(
80		&self,
81		retry_policy: &RetryConfig,
82	) -> Result<Arc<ClientWithMiddleware>, NotificationPoolError> {
83		let key = format!("{:?}", retry_policy);
84		self.get_or_create_client(&key, &self.http_clients, || {
85			let base_client = ReqwestClient::builder()
86				.pool_max_idle_per_host(10)
87				.pool_idle_timeout(Some(Duration::from_secs(90)))
88				.connect_timeout(Duration::from_secs(10))
89				.build()
90				.map_err(|e| NotificationPoolError::HttpClientBuildError(e.to_string()))?;
91
92			Ok(create_retryable_http_client(
93				retry_policy,
94				base_client,
95				Some(TransientErrorRetryStrategy),
96			))
97		})
98		.await
99	}
100
101	/// Get or create an SMTP client for sending emails.
102	/// # Arguments
103	/// * `smtp_config` - Configuration for the SMTP client, including host,
104	///   port, username, and password.
105	/// # Returns
106	/// * `Result<Arc<AsyncSmtpTransport<Tokio1Executor>>, NotificationPoolError>` - The SMTP client
107	///   wrapped in an `Arc` for shared ownership, or an error if client creation
108	///   fails.
109	pub async fn get_or_create_smtp_client(
110		&self,
111		smtp_config: &SmtpConfig,
112	) -> Result<Arc<AsyncSmtpTransport<Tokio1Executor>>, NotificationPoolError> {
113		let key = format!("{:?}", smtp_config);
114		self.get_or_create_client(&key, &self.smtp_clients, || {
115			let creds =
116				Credentials::new(smtp_config.username.clone(), smtp_config.password.clone());
117			Ok(
118				AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp_config.host)
119					.map_err(|e| NotificationPoolError::SmtpClientBuildError(e.to_string()))?
120					.port(smtp_config.port)
121					.credentials(creds)
122					.build(),
123			)
124		})
125		.await
126	}
127
128	/// Get the number of active HTTP clients in the pool
129	#[cfg(test)]
130	pub async fn get_active_http_client_count(&self) -> usize {
131		self.http_clients.clients.read().await.len()
132	}
133
134	/// Get the number of active SMTP clients in the pool
135	#[cfg(test)]
136	pub async fn get_active_smtp_client_count(&self) -> usize {
137		self.smtp_clients.clients.read().await.len()
138	}
139}
140
141impl Default for NotificationClientPool {
142	fn default() -> Self {
143		Self::new()
144	}
145}
146
#[cfg(test)]
mod tests {
	use super::*;

	/// Fresh, empty pool for each test.
	fn create_pool() -> NotificationClientPool {
		NotificationClientPool::new()
	}

	/// SMTP configuration for the shared example host, varying only the credentials.
	fn smtp_config(username: &str, password: &str) -> SmtpConfig {
		SmtpConfig {
			host: "smtp.example.com".to_string(),
			port: 587,
			username: username.to_string(),
			password: password.to_string(),
		}
	}

	#[tokio::test]
	async fn test_pool_init_empty() {
		let pool = create_pool();
		let http_count = pool.get_active_http_client_count().await;
		let smtp_count = pool.get_active_smtp_client_count().await;

		assert_eq!(http_count, 0, "Pool should be empty initially");
		assert_eq!(smtp_count, 0, "Pool should be empty initially");
	}

	#[tokio::test]
	async fn test_pool_get_or_create_http_client() {
		let pool = create_pool();
		let retry_config = RetryConfig::default();
		let client = pool.get_or_create_http_client(&retry_config).await;

		assert!(
			client.is_ok(),
			"Should successfully create or get HTTP client"
		);

		assert_eq!(
			pool.get_active_http_client_count().await,
			1,
			"Pool should have one active HTTP client"
		);
	}

	#[tokio::test]
	async fn test_pool_returns_same_client() {
		let pool = create_pool();
		let retry_config = RetryConfig::default();
		let client1 = pool.get_or_create_http_client(&retry_config).await.unwrap();
		let client2 = pool.get_or_create_http_client(&retry_config).await.unwrap();

		assert!(
			Arc::ptr_eq(&client1, &client2),
			"Should return the same client instance"
		);
		assert_eq!(
			pool.get_active_http_client_count().await,
			1,
			"Pool should still have one active HTTP client"
		);
	}

	#[tokio::test]
	async fn test_pool_concurrent_access() {
		let pool = Arc::new(create_pool());
		let retry_config = RetryConfig::default();

		let num_tasks = 10;
		let mut tasks = Vec::with_capacity(num_tasks);

		for _ in 0..num_tasks {
			let pool_clone = Arc::clone(&pool);
			let retry_config = retry_config.clone();
			// Each task hands its client back so the test can compare instances.
			tasks.push(tokio::spawn(async move {
				pool_clone.get_or_create_http_client(&retry_config).await
			}));
		}

		let results = futures::future::join_all(tasks).await;

		let mut clients = Vec::with_capacity(num_tasks);
		for result in results {
			let client = result
				.expect("All tasks should complete successfully")
				.expect("Should successfully create or get HTTP client");
			clients.push(client);
		}

		// Racing callers with the same configuration must all end up with the
		// one client that won the insertion, not with private copies.
		let first = &clients[0];
		assert!(
			clients.iter().all(|client| Arc::ptr_eq(first, client)),
			"All concurrent callers should share the same client instance"
		);
		assert_eq!(
			pool.get_active_http_client_count().await,
			1,
			"Pool should have exactly one HTTP client after concurrent access"
		);
	}

	#[tokio::test]
	async fn test_pool_default() {
		let pool = NotificationClientPool::default();
		let retry_config = RetryConfig::default();

		assert_eq!(
			pool.get_active_http_client_count().await,
			0,
			"Default pool should be empty initially"
		);

		assert_eq!(
			pool.get_active_smtp_client_count().await,
			0,
			"Default pool should be empty initially"
		);

		let client = pool.get_or_create_http_client(&retry_config).await;

		assert!(
			client.is_ok(),
			"Default pool should successfully create or get HTTP client"
		);

		assert_eq!(
			pool.get_active_http_client_count().await,
			1,
			"Default pool should have one active HTTP client"
		);
	}

	#[tokio::test]
	async fn test_pool_returns_different_http_clients_for_different_configs() {
		let pool = create_pool();

		// Config 1 (default)
		let retry_config_1 = RetryConfig::default();

		// Config 2 (different retry count)
		let retry_config_2 = RetryConfig {
			max_retries: 5,
			..Default::default()
		};

		// Get a client for each config
		let client1 = pool
			.get_or_create_http_client(&retry_config_1)
			.await
			.unwrap();
		let client2 = pool
			.get_or_create_http_client(&retry_config_2)
			.await
			.unwrap();

		// Pointers should NOT be equal, as they are different clients
		assert!(
			!Arc::ptr_eq(&client1, &client2),
			"Should return different client instances for different configurations"
		);

		// The pool should now contain two distinct clients
		assert_eq!(
			pool.get_active_http_client_count().await,
			2,
			"Pool should have two active HTTP clients"
		);

		// Getting the first client again should return the original one
		let client1_again = pool
			.get_or_create_http_client(&retry_config_1)
			.await
			.unwrap();
		assert!(
			Arc::ptr_eq(&client1, &client1_again),
			"Should return the same client instance when called again with the same config"
		);

		// Pool size should still be 2
		assert_eq!(
			pool.get_active_http_client_count().await,
			2,
			"Pool should still have two active HTTP clients after getting an existing one"
		);
	}

	#[tokio::test]
	async fn test_pool_returns_different_smtp_clients_for_different_configs() {
		let pool = create_pool();

		// Same host and port, different credentials
		let smtp_config_1 = smtp_config("user1", "pass1");
		let smtp_config_2 = smtp_config("user2", "pass2");

		// Get a client for each config
		let client1 = pool
			.get_or_create_smtp_client(&smtp_config_1)
			.await
			.unwrap();
		let client2 = pool
			.get_or_create_smtp_client(&smtp_config_2)
			.await
			.unwrap();

		// Pointers should NOT be equal, as they are different clients
		assert!(
			!Arc::ptr_eq(&client1, &client2),
			"Should return different client instances for different configurations"
		);

		// The pool should now contain two distinct clients
		assert_eq!(
			pool.get_active_smtp_client_count().await,
			2,
			"Pool should have two active SMTP clients"
		);

		// Getting the first client again should return the original one
		let client1_again = pool
			.get_or_create_smtp_client(&smtp_config_1)
			.await
			.unwrap();

		assert!(
			Arc::ptr_eq(&client1, &client1_again),
			"Should return the same client instance when called again with the same config"
		);

		assert_eq!(
			pool.get_active_smtp_client_count().await,
			2,
			"Pool should still have two active SMTP clients after getting an existing one"
		);
	}
}