Skip to content

Commit

Permalink
Mempool: fix mempool tests
Browse files Browse the repository at this point in the history
- set Tokio runtime flavor to `current_thread` for tests
- yield back execution when necessary
- get rid of sleeps

Low end machines sometimes struggle to complete all the verification tasks within the fixed duration thus creating false positives. Now it waits until all tasks are finished.
Additionally these tests should be able to finish a bit faster.
  • Loading branch information
Eligioo authored and jsdanielh committed Sep 26, 2024
1 parent 4d16988 commit 784ae64
Showing 1 changed file with 121 additions and 127 deletions.
248 changes: 121 additions & 127 deletions mempool/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{env, str::FromStr, sync::Arc, time::Duration};
use std::{env, str::FromStr, sync::Arc};

use nimiq_block::{Block, MicroBlock, MicroBody, MicroHeader};
use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig};
Expand All @@ -21,13 +21,12 @@ use nimiq_test_utils::{
test_rng::test_rng,
test_transaction::{generate_accounts, generate_transactions, TestTransaction},
};
use nimiq_time::sleep;
use nimiq_transaction::{ExecutedTransaction, Transaction};
use nimiq_transaction_builder::TransactionBuilder;
use nimiq_utils::time::OffsetTime;
use nimiq_vrf::VrfSeed;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, task::yield_now};
use tokio_stream::wrappers::ReceiverStream;

pub const ACCOUNT_SECRET_KEY: &str =
Expand Down Expand Up @@ -97,7 +96,6 @@ async fn send_txn_to_mempool(
.await
.expect("Send failed");

sleep(Duration::from_secs(2)).await;
mempool.stop_executor_without_unsubscribe().await;
}

Expand Down Expand Up @@ -130,122 +128,9 @@ async fn send_control_txn_to_mempool(
.await
.expect("Send failed");

sleep(Duration::from_secs(1)).await;
mempool.stop_control_executor_without_unsubscribe().await;
}

async fn multiple_start_stop_send(
blockchain: Arc<RwLock<Blockchain>>,
transactions: Vec<Transaction>,
) {
let min_tps = tps_setting(100);

// Create a MPSC channel to directly send transactions to the mempool
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());

// Subscribe mempool with the mpsc stream created
mempool
.start_executor_with_txn_stream::<MockNetwork>(
Box::pin(ReceiverStream::new(txn_stream_rx)),
mock_network,
)
.await;

// Send the transactions
let txn_stream_tx1 = txn_stream_tx.clone();
let mock_id1 = mock_id.clone();
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx1
.send((txn.clone(), mock_id1.clone()))
.await
.unwrap();
}
})
.await
.expect("Send failed");

sleep(Duration::from_secs(2)).await;
mempool.stop_executor_without_unsubscribe().await;

// Get the transactions from the mempool
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain the same amount of transactions
assert_eq!(obtained_txns.len(), min_tps);

// Now send more transactions via the transaction stream.
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.await
.expect_err("Send should fail, executor is stopped");
}
})
.await
.expect("Send failed");

sleep(Duration::from_secs(2)).await;

// Call stop again, nothing should happen.
mempool.stop_executor_without_unsubscribe().await;

// We should not obtain any, since the executor should not be running.
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain 0 transactions
assert_eq!(obtained_txns.len(), 0_usize);

// Restart the executor
// Create a MPSC channel to directly send transactions to the mempool
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());

// Subscribe mempool with the mpsc stream created
mempool
.start_executor_with_txn_stream::<MockNetwork>(
Box::pin(ReceiverStream::new(txn_stream_rx)),
mock_network,
)
.await;

// Send the transactions
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.await
.unwrap();
}
})
.await
.expect("Send failed");

sleep(Duration::from_secs(2)).await;
mempool.stop_executor_without_unsubscribe().await;

// Get the transactions from the mempool
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain same number of txns
assert_eq!(obtained_txns.len(), min_tps);
}

