Skip to content

Commit

Permalink
feat: implement tx retry (#13)
Browse files Browse the repository at this point in the history
* feat: migration for cursor table

* feat: implemented cursor storage

* feat: adjusted cursor storage

* feat: implemented monitor cursor

* feat: implemented retry tx mechanism

* docs: updated config example

* docs: updated config example
  • Loading branch information
paulobressan authored Feb 5, 2025
1 parent 5f7a7dd commit 1c61145
Show file tree
Hide file tree
Showing 19 changed files with 622 additions and 399 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
payment.*
cardano-cli
cardano-cli

tx.raw
tx.signed
1 change: 0 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ Executing
```
./tx-gen
```

7 changes: 6 additions & 1 deletion examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ peers = [
]

[monitor]
# old tx inflight will be retried when reach a value bigger than retry_slot_diff
retry_slot_diff = 1000

[u5c]
uri = "https://mainnet.utxorpc-v0.demeter.run"

[monitor.metadata]
# metadata is optional
[u5c.metadata]
"dmtr-api-key" = "your key"
2 changes: 1 addition & 1 deletion src/ledger/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod relay;
pub mod utxo;
pub mod u5c;
34 changes: 15 additions & 19 deletions src/ledger/relay/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
use async_trait::async_trait;

/// A simple trait for returning a list of relay addresses in the format "hostname:port".
#[async_trait]
#[allow(dead_code)]
#[async_trait::async_trait]
pub trait RelayDataProvider {
async fn get_relays(&self) -> Vec<String>;
}

/// A mock provider that returns a pre-defined list of string addresses, e.g., ["relay1:3001", "relay2:3002"].
pub struct MockRelayDataProvider {
pub mock_relays: Vec<String>,
}

#[async_trait]
impl RelayDataProvider for MockRelayDataProvider {
async fn get_relays(&self) -> Vec<String> {
self.mock_relays.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::test;

/// A mock provider that returns a pre-defined list of string addresses, e.g., ["relay1:3001", "relay2:3002"].
pub struct MockRelayDataProvider {
pub mock_relays: Vec<String>,
}

#[async_trait::async_trait]
impl RelayDataProvider for MockRelayDataProvider {
async fn get_relays(&self) -> Vec<String> {
self.mock_relays.clone()
}
}

#[test]
async fn it_returns_mock_relays() {
let provider = MockRelayDataProvider {
mock_relays: vec![
"relay1:3001".to_string(),
"relay2:3002".to_string(),
],
mock_relays: vec!["relay1:3001".to_string(), "relay2:3002".to_string()],
};

let relays = provider.get_relays().await;
Expand Down
230 changes: 230 additions & 0 deletions src/ledger/u5c/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::{collections::HashMap, pin::Pin, str::FromStr};

use anyhow::bail;
use async_stream::stream;
use futures::{Stream, TryStreamExt};
use pallas::interop::utxorpc::spec::{
cardano::Tx,
sync::{
any_chain_block, follow_tip_response, sync_service_client::SyncServiceClient, BlockRef,
FollowTipRequest, ReadTipRequest,
},
};
use serde::Deserialize;
use tonic::{
metadata::{MetadataKey, MetadataValue},
transport::{Channel, ClientTlsConfig, Uri},
Request,
};
use tracing::info;

pub type Point = (u64, Vec<u8>);
pub type ChainSyncStream = Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send>>;

#[derive(Debug)]
pub enum Event {
RollForward(Point, Vec<Tx>),
Rollback(Point),
}

#[derive(Deserialize, Clone)]
pub struct Config {
uri: String,
metadata: HashMap<String, String>,
}

// TODO: remove dead_code after the implementation
#[allow(dead_code)]
#[async_trait::async_trait]
pub trait U5cDataAdapter: Send + Sync {
async fn fetch_tip(&self) -> anyhow::Result<Point>;
async fn fetch_utxos(&self, utxo_refs: &[String]) -> anyhow::Result<HashMap<String, Vec<u8>>>;
async fn stream(&self) -> anyhow::Result<ChainSyncStream>;
}

pub struct U5cDataAdapterImpl {
channel: Channel,
metadata: HashMap<String, String>,
cursor: Option<Point>,
}
impl U5cDataAdapterImpl {
pub async fn try_new(config: Config, cursor: Option<Point>) -> anyhow::Result<Self> {
let uri: Uri = config.uri.parse()?;

let channel = Channel::builder(uri)
.tls_config(ClientTlsConfig::new().with_webpki_roots())?
.connect()
.await?;

Ok(Self {
channel,
metadata: config.metadata,
cursor,
})
}

fn interceptor(&self, req: &mut Request<()>) {
self.metadata.clone().into_iter().for_each(|(key, value)| {
req.metadata_mut().insert(
MetadataKey::from_str(&key).unwrap(),
MetadataValue::from_str(&value).unwrap(),
);
});
}
}
#[async_trait::async_trait]
impl U5cDataAdapter for U5cDataAdapterImpl {
async fn fetch_tip(&self) -> anyhow::Result<Point> {
let mut client = SyncServiceClient::with_interceptor(
self.channel.clone(),
move |mut req: Request<()>| {
self.interceptor(&mut req);
Ok(req)
},
);

let response = client
.read_tip(tonic::Request::new(ReadTipRequest {}))
.await?
.into_inner();

let point = match response.tip {
Some(ref block_ref) => (block_ref.index, block_ref.hash.to_vec()),
None => bail!("U5c none tip"),
};

Ok(point)
}
async fn fetch_utxos(&self, _utxo_refs: &[String]) -> anyhow::Result<HashMap<String, Vec<u8>>> {
todo!()
}

async fn stream(&self) -> anyhow::Result<ChainSyncStream> {
info!("U5C connected");

let mut client = SyncServiceClient::with_interceptor(
self.channel.clone(),
move |mut req: Request<()>| {
self.interceptor(&mut req);
Ok(req)
},
);

let follow_tip_request = match &self.cursor {
Some((slot, hash)) => {
info!("U5C starting from slot {}", slot);
FollowTipRequest {
intersect: vec![BlockRef {
index: *slot,
hash: hash.clone().into(),
}],
..Default::default()
}
}
None => {
info!("U5C starting from tip");
FollowTipRequest::default()
}
};

let mut tip_stream = client
.follow_tip(tonic::Request::new(follow_tip_request))
.await?
.into_inner();

let stream = stream! {
while let Some(follow_tip) = tip_stream.try_next().await? {
if let Some(action) = follow_tip.action {
match action {
follow_tip_response::Action::Apply(any) => {
match any.chain.unwrap() {
any_chain_block::Chain::Cardano(block) => {
if let Some(body) = block.body {
let header = block.header.unwrap();
yield Ok(Event::RollForward((header.slot, header.hash.to_vec()), body.tx));
}
},
}
},
follow_tip_response::Action::Undo(any) => {
match any.chain.unwrap() {
any_chain_block::Chain::Cardano(block) => {
let header = block.header.unwrap();
yield Ok(Event::Rollback((header.slot, header.hash.to_vec())));
},
}
},
follow_tip_response::Action::Reset(_block_ref) => {
info!("U5C reset not implemented yet");
},
}
}
}
};

Ok(Box::pin(stream))
}
}

#[cfg(test)]
mod u5c_tests {
use super::*;
use std::collections::HashMap;

pub struct MockU5cDataAdapter {
pub known_utxos: HashMap<String, Vec<u8>>,
}

#[async_trait::async_trait]
impl U5cDataAdapter for MockU5cDataAdapter {
async fn fetch_tip(&self) -> anyhow::Result<Point> {
todo!()
}

async fn fetch_utxos(
&self,
utxo_refs: &[String],
) -> anyhow::Result<HashMap<String, Vec<u8>>> {
let mut result = HashMap::new();

// Check each requested reference; if known, clone the data into the result
for reference in utxo_refs {
if let Some(cbor_data) = self.known_utxos.get(reference) {
result.insert(reference.clone(), cbor_data.clone());
}
}

Ok(result)
}

async fn stream(&self) -> anyhow::Result<ChainSyncStream> {
todo!()
}
}

#[tokio::test]
async fn it_returns_found_utxos_only() {
// Arrange: a mock with two known references
let mut known = HashMap::new();
known.insert("abc123#0".to_string(), vec![0x82, 0xa0]); // example CBOR
known.insert("abc123#1".to_string(), vec![0x83, 0x04]);

let provider = MockU5cDataAdapter { known_utxos: known };

// We'll request three references, one of which doesn't exist
let requested = vec![
"abc123#0".to_string(),
"abc123#1".to_string(),
"missing#2".to_string(),
];

// Act
let results = provider.fetch_utxos(&requested).await.unwrap();

// Assert
assert_eq!(results.len(), 2, "Should only contain two known references");
assert!(results.contains_key("abc123#0"));
assert!(results.contains_key("abc123#1"));
assert!(!results.contains_key("missing#2"));
}
}
Loading

0 comments on commit 1c61145

Please sign in to comment.