1use anyhow::Context;
7use async_trait::async_trait;
8use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine as _};
9use serde_json::json;
10use std::marker::PhantomData;
11use stellar_xdr::curr::{Limits, WriteXdr};
12use tracing::instrument;
13
14use crate::{
15 models::{
16 BlockType, ContractSpec, Network, StellarBlock, StellarContractSpec, StellarEvent,
17 StellarTransaction, StellarTransactionInfo,
18 },
19 services::{
20 blockchain::{
21 client::{BlockChainClient, BlockFilterFactory},
22 transports::StellarTransportClient,
23 BlockchainTransport,
24 },
25 filter::{
26 stellar_helpers::{
27 get_contract_code_ledger_key, get_contract_instance_ledger_key, get_contract_spec,
28 get_wasm_code_from_ledger_entry_data, get_wasm_hash_from_ledger_entry_data,
29 },
30 StellarBlockFilter,
31 },
32 },
33};
34
35use super::error::StellarClientError;
36
/// JSON-RPC method name sent by `get_transactions`.
const RPC_METHOD_GET_TRANSACTIONS: &str = "getTransactions";
/// JSON-RPC method name sent by `get_events`.
const RPC_METHOD_GET_EVENTS: &str = "getEvents";
/// JSON-RPC method name sent by `get_latest_block_number`.
const RPC_METHOD_GET_LATEST_LEDGER: &str = "getLatestLedger";
/// JSON-RPC method name sent by `get_blocks`.
const RPC_METHOD_GET_LEDGERS: &str = "getLedgers";
/// JSON-RPC method name sent (twice) by `get_contract_spec`.
const RPC_METHOD_GET_LEDGER_ENTRIES: &str = "getLedgerEntries";

/// Substrings that `check_and_handle_rpc_error` searches for in the
/// lower-cased RPC error message.
///
/// An error with code `-32600` whose message contains one of these is reported
/// as an "outside retention window" error. The entries must stay lower-case
/// because they are compared against a lower-cased message.
const RETENTION_MESSAGES: [&str; 2] = [
    "must be within the ledger range",
    "must be between the oldest ledger",
];
48
/// Client that issues Stellar RPC requests through a transport of type `T`.
///
/// The RPC-calling methods are only implemented for transports that also
/// implement `BlockchainTransport`.
#[derive(Clone)]
pub struct StellarClient<T: Send + Sync + Clone> {
    /// Transport used to send every raw RPC request made by this client.
    http_client: T,
}
57
58impl<T: Send + Sync + Clone> StellarClient<T> {
59 pub fn new_with_transport(http_client: T) -> Self {
61 Self { http_client }
62 }
63
64 fn check_and_handle_rpc_error(
82 &self,
83 response_body: &serde_json::Value,
84 start_sequence: u32,
85 target_sequence: u32,
86 method_name: &'static str,
87 ) -> Result<(), StellarClientError> {
88 if let Some(json_rpc_error) = response_body.get("error") {
89 let rpc_code = json_rpc_error
90 .get("code")
91 .and_then(|c| c.as_i64())
92 .unwrap_or(0);
93 let rpc_message = json_rpc_error
94 .get("message")
95 .and_then(|m| m.as_str())
96 .unwrap_or("Unknown RPC error")
97 .to_string();
98
99 if rpc_code == -32600
100 && RETENTION_MESSAGES
101 .iter()
102 .any(|msg| rpc_message.to_lowercase().contains(msg))
103 {
104 let ledger_info_str = format!(
105 "start_sequence: {}, target_sequence: {}",
106 start_sequence, target_sequence
107 );
108
109 return Err(StellarClientError::outside_retention_window(
110 rpc_code,
111 rpc_message,
112 ledger_info_str,
113 None,
114 None,
115 ));
116 } else {
117 let message = format!(
119 "Stellar RPC request failed for method '{}': {} (code {})",
120 method_name, rpc_message, rpc_code
121 );
122
123 return Err(StellarClientError::rpc_error(message, None, None));
124 }
125 }
126 Ok(())
127 }
128}
129
130impl StellarClient<StellarTransportClient> {
131 pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
139 let http_client = StellarTransportClient::new(network).await?;
140 Ok(Self::new_with_transport(http_client))
141 }
142}
143
/// Stellar-specific queries that return transactions and events for a ledger range.
#[async_trait]
pub trait StellarClientTrait {
    /// Returns the transactions of the requested ledger range.
    ///
    /// # Arguments
    /// * `start_sequence` - First ledger sequence to fetch.
    /// * `end_sequence` - Last ledger sequence to fetch. The `StellarClient`
    ///   implementation in this file treats `None` as "only `start_sequence`".
    async fn get_transactions(
        &self,
        start_sequence: u32,
        end_sequence: Option<u32>,
    ) -> Result<Vec<StellarTransaction>, anyhow::Error>;

