Skip to content

Commit

Permalink
remove EntryCache suspend status
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 15, 2024
1 parent a9335e1 commit 64c6fe7
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 95 deletions.
41 changes: 15 additions & 26 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,24 +651,17 @@ impl TxPoolService {

let data_loader = snapshot.as_data_loader();

let completed = if let Some(ref entry) = cached {
match entry {
CacheEntry::Completed(completed) => {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&self.consensus),
data_loader,
tx_env,
)
.verify()
.map_err(Reject::Verification);
try_or_return_with_snapshot!(ret, snapshot);
*completed
}
CacheEntry::Suspended(_) => {
panic!("Unexpected suspended entry in cache");
}
}
let completed = if let Some(ref completed) = cached {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&self.consensus),
data_loader,
tx_env,
)
.verify()
.map_err(Reject::Verification);
try_or_return_with_snapshot!(ret, snapshot);
*completed
} else if remote.is_some() {
// for remote transaction with large decleard cycles, we enqueue it to verify queue
let ret = self
Expand Down Expand Up @@ -706,19 +699,15 @@ impl TxPoolService {
{
return Err(Reject::Verification(e));
}
Ok(CacheEntry::completed(cycles, fee))
Ok(Completed { cycles, fee })
}
ScriptVerifyResult::Suspended(_state) => {
panic!("unexpect suspend");
}
}
});

let entry = try_or_return_with_snapshot!(ret, snapshot);
match entry {
CacheEntry::Completed(completed) => completed,
_ => panic!("unexpect suspend"),
}
try_or_return_with_snapshot!(ret, snapshot)
};

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
Expand All @@ -731,7 +720,7 @@ impl TxPoolService {
let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
tokio::spawn(async move {
let mut guard = txs_verify_cache.write().await;
guard.put(tx_hash, CacheEntry::Completed(completed));
guard.put(tx_hash, completed);
});
}

Expand Down Expand Up @@ -813,7 +802,7 @@ impl TxPoolService {
let txs_verify_cache = Arc::clone(&self.txs_verify_cache);
tokio::spawn(async move {
let mut guard = txs_verify_cache.write().await;
guard.put(tx_hash, CacheEntry::Completed(verified));
guard.put(tx_hash, verified);
});
}

Expand Down
17 changes: 5 additions & 12 deletions tx-pool/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,11 @@ pub(crate) fn verify_rtx(
let consensus = snapshot.cloned_consensus();
let data_loader = snapshot.as_data_loader();

if let Some(ref cached) = cache_entry {
match cached {
CacheEntry::Completed(completed) => {
TimeRelativeTransactionVerifier::new(rtx, consensus, data_loader, tx_env)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification)
}
CacheEntry::Suspended(suspended) => {
panic!("Unexpected suspended entry in verify_rtx: {:?}", suspended);
}
}
if let Some(ref completed) = cache_entry {
TimeRelativeTransactionVerifier::new(rtx, consensus, data_loader, tx_env)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification)
} else {
block_in_place(|| {
ContextualTransactionVerifier::new(Arc::clone(&rtx), consensus, data_loader, tx_env)
Expand Down
49 changes: 20 additions & 29 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,25 @@ impl Worker {
let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header));

eprintln!("run_verify_tx cached: {:?}", cached);
if let Some(ref cached) = cached {
match cached {
CacheEntry::Completed(completed) => {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
snapshot.as_data_loader(),
Arc::clone(&tx_env),
)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification);
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) =
self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, submit_snapshot);
self.service
.after_process(tx, remote, &submit_snapshot, &Ok(completed))
.await;
return Some((Ok(false), submit_snapshot));
}
CacheEntry::Suspended(_suspended) => {
eprintln!("not expected suspended: {:?}", cached);
//panic!("not expected");
}
}
if let Some(ref completed) = cached {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
snapshot.as_data_loader(),
Arc::clone(&tx_env),
)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification);
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) = self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, submit_snapshot);
self.service
.after_process(tx, remote, &submit_snapshot, &Ok(completed))
.await;
return Some((Ok(false), submit_snapshot));
}

let cloned_snapshot = Arc::clone(&snapshot);
Expand Down Expand Up @@ -257,7 +248,7 @@ impl Worker {
update_cache(
Arc::clone(&self.service.txs_verify_cache),
tx_hash,
CacheEntry::Completed(completed),
completed,
)
.await;

Expand Down
13 changes: 4 additions & 9 deletions verification/contextual/src/contextual_block_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a,
self.handle.spawn(async move {
let mut guard = txs_verify_cache.write().await;
for (k, v) in ret {
guard.put(k, CacheEntry::Completed(v));
guard.put(k, v);
}
});
}
Expand Down Expand Up @@ -412,9 +412,8 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a,
.map(|(index, tx)| {
let tx_hash = tx.transaction.hash();

if let Some(cache_entry) = fetched_cache.get(&tx_hash) {
match cache_entry {
CacheEntry::Completed(completed) => TimeRelativeTransactionVerifier::new(
if let Some(completed) = fetched_cache.get(&tx_hash) {
TimeRelativeTransactionVerifier::new(
Arc::clone(tx),
Arc::clone(&self.context.consensus),
self.context.store.as_data_loader(),
Expand All @@ -428,11 +427,7 @@ impl<'a, 'b, CS: ChainStore + VersionbitsIndexer + 'static> BlockTxsVerifier<'a,
}
.into()
})
.map(|_| (tx_hash, *completed)),
CacheEntry::Suspended(_suspended) => {
panic!("unexpected Suspended in verify");
},
}
.map(|_| (tx_hash, *completed))
} else {
ContextualTransactionVerifier::new(
Arc::clone(tx),
Expand Down
20 changes: 1 addition & 19 deletions verification/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ pub fn init_cache() -> TxVerificationCache {
lru::LruCache::new(CACHE_SIZE)
}

#[derive(Clone, Debug)]
/// TX verification lru entry
pub enum CacheEntry {
/// Completed
Completed(Completed),
/// Suspended
Suspended(Suspended),
}
pub type CacheEntry = Completed;

/// Suspended state
#[derive(Clone, Debug)]
Expand All @@ -43,15 +37,3 @@ pub struct Completed {
/// Cached tx fee
pub fee: Capacity,
}

impl CacheEntry {
/// Constructs a completed CacheEntry
pub fn completed(cycles: Cycle, fee: Capacity) -> Self {
CacheEntry::Completed(Completed { cycles, fee })
}

/// Constructs a Suspended CacheEntry
pub fn suspended(snap: Arc<TransactionSnapshot>, fee: Capacity) -> Self {
CacheEntry::Suspended(Suspended { snap, fee })
}
}

0 comments on commit 64c6fe7

Please sign in to comment.