Skip to content

Commit

Permalink
Add BlockchainEvent::Stored(Block)
Browse files Browse the repository at this point in the history
Also streamline the event firings in general. Any `Finalized` and `EpochFinalized` event does *not* supersede an `Extended` event. After this commit these will be emitted consistently.

Note that the validator still must `on_blockchain_extended` for `EpochFinalized` as well as `Finalized` as the ordering of emitted events would otherwise matter since the validator set needs adjustment before the block producer is initialized.
  • Loading branch information
nibhar authored and hrxi committed Jan 26, 2024
1 parent 49cb6a9 commit e276ede
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 27 deletions.
3 changes: 3 additions & 0 deletions blockchain-interface/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum BlockchainEvent {
Extended(Blake2bHash),
HistoryAdopted(Blake2bHash),
Rebranched(Vec<(Blake2bHash, Block)>, Vec<(Blake2bHash, Block)>),
/// Given Block was stored in the chain store but was not adopted as new head block.
/// I.e. forked blocks and inferior chain blocks.
Stored(Block),
Finalized(Blake2bHash),
EpochFinalized(Blake2bHash),
}
Expand Down
6 changes: 6 additions & 0 deletions blockchain/src/blockchain/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ impl Blockchain {
}
txn.commit();

// Fork and inferior chain block fire a Stored Event.
// They can never fire a Finalized or EpochFinalized as then they would not be inferior/forked.
this.notifier
.send(BlockchainEvent::Stored(chain_info.head))
.ok();

Ok((result, Ok(ChunksPushResult::EmptyChunks)))
}

Expand Down
24 changes: 17 additions & 7 deletions blockchain/src/blockchain/zkp_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ impl Blockchain {
);

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b));
this.notifier
.send(BlockchainEvent::Extended(block_hash_blake2b.clone()))
.ok();

this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b))
.ok();

// We don't have any block logs, so we do not notify the block log stream.

Expand Down Expand Up @@ -336,12 +340,18 @@ impl Blockchain {
);

// We shouldn't log errors if there are no listeners.
this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

if is_election_block {
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else {
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

// We don't have any block logs, so we do not notify the block log stream.
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/consensus/remote_event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,17 @@ impl<N: Network> Future for RemoteEventDispatcher<N> {
new_blocks.push(block);
}
BlockchainEvent::Rebranched(_reverted_blocks, adopted_blocks) => {
// We dont't notify about reverted block, only adopted blocks
// We don't notify about reverted block, only adopted blocks
new_blocks.extend(adopted_blocks.into_iter().map(|(_, block)| block));
}
BlockchainEvent::HistoryAdopted(_) => {
// In the future we might be interested in other events
}
BlockchainEvent::Stored(_block) => {
// Stored events are not reported as they are not on the main chain.
// If they ever become main chain blocks, they will be reported then with the respective
// BlockchainEvent::Rebranched(..)
}
}
// This hash map is used to collect all the notifications for a given peer.
let mut peer_receipts: HashMap<N::PeerId, Vec<(Blake2bHash, u32)>> = HashMap::new();
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/sync/live/block_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ impl<N: Network> BlockQueue<N> {
block_infos.push((block.block_number(), block_hash));
}
}
BlockchainEvent::Stored(block) => {
block_infos.push((block.block_number(), block.hash()));
}
}
block_infos
}
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/sync/live/state_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,12 @@ impl<N: Network> Stream for StateQueue<N> {
self.diff_queue.set_diff_needed(true);
}
}
_ => {}
BlockchainEvent::HistoryAdopted(_) => {
// Nothing to do for adopted history
}
BlockchainEvent::Stored(_block) => {
// Block has not been applied so nothing to do here.
}
}
}

Expand Down
46 changes: 35 additions & 11 deletions light-blockchain/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use nimiq_blockchain_interface::{
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_keys::Address;
use nimiq_primitives::policy::Policy;
use parking_lot::RwLockUpgradableReadGuard;
use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard};

use crate::blockchain::LightBlockchain;

Expand Down Expand Up @@ -137,9 +137,19 @@ impl LightBlockchain {
PushResult::Forked
}
};

// Upgrade for the store
let mut this = RwLockUpgradableReadGuard::upgrade(this);

