Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(l1): receipts invalid size libmdbx #2065

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions crates/storage/store/rlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ impl<T: RLPDecode> Rlp<T> {
}
}

/// Raw-byte accessors available for any `T` (no `RLPEncode`/`RLPDecode` bound),
/// used when `Rlp` is treated as an opaque, already-encoded buffer.
impl<T> Rlp<T> {
/// Wraps an already RLP-encoded byte buffer without validating it.
/// The caller is responsible for ensuring `bytes` is valid RLP for `T`.
pub fn from_bytes(bytes: Vec<u8>) -> Self {
Self(bytes, Default::default())
}

/// Returns a reference to the underlying encoded bytes.
pub fn bytes(&self) -> &Vec<u8> {
&self.0
}
}

#[cfg(feature = "libmdbx")]
impl<T: Send + Sync> Decodable for Rlp<T> {
fn decode(b: &[u8]) -> anyhow::Result<Self> {
Expand Down
253 changes: 227 additions & 26 deletions crates/storage/store/store_db/libmdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::api::StoreEngine;
use crate::error::StoreError;
use crate::rlp::{
AccountCodeHashRLP, AccountCodeRLP, AccountHashRLP, AccountStateRLP, BlockBodyRLP,
BlockHashRLP, BlockHeaderRLP, BlockRLP, BlockTotalDifficultyRLP, ReceiptRLP, Rlp,
TransactionHashRLP, TupleRLP,
BlockHashRLP, BlockHeaderRLP, BlockRLP, BlockTotalDifficultyRLP, Rlp, TransactionHashRLP,
TupleRLP,
};
use crate::store::{MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS};
use crate::trie_db::libmdbx::LibmdbxTrieDB;
Expand All @@ -20,13 +20,13 @@ use ethrex_rlp::decode::RLPDecode;
use ethrex_rlp::encode::RLPEncode;
use ethrex_rlp::error::RLPDecodeError;
use ethrex_trie::{Nibbles, Trie};
use libmdbx::orm::{Decodable, Encodable, Table};
use libmdbx::orm::{Decodable, DupSort, Encodable, Table};
use libmdbx::{
dupsort,
orm::{table, Database},
table_info,
};
use libmdbx::{DatabaseOptions, Mode, ReadWriteOptions};
use libmdbx::{DatabaseOptions, Mode, PageSize, ReadWriteOptions, TransactionKind};
use serde_json;
use std::fmt::{Debug, Formatter};
use std::path::Path;
Expand Down Expand Up @@ -63,11 +63,12 @@ impl Store {
.begin_readwrite()
.map_err(StoreError::LibmdbxError)?;

let mut cursor = txn.cursor::<T>().map_err(StoreError::LibmdbxError)?;
for (key, value) in key_values {
txn.upsert::<T>(key, value)
cursor
.upsert(key, value)
.map_err(StoreError::LibmdbxError)?;
}

txn.commit().map_err(StoreError::LibmdbxError)
}

Expand Down Expand Up @@ -189,7 +190,9 @@ impl StoreEngine for Store {
index: Index,
receipt: Receipt,
) -> Result<(), StoreError> {
self.write::<Receipts>((block_hash, index).into(), receipt.into())
let key: Rlp<(BlockHash, Index)> = (block_hash, index).into();
let entries = IndexedChunk::from::<Receipts>(key, &receipt.encode_to_vec());
self.write_batch::<Receipts>(entries.into_iter())
}

fn get_receipt(
Expand All @@ -198,7 +201,10 @@ impl StoreEngine for Store {
index: Index,
) -> Result<Option<Receipt>, StoreError> {
if let Some(hash) = self.get_block_hash_by_block_number(block_number)? {
Ok(self.read::<Receipts>((hash, index).into())?.map(|b| b.to()))
let txn = self.db.begin_read().map_err(StoreError::LibmdbxError)?;
let mut cursor = txn.cursor::<Receipts>().map_err(StoreError::LibmdbxError)?;
let key = (hash, index).into();
IndexedChunk::read_from_db(&mut cursor, key)
} else {
Ok(None)
}
Expand Down Expand Up @@ -485,20 +491,22 @@ impl StoreEngine for Store {
block_hash: BlockHash,
receipts: Vec<Receipt>,
) -> Result<(), StoreError> {
let key_values = receipts.into_iter().enumerate().map(|(index, receipt)| {
(
<(H256, u64) as Into<TupleRLP<BlockHash, Index>>>::into((block_hash, index as u64)),
<Receipt as Into<ReceiptRLP>>::into(receipt),
)
});
let mut key_values = vec![];

for (index, receipt) in receipts.clone().into_iter().enumerate() {
let key = (block_hash, index as u64).into();
let receipt_rlp = receipt.encode_to_vec();
let mut entries = IndexedChunk::from::<Receipts>(key, &receipt_rlp);
key_values.append(&mut entries);
}

self.write_batch::<Receipts>(key_values)
self.write_batch::<Receipts>(key_values.into_iter())
}

fn get_receipts_for_block(&self, block_hash: &BlockHash) -> Result<Vec<Receipt>, StoreError> {
let mut receipts = vec![];
let mut receipt_index = 0;
let mut key: TupleRLP<BlockHash, Index> = (*block_hash, 0).into();
let mut key = (*block_hash, 0).into();
let txn = self.db.begin_read().map_err(|_| StoreError::ReadError)?;
let mut cursor = txn
.cursor::<Receipts>()
Expand All @@ -509,15 +517,13 @@ impl StoreEngine for Store {
// So we search for values in the db that match with this kind
// of key, until we reach an Index that returns None
// and we stop the search.
while let Some((_, encoded_receipt)) =
cursor.seek_exact(key).map_err(|_| StoreError::ReadError)?
{
receipts.push(encoded_receipt);
while let Some(receipt) = IndexedChunk::read_from_db(&mut cursor, key)? {
receipts.push(receipt);
receipt_index += 1;
key = (*block_hash, receipt_index).into();
}

Ok(receipts.into_iter().map(|receipt| receipt.to()).collect())
Ok(receipts)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a risk of infinite looping here. If your block uses 256 chunks then after chunk with index 255 the addition overflows, and you'll ask for chunk with index 0 of the same hash, and thus start the whole cycle again forever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should store the size somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I considered that as well. The size is defined as a `u8`, meaning we can have up to 256 chunks. However, since each value holds up to 2022 bytes, we could store around 517,632 bytes (~0.5 MB) before reaching this limit. To be honest, I am not sure about the typical log size in Ethereum or whether a single receipt can take that amount of space.

For now, as we discussed offline, we'll keep it as is and address any limitations if they arise. I'll also update the code to explicitly reflect this limit and leave a clear warning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solved in 18b2941.


fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError> {
Expand Down Expand Up @@ -733,6 +739,105 @@ impl Debug for Store {

// Define tables

/// For `dupsort` tables, multiple values can be stored under the same key.
/// To maintain an explicit order, each value is assigned an `index`.
/// This is useful when storing large byte sequences that exceed the maximum size limit,
/// requiring them to be split into smaller chunks for storage.
///
/// NOTE(review): `index` is a `u8`, so a single value can span at most 256 chunks
/// (roughly `256 * (DB_MAX_VALUE_SIZE - 1)` bytes, ~0.5 MB). Values larger than
/// that cannot be represented — see `IndexedChunk::from`.
pub struct IndexedChunk<T: RLPEncode + RLPDecode> {
// 0-based position of this chunk within the original value.
index: u8,
// This chunk's slice of the original RLP-encoded bytes.
value: Rlp<T>,
}

/// Read-only view over a stored chunk: its ordering index and raw payload bytes.
/// Lets `IndexedChunk::read_from_db` reassemble chunked values generically over
/// any table whose value type exposes these accessors.
pub trait ChunkTrait<T: RLPEncode + RLPDecode> {
/// 0-based position of the chunk within the original value.
#[allow(unused)]
fn index(&self) -> u8;
/// Raw payload bytes of this chunk (without the leading index byte).
fn value_bytes(&self) -> &Vec<u8>;
}

/// Straightforward field accessors; see `ChunkTrait` for the contract.
impl<T: RLPEncode + RLPDecode> ChunkTrait<T> for IndexedChunk<T> {
fn index(&self) -> u8 {
self.index
}

fn value_bytes(&self) -> &Vec<u8> {
self.value.bytes()
}
}

impl<T: Send + Sync + RLPEncode + RLPDecode> Decodable for IndexedChunk<T> {
fn decode(b: &[u8]) -> anyhow::Result<Self> {
let index = b[0];
let value = Rlp::from_bytes(b[1..].to_vec());
Ok(Self { index, value })
}
}

impl<T: Send + Sync + RLPEncode + RLPDecode> Encodable for IndexedChunk<T> {
    type Encoded = Vec<u8>;

    /// Encodes the chunk as `[index, payload...]`.
    ///
    /// By placing the index byte at the beginning, the dupsort B-tree ordering
    /// of duplicate values under the same key runs from lowest to highest chunk
    /// index, so chunks are read back in the order they were written.
    fn encode(self) -> Self::Encoded {
        let payload = self.value.bytes();
        // Preallocate exactly (index byte + payload) to avoid a regrow on extend.
        let mut buf = Vec::with_capacity(1 + payload.len());
        buf.push(self.index);
        buf.extend_from_slice(payload);
        buf
    }
}

impl<T: RLPEncode + RLPDecode> IndexedChunk<T> {
    /// Splits a value into indexed chunks if it exceeds the maximum storage size.
    /// Each chunk is assigned an index to ensure correct ordering when retrieved.
    ///
    /// # Panics
    ///
    /// Panics if the value requires more than 256 chunks: the chunk index is a
    /// single `u8`, and silently wrapping it (`index as u8`) would corrupt the
    /// dupsort ordering and make readers loop forever over the same key. With
    /// `DB_MAX_VALUE_SIZE` of 2022 bytes this caps values at ~517 KB.
    pub fn from<Tab: Table>(key: Tab::Key, bytes: &[u8]) -> Vec<(Tab::Key, Self)>
    where
        Tab::Key: Clone,
    {
        // -1 to account for the index byte prepended to every chunk on encoding.
        let chunk_size = DB_MAX_VALUE_SIZE - 1;
        let chunk_count = (bytes.len() + chunk_size - 1) / chunk_size;
        assert!(
            chunk_count <= u8::MAX as usize + 1,
            "value too large to store as IndexedChunks: needs {} chunks but the u8 index allows at most 256",
            chunk_count
        );

        bytes
            .chunks(chunk_size)
            .enumerate()
            .map(|(index, chunk)| {
                (
                    key.clone(),
                    IndexedChunk {
                        index: index as u8,
                        value: Rlp::from_bytes(chunk.to_vec()),
                    },
                )
            })
            .collect()
    }

    /// Reads multiple stored chunks and reconstructs the original full value.
    /// The chunks are appended in order based on their assigned index (dupsort
    /// ordering is ascending because the index is the first encoded byte).
    ///
    /// Returns `Ok(None)` when no entry exists for `key`.
    pub fn read_from_db<Tab: Table + DupSort, K: TransactionKind>(
        cursor: &mut libmdbx::orm::Cursor<'_, K, Tab>,
        key: Tab::Key,
    ) -> Result<Option<T>, StoreError>
    where
        Tab::Key: Decodable,
        Tab::Value: ChunkTrait<T>,
    {
        let mut value = vec![];

        // Position the cursor on the first chunk stored under this exact key.
        if let Some((_, chunk)) = cursor.seek_exact(key).map_err(StoreError::LibmdbxError)? {
            value.extend_from_slice(chunk.value_bytes());
        } else {
            return Ok(None);
        }

        // Fetch the remaining duplicate entries (chunks) under the same key.
        while let Some((_, chunk)) = cursor.next_value().map_err(StoreError::LibmdbxError)? {
            value.extend_from_slice(chunk.value_bytes());
        }

        let decoded = T::decode(&value).map_err(StoreError::RLPDecode)?;
        Ok(Some(decoded))
    }
}

table!(
/// The canonical block hash for each block number. It represents the canonical chain.
( CanonicalBlockHashes ) BlockNumber => BlockHashRLP
Expand Down Expand Up @@ -764,7 +869,7 @@ table!(

dupsort!(
/// Receipts table.
( Receipts ) TupleRLP<BlockHash, Index>[Index] => ReceiptRLP
( Receipts ) TupleRLP<BlockHash, Index>[Index] => IndexedChunk<Receipt>
);

dupsort!(
Expand Down Expand Up @@ -890,6 +995,15 @@ impl Encodable for SnapStateIndex {
(self as u32).encode()
}
}

/// Default page size recommended by libmdbx.
///
/// - See here: https://github.com/erthink/libmdbx/tree/master?tab=readme-ov-file#limitations
/// - and here: https://libmdbx.dqdkfa.ru/structmdbx_1_1env_1_1geometry.html#a45048bf2de9120d01dae2151c060d459
const DB_PAGE_SIZE: usize = 4096;
/// For a default page size of 4096, the max value size is roughly 1/2 page size.
/// NOTE(review): 2022 is not exactly 4096/2 — presumably it accounts for libmdbx's
/// per-page/per-entry overhead for dupsort values; confirm against
/// `mdbx_env_get_maxvalsize_ex` for the configured page size.
const DB_MAX_VALUE_SIZE: usize = 2022;

/// Initializes a new database with the provided path. If the path is `None`, the database
/// will be temporary.
pub fn init_db(path: Option<impl AsRef<Path>>) -> Database {
Expand All @@ -916,6 +1030,7 @@ pub fn init_db(path: Option<impl AsRef<Path>>) -> Database {
.collect();
let path = path.map(|p| p.as_ref().to_path_buf());
let options = DatabaseOptions {
page_size: Some(PageSize::Set(DB_PAGE_SIZE)),
mode: Mode::ReadWrite(ReadWriteOptions {
// Set max DB size to 1TB
max_size: Some(1024_isize.pow(4)),
Expand All @@ -928,10 +1043,12 @@ pub fn init_db(path: Option<impl AsRef<Path>>) -> Database {

#[cfg(test)]
mod tests {
use libmdbx::{
dupsort,
orm::{table, Database, Decodable, Encodable},
table_info,
use super::*;
use crate::rlp::TupleRLP;
use bytes::Bytes;
use ethrex_common::{
types::{BlockHash, Index, Log, TxType},
Address, Bloom, H256,
};

#[test]
Expand Down Expand Up @@ -1119,4 +1236,88 @@ mod tests {
assert_eq!(acc, 58);
}
}

// Test IndexedChunks implementation with receipts as the type:
// round-trips receipts large enough to be split across multiple chunks
// and asserts the reconstructed values match the originals.
#[test]
fn mdbx_indexed_chunks_test() {
// Local dupsort table mirroring the real `Receipts` table definition.
dupsort!(
/// Receipts table.
( Receipts ) TupleRLP<BlockHash, Index>[Index] => IndexedChunk<Receipt>
);

let tables = [table_info!(Receipts)].into_iter().collect();
let options = DatabaseOptions {
page_size: Some(PageSize::Set(DB_PAGE_SIZE)),
mode: Mode::ReadWrite(ReadWriteOptions {
max_size: Some(1024_isize.pow(4)),
..Default::default()
}),
..Default::default()
};
// `None` path → temporary database.
let db = Database::create_with_options(None, options, &tables).unwrap();

// Receipts of increasing size; the larger ones should exceed
// DB_MAX_VALUE_SIZE and therefore be stored as multiple chunks.
let mut receipts = vec![];
for i in 0..10 {
receipts.push(generate_big_receipt(100 * (i + 1), 10, 10 * (i + 1)));
}

// encode receipts
let block_hash = H256::random();
let mut key_values = vec![];
for (i, receipt) in receipts.iter().enumerate() {
let key = (block_hash, i as u64).into();
let receipt_rlp = receipt.encode_to_vec();
let mut entries = IndexedChunk::from::<Receipts>(key, &receipt_rlp);
key_values.append(&mut entries);
}

// store values
let txn = db.begin_readwrite().unwrap();
let mut cursor = txn.cursor::<Receipts>().unwrap();
for (key, value) in key_values {
cursor.upsert(key, value).unwrap()
}
txn.commit().unwrap();

// now retrieve the values and assert they are the same
let mut stored_receipts = vec![];
let mut receipt_index = 0;
let mut key: TupleRLP<BlockHash, Index> = (block_hash, 0).into();
let txn = db.begin_read().unwrap();
let mut cursor = txn.cursor::<Receipts>().unwrap();
// Probe consecutive (block_hash, index) keys until one is missing,
// same access pattern as `get_receipts_for_block`.
while let Some(receipt) = IndexedChunk::read_from_db(&mut cursor, key).unwrap() {
stored_receipts.push(receipt);
receipt_index += 1;
key = (block_hash, receipt_index).into();
}

assert_eq!(receipts, stored_receipts);
}

/// Builds a `Receipt` whose RLP encoding is large, for exercising chunked storage.
///
/// * `data_size_in_bytes` — byte length of each log's `data` field.
/// * `logs_size` — number of identical logs in the receipt.
/// * `topics_size` — number of topics per log.
fn generate_big_receipt(
    data_size_in_bytes: usize,
    logs_size: usize,
    topics_size: usize,
) -> Receipt {
    let large_data: Bytes = Bytes::from(vec![1u8; data_size_in_bytes]);
    // One random topic repeated `topics_size` times — matches the original
    // `iter::repeat(H256::random())` behavior (the RNG is sampled once, then cloned).
    let large_topics: Vec<H256> = vec![H256::random(); topics_size];

    let log = Log {
        address: Address::random(),
        topics: large_topics,
        data: large_data,
    };
    let logs = vec![log; logs_size];

    Receipt {
        tx_type: TxType::EIP7702,
        succeeded: true,
        cumulative_gas_used: u64::MAX,
        bloom: Bloom::default(),
        logs,
    }
}
}