Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fanout stage mock server unit test #7

Merged
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ Cargo.lock
dev.db*
.env*

boros.config.toml
boros.config.toml
boros.toml
boros.db
2 changes: 1 addition & 1 deletion examples/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut client = submit::submit_service_client::SubmitServiceClient::new(channel);

let tx = "84a300d9010281825820cdc219e7abe938a35ca074d4bd02d6ccc3c2fc25d1462af07b6c1e8f40933af200018282581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a3b9aca0082581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a77c0bd2f021a0002990da100d90102818258205d4b008e92a42846add4d060e49d7427700ced0ab8eb73e559acc14d228ca5475840f3f12cbfd551e5e51f9eb32fcf695c3a63ec3dfb7329108f45b441cafc7a706659d06238665327779e32415c91b6190e0cd00096aee41f6e405be59d69462708f5f6";
let tx = "84a300d901028182582071e8d419538ec00b262d6d625d02fec577ecad60feae0ad0238e74f97a0162b500018182581d606e2e6d54e1a27ad640786852e20c22fb982ebcee4773a2926aae4b391b00000001a1113ab4021a000292b1a100d9010281825820524e506f6a872c4ee3ee6d4b9913670c4b441860b3aa5438de92a676e20f527b5840ebb051997a9b920f366526c87e877ab525014257480ad22f78338f662068bd899850386ab2abb714bbc5c7e719bb0462e2de1c60000f33e81c043fc9db949a02f5f6";

