openzeppelin_monitor/services/blockchain/clients/stellar/
client.rs

1//! Stellar blockchain client implementation.
2//!
3//! This module provides functionality to interact with the Stellar blockchain,
4//! supporting operations like block retrieval, transaction lookup, and event filtering.
5
6use anyhow::Context;
7use async_trait::async_trait;
8use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine as _};
9use serde_json::json;
10use std::marker::PhantomData;
11use stellar_xdr::curr::{Limits, WriteXdr};
12use tracing::instrument;
13
14use crate::{
15	models::{
16		BlockType, ContractSpec, Network, StellarBlock, StellarContractSpec, StellarEvent,
17		StellarTransaction, StellarTransactionInfo,
18	},
19	services::{
20		blockchain::{
21			client::{BlockChainClient, BlockFilterFactory},
22			transports::StellarTransportClient,
23			BlockchainTransport,
24		},
25		filter::{
26			stellar_helpers::{
27				get_contract_code_ledger_key, get_contract_instance_ledger_key, get_contract_spec,
28				get_wasm_code_from_ledger_entry_data, get_wasm_hash_from_ledger_entry_data,
29			},
30			StellarBlockFilter,
31		},
32	},
33};
34
35use super::error::StellarClientError;
36
37/// Stellar RPC method constants
38const RPC_METHOD_GET_TRANSACTIONS: &str = "getTransactions";
39const RPC_METHOD_GET_EVENTS: &str = "getEvents";
40const RPC_METHOD_GET_LATEST_LEDGER: &str = "getLatestLedger";
41const RPC_METHOD_GET_LEDGERS: &str = "getLedgers";
42const RPC_METHOD_GET_LEDGER_ENTRIES: &str = "getLedgerEntries";
43
44const RETENTION_MESSAGES: [&str; 2] = [
45	"must be within the ledger range",
46	"must be between the oldest ledger",
47];
48
49/// Client implementation for the Stellar blockchain
50///
51/// Provides high-level access to Stellar blockchain data and operations through HTTP transport.
52#[derive(Clone)]
53pub struct StellarClient<T: Send + Sync + Clone> {
54	/// The underlying Stellar transport client for RPC communication
55	http_client: T,
56}
57
58impl<T: Send + Sync + Clone> StellarClient<T> {
59	/// Creates a new Stellar client instance with a specific transport client
60	pub fn new_with_transport(http_client: T) -> Self {
61		Self { http_client }
62	}
63
64	/// Checks a JSON-RPC response for error information and converts it into a `StellarClientError` if present.
65	///
66	/// This function inspects the given JSON response body for an "error" field.
67	/// If a known "outside of retention window" condition is detected (by code/message),
68	/// it returns a specific `StellarClientError::outside_retention_window`.
69	/// Otherwise, it returns a generic `StellarClientError::rpc_error` for any other error found.
70	/// If no error is present, it returns `Ok(())`.
71	///
72	/// # Arguments
73	/// * `response_body` - A reference to the JSON response body to inspect.
74	/// * `start_sequence` - The starting ledger sequence number relevant to the request.
75	/// * `target_sequence` - The target ledger sequence number relevant to the request.
76	/// * `method_name` - The name of the RPC method that was called (for error reporting).
77	///
78	/// # Returns
79	/// * `Ok(())` if no error is present in the response.
80	/// * `Err(StellarClientError)` if an error is detected.
81	fn check_and_handle_rpc_error(
82		&self,
83		response_body: &serde_json::Value,
84		start_sequence: u32,
85		target_sequence: u32,
86		method_name: &'static str,
87	) -> Result<(), StellarClientError> {
88		if let Some(json_rpc_error) = response_body.get("error") {
89			let rpc_code = json_rpc_error
90				.get("code")
91				.and_then(|c| c.as_i64())
92				.unwrap_or(0);
93			let rpc_message = json_rpc_error
94				.get("message")
95				.and_then(|m| m.as_str())
96				.unwrap_or("Unknown RPC error")
97				.to_string();
98
99			if rpc_code == -32600
100				&& RETENTION_MESSAGES
101					.iter()
102					.any(|msg| rpc_message.to_lowercase().contains(msg))
103			{
104				let ledger_info_str = format!(
105					"start_sequence: {}, target_sequence: {}",
106					start_sequence, target_sequence
107				);
108
109				return Err(StellarClientError::outside_retention_window(
110					rpc_code,
111					rpc_message,
112					ledger_info_str,
113					None,
114					None,
115				));
116			} else {
117				// Other JSON-RPC error reported by the server
118				let message = format!(
119					"Stellar RPC request failed for method '{}': {} (code {})",
120					method_name, rpc_message, rpc_code
121				);
122
123				return Err(StellarClientError::rpc_error(message, None, None));
124			}
125		}
126		Ok(())
127	}
128}
129
130impl StellarClient<StellarTransportClient> {
131	/// Creates a new Stellar client instance
132	///
133	/// # Arguments
134	/// * `network` - Network configuration containing RPC endpoints and chain details
135	///
136	/// # Returns
137	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
138	pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
139		let http_client = StellarTransportClient::new(network).await?;
140		Ok(Self::new_with_transport(http_client))
141	}
142}
143
144/// Extended functionality specific to the Stellar blockchain
145#[async_trait]
146pub trait StellarClientTrait {
147	/// Retrieves transactions within a sequence range
148	///
149	/// # Arguments
150	/// * `start_sequence` - Starting sequence number
151	/// * `end_sequence` - Optional ending sequence number. If None, only fetches start_sequence
152	///
153	/// # Returns
154	/// * `Result<Vec<StellarTransaction>, anyhow::Error>` - Collection of transactions or error
155	async fn get_transactions(
156		&self,
157		start_sequence: u32,
158		end_sequence: Option<u32>,
159	) -> Result<Vec<StellarTransaction>, anyhow::Error>;
160
161	/// Retrieves events within a sequence range
162	///
163	/// # Arguments
164	/// * `start_sequence` - Starting sequence number
165	/// * `end_sequence` - Optional ending sequence number. If None, only fetches start_sequence
166	///
167	/// # Returns
168	/// * `Result<Vec<StellarEvent>, anyhow::Error>` - Collection of events or error
169	async fn get_events(
170		&self,
171		start_sequence: u32,
172		end_sequence: Option<u32>,
173	) -> Result<Vec<StellarEvent>, anyhow::Error>;
174}
175
176#[async_trait]
177impl<T: Send + Sync + Clone + BlockchainTransport> StellarClientTrait for StellarClient<T> {
178	/// Retrieves transactions within a sequence range with pagination
179	///
180	/// # Errors
181	/// - Returns `anyhow::Error` if start_sequence > end_sequence
182	/// - Returns `anyhow::Error` if transaction parsing fails
183	#[instrument(skip(self), fields(start_sequence, end_sequence))]
184	async fn get_transactions(
185		&self,
186		start_sequence: u32,
187		end_sequence: Option<u32>,
188	) -> Result<Vec<StellarTransaction>, anyhow::Error> {
189		// Validate input parameters
190		if let Some(end_sequence) = end_sequence {
191			if start_sequence > end_sequence {
192				let message = format!(
193					"start_sequence {} cannot be greater than end_sequence {}",
194					start_sequence, end_sequence
195				);
196				let input_error = StellarClientError::invalid_input(message, None, None);
197				return Err(anyhow::anyhow!(input_error))
198					.context("Invalid input parameters for Stellar RPC");
199			}
200		}
201
202		// max limit for the RPC endpoint is 200
203		const PAGE_LIMIT: u32 = 200;
204		let mut transactions = Vec::new();
205		let target_sequence = end_sequence.unwrap_or(start_sequence);
206		let mut cursor: Option<String> = None;
207		let mut current_iteration = 0;
208
209		while cursor.is_some() || current_iteration <= 0 {
210			let params = if current_iteration == 0 {
211				// First iteration, we need to fetch the transactions from the start sequence without a cursor
212				json!({
213					"startLedger": start_sequence,
214					"pagination": {
215						"limit": PAGE_LIMIT
216					}
217				})
218			} else {
219				// Subsequent iterations, we need to fetch the transactions from the cursor
220				json!({
221					"pagination": {
222						"cursor": cursor,
223						"limit": PAGE_LIMIT
224					}
225				})
226			};
227
228			let http_response = self
229				.http_client
230				.send_raw_request(RPC_METHOD_GET_TRANSACTIONS, Some(params))
231				.await;
232
233			match http_response {
234				Ok(response_body) => {
235					// Check for RPC errors in the response
236					if let Err(rpc_error) = self.check_and_handle_rpc_error(
237						&response_body,
238						start_sequence,
239						target_sequence,
240						RPC_METHOD_GET_TRANSACTIONS,
241					) {
242						// A terminal JSON-RPC error was found, convert and return
243						return Err(anyhow::anyhow!(rpc_error).context(format!(
244							"Soroban RPC reported an error during {}",
245							RPC_METHOD_GET_TRANSACTIONS,
246						)));
247					}
248
249					// Extract the transactions from the response
250					let raw_transactions = response_body
251						.get("result")
252						.and_then(|r| r.get("transactions"))
253						.ok_or_else(|| {
254							let message = format!(
255								"Unexpected response structure for method '{}'",
256								RPC_METHOD_GET_TRANSACTIONS
257							);
258							StellarClientError::unexpected_response_structure(message, None, None)
259						})
260						.map_err(|client_parse_error| {
261							anyhow::anyhow!(client_parse_error)
262								.context("Failed to parse transaction response")
263						})?;
264
265					let ledger_transactions: Vec<StellarTransactionInfo> =
266						serde_json::from_value(raw_transactions.clone()).map_err(|e| {
267							let message = format!(
268								"Failed to parse transactions from response for method '{}': {}",
269								RPC_METHOD_GET_TRANSACTIONS, e
270							);
271							let sce_parse_error = StellarClientError::response_parse_error(
272								message,
273								Some(e.into()),
274								None,
275							);
276							anyhow::anyhow!(sce_parse_error)
277								.context("Failed to parse transaction response")
278						})?;
279
280					if ledger_transactions.is_empty() {
281						break;
282					}
283
284					for transaction in ledger_transactions {
285						if transaction.ledger > target_sequence {
286							return Ok(transactions);
287						}
288						transactions.push(StellarTransaction::from(transaction));
289					}
290
291					// Increment the number of iterations to ensure we break the loop in case there is no cursor
292					current_iteration += 1;
293					cursor = response_body["result"]["cursor"]
294						.as_str()
295						.map(|s| s.to_string());
296					if cursor.is_none() {
297						break;
298					}
299				}
300				Err(transport_err) => {
301					// Ledger info for logging
302					let ledger_info = format!(
303						"start_sequence: {}, end_sequence: {:?}",
304						start_sequence, end_sequence
305					);
306
307					return Err(anyhow::anyhow!(transport_err)).context(format!(
308						"Failed to {} from Stellar RPC for ledger: {}",
309						RPC_METHOD_GET_TRANSACTIONS, ledger_info
310					));
311				}
312			}
313		}
314		Ok(transactions)
315	}
316
317	/// Retrieves events within a sequence range with pagination
318	///
319	/// # Errors
320	/// - Returns `anyhow::Error` if start_sequence > end_sequence
321	/// - Returns `anyhow::Error` if event parsing fails
322	#[instrument(skip(self), fields(start_sequence, end_sequence))]
323	async fn get_events(
324		&self,
325		start_sequence: u32,
326		end_sequence: Option<u32>,
327	) -> Result<Vec<StellarEvent>, anyhow::Error> {
328		// Validate input parameters
329		if let Some(end_sequence) = end_sequence {
330			if start_sequence > end_sequence {
331				let message = format!(
332					"start_sequence {} cannot be greater than end_sequence {}",
333					start_sequence, end_sequence
334				);
335				let input_error = StellarClientError::invalid_input(message, None, None);
336				return Err(anyhow::anyhow!(input_error))
337					.context("Invalid input parameters for Stellar RPC");
338			}
339		}
340
341		// max limit for the RPC endpoint is 200
342		const PAGE_LIMIT: u32 = 200;
343		let mut events = Vec::new();
344		let target_sequence = end_sequence.unwrap_or(start_sequence);
345		let mut cursor: Option<String> = None;
346		let mut current_iteration = 0;
347
348		while cursor.is_some() || current_iteration <= 0 {
349			let params = if current_iteration == 0 {
350				// First iteration, we need to fetch the events from the start sequence without a cursor
351				json!({
352					"startLedger": start_sequence,
353					"filters": [{
354						"type": "contract",
355					}],
356					"pagination": {
357						"limit": PAGE_LIMIT
358					}
359				})
360			} else {
361				// Subsequent iterations, we need to fetch the events from the cursor
362				json!({
363					"filters": [{
364						"type": "contract",
365					}],
366					"pagination": {
367						"cursor": cursor,
368						"limit": PAGE_LIMIT
369					}
370				})
371			};
372
373			let http_response = self
374				.http_client
375				.send_raw_request(RPC_METHOD_GET_EVENTS, Some(params))
376				.await;
377
378			match http_response {
379				Ok(response_body) => {
380					// Check for RPC errors in the response
381					if let Err(rpc_error) = self.check_and_handle_rpc_error(
382						&response_body,
383						start_sequence,
384						target_sequence,
385						RPC_METHOD_GET_EVENTS,
386					) {
387						// A terminal JSON-RPC error was found, convert and return
388						return Err(anyhow::anyhow!(rpc_error).context(format!(
389							"Soroban RPC reported an error during {}",
390							RPC_METHOD_GET_EVENTS
391						)));
392					}
393
394					// Extract the events from the response
395					let raw_events = response_body
396						.get("result")
397						.and_then(|r| r.get("events"))
398						.ok_or_else(|| {
399							let message = format!(
400								"Unexpected response structure for method '{}'",
401								RPC_METHOD_GET_EVENTS
402							);
403							StellarClientError::unexpected_response_structure(message, None, None)
404						})
405						.map_err(|client_parse_error| {
406							anyhow::anyhow!(client_parse_error)
407								.context("Failed to parse event response")
408						})?;
409
410					let ledger_events: Vec<StellarEvent> =
411						serde_json::from_value(raw_events.clone()).map_err(|e| {
412							let message = format!(
413								"Failed to parse events from response for method '{}': {}",
414								RPC_METHOD_GET_EVENTS, e
415							);
416							let sce_parse_error = StellarClientError::response_parse_error(
417								message,
418								Some(e.into()),
419								None,
420							);
421							anyhow::anyhow!(sce_parse_error)
422								.context("Failed to parse event response")
423						})?;
424
425					for event in ledger_events {
426						if event.ledger > target_sequence {
427							return Ok(events);
428						}
429						events.push(event);
430					}
431
432					// Increment the number of iterations to ensure we break the loop in case there is no cursor
433					current_iteration += 1;
434					cursor = response_body["result"]["cursor"]
435						.as_str()
436						.map(|s| s.to_string());
437					if cursor.is_none() {
438						break;
439					}
440				}
441				Err(transport_err) => {
442					// Ledger info for logging
443					let ledger_info = format!(
444						"start_sequence: {}, end_sequence: {:?}",
445						start_sequence, end_sequence
446					);
447
448					return Err(anyhow::anyhow!(transport_err)).context(format!(
449						"Failed to {} from Stellar RPC for ledger: {}",
450						RPC_METHOD_GET_EVENTS, ledger_info,
451					));
452				}
453			}
454		}
455		Ok(events)
456	}
457}
458
459impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for StellarClient<T> {
460	type Filter = StellarBlockFilter<Self>;
461
462	fn filter() -> Self::Filter {
463		StellarBlockFilter {
464			_client: PhantomData {},
465		}
466	}
467}
468
469#[async_trait]
470impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for StellarClient<T> {
471	/// Retrieves the latest block number with retry functionality
472	#[instrument(skip(self))]
473	async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
474		let response = self
475			.http_client
476			.send_raw_request::<serde_json::Value>(RPC_METHOD_GET_LATEST_LEDGER, None)
477			.await
478			.with_context(|| "Failed to get latest ledger")?;
479
480		let sequence = response["result"]["sequence"]
481			.as_u64()
482			.ok_or_else(|| anyhow::anyhow!("Invalid sequence number"))?;
483
484		Ok(sequence)
485	}
486
487	/// Retrieves blocks within the specified range with retry functionality
488	///
489	/// # Note
490	/// If end_block is None, only the start_block will be retrieved
491	///
492	/// # Errors
493	/// - Returns `anyhow::Error`
494	#[instrument(skip(self), fields(start_block, end_block))]
495	async fn get_blocks(
496		&self,
497		start_block: u64,
498		end_block: Option<u64>,
499	) -> Result<Vec<BlockType>, anyhow::Error> {
500		// max limit for the RPC endpoint is 200
501		const PAGE_LIMIT: u32 = 200;
502
503		// Validate input parameters
504		if let Some(end_block) = end_block {
505			if start_block > end_block {
506				let message = format!(
507					"start_block {} cannot be greater than end_block {}",
508					start_block, end_block
509				);
510				let input_error = StellarClientError::invalid_input(message, None, None);
511				return Err(anyhow::anyhow!(input_error))
512					.context("Invalid input parameters for Stellar RPC");
513			}
514		}
515
516		let mut blocks = Vec::new();
517		let target_block = end_block.unwrap_or(start_block);
518		let mut cursor: Option<String> = None;
519		let mut current_iteration = 0;
520
521		while cursor.is_some() || current_iteration <= 0 {
522			let params = if current_iteration == 0 {
523				// First iteration, we need to fetch the ledgers from the start block without a cursor
524				json!({
525					"startLedger": start_block,
526					"pagination": {
527						"limit": if end_block.is_none() { 1 } else { PAGE_LIMIT }
528					}
529				})
530			} else {
531				// Subsequent iterations, we need to fetch the ledgers from the cursor
532				json!({
533					"pagination": {
534						"cursor": cursor,
535						"limit": PAGE_LIMIT
536					}
537				})
538			};
539
540			let http_response = self
541				.http_client
542				.send_raw_request(RPC_METHOD_GET_LEDGERS, Some(params))
543				.await;
544
545			match http_response {
546				Ok(response_body) => {
547					// Check for RPC errors in the response
548					if let Err(rpc_error) = self.check_and_handle_rpc_error(
549						&response_body,
550						start_block as u32,
551						target_block as u32,
552						RPC_METHOD_GET_LEDGERS,
553					) {
554						// A terminal JSON-RPC error was found, convert and return
555						return Err(anyhow::anyhow!(rpc_error).context(format!(
556							"Soroban RPC reported an error during {}",
557							RPC_METHOD_GET_LEDGERS,
558						)));
559					}
560
561					// Extract the ledgers from the response
562					let raw_ledgers = response_body
563						.get("result")
564						.and_then(|r| r.get("ledgers"))
565						.ok_or_else(|| {
566							let message = format!(
567								"Unexpected response structure for method '{}'",
568								RPC_METHOD_GET_LEDGERS
569							);
570							let sce_parse_error = StellarClientError::unexpected_response_structure(
571								message, None, None,
572							);
573							anyhow::anyhow!(sce_parse_error)
574								.context("Failed to parse ledger response")
575						})?;
576
577					let ledgers: Vec<StellarBlock> = serde_json::from_value(raw_ledgers.clone())
578						.map_err(|e| {
579							let message = format!(
580								"Failed to parse ledgers from response for method '{}': {}",
581								RPC_METHOD_GET_LEDGERS, e
582							);
583							let sce_parse_error = StellarClientError::response_parse_error(
584								message,
585								Some(e.into()),
586								None,
587							);
588							anyhow::anyhow!(sce_parse_error)
589								.context("Failed to parse ledger response")
590						})?;
591
592					if ledgers.is_empty() {
593						break;
594					}
595
596					for ledger in ledgers {
597						if (ledger.sequence as u64) > target_block {
598							return Ok(blocks);
599						}
600						blocks.push(BlockType::Stellar(Box::new(ledger)));
601					}
602
603					// Increment the number of iterations to ensure we break the loop in case there is no cursor
604					current_iteration += 1;
605					cursor = response_body["result"]["cursor"]
606						.as_str()
607						.map(|s| s.to_string());
608
609					// If the cursor is the same as the start block, we have reached the end of the range
610					if cursor == Some(start_block.to_string()) {
611						break;
612					}
613
614					if cursor.is_none() {
615						break;
616					}
617				}
618				Err(transport_err) => {
619					// Ledger info for logging
620					let ledger_info =
621						format!("start_block: {}, end_block: {:?}", start_block, end_block);
622
623					return Err(anyhow::anyhow!(transport_err)).context(format!(
624						"Failed to {} from Stellar RPC for ledger: {}",
625						RPC_METHOD_GET_LEDGERS, ledger_info,
626					));
627				}
628			}
629		}
630		Ok(blocks)
631	}
632
633	/// Retrieves the contract spec for a given contract ID
634	///
635	/// # Arguments
636	/// * `contract_id` - The ID of the contract to retrieve the spec for
637	///
638	/// # Returns
639	/// * `Result<ContractSpec, anyhow::Error>` - The contract spec or error
640	#[instrument(skip(self), fields(contract_id))]
641	async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
642		// Get contract wasm code from contract ID
643		let contract_instance_ledger_key = get_contract_instance_ledger_key(contract_id)
644			.map_err(|e| anyhow::anyhow!("Failed to get contract instance ledger key: {}", e))?;
645
646		let contract_instance_ledger_key_xdr_bytes = contract_instance_ledger_key
647			.to_xdr(Limits::none())
648			.map_err(|e| {
649				anyhow::anyhow!(
650					"Failed to convert contract instance ledger key to XDR: {}",
651					e
652				)
653			})?;
654		let contract_instance_ledger_key_xdr =
655			BASE64_STANDARD.encode(&contract_instance_ledger_key_xdr_bytes);
656
657		let params = json!({
658			"keys": [contract_instance_ledger_key_xdr],
659			"xdrFormat": "base64"
660		});
661
662		let response = self
663			.http_client
664			.send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
665			.await
666			.with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
667
668		let contract_data_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
669			Some(xdr) => xdr,
670			None => {
671				return Err(anyhow::anyhow!("Failed to get contract data XDR"));
672			}
673		};
674
675		let wasm_hash = get_wasm_hash_from_ledger_entry_data(contract_data_xdr_base64)
676			.map_err(|e| anyhow::anyhow!("Failed to get wasm hash: {}", e))?;
677
678		let contract_code_ledger_key = get_contract_code_ledger_key(wasm_hash.as_str())
679			.map_err(|e| anyhow::anyhow!("Failed to get contract code ledger key: {}", e))?;
680
681		let contract_code_ledger_key_xdr_bytes = contract_code_ledger_key
682			.to_xdr(Limits::none())
683			.map_err(|e| {
684				anyhow::anyhow!("Failed to convert contract code ledger key to XDR: {}", e)
685			})?;
686		let contract_code_ledger_key_xdr =
687			BASE64_STANDARD.encode(&contract_code_ledger_key_xdr_bytes);
688
689		let params = json!({
690			"keys": [contract_code_ledger_key_xdr],
691			"xdrFormat": "base64"
692		});
693
694		let response = self
695			.http_client
696			.send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
697			.await
698			.with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
699
700		let contract_code_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
701			Some(xdr) => xdr,
702			None => {
703				return Err(anyhow::anyhow!("Failed to get contract code XDR"));
704			}
705		};
706
707		let wasm_code = get_wasm_code_from_ledger_entry_data(contract_code_xdr_base64)
708			.map_err(|e| anyhow::anyhow!("Failed to get wasm code: {}", e))?;
709
710		let contract_spec = get_contract_spec(wasm_code.as_str())
711			.map_err(|e| anyhow::anyhow!("Failed to get contract spec: {}", e))?;
712
713		Ok(ContractSpec::Stellar(StellarContractSpec::from(
714			contract_spec,
715		)))
716	}
717}