fn create_dummy_micro_block(transactions: Option<Vec<Transaction>>) -> Block {
// Build a dummy MicroHeader
let micro_header = MicroHeader {
Expand Down Expand Up @@ -810,7 +695,7 @@ async fn multiple_transactions_multiple_senders() {
}
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_tps() {
let min_tps = tps_setting(100);
let mut rng = test_rng(true);
Expand Down Expand Up @@ -902,7 +787,7 @@ async fn mempool_tps() {
}
}

#[test(tokio::test)]
#[test(tokio::test(flavor = "current_thread"))]
async fn multiple_start_stop() {
let mut rng = test_rng(true);
let time = Arc::new(OffsetTime::new());
Expand Down Expand Up @@ -934,7 +819,7 @@ async fn multiple_start_stop() {
};
mempool_transactions.push(mempool_transaction);
}
let (txns, _) = generate_transactions(mempool_transactions, true);
let (transactions, _) = generate_transactions(mempool_transactions, true);
log::debug!("Done generating transactions and accounts");

// Add validator to genesis
Expand Down Expand Up @@ -973,11 +858,120 @@ async fn multiple_start_stop() {
.unwrap(),
));

let min_tps = num_txns as usize;

// Create a MPSC channel to directly send transactions to the mempool
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());

// Subscribe mempool with the mpsc stream created
mempool
.start_executor_with_txn_stream::<MockNetwork>(
Box::pin(ReceiverStream::new(txn_stream_rx)),
mock_network,
)
.await;

// Send the transactions
let txn_stream_tx1 = txn_stream_tx.clone();
let mock_id1 = mock_id.clone();
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx1
.send((txn.clone(), mock_id1.clone()))
.await
.unwrap();
}
})
.await
.expect("Send failed");

// Yield execution back to the runtime so that the txn_stream can get polled
yield_now().await;

mempool.stop_executor_without_unsubscribe().await;

// Get the transactions from the mempool
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain the same amount of transactions
assert_eq!(obtained_txns.len(), min_tps);

// Now send more transactions via the transaction stream.
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.await
.expect_err("Send should fail, executor is stopped");
}
})
.await
.expect("Send failed");

// Yield execution back to the runtime so that the txn_stream can get polled
yield_now().await;

// Call stop again, nothing should happen.
mempool.stop_executor_without_unsubscribe().await;

// We should not obtain any, since the executor should not be running.
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain 0 transactions
assert_eq!(obtained_txns.len(), 0_usize);

// Restart the executor
// Create a MPSC channel to directly send transactions to the mempool
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());

// Subscribe mempool with the mpsc stream created
mempool
.start_executor_with_txn_stream::<MockNetwork>(
Box::pin(ReceiverStream::new(txn_stream_rx)),
mock_network,
)
.await;

// Send the transactions
multiple_start_stop_send(blockchain, txns).await;
let txns = transactions.clone();
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.await
.unwrap();
}
})
.await
.expect("Send failed");

// Yield execution back to the runtime so that the txn_stream can get polled
yield_now().await;

mempool.stop_executor_without_unsubscribe().await;

// Get the transactions from the mempool
let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX);

// We should obtain same number of txns
assert_eq!(obtained_txns.len(), min_tps);
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_update() {
let mut rng = test_rng(true);
let time = Arc::new(OffsetTime::new());
Expand Down Expand Up @@ -1170,7 +1164,7 @@ async fn mempool_update() {
}
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
// The purpose of this test is to verify that aged transactions, that is,
// transactions that are stored in the mempool for which the validity
// window is already expired, are properly pruned from the mempool.
Expand Down Expand Up @@ -1301,7 +1295,7 @@ async fn mempool_update_aged_transaction() {
);
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_update_not_enough_balance() {
let mut rng = test_rng(true);
let time = Arc::new(OffsetTime::new());
Expand Down Expand Up @@ -1462,7 +1456,7 @@ async fn mempool_update_not_enough_balance() {
}
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_update_pruned_account() {
let mut rng = test_rng(true);
let time = Arc::new(OffsetTime::new());
Expand Down Expand Up @@ -1622,7 +1616,7 @@ async fn mempool_update_pruned_account() {
}
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_basic_prioritization_control_tx() {
let time = Arc::new(OffsetTime::new());
let env = MdbxDatabase::new_volatile(Default::default()).unwrap();
Expand Down Expand Up @@ -1706,7 +1700,7 @@ async fn mempool_basic_prioritization_control_tx() {
);
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
#[test(tokio::test(flavor = "current_thread"))]
async fn mempool_regular_and_control_tx() {
let mut rng = test_rng(true);
let balance = 100_000_000;
Expand Down

0 comments on commit 784ae64

Please sign in to comment.