Skip to content

Commit

Permalink
refactor and add tests for verify_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 16, 2024
1 parent 9fbea1b commit eb3787f
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 26 deletions.
2 changes: 1 addition & 1 deletion tx-pool/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub(crate) mod orphan;
pub(crate) mod pool_map;
pub(crate) mod recent_reject;
pub(crate) mod sort_key;

#[cfg(test)]
mod tests;
pub(crate) mod verify_queue;

pub use self::entry::TxEntry;
52 changes: 45 additions & 7 deletions tx-pool/src/component/tests/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,61 @@
use crate::component::tests::util::build_tx;
use crate::component::verify_queue::{Entry, VerifyQueue};
use ckb_types::core::TransactionBuilder;
use tokio::select;
use tokio::sync::watch;
use tokio::time::sleep;

use crate::verify_queue::{Entry, VerifyQueue};

#[test]
fn basic() {
#[tokio::test]
async fn verify_queue_basic() {
let tx = TransactionBuilder::default().build();
let entry = Entry {
tx: tx.clone(),
remote: None,
};
let tx2 = build_tx(vec![(&tx.hash(), 0)], 1);

let id = tx.proposal_short_id();
let (queue_tx, _queue_rx) = watch::channel(0_usize);
let (queue_tx, mut queue_rx) = watch::channel(0_usize);
let (exit_tx, mut exit_rx) = watch::channel(());
let mut queue = VerifyQueue::new(queue_tx);
let count = tokio::spawn(async move {
let mut counts = vec![];
loop {
select! {
_ = queue_rx.changed() => {
let value = queue_rx.borrow().to_owned();
counts.push(value);
}
_ = exit_rx.changed() => {
break;
}
}
}
counts
});

assert!(queue.add_tx(tx.clone(), None).unwrap());
sleep(std::time::Duration::from_millis(100)).await;

assert!(!queue.add_tx(tx.clone(), None).unwrap());

assert!(queue.add_tx(tx.clone(), None));
assert_eq!(queue.pop_first().as_ref(), Some(&entry));
assert!(!queue.contains_key(&id));
assert!(queue.add_tx(tx, None));

assert!(queue.add_tx(tx.clone(), None).unwrap());
sleep(std::time::Duration::from_millis(100)).await;

assert_eq!(queue.pop_first().as_ref(), Some(&entry));

assert!(queue.add_tx(tx.clone(), None).unwrap());
sleep(std::time::Duration::from_millis(100)).await;

assert!(queue.add_tx(tx2.clone(), None).unwrap());
sleep(std::time::Duration::from_millis(100)).await;

exit_tx.send(()).unwrap();
let counts = count.await.unwrap();
assert_eq!(counts, vec![1, 1, 1, 2]);

queue.clear();
assert!(!queue.contains_key(&id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ extern crate rustc_hash;
extern crate slab;
use ckb_network::PeerIndex;
use ckb_types::{
core::{Cycle, TransactionView},
core::{tx_pool::Reject, Cycle, TransactionView},
packed::ProposalShortId,
};
use ckb_util::shrink_to_fit;
use multi_index_map::MultiIndexMap;
use tokio::sync::watch;

const DEFAULT_MAX_VERIFY_TRANSACTIONS: usize = 100;
const SHRINK_THRESHOLD: usize = 120;
const SHRINK_THRESHOLD: usize = 100;

/// The verify queue is a priority queue of transactions to verify.
#[derive(Debug, Clone, Eq)]
Expand Down Expand Up @@ -120,9 +120,19 @@ impl VerifyQueue {

/// If the queue did not have this tx present, true is returned.
/// If the queue did have this tx present, false is returned.
pub fn add_tx(&mut self, tx: TransactionView, remote: Option<(Cycle, PeerIndex)>) -> bool {
pub fn add_tx(
&mut self,
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
) -> Result<bool, Reject> {
if self.contains_key(&tx.proposal_short_id()) {
return false;
return Ok(false);
}
if self.is_full() {
return Err(Reject::Full(format!(
"chunk is full, tx_hash: {:#x}",
tx.hash()
)));
}
self.inner.insert(VerifyEntry {
id: tx.proposal_short_id(),
Expand All @@ -132,9 +142,8 @@ impl VerifyQueue {
.as_millis() as u64,
inner: Entry { tx, remote },
});
eprintln!("added to queue len: {:?}", self.len());
self.queue_tx.send(self.len()).unwrap();
true
Ok(true)
}

/// Clears the map, removing all elements.
Expand Down
1 change: 0 additions & 1 deletion tx-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod process;
pub mod service;
mod util;
mod verify_mgr;
pub mod verify_queue;

pub use ckb_jsonrpc_types::BlockTemplate;
pub use component::entry::TxEntry;
Expand Down
11 changes: 2 additions & 9 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,16 +731,9 @@ impl TxPoolService {
&self,
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
) -> Result<(), Reject> {
let tx_hash = tx.hash();
) -> Result<bool, Reject> {
let mut chunk = self.verify_queue.write().await;
if !chunk.add_tx(tx, remote) {
return Err(Reject::Full(format!(
"chunk is full, tx_hash: {:#x}",
tx_hash
)));
}
Ok(())
chunk.add_tx(tx, remote)
}

pub(crate) async fn _process_tx(
Expand Down
2 changes: 1 addition & 1 deletion tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use crate::block_assembler::{self, BlockAssembler};
use crate::callback::{Callbacks, PendingCallback, ProposedCallback, RejectCallback};
use crate::component::orphan::OrphanPool;
use crate::component::pool_map::{PoolEntry, Status};
use crate::component::verify_queue::VerifyQueue;
use crate::error::{handle_recv_error, handle_send_cmd_error, handle_try_send_error};
use crate::pool::TxPool;
use crate::util::after_delay_window;
use crate::verify_mgr::VerifyMgr;
use crate::verify_queue::VerifyQueue;
use ckb_app_config::{BlockAssemblerConfig, TxPoolConfig};
use ckb_async_runtime::Handle;
use ckb_chain_spec::consensus::Consensus;
Expand Down
2 changes: 1 addition & 1 deletion tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate num_cpus;
use crate::component::entry::TxEntry;
use crate::component::verify_queue::{Entry, VerifyQueue};
use crate::try_or_return_with_snapshot;
use crate::verify_queue::{Entry, VerifyQueue};
use crate::{error::Reject, service::TxPoolService};
use ckb_chain_spec::consensus::Consensus;
use ckb_logger::info;
Expand Down

0 comments on commit eb3787f

Please sign in to comment.