1use backon::{BackoffBuilder, ExponentialBuilder, Retryable};
7use email_address::EmailAddress;
8use lettre::{
9 message::{
10 header::{self, ContentType},
11 Mailbox, Mailboxes,
12 },
13 transport::smtp::Error as SmtpError,
14 AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
15};
16use pulldown_cmark::{html, Options, Parser};
17use std::{collections::HashMap, error::Error as StdError, sync::Arc};
18
19use crate::{
20 models::TriggerTypeConfig,
21 services::notification::{template_formatter, NotificationError},
22 utils::{JitterSetting, RetryConfig},
23};
24
25#[derive(Debug)]
27pub struct EmailNotifier<T: AsyncTransport + Send + Sync> {
28 subject: String,
30 body_template: String,
32 client: Arc<T>,
34 sender: EmailAddress,
36 recipients: Vec<EmailAddress>,
38 retry_policy: RetryConfig,
40}
41
/// Connection settings for an SMTP server.
///
/// The type is hashable and comparable so it can serve as a lookup key.
/// `Debug` is implemented by hand so that credentials never end up in logs
/// or error messages.
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct SmtpConfig {
    /// SMTP server host name.
    pub host: String,
    /// SMTP server port.
    pub port: u16,
    /// Username used to authenticate against the server.
    pub username: String,
    /// Password used to authenticate against the server.
    pub password: String,
}

impl std::fmt::Debug for SmtpConfig {
    /// Formats the configuration with `username` and `password` redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SmtpConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            // Credentials are secrets: print a placeholder, not the value.
            .field("username", &"<redacted>")
            .field("password", &"<redacted>")
            .finish()
    }
}
50
51#[derive(Clone)]
53pub struct EmailContent {
54 pub subject: String,
55 pub body_template: String,
56 pub sender: EmailAddress,
57 pub recipients: Vec<EmailAddress>,
58}
59
60impl<T: AsyncTransport + Send + Sync> EmailNotifier<T>
62where
63 T::Ok: Send + Sync,
64 T::Error: StdError + Send + Sync + 'static,
65{
66 pub fn with_transport(
76 email_content: EmailContent,
77 transport: T,
78 retry_policy: RetryConfig,
79 ) -> Self {
80 Self {
81 subject: email_content.subject,
82 body_template: email_content.body_template,
83 sender: email_content.sender,
84 recipients: email_content.recipients,
85 client: Arc::new(transport),
86 retry_policy,
87 }
88 }
89
90 pub async fn notify(&self, message: &str) -> Result<(), NotificationError> {
98 let recipients_str = self
99 .recipients
100 .iter()
101 .map(ToString::to_string)
102 .collect::<Vec<_>>()
103 .join(", ");
104
105 let mailboxes: Mailboxes = recipients_str.parse::<Mailboxes>().map_err(|e| {
106 NotificationError::notify_failed(
107 format!("Failed to parse recipients: {}", e),
108 Some(e.into()),
109 None,
110 )
111 })?;
112 let recipients_header: header::To = mailboxes.into();
113
114 let email = Message::builder()
115 .mailbox(recipients_header)
116 .from(self.sender.to_string().parse::<Mailbox>().map_err(|e| {
117 NotificationError::notify_failed(
118 format!("Failed to parse sender: {}", e),
119 Some(e.into()),
120 None,
121 )
122 })?)
123 .reply_to(self.sender.to_string().parse::<Mailbox>().map_err(|e| {
124 NotificationError::notify_failed(
125 format!("Failed to parse reply-to: {}", e),
126 Some(e.into()),
127 None,
128 )
129 })?)
130 .subject(&self.subject)
131 .header(ContentType::TEXT_HTML)
132 .body(message.to_owned())
133 .map_err(|e| {
134 NotificationError::notify_failed(
135 format!("Failed to build email message: {}", e),
136 Some(e.into()),
137 None,
138 )
139 })?;
140
141 let operation = || async {
142 self.client.send(email.clone()).await.map_err(|e| {
143 NotificationError::notify_failed(
144 format!("Failed to send email: {}", e),
145 Some(Box::new(e)),
146 None,
147 )
148 })?;
149
150 Ok(())
151 };
152
153 let backoff = ExponentialBuilder::default()
154 .with_min_delay(self.retry_policy.initial_backoff)
155 .with_max_delay(self.retry_policy.max_backoff);
156
157 let backoff_with_jitter = match self.retry_policy.jitter {
158 JitterSetting::Full => backoff.with_jitter(),
159 JitterSetting::None => backoff,
160 };
161
162 let should_retry = |e: &NotificationError| -> bool {
164 if let NotificationError::NotifyFailed(context) = e {
165 if let Some(source) = context.source() {
166 if let Some(smtp_error) = source.downcast_ref::<SmtpError>() {
167 return !smtp_error.is_permanent();
168 }
169 }
170 }
171 true
172 };
173
174 operation
175 .retry(
176 backoff_with_jitter
177 .build()
178 .take(self.retry_policy.max_retries as usize),
179 )
180 .when(should_retry)
181 .await
182 }
183}
184
185impl EmailNotifier<AsyncSmtpTransport<Tokio1Executor>> {
186 pub fn new(
195 smtp_client: Arc<AsyncSmtpTransport<Tokio1Executor>>,
196 email_content: EmailContent,
197 retry_policy: RetryConfig,
198 ) -> Result<Self, NotificationError> {
199 Ok(Self {
200 subject: email_content.subject,
201 body_template: email_content.body_template,
202 sender: email_content.sender,
203 recipients: email_content.recipients,
204 client: smtp_client,
205 retry_policy,
206 })
207 }
208
209 pub fn body_template(&self) -> &str {
211 &self.body_template
212 }
213
214 pub fn format_message(body_template: &str, variables: &HashMap<String, String>) -> String {
224 let formatted_message = template_formatter::format_template(body_template, variables);
225 Self::markdown_to_html(&formatted_message)
226 }
227
228 pub fn markdown_to_html(md: &str) -> String {
230 let opts = Options::all();
232 let parser = Parser::new_ext(md, opts);
233
234 let mut html_out = String::new();
235 html::push_html(&mut html_out, parser);
236 html_out
237 }
238
239 pub fn from_config(
247 config: &TriggerTypeConfig,
248 smtp_client: Arc<AsyncSmtpTransport<Tokio1Executor>>,
249 ) -> Result<Self, NotificationError> {
250 if let TriggerTypeConfig::Email {
251 message,
252 sender,
253 recipients,
254 retry_policy,
255 ..
256 } = config
257 {
258 let email_content = EmailContent {
259 subject: message.title.clone(),
260 body_template: message.body.clone(),
261 sender: sender.clone(),
262 recipients: recipients.clone(),
263 };
264
265 Self::new(smtp_client, email_content, retry_policy.clone())
266 } else {
267 Err(NotificationError::config_error(
268 format!("Invalid email configuration: {:?}", config),
269 None,
270 None,
271 ))
272 }
273 }
274}
275
#[cfg(test)]
mod tests {
    use lettre::transport::{smtp::authentication::Credentials, stub::AsyncStubTransport};

