diff --git a/blockchain-interface/src/error.rs b/blockchain-interface/src/error.rs index 16c7fcd617..3c3f1e0cc2 100644 --- a/blockchain-interface/src/error.rs +++ b/blockchain-interface/src/error.rs @@ -15,6 +15,9 @@ pub enum BlockchainEvent { Extended(Blake2bHash), HistoryAdopted(Blake2bHash), Rebranched(Vec<(Blake2bHash, Block)>, Vec<(Blake2bHash, Block)>), + /// Given Block was stored in the chain store but was not adopted as new head block. + /// I.e. forked blocks and inferior chain blocks. + Stored(Block), Finalized(Blake2bHash), EpochFinalized(Blake2bHash), } diff --git a/blockchain/src/blockchain/push.rs b/blockchain/src/blockchain/push.rs index a47bd2c467..219eed323a 100644 --- a/blockchain/src/blockchain/push.rs +++ b/blockchain/src/blockchain/push.rs @@ -161,6 +161,12 @@ impl Blockchain { } txn.commit(); + // Fork and inferior chain block fire a Stored Event. + // They can never fire a Finalized or EpochFinalized as then they would not be inferior/forked. + this.notifier + .send(BlockchainEvent::Stored(chain_info.head)) + .ok(); + Ok((result, Ok(ChunksPushResult::EmptyChunks))) } diff --git a/blockchain/src/blockchain/zkp_sync.rs b/blockchain/src/blockchain/zkp_sync.rs index 9fe4d34a05..a4c31d7899 100644 --- a/blockchain/src/blockchain/zkp_sync.rs +++ b/blockchain/src/blockchain/zkp_sync.rs @@ -151,9 +151,13 @@ impl Blockchain { ); // We shouldn't log errors if there are no listeners. - _ = this - .notifier - .send(BlockchainEvent::EpochFinalized(block_hash_blake2b)); + this.notifier + .send(BlockchainEvent::Extended(block_hash_blake2b.clone())) + .ok(); + + this.notifier + .send(BlockchainEvent::EpochFinalized(block_hash_blake2b)) + .ok(); // We don't have any block logs, so we do not notify the block log stream. @@ -336,12 +340,18 @@ impl Blockchain { ); // We shouldn't log errors if there are no listeners. + this.notifier + .send(BlockchainEvent::Extended(block_hash.clone())) + .ok(); + if is_election_block { - _ = this - .notifier - .send(BlockchainEvent::EpochFinalized(block_hash)); + this.notifier + .send(BlockchainEvent::EpochFinalized(block_hash)) + .ok(); } else { - _ = this.notifier.send(BlockchainEvent::Finalized(block_hash)); + this.notifier + .send(BlockchainEvent::Finalized(block_hash)) + .ok(); } // We don't have any block logs, so we do not notify the block log stream. diff --git a/consensus/src/consensus/remote_event_dispatcher.rs b/consensus/src/consensus/remote_event_dispatcher.rs index 84320f52dc..5ef285f711 100644 --- a/consensus/src/consensus/remote_event_dispatcher.rs +++ b/consensus/src/consensus/remote_event_dispatcher.rs @@ -253,12 +253,17 @@ impl Future for RemoteEventDispatcher { new_blocks.push(block); } BlockchainEvent::Rebranched(_reverted_blocks, adopted_blocks) => { - // We dont't notify about reverted block, only adopted blocks + // We don't notify about reverted block, only adopted blocks new_blocks.extend(adopted_blocks.into_iter().map(|(_, block)| block)); } BlockchainEvent::HistoryAdopted(_) => { // In the future we might be interested in other events } + BlockchainEvent::Stored(_block) => { + // Stored events are not reported as they are not on the main chain. + // If they ever become main chain blocks, they will be reported then with the respective + // BlockchainEvent::Rebranched(..) + } } // This hash map is used to collect all the notifications for a given peer. let mut peer_receipts: HashMap> = HashMap::new(); diff --git a/consensus/src/sync/live/block_queue/mod.rs b/consensus/src/sync/live/block_queue/mod.rs index b9991f18eb..7c66ea288e 100644 --- a/consensus/src/sync/live/block_queue/mod.rs +++ b/consensus/src/sync/live/block_queue/mod.rs @@ -428,6 +428,9 @@ impl BlockQueue { block_infos.push((block.block_number(), block_hash)); } } + BlockchainEvent::Stored(block) => { + block_infos.push((block.block_number(), block.hash())); + } } block_infos } diff --git a/consensus/src/sync/live/state_queue/mod.rs b/consensus/src/sync/live/state_queue/mod.rs index d061e39517..6e8ad76f13 100644 --- a/consensus/src/sync/live/state_queue/mod.rs +++ b/consensus/src/sync/live/state_queue/mod.rs @@ -510,7 +510,12 @@ impl Stream for StateQueue { self.diff_queue.set_diff_needed(true); } } - _ => {} + BlockchainEvent::HistoryAdopted(_) => { + // Nothing to do for adopted history + } + BlockchainEvent::Stored(_block) => { + // Block has not been applied so nothing to do here. + } } } diff --git a/light-blockchain/src/push.rs b/light-blockchain/src/push.rs index 0065dc3461..c575851f7f 100644 --- a/light-blockchain/src/push.rs +++ b/light-blockchain/src/push.rs @@ -7,7 +7,7 @@ use nimiq_blockchain_interface::{ use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; use nimiq_primitives::policy::Policy; -use parking_lot::RwLockUpgradableReadGuard; +use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard}; use crate::blockchain::LightBlockchain; @@ -137,9 +137,19 @@ impl LightBlockchain { PushResult::Forked } }; + + // Upgrade for the store let mut this = RwLockUpgradableReadGuard::upgrade(this); + // Otherwise, we are creating/extending a fork. Store ChainInfo. - this.chain_store.put_chain_info(chain_info); + this.chain_store.put_chain_info(chain_info.clone()); + + // Downgrade asap + let this = RwLockWriteGuard::downgrade_to_upgradable(this); + + this.notifier + .send(BlockchainEvent::Stored(chain_info.head)) + .ok(); Ok(result) } @@ -197,14 +207,18 @@ impl LightBlockchain { ); // We shouldn't log errors if there are no listeners. + this.notifier + .send(BlockchainEvent::Extended(block_hash.clone())) + .ok(); + if is_election_block { - _ = this - .notifier - .send(BlockchainEvent::EpochFinalized(block_hash)); + this.notifier + .send(BlockchainEvent::EpochFinalized(block_hash)) + .ok(); } else if is_macro_block { - _ = this.notifier.send(BlockchainEvent::Finalized(block_hash)); - } else { - _ = this.notifier.send(BlockchainEvent::Extended(block_hash)); + this.notifier + .send(BlockchainEvent::Finalized(block_hash)) + .ok(); } Ok(PushResult::Extended) @@ -347,9 +361,19 @@ impl LightBlockchain { ); // We do not log errors if there are no listeners - _ = this - .notifier - .send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks)); + this.notifier + .send(BlockchainEvent::Rebranched(reverted_blocks, adopted_blocks)) + .ok(); + + if this.head.is_election() { + this.notifier + .send(BlockchainEvent::EpochFinalized(this.head_hash())) + .ok(); + } else if this.head.is_macro() { + this.notifier + .send(BlockchainEvent::Finalized(this.head_hash())) + .ok(); + } Ok(PushResult::Rebranched) } diff --git a/light-blockchain/src/sync.rs b/light-blockchain/src/sync.rs index 35b27c0567..772ac67487 100644 --- a/light-blockchain/src/sync.rs +++ b/light-blockchain/src/sync.rs @@ -92,9 +92,13 @@ impl LightBlockchain { this.current_validators = block.validators(); // We shouldn't log errors if there are no listeners. - _ = this - .notifier - .send(BlockchainEvent::EpochFinalized(block_hash_blake2b)); + this.notifier + .send(BlockchainEvent::Extended(block_hash_blake2b.clone())) + .ok(); + + this.notifier + .send(BlockchainEvent::EpochFinalized(block_hash_blake2b)) + .ok(); Ok(PushResult::Extended) } @@ -153,6 +157,10 @@ impl LightBlockchain { this.macro_head = block.clone().unwrap_macro(); + this.notifier + .send(BlockchainEvent::Extended(block_hash.clone())) + .ok(); + // If it's an election block, you have more steps. if block.is_election() { this.election_head = block.unwrap_macro_ref().clone(); @@ -163,12 +171,14 @@ impl LightBlockchain { this.chain_store.put_election(block.unwrap_macro().header); // We shouldn't log errors if there are no listeners. - _ = this - .notifier - .send(BlockchainEvent::EpochFinalized(block_hash)); + this.notifier + .send(BlockchainEvent::EpochFinalized(block_hash)) + .ok(); } else { // We shouldn't log errors if there are no listeners. - _ = this.notifier.send(BlockchainEvent::Finalized(block_hash)); + this.notifier + .send(BlockchainEvent::Finalized(block_hash)) + .ok(); } Ok(PushResult::Extended) diff --git a/rpc-server/src/dispatchers/blockchain.rs b/rpc-server/src/dispatchers/blockchain.rs index c7c3c50cc4..b2aa0ba425 100644 --- a/rpc-server/src/dispatchers/blockchain.rs +++ b/rpc-server/src/dispatchers/blockchain.rs @@ -678,6 +678,7 @@ impl BlockchainInterface for BlockchainDispatcher { BlockchainEvent::Rebranched(_, new_branch) => { Some(new_branch.into_iter().last().unwrap().0.into()) } + BlockchainEvent::Stored(_block) => None, }; future::ready(result) }) diff --git a/validator/src/validator.rs b/validator/src/validator.rs index cc0241186f..dff680919d 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -455,17 +455,23 @@ where BlockchainEvent::Extended(ref hash) => self.on_blockchain_extended(hash), BlockchainEvent::HistoryAdopted(ref hash) => self.on_blockchain_history_adopted(hash), BlockchainEvent::Finalized(ref hash) => { + // The on_blockchain_extended is necessary for the order of events to not matter. self.on_blockchain_extended(hash); self.update_consensus_state(Some(hash)); } BlockchainEvent::EpochFinalized(ref hash) => { self.init_epoch(); + // The on_blockchain_extended is necessary for the order of events to not matter. self.on_blockchain_extended(hash); self.update_consensus_state(Some(hash)); } BlockchainEvent::Rebranched(ref old_chain, ref new_chain) => { self.on_blockchain_rebranched(old_chain, new_chain) } + BlockchainEvent::Stored(ref _block) => { + // Nothing to do here for now. Forks are already reported on `fork_event_rx` + // and inferior chain blocks are irrelevant here. + } } } diff --git a/web-client/src/client/lib.rs b/web-client/src/client/lib.rs index 718373ad55..5356b19344 100644 --- a/web-client/src/client/lib.rs +++ b/web-client/src/client/lib.rs @@ -866,6 +866,9 @@ impl Client { adopted_blocks, ) } + Some(BlockchainEvent::Stored(block)) => { + (block.hash(), "stored", Array::new(), Array::new()) + } None => { break; }