From 64c6fe7ee3f11e0f100ceb1257277b7f8d6cb808 Mon Sep 17 00:00:00 2001 From: yukang Date: Mon, 15 Jan 2024 23:57:09 +0800 Subject: [PATCH] remove EntryCache suspend status --- tx-pool/src/process.rs | 41 ++++++---------- tx-pool/src/util.rs | 17 ++----- tx-pool/src/verify_mgr.rs | 49 ++++++++----------- .../src/contextual_block_verifier.rs | 13 ++--- verification/src/cache.rs | 20 +------- 5 files changed, 45 insertions(+), 95 deletions(-) diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index a309cfb346..d1150164cc 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -651,24 +651,17 @@ impl TxPoolService { let data_loader = snapshot.as_data_loader(); - let completed = if let Some(ref entry) = cached { - match entry { - CacheEntry::Completed(completed) => { - let ret = TimeRelativeTransactionVerifier::new( - Arc::clone(&rtx), - Arc::clone(&self.consensus), - data_loader, - tx_env, - ) - .verify() - .map_err(Reject::Verification); - try_or_return_with_snapshot!(ret, snapshot); - *completed - } - CacheEntry::Suspended(_) => { - panic!("Unexpected suspended entry in cache"); - } - } + let completed = if let Some(ref completed) = cached { + let ret = TimeRelativeTransactionVerifier::new( + Arc::clone(&rtx), + Arc::clone(&self.consensus), + data_loader, + tx_env, + ) + .verify() + .map_err(Reject::Verification); + try_or_return_with_snapshot!(ret, snapshot); + *completed } else if remote.is_some() { // for remote transaction with large decleard cycles, we enqueue it to verify queue let ret = self @@ -706,7 +699,7 @@ impl TxPoolService { { return Err(Reject::Verification(e)); } - Ok(CacheEntry::completed(cycles, fee)) + Ok(Completed { cycles, fee }) } ScriptVerifyResult::Suspended(_state) => { panic!("unexpect suspend"); @@ -714,11 +707,7 @@ impl TxPoolService { } }); - let entry = try_or_return_with_snapshot!(ret, snapshot); - match entry { - CacheEntry::Completed(completed) => completed, - _ => panic!("unexpect suspend"), - } + try_or_return_with_snapshot!(ret, snapshot) }; let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); @@ -731,7 +720,7 @@ impl TxPoolService { let txs_verify_cache = Arc::clone(&self.txs_verify_cache); tokio::spawn(async move { let mut guard = txs_verify_cache.write().await; - guard.put(tx_hash, CacheEntry::Completed(completed)); + guard.put(tx_hash, completed); }); } @@ -813,7 +802,7 @@ impl TxPoolService { let txs_verify_cache = Arc::clone(&self.txs_verify_cache); tokio::spawn(async move { let mut guard = txs_verify_cache.write().await; - guard.put(tx_hash, CacheEntry::Completed(verified)); + guard.put(tx_hash, verified); }); } diff --git a/tx-pool/src/util.rs b/tx-pool/src/util.rs index 1c02df3d67..937af7d841 100644 --- a/tx-pool/src/util.rs +++ b/tx-pool/src/util.rs @@ -92,18 +92,11 @@ pub(crate) fn verify_rtx( let consensus = snapshot.cloned_consensus(); let data_loader = snapshot.as_data_loader(); - if let Some(ref cached) = cache_entry { - match cached { - CacheEntry::Completed(completed) => { - TimeRelativeTransactionVerifier::new(rtx, consensus, data_loader, tx_env) - .verify() - .map(|_| *completed) - .map_err(Reject::Verification) - } - CacheEntry::Suspended(suspended) => { - panic!("Unexpected suspended entry in verify_rtx: {:?}", suspended); - } - } + if let Some(ref completed) = cache_entry { + TimeRelativeTransactionVerifier::new(rtx, consensus, data_loader, tx_env) + .verify() + .map(|_| *completed) + .map_err(Reject::Verification) } else { block_in_place(|| { ContextualTransactionVerifier::new(Arc::clone(&rtx), consensus, data_loader, tx_env) diff --git a/tx-pool/src/verify_mgr.rs b/tx-pool/src/verify_mgr.rs index 4a081a9166..80ea5387d7 100644 --- a/tx-pool/src/verify_mgr.rs +++ b/tx-pool/src/verify_mgr.rs @@ -154,34 +154,25 @@ impl Worker { let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header)); eprintln!("run_verify_tx cached: {:?}", cached); - if let Some(ref cached) = cached { - match cached { - CacheEntry::Completed(completed) => { - let ret = TimeRelativeTransactionVerifier::new( - Arc::clone(&rtx), - Arc::clone(&consensus), - snapshot.as_data_loader(), - Arc::clone(&tx_env), - ) - .verify() - .map(|_| *completed) - .map_err(Reject::Verification); - let completed = try_or_return_with_snapshot!(ret, snapshot); - - let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, submit_snapshot) = - self.service.submit_entry(tip_hash, entry, status).await; - try_or_return_with_snapshot!(ret, submit_snapshot); - self.service - .after_process(tx, remote, &submit_snapshot, &Ok(completed)) - .await; - return Some((Ok(false), submit_snapshot)); - } - CacheEntry::Suspended(_suspended) => { - eprintln!("not expected suspended: {:?}", cached); - //panic!("not expected"); - } - } + if let Some(ref completed) = cached { + let ret = TimeRelativeTransactionVerifier::new( + Arc::clone(&rtx), + Arc::clone(&consensus), + snapshot.as_data_loader(), + Arc::clone(&tx_env), + ) + .verify() + .map(|_| *completed) + .map_err(Reject::Verification); + let completed = try_or_return_with_snapshot!(ret, snapshot); + + let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); + let (ret, submit_snapshot) = self.service.submit_entry(tip_hash, entry, status).await; + try_or_return_with_snapshot!(ret, submit_snapshot); + self.service + .after_process(tx, remote, &submit_snapshot, &Ok(completed)) + .await; + return Some((Ok(false), submit_snapshot)); } let cloned_snapshot = Arc::clone(&snapshot); @@ -257,7 +248,7 @@ impl Worker { update_cache( Arc::clone(&self.service.txs_verify_cache), tx_hash, - CacheEntry::Completed(completed), + completed, ) .await; diff --git a/verification/contextual/src/contextual_block_verifier.rs b/verification/contextual/src/contextual_block_verifier.rs index ae040e2e59..132b4e3ca0 100644 --- a/verification/contextual/src/contextual_block_verifier.rs +++ b/verification/contextual/src/contextual_block_verifier.rs @@ -379,7 +379,7 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a, self.handle.spawn(async move { let mut guard = txs_verify_cache.write().await; for (k, v) in ret { - guard.put(k, CacheEntry::Completed(v)); + guard.put(k, v); } }); } @@ -412,9 +412,8 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a, .map(|(index, tx)| { let tx_hash = tx.transaction.hash(); - if let Some(cache_entry) = fetched_cache.get(&tx_hash) { - match cache_entry { - CacheEntry::Completed(completed) => TimeRelativeTransactionVerifier::new( + if let Some(completed) = fetched_cache.get(&tx_hash) { + TimeRelativeTransactionVerifier::new( Arc::clone(tx), Arc::clone(&self.context.consensus), self.context.store.as_data_loader(), @@ -428,11 +427,7 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a, } .into() }) - .map(|_| (tx_hash, *completed)), - CacheEntry::Suspended(_suspended) => { - panic!("unexpected Suspended in verify"); - }, - } + .map(|_| (tx_hash, *completed)) } else { ContextualTransactionVerifier::new( Arc::clone(tx), diff --git a/verification/src/cache.rs b/verification/src/cache.rs index b7ceab66ab..1b01d2d1d4 100644 --- a/verification/src/cache.rs +++ b/verification/src/cache.rs @@ -17,14 +17,8 @@ pub fn init_cache() -> TxVerificationCache { lru::LruCache::new(CACHE_SIZE) } -#[derive(Clone, Debug)] /// TX verification lru entry -pub enum CacheEntry { - /// Completed - Completed(Completed), - /// Suspended - Suspended(Suspended), -} +pub type CacheEntry = Completed; /// Suspended state #[derive(Clone, Debug)] @@ -43,15 +37,3 @@ pub struct Completed { /// Cached tx fee pub fee: Capacity, } - -impl CacheEntry { - /// Constructs a completed CacheEntry - pub fn completed(cycles: Cycle, fee: Capacity) -> Self { - CacheEntry::Completed(Completed { cycles, fee }) - } - - /// Constructs a Suspended CacheEntry - pub fn suspended(snap: Arc, fee: Capacity) -> Self { - CacheEntry::Suspended(Suspended { snap, fee }) - } -}