Skip to content

Commit

Permalink
refactor verify queue channel
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 18, 2024
1 parent 650b17e commit 7a789d6
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 26 deletions.
4 changes: 2 additions & 2 deletions tx-pool/src/component/tests/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ async fn verify_queue_basic() {
let tx2 = build_tx(vec![(&tx.hash(), 0)], 1);

let id = tx.proposal_short_id();
let (queue_tx, mut queue_rx) = watch::channel(0_usize);
let (exit_tx, mut exit_rx) = watch::channel(());
let mut queue = VerifyQueue::new(queue_tx);
let mut queue = VerifyQueue::new();
let mut queue_rx = queue.subscribe();
let count = tokio::spawn(async move {
let mut counts = vec![];
loop {
Expand Down
13 changes: 11 additions & 2 deletions tx-pool/src/component/verify_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,20 @@ pub struct VerifyEntry {
pub struct VerifyQueue {
/// inner tx entry
inner: MultiIndexVerifyEntryMap,
/// used to notify the tx-pool to update the txs count
/// when queue is changed, notify the tx-pool to update the txs count
queue_tx: watch::Sender<usize>,
/// subscribe this channel to get the txs count in the queue
queue_rx: watch::Receiver<usize>,
}

impl VerifyQueue {
/// Create a new VerifyQueue
pub(crate) fn new(queue_tx: watch::Sender<usize>) -> Self {
pub(crate) fn new() -> Self {
let (queue_tx, queue_rx) = watch::channel(0_usize);
VerifyQueue {
inner: MultiIndexVerifyEntryMap::default(),
queue_tx,
queue_rx,
}
}

Expand Down Expand Up @@ -85,6 +89,11 @@ impl VerifyQueue {
shrink_to_fit!(self.inner, SHRINK_THRESHOLD);
}

/// get a queue_rx to subscribe the txs count in the queue
pub fn subscribe(&self) -> watch::Receiver<usize> {
self.queue_rx.clone()
}

/// Remove a tx from the queue
pub fn remove_tx(&mut self, id: &ProposalShortId) -> Option<Entry> {
self.inner.remove_by_id(id).map(|e| {
Expand Down
12 changes: 3 additions & 9 deletions tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,7 @@ impl TxPoolServiceBuilder {
}
};

let (queue_tx, queue_rx) = watch::channel(0_usize);
let verify_queue = Arc::new(RwLock::new(VerifyQueue::new(queue_tx)));
let verify_queue = Arc::new(RwLock::new(VerifyQueue::new()));
let (block_assembler_sender, mut block_assembler_receiver) = self.block_assembler_channel;
let service = TxPoolService {
tx_pool_config: Arc::new(tx_pool.config.clone()),
Expand All @@ -489,13 +488,8 @@ impl TxPoolServiceBuilder {
after_delay: Arc::new(AtomicBool::new(after_delay_window)),
};

let mut verify_mgr = VerifyMgr::new(
service.clone(),
self.chunk_rx,
self.signal_receiver.clone(),
verify_queue,
queue_rx,
);
let mut verify_mgr =
VerifyMgr::new(service.clone(), self.chunk_rx, self.signal_receiver.clone());
self.handle.spawn(async move { verify_mgr.run().await });

let mut receiver = self.receiver;
Expand Down
20 changes: 7 additions & 13 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tokio::task::JoinHandle;
struct Worker {
tasks: Arc<RwLock<VerifyQueue>>,
command_rx: watch::Receiver<ChunkCommand>,
queue_rx: watch::Receiver<usize>,
service: TxPoolService,
exit_signal: CancellationToken,
}
Expand All @@ -22,7 +21,6 @@ impl Clone for Worker {
Self {
tasks: Arc::clone(&self.tasks),
command_rx: self.command_rx.clone(),
queue_rx: self.queue_rx.clone(),
exit_signal: self.exit_signal.clone(),
service: self.service.clone(),
}
Expand All @@ -34,19 +32,18 @@ impl Worker {
service: TxPoolService,
tasks: Arc<RwLock<VerifyQueue>>,
command_rx: watch::Receiver<ChunkCommand>,
queue_rx: watch::Receiver<usize>,
exit_signal: CancellationToken,
) -> Self {
Worker {
service,
tasks,
command_rx,
queue_rx,
exit_signal,
}
}

pub fn start(mut self) -> JoinHandle<()> {
pub async fn start(mut self) -> JoinHandle<()> {
let mut queue_rx = self.tasks.read().await.subscribe();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -55,10 +52,10 @@ impl Worker {
_ = self.exit_signal.cancelled() => {
break;
}
_ = self.queue_rx.changed() => {
_ = self.command_rx.changed() => {
true
}
_ = self.command_rx.changed() => {
_ = queue_rx.changed() => {
true
}
_ = interval.tick() => {
Expand All @@ -73,7 +70,7 @@ impl Worker {
}

async fn process_inner(&mut self) {
if self.command_rx.borrow().to_owned() == ChunkCommand::Suspend {
if self.command_rx.borrow().to_owned() != ChunkCommand::Resume {
return;
}

Expand Down Expand Up @@ -110,12 +107,10 @@ impl VerifyMgr {
service: TxPoolService,
chunk_rx: watch::Receiver<ChunkCommand>,
signal_exit: CancellationToken,
verify_queue: Arc<RwLock<VerifyQueue>>,
queue_rx: watch::Receiver<usize>,
) -> Self {
let workers: Vec<_> = (0..num_cpus::get())
.map({
let tasks = Arc::clone(&verify_queue);
let tasks = Arc::clone(&service.verify_queue);
let signal_exit = signal_exit.clone();
move |_| {
let (child_tx, child_rx) = watch::channel(ChunkCommand::Resume);
Expand All @@ -125,7 +120,6 @@ impl VerifyMgr {
service.clone(),
Arc::clone(&tasks),
child_rx,
queue_rx.clone(),
signal_exit.clone(),
),
)
Expand All @@ -151,7 +145,7 @@ impl VerifyMgr {
async fn start_loop(&mut self) {
let mut join_handles = Vec::new();
for w in self.workers.iter_mut() {
let h = w.1.clone().start();
let h = w.1.clone().start().await;
join_handles.push(h);
}
self.join_handles.replace(join_handles);
Expand Down

0 comments on commit 7a789d6

Please sign in to comment.