From b7f7ca4b6769c2f4a70f9b3303aa42a2ce27b46d Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 18 Jan 2024 11:01:04 +0800 Subject: [PATCH] add comments and more cleanup --- script/Cargo.toml | 2 +- script/src/syscalls/spawn.rs | 3 +++ script/src/verify.rs | 28 ++++++++------------- tx-pool/src/component/verify_queue.rs | 12 ++++----- tx-pool/src/process.rs | 32 ++++++++++++------------ tx-pool/src/verify_mgr.rs | 6 +++++ verification/src/transaction_verifier.rs | 3 +-- 7 files changed, 43 insertions(+), 43 deletions(-) diff --git a/script/Cargo.toml b/script/Cargo.toml index db7f913e02..3c492a727f 100644 --- a/script/Cargo.toml +++ b/script/Cargo.toml @@ -28,7 +28,7 @@ ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre", optional = tr serde = { version = "1.0", features = ["derive"] } ckb-error = { path = "../error", version = "= 0.114.0-pre" } ckb-chain-spec = { path = "../spec", version = "= 0.114.0-pre" } -tokio = { version = "1.35.0", features = ["sync", "rt-multi-thread"] } +tokio = { version = "1.35.0", features = ["rt-multi-thread"] } [dev-dependencies] proptest = "1.0" diff --git a/script/src/syscalls/spawn.rs b/script/src/syscalls/spawn.rs index 15301bc199..9b7fee0298 100644 --- a/script/src/syscalls/spawn.rs +++ b/script/src/syscalls/spawn.rs @@ -235,6 +235,9 @@ where Ok(true) } Err(err) => { + // `CyclesExceeded` for old version snapshot + // `Pause` for new version suspend with pause signal + // Maybe we need to cleanup in future if matches!(err, VMError::Pause | VMError::CyclesExceeded) { let mut context = self.context.lock().map_err(|e| { VMError::Unexpected(format!("Failed to acquire lock: {}", e)) diff --git a/script/src/verify.rs b/script/src/verify.rs index b4e3cb558c..cc8dee5bde 100644 --- a/script/src/verify.rs +++ b/script/src/verify.rs @@ -693,8 +693,9 @@ impl { - // FIXME: we need to cleanup this later, state will not contain snapshot - panic!("unexpect suspend in resumable_verify_with_signal"); + // FIXME(yukang): we need to cleanup this later, state will not contain snapshot + unreachable!("unexpect suspend in resumable_verify_with_signal"); } Err(e) => { #[cfg(feature = "logging")] @@ -1051,10 +1052,7 @@ impl Ok(ChunkState::Completed(cycles)), - Err(e) => Err(e), - } + verifier.verify().map(ChunkState::Completed) } else { self.chunk_run_with_signal(group, max_cycles, command_rx) .await @@ -1200,7 +1198,7 @@ impl, context: Arc>, signal: &mut Receiver, @@ -1435,11 +1433,7 @@ async fn run_vms_with_signal( exit_code ))}, (Err(err), _) => { - let map_vm_internal_error = |error: VMInternalError| match error { - VMInternalError::CyclesExceeded => ScriptError::ExceededMaximumCycles(max_cycles), - _ => ScriptError::VMInternalError(error), - }; - return Err(map_vm_internal_error(err)); + return Err(ScriptError::VMInternalError(err)); } } @@ -1456,13 +1450,13 @@ async fn run_vms_child( context: Arc>, ) { let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None); - child_recv.mark_changed(); // mark changed to make sure we can receive initial command // and start to run immediately + child_recv.mark_changed(); loop { select! { _ = child_recv.changed() => { - match child_recv.borrow().to_owned() { + match *child_recv.borrow() { ChunkCommand::Stop => { let exit = (Err(ckb_vm::Error::Unexpected("stopped".to_string())), cycles); let _ = finished_send.send(exit); diff --git a/tx-pool/src/component/verify_queue.rs b/tx-pool/src/component/verify_queue.rs index 04eb0e782b..1da2add87f 100644 --- a/tx-pool/src/component/verify_queue.rs +++ b/tx-pool/src/component/verify_queue.rs @@ -3,6 +3,7 @@ extern crate rustc_hash; extern crate slab; use ckb_network::PeerIndex; +use ckb_systemtime::unix_time_as_millis; use ckb_types::{ core::{tx_pool::Reject, Cycle, TransactionView}, packed::ProposalShortId, @@ -14,7 +15,7 @@ use tokio::sync::watch; const DEFAULT_MAX_VERIFY_TRANSACTIONS: usize = 100; const SHRINK_THRESHOLD: usize = 100; -/// The verify queue is a priority queue of transactions to verify. +/// The verify queue Entry to verify. #[derive(Debug, Clone, Eq)] pub struct Entry { pub(crate) tx: TransactionView, @@ -71,7 +72,7 @@ impl VerifyQueue { /// Returns true if the queue is full. pub fn is_full(&self) -> bool { - self.len() > DEFAULT_MAX_VERIFY_TRANSACTIONS + self.len() >= DEFAULT_MAX_VERIFY_TRANSACTIONS } /// Returns true if the queue contains a tx with the specified id. @@ -130,16 +131,13 @@ impl VerifyQueue { } if self.is_full() { return Err(Reject::Full(format!( - "chunk is full, tx_hash: {:#x}", + "chunk is full, failed to add tx: {:#x}", tx.hash() ))); } self.inner.insert(VerifyEntry { id: tx.proposal_short_id(), - added_time: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("timestamp") - .as_millis() as u64, + added_time: unix_time_as_millis(), inner: Entry { tx, remote }, }); self.queue_tx.send(self.len()).unwrap(); diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 2a4064fb1d..b13d02c1f9 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -189,16 +189,16 @@ impl TxPoolService { } } + pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool { + let queue = self.verify_queue.read().await; + queue.contains_key(&tx.proposal_short_id()) + } + pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool { let orphan = self.orphan.read().await; orphan.contains_key(&tx.proposal_short_id()) } - pub(crate) async fn chunk_contains(&self, tx: &TransactionView) -> bool { - let chunk = self.verify_queue.read().await; - chunk.contains_key(&tx.proposal_short_id()) - } - pub(crate) async fn with_tx_pool_read_lock) -> U>( &self, mut f: F, @@ -302,7 +302,7 @@ impl TxPoolService { return Err(Reject::Duplicated(tx.hash())); } - if self.chunk_contains(&tx).await { + if self.verify_queue_contains(&tx).await { return Err(Reject::Duplicated(tx.hash())); } @@ -334,7 +334,7 @@ impl TxPoolService { // non contextual verify first self.non_contextual_verify(&tx, remote)?; - if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await { + if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await { return Err(Reject::Duplicated(tx.hash())); } @@ -365,8 +365,8 @@ impl TxPoolService { pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool { let id = ProposalShortId::from_tx_hash(&tx_hash); { - let mut chunk = self.verify_queue.write().await; - if chunk.remove_tx(&id).is_some() { + let mut queue = self.verify_queue.write().await; + if queue.remove_tx(&id).is_some() { return true; } } @@ -527,7 +527,7 @@ impl TxPoolService { for orphan in orphans.into_iter() { if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles { debug!( - "process_orphan {} added to chunk; find previous from {}", + "process_orphan {} added to verify queue; find previous from {}", orphan.tx.hash(), tx.hash(), ); @@ -673,7 +673,7 @@ impl TxPoolService { .enqueue_suspended_tx(rtx.transaction.clone(), remote) .await; try_or_return_with_snapshot!(ret, snapshot); - eprintln!("added to queue here: {:?}", tx.proposal_short_id()); + error!("added to verify queue: {:?}", tx.proposal_short_id()); return Some((Ok(ProcessResult::Suspended), snapshot)); } None => { @@ -796,7 +796,7 @@ impl TxPoolService { let completed: Completed = match state { VerifyResult::Completed(cycles) => Completed { cycles, fee }, _ => { - panic!("not expected"); + unreachable!("unexpected Suspend in run_verify_tx"); } }; if let Some((declared_cycle, _peer)) = remote { @@ -828,8 +828,8 @@ impl TxPoolService { tx: TransactionView, remote: Option<(Cycle, PeerIndex)>, ) -> Result { - let mut chunk = self.verify_queue.write().await; - chunk.add_tx(tx, remote) + let mut queue = self.verify_queue.write().await; + queue.add_tx(tx, remote) } pub(crate) async fn _process_tx( @@ -1015,8 +1015,8 @@ impl TxPoolService { self.remove_orphan_txs_by_attach(&attached).await; { - let mut chunk = self.verify_queue.write().await; - chunk.remove_txs(attached.iter().map(|tx| tx.proposal_short_id())); + let mut queue = self.verify_queue.write().await; + queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id())); } } diff --git a/tx-pool/src/verify_mgr.rs b/tx-pool/src/verify_mgr.rs index 5188e8c50a..b5ac11527d 100644 --- a/tx-pool/src/verify_mgr.rs +++ b/tx-pool/src/verify_mgr.rs @@ -168,6 +168,12 @@ impl VerifyMgr { } } } + if let Some(jh) = self.join_handles.take() { + for h in jh { + h.await.expect("Worker thread panic"); + } + } + info!("TxPool verify_mgr service exited"); } pub async fn run(&mut self) { diff --git a/verification/src/transaction_verifier.rs b/verification/src/transaction_verifier.rs index 594601c843..5df15d0705 100644 --- a/verification/src/transaction_verifier.rs +++ b/verification/src/transaction_verifier.rs @@ -173,7 +173,6 @@ where .script .resumable_verify_with_signal(limit_cycles, &mut command_rx) .await?; - eprintln!("resumable_verify_with_signal ret: {:?}", ret); Ok((ret, fee)) } @@ -372,7 +371,7 @@ impl