diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 4517a08692..75d54eeb73 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -1,4 +1,4 @@ -use std::{env, str::FromStr, sync::Arc, time::Duration}; +use std::{env, str::FromStr, sync::Arc}; use nimiq_block::{Block, MicroBlock, MicroBody, MicroHeader}; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; @@ -21,13 +21,12 @@ use nimiq_test_utils::{ test_rng::test_rng, test_transaction::{generate_accounts, generate_transactions, TestTransaction}, }; -use nimiq_time::sleep; use nimiq_transaction::{ExecutedTransaction, Transaction}; use nimiq_transaction_builder::TransactionBuilder; use nimiq_utils::time::OffsetTime; use nimiq_vrf::VrfSeed; use parking_lot::RwLock; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task::yield_now}; use tokio_stream::wrappers::ReceiverStream; pub const ACCOUNT_SECRET_KEY: &str = @@ -97,7 +96,6 @@ async fn send_txn_to_mempool( .await .expect("Send failed"); - sleep(Duration::from_secs(2)).await; mempool.stop_executor_without_unsubscribe().await; } @@ -130,122 +128,9 @@ async fn send_control_txn_to_mempool( .await .expect("Send failed"); - sleep(Duration::from_secs(1)).await; mempool.stop_control_executor_without_unsubscribe().await; } -async fn multiple_start_stop_send( - blockchain: Arc>, - transactions: Vec, -) { - let min_tps = tps_setting(100); - - // Create a MPSC channel to directly send transactions to the mempool - let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); - - // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); - - // Subscribe mempool with the mpsc stream created - mempool - .start_executor_with_txn_stream::( - Box::pin(ReceiverStream::new(txn_stream_rx)), - mock_network, - ) - .await; - - // Send the transactions - let txn_stream_tx1 = txn_stream_tx.clone(); - let mock_id1 = mock_id.clone(); - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx1 - .send((txn.clone(), mock_id1.clone())) - .await - .unwrap(); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - mempool.stop_executor_without_unsubscribe().await; - - // Get the transactions from the mempool - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain the same amount of transactions - assert_eq!(obtained_txns.len(), min_tps); - - // Now send more transactions via the transaction stream. - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx - .send((txn.clone(), mock_id.clone())) - .await - .expect_err("Send should fail, executor is stopped"); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - - // Call stop again, nothing should happen. - mempool.stop_executor_without_unsubscribe().await; - - // We should not obtain any, since the executor should not be running. - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain 0 transactions - assert_eq!(obtained_txns.len(), 0_usize); - - // Restart the executor - // Create a MPSC channel to directly send transactions to the mempool - let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); - - // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); - - // Subscribe mempool with the mpsc stream created - mempool - .start_executor_with_txn_stream::( - Box::pin(ReceiverStream::new(txn_stream_rx)), - mock_network, - ) - .await; - - // Send the transactions - let txns = transactions.clone(); - tokio::task::spawn(async move { - for txn in txns { - txn_stream_tx - .send((txn.clone(), mock_id.clone())) - .await - .unwrap(); - } - }) - .await - .expect("Send failed"); - - sleep(Duration::from_secs(2)).await; - mempool.stop_executor_without_unsubscribe().await; - - // Get the transactions from the mempool - let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); - - // We should obtain same number of txns - assert_eq!(obtained_txns.len(), min_tps); -} - fn create_dummy_micro_block(transactions: Option>) -> Block { // Build a dummy MicroHeader let micro_header = MicroHeader { @@ -810,7 +695,7 @@ async fn multiple_transactions_multiple_senders() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_tps() { let min_tps = tps_setting(100); let mut rng = test_rng(true); @@ -902,7 +787,7 @@ async fn mempool_tps() { } } -#[test(tokio::test)] +#[test(tokio::test(flavor = "current_thread"))] async fn multiple_start_stop() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -934,7 +819,7 @@ async fn multiple_start_stop() { }; mempool_transactions.push(mempool_transaction); } - let (txns, _) = generate_transactions(mempool_transactions, true); + let (transactions, _) = generate_transactions(mempool_transactions, true); log::debug!("Done generating transactions and accounts"); // Add validator to genesis @@ -973,11 +858,120 @@ async fn multiple_start_stop() { .unwrap(), )); + let min_tps = num_txns as usize; + + // Create a MPSC channel to directly send transactions to the mempool + let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // Subscribe mempool with the mpsc stream created + mempool + .start_executor_with_txn_stream::( + Box::pin(ReceiverStream::new(txn_stream_rx)), + mock_network, + ) + .await; + + // Send the transactions + let txn_stream_tx1 = txn_stream_tx.clone(); + let mock_id1 = mock_id.clone(); + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx1 + .send((txn.clone(), mock_id1.clone())) + .await + .unwrap(); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + mempool.stop_executor_without_unsubscribe().await; + + // Get the transactions from the mempool + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain the same amount of transactions + assert_eq!(obtained_txns.len(), min_tps); + + // Now send more transactions via the transaction stream. + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx + .send((txn.clone(), mock_id.clone())) + .await + .expect_err("Send should fail, executor is stopped"); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + // Call stop again, nothing should happen. + mempool.stop_executor_without_unsubscribe().await; + + // We should not obtain any, since the executor should not be running. + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain 0 transactions + assert_eq!(obtained_txns.len(), 0_usize); + + // Restart the executor + // Create a MPSC channel to directly send transactions to the mempool + let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // Subscribe mempool with the mpsc stream created + mempool + .start_executor_with_txn_stream::( + Box::pin(ReceiverStream::new(txn_stream_rx)), + mock_network, + ) + .await; + // Send the transactions - multiple_start_stop_send(blockchain, txns).await; + let txns = transactions.clone(); + tokio::task::spawn(async move { + for txn in txns { + txn_stream_tx + .send((txn.clone(), mock_id.clone())) + .await + .unwrap(); + } + }) + .await + .expect("Send failed"); + + // Yield execution back to the runtime so that the txn_stream can get polled + yield_now().await; + + mempool.stop_executor_without_unsubscribe().await; + + // Get the transactions from the mempool + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); + + // We should obtain same number of txns + assert_eq!(obtained_txns.len(), min_tps); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1170,7 +1164,7 @@ async fn mempool_update() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] // The purpose of this test is to verify that aged transactions, that is, // transactions that are stored in the mempool for which the validity // window is already expired, are properly pruned from the mempool. @@ -1301,7 +1295,7 @@ async fn mempool_update_aged_transaction() { ); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update_not_enough_balance() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1462,7 +1456,7 @@ async fn mempool_update_not_enough_balance() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_update_pruned_account() { let mut rng = test_rng(true); let time = Arc::new(OffsetTime::new()); @@ -1622,7 +1616,7 @@ async fn mempool_update_pruned_account() { } } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_basic_prioritization_control_tx() { let time = Arc::new(OffsetTime::new()); let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); @@ -1706,7 +1700,7 @@ async fn mempool_basic_prioritization_control_tx() { ); } -#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +#[test(tokio::test(flavor = "current_thread"))] async fn mempool_regular_and_control_tx() { let mut rng = test_rng(true); let balance = 100_000_000;