    use crate::{
        models::{NotificationMessage, SecretString, SecretValue},
        services::notification::pool::NotificationClientPool,
        utils::RetryConfig,
    };

    use super::*;

    /// Content shared by most tests: a two-variable body template.
    fn create_test_email_content() -> EmailContent {
        EmailContent {
            subject: "Test Subject".to_string(),
            body_template: "Hello ${name}, your balance is ${balance}".to_string(),
            sender: "sender@test.com".parse().unwrap(),
            recipients: vec!["recipient@test.com".parse().unwrap()],
        }
    }

    /// Builds an SMTP transport pointing at a dummy relay; nothing is sent
    /// through it by the tests that use it.
    fn create_test_smtp_client() -> Arc<AsyncSmtpTransport<Tokio1Executor>> {
        let smtp_config = SmtpConfig {
            host: "dummy.smtp.com".to_string(),
            port: 465,
            username: "test".to_string(),
            password: "test".to_string(),
        };

        let transport = AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp_config.host)
            .unwrap()
            .port(smtp_config.port)
            .credentials(Credentials::new(smtp_config.username, smtp_config.password))
            .build();

        Arc::new(transport)
    }

    /// Notifier backed by the dummy SMTP transport and the shared test content.
    fn create_test_notifier() -> EmailNotifier<AsyncSmtpTransport<Tokio1Executor>> {
        EmailNotifier::new(
            create_test_smtp_client(),
            create_test_email_content(),
            RetryConfig::default(),
        )
        .unwrap()
    }

    /// Email trigger configuration with a configurable (optional) port.
    fn create_test_email_config(port: Option<u16>) -> TriggerTypeConfig {
        TriggerTypeConfig::Email {
            host: "smtp.test.com".to_string(),
            port,
            username: SecretValue::Plain(SecretString::new("testuser".to_string())),
            password: SecretValue::Plain(SecretString::new("testpass".to_string())),
            message: NotificationMessage {
                title: "Test Subject".to_string(),
                body: "Hello ${name}".to_string(),
            },
            sender: "sender@test.com".parse().unwrap(),
            recipients: vec!["recipient@test.com".parse().unwrap()],
            retry_policy: RetryConfig::default(),
        }
    }

    /// Derives the SMTP settings from an `Email` trigger configuration,
    /// falling back to port 587 when none is configured.
    fn smtp_config_from(config: &TriggerTypeConfig) -> SmtpConfig {
        match config {
            TriggerTypeConfig::Email {
                host,
                port,
                username,
                password,
                ..
            } => SmtpConfig {
                host: host.clone(),
                port: port.unwrap_or(587),
                username: username.to_string(),
                password: password.to_string(),
            },
            _ => panic!("Expected Email config"),
        }
    }

    // ---- format_message ----

    #[tokio::test]
    async fn test_format_message_basic_substitution() {
        let notifier = create_test_notifier();
        let variables = HashMap::from([
            ("name".to_string(), "Alice".to_string()),
            ("balance".to_string(), "100".to_string()),
        ]);

        let rendered = EmailNotifier::format_message(notifier.body_template(), &variables);
        assert_eq!(rendered, "<p>Hello Alice, your balance is 100</p>\n");
    }

    #[tokio::test]
    async fn test_format_message_missing_variable() {
        let notifier = create_test_notifier();
        let variables = HashMap::from([("name".to_string(), "Bob".to_string())]);

        let rendered = EmailNotifier::format_message(notifier.body_template(), &variables);
        assert_eq!(rendered, "<p>Hello Bob, your balance is ${balance}</p>\n");
    }

    #[tokio::test]
    async fn test_format_message_empty_variables() {
        let notifier = create_test_notifier();
        let variables: HashMap<String, String> = HashMap::new();

        let rendered = EmailNotifier::format_message(notifier.body_template(), &variables);
        assert_eq!(
            rendered,
            "<p>Hello ${name}, your balance is ${balance}</p>\n"
        );
    }

    #[tokio::test]
    async fn test_format_message_with_empty_values() {
        let notifier = create_test_notifier();
        let variables = HashMap::from([
            ("name".to_string(), "".to_string()),
            ("balance".to_string(), "".to_string()),
        ]);

        let rendered = EmailNotifier::format_message(notifier.body_template(), &variables);
        assert_eq!(rendered, "<p>Hello , your balance is</p>\n");
    }

    // ---- from_config ----

    #[tokio::test]
    async fn test_from_config_valid_email_config() {
        let config = create_test_email_config(Some(587));
        let smtp_config = smtp_config_from(&config);

        let pool = NotificationClientPool::new();
        let smtp_client = pool.get_or_create_smtp_client(&smtp_config).await.unwrap();

        let built = EmailNotifier::from_config(&config, smtp_client);
        assert!(built.is_ok());

        let notifier = built.unwrap();
        assert_eq!(notifier.subject, "Test Subject");
        assert_eq!(notifier.body_template, "Hello ${name}");
        assert_eq!(notifier.sender.to_string(), "sender@test.com");
        assert_eq!(notifier.recipients.len(), 1);
        assert_eq!(notifier.recipients[0].to_string(), "recipient@test.com");
    }

    #[tokio::test]
    async fn test_from_config_invalid_type() {
        // A non-email configuration must be rejected.
        let wrong_config = TriggerTypeConfig::Slack {
            slack_url: SecretValue::Plain(SecretString::new(
                "https://slack.com/api/chat.postMessage".to_string(),
            )),
            message: NotificationMessage {
                title: "Test Slack".to_string(),
                body: "Hello ${name}".to_string(),
            },
            retry_policy: RetryConfig::default(),
        };

        let outcome = EmailNotifier::from_config(&wrong_config, create_test_smtp_client());
        assert!(outcome.is_err());
        assert!(matches!(
            outcome.unwrap_err(),
            NotificationError::ConfigError(_)
        ));
    }

    #[tokio::test]
    async fn test_from_config_default_port() {
        let config = create_test_email_config(None);
        let smtp_config = smtp_config_from(&config);

        let pool = NotificationClientPool::new();
        let smtp_client = pool.get_or_create_smtp_client(&smtp_config).await.unwrap();

        let built = EmailNotifier::from_config(&config, smtp_client);
        assert!(built.is_ok());
    }

    // ---- notify ----

    #[tokio::test]
    async fn test_notify_succeeds_on_first_try() {
        let stub = AsyncStubTransport::new_ok();
        let notifier = EmailNotifier::with_transport(
            create_test_email_content(),
            stub.clone(),
            RetryConfig::default(),
        );

        notifier.notify("test message").await.unwrap();
        assert_eq!(stub.messages().await.len(), 1);
    }

    #[tokio::test]
    async fn test_notify_fails_after_all_retries() {
        let stub = AsyncStubTransport::new_error();
        let retry_policy = RetryConfig::default();
        let default_max_retries = retry_policy.max_retries as usize;
        let notifier =
            EmailNotifier::with_transport(create_test_email_content(), stub.clone(), retry_policy);

        let outcome = notifier.notify("test message").await;
        assert!(outcome.is_err());
        assert_eq!(
            stub.messages().await.len(),
            1 + default_max_retries,
            "Should be called 1 time + default max retries"
        );
    }
}