Skip to content

Commit

Permalink
remove interval in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 23, 2024
1 parent 0f87718 commit 37f32ae
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use ckb_logger::info;
use ckb_script::ChunkCommand;
use ckb_stop_handler::CancellationToken;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{watch, RwLock};
use tokio::task::JoinHandle;

Expand All @@ -14,6 +13,7 @@ struct Worker {
command_rx: watch::Receiver<ChunkCommand>,
service: TxPoolService,
exit_signal: CancellationToken,
status: ChunkCommand,
}

impl Clone for Worker {
Expand All @@ -23,6 +23,7 @@ impl Clone for Worker {
command_rx: self.command_rx.clone(),
exit_signal: self.exit_signal.clone(),
service: self.service.clone(),
status: self.status.clone(),
}
}
}
Expand All @@ -39,57 +40,55 @@ impl Worker {
tasks,
command_rx,
exit_signal,
status: ChunkCommand::Resume,
}
}

pub fn start(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
// use a notify to receive the queue change event makes sure the worker
// know immediately when the queue is changed, otherwise we may have a delay of `interval`
let queue_ready = self.tasks.read().await.subscribe();
let mut interval = tokio::time::interval(Duration::from_millis(500));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
let try_pick = tokio::select! {
tokio::select! {
_ = self.exit_signal.cancelled() => {
break;
}
_ = self.command_rx.changed() => {
*self.command_rx.borrow() == ChunkCommand::Resume
self.status = self.command_rx.borrow().to_owned();
self.process_inner().await;
}
_ = queue_ready.notified() => {
true
}
_ = interval.tick() => {
true
self.process_inner().await;
}
};
if try_pick {
self.process_inner().await;
}
}
})
}

async fn process_inner(&mut self) {
if self.tasks.read().await.peek().is_none() {
return;
}
// pick a entry to run verify
let entry = match self.tasks.write().await.pop_first() {
Some(entry) => entry,
None => return,
};
loop {
if self.status != ChunkCommand::Resume {
return;
}
// cheap query to check queue is not empty
if self.tasks.read().await.peek().is_none() {
return;
}
// pick a entry to run verify
let entry = match self.tasks.write().await.pop_first() {
Some(entry) => entry,
None => return,
};

let (res, snapshot) = self
.service
.run_verify_tx(entry.clone(), &mut self.command_rx)
.await
.expect("run_verify_tx failed");
let (res, snapshot) = self
.service
.run_verify_tx(entry.clone(), &mut self.command_rx)
.await
.expect("run_verify_tx failed");

self.service
.after_process(entry.tx, entry.remote, &snapshot, &res)
.await;
self.service
.after_process(entry.tx, entry.remote, &snapshot, &res)
.await;
}
}
}

Expand Down

0 comments on commit 37f32ae

Please sign in to comment.