let bytes = hex::decode(tx)?;

Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ async fn main() -> Result<()> {
storage.migrate().await?;

let tx_storage = Arc::new(SqliteTransaction::new(storage));
let cbor_txs_db = storage::in_memory_db::CborTransactionsDb::new();

let pipeline = pipeline::run(cbor_txs_db.clone(), config.clone());
let pipeline = pipeline::run(config.clone(), tx_storage.clone());
let server = server::run(config.server, tx_storage.clone());

try_join!(pipeline, server)?;
Expand Down
189 changes: 189 additions & 0 deletions src/pipeline/fanout/mock_ouroboros_tx_submit_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::sync::{Arc, Mutex, RwLock};

use pallas::network::{
facades::PeerServer,
miniprotocols::txsubmission::{EraTxId, Reply, State},
};
use tokio::{net::TcpListener, task};
use tracing::{error, info};

pub struct MockOuroborosTxSubmitPeerServer {
pub socket_addr: String,
pub network_magic: u64,
pub acknowledge_txs: Arc<Mutex<Vec<pallas::crypto::hash::Hash<32>>>>,
pub is_done: Arc<RwLock<bool>>,
}

impl MockOuroborosTxSubmitPeerServer {
pub fn new(socket_addr: String, network_magic: u64) -> Self {
Self {
socket_addr,
network_magic,
acknowledge_txs: Arc::new(Mutex::new(vec![])),
is_done: Arc::new(RwLock::new(false)),
}
}

pub async fn init(self: Arc<Self>) {
task::spawn(async move {
let tcp_listener = TcpListener::bind(&self.socket_addr)
.await
.expect("Failed to bind");

info!(
"SERVER: MockOuroborosTxSubmitPeerServer listening on: {}",
&self.socket_addr
);

loop {
match PeerServer::accept(&tcp_listener, self.network_magic).await {
Ok(peer_server) => {
let this = Arc::clone(&self);
task::spawn(async move {
this.start_background_task(peer_server).await;
});
}
Err(err) => {
info!("SERVER: Accept error: {:?}", err);
}
}
}
});
}

async fn start_background_task(&self, mut peer_server: PeerServer) {
let mut collected_tx_ids: Vec<EraTxId> = vec![];
let addr = peer_server.accepted_address().unwrap();

info!("SERVER: New connection from {addr}");
let tx_server = peer_server.txsubmission();

info!("SERVER: waiting for init (agency is theirs)");
if let Err(err) = tx_server.wait_for_init().await {
error!("SERVER: error waiting for init: {:?}", err);
return;
}

info!("SERVER: init received, now we have agency => requesting TxIds in a loop...");

let mut acknowledge = 0u16;
let count = 3u16;

info!("SERVER: Current State: {:?}", tx_server.state());
info!("SERVER: requesting TxIds (blocking=true)");

// Request TxIds Blocking
if matches!(tx_server.state(), State::Idle) {
if let Err(err) = tx_server
.acknowledge_and_request_tx_ids(true, acknowledge, count)
.await
{
panic!("SERVER: error requesting TxIds => {err:?}");
}
}


// Recieve TxIds for the blocking request
let txids_reply = match tx_server.receive_next_reply().await {
Ok(reply) => reply,
Err(err) => {
panic!("SERVER: error receiving next reply => {err:?}");
}
};

// Process the TxIds
if let Reply::TxIds(ids_and_sizes) = txids_reply {
let num_ids = ids_and_sizes.len() as u32;
info!(
"SERVER: got TxIds => {} total, ack so far => {}",
num_ids, acknowledge
);

let new_tx_ids = ids_and_sizes
.into_iter()
.map(|tx_id_and_size| tx_id_and_size.0)
.collect::<Vec<_>>();

collected_tx_ids.extend(new_tx_ids);

info!("SERVER: appended {} new TxIds to collected list", num_ids);
info!("SERVER: next request will be non-blocking");
}

// Request TxIds Non-Blocking
if matches!(tx_server.state(), State::Idle) {
if let Err(err) = tx_server
.acknowledge_and_request_tx_ids(false, 0, count)
.await
{
panic!("SERVER: error requesting TxIds => {err:?}");
}
}

// Recieve TxIds for the non-blocking request
let txids_reply = match tx_server.receive_next_reply().await {
Ok(reply) => reply,
Err(err) => {
panic!("SERVER: error receiving next reply => {err:?}");
}
};

// Process the TxIds
if let Reply::TxIds(ids_and_sizes) = txids_reply {
let num_ids = ids_and_sizes.len() as u32;
info!(
"SERVER: got TxIds => {} total, ack so far => {}",
num_ids, acknowledge
);

let new_tx_ids = ids_and_sizes
.into_iter()
.map(|tx_id_and_size| tx_id_and_size.0)
.collect::<Vec<_>>();

collected_tx_ids.extend(new_tx_ids);

info!("SERVER: appended {} new TxIds to collected list", num_ids);
info!("SERVER: next request will be non-blocking");
}

// request the Tx bodies
if let Err(err) = tx_server.request_txs(collected_tx_ids.clone()).await {
panic!("SERVER: error replying TxIds => {err:?}");
}


// Recieve Tx bodies
let txs_reply = match tx_server.receive_next_reply().await {
Ok(reply) => reply,
Err(err) => {
panic!("SERVER: error receiving next reply => {err:?}");
}
};

// Process the Tx bodies
if let Reply::Txs(bodies) = txs_reply {
info!(
"SERVER: State => {:?}, got Tx bodies => {}",
tx_server.state(),
bodies.len()
);
acknowledge += bodies.len() as u16;
let to_acknowledge_ids = collected_tx_ids.drain(..bodies.len()).collect::<Vec<_>>();
let mut acknowledge_txs = self.acknowledge_txs.lock().unwrap();
to_acknowledge_ids.iter().for_each(|EraTxId(_, hash)| {
acknowledge_txs.push(pallas::crypto::hash::Hash::new(hash.as_slice().try_into().unwrap()));
});
info!(
"SERVER: ack so far => {acknowledge}, State => {:?}, agency is theirs",
tx_server.state()
);
info!("SERVER: got Tx bodies => {}", bodies.len());
}

info!("SERVER: done, closing connection");
peer_server.abort().await;
*self.is_done.write().unwrap() = true;
info!("SERVER: connection closed");
}
}
87 changes: 61 additions & 26 deletions src/pipeline/fanout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,76 @@ pub mod mempool;
pub mod stage;
pub mod tx_submit_peer;
pub mod tx_submit_peer_manager;
pub mod mock_ouroboros_tx_submit_server;

pub use stage::Stage;

// Test for Fanout Stage
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

use hex::decode;
use mock_ouroboros_tx_submit_server::MockOuroborosTxSubmitPeerServer;
use pallas::ledger::traverse::MultiEraTx;

use super::*;
use crate::storage::in_memory_db::CborTransactionsDb;
use crate::{Config, PeerManagerConfig};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::vec;

#[test]
fn test_fanout_stage() {
let cbor_txs_db = CborTransactionsDb {
cbor_txs: Arc::new(Mutex::new(vec![vec![1, 2, 3]])),
};

let config = Config {
peer_manager: PeerManagerConfig {
peers: vec!["".to_string()],
},
server: crate::server::Config {
listen_address: SocketAddr::from_str("[::1]:50052").unwrap(),
},
storage: crate::storage::Config {
db_path: "dev.db".into(),
},
#[tokio::test]
async fn test_fanout_stage() {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();

let peer_server = Arc::new(MockOuroborosTxSubmitPeerServer::new("0.0.0.0:3001".to_string(), 2));
peer_server.clone().init().await;

tokio::time::sleep(Duration::from_millis(200)).await;
let mut tx_submit_peer_client = tx_submit_peer::TxSubmitPeer::new("127.0.0.1:3001", 2);

tx_submit_peer_client.init().await.unwrap();

tokio::time::sleep(Duration::from_secs(1)).await;

// add txs to peer client
let cbor_data = "84a300d90102828258202000fbfaa10c6b316fe4b23c60b42313fc2ad89b51c7397f82a4a5ca97bd62a9008258202000fbfaa10c6b316fe4b23c60b42313fc2ad89b51c7397f82a4a5ca97bd62a901018182581d606e2e6d54e1a27ad640786852e20c22fb982ebcee4773a2926aae4b391b00000001a1166016021a0002aa3da100d9010281825820524e506f6a872c4ee3ee6d4b9913670c4b441860b3aa5438de92a676e20f527b5840233e9119fa6a3c58ab42bc384f506c2906104ebb059ab4ea6cc79305ff46c7e194d634d23ff775f92e51246e328711e6cbf38aeda01a1885f922047323c68c04f5f6";

// Read the raw bytes from the request body.
let raw_cbor = match decode(cbor_data) {
Ok(bytes) => bytes,
Err(e) => {
tracing::error!("Failed to decode hex string: {:?}", e);
return;
}
};

// Run mock node
let tx = MultiEraTx::decode(&raw_cbor).unwrap();
let tx_id = tx.hash();

tracing::info!("Tx Hash: {:?}", tx_id);

// There is a deadlock here, need to debug
tx_submit_peer_client.add_tx(raw_cbor.clone()).await;

// wait for server to stop
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let is_done = peer_server.is_done.read().unwrap();
tracing::info!("Is Server done: {:?}", *is_done);
if *is_done {
break;
}
drop(is_done);
}

let server_acknowledge_txs = peer_server.acknowledge_txs.lock().unwrap();
let mut found = false;

// Run Fanout Stage
let fanout = Stage::new(cbor_txs_db, config);
for tx_from_server in server_acknowledge_txs.iter() {
tracing::info!("Tx from server: {:?}, Tx from client: {:?}", tx_from_server, tx_id);
if tx_from_server == &tx_id {
found = true;
break;
}
}
assert!(found);
}
}
}
31 changes: 16 additions & 15 deletions src/pipeline/fanout/stage.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use gasket::framework::*;
use tokio::time::sleep;
use tracing::info;

