Skip to content

Commit

Permalink
use notify in verify queue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 20, 2024
1 parent 617c3bd commit f7fd467
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
13 changes: 6 additions & 7 deletions tx-pool/src/component/tests/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@ async fn verify_queue_basic() {
let id = tx.proposal_short_id();
let (exit_tx, mut exit_rx) = watch::channel(());
let mut queue = VerifyQueue::new();
let mut queue_rx = queue.subscribe();
let queue_rx = queue.subscribe();
let count = tokio::spawn(async move {
let mut counts = vec![];
let mut count = 0;
loop {
select! {
_ = queue_rx.changed() => {
let value = queue_rx.borrow().to_owned();
counts.push(value);
_ = queue_rx.notified() => {
count += 1;
}
_ = exit_rx.changed() => {
break;
}
}
}
counts
count
});

assert!(queue.add_tx(tx.clone(), None).unwrap());
Expand All @@ -55,7 +54,7 @@ async fn verify_queue_basic() {

exit_tx.send(()).unwrap();
let counts = count.await.unwrap();
assert_eq!(counts, vec![1, 1, 1, 2]);
assert_eq!(counts, 4);

let cur = queue.pop_first();
assert_eq!(cur.unwrap().tx, tx);
Expand Down
19 changes: 8 additions & 11 deletions tx-pool/src/component/verify_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use ckb_types::{
};
use ckb_util::shrink_to_fit;
use multi_index_map::MultiIndexMap;
use tokio::sync::watch;
use std::sync::Arc;
use tokio::sync::Notify;

const DEFAULT_MAX_VERIFY_TRANSACTIONS: usize = 100;
const SHRINK_THRESHOLD: usize = 100;
Expand Down Expand Up @@ -46,20 +47,16 @@ pub struct VerifyEntry {
pub struct VerifyQueue {
/// inner tx entry
inner: MultiIndexVerifyEntryMap,
/// when queue is changed, notify the tx-pool to update the txs count
queue_tx: watch::Sender<usize>,
/// subscribe this channel to get the txs count in the queue
queue_rx: watch::Receiver<usize>,
/// subscribe this notify to get be notified when there is item in the queue
ready_rx: Arc<Notify>,
}

impl VerifyQueue {
/// Create a new VerifyQueue
pub(crate) fn new() -> Self {
let (queue_tx, queue_rx) = watch::channel(0_usize);
VerifyQueue {
inner: MultiIndexVerifyEntryMap::default(),
queue_tx,
queue_rx,
ready_rx: Arc::new(Notify::new()),
}
}

Expand Down Expand Up @@ -90,8 +87,8 @@ impl VerifyQueue {
}

/// get a queue_rx to subscribe the txs count in the queue
pub fn subscribe(&self) -> watch::Receiver<usize> {
self.queue_rx.clone()
pub fn subscribe(&self) -> Arc<Notify> {
Arc::clone(&self.ready_rx)
}

/// Remove a tx from the queue
Expand Down Expand Up @@ -148,7 +145,7 @@ impl VerifyQueue {
added_time: unix_time_as_millis(),
inner: Entry { tx, remote },
});
self.queue_tx.send(self.len()).unwrap();
self.ready_rx.notify_one();
Ok(true)
}

Expand Down
4 changes: 2 additions & 2 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Worker {
pub async fn start(mut self) -> JoinHandle<()> {
// use a channel to receive the queue change event makes sure the worker
// know immediately when the queue is changed, otherwise we may have a delay of `interval`
let mut queue_rx = self.tasks.read().await.subscribe();
let queue_ready = self.tasks.read().await.subscribe();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -57,7 +57,7 @@ impl Worker {
_ = self.command_rx.changed() => {
*self.command_rx.borrow() == ChunkCommand::Resume
}
_ = queue_rx.changed() => {
_ = queue_ready.notified() => {
true
}
_ = interval.tick() => {
Expand Down

0 comments on commit f7fd467

Please sign in to comment.