Skip to content

Commit

Permalink
feat(starknet_mempool): delay declare txs
Browse files Browse the repository at this point in the history
  • Loading branch information
alonh5 committed Mar 4, 2025
1 parent 9a25a8f commit 483a0b0
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 8 deletions.
61 changes: 56 additions & 5 deletions crates/starknet_mempool/src/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;

use starknet_api::block::NonzeroGasPrice;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::rpc_transaction::InternalRpcTransaction;
use starknet_api::rpc_transaction::{InternalRpcTransaction, InternalRpcTransactionWithoutTxHash};
use starknet_api::transaction::fields::Tip;
use starknet_api::transaction::TransactionHash;
use starknet_mempool_types::errors::MempoolError;
Expand Down Expand Up @@ -136,7 +137,8 @@ impl MempoolState {
pub struct Mempool {
config: MempoolConfig,
// TODO(AlonH): add docstring explaining visibility and coupling of the fields.
// All transactions currently held in the mempool.
delayed_declares: VecDeque<(Instant, AddTransactionArgs)>,
// All transactions currently held in the mempool (excluding the delayed declares).
tx_pool: TransactionPool,
// Transactions eligible for sequencing.
tx_queue: TransactionQueue,
Expand All @@ -148,6 +150,7 @@ impl Mempool {
pub fn new(config: MempoolConfig, clock: Arc<dyn Clock>) -> Self {
Mempool {
config,
delayed_declares: VecDeque::new(),
tx_pool: TransactionPool::new(clock.clone()),
tx_queue: TransactionQueue::default(),
state: MempoolState::default(),
Expand Down Expand Up @@ -233,15 +236,31 @@ impl Mempool {

// First remove old transactions from the pool.
self.remove_expired_txs();
self.add_ready_declares(&mut metric_handle);

let tx_reference = TransactionReference::new(&args.tx);
self.validate_incoming_tx(tx_reference)?;
self.handle_fee_escalation(&args.tx)?;

if let InternalRpcTransactionWithoutTxHash::Declare(_) = &args.tx.tx {
self.delayed_declares.push_back((self.clock.now(), args));
return Ok(());
}

self.add_tx_inner(args, &mut metric_handle)
}

fn add_tx_inner(
&mut self,
args: AddTransactionArgs,
metric_handle: &mut MempoolMetricHandle,
) -> MempoolResult<()> {
let AddTransactionArgs { tx, account_state } = args;
info!("Adding transaction to mempool.");
trace!("{tx:#?}");

let tx_reference = TransactionReference::new(&tx);
self.validate_incoming_tx(tx_reference)?;

self.handle_fee_escalation(&tx)?;
self.tx_pool.insert(tx)?;

metric_handle.transaction_inserted();
Expand All @@ -259,6 +278,20 @@ impl Mempool {
Ok(())
}

fn add_ready_declares(&mut self, metric_handle: &mut MempoolMetricHandle) {
let now = self.clock.now();
while let Some((submission_time, _args)) = self.delayed_declares.front() {
if now - *submission_time < self.config.declare_delay {
break;
}
let (_submission_time, args) =
self.delayed_declares.pop_front().expect("Delay declare should exist.");
let _ = self
.add_tx_inner(args, metric_handle)
.map_err(|err| debug!("Failed to add declare tx after delay: {err}."));
}
}

/// Update the mempool's internal state according to the committed block (resolves nonce gaps,
/// updates account balances).
#[instrument(skip(self, args))]
Expand Down Expand Up @@ -333,9 +366,27 @@ impl Mempool {
}

fn validate_incoming_tx(&self, tx_reference: TransactionReference) -> MempoolResult<()> {
self.validate_no_delayed_declare_front_run(tx_reference)?;
self.state.validate_incoming_tx(tx_reference)
}

/// Validates that the given transaction does not front run a delayed declare.
fn validate_no_delayed_declare_front_run(
&self,
tx_reference: TransactionReference,
) -> MempoolResult<()> {
if self.delayed_declares.iter().any(|(_, tx_args)| {
let tx = &tx_args.tx;
tx.contract_address() == tx_reference.address && tx.nonce() == tx_reference.nonce
}) {
return Err(MempoolError::DuplicateNonce {
address: tx_reference.address,
nonce: tx_reference.nonce,
});
}
Ok(())
}

fn validate_commitment(&self, address: ContractAddress, next_nonce: Nonce) {
self.state.validate_commitment(address, next_nonce);
}
Expand Down
63 changes: 60 additions & 3 deletions crates/starknet_mempool/src/mempool_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;

use mempool_test_utils::starknet_api_test_utils::test_valid_resource_bounds;
use metrics_exporter_prometheus::PrometheusBuilder;
use mockall::predicate;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
Expand All @@ -17,7 +18,7 @@ use starknet_api::rpc_transaction::{
};
use starknet_api::test_utils::declare::{internal_rpc_declare_tx, DeclareTxArgs};
use starknet_api::transaction::TransactionHash;
use starknet_api::{contract_address, nonce};
use starknet_api::{contract_address, declare_tx_args, nonce, tx_hash};
use starknet_mempool_p2p_types::communication::MockMempoolP2pPropagatorClient;
use starknet_mempool_types::communication::AddTransactionArgsWrapper;
use starknet_mempool_types::errors::MempoolError;
Expand Down Expand Up @@ -146,6 +147,7 @@ impl MempoolTestContentBuilder {
fn build_full_mempool(self) -> Mempool {
Mempool {
config: self.config,
delayed_declares: VecDeque::new(),
tx_pool: self.content.tx_pool.unwrap_or_default().into_values().collect(),
tx_queue: TransactionQueue::new(
self.content.priority_txs.unwrap_or_default(),
Expand All @@ -168,7 +170,7 @@ impl FromIterator<InternalRpcTransaction> for TransactionPool {
}
}

fn _declare_add_tx_input(args: DeclareTxArgs) -> AddTransactionArgs {
fn declare_add_tx_input(args: DeclareTxArgs) -> AddTransactionArgs {
let tx = internal_rpc_declare_tx(args);
let account_state = AccountState { address: tx.contract_address(), nonce: tx.nonce() };

Expand Down Expand Up @@ -1167,3 +1169,58 @@ fn expired_staged_txs_are_not_deleted() {
MempoolTestContentBuilder::new().with_pool([staged_tx.tx, another_tx.tx]).build();
expected_mempool_content.assert_eq(&mempool.content());
}

#[rstest]
fn delay_declare_txs() {
// Create a mempool with a fake clock.
let fake_clock = Arc::new(FakeClock::default());
let declare_delay = Duration::from_secs(5);
let mut mempool = Mempool::new(
MempoolConfig {
transaction_ttl: Duration::from_secs(60),
declare_delay,
// Always accept fee escalation to test only the delayed declare duplicate nonce.
enable_fee_escalation: true,
fee_escalation_percentage: 0,
},
fake_clock.clone(),
);
let first_declare = declare_add_tx_input(
declare_tx_args!(resource_bounds: test_valid_resource_bounds(), sender_address: contract_address!("0x0"), tx_hash: tx_hash!(0)),
);
add_tx(&mut mempool, &first_declare);

fake_clock.advance(Duration::from_secs(1));
let second_declare = declare_add_tx_input(
declare_tx_args!(resource_bounds: test_valid_resource_bounds(), sender_address: contract_address!("0x1"), tx_hash: tx_hash!(1)),
);
add_tx(&mut mempool, &second_declare);

assert_eq!(mempool.get_txs(2).unwrap(), vec![]);

// Complete the first declare's delay.
fake_clock.advance(declare_delay - Duration::from_secs(1));
// Add another declare transaction to trigger `add_ready_declares`, and to verify the first
// declare of the same account cannot be front run.
add_tx_expect_error(
&mut mempool,
&second_declare,
MempoolError::DuplicateNonce {
address: second_declare.tx.contract_address(),
nonce: second_declare.tx.nonce(),
},
);

// Assert only the first declare is in the mempool.
assert_eq!(mempool.get_txs(2).unwrap(), vec![first_declare.tx]);

// Complete the second declare's delay.
fake_clock.advance(Duration::from_secs(1));
// Add another declare transaction to trigger `add_ready_declares`
let another_tx =
add_tx_input!(tx_hash: 2, address: "0x1", tx_nonce: 5, account_nonce: 0, tip: 100);
add_tx(&mut mempool, &another_tx);

// Assert the second declare was also added to the mempool.
assert_eq!(mempool.get_txs(2).unwrap(), vec![second_declare.tx]);
}

0 comments on commit 483a0b0

Please sign in to comment.