Skip to content

Commit

Permalink
enqueue Notify transactions directly
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 18, 2024
1 parent 246363a commit 3b30874
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
14 changes: 11 additions & 3 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl TxPoolService {
&self,
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
add_verify_queue: bool,
) -> Result<(), Reject> {
// non contextual verify first
self.non_contextual_verify(&tx, None)?;
Expand All @@ -306,7 +307,10 @@ impl TxPoolService {
return Err(Reject::Duplicated(tx.hash()));
}

if let Some((ret, snapshot)) = self._resumeble_process_tx(tx.clone(), remote).await {
if let Some((ret, snapshot)) = self
._resumeble_process_tx(tx.clone(), remote, add_verify_queue)
.await
{
match ret {
Ok(processed) => {
if let ProcessResult::Completed(completed) = processed {
Expand Down Expand Up @@ -634,6 +638,7 @@ impl TxPoolService {
&self,
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
add_verify_queue: bool,
) -> Option<(Result<ProcessResult, Reject>, Arc<Snapshot>)> {
let tx_hash = tx.hash();

Expand Down Expand Up @@ -667,8 +672,10 @@ impl TxPoolService {
try_or_return_with_snapshot!(ret, snapshot);
completed
}
None if remote.is_some() => {
// for remote transaction with large decleard cycles, we enqueue it to verify queue
None if add_verify_queue => {
// for remote transaction with decleard cycles, we enqueue it to verify queue directly
// notified transaction now don't have decleard cycles, we may need to fix it in future,
// now we also enqueue it to verify queue directly
let ret = self
.enqueue_suspended_tx(rtx.transaction.clone(), remote)
.await;
Expand All @@ -678,6 +685,7 @@ impl TxPoolService {
}
None => {
// for local transaction, we verify it directly with a max cycles limit
assert!(remote.is_none());
let ret = {
block_in_place(|| {
let cycle_limit = snapshot.cloned_consensus().max_block_cycles();
Expand Down
6 changes: 3 additions & 3 deletions tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ async fn process(mut service: TxPoolService, message: Message) {
responder,
arguments: tx,
}) => {
let result = service.resumeble_process_tx(tx, None).await;
let result = service.resumeble_process_tx(tx, None, false).await;
if let Err(e) = responder.send(result) {
error!("Responder sending submit_tx result failed {:?}", e);
};
Expand All @@ -705,7 +705,7 @@ async fn process(mut service: TxPoolService, message: Message) {
}) => {
if declared_cycles > service.tx_pool_config.max_tx_verify_cycles {
let _result = service
.resumeble_process_tx(tx, Some((declared_cycles, peer)))
.resumeble_process_tx(tx, Some((declared_cycles, peer)), true)
.await;
if let Err(e) = responder.send(()) {
error!("Responder sending submit_tx result failed {:?}", e);
Expand All @@ -719,7 +719,7 @@ async fn process(mut service: TxPoolService, message: Message) {
}
Message::NotifyTxs(Notify { arguments: txs }) => {
for tx in txs {
let _ret = service.resumeble_process_tx(tx, None).await;
let _ret = service.resumeble_process_tx(tx, None, true).await;
}
}
Message::FreshProposalsFilter(Request {
Expand Down

0 comments on commit 3b30874

Please sign in to comment.