// Otherwise, we are creating/extending a fork. Store ChainInfo.
this.chain_store.put_chain_info(chain_info);
this.chain_store.put_chain_info(chain_info.clone());

// Downgrade asap
let this = RwLockWriteGuard::downgrade_to_upgradable(this);

this.notifier
.send(BlockchainEvent::Stored(chain_info.head))
.ok();

Ok(result)
}
Expand Down Expand Up @@ -197,14 +207,18 @@ impl LightBlockchain {
);

// We shouldn't log errors if there are no listeners.
this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

if is_election_block {
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else if is_macro_block {
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
} else {
_ = this.notifier.send(BlockchainEvent::Extended(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

Ok(PushResult::Extended)
Expand Down Expand Up @@ -347,9 +361,19 @@ impl LightBlockchain {
);

// We do not log errors if there are no listeners
_ = this
.notifier
.send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks));
this.notifier
.send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks))
.ok();

if this.head.is_election() {
this.notifier
.send(BlockchainEvent::EpochFinalized(this.head_hash()))
.ok();
} else if this.head.is_macro() {
this.notifier
.send(BlockchainEvent::Finalized(this.head_hash()))
.ok();
}

Ok(PushResult::Rebranched)
}
Expand Down
24 changes: 17 additions & 7 deletions light-blockchain/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ impl LightBlockchain {
this.current_validators = block.validators();

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b));
this.notifier
.send(BlockchainEvent::Extended(block_hash_blake2b.clone()))
.ok();

this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash_blake2b))
.ok();

Ok(PushResult::Extended)
}
Expand Down Expand Up @@ -153,6 +157,10 @@ impl LightBlockchain {

this.macro_head = block.clone().unwrap_macro();

this.notifier
.send(BlockchainEvent::Extended(block_hash.clone()))
.ok();

// If it's an election block, you have more steps.
if block.is_election() {
this.election_head = block.unwrap_macro_ref().clone();
Expand All @@ -163,12 +171,14 @@ impl LightBlockchain {
this.chain_store.put_election(block.unwrap_macro().header);

// We shouldn't log errors if there are no listeners.
_ = this
.notifier
.send(BlockchainEvent::EpochFinalized(block_hash));
this.notifier
.send(BlockchainEvent::EpochFinalized(block_hash))
.ok();
} else {
// We shouldn't log errors if there are no listeners.
_ = this.notifier.send(BlockchainEvent::Finalized(block_hash));
this.notifier
.send(BlockchainEvent::Finalized(block_hash))
.ok();
}

Ok(PushResult::Extended)
Expand Down
1 change: 1 addition & 0 deletions rpc-server/src/dispatchers/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ impl BlockchainInterface for BlockchainDispatcher {
BlockchainEvent::Rebranched(_, new_branch) => {
Some(new_branch.into_iter().last().unwrap().0.into())
}
BlockchainEvent::Stored(_block) => None,
};
future::ready(result)
})
Expand Down
6 changes: 6 additions & 0 deletions validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,23 @@ where
BlockchainEvent::Extended(ref hash) => self.on_blockchain_extended(hash),
BlockchainEvent::HistoryAdopted(ref hash) => self.on_blockchain_history_adopted(hash),
BlockchainEvent::Finalized(ref hash) => {
// The on_blockchain_extended is necessary for the order of events to not matter.
self.on_blockchain_extended(hash);
self.update_consensus_state(Some(hash));
}
BlockchainEvent::EpochFinalized(ref hash) => {
self.init_epoch();
// The on_blockchain_extended is necessary for the order of events to not matter.
self.on_blockchain_extended(hash);
self.update_consensus_state(Some(hash));
}
BlockchainEvent::Rebranched(ref old_chain, ref new_chain) => {
self.on_blockchain_rebranched(old_chain, new_chain)
}
BlockchainEvent::Stored(ref _block) => {
// Nothing to do here for now. Forks are already reported on `fork_event_rx`
// and inferior chain blocks are irrelevant here.
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions web-client/src/client/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,9 @@ impl Client {
adopted_blocks,
)
}
Some(BlockchainEvent::Stored(block)) => {
(block.hash(), "stored", Array::new(), Array::new())
}
None => {
break;
}
Expand Down

0 comments on commit e276ede

Please sign in to comment.