use crate::{pipeline::Transaction, storage::in_memory_db::CborTransactionsDb, Config};
use crate::{pipeline::Transaction, storage::sqlite::SqliteTransaction, Config};

use super::tx_submit_peer_manager::TxSubmitPeerManager;

#[derive(Stage)]
#[stage(name = "fanout", unit = "Transaction", worker = "Worker")]
pub struct Stage {
pub cbor_txs_db: CborTransactionsDb,
pub tx_storage: Arc<SqliteTransaction>,
pub config: Config,
}

impl Stage {
pub fn new(cbor_txs_db: CborTransactionsDb, config: Config) -> Self {
pub fn new(config: Config, tx_storage: Arc<SqliteTransaction>) -> Self {
Self {
cbor_txs_db,
tx_storage,
config,
}
}
Expand Down Expand Up @@ -50,17 +50,18 @@ impl gasket::framework::Worker<Stage> for Worker {
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<Transaction>, WorkerError> {
info!("Cbor Transactions Length: {}", stage.cbor_txs_db.cbor_txs.lock().unwrap().len());
// info!("Cbor Transactions Length: {}", stage.tx_storage);

if let Some(tx_cbor) = stage.cbor_txs_db.pop_tx() {
return Ok(WorkSchedule::Unit(Transaction {
cbor: tx_cbor
}));
} else {
// TODO: should we really have a sleep here?
sleep(Duration::from_secs(30)).await;
return Ok(WorkSchedule::Idle);
}
// if let Some(tx_cbor) = stage.cbor_txs_db.pop_tx() {
// return Ok(WorkSchedule::Unit(Transaction {
// cbor: tx_cbor
// }));
// } else {
// // TODO: should we really have a sleep here?
// sleep(Duration::from_secs(30)).await;
// return Ok(WorkSchedule::Idle);
// }
return Ok(WorkSchedule::Idle);
}

async fn execute(
Expand Down
Loading
Loading