1use async_trait::async_trait;
7
8use std::{collections::HashMap, sync::Arc};
9
10mod email;
11mod error;
12pub mod payload_builder;
13mod pool;
14mod script;
15mod template_formatter;
16mod webhook;
17
18use crate::{
19 models::{
20 MonitorMatch, NotificationMessage, ScriptLanguage, Trigger, TriggerType, TriggerTypeConfig,
21 },
22 utils::{normalize_string, RetryConfig},
23};
24
25pub use email::{EmailContent, EmailNotifier, SmtpConfig};
26pub use error::NotificationError;
27pub use payload_builder::{
28 DiscordPayloadBuilder, GenericWebhookPayloadBuilder, SlackPayloadBuilder,
29 TelegramPayloadBuilder, WebhookPayloadBuilder,
30};
31pub use pool::NotificationClientPool;
32pub use script::ScriptNotifier;
33pub use webhook::{WebhookConfig, WebhookNotifier};
34
35struct WebhookComponents {
37 config: WebhookConfig,
38 retry_policy: RetryConfig,
39 builder: Box<dyn WebhookPayloadBuilder>,
40}
41
42type WebhookParts = (
44 String, NotificationMessage, Option<String>, Option<String>, Option<HashMap<String, String>>, Box<dyn WebhookPayloadBuilder>, );
51
52trait AsWebhookComponents {
55 fn as_webhook_components(&self) -> Result<WebhookComponents, NotificationError>;
59}
60
61impl AsWebhookComponents for TriggerTypeConfig {
62 fn as_webhook_components(&self) -> Result<WebhookComponents, NotificationError> {
63 let (url, message, method, secret, headers, builder): WebhookParts = match self {
64 TriggerTypeConfig::Webhook {
65 url,
66 message,
67 method,
68 secret,
69 headers,
70 ..
71 } => (
72 url.as_ref().to_string(),
73 message.clone(),
74 method.clone(),
75 secret.as_ref().map(|s| s.as_ref().to_string()),
76 headers.clone(),
77 Box::new(GenericWebhookPayloadBuilder),
78 ),
79 TriggerTypeConfig::Discord {
80 discord_url,
81 message,
82 ..
83 } => (
84 discord_url.as_ref().to_string(),
85 message.clone(),
86 Some("POST".to_string()),
87 None,
88 None,
89 Box::new(DiscordPayloadBuilder),
90 ),
91 TriggerTypeConfig::Telegram {
92 token,
93 message,
94 chat_id,
95 disable_web_preview,
96 ..
97 } => (
98 format!("https://api.telegram.org/bot{}/sendMessage", token),
99 message.clone(),
100 Some("POST".to_string()),
101 None,
102 None,
103 Box::new(TelegramPayloadBuilder {
104 chat_id: chat_id.clone(),
105 disable_web_preview: disable_web_preview.unwrap_or(false),
106 }),
107 ),
108 TriggerTypeConfig::Slack {
109 slack_url, message, ..
110 } => (
111 slack_url.as_ref().to_string(),
112 message.clone(),
113 Some("POST".to_string()),
114 None,
115 None,
116 Box::new(SlackPayloadBuilder),
117 ),
118 _ => {
119 return Err(NotificationError::config_error(
120 format!("Trigger type is not webhook-compatible: {:?}", self),
121 None,
122 None,
123 ))
124 }
125 };
126
127 let config = WebhookConfig {
129 url,
130 title: message.title,
131 body_template: message.body,
132 method,
133 secret,
134 headers,
135 url_params: None,
136 payload_fields: None,
137 };
138
139 let retry_policy = self.get_retry_policy().ok_or_else(|| {
141 NotificationError::config_error(
142 "Webhook trigger config is unexpectedly missing a retry policy.",
143 None,
144 None,
145 )
146 })?;
147
148 Ok(WebhookComponents {
149 config,
150 retry_policy,
151 builder,
152 })
153 }
154}
155
156#[async_trait]
161pub trait ScriptExecutor {
162 async fn script_notify(
171 &self,
172 monitor_match: &MonitorMatch,
173 script_content: &(ScriptLanguage, String),
174 ) -> Result<(), NotificationError>;
175}
176
177pub struct NotificationService {
179 client_pool: Arc<NotificationClientPool>,
181}
182
183impl NotificationService {
184 pub fn new() -> Self {
186 NotificationService {
187 client_pool: Arc::new(NotificationClientPool::new()),
188 }
189 }
190
191 pub async fn execute(
203 &self,
204 trigger: &Trigger,
205 variables: &HashMap<String, String>,
206 monitor_match: &MonitorMatch,
207 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
208 ) -> Result<(), NotificationError> {
209 match &trigger.trigger_type {
210 TriggerType::Slack
212 | TriggerType::Discord
213 | TriggerType::Webhook
214 | TriggerType::Telegram => {
215 let components = trigger.config.as_webhook_components()?;
217
218 let http_client = self
220 .client_pool
221 .get_or_create_http_client(&components.retry_policy)
222 .await
223 .map_err(|e| {
224 NotificationError::execution_error(
225 "Failed to get or create HTTP client from pool".to_string(),
226 Some(e.into()),
227 None,
228 )
229 })?;
230
231 let payload = components.builder.build_payload(
233 &components.config.title,
234 &components.config.body_template,
235 variables,
236 );
237
238 let notifier = WebhookNotifier::new(components.config, http_client)?;
240
241 notifier.notify_json(&payload).await?;
242 }
243 TriggerType::Email => {
244 let smtp_config = match &trigger.config {
246 TriggerTypeConfig::Email {
247 host,
248 port,
249 username,
250 password,
251 ..
252 } => SmtpConfig {
253 host: host.clone(),
254 port: port.unwrap_or(465),
255 username: username.as_ref().to_string(),
256 password: password.as_ref().to_string(),
257 },
258 _ => {
259 return Err(NotificationError::config_error(
260 "Invalid email configuration".to_string(),
261 None,
262 None,
263 ));
264 }
265 };
266
267 let smtp_client = self
269 .client_pool
270 .get_or_create_smtp_client(&smtp_config)
271 .await
272 .map_err(|e| {
273 NotificationError::execution_error(
274 "Failed to get SMTP client from pool".to_string(),
275 Some(e.into()),
276 None,
277 )
278 })?;
279
280 let notifier = EmailNotifier::from_config(&trigger.config, smtp_client)?;
281 let message = EmailNotifier::format_message(notifier.body_template(), variables);
282 notifier.notify(&message).await?;
283 }
284 TriggerType::Script => {
285 let notifier = ScriptNotifier::from_config(&trigger.config)?;
286 let monitor_name = match monitor_match {
287 MonitorMatch::EVM(evm_match) => &evm_match.monitor.name,
288 MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.name,
289 };
290 let script_path = match &trigger.config {
291 TriggerTypeConfig::Script { script_path, .. } => script_path,
292 _ => {
293 return Err(NotificationError::config_error(
294 "Invalid script configuration".to_string(),
295 None,
296 None,
297 ));
298 }
299 };
300 let script = trigger_scripts
301 .get(&format!(
302 "{}|{}",
303 normalize_string(monitor_name),
304 script_path
305 ))
306 .ok_or_else(|| {
307 NotificationError::config_error(
308 "Script content not found".to_string(),
309 None,
310 None,
311 )
312 });
313 let script_content = match &script {
314 Ok(content) => content,
315 Err(e) => {
316 return Err(NotificationError::config_error(e.to_string(), None, None));
317 }
318 };
319
320 notifier
321 .script_notify(monitor_match, script_content)
322 .await?;
323 }
324 }
325 Ok(())
326 }
327}
328
329impl Default for NotificationService {
330 fn default() -> Self {
331 Self::new()
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338 use crate::{
339 models::{
340 AddressWithSpec, EVMMonitorMatch, EVMTransactionReceipt, EventCondition,
341 FunctionCondition, MatchConditions, Monitor, MonitorMatch, NotificationMessage,
342 ScriptLanguage, SecretString, SecretValue, TransactionCondition, TriggerType,
343 },
344 utils::tests::{
345 builders::{evm::monitor::MonitorBuilder, trigger::TriggerBuilder},
346 evm::transaction::TransactionBuilder,
347 },
348 };
349 use std::collections::HashMap;
350
351 fn create_test_monitor(
352 event_conditions: Vec<EventCondition>,
353 function_conditions: Vec<FunctionCondition>,
354 transaction_conditions: Vec<TransactionCondition>,
355 addresses: Vec<AddressWithSpec>,
356 ) -> Monitor {
357 let mut builder = MonitorBuilder::new()
358 .name("test")
359 .networks(vec!["evm_mainnet".to_string()]);
360
361 for event in event_conditions {
363 builder = builder.event(&event.signature, event.expression);
364 }
365 for function in function_conditions {
366 builder = builder.function(&function.signature, function.expression);
367 }
368 for transaction in transaction_conditions {
369 builder = builder.transaction(transaction.status, transaction.expression);
370 }
371
372 for addr in addresses {
374 builder = builder.address(&addr.address);
375 }
376
377 builder.build()
378 }
379
380 fn create_mock_monitor_match() -> MonitorMatch {
381 MonitorMatch::EVM(Box::new(EVMMonitorMatch {
382 monitor: create_test_monitor(vec![], vec![], vec![], vec![]),
383 transaction: TransactionBuilder::new().build(),
384 receipt: Some(EVMTransactionReceipt::default()),
385 logs: Some(vec![]),
386 network_slug: "evm_mainnet".to_string(),
387 matched_on: MatchConditions {
388 functions: vec![],
389 events: vec![],
390 transactions: vec![],
391 },
392 matched_on_args: None,
393 }))
394 }
395
396 #[tokio::test]
397 async fn test_slack_notification_invalid_config() {
398 let service = NotificationService::new();
399
400 let trigger = TriggerBuilder::new()
401 .name("test_slack")
402 .script("invalid", ScriptLanguage::Python)
403 .trigger_type(TriggerType::Slack) .build();
405
406 let variables = HashMap::new();
407 let result = service
408 .execute(
409 &trigger,
410 &variables,
411 &create_mock_monitor_match(),
412 &HashMap::new(),
413 )
414 .await;
415 assert!(result.is_err());
416 match result {
417 Err(NotificationError::ConfigError(ctx)) => {
418 assert!(ctx
419 .message
420 .contains("Trigger type is not webhook-compatible"));
421 }
422 _ => panic!("Expected ConfigError"),
423 }
424 }
425
426 #[tokio::test]
427 async fn test_email_notification_invalid_config() {
428 let service = NotificationService::new();
429
430 let trigger = TriggerBuilder::new()
431 .name("test_email")
432 .script("invalid", ScriptLanguage::Python)
433 .trigger_type(TriggerType::Email) .build();
435
436 let variables = HashMap::new();
437 let result = service
438 .execute(
439 &trigger,
440 &variables,
441 &create_mock_monitor_match(),
442 &HashMap::new(),
443 )
444 .await;
445 assert!(result.is_err());
446 match result {
447 Err(NotificationError::ConfigError(ctx)) => {
448 assert!(ctx.message.contains("Invalid email configuration"));
449 }
450 _ => panic!("Expected ConfigError"),
451 }
452 }
453
454 #[tokio::test]
455 async fn test_webhook_notification_invalid_config() {
456 let service = NotificationService::new();
457
458 let trigger = TriggerBuilder::new()
459 .name("test_webhook")
460 .script("invalid", ScriptLanguage::Python)
461 .trigger_type(TriggerType::Webhook) .build();
463
464 let variables = HashMap::new();
465 let result = service
466 .execute(
467 &trigger,
468 &variables,
469 &create_mock_monitor_match(),
470 &HashMap::new(),
471 )
472 .await;
473 assert!(result.is_err());
474 match result {
475 Err(NotificationError::ConfigError(ctx)) => {
476 assert!(ctx
477 .message
478 .contains("Trigger type is not webhook-compatible"));
479 }
480 _ => panic!("Expected ConfigError"),
481 }
482 }
483
484 #[tokio::test]
485 async fn test_discord_notification_invalid_config() {
486 let service = NotificationService::new();
487
488 let trigger = TriggerBuilder::new()
489 .name("test_discord")
490 .script("invalid", ScriptLanguage::Python)
491 .trigger_type(TriggerType::Discord) .build();
493
494 let variables = HashMap::new();
495 let result = service
496 .execute(
497 &trigger,
498 &variables,
499 &create_mock_monitor_match(),
500 &HashMap::new(),
501 )
502 .await;
503 assert!(result.is_err());
504 match result {
505 Err(NotificationError::ConfigError(ctx)) => {
506 assert!(ctx
507 .message
508 .contains("Trigger type is not webhook-compatible"));
509 }
510 _ => panic!("Expected ConfigError"),
511 }
512 }
513
514 #[tokio::test]
515 async fn test_telegram_notification_invalid_config() {
516 let service = NotificationService::new();
517
518 let trigger = TriggerBuilder::new()
519 .name("test_telegram")
520 .script("invalid", ScriptLanguage::Python)
521 .trigger_type(TriggerType::Telegram) .build();
523
524 let variables = HashMap::new();
525 let result = service
526 .execute(
527 &trigger,
528 &variables,
529 &create_mock_monitor_match(),
530 &HashMap::new(),
531 )
532 .await;
533 assert!(result.is_err());
534 match result {
535 Err(NotificationError::ConfigError(ctx)) => {
536 assert!(ctx
537 .message
538 .contains("Trigger type is not webhook-compatible"));
539 }
540 _ => panic!("Expected ConfigError"),
541 }
542 }
543
544 #[tokio::test]
545 async fn test_script_notification_invalid_config() {
546 let service = NotificationService::new();
547
548 let trigger = TriggerBuilder::new()
549 .name("test_script")
550 .telegram("invalid", "invalid", false)
551 .trigger_type(TriggerType::Script) .build();
553
554 let variables = HashMap::new();
555
556 let result = service
557 .execute(
558 &trigger,
559 &variables,
560 &create_mock_monitor_match(),
561 &HashMap::new(),
562 )
563 .await;
564
565 assert!(result.is_err());
566 match result {
567 Err(NotificationError::ConfigError(ctx)) => {
568 assert!(ctx.message.contains("Invalid script configuration"));
569 }
570 _ => panic!("Expected ConfigError"),
571 }
572 }
573
574 #[test]
575 fn as_webhook_components_trait_for_slack_config() {
576 let title = "Slack Title";
577 let message = "Slack Body";
578
579 let slack_config = TriggerTypeConfig::Slack {
580 slack_url: SecretValue::Plain(SecretString::new(
581 "https://slack.example.com".to_string(),
582 )),
583 message: NotificationMessage {
584 title: title.to_string(),
585 body: message.to_string(),
586 },
587 retry_policy: RetryConfig::default(),
588 };
589
590 let components = slack_config.as_webhook_components().unwrap();
591
592 assert_eq!(components.config.url, "https://slack.example.com");
594 assert_eq!(components.config.title, title);
595 assert_eq!(components.config.body_template, message);
596 assert_eq!(components.config.method, Some("POST".to_string()));
597 assert!(components.config.secret.is_none());
598
599 let payload = components
601 .builder
602 .build_payload(title, message, &HashMap::new());
603 assert!(
604 payload.get("blocks").is_some(),
605 "Expected a Slack payload with 'blocks'"
606 );
607 assert!(
608 payload.get("content").is_none(),
609 "Did not expect a Discord payload"
610 );
611 }
612
613 #[test]
614 fn as_webhook_components_trait_for_discord_config() {
615 let title = "Discord Title";
616 let message = "Discord Body";
617 let discord_config = TriggerTypeConfig::Discord {
618 discord_url: SecretValue::Plain(SecretString::new(
619 "https://discord.example.com".to_string(),
620 )),
621 message: NotificationMessage {
622 title: title.to_string(),
623 body: message.to_string(),
624 },
625 retry_policy: RetryConfig::default(),
626 };
627
628 let components = discord_config.as_webhook_components().unwrap();
629
630 assert_eq!(components.config.url, "https://discord.example.com");
632 assert_eq!(components.config.title, title);
633 assert_eq!(components.config.body_template, message);
634 assert_eq!(components.config.method, Some("POST".to_string()));
635
636 let payload = components
638 .builder
639 .build_payload(title, message, &HashMap::new());
640 assert!(
641 payload.get("content").is_some(),
642 "Expected a Discord payload with 'content'"
643 );
644 assert!(
645 payload.get("blocks").is_none(),
646 "Did not expect a Slack payload"
647 );
648 }
649
650 #[test]
651 fn as_webhook_components_trait_for_telegram_config() {
652 let title = "Telegram Title";
653 let message = "Telegram Body";
654 let telegram_config = TriggerTypeConfig::Telegram {
655 token: SecretValue::Plain(SecretString::new("test-token".to_string())),
656 chat_id: "12345".to_string(),
657 disable_web_preview: Some(true),
658 message: NotificationMessage {
659 title: title.to_string(),
660 body: message.to_string(),
661 },
662 retry_policy: RetryConfig::default(),
663 };
664
665 let components = telegram_config.as_webhook_components().unwrap();
666
667 assert_eq!(
669 components.config.url,
670 "https://api.telegram.org/bottest-token/sendMessage"
671 );
672 assert_eq!(components.config.title, title);
673 assert_eq!(components.config.body_template, message);
674
675 let payload = components
677 .builder
678 .build_payload(title, message, &HashMap::new());
679 assert_eq!(payload.get("chat_id").unwrap(), "12345");
680 assert_eq!(payload.get("disable_web_page_preview").unwrap(), &true);
681 assert!(payload.get("text").is_some());
682 }
683
684 #[test]
685 fn as_webhook_components_trait_for_generic_webhook_config() {
686 let title = "Generic Title";
687 let body_template = "Generic Body";
688 let webhook_config = TriggerTypeConfig::Webhook {
689 url: SecretValue::Plain(SecretString::new("https://generic.example.com".to_string())),
690 message: NotificationMessage {
691 title: title.to_string(),
692 body: body_template.to_string(),
693 },
694 method: Some("PUT".to_string()),
695 secret: Some(SecretValue::Plain(SecretString::new(
696 "my-secret".to_string(),
697 ))),
698 headers: Some([("X-Custom".to_string(), "Value".to_string())].into()),
699 retry_policy: RetryConfig::default(),
700 };
701
702 let components = webhook_config.as_webhook_components().unwrap();
703
704 assert_eq!(components.config.url, "https://generic.example.com");
706 assert_eq!(components.config.method, Some("PUT".to_string()));
707 assert_eq!(components.config.secret, Some("my-secret".to_string()));
708 assert!(components.config.headers.is_some());
709 assert_eq!(
710 components.config.headers.unwrap().get("X-Custom").unwrap(),
711 "Value"
712 );
713
714 let payload = components
716 .builder
717 .build_payload(title, body_template, &HashMap::new());
718 assert!(payload.get("title").is_some());
719 assert!(payload.get("body").is_some());
720 }
721}