Skip to content

Commit

Permalink
feat: implement first version of the Fanout Stage (#3)
Browse files Browse the repository at this point in the history
Co-authored-by: Lance Vincent Salera <salera.lancevincent@gmail.com>
Co-authored-by: Santiago Carmuega <santiago@carmuega.me>
  • Loading branch information
3 people authored Jan 24, 2025
1 parent 9d7cbe1 commit d604885
Show file tree
Hide file tree
Showing 14 changed files with 753 additions and 51 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ anyhow = "1.0.95"
async-trait = "0.1.85"
chrono = "0.4.39"
config = { version = "0.15.4", features = ["toml"] }
futures-util = "0.3.31"
gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }
hex = "0.4.3"
itertools = "0.14.0"
pallas = "0.32.0"
rocket = "0.5.1"
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.11"
sqlx = { version = "0.8.3", features = ["runtime-tokio-rustls", "sqlite", "chrono"] }
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.12.3", features = ["tls"] }
Expand Down
7 changes: 7 additions & 0 deletions boros.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@ url = "https://648c73ef8620b8bae7eceea9.mockapi.io/transactions"
[chain.source]
type = "N2N"
peer = "test"

[peer_manager]
peers = [
"preview-node.play.dev.cardano.org:3001",
"adaboy-preview-1c.gleeze.com:5000",
"testicles.kiwipool.org:9720"
]
16 changes: 13 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ async fn main() -> Result<()> {
.with(env_filter)
.init();

let _config = Config::new().expect("invalid config file");
let config = Config::new().expect("invalid config file");

let pipeline = pipeline::run();
let cbor_txs_db = storage::in_memory_db::CborTransactionsDb::new();

let pipeline = pipeline::run(cbor_txs_db.clone(), config);
let server = server::run();

try_join!(pipeline, server)?;
Expand All @@ -33,7 +35,15 @@ async fn main() -> Result<()> {
}

#[derive(Deserialize)]
struct Config {}
struct PeerManagerConfig {
peers: Vec<String>,
}

#[derive(Deserialize)]
struct Config {
peer_manager: PeerManagerConfig,
}

impl Config {
pub fn new() -> Result<Self, Box<dyn Error>> {
let config = config::Config::builder()
Expand Down
38 changes: 0 additions & 38 deletions src/pipeline/fanout.rs

This file was deleted.

232 changes: 232 additions & 0 deletions src/pipeline/fanout/mempool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
use futures_util::StreamExt;
use itertools::Itertools;
use pallas::{
crypto::hash::Hash,
ledger::traverse::{MultiEraBlock, MultiEraTx},
};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use thiserror::Error;
use tokio::sync::broadcast;
use tracing::debug;

type TxHash = Hash<32>;

#[derive(Debug, Error)]
pub enum MempoolError {
#[error("traverse error: {0}")]
TraverseError(#[from] pallas::ledger::traverse::Error),

#[error("decode error: {0}")]
DecodeError(#[from] pallas::codec::minicbor::decode::Error),

#[error("plutus not supported")]
PlutusNotSupported,

#[error("invalid tx: {0}")]
InvalidTx(String),
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Tx {
pub hash: TxHash,
pub era: u16,
pub bytes: Vec<u8>,
// TODO: we'll improve this to track number of confirmations in further iterations.
pub confirmed: bool,
}

#[derive(Clone)]
pub enum TxStage {
Pending,
Inflight,
Acknowledged,
Confirmed,
Unknown,
}

#[derive(Clone)]
pub struct Event {
pub new_stage: TxStage,
pub tx: Tx,
}

#[derive(Default)]
struct MempoolState {
pending: Vec<Tx>,
inflight: Vec<Tx>,
acknowledged: HashMap<TxHash, Tx>,
}

/// A very basic, FIFO, single consumer mempool
#[derive(Clone)]
pub struct Mempool {
mempool: Arc<RwLock<MempoolState>>,
updates: broadcast::Sender<Event>,
}

impl Mempool {
pub fn new() -> Self {
let mempool = Arc::new(RwLock::new(MempoolState::default()));
let (updates, _) = broadcast::channel(16);

Self { mempool, updates }
}

pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.updates.subscribe()
}

pub fn notify(&self, new_stage: TxStage, tx: Tx) {
if self.updates.send(Event { new_stage, tx }).is_err() {
debug!("no mempool update receivers");
}
}

fn receive(&self, tx: Tx) {
let mut state = self.mempool.write().unwrap();

state.pending.push(tx.clone());
self.notify(TxStage::Pending, tx);

debug!(
pending = state.pending.len(),
inflight = state.inflight.len(),
acknowledged = state.acknowledged.len(),
"mempool state changed"
);
}

pub fn receive_raw(&self, cbor: &[u8]) -> Result<TxHash, MempoolError> {
let tx = MultiEraTx::decode(cbor)?;

let hash = tx.hash();

let tx = Tx {
hash,
// TODO: this is a hack to make the era compatible with the ledger
era: u16::from(tx.era()) - 1,
bytes: cbor.into(),
confirmed: false,
};

self.receive(tx);

Ok(hash)
}

pub fn request(&self, desired: usize) -> Vec<Tx> {
let available = self.pending_total();
self.request_exact(std::cmp::min(desired, available))
}

pub fn request_exact(&self, count: usize) -> Vec<Tx> {
let mut state = self.mempool.write().unwrap();

let selected = state.pending.drain(..count).collect_vec();

for tx in selected.iter() {
state.inflight.push(tx.clone());
self.notify(TxStage::Inflight, tx.clone());
}

debug!(
pending = state.pending.len(),
inflight = state.inflight.len(),
acknowledged = state.acknowledged.len(),
"mempool state changed"
);

selected
}

pub fn acknowledge(&self, count: usize) {
debug!(n = count, "acknowledging txs");

let mut state = self.mempool.write().unwrap();

let selected = state.inflight.drain(..count).collect_vec();

for tx in selected {
state.acknowledged.insert(tx.hash, tx.clone());
self.notify(TxStage::Acknowledged, tx.clone());
}

debug!(
pending = state.pending.len(),
inflight = state.inflight.len(),
acknowledged = state.acknowledged.len(),
"mempool state changed"
);
}

pub fn find_inflight(&self, tx_hash: &TxHash) -> Option<Tx> {
let state = self.mempool.read().unwrap();
state.inflight.iter().find(|x| x.hash.eq(tx_hash)).cloned()
}

pub fn find_pending(&self, tx_hash: &TxHash) -> Option<Tx> {
let state = self.mempool.read().unwrap();
state.pending.iter().find(|x| x.hash.eq(tx_hash)).cloned()
}

pub fn pending_total(&self) -> usize {
let state = self.mempool.read().unwrap();
state.pending.len()
}

pub fn check_stage(&self, tx_hash: &TxHash) -> TxStage {
let state = self.mempool.read().unwrap();

if let Some(tx) = state.acknowledged.get(tx_hash) {
if tx.confirmed {
TxStage::Confirmed
} else {
TxStage::Acknowledged
}
} else if self.find_inflight(tx_hash).is_some() {
TxStage::Inflight
} else if self.find_pending(tx_hash).is_some() {
TxStage::Pending
} else {
TxStage::Unknown
}
}

pub fn apply_block(&self, block: &MultiEraBlock) {
let mut state = self.mempool.write().unwrap();

if state.acknowledged.is_empty() {
return;
}

for tx in block.txs() {
let tx_hash = tx.hash();

if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
acknowledged_tx.confirmed = true;
self.notify(TxStage::Confirmed, acknowledged_tx.clone());
debug!(%tx_hash, "confirming tx");
}
}
}

pub fn undo_block(&self, block: &MultiEraBlock) {
let mut state = self.mempool.write().unwrap();

if state.acknowledged.is_empty() {
return;
}

for tx in block.txs() {
let tx_hash = tx.hash();

if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
acknowledged_tx.confirmed = false;
debug!(%tx_hash, "un-confirming tx");
}
}
}
}
37 changes: 37 additions & 0 deletions src/pipeline/fanout/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
pub mod stage;
pub mod mempool;
pub mod tx_submit_peer;
pub mod tx_submit_peer_manager;

pub use stage::Stage;


// Test for Fanout Stage
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::in_memory_db::CborTransactionsDb;
use crate::{Config, PeerManagerConfig};
use std::sync::{Arc, Mutex};
use std::vec;

#[test]
fn test_fanout_stage() {
let cbor_txs_db = CborTransactionsDb {
cbor_txs: Arc::new(Mutex::new(vec![
vec![1, 2, 3]
])),
};

let config = Config {
peer_manager: PeerManagerConfig {
peers: vec!["".to_string()],
},
};

// Run mock node

// Run Fanout Stage
let fanout = Stage::new(cbor_txs_db, config);
}
}
Loading

0 comments on commit d604885

Please sign in to comment.