diff --git a/tx-pool/src/component/mod.rs b/tx-pool/src/component/mod.rs index beac9064d82..99937a484c8 100644 --- a/tx-pool/src/component/mod.rs +++ b/tx-pool/src/component/mod.rs @@ -7,8 +7,8 @@ pub(crate) mod orphan; pub(crate) mod pool_map; pub(crate) mod recent_reject; pub(crate) mod sort_key; - #[cfg(test)] mod tests; +pub(crate) mod verify_queue; pub use self::entry::TxEntry; diff --git a/tx-pool/src/component/tests/chunk.rs b/tx-pool/src/component/tests/chunk.rs index 71d2c70ffe7..c2ea1a0f1c7 100644 --- a/tx-pool/src/component/tests/chunk.rs +++ b/tx-pool/src/component/tests/chunk.rs @@ -1,23 +1,61 @@ +use crate::component::tests::util::build_tx; +use crate::component::verify_queue::{Entry, VerifyQueue}; use ckb_types::core::TransactionBuilder; +use tokio::select; use tokio::sync::watch; +use tokio::time::sleep; -use crate::verify_queue::{Entry, VerifyQueue}; - -#[test] -fn basic() { +#[tokio::test] +async fn verify_queue_basic() { let tx = TransactionBuilder::default().build(); let entry = Entry { tx: tx.clone(), remote: None, }; + let tx2 = build_tx(vec![(&tx.hash(), 0)], 1); + let id = tx.proposal_short_id(); - let (queue_tx, _queue_rx) = watch::channel(0_usize); + let (queue_tx, mut queue_rx) = watch::channel(0_usize); + let (exit_tx, mut exit_rx) = watch::channel(()); let mut queue = VerifyQueue::new(queue_tx); + let count = tokio::spawn(async move { + let mut counts = vec![]; + loop { + select! { + _ = queue_rx.changed() => { + let value = queue_rx.borrow().to_owned(); + counts.push(value); + } + _ = exit_rx.changed() => { + break; + } + } + } + counts + }); + + assert!(queue.add_tx(tx.clone(), None).unwrap()); + sleep(std::time::Duration::from_millis(100)).await; + + assert!(!queue.add_tx(tx.clone(), None).unwrap()); - assert!(queue.add_tx(tx.clone(), None)); assert_eq!(queue.pop_first().as_ref(), Some(&entry)); assert!(!queue.contains_key(&id)); - assert!(queue.add_tx(tx, None)); + + assert!(queue.add_tx(tx.clone(), None).unwrap()); + sleep(std::time::Duration::from_millis(100)).await; + + assert_eq!(queue.pop_first().as_ref(), Some(&entry)); + + assert!(queue.add_tx(tx.clone(), None).unwrap()); + sleep(std::time::Duration::from_millis(100)).await; + + assert!(queue.add_tx(tx2.clone(), None).unwrap()); + sleep(std::time::Duration::from_millis(100)).await; + + exit_tx.send(()).unwrap(); + let counts = count.await.unwrap(); + assert_eq!(counts, vec![1, 1, 1, 2]); queue.clear(); assert!(!queue.contains_key(&id)); diff --git a/tx-pool/src/verify_queue.rs b/tx-pool/src/component/verify_queue.rs similarity index 89% rename from tx-pool/src/verify_queue.rs rename to tx-pool/src/component/verify_queue.rs index 3e3dc051350..04eb0e782b4 100644 --- a/tx-pool/src/verify_queue.rs +++ b/tx-pool/src/component/verify_queue.rs @@ -4,7 +4,7 @@ extern crate rustc_hash; extern crate slab; use ckb_network::PeerIndex; use ckb_types::{ - core::{Cycle, TransactionView}, + core::{tx_pool::Reject, Cycle, TransactionView}, packed::ProposalShortId, }; use ckb_util::shrink_to_fit; @@ -12,7 +12,7 @@ use multi_index_map::MultiIndexMap; use tokio::sync::watch; const DEFAULT_MAX_VERIFY_TRANSACTIONS: usize = 100; -const SHRINK_THRESHOLD: usize = 120; +const SHRINK_THRESHOLD: usize = 100; /// The verify queue is a priority queue of transactions to verify. #[derive(Debug, Clone, Eq)] @@ -120,9 +120,19 @@ impl VerifyQueue { /// If the queue did not have this tx present, true is returned. /// If the queue did have this tx present, false is returned. - pub fn add_tx(&mut self, tx: TransactionView, remote: Option<(Cycle, PeerIndex)>) -> bool { + pub fn add_tx( + &mut self, + tx: TransactionView, + remote: Option<(Cycle, PeerIndex)>, + ) -> Result { if self.contains_key(&tx.proposal_short_id()) { - return false; + return Ok(false); + } + if self.is_full() { + return Err(Reject::Full(format!( + "chunk is full, tx_hash: {:#x}", + tx.hash() + ))); } self.inner.insert(VerifyEntry { id: tx.proposal_short_id(), @@ -132,9 +142,8 @@ impl VerifyQueue { .as_millis() as u64, inner: Entry { tx, remote }, }); - eprintln!("added to queue len: {:?}", self.len()); self.queue_tx.send(self.len()).unwrap(); - true + Ok(true) } /// Clears the map, removing all elements. diff --git a/tx-pool/src/lib.rs b/tx-pool/src/lib.rs index a117325c87e..b22c3248b5c 100644 --- a/tx-pool/src/lib.rs +++ b/tx-pool/src/lib.rs @@ -12,7 +12,6 @@ mod process; pub mod service; mod util; mod verify_mgr; -pub mod verify_queue; pub use ckb_jsonrpc_types::BlockTemplate; pub use component::entry::TxEntry; diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index fb12691e947..0c7b5cea606 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -731,16 +731,9 @@ impl TxPoolService { &self, tx: TransactionView, remote: Option<(Cycle, PeerIndex)>, - ) -> Result<(), Reject> { - let tx_hash = tx.hash(); + ) -> Result { let mut chunk = self.verify_queue.write().await; - if !chunk.add_tx(tx, remote) { - return Err(Reject::Full(format!( - "chunk is full, tx_hash: {:#x}", - tx_hash - ))); - } - Ok(()) + chunk.add_tx(tx, remote) } pub(crate) async fn _process_tx( diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 43bc55d8d2e..7b87ff9e7a7 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -4,11 +4,11 @@ use crate::block_assembler::{self, BlockAssembler}; use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback}; use crate::component::orphan::OrphanPool; use crate::component::pool_map::{PoolEntry, Status}; +use crate::component::verify_queue::VerifyQueue; use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error}; use crate::pool::TxPool; use crate::util::after_delay_window; use crate::verify_mgr::VerifyMgr; -use crate::verify_queue::VerifyQueue; use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig}; use ckb_async_runtime::Handle; use ckb_chain_spec::consensus::Consensus; diff --git a/tx-pool/src/verify_mgr.rs b/tx-pool/src/verify_mgr.rs index 1e91718d0e4..5129af9275d 100644 --- a/tx-pool/src/verify_mgr.rs +++ b/tx-pool/src/verify_mgr.rs @@ -1,7 +1,7 @@ extern crate num_cpus; use crate::component::entry::TxEntry; +use crate::component::verify_queue::{Entry, VerifyQueue}; use crate::try_or_return_with_snapshot; -use crate::verify_queue::{Entry, VerifyQueue}; use crate::{error::Reject, service::TxPoolService}; use ckb_chain_spec::consensus::Consensus; use ckb_logger::info;