diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index 8a59668397..3459a20e27 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -261,7 +261,7 @@ async fn main() { peer_table.clone(), sync_mode, cancel_token.clone(), - blockchain, + blockchain.clone(), ); // TODO: Check every module starts properly. @@ -298,6 +298,7 @@ async fn main() { http_socket_addr, authrpc_socket_addr, store.clone(), + blockchain.clone(), jwt_secret_clone, local_p2p_node, local_node_record, @@ -313,6 +314,7 @@ async fn main() { http_socket_addr, authrpc_socket_addr, store.clone(), + blockchain.clone(), jwt_secret_clone, local_p2p_node, local_node_record, @@ -348,7 +350,7 @@ async fn main() { error!("Cannot run with DEV_MODE if the `l2` feature is enabled."); panic!("Run without the --dev argument."); } - let l2_proposer = ethrex_l2::start_proposer(store).into_future(); + let l2_proposer = ethrex_l2::start_proposer(store.clone(), blockchain.clone()).into_future(); tracker.spawn(l2_proposer); } else if #[cfg(feature = "dev")] { use ethrex_dev; @@ -386,6 +388,7 @@ async fn main() { signer, peer_table.clone(), store, + blockchain, ) .await.expect("Network starts"); tracker.spawn(ethrex_p2p::periodically_show_peer_stats(peer_table.clone())); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index d697424b02..7f948521a0 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -5,15 +5,21 @@ pub mod mempool; pub mod payload; mod smoke_test; +use constants::MAX_INITCODE_SIZE; +use error::MempoolError; use error::{ChainError, InvalidBlockError}; -use ethrex_common::constants::GAS_PER_BLOB; +use ethrex_common::constants::{GAS_PER_BLOB, MIN_BASE_FEE_PER_BLOB_GAS}; use ethrex_common::types::requests::{compute_requests_hash, EncodedRequests, Requests}; +use ethrex_common::types::BlobsBundle; +use ethrex_common::types::MempoolTransaction; use ethrex_common::types::{ compute_receipts_root, validate_block_header, validate_cancun_header_fields, validate_prague_header_fields, validate_pre_cancun_header_fields, Block, BlockHash, BlockHeader, BlockNumber, ChainConfig, EIP4844Transaction, Receipt, Transaction, }; -use ethrex_common::H256; + +use ethrex_common::{Address, H256}; +use mempool::Mempool; use std::{ops::Div, time::Instant}; use ethrex_storage::error::StoreError; @@ -31,6 +37,7 @@ use tracing::{error, info, warn}; pub struct Blockchain { pub vm: EVM, pub storage: Store, + pub mempool: Mempool, } impl Blockchain { @@ -38,6 +45,7 @@ impl Blockchain { Self { vm: evm, storage: store, + mempool: Mempool::new(), } } @@ -45,6 +53,7 @@ impl Blockchain { Self { vm: Default::default(), storage: store, + mempool: Mempool::new(), } } @@ -150,6 +159,162 @@ impl Blockchain { } info!("Added {size} blocks to blockchain"); } + + /// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid + #[cfg(feature = "c-kzg")] + pub fn add_blob_transaction_to_pool( + &self, + transaction: EIP4844Transaction, + blobs_bundle: BlobsBundle, + ) -> Result { + // Validate blobs bundle + + blobs_bundle.validate(&transaction)?; + + let transaction = Transaction::EIP4844Transaction(transaction); + let sender = transaction.sender(); + + // Validate transaction + self.validate_transaction(&transaction, sender)?; + + // Add transaction and blobs bundle to storage + let hash = transaction.compute_hash(); + self.mempool + .add_transaction(hash, MempoolTransaction::new(transaction, sender))?; + self.mempool.add_blobs_bundle(hash, blobs_bundle)?; + Ok(hash) + } + + /// Add a transaction to the mempool checking that the transaction is valid + pub fn add_transaction_to_pool(&self, transaction: Transaction) -> Result { + // Blob transactions should be submitted via add_blob_transaction along with the corresponding blobs bundle + if matches!(transaction, Transaction::EIP4844Transaction(_)) { + return Err(MempoolError::BlobTxNoBlobsBundle); + } + let sender = transaction.sender(); + // Validate transaction + self.validate_transaction(&transaction, sender)?; + + let hash = transaction.compute_hash(); + + // Add transaction to storage + self.mempool + .add_transaction(hash, MempoolTransaction::new(transaction, sender))?; + + Ok(hash) + } + + /// Remove a transaction from the mempool + pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> { + self.mempool.remove_transaction(hash) + } + + /* + + SOME VALIDATIONS THAT WE COULD INCLUDE + Stateless validations + 1. This transaction is valid on current mempool + -> Depends on mempool transaction filtering logic + 2. Ensure the maxPriorityFeePerGas is high enough to cover the requirement of the calling pool (the minimum to be included in) + -> Depends on mempool transaction filtering logic + 3. Transaction's encoded size is smaller than maximum allowed + -> I think that this is not in the spec, but it may be a good idea + 4. Make sure the transaction is signed properly + 5. Ensure a Blob Transaction comes with its sidecar (Done! - All blob validations have been moved to `common/types/blobs_bundle.rs`): + 1. Validate number of BlobHashes is positive (Done!) + 2. Validate number of BlobHashes is less than the maximum allowed per block, + which may be computed as `maxBlobGasPerBlock / blobTxBlobGasPerBlob` + 3. Ensure number of BlobHashes is equal to: + - The number of blobs (Done!) + - The number of commitments (Done!) + - The number of proofs (Done!) + 4. Validate that the hashes matches with the commitments, performing a `kzg4844` hash. (Done!) + 5. Verify the blob proofs with the `kzg4844` (Done!) + Stateful validations + 1. Ensure transaction nonce is higher than the `from` address stored nonce + 2. Certain pools do not allow for nonce gaps. Ensure a gap is not produced (that is, the transaction nonce is exactly the following of the stored one) + 3. Ensure the transactor has enough funds to cover transaction cost: + - Transaction cost is calculated as `(gas * gasPrice) + (blobGas * blobGasPrice) + value` + 4. In case of transaction reorg, ensure the transactor has enough funds to cover for transaction replacements without overdrafts. + - This is done by comparing the total spent gas of the transactor from all pooled transactions, and accounting for the necessary gas spenditure if any of those transactions is replaced. + 5. Ensure the transactor is able to add a new transaction. The number of transactions sent by an account may be limited by a certain configured value + + */ + + pub fn validate_transaction( + &self, + tx: &Transaction, + sender: Address, + ) -> Result<(), MempoolError> { + // TODO: Add validations here + + let header_no = self.storage.get_latest_block_number()?; + let header = self + .storage + .get_block_header(header_no)? + .ok_or(MempoolError::NoBlockHeaderError)?; + let config = self.storage.get_chain_config()?; + + // NOTE: We could add a tx size limit here, but it's not in the actual spec + + // Check init code size + if config.is_shanghai_activated(header.timestamp) + && tx.is_contract_creation() + && tx.data().len() > MAX_INITCODE_SIZE + { + return Err(MempoolError::TxMaxInitCodeSizeError); + } + + // Check gas limit is less than header's gas limit + if header.gas_limit < tx.gas_limit() { + return Err(MempoolError::TxGasLimitExceededError); + } + + // Check priority fee is less or equal than gas fee gap + if tx.max_priority_fee().unwrap_or(0) > tx.max_fee_per_gas().unwrap_or(0) { + return Err(MempoolError::TxTipAboveFeeCapError); + } + + // Check that the gas limit is covers the gas needs for transaction metadata. + if tx.gas_limit() < mempool::transaction_intrinsic_gas(tx, &header, &config)? { + return Err(MempoolError::TxIntrinsicGasCostAboveLimitError); + } + + // Check that the specified blob gas fee is above the minimum value + if let Some(fee) = tx.max_fee_per_blob_gas() { + // Blob tx fee checks + if fee < MIN_BASE_FEE_PER_BLOB_GAS.into() { + return Err(MempoolError::TxBlobBaseFeeTooLowError); + } + }; + + let maybe_sender_acc_info = self.storage.get_account_info(header_no, sender)?; + + if let Some(sender_acc_info) = maybe_sender_acc_info { + if tx.nonce() < sender_acc_info.nonce { + return Err(MempoolError::InvalidNonce); + } + + let tx_cost = tx + .cost_without_base_fee() + .ok_or(MempoolError::InvalidTxGasvalues)?; + + if tx_cost > sender_acc_info.balance { + return Err(MempoolError::NotEnoughBalance); + } + } else { + // An account that is not in the database cannot possibly have enough balance to cover the transaction cost + return Err(MempoolError::NotEnoughBalance); + } + + if let Some(chain_id) = tx.chain_id() { + if chain_id != config.chain_id { + return Err(MempoolError::InvalidChainId(config.chain_id)); + } + } + + Ok(()) + } } pub fn validate_requests_hash( diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index 233f50dfb2..77bdb9ff37 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -1,128 +1,203 @@ -use std::collections::HashMap; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex, RwLock}, +}; use crate::{ constants::{ - MAX_INITCODE_SIZE, TX_ACCESS_LIST_ADDRESS_GAS, TX_ACCESS_LIST_STORAGE_KEY_GAS, - TX_CREATE_GAS_COST, TX_DATA_NON_ZERO_GAS, TX_DATA_NON_ZERO_GAS_EIP2028, - TX_DATA_ZERO_GAS_COST, TX_GAS_COST, TX_INIT_CODE_WORD_GAS_COST, + TX_ACCESS_LIST_ADDRESS_GAS, TX_ACCESS_LIST_STORAGE_KEY_GAS, TX_CREATE_GAS_COST, + TX_DATA_NON_ZERO_GAS, TX_DATA_NON_ZERO_GAS_EIP2028, TX_DATA_ZERO_GAS_COST, TX_GAS_COST, + TX_INIT_CODE_WORD_GAS_COST, }, error::MempoolError, }; use ethrex_common::{ - constants::MIN_BASE_FEE_PER_BLOB_GAS, - types::{ - BlobsBundle, BlockHeader, ChainConfig, EIP4844Transaction, MempoolTransaction, Transaction, - }, + types::{BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType}, Address, H256, U256, }; -use ethrex_storage::{error::StoreError, Store}; - -/// Add a blob transaction and its blobs bundle to the mempool -#[cfg(feature = "c-kzg")] -pub fn add_blob_transaction( - transaction: EIP4844Transaction, - blobs_bundle: BlobsBundle, - store: &Store, -) -> Result { - // Validate blobs bundle - blobs_bundle.validate(&transaction)?; - - let transaction = Transaction::EIP4844Transaction(transaction); - let sender = transaction.sender(); - - // Validate transaction - validate_transaction(&transaction, sender, store.clone())?; - - // Add transaction and blobs bundle to storage - let hash = transaction.compute_hash(); - store.add_transaction_to_pool(hash, MempoolTransaction::new(transaction, sender))?; - store.add_blobs_bundle_to_pool(hash, blobs_bundle)?; - Ok(hash) +use ethrex_storage::error::StoreError; + +#[derive(Debug, Clone, Default)] +pub struct Mempool { + transaction_pool: Arc>>, + blobs_bundle_pool: Arc>>, } +impl Mempool { + pub fn new() -> Self { + Self::default() + } -/// Add a transaction to the mempool -pub fn add_transaction(transaction: Transaction, store: &Store) -> Result { - // Blob transactions should be submitted via add_blob_transaction along with the corresponding blobs bundle - if matches!(transaction, Transaction::EIP4844Transaction(_)) { - return Err(MempoolError::BlobTxNoBlobsBundle); + /// Add transaction to the pool without doing validity checks + pub fn add_transaction( + &self, + hash: H256, + transaction: MempoolTransaction, + ) -> Result<(), StoreError> { + let mut tx_pool = self + .transaction_pool + .write() + .map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?; + tx_pool.insert(hash, transaction); + + Ok(()) } - let sender = transaction.sender(); - // Validate transaction - validate_transaction(&transaction, sender, store.clone())?; - let hash = transaction.compute_hash(); + /// Add a blobs bundle to the pool by its blob transaction hash + pub fn add_blobs_bundle( + &self, + tx_hash: H256, + blobs_bundle: BlobsBundle, + ) -> Result<(), StoreError> { + self.blobs_bundle_pool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .insert(tx_hash, blobs_bundle); + Ok(()) + } - // Add transaction to storage - store.add_transaction_to_pool(hash, MempoolTransaction::new(transaction, sender))?; + /// Get a blobs bundle to the pool given its blob transaction hash + pub fn get_blobs_bundle(&self, tx_hash: H256) -> Result, StoreError> { + Ok(self + .blobs_bundle_pool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .get(&tx_hash) + .cloned()) + } - Ok(hash) -} + /// Remove a transaction from the pool + pub fn remove_transaction(&self, hash: &H256) -> Result<(), StoreError> { + let mut tx_pool = self + .transaction_pool + .write() + .map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?; + if let Some(tx) = tx_pool.get(hash) { + if matches!(tx.tx_type(), TxType::EIP4844) { + self.blobs_bundle_pool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .remove(&tx.compute_hash()); + } -/// Fetch a blobs bundle from the mempool given its blob transaction hash -pub fn get_blobs_bundle(tx_hash: H256, store: Store) -> Result, MempoolError> { - Ok(store.get_blobs_bundle_from_pool(tx_hash)?) -} + tx_pool.remove(hash); + }; -/// Applies the filter and returns a set of suitable transactions from the mempool. -/// These transactions will be grouped by sender and sorted by nonce -pub fn filter_transactions( - filter: &PendingTxFilter, - store: &Store, -) -> Result>, StoreError> { - let filter_tx = |tx: &Transaction| -> bool { - // Filter by tx type - let is_blob_tx = matches!(tx, Transaction::EIP4844Transaction(_)); - if filter.only_plain_txs && is_blob_tx || filter.only_blob_txs && !is_blob_tx { - return false; - } + Ok(()) + } - // Filter by tip & base_fee - if let Some(min_tip) = filter.min_tip { - if !tx - .effective_gas_tip(filter.base_fee) - .is_some_and(|tip| tip >= min_tip) - { + /// Applies the filter and returns a set of suitable transactions from the mempool. + /// These transactions will be grouped by sender and sorted by nonce + pub fn filter_transactions( + &self, + filter: &PendingTxFilter, + ) -> Result>, StoreError> { + let filter_tx = |tx: &Transaction| -> bool { + // Filter by tx type + let is_blob_tx = matches!(tx, Transaction::EIP4844Transaction(_)); + if filter.only_plain_txs && is_blob_tx || filter.only_blob_txs && !is_blob_tx { return false; } - // This is a temporary fix to avoid invalid transactions to be included. - // This should be removed once https://github.com/lambdaclass/ethrex/issues/680 - // is addressed. - } else if tx.effective_gas_tip(filter.base_fee).is_none() { - return false; - } - // Filter by blob gas fee - if let (true, Some(blob_fee)) = (is_blob_tx, filter.blob_fee) { - if !tx.max_fee_per_blob_gas().is_some_and(|fee| fee >= blob_fee) { + // Filter by tip & base_fee + if let Some(min_tip) = filter.min_tip { + if !tx + .effective_gas_tip(filter.base_fee) + .is_some_and(|tip| tip >= min_tip) + { + return false; + } + // This is a temporary fix to avoid invalid transactions to be included. + // This should be removed once https://github.com/lambdaclass/ethrex/issues/680 + // is addressed. + } else if tx.effective_gas_tip(filter.base_fee).is_none() { return false; } + + // Filter by blob gas fee + if let (true, Some(blob_fee)) = (is_blob_tx, filter.blob_fee) { + if !tx.max_fee_per_blob_gas().is_some_and(|fee| fee >= blob_fee) { + return false; + } + } + true + }; + self.filter_transactions_with_filter_fn(&filter_tx) + } + + /// Applies the filter and returns a set of suitable transactions from the mempool. + /// These transactions will be grouped by sender and sorted by nonce + pub fn filter_transactions_with_filter_fn( + &self, + filter: &dyn Fn(&Transaction) -> bool, + ) -> Result>, StoreError> { + let mut txs_by_sender: HashMap> = HashMap::new(); + let tx_pool = self + .transaction_pool + .read() + .map_err(|error| StoreError::MempoolReadLock(error.to_string()))?; + + for (_, tx) in tx_pool.iter() { + if filter(tx) { + txs_by_sender + .entry(tx.sender()) + .or_default() + .push(tx.clone()) + } } - true - }; - store.filter_pool_transactions(&filter_tx) -} -/// Remove a transaction from the mempool -pub fn remove_transaction(hash: &H256, store: &Store) -> Result<(), StoreError> { - store.remove_transaction_from_pool(hash) -} + txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort()); + Ok(txs_by_sender) + } -pub fn get_nonce(address: &Address, store: &Store) -> Result, MempoolError> { - let pending_filter = PendingTxFilter { - min_tip: None, - base_fee: None, - blob_fee: None, - only_plain_txs: false, - only_blob_txs: false, - }; + /// Gets hashes from possible_hashes that are not already known in the mempool. + pub fn filter_unknown_transactions( + &self, + possible_hashes: &[H256], + ) -> Result, StoreError> { + let tx_pool = self + .transaction_pool + .read() + .map_err(|error| StoreError::MempoolReadLock(error.to_string()))?; + + let tx_set: HashSet<_> = tx_pool.iter().map(|(hash, _)| hash).collect(); + Ok(possible_hashes + .iter() + .filter(|hash| !tx_set.contains(hash)) + .copied() + .collect()) + } - let pending_txs = filter_transactions(&pending_filter, store)?; - let nonce = match pending_txs.get(address) { - Some(txs) => txs.last().map(|tx| tx.nonce() + 1), - None => None, - }; + pub fn get_transaction_by_hash( + &self, + transaction_hash: H256, + ) -> Result, StoreError> { + let tx = self + .transaction_pool + .read() + .map_err(|error| StoreError::MempoolReadLock(error.to_string()))? + .get(&transaction_hash) + .map(|e| e.clone().into()); + + Ok(tx) + } + + pub fn get_nonce(&self, address: &Address) -> Result, MempoolError> { + let pending_filter = PendingTxFilter { + min_tip: None, + base_fee: None, + blob_fee: None, + only_plain_txs: false, + only_blob_txs: false, + }; - Ok(nonce) + let pending_txs = self.filter_transactions(&pending_filter)?; + let nonce = match pending_txs.get(address) { + Some(txs) => txs.last().map(|tx| tx.nonce() + 1), + None => None, + }; + + Ok(nonce) + } } #[derive(Debug, Default)] @@ -134,113 +209,7 @@ pub struct PendingTxFilter { pub only_blob_txs: bool, } -/* - -SOME VALIDATIONS THAT WE COULD INCLUDE -Stateless validations -1. This transaction is valid on current mempool - -> Depends on mempool transaction filtering logic -2. Ensure the maxPriorityFeePerGas is high enough to cover the requirement of the calling pool (the minimum to be included in) - -> Depends on mempool transaction filtering logic -3. Transaction's encoded size is smaller than maximum allowed - -> I think that this is not in the spec, but it may be a good idea -4. Make sure the transaction is signed properly -5. Ensure a Blob Transaction comes with its sidecar (Done! - All blob validations have been moved to `common/types/blobs_bundle.rs`): - 1. Validate number of BlobHashes is positive (Done!) - 2. Validate number of BlobHashes is less than the maximum allowed per block, - which may be computed as `maxBlobGasPerBlock / blobTxBlobGasPerBlob` - 3. Ensure number of BlobHashes is equal to: - - The number of blobs (Done!) - - The number of commitments (Done!) - - The number of proofs (Done!) - 4. Validate that the hashes matches with the commitments, performing a `kzg4844` hash. (Done!) - 5. Verify the blob proofs with the `kzg4844` (Done!) -Stateful validations -1. Ensure transaction nonce is higher than the `from` address stored nonce -2. Certain pools do not allow for nonce gaps. Ensure a gap is not produced (that is, the transaction nonce is exactly the following of the stored one) -3. Ensure the transactor has enough funds to cover transaction cost: - - Transaction cost is calculated as `(gas * gasPrice) + (blobGas * blobGasPrice) + value` -4. In case of transaction reorg, ensure the transactor has enough funds to cover for transaction replacements without overdrafts. -- This is done by comparing the total spent gas of the transactor from all pooled transactions, and accounting for the necessary gas spenditure if any of those transactions is replaced. -5. Ensure the transactor is able to add a new transaction. The number of transactions sent by an account may be limited by a certain configured value - -*/ - -fn validate_transaction( - tx: &Transaction, - sender: Address, - store: Store, -) -> Result<(), MempoolError> { - // TODO: Add validations here - - let header_no = store.get_latest_block_number()?; - let header = store - .get_block_header(header_no)? - .ok_or(MempoolError::NoBlockHeaderError)?; - let config = store.get_chain_config()?; - - // NOTE: We could add a tx size limit here, but it's not in the actual spec - - // Check init code size - if config.is_shanghai_activated(header.timestamp) - && tx.is_contract_creation() - && tx.data().len() > MAX_INITCODE_SIZE - { - return Err(MempoolError::TxMaxInitCodeSizeError); - } - - // Check gas limit is less than header's gas limit - if header.gas_limit < tx.gas_limit() { - return Err(MempoolError::TxGasLimitExceededError); - } - - // Check priority fee is less or equal than gas fee gap - if tx.max_priority_fee().unwrap_or(0) > tx.max_fee_per_gas().unwrap_or(0) { - return Err(MempoolError::TxTipAboveFeeCapError); - } - - // Check that the gas limit is covers the gas needs for transaction metadata. - if tx.gas_limit() < transaction_intrinsic_gas(tx, &header, &config)? { - return Err(MempoolError::TxIntrinsicGasCostAboveLimitError); - } - - // Check that the specified blob gas fee is above the minimum value - if let Some(fee) = tx.max_fee_per_blob_gas() { - // Blob tx fee checks - if fee < MIN_BASE_FEE_PER_BLOB_GAS.into() { - return Err(MempoolError::TxBlobBaseFeeTooLowError); - } - }; - - let maybe_sender_acc_info = store.get_account_info(header_no, sender)?; - - if let Some(sender_acc_info) = maybe_sender_acc_info { - if tx.nonce() < sender_acc_info.nonce { - return Err(MempoolError::InvalidNonce); - } - - let tx_cost = tx - .cost_without_base_fee() - .ok_or(MempoolError::InvalidTxGasvalues)?; - - if tx_cost > sender_acc_info.balance { - return Err(MempoolError::NotEnoughBalance); - } - } else { - // An account that is not in the database cannot possibly have enough balance to cover the transaction cost - return Err(MempoolError::NotEnoughBalance); - } - - if let Some(chain_id) = tx.chain_id() { - if chain_id != config.chain_id { - return Err(MempoolError::InvalidChainId(config.chain_id)); - } - } - - Ok(()) -} - -fn transaction_intrinsic_gas( +pub fn transaction_intrinsic_gas( tx: &Transaction, header: &BlockHeader, config: &ChainConfig, @@ -302,16 +271,20 @@ fn transaction_intrinsic_gas( } #[cfg(test)] mod tests { + use crate::constants::MAX_INITCODE_SIZE; use crate::error::MempoolError; use crate::mempool::{ - MAX_INITCODE_SIZE, TX_ACCESS_LIST_ADDRESS_GAS, TX_ACCESS_LIST_STORAGE_KEY_GAS, - TX_CREATE_GAS_COST, TX_DATA_NON_ZERO_GAS, TX_DATA_NON_ZERO_GAS_EIP2028, - TX_DATA_ZERO_GAS_COST, TX_GAS_COST, TX_INIT_CODE_WORD_GAS_COST, + Mempool, TX_ACCESS_LIST_ADDRESS_GAS, TX_ACCESS_LIST_STORAGE_KEY_GAS, TX_CREATE_GAS_COST, + TX_DATA_NON_ZERO_GAS, TX_DATA_NON_ZERO_GAS_EIP2028, TX_DATA_ZERO_GAS_COST, TX_GAS_COST, + TX_INIT_CODE_WORD_GAS_COST, }; + use crate::Blockchain; + use std::collections::HashMap; - use super::{transaction_intrinsic_gas, validate_transaction}; + use super::transaction_intrinsic_gas; use ethrex_common::types::{ - BlockHeader, ChainConfig, EIP1559Transaction, EIP4844Transaction, Transaction, TxKind, + BlobsBundle, BlockHeader, ChainConfig, EIP1559Transaction, EIP4844Transaction, + MempoolTransaction, Transaction, TxKind, BYTES_PER_BLOB, }; use ethrex_common::{Address, Bytes, H256, U256}; use ethrex_storage::EngineType; @@ -532,6 +505,7 @@ mod tests { let (config, header) = build_basic_config_and_header(false, true); let store = setup_storage(config, header).expect("Storage setup"); + let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { nonce: 3, @@ -546,7 +520,7 @@ mod tests { }; let tx = Transaction::EIP1559Transaction(tx); - let validation = validate_transaction(&tx, Address::random(), store); + let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( validation, Err(MempoolError::TxMaxInitCodeSizeError) @@ -558,6 +532,7 @@ mod tests { let (config, header) = build_basic_config_and_header(false, false); let store = setup_storage(config, header).expect("Storage setup"); + let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { nonce: 3, @@ -572,7 +547,7 @@ mod tests { }; let tx = Transaction::EIP1559Transaction(tx); - let validation = validate_transaction(&tx, Address::random(), store); + let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( validation, Err(MempoolError::TxGasLimitExceededError) @@ -584,6 +559,7 @@ mod tests { let (config, header) = build_basic_config_and_header(false, false); let store = setup_storage(config, header).expect("Storage setup"); + let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { nonce: 3, @@ -598,7 +574,7 @@ mod tests { }; let tx = Transaction::EIP1559Transaction(tx); - let validation = validate_transaction(&tx, Address::random(), store); + let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( validation, Err(MempoolError::TxTipAboveFeeCapError) @@ -609,7 +585,7 @@ mod tests { fn transaction_with_gas_limit_lower_than_intrinsic_gas_should_fail() { let (config, header) = build_basic_config_and_header(false, false); let store = setup_storage(config, header).expect("Storage setup"); - + let blockchain = Blockchain::default_with_store(store); let intrinsic_gas_cost = TX_GAS_COST; let tx = EIP1559Transaction { @@ -625,7 +601,7 @@ mod tests { }; let tx = Transaction::EIP1559Transaction(tx); - let validation = validate_transaction(&tx, Address::random(), store); + let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( validation, Err(MempoolError::TxIntrinsicGasCostAboveLimitError) @@ -636,6 +612,7 @@ mod tests { fn transaction_with_blob_base_fee_below_min_should_fail() { let (config, header) = build_basic_config_and_header(false, false); let store = setup_storage(config, header).expect("Storage setup"); + let blockchain = Blockchain::default_with_store(store); let tx = EIP4844Transaction { nonce: 3, @@ -651,10 +628,49 @@ mod tests { }; let tx = Transaction::EIP4844Transaction(tx); - let validation = validate_transaction(&tx, Address::random(), store); + let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( validation, Err(MempoolError::TxBlobBaseFeeTooLowError) )); } + + #[test] + fn test_filter_mempool_transactions() { + let plain_tx_decoded = Transaction::decode_canonical(&hex::decode("f86d80843baa0c4082f618946177843db3138ae69679a54b95cf345ed759450d870aa87bee538000808360306ba0151ccc02146b9b11adf516e6787b59acae3e76544fdcd75e77e67c6b598ce65da064c5dd5aae2fbb535830ebbdad0234975cd7ece3562013b63ea18cc0df6c97d4").unwrap()).unwrap(); + let plain_tx_sender = plain_tx_decoded.sender(); + let plain_tx = MempoolTransaction::new(plain_tx_decoded, plain_tx_sender); + let blob_tx_decoded = Transaction::decode_canonical(&hex::decode("03f88f0780843b9aca008506fc23ac00830186a09400000000000000000000000000000000000001008080c001e1a0010657f37554c781402a22917dee2f75def7ab966d7b770905398eba3c44401401a0840650aa8f74d2b07f40067dc33b715078d73422f01da17abdbd11e02bbdfda9a04b2260f6022bf53eadb337b3e59514936f7317d872defb891a708ee279bdca90").unwrap()).unwrap(); + let blob_tx_sender = blob_tx_decoded.sender(); + let blob_tx = MempoolTransaction::new(blob_tx_decoded, blob_tx_sender); + let plain_tx_hash = plain_tx.compute_hash(); + let blob_tx_hash = blob_tx.compute_hash(); + let mempool = Mempool::new(); + let filter = + |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; + mempool + .add_transaction(blob_tx_hash, blob_tx.clone()) + .unwrap(); + mempool.add_transaction(plain_tx_hash, plain_tx).unwrap(); + let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap(); + assert_eq!(txs, HashMap::from([(blob_tx.sender(), vec![blob_tx])])); + } + + #[test] + fn blobs_bundle_loadtest() { + // Write a bundle of 6 blobs 10 times + // If this test fails please adjust the max_size in the DB config + let mempool = Mempool::new(); + for i in 0..300 { + let blobs = [[i as u8; BYTES_PER_BLOB]; 6]; + let commitments = [[i as u8; 48]; 6]; + let proofs = [[i as u8; 48]; 6]; + let bundle = BlobsBundle { + blobs: blobs.to_vec(), + commitments: commitments.to_vec(), + proofs: proofs.to_vec(), + }; + mempool.add_blobs_bundle(H256::random(), bundle).unwrap(); + } + } } diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index 6eb3e9df75..2f16f96c89 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -37,7 +37,7 @@ use ethrex_metrics::metrics_transactions::{MetricsTxStatus, MetricsTxType, METRI use crate::{ constants::{GAS_LIMIT_BOUND_DIVISOR, MIN_GAS_LIMIT, TX_GAS_COST}, error::{ChainError, InvalidBlockError}, - mempool::{self, PendingTxFilter}, + mempool::PendingTxFilter, Blockchain, }; @@ -321,18 +321,15 @@ impl Blockchain { only_blob_txs: true, ..tx_filter }; - let store = context.store().ok_or(StoreError::Custom( - "no store in the context (is an ExecutionDB being used?)".to_string(), - ))?; Ok(( // Plain txs TransactionQueue::new( - mempool::filter_transactions(&plain_tx_filter, store)?, + self.mempool.filter_transactions(&plain_tx_filter)?, context.base_fee_per_gas(), )?, // Blob txs TransactionQueue::new( - mempool::filter_transactions(&blob_tx_filter, store)?, + self.mempool.filter_transactions(&blob_tx_filter)?, context.base_fee_per_gas(), )?, )) @@ -397,12 +394,7 @@ impl Blockchain { // Pull transaction from the mempool debug!("Ignoring replay-protected transaction: {}", tx_hash); txs.pop(); - mempool::remove_transaction( - &head_tx.tx.compute_hash(), - context - .store() - .ok_or(ChainError::StoreError(StoreError::MissingStore))?, - )?; + self.remove_transaction_from_pool(&head_tx.tx.compute_hash())?; continue; } @@ -416,12 +408,7 @@ impl Blockchain { Ok(receipt) => { txs.shift()?; // Pull transaction from the mempool - mempool::remove_transaction( - &head_tx.tx.compute_hash(), - context - .store() - .ok_or(ChainError::StoreError(StoreError::MissingStore))?, - )?; + self.remove_transaction_from_pool(&head_tx.tx.compute_hash())?; metrics!(METRICS_TX.inc_tx_with_status_and_type( MetricsTxStatus::Succeeded, @@ -475,12 +462,7 @@ impl Blockchain { .get_fork_blob_schedule(context.payload.header.timestamp) .map(|schedule| schedule.max) .unwrap_or_default() as usize; - - let Some(blobs_bundle) = context - .store() - .ok_or(ChainError::StoreError(StoreError::MissingStore))? - .get_blobs_bundle_from_pool(tx_hash)? - else { + let Some(blobs_bundle) = self.mempool.get_blobs_bundle(tx_hash)? else { // No blob tx should enter the mempool without its blobs bundle so this is an internal error return Err( StoreError::Custom(format!("No blobs bundle found for blob tx {tx_hash}")).into(), diff --git a/crates/l2/proposer/l1_watcher.rs b/crates/l2/proposer/l1_watcher.rs index b2c1cdfe1c..4b777d5b88 100644 --- a/crates/l2/proposer/l1_watcher.rs +++ b/crates/l2/proposer/l1_watcher.rs @@ -4,7 +4,7 @@ use crate::{ }; use bytes::Bytes; use ethereum_types::{Address, BigEndianHash, H256, U256}; -use ethrex_blockchain::{constants::TX_GAS_COST, mempool}; +use ethrex_blockchain::{constants::TX_GAS_COST, Blockchain}; use ethrex_common::types::{Signable, Transaction}; use ethrex_rpc::clients::eth::{errors::EthClientError, eth_sender::Overrides, EthClient}; use ethrex_rpc::types::receipt::RpcLog; @@ -15,11 +15,11 @@ use std::{cmp::min, ops::Mul, time::Duration}; use tokio::time::sleep; use tracing::{debug, error, info, warn}; -pub async fn start_l1_watcher(store: Store) -> Result<(), ConfigError> { +pub async fn start_l1_watcher(store: Store, blockchain: Blockchain) -> Result<(), ConfigError> { let eth_config = EthConfig::from_env()?; let watcher_config = L1WatcherConfig::from_env()?; let mut l1_watcher = L1Watcher::new_from_config(watcher_config, eth_config).await?; - l1_watcher.run(&store).await; + l1_watcher.run(&store, &blockchain).await; Ok(()) } @@ -55,9 +55,9 @@ impl L1Watcher { }) } - pub async fn run(&mut self, store: &Store) { + pub async fn run(&mut self, store: &Store, blockchain: &Blockchain) { loop { - if let Err(err) = self.main_logic(store).await { + if let Err(err) = self.main_logic(store, blockchain).await { error!("L1 Watcher Error: {}", err); } @@ -65,7 +65,11 @@ impl L1Watcher { } } - async fn main_logic(&mut self, store: &Store) -> Result<(), L1WatcherError> { + async fn main_logic( + &mut self, + store: &Store, + blockchain: &Blockchain, + ) -> Result<(), L1WatcherError> { loop { sleep(self.check_interval).await; @@ -78,7 +82,7 @@ impl L1Watcher { let pending_deposit_logs = self.get_pending_deposit_logs().await?; let _deposit_txs = self - .process_logs(logs, &pending_deposit_logs, store) + .process_logs(logs, &pending_deposit_logs, store, blockchain) .await?; } } @@ -162,6 +166,7 @@ impl L1Watcher { logs: Vec, pending_deposit_logs: &[H256], store: &Store, + blockchain: &Blockchain, ) -> Result, L1WatcherError> { let mut deposit_txs = Vec::new(); @@ -267,10 +272,9 @@ impl L1Watcher { .await?; mint_transaction.sign_inplace(&self.l2_proposer_pk); - match mempool::add_transaction( - Transaction::PrivilegedL2Transaction(mint_transaction), - store, - ) { + match blockchain + .add_transaction_to_pool(Transaction::PrivilegedL2Transaction(mint_transaction)) + { Ok(hash) => { info!("Mint transaction added to mempool {hash:#x}",); deposit_txs.push(hash); diff --git a/crates/l2/proposer/mod.rs b/crates/l2/proposer/mod.rs index 51f327f438..ec96e6478d 100644 --- a/crates/l2/proposer/mod.rs +++ b/crates/l2/proposer/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; use crate::utils::config::{errors::ConfigError, proposer::ProposerConfig, read_env_file}; use errors::ProposerError; use ethereum_types::Address; +use ethrex_blockchain::Blockchain; use ethrex_rpc::clients::EngineApiConfig; use ethrex_storage::Store; use tokio::task::JoinSet; @@ -25,7 +26,7 @@ pub struct Proposer { jwt_secret: Vec, } -pub async fn start_proposer(store: Store) { +pub async fn start_proposer(store: Store, blockchain: Blockchain) { info!("Starting Proposer"); if let Err(e) = read_env_file() { @@ -34,7 +35,10 @@ pub async fn start_proposer(store: Store) { } let mut task_set = JoinSet::new(); - task_set.spawn(l1_watcher::start_l1_watcher(store.clone())); + task_set.spawn(l1_watcher::start_l1_watcher( + store.clone(), + blockchain.clone(), + )); task_set.spawn(l1_committer::start_l1_committer(store.clone())); task_set.spawn(prover_server::start_prover_server(store.clone())); task_set.spawn(start_proposer_server(store.clone())); diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 76247d116b..251622821c 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -630,6 +630,7 @@ pub(super) mod tests { network::{node_id_from_signing_key, serve_p2p_requests, MAX_MESSAGES_TO_BROADCAST}, rlpx::message::Message as RLPxMessage, }; + use ethrex_blockchain::Blockchain; use ethrex_storage::{EngineType, Store}; use k256::ecdsa::SigningKey; use rand::rngs::OsRng; @@ -677,6 +678,7 @@ pub(super) mod tests { let storage = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); + let blockchain = Blockchain::default_with_store(storage.clone()); let table = Arc::new(Mutex::new(KademliaTable::new(node_id))); let (broadcast, _) = tokio::sync::broadcast::channel::<(tokio::task::Id, Arc)>( MAX_MESSAGES_TO_BROADCAST, @@ -690,6 +692,7 @@ pub(super) mod tests { signer, table, storage, + blockchain, broadcast, }; diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 5f771d45ad..5428753fcc 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -11,6 +11,7 @@ use crate::{ }, rlpx::utils::log_peer_error, }; +use ethrex_blockchain::Blockchain; use ethrex_common::H512; use ethrex_storage::Store; use k256::{ @@ -47,6 +48,7 @@ pub struct P2PContext { pub signer: SigningKey, pub table: Arc>, pub storage: Store, + pub blockchain: Blockchain, pub(crate) broadcast: RLPxConnBroadcastSender, pub local_node: Node, pub enr_seq: u64, @@ -59,6 +61,7 @@ pub async fn start_network( signer: SigningKey, peer_table: Arc>, storage: Store, + blockchain: Blockchain, ) -> Result<(), NetworkError> { let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<( tokio::task::Id, @@ -75,6 +78,7 @@ pub async fn start_network( signer, table: peer_table, storage, + blockchain, broadcast: channel_broadcast_send_end, }; let discovery = Discv4Server::try_new(context.clone()) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 419720fa56..e2a693f668 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -19,11 +19,12 @@ use crate::{ }, types::Node, }; -use ethrex_blockchain::mempool::{self}; +use ethrex_blockchain::Blockchain; use ethrex_common::{ types::{MempoolTransaction, Transaction}, H256, H512, }; + use ethrex_storage::Store; use futures::SinkExt; use k256::{ecdsa::SigningKey, PublicKey, SecretKey}; @@ -74,6 +75,7 @@ pub(crate) struct RLPxConnection { node: Node, framed: Framed, storage: Store, + blockchain: Blockchain, capabilities: Vec<(Capability, u8)>, negotiated_eth_version: u8, negotiated_snap_version: u8, @@ -98,6 +100,7 @@ impl RLPxConnection { stream: S, codec: RLPxCodec, storage: Store, + blockchain: Blockchain, connection_broadcast: RLPxConnBroadcastSender, ) -> Self { Self { @@ -105,6 +108,7 @@ impl RLPxConnection { node, framed: Framed::new(stream, codec), storage, + blockchain, capabilities: vec![], negotiated_eth_version: 0, negotiated_snap_version: 0, @@ -349,8 +353,9 @@ impl RLPxConnection { let filter = |tx: &Transaction| -> bool { !self.broadcasted_txs.contains(&tx.compute_hash()) }; let txs: Vec = self - .storage - .filter_pool_transactions(&filter)? + .blockchain + .mempool + .filter_transactions_with_filter_fn(&filter)? .into_values() .flatten() .collect(); @@ -358,7 +363,7 @@ impl RLPxConnection { let tx_count = txs.len(); for tx in txs { self.send(Message::NewPooledTransactionHashes( - NewPooledTransactionHashes::new(vec![(*tx).clone()], &self.storage)?, + NewPooledTransactionHashes::new(vec![(*tx).clone()], &self.blockchain)?, )) .await?; // Possible improvement: the mempool already knows the hash but the filter function does not return it @@ -412,7 +417,7 @@ impl RLPxConnection { if is_synced { let mut valid_txs = vec![]; for tx in &txs.transactions { - if let Err(e) = mempool::add_transaction(tx.clone(), &self.storage) { + if let Err(e) = self.blockchain.add_transaction_to_pool(tx.clone()) { log_peer_warn(&self.node, &format!("Error adding transaction: {}", e)); continue; } @@ -451,19 +456,19 @@ impl RLPxConnection { { //TODO(#1415): evaluate keeping track of requests to avoid sending the same twice. let hashes = - new_pooled_transaction_hashes.get_transactions_to_request(&self.storage)?; + new_pooled_transaction_hashes.get_transactions_to_request(&self.blockchain)?; //TODO(#1416): Evaluate keeping track of the request-id. let request = GetPooledTransactions::new(random(), hashes); self.send(Message::GetPooledTransactions(request)).await?; } Message::GetPooledTransactions(msg) => { - let response = msg.handle(&self.storage)?; + let response = msg.handle(&self.blockchain)?; self.send(Message::PooledTransactions(response)).await?; } Message::PooledTransactions(msg) if peer_supports_eth => { if is_synced { - msg.handle(&self.node, &self.storage)?; + msg.handle(&self.node, &self.blockchain)?; } } Message::GetStorageRanges(req) => { diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 637488f175..66c346dc2f 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -1,7 +1,7 @@ use bytes::BufMut; use bytes::Bytes; use ethrex_blockchain::error::MempoolError; -use ethrex_blockchain::mempool; +use ethrex_blockchain::Blockchain; use ethrex_common::types::BlobsBundle; use ethrex_common::types::P2PTransaction; use ethrex_common::types::WrappedEIP4844Transaction; @@ -10,7 +10,7 @@ use ethrex_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; -use ethrex_storage::{error::StoreError, Store}; +use ethrex_storage::error::StoreError; use crate::rlpx::utils::log_peer_warn; use crate::rlpx::{ @@ -72,7 +72,10 @@ pub(crate) struct NewPooledTransactionHashes { } impl NewPooledTransactionHashes { - pub fn new(transactions: Vec, storage: &Store) -> Result { + pub fn new( + transactions: Vec, + blockchain: &Blockchain, + ) -> Result { let transactions_len = transactions.len(); let mut transaction_types = Vec::with_capacity(transactions_len); let mut transaction_sizes = Vec::with_capacity(transactions_len); @@ -89,8 +92,9 @@ impl NewPooledTransactionHashes { // Network representation for PooledTransactions // https://eips.ethereum.org/EIPS/eip-4844#networking Transaction::EIP4844Transaction(eip4844_tx) => { - let tx_blobs_bundle = storage - .get_blobs_bundle_from_pool(transaction_hash)? + let tx_blobs_bundle = blockchain + .mempool + .get_blobs_bundle(transaction_hash)? .unwrap_or(BlobsBundle::empty()); eip4844_tx.rlp_length_as_pooled_tx(&tx_blobs_bundle) } @@ -105,8 +109,13 @@ impl NewPooledTransactionHashes { }) } - pub fn get_transactions_to_request(&self, storage: &Store) -> Result, StoreError> { - storage.filter_unknown_transactions(&self.transaction_hashes) + pub fn get_transactions_to_request( + &self, + blockchain: &Blockchain, + ) -> Result, StoreError> { + blockchain + .mempool + .filter_unknown_transactions(&self.transaction_hashes) } } @@ -166,12 +175,12 @@ impl GetPooledTransactions { } } - pub fn handle(&self, store: &Store) -> Result { + pub fn handle(&self, blockchain: &Blockchain) -> Result { // TODO(#1615): get transactions in batch instead of iterating over them. let txs = self .transaction_hashes .iter() - .map(|hash| Self::get_p2p_transaction(hash, store)) + .map(|hash| Self::get_p2p_transaction(hash, blockchain)) // Return an error in case anything failed. .collect::, _>>()? .into_iter() @@ -189,9 +198,9 @@ impl GetPooledTransactions { /// Gets a p2p transaction given a hash. fn get_p2p_transaction( hash: &H256, - store: &Store, + blockchain: &Blockchain, ) -> Result, StoreError> { - let Some(tx) = store.get_transaction_by_hash_from_pool(*hash)? else { + let Some(tx) = blockchain.mempool.get_transaction_by_hash(*hash)? else { return Ok(None); }; let result = match tx { @@ -199,7 +208,7 @@ impl GetPooledTransactions { Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx), Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx), Transaction::EIP4844Transaction(itx) => { - let Some(bundle) = store.get_blobs_bundle_from_pool(*hash)? else { + let Some(bundle) = blockchain.mempool.get_blobs_bundle(*hash)? else { return Err(StoreError::Custom(format!( "Blob transaction present without its bundle: hash {}", hash @@ -263,10 +272,10 @@ impl PooledTransactions { /// Saves every incoming pooled transaction to the mempool. - pub fn handle(self, node: &Node, store: &Store) -> Result<(), MempoolError> { + pub fn handle(self, node: &Node, blockchain: &Blockchain) -> Result<(), MempoolError> { for tx in self.pooled_transactions { if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx { - if let Err(e) = mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store) { + if let Err(e) = blockchain.add_blob_transaction_to_pool(itx.tx, itx.blobs_bundle) { log_peer_warn(node, &format!("Error adding transaction: {}", e)); continue; } @@ -274,7 +283,7 @@ impl PooledTransactions { let regular_tx = tx .try_into() .map_err(|error| MempoolError::StoreError(StoreError::Custom(error)))?; - if let Err(e) = mempool::add_transaction(regular_tx, store) { + if let Err(e) = blockchain.add_transaction_to_pool(regular_tx) { log_peer_warn(node, &format!("Error adding transaction: {}", e)); continue; } diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 85da3b72a8..5031f88c49 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -58,6 +58,7 @@ where stream, codec, context.storage, + context.blockchain, context.broadcast, )) } @@ -84,6 +85,7 @@ where stream, codec, context.storage, + context.blockchain, context.broadcast, )) } diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index 9f29b218de..501bf3853f 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -242,10 +242,12 @@ fn handle_forkchoice( // Remove included transactions from the mempool after we accept the fork choice // TODO(#797): The remove of transactions from the mempool could be incomplete (i.e. REORGS) if let Ok(Some(block)) = context.storage.get_block_by_hash(head.compute_block_hash()) { - context - .storage - .remove_transactions_from_pool(&block.body.transactions) - .map_err(|err| RpcErr::Internal(err.to_string()))?; + for tx in &block.body.transactions { + context + .blockchain + .remove_transaction_from_pool(&tx.compute_hash()) + .map_err(|err| RpcErr::Internal(err.to_string()))?; + } } else { return Err(RpcErr::Internal( "Failed to get block by hash to remove transactions from the mempool" diff --git a/crates/networking/rpc/eth/account.rs b/crates/networking/rpc/eth/account.rs index e1746f6e40..4e1ac33305 100644 --- a/crates/networking/rpc/eth/account.rs +++ b/crates/networking/rpc/eth/account.rs @@ -1,4 +1,3 @@ -use ethrex_blockchain::mempool; use serde_json::Value; use tracing::info; @@ -162,7 +161,7 @@ impl RpcHandler for GetTransactionCountRequest { // If the tag is Pending, we need to get the nonce from the mempool let pending_nonce = if self.block == BlockTag::Pending { - mempool::get_nonce(&self.address, &context.storage)? + context.blockchain.mempool.get_nonce(&self.address)? } else { None }; diff --git a/crates/networking/rpc/eth/filter.rs b/crates/networking/rpc/eth/filter.rs index edd714d9ca..6dbe67859d 100644 --- a/crates/networking/rpc/eth/filter.rs +++ b/crates/networking/rpc/eth/filter.rs @@ -246,6 +246,7 @@ impl FilterChangesRequest { #[cfg(test)] mod tests { + use ethrex_blockchain::Blockchain; use std::{ collections::HashMap, sync::{Arc, Mutex}, @@ -436,9 +437,12 @@ mod tests { json_req: serde_json::Value, filters_pointer: ActiveFilters, ) -> u64 { + let storage = Store::new("in-mem", EngineType::InMemory) + .expect("Fatal: could not create in memory test db"); + let blockchain = Blockchain::default_with_store(storage.clone()); let context = RpcApiContext { - storage: Store::new("in-mem", EngineType::InMemory) - .expect("Fatal: could not create in memory test db"), + storage, + blockchain, jwt_secret: Default::default(), local_p2p_node: example_p2p_node(), local_node_record: example_local_node_record(), @@ -499,8 +503,12 @@ mod tests { ); let active_filters = Arc::new(Mutex::new(HashMap::from([filter]))); + let storage = Store::new("in-mem", EngineType::InMemory) + .expect("Fatal: could not create in memory test db"); + let blockchain = Blockchain::default_with_store(storage.clone()); let context = RpcApiContext { - storage: Store::new("in-mem", EngineType::InMemory).unwrap(), + storage, + blockchain, local_p2p_node: example_p2p_node(), local_node_record: example_local_node_record(), jwt_secret: Default::default(), @@ -526,8 +534,12 @@ mod tests { async fn removing_non_existing_filter_returns_false() { let active_filters = Arc::new(Mutex::new(HashMap::new())); + let storage = Store::new("in-mem", EngineType::InMemory) + .expect("Fatal: could not create in memory test db"); + let blockchain = Blockchain::default_with_store(storage.clone()); let context = RpcApiContext { - storage: Store::new("in-mem", EngineType::InMemory).unwrap(), + storage, + blockchain, local_p2p_node: example_p2p_node(), local_node_record: example_local_node_record(), active_filters: active_filters.clone(), diff --git a/crates/networking/rpc/eth/gas_price.rs b/crates/networking/rpc/eth/gas_price.rs index 9dacf5edcd..7e3a5e3f07 100644 --- a/crates/networking/rpc/eth/gas_price.rs +++ b/crates/networking/rpc/eth/gas_price.rs @@ -65,14 +65,18 @@ mod tests { use crate::{EngineClient, EthClient}; #[cfg(feature = "based")] use bytes::Bytes; + use ethrex_blockchain::Blockchain; use ethrex_p2p::sync::SyncManager; use serde_json::json; use std::sync::Arc; use tokio::sync::Mutex; fn default_context() -> RpcApiContext { + let storage = setup_store(); + let blockchain = Blockchain::default_with_store(storage.clone()); RpcApiContext { - storage: setup_store(), + storage, + blockchain, jwt_secret: Default::default(), local_p2p_node: example_p2p_node(), local_node_record: example_local_node_record(), diff --git a/crates/networking/rpc/eth/max_priority_fee.rs b/crates/networking/rpc/eth/max_priority_fee.rs index 6fdaf7ac31..fc82b1cca4 100644 --- a/crates/networking/rpc/eth/max_priority_fee.rs +++ b/crates/networking/rpc/eth/max_priority_fee.rs @@ -48,14 +48,18 @@ mod tests { use crate::{EngineClient, EthClient}; #[cfg(feature = "based")] use bytes::Bytes; + use ethrex_blockchain::Blockchain; use ethrex_p2p::sync::SyncManager; use serde_json::{json, Value}; use std::sync::Arc; use tokio::sync::Mutex; fn default_context() -> RpcApiContext { + let storage = setup_store(); + let blockchain = Blockchain::default_with_store(storage.clone()); RpcApiContext { - storage: setup_store(), + storage, + blockchain, jwt_secret: Default::default(), local_p2p_node: example_p2p_node(), local_node_record: example_local_node_record(), diff --git a/crates/networking/rpc/eth/transaction.rs b/crates/networking/rpc/eth/transaction.rs index a28388c086..1e186983dd 100644 --- a/crates/networking/rpc/eth/transaction.rs +++ b/crates/networking/rpc/eth/transaction.rs @@ -14,7 +14,6 @@ use ethrex_common::{ H256, U256, }; -use ethrex_blockchain::mempool; use ethrex_rlp::encode::RLPEncode; use ethrex_storage::Store; @@ -617,13 +616,14 @@ impl RpcHandler for SendRawTransactionRequest { fn handle(&self, context: RpcApiContext) -> Result { let hash = if let SendRawTransactionRequest::EIP4844(wrapped_blob_tx) = self { - mempool::add_blob_transaction( + context.blockchain.add_blob_transaction_to_pool( wrapped_blob_tx.tx.clone(), wrapped_blob_tx.blobs_bundle.clone(), - &context.storage, ) } else { - mempool::add_transaction(self.to_transaction(), &context.storage) + context + .blockchain + .add_transaction_to_pool(self.to_transaction()) }?; serde_json::to_value(format!("{:#x}", hash)) .map_err(|error| RpcErr::Internal(error.to_string())) diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 219c9a6525..56cf3cc30e 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -36,6 +36,7 @@ use eth::{ GetTransactionByHashRequest, GetTransactionReceiptRequest, }, }; +use ethrex_blockchain::Blockchain; use ethrex_p2p::{sync::SyncManager, types::NodeRecord}; use serde::Deserialize; use serde_json::Value; @@ -79,6 +80,7 @@ enum RpcRequestWrapper { #[derive(Debug, Clone)] pub struct RpcApiContext { storage: Store, + blockchain: Blockchain, jwt_secret: Bytes, local_p2p_node: Node, local_node_record: NodeRecord, @@ -153,6 +155,7 @@ pub async fn start_api( http_addr: SocketAddr, authrpc_addr: SocketAddr, storage: Store, + blockchain: Blockchain, jwt_secret: Bytes, local_p2p_node: Node, local_node_record: NodeRecord, @@ -164,7 +167,8 @@ pub async fn start_api( // filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc) let active_filters = Arc::new(Mutex::new(HashMap::new())); let service_context = RpcApiContext { - storage: storage.clone(), + storage, + blockchain, jwt_secret, local_p2p_node, local_node_record, @@ -453,11 +457,12 @@ where mod tests { use super::*; use crate::utils::test_utils::{example_local_node_record, example_p2p_node}; + use ethrex_blockchain::Blockchain; use ethrex_common::{ constants::MAINNET_DEPOSIT_CONTRACT_ADDRESS, types::{ChainConfig, Genesis}, }; - use ethrex_storage::EngineType; + use ethrex_storage::{EngineType, Store}; use sha3::{Digest, Keccak256}; use std::fs::File; use std::io::BufReader; @@ -481,10 +486,12 @@ mod tests { let storage = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); storage.set_chain_config(&example_chain_config()).unwrap(); + let blockchain = Blockchain::default_with_store(storage.clone()); let context = RpcApiContext { local_p2p_node, local_node_record: example_local_node_record(), storage, + blockchain, jwt_secret: Default::default(), active_filters: Default::default(), syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), @@ -565,6 +572,7 @@ mod tests { // Setup initial storage let storage = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); + let blockchain = Blockchain::default_with_store(storage.clone()); let genesis = read_execution_api_genesis_file(); storage .add_initial_state(genesis) @@ -575,6 +583,7 @@ mod tests { local_p2p_node, local_node_record: example_local_node_record(), storage, + blockchain, jwt_secret: Default::default(), active_filters: Default::default(), syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), @@ -600,6 +609,7 @@ mod tests { // Setup initial storage let storage = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); + let blockchain = Blockchain::default_with_store(storage.clone()); let genesis = read_execution_api_genesis_file(); storage .add_initial_state(genesis) @@ -610,6 +620,7 @@ mod tests { local_p2p_node, local_node_record: example_local_node_record(), storage, + blockchain, jwt_secret: Default::default(), active_filters: Default::default(), syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), @@ -666,6 +677,7 @@ mod tests { let storage = Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); storage.set_chain_config(&example_chain_config()).unwrap(); + let blockchain = Blockchain::default_with_store(storage.clone()); let chain_id = storage .get_chain_config() .expect("failed to get chain_id") @@ -674,6 +686,7 @@ mod tests { let local_p2p_node = example_p2p_node(); let context = RpcApiContext { storage, + blockchain, local_p2p_node, local_node_record: example_local_node_record(), jwt_secret: Default::default(), diff --git a/crates/networking/rpc/utils.rs b/crates/networking/rpc/utils.rs index e9fd50dda3..1b0d9c2ac8 100644 --- a/crates/networking/rpc/utils.rs +++ b/crates/networking/rpc/utils.rs @@ -254,6 +254,7 @@ pub fn parse_json_hex(hex: &serde_json::Value) -> Result { pub mod test_utils { use std::{net::SocketAddr, str::FromStr}; + use ethrex_blockchain::Blockchain; use ethrex_common::H512; use ethrex_p2p::{ sync::SyncManager, @@ -311,7 +312,7 @@ pub mod test_utils { storage .add_initial_state(serde_json::from_str(TEST_GENESIS).unwrap()) .expect("Failed to build test genesis"); - + let blockchain = Blockchain::default_with_store(storage.clone()); let jwt_secret = Default::default(); let local_p2p_node = example_p2p_node(); #[cfg(feature = "based")] @@ -322,6 +323,7 @@ pub mod test_utils { http_addr, authrpc_addr, storage, + blockchain, jwt_secret, local_p2p_node, example_local_node_record(), diff --git a/crates/storage/store.rs b/crates/storage/store.rs index cc219d2029..b9d9ab5664 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -10,17 +10,17 @@ use bytes::Bytes; use ethereum_types::{Address, H256, U256}; use ethrex_common::types::{ code_hash, AccountInfo, AccountState, BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, - BlockNumber, ChainConfig, Genesis, GenesisAccount, Index, MempoolTransaction, Receipt, - Transaction, TxType, EMPTY_TRIE_HASH, + BlockNumber, ChainConfig, Genesis, GenesisAccount, Index, Receipt, Transaction, + EMPTY_TRIE_HASH, }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, Trie}; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::Debug; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::Arc; use tracing::info; /// Number of state trie segments to fetch concurrently during state sync @@ -32,8 +32,6 @@ pub const MAX_SNAPSHOT_READS: usize = 100; #[derive(Debug, Clone)] pub struct Store { engine: Arc, - pub mempool: Arc>>, - pub blobs_bundle_pool: Arc>>, } #[allow(dead_code)] @@ -83,19 +81,13 @@ impl Store { #[cfg(feature = "libmdbx")] EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), - mempool: Arc::new(RwLock::new(HashMap::new())), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), - mempool: Arc::new(RwLock::new(HashMap::new())), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), - mempool: Arc::new(RwLock::new(HashMap::new())), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, }; info!("Started store engine"); @@ -268,121 +260,6 @@ impl Store { self.engine.get_transaction_location(transaction_hash) } - /// Add transaction to the pool - pub fn add_transaction_to_pool( - &self, - hash: H256, - transaction: MempoolTransaction, - ) -> Result<(), StoreError> { - let mut mempool = self - .mempool - .write() - .map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?; - mempool.insert(hash, transaction); - - Ok(()) - } - - /// Add a blobs bundle to the pool by its blob transaction hash - pub fn add_blobs_bundle_to_pool( - &self, - tx_hash: H256, - blobs_bundle: BlobsBundle, - ) -> Result<(), StoreError> { - self.blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .insert(tx_hash, blobs_bundle); - Ok(()) - } - - /// Get a blobs bundle to the pool given its blob transaction hash - pub fn get_blobs_bundle_from_pool( - &self, - tx_hash: H256, - ) -> Result, StoreError> { - Ok(self - .blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .get(&tx_hash) - .cloned()) - } - - /// Remove a transaction from the pool - pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> { - let mut mempool = self - .mempool - .write() - .map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?; - if let Some(tx) = mempool.get(hash) { - if matches!(tx.tx_type(), TxType::EIP4844) { - self.blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .remove(&tx.compute_hash()); - } - - mempool.remove(hash); - }; - - Ok(()) - } - - pub fn remove_transactions_from_pool(&self, filter: &[Transaction]) -> Result<(), StoreError> { - let mut mempool = self - .mempool - .write() - .map_err(|err| StoreError::MempoolWriteLock(err.to_string()))?; - for tx in filter { - mempool.remove(&tx.compute_hash()); - } - Ok(()) - } - - /// Applies the filter and returns a set of suitable transactions from the mempool. - /// These transactions will be grouped by sender and sorted by nonce - pub fn filter_pool_transactions( - &self, - filter: &dyn Fn(&Transaction) -> bool, - ) -> Result>, StoreError> { - let mut txs_by_sender: HashMap> = HashMap::new(); - let mempool = self - .mempool - .read() - .map_err(|error| StoreError::MempoolReadLock(error.to_string()))?; - - for (_, tx) in mempool.iter() { - if filter(tx) { - txs_by_sender - .entry(tx.sender()) - .or_default() - .push(tx.clone()) - } - } - - txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort()); - Ok(txs_by_sender) - } - - /// Gets hashes from possible_hashes that are not already known in the mempool. - pub fn filter_unknown_transactions( - &self, - possible_hashes: &[H256], - ) -> Result, StoreError> { - let mempool = self - .mempool - .read() - .map_err(|error| StoreError::MempoolReadLock(error.to_string()))?; - - let tx_set: HashSet<_> = mempool.iter().map(|(hash, _)| hash).collect(); - Ok(possible_hashes - .iter() - .filter(|hash| !tx_set.contains(hash)) - .copied() - .collect()) - } - pub fn add_account_code(&self, code_hash: H256, code: Bytes) -> Result<(), StoreError> { self.engine.add_account_code(code_hash, code) } @@ -598,20 +475,6 @@ impl Store { self.engine.get_transaction_by_hash(transaction_hash) } - pub fn get_transaction_by_hash_from_pool( - &self, - transaction_hash: H256, - ) -> Result, StoreError> { - let tx = self - .mempool - .read() - .map_err(|error| StoreError::MempoolReadLock(error.to_string()))? - .get(&transaction_hash) - .map(|e| e.clone().into()); - - Ok(tx) - } - pub fn get_transaction_by_location( &self, block_hash: BlockHash, @@ -1183,7 +1046,7 @@ mod tests { use bytes::Bytes; use ethereum_types::{H256, U256}; use ethrex_common::{ - types::{Transaction, TxType, BYTES_PER_BLOB, EMPTY_KECCACK_HASH}, + types::{Transaction, TxType, EMPTY_KECCACK_HASH}, Bloom, }; use ethrex_rlp::decode::RLPDecode; @@ -1234,8 +1097,6 @@ mod tests { run_test(&test_store_block_tags, engine_type); run_test(&test_chain_config_storage, engine_type); run_test(&test_genesis_block, engine_type); - run_test(&test_filter_mempool_transactions, engine_type); - run_test(&blobs_bundle_loadtest, engine_type); } fn test_genesis_block(store: Store) { @@ -1482,45 +1343,4 @@ mod tests { ..Default::default() } } - - use hex_literal::hex; - - fn test_filter_mempool_transactions(store: Store) { - let plain_tx_decoded = Transaction::decode_canonical(&hex!("f86d80843baa0c4082f618946177843db3138ae69679a54b95cf345ed759450d870aa87bee538000808360306ba0151ccc02146b9b11adf516e6787b59acae3e76544fdcd75e77e67c6b598ce65da064c5dd5aae2fbb535830ebbdad0234975cd7ece3562013b63ea18cc0df6c97d4")).unwrap(); - let plain_tx_sender = plain_tx_decoded.sender(); - let plain_tx = MempoolTransaction::new(plain_tx_decoded, plain_tx_sender); - let blob_tx_decoded = Transaction::decode_canonical(&hex!("03f88f0780843b9aca008506fc23ac00830186a09400000000000000000000000000000000000001008080c001e1a0010657f37554c781402a22917dee2f75def7ab966d7b770905398eba3c44401401a0840650aa8f74d2b07f40067dc33b715078d73422f01da17abdbd11e02bbdfda9a04b2260f6022bf53eadb337b3e59514936f7317d872defb891a708ee279bdca90")).unwrap(); - let blob_tx_sender = blob_tx_decoded.sender(); - let blob_tx = MempoolTransaction::new(blob_tx_decoded, blob_tx_sender); - let plain_tx_hash = plain_tx.compute_hash(); - let blob_tx_hash = blob_tx.compute_hash(); - let filter = - |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; - store - .add_transaction_to_pool(blob_tx_hash, blob_tx.clone()) - .unwrap(); - store - .add_transaction_to_pool(plain_tx_hash, plain_tx) - .unwrap(); - let txs = store.filter_pool_transactions(&filter).unwrap(); - assert_eq!(txs, HashMap::from([(blob_tx.sender(), vec![blob_tx])])); - } - - fn blobs_bundle_loadtest(store: Store) { - // Write a bundle of 6 blobs 10 times - // If this test fails please adjust the max_size in the DB config - for i in 0..300 { - let blobs = [[i as u8; BYTES_PER_BLOB]; 6]; - let commitments = [[i as u8; 48]; 6]; - let proofs = [[i as u8; 48]; 6]; - let bundle = BlobsBundle { - blobs: blobs.to_vec(), - commitments: commitments.to_vec(), - proofs: proofs.to_vec(), - }; - store - .add_blobs_bundle_to_pool(H256::random(), bundle) - .unwrap(); - } - } }