    /// Returns the events of the requested ledger range.
    ///
    /// # Arguments
    /// * `start_sequence` - First ledger sequence to fetch.
    /// * `end_sequence` - Last ledger sequence to fetch. The `StellarClient`
    ///   implementation in this file treats `None` as "only `start_sequence`".
    async fn get_events(
        &self,
        start_sequence: u32,
        end_sequence: Option<u32>,
    ) -> Result<Vec<StellarEvent>, anyhow::Error>;
}
175
176#[async_trait]
177impl<T: Send + Sync + Clone + BlockchainTransport> StellarClientTrait for StellarClient<T> {
178 #[instrument(skip(self), fields(start_sequence, end_sequence))]
184 async fn get_transactions(
185 &self,
186 start_sequence: u32,
187 end_sequence: Option<u32>,
188 ) -> Result<Vec<StellarTransaction>, anyhow::Error> {
189 if let Some(end_sequence) = end_sequence {
191 if start_sequence > end_sequence {
192 let message = format!(
193 "start_sequence {} cannot be greater than end_sequence {}",
194 start_sequence, end_sequence
195 );
196 let input_error = StellarClientError::invalid_input(message, None, None);
197 return Err(anyhow::anyhow!(input_error))
198 .context("Invalid input parameters for Stellar RPC");
199 }
200 }
201
202 const PAGE_LIMIT: u32 = 200;
204 let mut transactions = Vec::new();
205 let target_sequence = end_sequence.unwrap_or(start_sequence);
206 let mut cursor: Option<String> = None;
207 let mut current_iteration = 0;
208
209 while cursor.is_some() || current_iteration <= 0 {
210 let params = if current_iteration == 0 {
211 json!({
213 "startLedger": start_sequence,
214 "pagination": {
215 "limit": PAGE_LIMIT
216 }
217 })
218 } else {
219 json!({
221 "pagination": {
222 "cursor": cursor,
223 "limit": PAGE_LIMIT
224 }
225 })
226 };
227
228 let http_response = self
229 .http_client
230 .send_raw_request(RPC_METHOD_GET_TRANSACTIONS, Some(params))
231 .await;
232
233 match http_response {
234 Ok(response_body) => {
235 if let Err(rpc_error) = self.check_and_handle_rpc_error(
237 &response_body,
238 start_sequence,
239 target_sequence,
240 RPC_METHOD_GET_TRANSACTIONS,
241 ) {
242 return Err(anyhow::anyhow!(rpc_error).context(format!(
244 "Soroban RPC reported an error during {}",
245 RPC_METHOD_GET_TRANSACTIONS,
246 )));
247 }
248
249 let raw_transactions = response_body
251 .get("result")
252 .and_then(|r| r.get("transactions"))
253 .ok_or_else(|| {
254 let message = format!(
255 "Unexpected response structure for method '{}'",
256 RPC_METHOD_GET_TRANSACTIONS
257 );
258 StellarClientError::unexpected_response_structure(message, None, None)
259 })
260 .map_err(|client_parse_error| {
261 anyhow::anyhow!(client_parse_error)
262 .context("Failed to parse transaction response")
263 })?;
264
265 let ledger_transactions: Vec<StellarTransactionInfo> =
266 serde_json::from_value(raw_transactions.clone()).map_err(|e| {
267 let message = format!(
268 "Failed to parse transactions from response for method '{}': {}",
269 RPC_METHOD_GET_TRANSACTIONS, e
270 );
271 let sce_parse_error = StellarClientError::response_parse_error(
272 message,
273 Some(e.into()),
274 None,
275 );
276 anyhow::anyhow!(sce_parse_error)
277 .context("Failed to parse transaction response")
278 })?;
279
280 if ledger_transactions.is_empty() {
281 break;
282 }
283
284 for transaction in ledger_transactions {
285 if transaction.ledger > target_sequence {
286 return Ok(transactions);
287 }
288 transactions.push(StellarTransaction::from(transaction));
289 }
290
291 current_iteration += 1;
293 cursor = response_body["result"]["cursor"]
294 .as_str()
295 .map(|s| s.to_string());
296 if cursor.is_none() {
297 break;
298 }
299 }
300 Err(transport_err) => {
301 let ledger_info = format!(
303 "start_sequence: {}, end_sequence: {:?}",
304 start_sequence, end_sequence
305 );
306
307 return Err(anyhow::anyhow!(transport_err)).context(format!(
308 "Failed to {} from Stellar RPC for ledger: {}",
309 RPC_METHOD_GET_TRANSACTIONS, ledger_info
310 ));
311 }
312 }
313 }
314 Ok(transactions)
315 }
316
317 #[instrument(skip(self), fields(start_sequence, end_sequence))]
323 async fn get_events(
324 &self,
325 start_sequence: u32,
326 end_sequence: Option<u32>,
327 ) -> Result<Vec<StellarEvent>, anyhow::Error> {
328 if let Some(end_sequence) = end_sequence {
330 if start_sequence > end_sequence {
331 let message = format!(
332 "start_sequence {} cannot be greater than end_sequence {}",
333 start_sequence, end_sequence
334 );
335 let input_error = StellarClientError::invalid_input(message, None, None);
336 return Err(anyhow::anyhow!(input_error))
337 .context("Invalid input parameters for Stellar RPC");
338 }
339 }
340
341 const PAGE_LIMIT: u32 = 200;
343 let mut events = Vec::new();
344 let target_sequence = end_sequence.unwrap_or(start_sequence);
345 let mut cursor: Option<String> = None;
346 let mut current_iteration = 0;
347
348 while cursor.is_some() || current_iteration <= 0 {
349 let params = if current_iteration == 0 {
350 json!({
352 "startLedger": start_sequence,
353 "filters": [{
354 "type": "contract",
355 }],
356 "pagination": {
357 "limit": PAGE_LIMIT
358 }
359 })
360 } else {
361 json!({
363 "filters": [{
364 "type": "contract",
365 }],
366 "pagination": {
367 "cursor": cursor,
368 "limit": PAGE_LIMIT
369 }
370 })
371 };
372
373 let http_response = self
374 .http_client
375 .send_raw_request(RPC_METHOD_GET_EVENTS, Some(params))
376 .await;
377
378 match http_response {
379 Ok(response_body) => {
380 if let Err(rpc_error) = self.check_and_handle_rpc_error(
382 &response_body,
383 start_sequence,
384 target_sequence,
385 RPC_METHOD_GET_EVENTS,
386 ) {
387 return Err(anyhow::anyhow!(rpc_error).context(format!(
389 "Soroban RPC reported an error during {}",
390 RPC_METHOD_GET_EVENTS
391 )));
392 }
393
394 let raw_events = response_body
396 .get("result")
397 .and_then(|r| r.get("events"))
398 .ok_or_else(|| {
399 let message = format!(
400 "Unexpected response structure for method '{}'",
401 RPC_METHOD_GET_EVENTS
402 );
403 StellarClientError::unexpected_response_structure(message, None, None)
404 })
405 .map_err(|client_parse_error| {
406 anyhow::anyhow!(client_parse_error)
407 .context("Failed to parse event response")
408 })?;
409
410 let ledger_events: Vec<StellarEvent> =
411 serde_json::from_value(raw_events.clone()).map_err(|e| {
412 let message = format!(
413 "Failed to parse events from response for method '{}': {}",
414 RPC_METHOD_GET_EVENTS, e
415 );
416 let sce_parse_error = StellarClientError::response_parse_error(
417 message,
418 Some(e.into()),
419 None,
420 );
421 anyhow::anyhow!(sce_parse_error)
422 .context("Failed to parse event response")
423 })?;
424
425 for event in ledger_events {
426 if event.ledger > target_sequence {
427 return Ok(events);
428 }
429 events.push(event);
430 }
431
432 current_iteration += 1;
434 cursor = response_body["result"]["cursor"]
435 .as_str()
436 .map(|s| s.to_string());
437 if cursor.is_none() {
438 break;
439 }
440 }
441 Err(transport_err) => {
442 let ledger_info = format!(
444 "start_sequence: {}, end_sequence: {:?}",
445 start_sequence, end_sequence
446 );
447
448 return Err(anyhow::anyhow!(transport_err)).context(format!(
449 "Failed to {} from Stellar RPC for ledger: {}",
450 RPC_METHOD_GET_EVENTS, ledger_info,
451 ));
452 }
453 }
454 }
455 Ok(events)
456 }
457}
458
459impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for StellarClient<T> {
460 type Filter = StellarBlockFilter<Self>;
461
462 fn filter() -> Self::Filter {
463 StellarBlockFilter {
464 _client: PhantomData {},
465 }
466 }
467}
468
469#[async_trait]
470impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for StellarClient<T> {
471 #[instrument(skip(self))]
473 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
474 let response = self
475 .http_client
476 .send_raw_request::<serde_json::Value>(RPC_METHOD_GET_LATEST_LEDGER, None)
477 .await
478 .with_context(|| "Failed to get latest ledger")?;
479
480 let sequence = response["result"]["sequence"]
481 .as_u64()
482 .ok_or_else(|| anyhow::anyhow!("Invalid sequence number"))?;
483
484 Ok(sequence)
485 }
486
487 #[instrument(skip(self), fields(start_block, end_block))]
495 async fn get_blocks(
496 &self,
497 start_block: u64,
498 end_block: Option<u64>,
499 ) -> Result<Vec<BlockType>, anyhow::Error> {
500 const PAGE_LIMIT: u32 = 200;
502
503 if let Some(end_block) = end_block {
505 if start_block > end_block {
506 let message = format!(
507 "start_block {} cannot be greater than end_block {}",
508 start_block, end_block
509 );
510 let input_error = StellarClientError::invalid_input(message, None, None);
511 return Err(anyhow::anyhow!(input_error))
512 .context("Invalid input parameters for Stellar RPC");
513 }
514 }
515
516 let mut blocks = Vec::new();
517 let target_block = end_block.unwrap_or(start_block);
518 let mut cursor: Option<String> = None;
519 let mut current_iteration = 0;
520
521 while cursor.is_some() || current_iteration <= 0 {
522 let params = if current_iteration == 0 {
523 json!({
525 "startLedger": start_block,
526 "pagination": {
527 "limit": if end_block.is_none() { 1 } else { PAGE_LIMIT }
528 }
529 })
530 } else {
531 json!({
533 "pagination": {
534 "cursor": cursor,
535 "limit": PAGE_LIMIT
536 }
537 })
538 };
539
540 let http_response = self
541 .http_client
542 .send_raw_request(RPC_METHOD_GET_LEDGERS, Some(params))
543 .await;
544
545 match http_response {
546 Ok(response_body) => {
547 if let Err(rpc_error) = self.check_and_handle_rpc_error(
549 &response_body,
550 start_block as u32,
551 target_block as u32,
552 RPC_METHOD_GET_LEDGERS,
553 ) {
554 return Err(anyhow::anyhow!(rpc_error).context(format!(
556 "Soroban RPC reported an error during {}",
557 RPC_METHOD_GET_LEDGERS,
558 )));
559 }
560
561 let raw_ledgers = response_body
563 .get("result")
564 .and_then(|r| r.get("ledgers"))
565 .ok_or_else(|| {
566 let message = format!(
567 "Unexpected response structure for method '{}'",
568 RPC_METHOD_GET_LEDGERS
569 );
570 let sce_parse_error = StellarClientError::unexpected_response_structure(
571 message, None, None,
572 );
573 anyhow::anyhow!(sce_parse_error)
574 .context("Failed to parse ledger response")
575 })?;
576
577 let ledgers: Vec<StellarBlock> = serde_json::from_value(raw_ledgers.clone())
578 .map_err(|e| {
579 let message = format!(
580 "Failed to parse ledgers from response for method '{}': {}",
581 RPC_METHOD_GET_LEDGERS, e
582 );
583 let sce_parse_error = StellarClientError::response_parse_error(
584 message,
585 Some(e.into()),
586 None,
587 );
588 anyhow::anyhow!(sce_parse_error)
589 .context("Failed to parse ledger response")
590 })?;
591
592 if ledgers.is_empty() {
593 break;
594 }
595
596 for ledger in ledgers {
597 if (ledger.sequence as u64) > target_block {
598 return Ok(blocks);
599 }
600 blocks.push(BlockType::Stellar(Box::new(ledger)));
601 }
602
603 current_iteration += 1;
605 cursor = response_body["result"]["cursor"]
606 .as_str()
607 .map(|s| s.to_string());
608
609 if cursor == Some(start_block.to_string()) {
611 break;
612 }
613
614 if cursor.is_none() {
615 break;
616 }
617 }
618 Err(transport_err) => {
619 let ledger_info =
621 format!("start_block: {}, end_block: {:?}", start_block, end_block);
622
623 return Err(anyhow::anyhow!(transport_err)).context(format!(
624 "Failed to {} from Stellar RPC for ledger: {}",
625 RPC_METHOD_GET_LEDGERS, ledger_info,
626 ));
627 }
628 }
629 }
630 Ok(blocks)
631 }
632
633 #[instrument(skip(self), fields(contract_id))]
641 async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
642 let contract_instance_ledger_key = get_contract_instance_ledger_key(contract_id)
644 .map_err(|e| anyhow::anyhow!("Failed to get contract instance ledger key: {}", e))?;
645
646 let contract_instance_ledger_key_xdr_bytes = contract_instance_ledger_key
647 .to_xdr(Limits::none())
648 .map_err(|e| {
649 anyhow::anyhow!(
650 "Failed to convert contract instance ledger key to XDR: {}",
651 e
652 )
653 })?;
654 let contract_instance_ledger_key_xdr =
655 BASE64_STANDARD.encode(&contract_instance_ledger_key_xdr_bytes);
656
657 let params = json!({
658 "keys": [contract_instance_ledger_key_xdr],
659 "xdrFormat": "base64"
660 });
661
662 let response = self
663 .http_client
664 .send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
665 .await
666 .with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
667
668 let contract_data_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
669 Some(xdr) => xdr,
670 None => {
671 return Err(anyhow::anyhow!("Failed to get contract data XDR"));
672 }
673 };
674
675 let wasm_hash = get_wasm_hash_from_ledger_entry_data(contract_data_xdr_base64)
676 .map_err(|e| anyhow::anyhow!("Failed to get wasm hash: {}", e))?;
677
678 let contract_code_ledger_key = get_contract_code_ledger_key(wasm_hash.as_str())
679 .map_err(|e| anyhow::anyhow!("Failed to get contract code ledger key: {}", e))?;
680
681 let contract_code_ledger_key_xdr_bytes = contract_code_ledger_key
682 .to_xdr(Limits::none())
683 .map_err(|e| {
684 anyhow::anyhow!("Failed to convert contract code ledger key to XDR: {}", e)
685 })?;
686 let contract_code_ledger_key_xdr =
687 BASE64_STANDARD.encode(&contract_code_ledger_key_xdr_bytes);
688
689 let params = json!({
690 "keys": [contract_code_ledger_key_xdr],
691 "xdrFormat": "base64"
692 });
693
694 let response = self
695 .http_client
696 .send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
697 .await
698 .with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
699
700 let contract_code_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
701 Some(xdr) => xdr,
702 None => {
703 return Err(anyhow::anyhow!("Failed to get contract code XDR"));
704 }
705 };
706
707 let wasm_code = get_wasm_code_from_ledger_entry_data(contract_code_xdr_base64)
708 .map_err(|e| anyhow::anyhow!("Failed to get wasm code: {}", e))?;
709
710 let contract_spec = get_contract_spec(wasm_code.as_str())
711 .map_err(|e| anyhow::anyhow!("Failed to get contract spec: {}", e))?;
712
713 Ok(ContractSpec::Stellar(StellarContractSpec::from(
714 contract_spec,
715 )))
716 }
717}