From d604885664947ae54483a247ef3a71507722c6ba Mon Sep 17 00:00:00 2001
From: Clark Alesna
Date: Fri, 24 Jan 2025 23:44:26 +0800
Subject: [PATCH] feat: implement first version of the Fanout Stage (#3)

Co-authored-by: Lance Vincent Salera
Co-authored-by: Santiago Carmuega
---
 Cargo.toml                                    |   5 +
 boros.toml                                    |   7 +
 src/main.rs                                   |  16 +-
 src/pipeline/fanout.rs                        |  38 ---
 src/pipeline/fanout/mempool.rs                | 232 ++++++++++++++
 src/pipeline/fanout/mod.rs                    |  37 +++
 src/pipeline/fanout/stage.rs                  |  80 +++++
 src/pipeline/fanout/tx_submit_peer.rs         | 294 ++++++++++++++++++
 src/pipeline/fanout/tx_submit_peer_manager.rs |  38 +++
 src/pipeline/ingest.rs                        |   6 +-
 src/pipeline/mod.rs                           |  18 +-
 src/server/mod.rs                             |   3 +-
 src/storage/in_memory_db.rs                   |  29 ++
 src/storage/mod.rs                            |   1 +
 14 files changed, 753 insertions(+), 51 deletions(-)
 delete mode 100644 src/pipeline/fanout.rs
 create mode 100644 src/pipeline/fanout/mempool.rs
 create mode 100644 src/pipeline/fanout/mod.rs
 create mode 100644 src/pipeline/fanout/stage.rs
 create mode 100644 src/pipeline/fanout/tx_submit_peer.rs
 create mode 100644 src/pipeline/fanout/tx_submit_peer_manager.rs
 create mode 100644 src/storage/in_memory_db.rs

diff --git a/Cargo.toml b/Cargo.toml
index e5dc723..ac6ffff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,9 +16,14 @@ anyhow = "1.0.95"
 async-trait = "0.1.85"
 chrono = "0.4.39"
 config = { version = "0.15.4", features = ["toml"] }
+futures-util = "0.3.31"
 gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }
+hex = "0.4.3"
+itertools = "0.14.0"
 pallas = "0.32.0"
+rocket = "0.5.1"
 serde = { version = "1.0.217", features = ["derive"] }
+thiserror = "2.0.11"
 sqlx = { version = "0.8.3", features = ["runtime-tokio-rustls", "sqlite", "chrono"] }
 tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
 tonic = { version = "0.12.3", features = ["tls"] }
diff --git a/boros.toml b/boros.toml
index 39f3e48..76dec7c 100644
--- a/boros.toml
+++ b/boros.toml
@@ -9,3 +9,10 @@ url = "https://648c73ef8620b8bae7eceea9.mockapi.io/transactions"
 [chain.source]
 type = "N2N"
 peer = "test"
+
+[peer_manager]
+peers = [
+  "preview-node.play.dev.cardano.org:3001",
+  "adaboy-preview-1c.gleeze.com:5000",
+  "testicles.kiwipool.org:9720"
+]
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 0d60756..fabc51f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -22,9 +22,11 @@ async fn main() -> Result<()> {
         .with(env_filter)
         .init();
 
-    let _config = Config::new().expect("invalid config file");
+    let config = Config::new().expect("invalid config file");
 
-    let pipeline = pipeline::run();
+    let cbor_txs_db = storage::in_memory_db::CborTransactionsDb::new();
+
+    let pipeline = pipeline::run(cbor_txs_db.clone(), config);
     let server = server::run();
 
     try_join!(pipeline, server)?;
@@ -33,7 +35,15 @@ async fn main() -> Result<()> {
 }
 
 #[derive(Deserialize)]
-struct Config {}
+struct PeerManagerConfig {
+    peers: Vec<String>,
+}
+
+#[derive(Deserialize)]
+struct Config {
+    peer_manager: PeerManagerConfig,
+}
+
 impl Config {
     pub fn new() -> Result<Self, config::ConfigError> {
         let config = config::Config::builder()
diff --git a/src/pipeline/fanout.rs b/src/pipeline/fanout.rs
deleted file mode 100644
index 6dbadd2..0000000
--- a/src/pipeline/fanout.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use std::time::Duration;
-
-use gasket::framework::*;
-use tokio::time::sleep;
-use tracing::info;
-
-use super::Transaction;
-
-#[derive(Stage)]
-#[stage(name = "fanout", unit = "Transaction", worker = "Worker")]
-pub struct Stage {}
-
-pub struct Worker;
-
-#[async_trait::async_trait(?Send)]
-impl gasket::framework::Worker<Stage> for Worker {
-    async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> {
-        Ok(Self)
-    }
-
-    async fn schedule(
-        &mut self,
-        _stage: &mut Stage,
-    ) -> Result<WorkSchedule<Transaction>, WorkerError> {
-        // TODO: fetch data from db
-        sleep(Duration::from_secs(30)).await;
-        Ok(WorkSchedule::Unit(Transaction {}))
-    }
-
-    async fn execute(
-        &mut self,
-        _unit: &Transaction,
-        _stage: &mut Stage,
-    ) -> Result<(), WorkerError> {
-        info!("fanout stage");
-        Ok(())
-    }
-}
diff --git a/src/pipeline/fanout/mempool.rs b/src/pipeline/fanout/mempool.rs
new file mode 100644
index 0000000..9417c80
--- /dev/null
+++ b/src/pipeline/fanout/mempool.rs
@@ -0,0 +1,232 @@
+use futures_util::StreamExt;
+use itertools::Itertools;
+use pallas::{
+    crypto::hash::Hash,
+    ledger::traverse::{MultiEraBlock, MultiEraTx},
+};
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+use thiserror::Error;
+use tokio::sync::broadcast;
+use tracing::debug;
+
+type TxHash = Hash<32>;
+
+#[derive(Debug, Error)]
+pub enum MempoolError {
+    #[error("traverse error: {0}")]
+    TraverseError(#[from] pallas::ledger::traverse::Error),
+
+    #[error("decode error: {0}")]
+    DecodeError(#[from] pallas::codec::minicbor::decode::Error),
+
+    #[error("plutus not supported")]
+    PlutusNotSupported,
+
+    #[error("invalid tx: {0}")]
+    InvalidTx(String),
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct Tx {
+    pub hash: TxHash,
+    pub era: u16,
+    pub bytes: Vec<u8>,
+    // TODO: we'll improve this to track number of confirmations in further iterations.
+    pub confirmed: bool,
+}
+
+#[derive(Clone)]
+pub enum TxStage {
+    Pending,
+    Inflight,
+    Acknowledged,
+    Confirmed,
+    Unknown,
+}
+
+#[derive(Clone)]
+pub struct Event {
+    pub new_stage: TxStage,
+    pub tx: Tx,
+}
+
+#[derive(Default)]
+struct MempoolState {
+    pending: Vec<Tx>,
+    inflight: Vec<Tx>,
+    acknowledged: HashMap<TxHash, Tx>,
+}
+
+/// A very basic, FIFO, single-consumer mempool
+#[derive(Clone)]
+pub struct Mempool {
+    mempool: Arc<RwLock<MempoolState>>,
+    updates: broadcast::Sender<Event>,
+}
+
+impl Mempool {
+    pub fn new() -> Self {
+        let mempool = Arc::new(RwLock::new(MempoolState::default()));
+        let (updates, _) = broadcast::channel(16);
+
+        Self { mempool, updates }
+    }
+
+    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
+        self.updates.subscribe()
+    }
+
+    pub fn notify(&self, new_stage: TxStage, tx: Tx) {
+        if self.updates.send(Event { new_stage, tx }).is_err() {
+            debug!("no mempool update receivers");
+        }
+    }
+
+    fn receive(&self, tx: Tx) {
+        let mut state = self.mempool.write().unwrap();
+
+        state.pending.push(tx.clone());
+        self.notify(TxStage::Pending, tx);
+
+        debug!(
+            pending = state.pending.len(),
+            inflight = state.inflight.len(),
+            acknowledged = state.acknowledged.len(),
+            "mempool state changed"
+        );
+    }
+
+    pub fn receive_raw(&self, cbor: &[u8]) -> Result<TxHash, MempoolError> {
+        let tx = MultiEraTx::decode(cbor)?;
+
+        let hash = tx.hash();
+
+        let tx = Tx {
+            hash,
+            // TODO: this is a hack to make the era compatible with the ledger
+            era: u16::from(tx.era()) - 1,
+            bytes: cbor.into(),
+            confirmed: false,
+        };
+
+        self.receive(tx);
+
+        Ok(hash)
+    }
+
+    pub fn request(&self, desired: usize) -> Vec<Tx> {
+        let available = self.pending_total();
+        self.request_exact(std::cmp::min(desired, available))
+    }
+
+    pub fn request_exact(&self, count: usize) -> Vec<Tx> {
+        let mut state = self.mempool.write().unwrap();
+
+        let selected = state.pending.drain(..count).collect_vec();
+
+        for tx in selected.iter() {
+            state.inflight.push(tx.clone());
+            self.notify(TxStage::Inflight, tx.clone());
+        }
+
+        debug!(
+            pending = state.pending.len(),
+            inflight = state.inflight.len(),
+            acknowledged = state.acknowledged.len(),
+            "mempool state changed"
+        );
+
+        selected
+    }
+
+    pub fn acknowledge(&self, count: usize) {
+        debug!(n = count, "acknowledging txs");
+
+        let mut state = self.mempool.write().unwrap();
+
+        let selected = state.inflight.drain(..count).collect_vec();
+
+        for tx in selected {
+            state.acknowledged.insert(tx.hash, tx.clone());
+            self.notify(TxStage::Acknowledged, tx.clone());
+        }
+
+        debug!(
+            pending = state.pending.len(),
+            inflight = state.inflight.len(),
+            acknowledged = state.acknowledged.len(),
+            "mempool state changed"
+        );
+    }
+
+    pub fn find_inflight(&self, tx_hash: &TxHash) -> Option<Tx> {
+        let state = self.mempool.read().unwrap();
+        state.inflight.iter().find(|x| x.hash.eq(tx_hash)).cloned()
+    }
+
+    pub fn find_pending(&self, tx_hash: &TxHash) -> Option<Tx> {
+        let state = self.mempool.read().unwrap();
+        state.pending.iter().find(|x| x.hash.eq(tx_hash)).cloned()
+    }
+
+    pub fn pending_total(&self) -> usize {
+        let state = self.mempool.read().unwrap();
+        state.pending.len()
+    }
+
+    pub fn check_stage(&self, tx_hash: &TxHash) -> TxStage {
+        let state = self.mempool.read().unwrap();
+
+        if let Some(tx) = state.acknowledged.get(tx_hash) {
+            if tx.confirmed {
+                TxStage::Confirmed
+            } else {
+                TxStage::Acknowledged
+            }
+        } else if self.find_inflight(tx_hash).is_some() {
+            TxStage::Inflight
+        } else if self.find_pending(tx_hash).is_some() {
+            TxStage::Pending
+        } else {
+            TxStage::Unknown
+        }
+    }
+
+    pub fn apply_block(&self, block: &MultiEraBlock) {
+        let mut state = self.mempool.write().unwrap();
+
+        if state.acknowledged.is_empty() {
+            return;
+        }
+
+        for tx in block.txs() {
+            let tx_hash = tx.hash();
+
+            if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
+                acknowledged_tx.confirmed = true;
+                self.notify(TxStage::Confirmed, acknowledged_tx.clone());
+                debug!(%tx_hash, "confirming tx");
+            }
+        }
+    }
+
+    pub fn undo_block(&self, block: &MultiEraBlock) {
+        let mut state = self.mempool.write().unwrap();
+
+        if state.acknowledged.is_empty() {
+            return;
+        }
+
+        for tx in block.txs() {
+            let tx_hash = tx.hash();
+
+            if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
+                acknowledged_tx.confirmed = false;
+                debug!(%tx_hash, "un-confirming tx");
+            }
+        }
+    }
+}
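For orientation: the mempool above moves each transaction through pending → inflight → acknowledged → confirmed, strictly FIFO. Below is a minimal sketch of that call flow, written as in-crate code; it assumes `tx_cbor` holds one valid era-framed transaction, which `receive_raw` needs in order to decode.

    use crate::pipeline::fanout::mempool::{Mempool, TxStage};

    fn mempool_flow(tx_cbor: &[u8]) {
        let mempool = Mempool::new();

        // Decode the raw CBOR and queue the tx as pending.
        let hash = mempool.receive_raw(tx_cbor).unwrap();

        // A peer asks for up to 10 txs: everything pending moves to inflight.
        let announced = mempool.request(10);
        assert_eq!(announced.len(), 1);

        // The peer acknowledges one tx id: inflight moves to acknowledged.
        mempool.acknowledge(1);

        // Confirmed only happens once apply_block sees the tx in a block.
        assert!(matches!(mempool.check_stage(&hash), TxStage::Acknowledged));
    }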
diff --git a/src/pipeline/fanout/mod.rs b/src/pipeline/fanout/mod.rs
new file mode 100644
index 0000000..f97eae9
--- /dev/null
+++ b/src/pipeline/fanout/mod.rs
@@ -0,0 +1,37 @@
+pub mod stage;
+pub mod mempool;
+pub mod tx_submit_peer;
+pub mod tx_submit_peer_manager;
+
+pub use stage::Stage;
+
+// Tests for the Fanout Stage
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::storage::in_memory_db::CborTransactionsDb;
+    use crate::{Config, PeerManagerConfig};
+    use std::sync::{Arc, Mutex};
+    use std::vec;
+
+    #[test]
+    fn test_fanout_stage() {
+        let cbor_txs_db = CborTransactionsDb {
+            cbor_txs: Arc::new(Mutex::new(vec![vec![1, 2, 3]])),
+        };
+
+        let config = Config {
+            peer_manager: PeerManagerConfig {
+                peers: vec!["".to_string()],
+            },
+        };
+
+        // TODO: run a mock node to submit against
+
+        // Construct the fanout stage; the worker itself is driven by gasket.
+        let _fanout = Stage::new(cbor_txs_db, config);
+    }
+}
\ No newline at end of file
diff --git a/src/pipeline/fanout/stage.rs b/src/pipeline/fanout/stage.rs
new file mode 100644
index 0000000..34775d3
--- /dev/null
+++ b/src/pipeline/fanout/stage.rs
@@ -0,0 +1,80 @@
+use std::time::Duration;
+
+use gasket::framework::*;
+use tokio::time::sleep;
+use tracing::info;
+
+use crate::{pipeline::Transaction, storage::in_memory_db::CborTransactionsDb, Config};
+
+use super::tx_submit_peer_manager::TxSubmitPeerManager;
+
+#[derive(Stage)]
+#[stage(name = "fanout", unit = "Transaction", worker = "Worker")]
+pub struct Stage {
+    pub cbor_txs_db: CborTransactionsDb,
+    pub config: Config,
+}
+
+impl Stage {
+    pub fn new(cbor_txs_db: CborTransactionsDb, config: Config) -> Self {
+        Self {
+            cbor_txs_db,
+            config,
+        }
+    }
+}
+
+pub struct Worker {
+    tx_submit_peer_manager: TxSubmitPeerManager,
+}
+
+#[async_trait::async_trait(?Send)]
+impl gasket::framework::Worker<Stage> for Worker {
+    async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
+        // Load configuration and start the peer clients
+        let peer_addresses = stage.config.peer_manager.peers.clone();
+
+        info!("Peer Addresses: {:?}", peer_addresses);
+
+        // Proof of concept: the network magic (2 = preview testnet) is
+        // hard-coded for now; it should come from config like the peers do.
+        let mut tx_submit_peer_manager = TxSubmitPeerManager::new(2, peer_addresses);
+        tx_submit_peer_manager.init().await.unwrap();
+
+        Ok(Self {
+            tx_submit_peer_manager,
+        })
+    }
+
+    async fn schedule(
+        &mut self,
+        stage: &mut Stage,
+    ) -> Result<WorkSchedule<Transaction>, WorkerError> {
+        info!(
+            "Cbor Transactions Length: {}",
+            stage.cbor_txs_db.cbor_txs.lock().unwrap().len()
+        );
+
+        if let Some(tx_cbor) = stage.cbor_txs_db.pop_tx() {
+            Ok(WorkSchedule::Unit(Transaction { cbor: tx_cbor }))
+        } else {
+            // TODO: should we really have a sleep here?
+            sleep(Duration::from_secs(30)).await;
+            Ok(WorkSchedule::Idle)
+        }
+    }
+
+    async fn execute(
+        &mut self,
+        unit: &Transaction,
+        _stage: &mut Stage,
+    ) -> Result<(), WorkerError> {
+        info!("fanout stage");
+
+        // Extract the CBOR from the unit and hand it to the peer manager.
+        let tx_cbor = unit.cbor.clone();
+        self.tx_submit_peer_manager.add_tx(tx_cbor).await;
+
+        Ok(())
+    }
+}
diff --git a/src/pipeline/fanout/tx_submit_peer.rs b/src/pipeline/fanout/tx_submit_peer.rs
new file mode 100644
index 0000000..8ae2500
--- /dev/null
+++ b/src/pipeline/fanout/tx_submit_peer.rs
@@ -0,0 +1,294 @@
+use std::fmt::Error;
+use std::sync::Arc;
+use std::time::Duration;
+
+use itertools::Itertools;
+use pallas::crypto::hash::Hash;
+use pallas::network::miniprotocols::txsubmission::{EraTxBody, EraTxId, Request};
+use pallas::network::{facades::PeerClient, miniprotocols::txsubmission::TxIdAndSize};
+use tokio::sync::{Mutex, RwLock};
+use tokio::task;
+use tracing::{error, info, warn};
+
+use super::mempool::{self, Mempool};
+
+/// A TxSubmitPeer has:
+/// - its own mempool
+/// - an optional PeerClient connection
+/// - the peer address and network magic
+pub struct TxSubmitPeer {
+    mempool: Arc<Mutex<Mempool>>,
+    client: Arc<Mutex<Option<PeerClient>>>,
+    peer_addr: String,
+    network_magic: u64,
+    unfulfilled_request: Arc<RwLock<Option<usize>>>,
+}
+
+impl TxSubmitPeer {
+    /// Lightweight constructor: just set fields and create a new mempool.
+    /// No I/O occurs here.
+    pub fn new(peer_addr: &str, network_magic: u64) -> Self {
+        TxSubmitPeer {
+            mempool: Arc::new(Mutex::new(Mempool::new())),
+            client: Arc::new(Mutex::new(None)),
+            peer_addr: peer_addr.to_string(),
+            network_magic,
+            unfulfilled_request: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    /// Initialize the peer connection (async).
+    /// 1) Connect to the Cardano node at `peer_addr`.
+    /// 2) Send the `txsubmission` init message.
+    /// 3) Spawn a background task to handle requests.
+    /// 4) Return immediately (non-blocking).
+    pub async fn init(&mut self) -> Result<(), Error> {
+        // 1) Connect to the node
+        let mut client = PeerClient::connect(&self.peer_addr, self.network_magic)
+            .await
+            .map_err(|e| {
+                error!(error=?e, peer=%self.peer_addr, "Failed to connect to peer");
+                e
+            })
+            .unwrap();
+
+        // 2) Initialize the txsubmission mini-protocol
+        client.txsubmission().send_init().await.map_err(|e| {
+            error!(error=?e, peer=%self.peer_addr, "Failed to send init message");
+            e
+        }).unwrap();
+
+        // 3) Store it so we can spawn our background loop
+        self.client = Arc::new(Mutex::new(Some(client)));
+
+        // 4) Spawn the loop in another task
+        self.start_background_task();
+
+        Ok(())
+    }
+
+    /// Spawns a background async loop that continuously
+    /// waits for new requests from the connected node.
+    fn start_background_task(&mut self) {
+        let client_arc = Arc::clone(&self.client);
+        let mempool_arc = Arc::clone(&self.mempool);
+        let unfulfilled_request_arc = Arc::clone(&self.unfulfilled_request);
+        let peer_addr = self.peer_addr.clone();
+
+        task::spawn(async move {
+            loop {
+                // Separate function to handle leftover unfulfilled requests
+                async fn process_unfulfilled(
+                    mempool: &Mempool,
+                    client: &mut PeerClient,
+                    unfulfilled_request_arc: Arc<RwLock<Option<usize>>>,
+                    request: usize,
+                    peer_addr: &str,
+                ) -> Option<usize> {
+                    let available = mempool.pending_total();
+
+                    if available > 0 {
+                        info!(peer=%peer_addr, request, available, "Found enough TXs to fulfill request");
+                        reply_txs(mempool, client, unfulfilled_request_arc, 0, request, peer_addr)
+                            .await
+                            .ok();
+                        None
+                    } else {
+                        info!(peer=%peer_addr, request, available, "Not enough TXs yet; will retry");
+                        tokio::time::sleep(Duration::from_secs(10)).await;
+                        Some(request)
+                    }
+                }
+
+                // Separate function to reply with TXs
+                async fn reply_txs(
+                    mempool: &Mempool,
+                    client: &mut PeerClient,
+                    unfulfilled_request_arc: Arc<RwLock<Option<usize>>>,
+                    ack: usize,
+                    req: usize,
+                    peer_addr: &str,
+                ) -> Result<(), Error> {
+                    mempool.acknowledge(ack);
+
+                    let available = mempool.pending_total();
+                    if available > 0 {
+                        let txs = mempool.request(req);
+                        propagate_txs(client, txs, peer_addr).await?;
+                        let mut unfulfilled = unfulfilled_request_arc.write().await;
+                        *unfulfilled = None;
+                    } else {
+                        info!(peer=%peer_addr, req, available, "Still not enough TXs; storing unfulfilled request");
+                        let mut unfulfilled = unfulfilled_request_arc.write().await;
+                        *unfulfilled = Some(req);
+                    }
+
+                    Ok(())
+                }
+
+                // Check if there's an unfulfilled request
+                info!(peer=%peer_addr, "Checking for unfulfilled request...");
+                let outstanding_request = {
+                    let read_guard = unfulfilled_request_arc.read().await;
+                    *read_guard
+                };
+
+                match outstanding_request {
+                    Some(request) => {
+                        // Handle leftover request
+                        let mempool_guard = mempool_arc.lock().await;
+                        let mut client_guard = client_arc.lock().await;
+                        let client_ref = match client_guard.as_mut() {
+                            Some(c) => c,
+                            None => {
+                                warn!(peer=%peer_addr, "No client available; breaking");
+                                break;
+                            }
+                        };
+
+                        process_unfulfilled(
+                            &mempool_guard,
+                            client_ref,
+                            Arc::clone(&unfulfilled_request_arc),
+                            request,
+                            &peer_addr,
+                        )
+                        .await;
+                    }
+                    None => {
+                        info!(peer=%peer_addr, "Waiting for next request...");
+                        // We lock only to get the next_request
+                        let next_req = {
+                            let mut client_guard = client_arc.lock().await;
+                            let client_ref = match client_guard.as_mut() {
+                                Some(c) => c,
+                                None => {
+                                    warn!(peer=%peer_addr, "No client available; breaking");
+                                    break;
+                                }
+                            };
+                            client_ref.txsubmission().next_request().await
+                        };
+
+                        let request = match next_req {
+                            Ok(r) => {
+                                info!(peer=%peer_addr, "Received request from node");
+                                r
+                            }
+                            Err(e) => {
+                                error!(peer=%peer_addr, error=?e, "Error reading request; breaking loop");
+                                break;
+                            }
+                        };
+
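+                        // The node drives the protocol from here: it either asks
+                        // for more tx ids (blocking, or non-blocking when it will
+                        // accept an empty reply) or for the bodies of txs we have
+                        // already announced. In both TxIds variants, `ack` is how
+                        // many previously announced ids the node has consumed and
+                        // `req` is how many new ids it wants.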
+                        // Handle the received request
+                        match request {
+                            Request::TxIds(ack, req) => {
+                                info!(peer=%peer_addr, ack, req, "Blocking TxIds request");
+                                let ack_usize = ack as usize;
+                                let req_usize = req as usize;
+
+                                let mempool_guard = mempool_arc.lock().await;
+                                let mut client_guard = client_arc.lock().await;
+                                let client_ref = match client_guard.as_mut() {
+                                    Some(c) => c,
+                                    None => {
+                                        warn!(peer=%peer_addr, "No client available; breaking");
+                                        break;
+                                    }
+                                };
+
+                                reply_txs(
+                                    &mempool_guard,
+                                    client_ref,
+                                    Arc::clone(&unfulfilled_request_arc),
+                                    ack_usize,
+                                    req_usize,
+                                    &peer_addr,
+                                )
+                                .await
+                                .ok();
+                            }
+                            Request::TxIdsNonBlocking(ack, req) => {
+                                info!(peer=%peer_addr, ack, req, "Non-blocking TxIds request");
+                                let mempool_guard = mempool_arc.lock().await;
+                                mempool_guard.acknowledge(ack as usize);
+
+                                let txs = mempool_guard.request(req as usize);
+                                drop(mempool_guard); // drop before I/O
+
+                                let mut client_guard = client_arc.lock().await;
+                                let client_ref = match client_guard.as_mut() {
+                                    Some(c) => c,
+                                    None => {
+                                        warn!(peer=%peer_addr, "No client available; breaking");
+                                        break;
+                                    }
+                                };
+                                propagate_txs(client_ref, txs, &peer_addr).await.ok();
+                            }
+                            Request::Txs(ids) => {
+                                // Collect the requested (era, tx hash) pairs and log them
+                                let ids: Vec<_> = ids.iter().map(|x| (x.0, x.1.clone())).collect();
+
+                                info!(peer=%peer_addr, ids=%ids.iter().map(|x| hex::encode(&x.1)).join(", "), "Tx batch request");
+
+                                let to_send = {
+                                    let mempool_guard = mempool_arc.lock().await;
+                                    ids.iter()
+                                        .filter_map(|x| {
+                                            mempool_guard.find_inflight(&Hash::from(x.1.as_slice()))
+                                        })
+                                        .map(|x| EraTxBody(x.era, x.bytes.clone()))
+                                        .collect_vec()
+                                };
+
+                                // Log the number of TXs we're sending
+                                info!(peer=%peer_addr, count=to_send.len(), "Sending TXs upstream");
+
+                                let mut client_guard = client_arc.lock().await;
+                                let client_ref = match client_guard.as_mut() {
+                                    Some(c) => c,
+                                    None => {
+                                        warn!(peer=%peer_addr, "No client available; breaking");
+                                        break;
+                                    }
+                                };
+
+                                if let Err(err) = client_ref.txsubmission().reply_txs(to_send).await {
+                                    error!(peer=%peer_addr, error=?err, "Error sending TXs upstream");
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /// Add a new transaction to the mempool.
+    pub async fn add_tx(&self, tx: Vec<u8>) {
+        let mempool = self.mempool.lock().await;
+        mempool.receive_raw(&tx).unwrap();
+    }
+}
+
+/// Propagate TX IDs to the node.
+async fn propagate_txs(
+    client: &mut PeerClient,
+    txs: Vec<mempool::Tx>,
+    peer_addr: &str,
+) -> Result<(), Error> {
+    info!(peer=%peer_addr, count=txs.len(), "Propagating TX IDs");
+
+    let payload = txs
+        .iter()
+        .map(|x| TxIdAndSize(EraTxId(x.era, x.hash.to_vec()), x.bytes.len() as u32))
+        .collect_vec();
+
+    client.txsubmission().reply_tx_ids(payload).await.map_err(|e| {
+        error!(peer=%peer_addr, error=?e, "Failed to reply with TX IDs");
+        e
+    }).unwrap();
+
+    Ok(())
+}
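Putting this file together, driving a single peer looks roughly like the sketch below. This is a sketch only: the relay address is one of the preview-network peers from boros.toml, 2 is that network's magic, and `tx_cbor` is assumed to already hold a valid signed transaction.

    use crate::pipeline::fanout::tx_submit_peer::TxSubmitPeer;

    async fn submit_one(tx_cbor: Vec<u8>) {
        // 2 is the preview testnet's network magic, matching the fanout stage.
        let mut peer = TxSubmitPeer::new("preview-node.play.dev.cardano.org:3001", 2);

        // Connects, sends the txsubmission init message, and spawns the
        // background loop that answers the node's requests from then on.
        peer.init().await.unwrap();

        // Queue the tx in this peer's mempool; the background task announces
        // it the next time the node asks for tx ids.
        peer.add_tx(tx_cbor).await;
    }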
diff --git a/src/pipeline/fanout/tx_submit_peer_manager.rs b/src/pipeline/fanout/tx_submit_peer_manager.rs
new file mode 100644
index 0000000..7e673b7
--- /dev/null
+++ b/src/pipeline/fanout/tx_submit_peer_manager.rs
@@ -0,0 +1,38 @@
+use std::collections::HashMap;
+use std::fmt::Error;
+
+use super::tx_submit_peer::TxSubmitPeer;
+
+pub struct TxSubmitPeerManager {
+    network_magic: u64,
+    peers: HashMap<String, Option<TxSubmitPeer>>,
+}
+
+impl TxSubmitPeerManager {
+    pub fn new(network_magic: u64, peer_addresses: Vec<String>) -> Self {
+        TxSubmitPeerManager {
+            network_magic,
+            peers: peer_addresses
+                .into_iter()
+                .map(|peer_addr| (peer_addr, None))
+                .collect(),
+        }
+    }
+
+    pub async fn init(&mut self) -> Result<(), Error> {
+        for (peer_addr, peer) in self.peers.iter_mut() {
+            let mut txsubmitpeer = TxSubmitPeer::new(peer_addr, self.network_magic);
+            txsubmitpeer.init().await.unwrap();
+            *peer = Some(txsubmitpeer);
+        }
+        Ok(())
+    }
+
+    pub async fn add_tx(&self, tx: Vec<u8>) {
+        for (_, peer) in self.peers.iter() {
+            if let Some(peer) = peer {
+                peer.add_tx(tx.clone()).await;
+            }
+        }
+    }
+}
diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs
index 7e56a8b..b54e42a 100644
--- a/src/pipeline/ingest.rs
+++ b/src/pipeline/ingest.rs
@@ -24,7 +24,9 @@ impl gasket::framework::Worker<Stage> for Worker {
     ) -> Result<WorkSchedule<Transaction>, WorkerError> {
         // TODO: fetch data from db
         sleep(Duration::from_secs(30)).await;
-        Ok(WorkSchedule::Unit(Transaction {}))
+        Ok(WorkSchedule::Unit(Transaction {
+            cbor: vec![0, 1, 2, 3],
+        }))
     }
 
     async fn execute(
@@ -33,7 +35,7 @@ impl gasket::framework::Worker<Stage> for Worker {
         _stage: &mut Stage,
     ) -> Result<(), WorkerError> {
         info!("ingest");
-        
+
         Ok(())
     }
 }
diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs
index 28eb3ef..7d6204b 100644
--- a/src/pipeline/mod.rs
+++ b/src/pipeline/mod.rs
@@ -1,16 +1,20 @@
 use anyhow::Result;
 
-mod fanout;
-mod ingest;
-mod monitor;
+use crate::{storage::in_memory_db::CborTransactionsDb, Config};
+
+pub mod fanout;
+pub mod ingest;
+pub mod monitor;
 
 #[derive(Debug)]
-pub struct Transaction {}
+pub struct Transaction {
+    pub cbor: Vec<u8>,
+}
 
-pub async fn run() -> Result<()> {
+pub async fn run(cbor_txs_db: CborTransactionsDb, config: Config) -> Result<()> {
     tokio::spawn(async {
         let ingest = ingest::Stage {};
-        let fanout = fanout::Stage {};
+        let fanout = fanout::Stage::new(cbor_txs_db, config);
         let monitor = monitor::Stage {};
 
         let policy: gasket::runtime::Policy = Default::default();
@@ -25,4 +29,4 @@
     .await?;
 
     Ok(())
-}
+}
\ No newline at end of file
diff --git a/src/server/mod.rs b/src/server/mod.rs
index d688e4a..4b3fe03 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -1,7 +1,8 @@
-use anyhow::Result;
+use anyhow::{Ok, Result};
 
 pub mod utxorpc;
+
 
 pub async fn run() -> Result<()> {
     Ok(())
 }
diff --git a/src/storage/in_memory_db.rs b/src/storage/in_memory_db.rs
new file mode 100644
index 0000000..7ff6d99
--- /dev/null
+++ b/src/storage/in_memory_db.rs
@@ -0,0 +1,29 @@
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+pub struct CborTransactionsDb {
+    pub cbor_txs: Arc<Mutex<Vec<Vec<u8>>>>,
+}
+
+impl CborTransactionsDb {
+    pub fn new() -> Self {
+        Self {
+            cbor_txs: Arc::new(Mutex::new(vec![])),
+        }
+    }
+
+    pub fn push_tx(&self, tx: Vec<u8>) {
+        let mut txs = self.cbor_txs.lock().unwrap();
+        txs.push(tx);
+    }
+
+    pub fn pop_tx(&self) -> Option<Vec<u8>> {
+        let mut txs = self.cbor_txs.lock().unwrap();
+
+        if txs.is_empty() {
+            None
+        } else {
+            Some(txs.remove(0))
+        }
+    }
+}
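This shared queue is the seam between the ingest and fanout stages: one side pushes raw CBOR, the other pops it in FIFO order from `schedule`. A small sketch of that handoff, written as a hypothetical in-crate test:

    use crate::storage::in_memory_db::CborTransactionsDb;

    #[tokio::test]
    async fn push_then_pop_crosses_tasks() {
        let db = CborTransactionsDb::new();

        // Producer side (what ingest would do); cloning shares the same Arc.
        let producer = db.clone();
        tokio::spawn(async move {
            producer.push_tx(vec![0, 1, 2, 3]);
        })
        .await
        .unwrap();

        // Consumer side (what the fanout worker does): FIFO order.
        assert_eq!(db.pop_tx(), Some(vec![0, 1, 2, 3]));
        assert_eq!(db.pop_tx(), None);
    }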
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 64a3f0e..1425775 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,6 +1,7 @@
 use chrono::{DateTime, Utc};
 
 pub mod sqlite;
+pub mod in_memory_db;
 
 pub enum TransactionPriority {
     LOW,