openzeppelin_monitor/services/notification/
mod.rs

//! Notification service implementation.
//!
//! This module provides functionality to send notifications through various channels
//! and supports variable substitution in message templates.
5
6use async_trait::async_trait;
7
8use std::{collections::HashMap, sync::Arc};
9
10mod email;
11mod error;
12pub mod payload_builder;
13mod pool;
14mod script;
15mod template_formatter;
16mod webhook;
17
18use crate::{
19	models::{
20		MonitorMatch, NotificationMessage, ScriptLanguage, Trigger, TriggerType, TriggerTypeConfig,
21	},
22	utils::{normalize_string, RetryConfig},
23};
24
25pub use email::{EmailContent, EmailNotifier, SmtpConfig};
26pub use error::NotificationError;
27pub use payload_builder::{
28	DiscordPayloadBuilder, GenericWebhookPayloadBuilder, SlackPayloadBuilder,
29	TelegramPayloadBuilder, WebhookPayloadBuilder,
30};
31pub use pool::NotificationClientPool;
32pub use script::ScriptNotifier;
33pub use webhook::{WebhookConfig, WebhookNotifier};
34
35/// A container for all components needed to configure and send a webhook notification.
36struct WebhookComponents {
37	config: WebhookConfig,
38	retry_policy: RetryConfig,
39	builder: Box<dyn WebhookPayloadBuilder>,
40}
41
42/// A type alias to simplify the complex tuple returned by the internal `match` statement.
43type WebhookParts = (
44	String,                          // url
45	NotificationMessage,             // message
46	Option<String>,                  // method
47	Option<String>,                  // secret
48	Option<HashMap<String, String>>, // headers
49	Box<dyn WebhookPayloadBuilder>,  // payload builder
50);
51
52/// A trait for trigger configurations that can be sent via webhook.
53/// This abstracts away the specific details of each webhook provider.
54trait AsWebhookComponents {
55	/// Consolidates the logic for creating webhook components from a trigger config.
56	/// It returns the generic `WebhookConfig`, RetryConfig and the specific `WebhookPayloadBuilder`
57	/// needed for the given trigger type.
58	fn as_webhook_components(&self) -> Result<WebhookComponents, NotificationError>;
59}
60
61impl AsWebhookComponents for TriggerTypeConfig {
62	fn as_webhook_components(&self) -> Result<WebhookComponents, NotificationError> {
63		let (url, message, method, secret, headers, builder): WebhookParts = match self {
64			TriggerTypeConfig::Webhook {
65				url,
66				message,
67				method,
68				secret,
69				headers,
70				..
71			} => (
72				url.as_ref().to_string(),
73				message.clone(),
74				method.clone(),
75				secret.as_ref().map(|s| s.as_ref().to_string()),
76				headers.clone(),
77				Box::new(GenericWebhookPayloadBuilder),
78			),
79			TriggerTypeConfig::Discord {
80				discord_url,
81				message,
82				..
83			} => (
84				discord_url.as_ref().to_string(),
85				message.clone(),
86				Some("POST".to_string()),
87				None,
88				None,
89				Box::new(DiscordPayloadBuilder),
90			),
91			TriggerTypeConfig::Telegram {
92				token,
93				message,
94				chat_id,
95				disable_web_preview,
96				..
97			} => (
98				format!("https://api.telegram.org/bot{}/sendMessage", token),
99				message.clone(),
100				Some("POST".to_string()),
101				None,
102				None,
103				Box::new(TelegramPayloadBuilder {
104					chat_id: chat_id.clone(),
105					disable_web_preview: disable_web_preview.unwrap_or(false),
106				}),
107			),
108			TriggerTypeConfig::Slack {
109				slack_url, message, ..
110			} => (
111				slack_url.as_ref().to_string(),
112				message.clone(),
113				Some("POST".to_string()),
114				None,
115				None,
116				Box::new(SlackPayloadBuilder),
117			),
118			_ => {
119				return Err(NotificationError::config_error(
120					format!("Trigger type is not webhook-compatible: {:?}", self),
121					None,
122					None,
123				))
124			}
125		};
126
127		// Construct the final WebhookConfig from the extracted parts.
128		let config = WebhookConfig {
129			url,
130			title: message.title,
131			body_template: message.body,
132			method,
133			secret,
134			headers,
135			url_params: None,
136			payload_fields: None,
137		};
138
139		// Use the retry policy from the trigger config
140		let retry_policy = self.get_retry_policy().ok_or_else(|| {
141			NotificationError::config_error(
142				"Webhook trigger config is unexpectedly missing a retry policy.",
143				None,
144				None,
145			)
146		})?;
147
148		Ok(WebhookComponents {
149			config,
150			retry_policy,
151			builder,
152		})
153	}
154}
155
156/// Interface for executing scripts
157///
158/// This Interface is used to execute scripts for notifications.
159/// It is implemented by the ScriptNotifier struct.
160#[async_trait]
161pub trait ScriptExecutor {
162	/// Executes a script to send a custom notifications
163	///
164	/// # Arguments
165	/// * `monitor_match` - The monitor match to send
166	/// * `script_content` - The script content to execute
167	///
168	/// # Returns
169	/// * `Result<(), NotificationError>` - Success or error
170	async fn script_notify(
171		&self,
172		monitor_match: &MonitorMatch,
173		script_content: &(ScriptLanguage, String),
174	) -> Result<(), NotificationError>;
175}
176
177/// Service for managing notifications across different channels
178pub struct NotificationService {
179	/// Client pool for managing notification clients (HTTP, SMTP)
180	client_pool: Arc<NotificationClientPool>,
181}
182
183impl NotificationService {
184	/// Creates a new notification service instance
185	pub fn new() -> Self {
186		NotificationService {
187			client_pool: Arc::new(NotificationClientPool::new()),
188		}
189	}
190
191	/// Executes a notification based on the trigger configuration
192	///
193	/// # Arguments
194	/// * `trigger` - Trigger containing the notification type and parameters
195	/// * `variables` - Variables to substitute in message templates
196	/// * `monitor_match` - Monitor match to send (needed for custom script trigger)
197	/// * `trigger_scripts` - Contains the script content to execute (needed for custom script
198	///   trigger)
199	///
200	/// # Returns
201	/// * `Result<(), NotificationError>` - Success or error
202	pub async fn execute(
203		&self,
204		trigger: &Trigger,
205		variables: &HashMap<String, String>,
206		monitor_match: &MonitorMatch,
207		trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
208	) -> Result<(), NotificationError> {
209		match &trigger.trigger_type {
210			// Match Webhook-based triggers
211			TriggerType::Slack
212			| TriggerType::Discord
213			| TriggerType::Webhook
214			| TriggerType::Telegram => {
215				// Use the Webhookable trait to get config, retry policy and payload builder
216				let components = trigger.config.as_webhook_components()?;
217
218				// Get or create the HTTP client from the pool based on the retry policy
219				let http_client = self
220					.client_pool
221					.get_or_create_http_client(&components.retry_policy)
222					.await
223					.map_err(|e| {
224						NotificationError::execution_error(
225							"Failed to get or create HTTP client from pool".to_string(),
226							Some(e.into()),
227							None,
228						)
229					})?;
230
231				// Build the payload
232				let payload = components.builder.build_payload(
233					&components.config.title,
234					&components.config.body_template,
235					variables,
236				);
237
238				// Create the notifier
239				let notifier = WebhookNotifier::new(components.config, http_client)?;
240
241				notifier.notify_json(&payload).await?;
242			}
243			TriggerType::Email => {
244				// Extract SMTP configuration from the trigger
245				let smtp_config = match &trigger.config {
246					TriggerTypeConfig::Email {
247						host,
248						port,
249						username,
250						password,
251						..
252					} => SmtpConfig {
253						host: host.clone(),
254						port: port.unwrap_or(465),
255						username: username.as_ref().to_string(),
256						password: password.as_ref().to_string(),
257					},
258					_ => {
259						return Err(NotificationError::config_error(
260							"Invalid email configuration".to_string(),
261							None,
262							None,
263						));
264					}
265				};
266
267				// Get or create the SMTP client from the pool
268				let smtp_client = self
269					.client_pool
270					.get_or_create_smtp_client(&smtp_config)
271					.await
272					.map_err(|e| {
273						NotificationError::execution_error(
274							"Failed to get SMTP client from pool".to_string(),
275							Some(e.into()),
276							None,
277						)
278					})?;
279
280				let notifier = EmailNotifier::from_config(&trigger.config, smtp_client)?;
281				let message = EmailNotifier::format_message(notifier.body_template(), variables);
282				notifier.notify(&message).await?;
283			}
284			TriggerType::Script => {
285				let notifier = ScriptNotifier::from_config(&trigger.config)?;
286				let monitor_name = match monitor_match {
287					MonitorMatch::EVM(evm_match) => &evm_match.monitor.name,
288					MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.name,
289				};
290				let script_path = match &trigger.config {
291					TriggerTypeConfig::Script { script_path, .. } => script_path,
292					_ => {
293						return Err(NotificationError::config_error(
294							"Invalid script configuration".to_string(),
295							None,
296							None,
297						));
298					}
299				};
300				let script = trigger_scripts
301					.get(&format!(
302						"{}|{}",
303						normalize_string(monitor_name),
304						script_path
305					))
306					.ok_or_else(|| {
307						NotificationError::config_error(
308							"Script content not found".to_string(),
309							None,
310							None,
311						)
312					});
313				let script_content = match &script {
314					Ok(content) => content,
315					Err(e) => {
316						return Err(NotificationError::config_error(e.to_string(), None, None));
317					}
318				};
319
320				notifier
321					.script_notify(monitor_match, script_content)
322					.await?;
323			}
324		}
325		Ok(())
326	}
327}
328
329impl Default for NotificationService {
330	fn default() -> Self {
331		Self::new()
332	}
333}
334
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		models::{
			AddressWithSpec, EVMMonitorMatch, EVMTransactionReceipt, EventCondition,
			FunctionCondition, MatchConditions, Monitor, MonitorMatch, NotificationMessage,
			ScriptLanguage, SecretString, SecretValue, TransactionCondition, TriggerType,
		},
		utils::tests::{
			builders::{evm::monitor::MonitorBuilder, trigger::TriggerBuilder},
			evm::transaction::TransactionBuilder,
		},
	};
	use std::collections::HashMap;

	/// Builds a monitor named "test" on `evm_mainnet` with the given conditions and addresses.
	fn create_test_monitor(
		event_conditions: Vec<EventCondition>,
		function_conditions: Vec<FunctionCondition>,
		transaction_conditions: Vec<TransactionCondition>,
		addresses: Vec<AddressWithSpec>,
	) -> Monitor {
		let base = MonitorBuilder::new()
			.name("test")
			.networks(vec!["evm_mainnet".to_string()]);

		// Conditions are applied in the same order: events, functions, transactions.
		let with_events = event_conditions
			.into_iter()
			.fold(base, |b, event| b.event(&event.signature, event.expression));
		let with_functions = function_conditions
			.into_iter()
			.fold(with_events, |b, function| {
				b.function(&function.signature, function.expression)
			});
		let with_transactions = transaction_conditions
			.into_iter()
			.fold(with_functions, |b, transaction| {
				b.transaction(transaction.status, transaction.expression)
			});

		// Addresses go last.
		addresses
			.into_iter()
			.fold(with_transactions, |b, addr| b.address(&addr.address))
			.build()
	}

	/// Builds an EVM monitor match with an empty monitor and no matched conditions.
	fn create_mock_monitor_match() -> MonitorMatch {
		let matched_on = MatchConditions {
			functions: vec![],
			events: vec![],
			transactions: vec![],
		};
		let evm_match = EVMMonitorMatch {
			monitor: create_test_monitor(vec![], vec![], vec![], vec![]),
			transaction: TransactionBuilder::new().build(),
			receipt: Some(EVMTransactionReceipt::default()),
			logs: Some(vec![]),
			network_slug: "evm_mainnet".to_string(),
			matched_on,
			matched_on_args: None,
		};
		MonitorMatch::EVM(Box::new(evm_match))
	}

	/// Runs `trigger` through a fresh service with no variables and no scripts.
	async fn execute_with_empty_inputs(trigger: &Trigger) -> Result<(), NotificationError> {
		let service = NotificationService::new();
		let variables = HashMap::new();
		let monitor_match = create_mock_monitor_match();
		let scripts = HashMap::new();
		let outcome = service
			.execute(trigger, &variables, &monitor_match, &scripts)
			.await;
		outcome
	}

	/// Asserts that `result` is a `ConfigError` whose message contains `expected`.
	fn assert_config_error(result: Result<(), NotificationError>, expected: &str) {
		assert!(result.is_err());
		match result {
			Err(NotificationError::ConfigError(ctx)) => {
				assert!(ctx.message.contains(expected));
			}
			_ => panic!("Expected ConfigError"),
		}
	}

	/// Builds a `NotificationMessage` from string slices.
	fn notification_message(title: &str, body: &str) -> NotificationMessage {
		NotificationMessage {
			title: title.to_string(),
			body: body.to_string(),
		}
	}

	#[tokio::test]
	async fn test_slack_notification_invalid_config() {
		// Script config paired with a Slack trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_slack")
			.script("invalid", ScriptLanguage::Python)
			.trigger_type(TriggerType::Slack)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Trigger type is not webhook-compatible");
	}

	#[tokio::test]
	async fn test_email_notification_invalid_config() {
		// Script config paired with an Email trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_email")
			.script("invalid", ScriptLanguage::Python)
			.trigger_type(TriggerType::Email)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Invalid email configuration");
	}

	#[tokio::test]
	async fn test_webhook_notification_invalid_config() {
		// Script config paired with a Webhook trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_webhook")
			.script("invalid", ScriptLanguage::Python)
			.trigger_type(TriggerType::Webhook)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Trigger type is not webhook-compatible");
	}

	#[tokio::test]
	async fn test_discord_notification_invalid_config() {
		// Script config paired with a Discord trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_discord")
			.script("invalid", ScriptLanguage::Python)
			.trigger_type(TriggerType::Discord)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Trigger type is not webhook-compatible");
	}

	#[tokio::test]
	async fn test_telegram_notification_invalid_config() {
		// Script config paired with a Telegram trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_telegram")
			.script("invalid", ScriptLanguage::Python)
			.trigger_type(TriggerType::Telegram)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Trigger type is not webhook-compatible");
	}

	#[tokio::test]
	async fn test_script_notification_invalid_config() {
		// Telegram config paired with a Script trigger type: intentionally mismatched.
		let trigger = TriggerBuilder::new()
			.name("test_script")
			.telegram("invalid", "invalid", false)
			.trigger_type(TriggerType::Script)
			.build();

		let result = execute_with_empty_inputs(&trigger).await;
		assert_config_error(result, "Invalid script configuration");
	}

	#[test]
	fn as_webhook_components_trait_for_slack_config() {
		let title = "Slack Title";
		let message = "Slack Body";

		let slack_config = TriggerTypeConfig::Slack {
			slack_url: SecretValue::Plain(SecretString::new(
				"https://slack.example.com".to_string(),
			)),
			message: notification_message(title, message),
			retry_policy: RetryConfig::default(),
		};

		let WebhookComponents {
			config, builder, ..
		} = slack_config.as_webhook_components().unwrap();

		// The generic config mirrors the Slack settings.
		assert_eq!(config.url, "https://slack.example.com");
		assert_eq!(config.title, title);
		assert_eq!(config.body_template, message);
		assert_eq!(config.method, Some("POST".to_string()));
		assert!(config.secret.is_none());

		// The builder produces a Slack-shaped payload.
		let payload = builder.build_payload(title, message, &HashMap::new());
		assert!(
			payload.get("blocks").is_some(),
			"Expected a Slack payload with 'blocks'"
		);
		assert!(
			payload.get("content").is_none(),
			"Did not expect a Discord payload"
		);
	}

	#[test]
	fn as_webhook_components_trait_for_discord_config() {
		let title = "Discord Title";
		let message = "Discord Body";

		let discord_config = TriggerTypeConfig::Discord {
			discord_url: SecretValue::Plain(SecretString::new(
				"https://discord.example.com".to_string(),
			)),
			message: notification_message(title, message),
			retry_policy: RetryConfig::default(),
		};

		let WebhookComponents {
			config, builder, ..
		} = discord_config.as_webhook_components().unwrap();

		// The generic config mirrors the Discord settings.
		assert_eq!(config.url, "https://discord.example.com");
		assert_eq!(config.title, title);
		assert_eq!(config.body_template, message);
		assert_eq!(config.method, Some("POST".to_string()));

		// The builder produces a Discord-shaped payload.
		let payload = builder.build_payload(title, message, &HashMap::new());
		assert!(
			payload.get("content").is_some(),
			"Expected a Discord payload with 'content'"
		);
		assert!(
			payload.get("blocks").is_none(),
			"Did not expect a Slack payload"
		);
	}

	#[test]
	fn as_webhook_components_trait_for_telegram_config() {
		let title = "Telegram Title";
		let message = "Telegram Body";

		let telegram_config = TriggerTypeConfig::Telegram {
			token: SecretValue::Plain(SecretString::new("test-token".to_string())),
			chat_id: "12345".to_string(),
			disable_web_preview: Some(true),
			message: notification_message(title, message),
			retry_policy: RetryConfig::default(),
		};

		let WebhookComponents {
			config, builder, ..
		} = telegram_config.as_webhook_components().unwrap();

		// The URL is derived from the bot token.
		assert_eq!(
			config.url,
			"https://api.telegram.org/bottest-token/sendMessage"
		);
		assert_eq!(config.title, title);
		assert_eq!(config.body_template, message);

		// The builder carries the chat id and preview flag into the payload.
		let payload = builder.build_payload(title, message, &HashMap::new());
		assert_eq!(payload.get("chat_id").unwrap(), "12345");
		assert_eq!(payload.get("disable_web_page_preview").unwrap(), &true);
		assert!(payload.get("text").is_some());
	}

	#[test]
	fn as_webhook_components_trait_for_generic_webhook_config() {
		let title = "Generic Title";
		let body_template = "Generic Body";

		let webhook_config = TriggerTypeConfig::Webhook {
			url: SecretValue::Plain(SecretString::new("https://generic.example.com".to_string())),
			message: notification_message(title, body_template),
			method: Some("PUT".to_string()),
			secret: Some(SecretValue::Plain(SecretString::new(
				"my-secret".to_string(),
			))),
			headers: Some([("X-Custom".to_string(), "Value".to_string())].into()),
			retry_policy: RetryConfig::default(),
		};

		let WebhookComponents {
			config, builder, ..
		} = webhook_config.as_webhook_components().unwrap();

		// Method, secret and headers are passed through unchanged.
		assert_eq!(config.url, "https://generic.example.com");
		assert_eq!(config.method, Some("PUT".to_string()));
		assert_eq!(config.secret, Some("my-secret".to_string()));
		assert!(config.headers.is_some());
		assert_eq!(config.headers.unwrap().get("X-Custom").unwrap(), "Value");

		// The builder produces the generic title/body payload.
		let payload = builder.build_payload(title, body_template, &HashMap::new());
		assert!(payload.get("title").is_some());
		assert!(payload.get("body").is_some());
	}
}