From 5ca58208ecdef3835f27346ea1675f417ca0bb70 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Mon, 27 Jan 2025 14:58:10 -0300 Subject: [PATCH] feat: integrate pipeline with storage (#6) --- ...3be43b7aad302f1a42cd462915fd7b0a26105.json | 12 ++ src/main.rs | 7 +- src/pipeline/fanout/mempool.rs | 81 +------------ src/pipeline/fanout/mod.rs | 106 ++++++++++++++++-- src/pipeline/fanout/stage.rs | 81 ------------- src/pipeline/ingest.rs | 47 +++++--- src/pipeline/mod.rs | 11 +- src/storage/in_memory_db.rs | 29 ----- src/storage/mod.rs | 16 ++- src/storage/sqlite.rs | 87 +++++++++++++- 10 files changed, 246 insertions(+), 231 deletions(-) create mode 100644 .sqlx/query-c9c7d7b7a68ae840a5d5127c5cb3be43b7aad302f1a42cd462915fd7b0a26105.json delete mode 100644 src/pipeline/fanout/stage.rs delete mode 100644 src/storage/in_memory_db.rs diff --git a/.sqlx/query-c9c7d7b7a68ae840a5d5127c5cb3be43b7aad302f1a42cd462915fd7b0a26105.json b/.sqlx/query-c9c7d7b7a68ae840a5d5127c5cb3be43b7aad302f1a42cd462915fd7b0a26105.json new file mode 100644 index 0000000..e454d34 --- /dev/null +++ b/.sqlx/query-c9c7d7b7a68ae840a5d5127c5cb3be43b7aad302f1a42cd462915fd7b0a26105.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n UPDATE\n \ttx\n SET\n \traw = $1,\n \tstatus = $2,\n \tupdated_at = $3\n WHERE\n \tid = $4;\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "c9c7d7b7a68ae840a5d5127c5cb3be43b7aad302f1a42cd462915fd7b0a26105" +} diff --git a/src/main.rs b/src/main.rs index 38eb33b..903f0be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,16 +41,11 @@ async fn main() -> Result<()> { Ok(()) } -#[derive(Deserialize, Clone)] -struct PeerManagerConfig { - peers: Vec, -} - #[derive(Deserialize, Clone)] struct Config { server: server::Config, storage: storage::Config, - peer_manager: PeerManagerConfig, + peer_manager: pipeline::fanout::PeerManagerConfig, } impl Config { diff --git a/src/pipeline/fanout/mempool.rs b/src/pipeline/fanout/mempool.rs index 9ffaf51..7a56f8e 100644 --- a/src/pipeline/fanout/mempool.rs +++ b/src/pipeline/fanout/mempool.rs @@ -1,8 +1,5 @@ use itertools::Itertools; -use pallas::{ - crypto::hash::Hash, - ledger::traverse::{MultiEraBlock, MultiEraTx}, -}; +use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraTx}; use std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -20,12 +17,6 @@ pub enum MempoolError { #[error("decode error: {0}")] DecodeError(#[from] pallas::codec::minicbor::decode::Error), - - #[error("plutus not supported")] - PlutusNotSupported, - - #[error("invalid tx: {0}")] - InvalidTx(String), } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -42,14 +33,14 @@ pub enum TxStage { Pending, Inflight, Acknowledged, - Confirmed, - Unknown, } +// TODO: validate clippy unused fields +#[allow(dead_code)] #[derive(Clone)] pub struct Event { - pub new_stage: TxStage, - pub tx: Tx, + new_stage: TxStage, + tx: Tx, } #[derive(Default)] @@ -74,10 +65,6 @@ impl Mempool { Self { mempool, updates } } - pub fn subscribe(&self) -> broadcast::Receiver { - self.updates.subscribe() - } - pub fn notify(&self, new_stage: TxStage, tx: Tx) { if self.updates.send(Event { new_stage, tx }).is_err() { debug!("no mempool update receivers"); @@ -166,66 +153,8 @@ impl Mempool { state.inflight.iter().find(|x| x.hash.eq(tx_hash)).cloned() } - pub fn find_pending(&self, tx_hash: &TxHash) -> Option { - let state = self.mempool.read().unwrap(); - state.pending.iter().find(|x| x.hash.eq(tx_hash)).cloned() - } - pub fn pending_total(&self) -> usize { let state = self.mempool.read().unwrap(); state.pending.len() } - - pub fn check_stage(&self, tx_hash: &TxHash) -> TxStage { - let state = self.mempool.read().unwrap(); - - if let Some(tx) = state.acknowledged.get(tx_hash) { - if tx.confirmed { - TxStage::Confirmed - } else { - TxStage::Acknowledged - } - } else if self.find_inflight(tx_hash).is_some() { - TxStage::Inflight - } else if self.find_pending(tx_hash).is_some() { - TxStage::Pending - } else { - TxStage::Unknown - } - } - - pub fn apply_block(&self, block: &MultiEraBlock) { - let mut state = self.mempool.write().unwrap(); - - if state.acknowledged.is_empty() { - return; - } - - for tx in block.txs() { - let tx_hash = tx.hash(); - - if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) { - acknowledged_tx.confirmed = true; - self.notify(TxStage::Confirmed, acknowledged_tx.clone()); - debug!(%tx_hash, "confirming tx"); - } - } - } - - pub fn undo_block(&self, block: &MultiEraBlock) { - let mut state = self.mempool.write().unwrap(); - - if state.acknowledged.is_empty() { - return; - } - - for tx in block.txs() { - let tx_hash = tx.hash(); - - if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) { - acknowledged_tx.confirmed = false; - debug!(%tx_hash, "un-confirming tx"); - } - } - } } diff --git a/src/pipeline/fanout/mod.rs b/src/pipeline/fanout/mod.rs index d7fa9aa..5b8ae57 100644 --- a/src/pipeline/fanout/mod.rs +++ b/src/pipeline/fanout/mod.rs @@ -1,12 +1,95 @@ +use std::{sync::Arc, time::Duration}; + +use gasket::framework::*; +use serde::Deserialize; +use tokio::time::sleep; +use tracing::info; +use tx_submit_peer_manager::TxSubmitPeerManager; + +use crate::storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus}; + pub mod mempool; -pub mod stage; pub mod tx_submit_peer; pub mod tx_submit_peer_manager; -pub mod mock_ouroboros_tx_submit_server; -pub use stage::Stage; +#[derive(Stage)] +#[stage(name = "fanout", unit = "Transaction", worker = "Worker")] +pub struct Stage { + storage: Arc, + config: PeerManagerConfig, +} + +impl Stage { + pub fn new(storage: Arc, config: PeerManagerConfig) -> Self { + Self { storage, config } + } +} + +pub struct Worker { + tx_submit_peer_manager: TxSubmitPeerManager, +} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(stage: &Stage) -> Result { + // Load configuration and Start Clients + let peer_addresses = stage.config.peers.clone(); + + info!("Peer Addresses: {:?}", peer_addresses); + + // Proof of Concept: TxSubmitPeerManager + // Pass Config Network Magic and Peer Addresses + let mut tx_submit_peer_manager = TxSubmitPeerManager::new(2, peer_addresses); + tx_submit_peer_manager.init().await.unwrap(); + + Ok(Self { + tx_submit_peer_manager, + }) + } + + async fn schedule( + &mut self, + stage: &mut Stage, + ) -> Result, WorkerError> { + if let Some(tx) = stage + .storage + .next(TransactionStatus::Validated) + .await + .or_retry()? + { + return Ok(WorkSchedule::Unit(tx)); + } + + sleep(Duration::from_secs(1)).await; + Ok(WorkSchedule::Idle) + } + + async fn execute(&mut self, unit: &Transaction, stage: &mut Stage) -> Result<(), WorkerError> { + let mut transaction = unit.clone(); + info!("fanout {}", transaction.id); + + // extract cbor from unit and pass it to tx_submit_peer_manager + // comment out for now until we have a proper tx to submit + self.tx_submit_peer_manager + .add_tx(transaction.raw.clone()) + .await; + + transaction.status = TransactionStatus::InFlight; + stage.storage.update(&transaction).await.or_retry()?; + + Ok(()) + } +} + +#[derive(Deserialize, Clone)] +pub struct PeerManagerConfig { + peers: Vec, +} // Test for Fanout Stage +#[cfg(test)] +pub mod mock_ouroboros_tx_submit_server; + #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration}; @@ -21,7 +104,10 @@ mod tests { async fn test_fanout_stage() { let _ = tracing_subscriber::fmt().with_env_filter("info").try_init(); - let peer_server = Arc::new(MockOuroborosTxSubmitPeerServer::new("0.0.0.0:3001".to_string(), 2)); + let peer_server = Arc::new(MockOuroborosTxSubmitPeerServer::new( + "0.0.0.0:3001".to_string(), + 2, + )); peer_server.clone().init().await; tokio::time::sleep(Duration::from_millis(200)).await; @@ -47,10 +133,10 @@ mod tests { let tx_id = tx.hash(); tracing::info!("Tx Hash: {:?}", tx_id); - + // There is a deadlock here, need to debug tx_submit_peer_client.add_tx(raw_cbor.clone()).await; - + // wait for server to stop loop { tokio::time::sleep(Duration::from_millis(10)).await; @@ -66,7 +152,11 @@ mod tests { let mut found = false; for tx_from_server in server_acknowledge_txs.iter() { - tracing::info!("Tx from server: {:?}, Tx from client: {:?}", tx_from_server, tx_id); + tracing::info!( + "Tx from server: {:?}, Tx from client: {:?}", + tx_from_server, + tx_id + ); if tx_from_server == &tx_id { found = true; break; @@ -74,4 +164,4 @@ mod tests { } assert!(found); } -} \ No newline at end of file +} diff --git a/src/pipeline/fanout/stage.rs b/src/pipeline/fanout/stage.rs deleted file mode 100644 index 969d161..0000000 --- a/src/pipeline/fanout/stage.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use gasket::framework::*; -use tokio::time::sleep; -use tracing::info; - -use crate::{pipeline::Transaction, storage::sqlite::SqliteTransaction, Config}; - -use super::tx_submit_peer_manager::TxSubmitPeerManager; - -#[derive(Stage)] -#[stage(name = "fanout", unit = "Transaction", worker = "Worker")] -pub struct Stage { - pub tx_storage: Arc, - pub config: Config, -} - -impl Stage { - pub fn new(config: Config, tx_storage: Arc) -> Self { - Self { - tx_storage, - config, - } - } -} - -pub struct Worker { - tx_submit_peer_manager: TxSubmitPeerManager, -} - -#[async_trait::async_trait(?Send)] -impl gasket::framework::Worker for Worker { - async fn bootstrap(_stage: &Stage) -> Result { - // Load configuration and Start Clients - let peer_addresses = _stage.config.peer_manager.peers.clone(); - - info!("Peer Addresses: {:?}", peer_addresses); - - // Proof of Concept: TxSubmitPeerManager - // Pass Config Network Magic and Peer Addresses - let mut tx_submit_peer_manager = TxSubmitPeerManager::new(2, peer_addresses); - tx_submit_peer_manager.init().await.unwrap(); - - Ok(Self { - tx_submit_peer_manager, - }) - } - - async fn schedule( - &mut self, - stage: &mut Stage, - ) -> Result, WorkerError> { - // info!("Cbor Transactions Length: {}", stage.tx_storage); - - // if let Some(tx_cbor) = stage.cbor_txs_db.pop_tx() { - // return Ok(WorkSchedule::Unit(Transaction { - // cbor: tx_cbor - // })); - // } else { - // // TODO: should we really have a sleep here? - // sleep(Duration::from_secs(30)).await; - // return Ok(WorkSchedule::Idle); - // } - return Ok(WorkSchedule::Idle); - } - - async fn execute( - &mut self, - unit: &Transaction, - _stage: &mut Stage, - ) -> Result<(), WorkerError> { - info!("fanout stage"); - - // extract cbor from unit and pass it to tx_submit_peer_manager - // comment out for now until we have a proper tx to submit - let tx_cbor = unit.cbor.clone(); - self.tx_submit_peer_manager.add_tx(tx_cbor).await; - - Ok(()) - } -} diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index b54e42a..322b26b 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -1,14 +1,22 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use gasket::framework::*; use tokio::time::sleep; use tracing::info; -use super::Transaction; +use crate::storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus}; #[derive(Stage)] #[stage(name = "ingest", unit = "Transaction", worker = "Worker")] -pub struct Stage {} +pub struct Stage { + storage: Arc, +} + +impl Stage { + pub fn new(storage: Arc) -> Self { + Self { storage } + } +} pub struct Worker; @@ -20,22 +28,29 @@ impl gasket::framework::Worker for Worker { async fn schedule( &mut self, - _stage: &mut Stage, + stage: &mut Stage, ) -> Result, WorkerError> { - // TODO: fetch data from db - sleep(Duration::from_secs(30)).await; - Ok(WorkSchedule::Unit(Transaction { - cbor: vec![0, 1, 2, 3], - })) + if let Some(tx) = stage + .storage + .next(TransactionStatus::Pending) + .await + .or_retry()? + { + return Ok(WorkSchedule::Unit(tx)); + } + + sleep(Duration::from_secs(1)).await; + Ok(WorkSchedule::Idle) } - async fn execute( - &mut self, - _unit: &Transaction, - _stage: &mut Stage, - ) -> Result<(), WorkerError> { - info!("ingest"); - + async fn execute(&mut self, unit: &Transaction, stage: &mut Stage) -> Result<(), WorkerError> { + let mut transaction = unit.clone(); + + info!("ingest {}", transaction.id); + + transaction.status = TransactionStatus::Validated; + stage.storage.update(&transaction).await.or_retry()?; + Ok(()) } } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 42357a8..9708188 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -8,15 +8,10 @@ pub mod fanout; pub mod ingest; pub mod monitor; -#[derive(Debug)] -pub struct Transaction { - pub cbor: Vec, -} - pub async fn run(config: Config, tx_storage: Arc) -> Result<()> { tokio::spawn(async move { - let ingest = ingest::Stage {}; - let fanout = fanout::Stage::new(config, tx_storage); + let ingest = ingest::Stage::new(tx_storage.clone()); + let fanout = fanout::Stage::new(tx_storage.clone(), config.peer_manager); let monitor = monitor::Stage {}; let policy: gasket::runtime::Policy = Default::default(); @@ -31,4 +26,4 @@ pub async fn run(config: Config, tx_storage: Arc) -> Result<( .await?; Ok(()) -} \ No newline at end of file +} diff --git a/src/storage/in_memory_db.rs b/src/storage/in_memory_db.rs deleted file mode 100644 index 7ff6d99..0000000 --- a/src/storage/in_memory_db.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::sync::{Arc, Mutex}; - -#[derive(Clone)] -pub struct CborTransactionsDb { - pub cbor_txs: Arc>>>, -} - -impl CborTransactionsDb { - pub fn new() -> Self { - Self { - cbor_txs: Arc::new(Mutex::new(vec![])), - } - } - - pub fn push_tx(&self, tx: Vec) { - let mut txs = self.cbor_txs.lock().unwrap(); - txs.push(tx); - } - - pub fn pop_tx(&self) -> Option> { - let mut txs = self.cbor_txs.lock().unwrap(); - - if txs.len() == 0 { - None - } else { - Some(txs.remove(0)) - } - } -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c76a492..67d0d1e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,13 +4,13 @@ use chrono::{DateTime, Utc}; use serde::Deserialize; pub mod sqlite; -pub mod in_memory_db; #[derive(Deserialize, Clone)] pub struct Config { pub db_path: String, } +#[derive(Clone)] pub struct Transaction { pub id: String, pub raw: Vec, @@ -45,9 +45,9 @@ impl TryFrom for TransactionPriority { fn try_from(value: u32) -> Result { match value { - 1 => Ok(Self::Low), + 1 => Ok(Self::High), 2 => Ok(Self::Medium), - 3 => Ok(Self::High), + 3 => Ok(Self::Low), _ => Err(anyhow::Error::msg("transaction priority not supported")), } } @@ -57,9 +57,9 @@ impl TryFrom for u32 { fn try_from(value: TransactionPriority) -> Result { match value { - TransactionPriority::Low => Ok(1), + TransactionPriority::High => Ok(1), TransactionPriority::Medium => Ok(2), - TransactionPriority::High => Ok(3), + TransactionPriority::Low => Ok(3), } } } @@ -67,6 +67,8 @@ impl TryFrom for u32 { #[derive(Clone)] pub enum TransactionStatus { Pending, + Validated, + InFlight, } impl FromStr for TransactionStatus { type Err = anyhow::Error; @@ -74,6 +76,8 @@ impl FromStr for TransactionStatus { fn from_str(s: &str) -> std::result::Result { match s { "pending" => Ok(Self::Pending), + "validated" => Ok(Self::Validated), + "inflight" => Ok(Self::InFlight), _ => Err(anyhow::Error::msg("transaction status not supported")), } } @@ -82,6 +86,8 @@ impl Display for TransactionStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Pending => write!(f, "pending"), + Self::Validated => write!(f, "validated"), + Self::InFlight => write!(f, "inflight"), } } } diff --git a/src/storage/sqlite.rs b/src/storage/sqlite.rs index 9b10eef..c9492f0 100644 --- a/src/storage/sqlite.rs +++ b/src/storage/sqlite.rs @@ -1,9 +1,10 @@ use std::path::Path; use anyhow::{Error, Result}; +use chrono::Utc; use sqlx::{sqlite::SqliteRow, FromRow, Row}; -use super::Transaction; +use super::{Transaction, TransactionStatus}; pub struct SqliteStorage { db: sqlx::sqlite::SqlitePool, @@ -120,11 +121,64 @@ impl SqliteTransaction { db_tx.commit().await?; Ok(()) } + + pub async fn next(&self, status: TransactionStatus) -> Result> { + let transaction = sqlx::query_as::<_, Transaction>( + r#" + SELECT + id, + raw, + status, + priority, + created_at, + updated_at + FROM + tx + WHERE + tx.status = $1 + ORDER BY + priority, + created_at ASC + LIMIT 1; + "#, + ) + .bind(status.to_string()) + .fetch_optional(&self.sqlite.db) + .await?; + + Ok(transaction) + } + + pub async fn update(&self, tx: &Transaction) -> Result<()> { + let status = tx.status.to_string(); + let updated_at = Utc::now(); + + sqlx::query!( + r#" + UPDATE + tx + SET + raw = $1, + status = $2, + updated_at = $3 + WHERE + id = $4; + "#, + tx.raw, + status, + updated_at, + tx.id, + ) + .execute(&self.sqlite.db) + .await?; + + Ok(()) + } } #[cfg(test)] mod tests { - use crate::storage::Transaction; + use crate::storage::{Transaction, TransactionStatus}; use super::{SqliteStorage, SqliteTransaction}; @@ -166,4 +220,33 @@ mod tests { let result = storage.create(&vec![transaction]).await; assert!(result.is_err()); } + + #[tokio::test] + async fn it_should_find_next_transaction() { + let storage = mock_sqlite().await; + let transaction = Transaction::default(); + + storage.create(&vec![transaction]).await.unwrap(); + + let result = storage.next(TransactionStatus::Pending).await; + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + } + + #[tokio::test] + async fn it_should_update_transaction_valid() { + let storage = mock_sqlite().await; + + let transaction = Transaction::default(); + storage.create(&vec![transaction]).await.unwrap(); + + let mut transaction = Transaction::default(); + transaction.status = TransactionStatus::Validated; + let result = storage.update(&transaction).await; + assert!(result.is_ok()); + + let result = storage.next(TransactionStatus::Validated).await; + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + } }