Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BlockchainEvent::Stored(Block) for forked and inferior chain blocks #2153

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions blockchain-interface/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum BlockchainEvent {
Extended(Blake2bHash),
HistoryAdopted(Blake2bHash),
Rebranched(Vec<(Blake2bHash, Block)>, Vec<(Blake2bHash, Block)>),
/// Given Block was stored in the chain store but was not adopted as new head block.
/// I.e. forked blocks and inferior chain blocks.
Stored(Block),
Finalized(Blake2bHash),
EpochFinalized(Blake2bHash),
}
Expand Down
6 changes: 6 additions & 0 deletions blockchain/src/blockchain/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ impl Blockchain {
}
txn.commit();

// Fork and inferior chain block fire a Stored Event.
// They can never fire a Finalized or EpochFinalized as then they would not be inferior/forked.
this.notifier
.send(BlockchainEvent::Stored(chain_info.head))
.ok();

Ok((result, Ok(ChunksPushResult::EmptyChunks)))
}

Expand Down
24 changes: 17 additions & 7 deletions blockchain/src/blockchain/zkp_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ impl Blockchain {
);

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b));
this.notifier
.send(BlockchainEvent::Extended(block_hash_blake2b.clone()))
.ok();

this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b))
.ok();

// We don't have any block logs, so we do not notify the block log stream.

Expand Down Expand Up @@ -336,12 +340,18 @@ impl Blockchain {
);

// We shouldn't log errors if there are no listeners.
this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

if is_election_block {
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else {
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

// We don't have any block logs, so we do not notify the block log stream.
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/consensus/remote_event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,17 @@ impl<N: Network> Future for RemoteEventDispatcher<N> {
new_blocks.push(block);
}
BlockchainEvent::Rebranched(_reverted_blocks, adopted_blocks) => {
// We dont't notify about reverted block, only adopted blocks
// We don't notify about reverted block, only adopted blocks
new_blocks.extend(adopted_blocks.into_iter().map(|(_, block)| block));
}
BlockchainEvent::HistoryAdopted(_) => {
// In the future we might be interested in other events
}
BlockchainEvent::Stored(_block) => {
// Stored events are not reported as they are not on the main chain.
// If they ever become main chain blocks, they will be reported then with the respective
// BlockchainEvent::Rebranched(..)
}
}
// This hash map is used to collect all the notifications for a given peer.
let mut peer_receipts: HashMap<N::PeerId, Vec<(Blake2bHash, u32)>> = HashMap::new();
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/sync/live/block_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ impl<N: Network> BlockQueue<N> {
block_infos.push((block.block_number(), block_hash));
}
}
BlockchainEvent::Stored(block) => {
block_infos.push((block.block_number(), block.hash()));
}
}
block_infos
}
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/sync/live/state_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,12 @@ impl<N: Network> Stream for StateQueue<N> {
self.diff_queue.set_diff_needed(true);
}
}
_ => {}
BlockchainEvent::HistoryAdopted(_) => {
// Nothing to do for adopted history
}
BlockchainEvent::Stored(_block) => {
// Block has not been applied so nothing to do here.
}
}
}

Expand Down
46 changes: 35 additions & 11 deletions light-blockchain/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use nimiq_blockchain_interface::{
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_keys::Address;
use nimiq_primitives::policy::Policy;
use parking_lot::RwLockUpgradableReadGuard;
use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard};

use crate::blockchain::LightBlockchain;

Expand Down Expand Up @@ -137,9 +137,19 @@ impl LightBlockchain {
PushResult::Forked
}
};

// Upgrade for the store
let mut this = RwLockUpgradableReadGuard::upgrade(this);

// Otherwise, we are creating/extending a fork. Store ChainInfo.
this.chain_store.put_chain_info(chain_info);
this.chain_store.put_chain_info(chain_info.clone());

// Downgrade asap
let this = RwLockWriteGuard::downgrade_to_upgradable(this);

this.notifier
.send(BlockchainEvent::Stored(chain_info.head))
.ok();

Ok(result)
}
Expand Down Expand Up @@ -197,14 +207,18 @@ impl LightBlockchain {
);

// We shouldn't log errors if there are no listeners.
this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

if is_election_block {
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else if is_macro_block {
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
} else {
_ = this.notifier.send(BlockchainEvent::Extended(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

Ok(PushResult::Extended)
Expand Down Expand Up @@ -347,9 +361,19 @@ impl LightBlockchain {
);

// We do not log errors if there are no listeners
_ = this
.notifier
.send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks));
this.notifier
.send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks))
.ok();

if this.head.is_election() {
this.notifier
.send(BlockchainEvent::EpochFinalized(this.head_hash()))
.ok();
} else if this.head.is_macro() {
this.notifier
.send(BlockchainEvent::Finalized(this.head_hash()))
.ok();
}

Ok(PushResult::Rebranched)
}
Expand Down
24 changes: 17 additions & 7 deletions light-blockchain/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ impl LightBlockchain {
this.current_validators = block.validators();

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b));
this.notifier
.send(BlockchainEvent::Extended(block_hash_blake2b.clone()))
.ok();

this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b))
.ok();

Ok(PushResult::Extended)
}
Expand Down Expand Up @@ -153,6 +157,10 @@ impl LightBlockchain {

this.macro_head = block.clone().unwrap_macro();

this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

// If it's an election block, you have more steps.
if block.is_election() {
this.election_head = block.unwrap_macro_ref().clone();
Expand All @@ -163,12 +171,14 @@ impl LightBlockchain {
this.chain_store.put_election(block.unwrap_macro().header);

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else {
// We shouldn't log errors if there are no listeners.
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

Ok(PushResult::Extended)
Expand Down
1 change: 1 addition & 0 deletions rpc-server/src/dispatchers/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ impl BlockchainInterface for BlockchainDispatcher {
BlockchainEvent::Rebranched(_, new_branch) => {
Some(new_branch.into_iter().last().unwrap().0.into())
}
BlockchainEvent::Stored(_block) => None,
};
future::ready(result)
})
Expand Down
6 changes: 6 additions & 0 deletions validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,17 +454,23 @@ where
BlockchainEvent::Extended(ref hash) => self.on_blockchain_extended(hash),
BlockchainEvent::HistoryAdopted(ref hash) => self.on_blockchain_history_adopted(hash),
BlockchainEvent::Finalized(ref hash) => {
// The on_blockchain_extended is necessary for the order of events to not matter.
self.on_blockchain_extended(hash);
self.update_consensus_state(Some(hash));
}
BlockchainEvent::EpochFinalized(ref hash) => {
self.init_epoch();
// The on_blockchain_extended is necessary for the order of events to not matter.
self.on_blockchain_extended(hash);
self.update_consensus_state(Some(hash));
}
BlockchainEvent::Rebranched(ref old_chain, ref new_chain) => {
self.on_blockchain_rebranched(old_chain, new_chain)
}
BlockchainEvent::Stored(ref _block) => {
// Nothing to do here for now. Forks are already reported on `fork_event_rx`
// and inferior chain blocks are irrelevant here.
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions web-client/src/client/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,9 @@ impl Client {
adopted_blocks,
)
}
Some(BlockchainEvent::Stored(block)) => {
(block.hash(), "stored", Array::new(), Array::new())
}
None => {
break;
}
Expand Down
Loading