Skip to content

Commit

Permalink
feat: integrate pipeline with storage (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Jan 27, 2025
1 parent 72d61d0 commit 5ca5820
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 231 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,11 @@ async fn main() -> Result<()> {
Ok(())
}

#[derive(Deserialize, Clone)]
struct PeerManagerConfig {
peers: Vec<String>,
}

#[derive(Deserialize, Clone)]
struct Config {
server: server::Config,
storage: storage::Config,
peer_manager: PeerManagerConfig,
peer_manager: pipeline::fanout::PeerManagerConfig,
}

impl Config {
Expand Down
81 changes: 5 additions & 76 deletions src/pipeline/fanout/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use itertools::Itertools;
use pallas::{
crypto::hash::Hash,
ledger::traverse::{MultiEraBlock, MultiEraTx},
};
use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraTx};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
Expand All @@ -20,12 +17,6 @@ pub enum MempoolError {

#[error("decode error: {0}")]
DecodeError(#[from] pallas::codec::minicbor::decode::Error),

#[error("plutus not supported")]
PlutusNotSupported,

#[error("invalid tx: {0}")]
InvalidTx(String),
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand All @@ -42,14 +33,14 @@ pub enum TxStage {
Pending,
Inflight,
Acknowledged,
Confirmed,
Unknown,
}

// TODO: validate clippy unused fields
#[allow(dead_code)]
#[derive(Clone)]
pub struct Event {
pub new_stage: TxStage,
pub tx: Tx,
new_stage: TxStage,
tx: Tx,
}

#[derive(Default)]
Expand All @@ -74,10 +65,6 @@ impl Mempool {
Self { mempool, updates }
}

pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.updates.subscribe()
}

pub fn notify(&self, new_stage: TxStage, tx: Tx) {
if self.updates.send(Event { new_stage, tx }).is_err() {
debug!("no mempool update receivers");
Expand Down Expand Up @@ -166,66 +153,8 @@ impl Mempool {
state.inflight.iter().find(|x| x.hash.eq(tx_hash)).cloned()
}

pub fn find_pending(&self, tx_hash: &TxHash) -> Option<Tx> {
let state = self.mempool.read().unwrap();
state.pending.iter().find(|x| x.hash.eq(tx_hash)).cloned()
}

pub fn pending_total(&self) -> usize {
let state = self.mempool.read().unwrap();
state.pending.len()
}

pub fn check_stage(&self, tx_hash: &TxHash) -> TxStage {
let state = self.mempool.read().unwrap();

if let Some(tx) = state.acknowledged.get(tx_hash) {
if tx.confirmed {
TxStage::Confirmed
} else {
TxStage::Acknowledged
}
} else if self.find_inflight(tx_hash).is_some() {
TxStage::Inflight
} else if self.find_pending(tx_hash).is_some() {
TxStage::Pending
} else {
TxStage::Unknown
}
}

pub fn apply_block(&self, block: &MultiEraBlock) {
let mut state = self.mempool.write().unwrap();

if state.acknowledged.is_empty() {
return;
}

for tx in block.txs() {
let tx_hash = tx.hash();

if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
acknowledged_tx.confirmed = true;
self.notify(TxStage::Confirmed, acknowledged_tx.clone());
debug!(%tx_hash, "confirming tx");
}
}
}

pub fn undo_block(&self, block: &MultiEraBlock) {
let mut state = self.mempool.write().unwrap();

if state.acknowledged.is_empty() {
return;
}

for tx in block.txs() {
let tx_hash = tx.hash();

if let Some(acknowledged_tx) = state.acknowledged.get_mut(&tx_hash) {
acknowledged_tx.confirmed = false;
debug!(%tx_hash, "un-confirming tx");
}
}
}
}
106 changes: 98 additions & 8 deletions src/pipeline/fanout/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,95 @@
use std::{sync::Arc, time::Duration};

use gasket::framework::*;
use serde::Deserialize;
use tokio::time::sleep;
use tracing::info;
use tx_submit_peer_manager::TxSubmitPeerManager;

use crate::storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus};

pub mod mempool;
pub mod stage;
pub mod tx_submit_peer;
pub mod tx_submit_peer_manager;
pub mod mock_ouroboros_tx_submit_server;

pub use stage::Stage;
#[derive(Stage)]
#[stage(name = "fanout", unit = "Transaction", worker = "Worker")]
pub struct Stage {
storage: Arc<SqliteTransaction>,
config: PeerManagerConfig,
}

impl Stage {
pub fn new(storage: Arc<SqliteTransaction>, config: PeerManagerConfig) -> Self {
Self { storage, config }
}
}

pub struct Worker {
tx_submit_peer_manager: TxSubmitPeerManager,
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
// Load configuration and Start Clients
let peer_addresses = stage.config.peers.clone();

info!("Peer Addresses: {:?}", peer_addresses);

// Proof of Concept: TxSubmitPeerManager
// Pass Config Network Magic and Peer Addresses
let mut tx_submit_peer_manager = TxSubmitPeerManager::new(2, peer_addresses);
tx_submit_peer_manager.init().await.unwrap();

Ok(Self {
tx_submit_peer_manager,
})
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<Transaction>, WorkerError> {
if let Some(tx) = stage
.storage
.next(TransactionStatus::Validated)
.await
.or_retry()?
{
return Ok(WorkSchedule::Unit(tx));
}

sleep(Duration::from_secs(1)).await;
Ok(WorkSchedule::Idle)
}

async fn execute(&mut self, unit: &Transaction, stage: &mut Stage) -> Result<(), WorkerError> {
let mut transaction = unit.clone();
info!("fanout {}", transaction.id);

// extract cbor from unit and pass it to tx_submit_peer_manager
// comment out for now until we have a proper tx to submit
self.tx_submit_peer_manager
.add_tx(transaction.raw.clone())
.await;

transaction.status = TransactionStatus::InFlight;
stage.storage.update(&transaction).await.or_retry()?;

Ok(())
}
}

#[derive(Deserialize, Clone)]
pub struct PeerManagerConfig {
peers: Vec<String>,
}

// Test for Fanout Stage
#[cfg(test)]
pub mod mock_ouroboros_tx_submit_server;

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
Expand All @@ -21,7 +104,10 @@ mod tests {
async fn test_fanout_stage() {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();

let peer_server = Arc::new(MockOuroborosTxSubmitPeerServer::new("0.0.0.0:3001".to_string(), 2));
let peer_server = Arc::new(MockOuroborosTxSubmitPeerServer::new(
"0.0.0.0:3001".to_string(),
2,
));
peer_server.clone().init().await;

tokio::time::sleep(Duration::from_millis(200)).await;
Expand All @@ -47,10 +133,10 @@ mod tests {
let tx_id = tx.hash();

tracing::info!("Tx Hash: {:?}", tx_id);

// There is a deadlock here, need to debug
tx_submit_peer_client.add_tx(raw_cbor.clone()).await;

// wait for server to stop
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand All @@ -66,12 +152,16 @@ mod tests {
let mut found = false;

for tx_from_server in server_acknowledge_txs.iter() {
tracing::info!("Tx from server: {:?}, Tx from client: {:?}", tx_from_server, tx_id);
tracing::info!(
"Tx from server: {:?}, Tx from client: {:?}",
tx_from_server,
tx_id
);
if tx_from_server == &tx_id {
found = true;
break;
}
}
assert!(found);
}
}
}
81 changes: 0 additions & 81 deletions src/pipeline/fanout/stage.rs

This file was deleted.

Loading

0 comments on commit 5ca5820

Please sign in to comment.