From 784ae648663c1c6e76eadfadbf918bc6a4e46ca4 Mon Sep 17 00:00:00 2001 From: Stefan Date: Wed, 4 Sep 2024 15:46:01 +0200 Subject: [PATCH] Mempool: fix mempool tests - set Tokio runtime flavor to `current_thread` for tests - yield back execution when necessary - get rid of sleeps Low end machines sometimes struggle to complete all the verification tasks within the fixed duration thus creating false positives. Now it waits until all tasks are finished. Additionally these tests should be able to finish a bit faster. --- mempool/tests/mod.rs | 248 +++++++++++++++++++++---------------------- 1 file changed, 121 insertions(+), 127 deletions(-) diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 4517a08692..75d54eeb73 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -1,4 +1,4 @@ -use std::{env, str::FromStr, sync::Arc, time::Duration}; +use std::{env, str::FromStr, sync::Arc}; use nimiq_block::{Block, MicroBlock, MicroBody, MicroHeader}; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; @@ -21,13 +21,12 @@ use nimiq_test_utils::{ test_rng::test_rng, test_transaction::{generate_accounts, generate_transactions, TestTransaction}, }; -use nimiq_time::sleep; use nimiq_transaction::{ExecutedTransaction, Transaction}; use nimiq_transaction_builder::TransactionBuilder; use nimiq_utils::time::OffsetTime; use nimiq_vrf::VrfSeed; use parking_lot::RwLock; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task::yield_now}; use tokio_stream::wrappers::ReceiverStream; pub const ACCOUNT_SECRET_KEY: &str = @@ -97,7 +96,6 @@ async fn send_txn_to_mempool( .await .expect("Send failed"); - sleep(Duration::from_secs(2)).await; mempool.stop_executor_without_unsubscribe().await; } @@ -130,122 +128,9 @@ async fn send_control_txn_to_mempool( .await .expect("Send failed"); - sleep(Duration::from_secs(1)).await; mempool.stop_control_executor_without_unsubscribe().await; } -async fn multiple_start_stop_send( - blockchain: Arc>, - transactions: Vec, -) { - let min_tps = tps_setting(100); - - // Create a MPSC channel to directly send transactions to the mempool - let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); - - // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); - - // Subscribe mempool with the mpsc stream created - mempool - .start_executor_with_txn_stream::( - Box::pin(ReceiverStream::new(txn_stream_rx)), - mock_network, - ) - .await; - - // Send the transactions - let txn_stream_tx1 = txn_stream_tx.clone(); - let mock_id1 = mock_id.clone(); - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx1 - .send((txn.clone(), mock_id1.clone())) - .await - .unwrap(); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - mempool.stop_executor_without_unsubscribe().await; - - // Get the transactions from the mempool - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain the same amount of transactions - assert_eq!(obtained_txns.len(), min_tps); - - // Now send more transactions via the transaction stream. - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx - .send((txn.clone(), mock_id.clone())) - .await - .expect_err("Send should fail, executor is stopped"); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - - // Call stop again, nothing should happen. - mempool.stop_executor_without_unsubscribe().await; - - // We should not obtain any, since the executor should not be running. - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain 0 transactions - assert_eq!(obtained_txns.len(), 0_usize); - - // Restart the executor - // Create a MPSC channel to directly send transactions to the mempool - let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); - - // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); - - // Subscribe mempool with the mpsc stream created - mempool - .start_executor_with_txn_stream::( - Box::pin(ReceiverStream::new(txn_stream_rx)), - mock_network, - ) - .await; - - // Send the transactions - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx - .send((txn.clone(), mock_id.clone())) - .await - .unwrap(); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - mempool.stop_executor_without_unsubscribe().await; - - // Get the transactions from the mempool - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain same number of txns - assert_eq!(obtained_txns.len(), min_tps); -} - fn create_dummy_micro_block(transactions: Option>) -> Block { // Build a dummy MicroHeader let micro_header = MicroHeader { @@ -810,7 +695,7 @@ async fn multiple_transactions_multiple_senders() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_tps() { let min_tps = tps_setting(100); let mut rng = test_rng(true); @@ -902,7 +787,7 @@ async fn mempool_tps() { } } -#[test(tokio::test)] +#[test(tokio::test(flavor = "current_thread"))] async fn multiple_start_stop() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -934,7 +819,7 @@ async fn multiple_start_stop() { }; mempool_transactions.push(mempool_transaction); } - let (txns, _) = generate_transactions(mempool_transactions, true); + let (transactions, _) = generate_transactions(mempool_transactions, true); log::debug!("Done generating transactions and accounts"); // Add validator to genesis @@ -973,11 +858,120 @@ async fn multiple_start_stop() { .unwrap(), )); + let min_tps = num_txns as usize; + + // Create a MPSC channel to directly send transactions to the mempool + let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // Subscribe mempool with the mpsc stream created + mempool + .start_executor_with_txn_stream::( + Box::pin(ReceiverStream::new(txn_stream_rx)), + mock_network, + ) + .await; + + // Send the transactions + let txn_stream_tx1 = txn_stream_tx.clone(); + let mock_id1 = mock_id.clone(); + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx1 + .send((txn.clone(), mock_id1.clone())) + .await + .unwrap(); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + mempool.stop_executor_without_unsubscribe().await; + + // Get the transactions from the mempool + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain the same amount of transactions + assert_eq!(obtained_txns.len(), min_tps); + + // Now send more transactions via the transaction stream. + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx + .send((txn.clone(), mock_id.clone())) + .await + .expect_err("Send should fail, executor is stopped"); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + // Call stop again, nothing should happen. + mempool.stop_executor_without_unsubscribe().await; + + // We should not obtain any, since the executor should not be running. + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain 0 transactions + assert_eq!(obtained_txns.len(), 0_usize); + + // Restart the executor + // Create a MPSC channel to directly send transactions to the mempool + let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // Subscribe mempool with the mpsc stream created + mempool + .start_executor_with_txn_stream::( + Box::pin(ReceiverStream::new(txn_stream_rx)), + mock_network, + ) + .await; + // Send the transactions - multiple_start_stop_send(blockchain, txns).await; + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx + .send((txn.clone(), mock_id.clone())) + .await + .unwrap(); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + mempool.stop_executor_without_unsubscribe().await; + + // Get the transactions from the mempool + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain same number of txns + assert_eq!(obtained_txns.len(), min_tps); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1170,7 +1164,7 @@ async fn mempool_update() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] // The purpose of this test is to verify that aged transactions, that is, // transactions that are stored in the mempool for which the validity // window is already expired, are properly pruned from the mempool. @@ -1301,7 +1295,7 @@ async fn mempool_update_aged_transaction() { ); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update_not_enough_balance() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1462,7 +1456,7 @@ async fn mempool_update_not_enough_balance() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update_pruned_account() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1622,7 +1616,7 @@ async fn mempool_update_pruned_account() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_basic_prioritization_control_tx() { let time = Arc::new(OffsetTime::new()); let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); @@ -1706,7 +1700,7 @@ async fn mempool_basic_prioritization_control_tx() { ); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_regular_and_control_tx() { let mut rng = test_rng(true); let balance = 100_000_000;