Skip to content

Commit

Permalink
Merge pull request #22 from contrun/insert-block-with-stored-but-unve…
Browse files Browse the repository at this point in the history
…rified-parent

Add unit test: Insert block with stored but unverified parent
  • Loading branch information
eval-exec authored Jan 25, 2024
2 parents fe9975b + 9811428 commit 8a4b2cd
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 77 deletions.
151 changes: 75 additions & 76 deletions chain/src/consume_orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,77 @@ pub(crate) struct ConsumeDescendantProcessor {
pub verify_failed_blocks_tx: tokio::sync::mpsc::UnboundedSender<VerifyFailedBlockInfo>,
}

// Store the an unverified block to the database. We may usually do this
// for an orphan block with unknown parent. But this function is also useful in testing.
pub fn store_unverified_block(shared: &Shared, block: Arc<BlockView>) -> Result<(HeaderView, U256), Error> {
let (block_number, block_hash) = (block.number(), block.hash());

let parent_header = shared
.store()
.get_block_header(&block.data().header().raw().parent_hash())
.expect("parent already store");

if let Some(ext) = shared.store().get_block_ext(&block.hash()) {
debug!("block {}-{} has stored BlockExt", block_number, block_hash);
return Ok((parent_header, ext.total_difficulty));
}

trace!("begin accept block: {}-{}", block.number(), block.hash());

let parent_ext = shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.expect("parent already store");

if parent_ext.verified == Some(false) {
return Err(InvalidParentError {
parent_hash: parent_header.hash(),
}
.into());
}

let cannon_total_difficulty =
parent_ext.total_difficulty.to_owned() + block.header().difficulty();

let db_txn = Arc::new(shared.store().begin_transaction());

let txn_snapshot = db_txn.get_snapshot();
let _snapshot_block_ext = db_txn.get_update_for_block_ext(&block.hash(), &txn_snapshot);

db_txn.insert_block(block.as_ref())?;

let next_block_epoch = shared
.consensus()
.next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader())
.expect("epoch should be stored");
let new_epoch = next_block_epoch.is_head();
let epoch = next_block_epoch.epoch();

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let ext = BlockExt {
received_at: unix_time_as_millis(),
total_difficulty: cannon_total_difficulty.clone(),
total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64,
verified: None,
txs_fees: vec![],
cycles: None,
txs_sizes: None,
};

db_txn.insert_block_ext(&block.header().hash(), &ext)?;

db_txn.commit()?;

Ok((parent_header, cannon_total_difficulty))
}

impl ConsumeDescendantProcessor {
fn send_unverified_block(
&self,
Expand Down Expand Up @@ -80,84 +151,12 @@ impl ConsumeDescendantProcessor {
}
}

fn accept_descendant(&self, block: Arc<BlockView>) -> Result<(HeaderView, U256), Error> {
let (block_number, block_hash) = (block.number(), block.hash());

let parent_header = self
.shared
.store()
.get_block_header(&block.data().header().raw().parent_hash())
.expect("parent already store");

if let Some(ext) = self.shared.store().get_block_ext(&block.hash()) {
debug!("block {}-{} has stored BlockExt", block_number, block_hash);
return Ok((parent_header, ext.total_difficulty));
}

trace!("begin accept block: {}-{}", block.number(), block.hash());

let parent_ext = self
.shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.expect("parent already store");

if parent_ext.verified == Some(false) {
return Err(InvalidParentError {
parent_hash: parent_header.hash(),
}
.into());
}

let cannon_total_difficulty =
parent_ext.total_difficulty.to_owned() + block.header().difficulty();

let db_txn = Arc::new(self.shared.store().begin_transaction());

let txn_snapshot = db_txn.get_snapshot();
let _snapshot_block_ext = db_txn.get_update_for_block_ext(&block.hash(), &txn_snapshot);

db_txn.insert_block(block.as_ref())?;

let next_block_epoch = self
.shared
.consensus()
.next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader())
.expect("epoch should be stored");
let new_epoch = next_block_epoch.is_head();
let epoch = next_block_epoch.epoch();

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let ext = BlockExt {
received_at: unix_time_as_millis(),
total_difficulty: cannon_total_difficulty.clone(),
total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64,
verified: None,
txs_fees: vec![],
cycles: None,
txs_sizes: None,
};

db_txn.insert_block_ext(&block.header().hash(), &ext)?;

db_txn.commit()?;

Ok((parent_header, cannon_total_difficulty))
}

pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockWithCallback) {
match self.accept_descendant(lonely_block.block().to_owned()) {
match store_unverified_block(&self.shared, lonely_block.block().to_owned()) {
Ok((_parent_header, total_difficulty)) => {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
let lonely_block_hash = lonely_block.into();
self.shared.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);

let lonely_block_hash: LonelyBlockHashWithCallback = lonely_block.into();

self.send_unverified_block(lonely_block_hash, total_difficulty)
}
Expand Down
1 change: 1 addition & 0 deletions chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod utils;

pub use chain_controller::ChainController;
pub use chain_service::start_chain_services;
pub use consume_orphan::store_unverified_block;

type ProcessBlockRequest = Request<LonelyBlockWithCallback, ()>;
type TruncateRequest = Request<Byte32, Result<(), Error>>;
Expand Down
72 changes: 71 additions & 1 deletion sync/src/tests/sync_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::tests::util::{build_chain, inherit_block};
use crate::SyncShared;
use ckb_chain::start_chain_services;
use ckb_chain::{start_chain_services, store_unverified_block};
use ckb_logger::info;
use ckb_logger_service::LoggerInitGuard;
use ckb_shared::block_status::BlockStatus;
Expand All @@ -16,6 +16,22 @@ use ckb_types::prelude::*;
use std::fmt::format;
use std::sync::Arc;

fn wait_for_expected_block_status(
shared: &SyncShared,
hash: &Byte32,
expect_status: BlockStatus,
) -> bool {
let now = std::time::Instant::now();
while now.elapsed().as_secs() < 2 {
let current_status = shared.active_chain().get_block_status(hash);
if current_status == expect_status {
return true;
}
std::thread::sleep(std::time::Duration::from_micros(100));
}
return false;
}

#[test]
fn test_insert_new_block() {
let (shared, chain) = build_chain(2);
Expand Down Expand Up @@ -143,6 +159,60 @@ fn test_insert_parent_unknown_block() {
));
}

#[test]
fn test_insert_child_block_with_stored_but_unverified_parent() {
let (shared1, _) = build_chain(2);
let (shared, chain) = {
let (shared, mut pack) = SharedBuilder::with_temp_db()
.consensus(shared1.consensus().clone())
.build()
.unwrap();
let chain_controller = start_chain_services(pack.take_chain_services_builder());
(
SyncShared::new(shared, Default::default(), pack.take_relay_tx_receiver()),
chain_controller,
)
};

let block = shared1
.store()
.get_block(&shared1.active_chain().tip_header().hash())
.unwrap();
let parent = {
let parent = shared1
.store()
.get_block(&block.header().parent_hash())
.unwrap();
Arc::new(parent)
};
let parent_hash = parent.header().hash();
let child = Arc::new(block);
let child_hash = child.header().hash();

store_unverified_block(shared.shared(), Arc::clone(&parent)).expect("store parent block");

// Note that we will not find the block status obtained from
// shared.active_chain().get_block_status(&parent_hash) to be BLOCK_STORED,
// because `get_block_status` does not read the block status from the database,
// it use snapshot to get the block status, and the snapshot is not updated.
assert!(shared.store().get_block_ext(&parent_hash).is_some(), "parent block should be stored");

assert!(shared
.blocking_insert_new_block(&chain, Arc::clone(&child))
.expect("insert child block"));

assert!(wait_for_expected_block_status(
&shared,
&child_hash,
BlockStatus::BLOCK_VALID
));
assert!(wait_for_expected_block_status(
&shared,
&parent_hash,
BlockStatus::BLOCK_VALID
));
}

#[test]
fn test_switch_valid_fork() {
let _log_guard: LoggerInitGuard =
Expand Down

0 comments on commit 8a4b2cd

Please sign in to comment.