diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index b13d02c1f9c..5334979e428 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -293,6 +293,7 @@ impl TxPoolService { &self, tx: TransactionView, remote: Option<(Cycle, PeerIndex)>, + add_verify_queue: bool, ) -> Result<(), Reject> { // non contextual verify first self.non_contextual_verify(&tx, None)?; @@ -306,7 +307,10 @@ impl TxPoolService { return Err(Reject::Duplicated(tx.hash())); } - if let Some((ret, snapshot)) = self._resumeble_process_tx(tx.clone(), remote).await { + if let Some((ret, snapshot)) = self + ._resumeble_process_tx(tx.clone(), remote, add_verify_queue) + .await + { match ret { Ok(processed) => { if let ProcessResult::Completed(completed) = processed { @@ -634,6 +638,7 @@ impl TxPoolService { &self, tx: TransactionView, remote: Option<(Cycle, PeerIndex)>, + add_verify_queue: bool, ) -> Option<(Result, Arc)> { let tx_hash = tx.hash(); @@ -667,8 +672,10 @@ impl TxPoolService { try_or_return_with_snapshot!(ret, snapshot); completed } - None if remote.is_some() => { - // for remote transaction with large decleard cycles, we enqueue it to verify queue + None if add_verify_queue => { + // for remote transaction with decleard cycles, we enqueue it to verify queue directly + // notified transaction now don't have decleard cycles, we may need to fix it in future, + // now we also enqueue it to verify queue directly let ret = self .enqueue_suspended_tx(rtx.transaction.clone(), remote) .await; @@ -678,6 +685,7 @@ impl TxPoolService { } None => { // for local transaction, we verify it directly with a max cycles limit + assert!(remote.is_none()); let ret = { block_in_place(|| { let cycle_limit = snapshot.cloned_consensus().max_block_cycles(); diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 7b87ff9e7a7..26a9536226f 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -685,7 +685,7 @@ async fn process(mut service: TxPoolService, message: Message) { responder, arguments: tx, }) => { - let result = service.resumeble_process_tx(tx, None).await; + let result = service.resumeble_process_tx(tx, None, false).await; if let Err(e) = responder.send(result) { error!("Responder sending submit_tx result failed {:?}", e); }; @@ -705,7 +705,7 @@ async fn process(mut service: TxPoolService, message: Message) { }) => { if declared_cycles > service.tx_pool_config.max_tx_verify_cycles { let _result = service - .resumeble_process_tx(tx, Some((declared_cycles, peer))) + .resumeble_process_tx(tx, Some((declared_cycles, peer)), true) .await; if let Err(e) = responder.send(()) { error!("Responder sending submit_tx result failed {:?}", e); @@ -719,7 +719,7 @@ async fn process(mut service: TxPoolService, message: Message) { } Message::NotifyTxs(Notify { arguments: txs }) => { for tx in txs { - let _ret = service.resumeble_process_tx(tx, None).await; + let _ret = service.resumeble_process_tx(tx, None, true).await; } } Message::FreshProposalsFilter(Request {