diff --git a/Cargo.lock b/Cargo.lock index 7de36f845f..e091555fa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4032,6 +4032,7 @@ dependencies = [ "nimiq-blockchain-proxy", "nimiq-keys", "nimiq-log", + "nimiq-network-interface", "nimiq-network-libp2p", "nimiq-serde", "nimiq-utils", @@ -4492,6 +4493,7 @@ dependencies = [ "bitflags 2.8.0", "futures-util", "multiaddr", + "nimiq-keys", "nimiq-serde", "nimiq-utils", "serde", diff --git a/dht/Cargo.toml b/dht/Cargo.toml index ddfd26382b..b63b60ee96 100644 --- a/dht/Cargo.toml +++ b/dht/Cargo.toml @@ -26,6 +26,7 @@ nimiq-blockchain-interface = { workspace = true } nimiq-blockchain-proxy = { workspace = true, features = ["full"] } nimiq-keys = { workspace = true } nimiq-log = { workspace = true, optional = true } +nimiq-network-interface = { workspace = true } nimiq-network-libp2p = { workspace = true } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true } diff --git a/dht/src/lib.rs b/dht/src/lib.rs index afcc7deb30..c0084f2b84 100644 --- a/dht/src/lib.rs +++ b/dht/src/lib.rs @@ -1,13 +1,16 @@ use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_keys::{Address, KeyPair}; +use nimiq_network_interface::{ + network::Network as NetworkInterface, validator_record::ValidatorRecord, +}; use nimiq_network_libp2p::{ dht::{DhtRecord, DhtVerifierError, Verifier as DhtVerifier}, + discovery::peer_contacts::{ValidatorInfoError, ValidatorRecordVerifier}, libp2p::kad::Record, - PeerId, + Network, PeerId, }; use nimiq_serde::Deserialize; use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; pub struct Verifier { blockchain: BlockchainProxy, @@ -17,71 +20,46 @@ impl Verifier { pub fn new(blockchain: BlockchainProxy) -> Self { Self { blockchain } } +} - fn verify_validator_record(&self, record: &Record) -> Result { - // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. - let validator_record = - TaggedSigned::, KeyPair>::deserialize_from_vec(&record.value) - .map_err(DhtVerifierError::MalformedValue)?; - - // Make sure the peer who signed the record is also the one presented in the record. - if let Some(publisher) = record.publisher { - if validator_record.record.peer_id != publisher { - return Err(DhtVerifierError::PublisherMismatch( - publisher, - validator_record.record.peer_id, - )); - } - } else { - log::warn!("Validating a dht record without a publisher"); - return Err(DhtVerifierError::PublisherMissing); - } - - // Deserialize the key of the record which is an Address. If it fails return an error. - let validator_address = Address::deserialize_from_vec(record.key.as_ref()) - .map_err(DhtVerifierError::MalformedKey)?; - - // Make sure the validator address used as key is identical to the one in the record. - if validator_record.record.validator_address != validator_address { - return Err(DhtVerifierError::AddressMismatch( - validator_address, - validator_record.record.validator_address, - )); - } - +impl ValidatorRecordVerifier for Verifier { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError> { // Acquire blockchain read access. For now exclude Light clients. let blockchain = match self.blockchain { - BlockchainProxy::Light(ref _light_blockchain) => { - return Err(DhtVerifierError::UnknownTag) - } - BlockchainProxy::Full(ref full_blockchain) => full_blockchain, + BlockchainProxy::Full(ref blockchain) => blockchain, + BlockchainProxy::Light(_) => return Err(ValidatorInfoError::StateIncomplete), }; let blockchain_read = blockchain.read(); // Get the staking contract to retrieve the public key for verification. let staking_contract = blockchain_read .get_staking_contract_if_complete(None) - .ok_or(DhtVerifierError::StateIncomplete)?; + .ok_or(ValidatorInfoError::StateIncomplete)?; // Get the public key needed for verification. let data_store = blockchain_read.get_staking_contract_store(); let txn = blockchain_read.read_transaction(); let public_key = staking_contract - .get_validator(&data_store.read(&txn), &validator_address) - .ok_or(DhtVerifierError::UnknownValidator(validator_address))? + .get_validator( + &data_store.read(&txn), + &signed_record.record.validator_address, + ) + .ok_or(ValidatorInfoError::UnknownValidator( + signed_record.record.validator_address.clone(), + ))? .signing_key; // Verify the record. - validator_record + signed_record .verify(&public_key) - .then(|| { - DhtRecord::Validator( - record.publisher.unwrap(), - validator_record.record, - record.clone(), - ) - }) - .ok_or(DhtVerifierError::InvalidSignature) + .then_some(()) + .ok_or(ValidatorInfoError::InvalidSignature) } } @@ -96,7 +74,46 @@ impl DhtVerifier for Verifier { // Depending on tag perform the verification. match tag { - ValidatorRecord::::TAG => self.verify_validator_record(record), + ValidatorRecord::::TAG => { + // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. + let validator_record = + TaggedSigned::, KeyPair>::deserialize_from_vec( + &record.value, + ) + .map_err(DhtVerifierError::MalformedValue)?; + + // Make sure the peer who published the record is also the one signed into the record. + if record.publisher.ok_or(DhtVerifierError::MissingPublisher)? + != validator_record.record.peer_id + { + return Err(DhtVerifierError::PublisherMismatch( + record.publisher.unwrap(), + validator_record.record.peer_id, + )); + } + + // Deserialize the key of the record which is an Address. If it fails return an error. + let validator_address = Address::deserialize_from_vec(record.key.as_ref()) + .map_err(DhtVerifierError::MalformedKey)?; + + // Make sure the address used as key is also the one signed into the record. + if validator_address != validator_record.record.validator_address { + return Err(DhtVerifierError::AddressMismatch( + validator_address, + validator_record.record.validator_address, + )); + } + + self.verify_validator_record(&validator_record) + .map_err(|_| DhtVerifierError::ValidatorInfoError) + .map(|_| { + DhtRecord::Validator( + validator_record.record.peer_id, + validator_record.record, + record.clone(), + ) + }) + } _ => { log::error!(tag, "DHT invalid record tag received"); Err(DhtVerifierError::UnknownTag) diff --git a/handel/tests/mod.rs b/handel/tests/mod.rs index e0caefabc1..c2f5cf0fd5 100644 --- a/handel/tests/mod.rs +++ b/handel/tests/mod.rs @@ -135,7 +135,7 @@ impl std::fmt::Debug for Protocol { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(bound = "C: AggregatableContribution")] struct SerializableLevelUpdate { aggregate: C, @@ -164,7 +164,7 @@ impl From> for SerializableLevelUpda } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(bound = "C: AggregatableContribution")] struct Update(pub SerializableLevelUpdate); diff --git a/network-interface/Cargo.toml b/network-interface/Cargo.toml index 56456dd06e..4ad6363eb9 100644 --- a/network-interface/Cargo.toml +++ b/network-interface/Cargo.toml @@ -30,5 +30,6 @@ thiserror = "2.0" tokio = { version = "1.43", features = ["rt"] } tokio-stream = { version = "0.1", features = ["default", "sync"] } +nimiq-keys = { workspace = true, features = ["serde-derive"] } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true, features = ["tagged-signing"] } diff --git a/network-interface/src/lib.rs b/network-interface/src/lib.rs index a001071467..52d403177e 100644 --- a/network-interface/src/lib.rs +++ b/network-interface/src/lib.rs @@ -1,5 +1,6 @@ pub mod network; pub mod peer_info; pub mod request; +pub mod validator_record; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 6c852e86b6..48c82af038 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, fmt::{Debug, Display}, hash::Hash, time::Duration, @@ -6,14 +7,16 @@ use std::{ use async_trait::async_trait; use futures::stream::BoxStream; +use nimiq_keys::{Address, KeyPair}; use nimiq_serde::{Deserialize, DeserializeError, Serialize}; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable}; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}; use thiserror::Error; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use crate::{ peer_info::*, request::{Message, Request, RequestError}, + validator_record::ValidatorRecord, }; /// Network events that the network will report when subscribing @@ -87,7 +90,17 @@ pub enum SendError { #[async_trait] pub trait Network: Send + Sync + Unpin + 'static { - type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static; + type PeerId: Copy + + Debug + + Display + + Ord + + Hash + + Send + + Sync + + Unpin + + Serialize + + Deserialize + + 'static; type AddressType: Debug + Display + 'static; type Error: std::error::Error; type PubsubId: PubsubId + Send + Sync + Unpin; @@ -113,6 +126,14 @@ pub trait Network: Send + Sync + Unpin + 'static { min_peers: usize, ) -> Result, Self::Error>; + /// Returns all peer ids that are known for the given validator. + /// The returned list might contain unverified mappings. + fn get_peers_by_validator( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet; + /// Returns true when the given peer provides the services flags that are required by us fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool; @@ -218,4 +239,13 @@ pub trait Network: Send + Sync + Unpin + 'static { request_id: Self::RequestId, response: Req::Response, ) -> Result<(), Self::Error>; + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair> + + Send + + Sync + + 'static, + ); } diff --git a/network-interface/src/request/mod.rs b/network-interface/src/request/mod.rs index f911ee15a4..98ca1c70b7 100644 --- a/network-interface/src/request/mod.rs +++ b/network-interface/src/request/mod.rs @@ -129,7 +129,7 @@ impl RequestKind for MessageMarker { } pub trait RequestCommon: - Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + 'static + Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + Clone + 'static { type Kind: RequestKind; const TYPE_ID: u16; diff --git a/validator-network/src/validator_record.rs b/network-interface/src/validator_record.rs similarity index 100% rename from validator-network/src/validator_record.rs rename to network-interface/src/validator_record.rs diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index a43461ea59..2e7de33952 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -51,6 +51,7 @@ nimiq-utils = { workspace = true, features = [ "tagged-signing", "libp2p", "time", + "spawn", ] } nimiq-validator-network = { workspace = true } @@ -85,7 +86,14 @@ libp2p = { version = "0.54", default-features = false, features = [ [dev-dependencies] # In dev/testing we require more tokio features -tokio = { version = "1.43", features = ["macros", "rt", "rt-multi-thread", "test-util", "time", "tracing"] } +tokio = { version = "1.43", features = [ + "macros", + "rt", + "rt-multi-thread", + "test-util", + "time", + "tracing", +] } nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } @@ -93,4 +101,9 @@ nimiq-test-utils = { workspace = true } [features] kad = [] metrics = ["prometheus-client"] -tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"] +tokio-websocket = [ + "libp2p/dns", + "libp2p/tcp", + "libp2p/tokio", + "libp2p/websocket", +] diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index ac26a382af..124c3693b3 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -12,9 +12,11 @@ use parking_lot::RwLock; use rand::rngs::OsRng; use crate::{ - connection_pool, - connection_pool::behaviour::Config as PoolConfig, - discovery::{self, peer_contacts::PeerContactBook}, + connection_pool::{self, behaviour::Config as PoolConfig}, + discovery::{ + self, + peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, + }, dispatch::codecs::MessageCodec, Config, }; @@ -50,6 +52,7 @@ impl Behaviour { contacts: Arc>, peer_score_params: gossipsub::PeerScoreParams, force_dht_server_mode: bool, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { let public_key = config.keypair.public(); let peer_id = public_key.to_peer_id(); @@ -68,6 +71,8 @@ impl Behaviour { config.discovery.clone(), config.keypair.clone(), Arc::clone(&contacts), + #[cfg(feature = "kad")] + verifier, ); // Gossipsub behaviour @@ -169,6 +174,6 @@ impl Behaviour { /// Updates the scores of all peers in the peer contact book. /// Updates are performed with the score values of Gossipsub pub fn update_scores(&self, contacts: Arc>) { - contacts.read().update_scores(&self.gossipsub); + contacts.write().update_scores(&self.gossipsub); } } diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index 757bde0415..e95047dcf1 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -534,7 +534,7 @@ impl Behaviour { let own_peer_id = own_contact.peer_id(); contacts - .query(self.required_services) + .query(self.required_services, true) .filter_map(|contact| { let peer_id = contact.peer_id(); if peer_id != own_peer_id @@ -562,7 +562,7 @@ impl Behaviour { let own_peer_id = own_contact.peer_id(); contacts - .query(services) + .query(services, true) .filter_map(|contact| { let peer_id = contact.peer_id(); if peer_id != own_peer_id diff --git a/network-libp2p/src/dht.rs b/network-libp2p/src/dht.rs index 7502d190c1..3e529db829 100644 --- a/network-libp2p/src/dht.rs +++ b/network-libp2p/src/dht.rs @@ -1,8 +1,9 @@ use libp2p::{kad::Record, PeerId}; use nimiq_keys::Address; -use nimiq_network_interface::network::Network as NetworkInterface; +use nimiq_network_interface::{ + network::Network as NetworkInterface, validator_record::ValidatorRecord, +}; use nimiq_serde::DeserializeError; -use nimiq_validator_network::validator_record::ValidatorRecord; pub use crate::network_types::DhtRecord; use crate::Network; @@ -22,6 +23,8 @@ pub enum DhtVerifierError { ), StateIncomplete, InvalidSignature, + ValidatorInfoError, + MissingPublisher, } pub trait Verifier: Send + Sync { diff --git a/network-libp2p/src/discovery/behaviour.rs b/network-libp2p/src/discovery/behaviour.rs index 4711f96389..cfe99b8d88 100644 --- a/network-libp2p/src/discovery/behaviour.rs +++ b/network-libp2p/src/discovery/behaviour.rs @@ -22,7 +22,7 @@ use parking_lot::RwLock; use super::{ handler::{Handler, HandlerOutEvent}, - peer_contacts::{PeerContact, PeerContactBook}, + peer_contacts::{PeerContact, PeerContactBook, ValidatorRecordVerifier}, }; #[derive(Clone, Debug)] @@ -115,6 +115,10 @@ pub struct Behaviour { /// Timer to do house-keeping in the peer address book. house_keeping_timer: Interval, + + /// dht verifier TODO + #[cfg(feature = "kad")] + verifier: Arc, } impl Behaviour { @@ -122,9 +126,10 @@ impl Behaviour { config: Config, keypair: Keypair, peer_contact_book: Arc>, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { let house_keeping_timer = interval(config.house_keeping_interval); - peer_contact_book.write().update_own_contact(&keypair); + peer_contact_book.write().refresh_own_contact(); // Report our own known addresses as candidates to the swarm let mut events = VecDeque::new(); @@ -139,14 +144,14 @@ impl Behaviour { peer_contact_book, events, house_keeping_timer, + #[cfg(feature = "kad")] + verifier, } } /// Adds addresses into our own contact within the peer contact book pub fn add_own_addresses(&self, addresses: Vec) { - self.peer_contact_book - .write() - .add_own_addresses(addresses, &self.keypair) + self.peer_contact_book.write().add_own_addresses(addresses) } /// Returns whether an address in `Multiaddr` format is a dialable websocket address @@ -177,6 +182,8 @@ impl NetworkBehaviour for Behaviour { self.keypair.clone(), self.peer_contact_book(), remote_addr.clone(), + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), )) } @@ -194,6 +201,8 @@ impl NetworkBehaviour for Behaviour { self.keypair.clone(), self.peer_contact_book(), addr.clone(), + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), )) } @@ -227,7 +236,7 @@ impl NetworkBehaviour for Behaviour { Poll::Ready(Some(_)) => { trace!("Doing house-keeping in peer address book"); let mut peer_address_book = self.peer_contact_book.write(); - peer_address_book.update_own_contact(&self.keypair); + peer_address_book.refresh_own_contact(); peer_address_book.house_keeping(); } Poll::Ready(None) => unreachable!(), diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index ca2844d074..0140d43a75 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashSet, VecDeque}, future::Future as _, + ops::Deref, pin::Pin, sync::Arc, task::{Context, Poll, Waker}, @@ -33,7 +34,10 @@ use thiserror::Error; use super::{ behaviour::Config, message_codec::{MessageReader, MessageWriter}, - peer_contacts::{PeerContactBook, SignedPeerContact}, + peer_contacts::{ + PeerContactBook, PeerContactError, SignedPeerContact, ValidatorInfoError, + ValidatorRecordVerifier, + }, protocol::{ChallengeNonce, DiscoveryMessage, DiscoveryProtocol}, }; use crate::{AUTONAT_DIAL_BACK_PROTOCOL, AUTONAT_DIAL_REQUEST_PROTOCOL}; @@ -134,6 +138,11 @@ pub struct Handler { /// The peer contact book peer_contact_book: Arc>, + /// Used to verify PeerContacts. + /// Required as contacts could contain a ValidatorInfo, for which a current verification key is required. + #[cfg(feature = "kad")] + verifier: Arc, + /// The peer address we're connected to (address that got us connected). peer_address: Multiaddr, @@ -179,8 +188,9 @@ impl Handler { keypair: Keypair, peer_contact_book: Arc>, peer_address: Multiaddr, + #[cfg(feature = "kad")] verifier: Arc, ) -> Self { - if let Some(peer_contact) = peer_contact_book.write().get(&peer_id) { + if let Some(peer_contact) = peer_contact_book.write().get_mut(&peer_id) { if let Some(outer_protocol_address) = outer_protocol_address(&peer_address) { peer_contact.set_outer_protocol_address(outer_protocol_address); } @@ -202,6 +212,8 @@ impl Handler { inbound: None, outbound: None, waker: None, + #[cfg(feature = "kad")] + verifier, events: VecDeque::new(), } } @@ -232,7 +244,7 @@ impl Handler { let mut rng = thread_rng(); peer_contact_book - .query(self.services_filter) + .query(self.services_filter, false) .choose_multiple(&mut rng, limit) .into_iter() .map(|c| c.signed().clone()) @@ -287,6 +299,27 @@ pub(crate) fn outer_protocol_address(addr: &Multiaddr) -> Option { .unwrap_or(None) } +fn filter_contact( + #[cfg(feature = "kad")] verifier: Arc, +) -> Box Option> { + Box::new(move |mut peer_contact: SignedPeerContact| { + match peer_contact.verify( + #[cfg(feature = "kad")] + verifier.deref(), + ) { + Ok(_) => Some(peer_contact), + // If there is a validator record, but the state is incomplete, + // then it cannot be verified and must be checked again at a later time. + Err(PeerContactError::ValidatorRecord(ValidatorInfoError::StateIncomplete)) => { + peer_contact.local_only = true; + Some(peer_contact) + } + // Filter contact if verification fails for any other reason. + Err(_) => None, + } + }) +} + impl ConnectionHandler for Handler { type FromBehaviour = (); type ToBehaviour = HandlerOutEvent; @@ -526,7 +559,11 @@ impl ConnectionHandler for Handler { peer_contacts, } => { // Check the peer contact for a valid signature. - if !peer_contact.verify() { + let filter_fn = filter_contact( + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + ); + let Some(peer_contact) = filter_fn(peer_contact.clone()) else { return Poll::Ready( ConnectionHandlerEvent::NotifyBehaviour( HandlerOutEvent::Error( @@ -536,7 +573,7 @@ impl ConnectionHandler for Handler { ), ), ); - } + }; if self.peer_id != peer_contact.peer_id() { return Poll::Ready( @@ -574,19 +611,13 @@ impl ConnectionHandler for Handler { ), ); } - for peer_contact in &peer_contacts { - if !peer_contact.verify() { - return Poll::Ready( - ConnectionHandlerEvent::NotifyBehaviour( - HandlerOutEvent::Error( - Error::InvalidPeerContactSignature { - peer_contact: peer_contact.clone(), - }, - ), - ), - ); - } - } + let peer_contacts: Vec = peer_contacts + .into_iter() + .filter_map(filter_contact( + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + )) + .collect(); let mut peer_contact_book = self.peer_contact_book.write(); @@ -685,19 +716,14 @@ impl ConnectionHandler for Handler { ), ); } - for peer_contact in &peer_contacts { - if !peer_contact.verify() { - return Poll::Ready( - ConnectionHandlerEvent::NotifyBehaviour( - HandlerOutEvent::Error( - Error::InvalidPeerContactSignature { - peer_contact: peer_contact.clone(), - }, - ), - ), - ); - } - } + + let peer_contacts: Vec = peer_contacts + .into_iter() + .filter_map(filter_contact( + #[cfg(feature = "kad")] + Arc::clone(&self.verifier), + )) + .collect(); // Insert the new peer contacts into the peer contact book. self.peer_contact_book.write().insert_all_filtered( diff --git a/network-libp2p/src/discovery/peer_contacts.rs b/network-libp2p/src/discovery/peer_contacts.rs index c77961e62b..68d0fb9f44 100644 --- a/network-libp2p/src/discovery/peer_contacts.rs +++ b/network-libp2p/src/discovery/peer_contacts.rs @@ -1,5 +1,7 @@ use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, + fmt::Debug, + ops::Deref, sync::Arc, time::Duration, }; @@ -11,14 +13,13 @@ use libp2p::{ multiaddr::Protocol, Multiaddr, PeerId, }; -use nimiq_keys::{Address, KeyPair}; +use nimiq_keys::{Address, KeyPair as SchnorrKey}; use nimiq_network_interface::{ network::Network as NetworkInterface, peer_info::{PeerInfo, Services}, + validator_record::ValidatorRecord, }; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature}; -use nimiq_validator_network::validator_record::ValidatorRecord; -use parking_lot::RwLock; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSignature, TaggedSigned}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -28,6 +29,10 @@ use crate::{utils, Network}; pub enum PeerContactError { #[error("Exceeded number of advertised addresses")] AdvertisedAddressesExceeded, + #[error("Validator Record failed to verify")] + ValidatorRecord(ValidatorInfoError), + #[error("Contact signature is invalid")] + InvalidSignature, } /// The validator info contains all information which is not present in a [PeerContact] @@ -40,13 +45,58 @@ pub struct ValidatorInfo { /// The signature for the [ValidatorRecord]. /// It does _not_ verify for this structure, but only once the [nimiq_utils::tagged_signing::TaggedSigned] is reconstructed /// with the given information of this struct and the corresponding [PeerContact]. - signature: TaggedSignature::PeerId>, KeyPair>, + signature: TaggedSignature::PeerId>, SchnorrKey>, +} + +#[derive(Debug)] +pub enum ValidatorInfoError { + StateIncomplete, + InvalidSignature, + UnknownValidator(Address), +} + +impl ValidatorInfo { + pub fn new( + validator_address: Address, + signature: TaggedSignature< + ValidatorRecord<::PeerId>, + SchnorrKey, + >, + ) -> Self { + Self { + validator_address, + signature, + } + } +} + +pub trait ValidatorRecordVerifier: Send + Sync { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + SchnorrKey, + >, + ) -> Result<(), ValidatorInfoError>; +} + +impl ValidatorRecordVerifier for () { + fn verify_validator_record( + &self, + _signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + SchnorrKey, + >, + ) -> Result<(), ValidatorInfoError> { + Ok(()) + } } /// A plain peer contact. This contains: /// /// - A set of multi-addresses for the peer. /// - The peer's public key. +/// - An optional [ValidatorInfo] in case this peer is a running a validator. /// - A bitmask of the services supported by this peer. /// - A timestamp when this contact information was generated. /// @@ -91,15 +141,15 @@ impl PeerContact { Ok(Self { addresses, public_key, - services, validator_info: None, + services, timestamp, }) } /// Derives the peer ID from the public key pub fn peer_id(&self) -> PeerId { - self.public_key.clone().to_peer_id() + self.public_key.to_peer_id() } /// Returns the timestamp of the contact. It is generally set to the time the contact was created. @@ -123,6 +173,7 @@ impl PeerContact { SignedPeerContact { inner: self, signature, + local_only: false, } } @@ -160,10 +211,35 @@ impl PeerContact { /// Verifies whether the lengths of the advertised addresses are within /// the expected limits. This is helpful to verify a received peer contact. - pub fn verify(&self) -> Result<(), PeerContactError> { + pub fn verify( + &self, + #[cfg(feature = "kad")] verifier: &dyn ValidatorRecordVerifier, + ) -> Result<(), PeerContactError> { if self.addresses.len() > Self::MAX_ADDRESSES { return Err(PeerContactError::AdvertisedAddressesExceeded); } + + // In case the Contact includes a ValidatorInfo it also needs to be verified. + #[cfg(feature = "kad")] + if let Some(ValidatorInfo { + validator_address, + signature, + }) = self.validator_info.clone() + { + // Reconstruct the record + let record = ValidatorRecord { + validator_address, + timestamp: self.timestamp, + peer_id: self.peer_id(), + }; + + let signed_record = TaggedSigned::new(record, signature); + + verifier + .verify_validator_record(&signed_record) + .map_err(PeerContactError::ValidatorRecord)?; + } + Ok(()) } } @@ -180,17 +256,30 @@ pub struct SignedPeerContact { /// The signature over the serialized peer contact. pub signature: TaggedSignature, + + #[serde(skip)] + pub local_only: bool, } impl SignedPeerContact { /// Verifies that the signature is valid for this peer contact and also does /// intrinsic verification on the inner PeerContact. - pub fn verify(&self) -> bool { - if self.inner.verify().is_err() { - return false; - }; - self.signature + pub fn verify( + &self, + #[cfg(feature = "kad")] verifier: &dyn ValidatorRecordVerifier, + ) -> Result<(), PeerContactError> { + // The record signature must be verified first. + if !self + .signature .tagged_verify(&self.inner, &self.inner.public_key) + { + return Err(PeerContactError::InvalidSignature); + } + + self.inner.verify( + #[cfg(feature = "kad")] + verifier, + ) } /// Gets the public key of this peer contact. @@ -213,7 +302,7 @@ struct PeerContactMeta { /// This encapsulates a peer contact (signed), but also pre-computes frequently used values such as `peer_id` and /// `protocols`. It also contains meta-data that can be mutated. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PeerContactInfo { /// The peer ID derived from the public key in the peer contact. peer_id: PeerId, @@ -222,7 +311,7 @@ pub struct PeerContactInfo { contact: SignedPeerContact, /// Mutable meta-data. - meta: RwLock, + meta: PeerContactMeta, } impl From for PeerContactInfo { @@ -232,10 +321,10 @@ impl From for PeerContactInfo { Self { peer_id, contact, - meta: RwLock::new(PeerContactMeta { + meta: PeerContactMeta { score: 0., outer_protocol_address: None, - }), + }, } } } @@ -272,7 +361,10 @@ impl PeerContactInfo { } /// Returns whether the peer contact exceeds its age limit - pub fn exceeds_age(&self, max_age: Duration, unix_time: Duration) -> bool { + pub fn exceeds_age(&self, max_age: Duration) -> bool { + let unix_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); unix_time .checked_sub(Duration::from_secs(self.contact.inner.timestamp())) .is_some_and(|age| age > max_age) @@ -285,42 +377,42 @@ impl PeerContactInfo { /// Gets the peer score pub fn get_score(&self) -> f64 { - self.meta.read().score + self.meta.score } /// Sets the peer score - pub fn set_score(&self, score: f64) { - self.meta.write().score = score; + pub fn set_score(&mut self, score: f64) { + self.meta.score = score; } /// Gets the outer protocol address of the peer. For example `/ip4/x.x.x.x` or `/dns4/foo.bar` pub fn get_outer_protocol_address(&self) -> Option { - self.meta.read().outer_protocol_address.clone() + self.meta.outer_protocol_address.clone() } /// Sets the outer protocol address of the peer once - pub fn set_outer_protocol_address(&self, addr: Multiaddr) { - self.meta - .write() - .outer_protocol_address - .get_or_insert_with(|| { - trace!(peer_id = %self.peer_id, %addr, "Set outer protocol address for peer"); - addr - }); + pub fn set_outer_protocol_address(&mut self, addr: Multiaddr) { + self.meta.outer_protocol_address.get_or_insert_with(|| { + trace!(peer_id = %self.peer_id, %addr, "Set outer protocol address for peer"); + addr + }); } } /// Main structure that holds the peer information that has been obtained or /// discovered by the discovery protocol. -#[derive(Debug)] pub struct PeerContactBook { /// Contact information for our own. own_peer_contact: PeerContactInfo, /// Own Peer ID (also present in `own_peer_contact`) own_peer_id: PeerId, + /// Identity keypair for this node. + key_pair: Keypair, /// Contact information for other peers in the network indexed by their /// peer ID. - peer_contacts: HashMap>, + peer_contacts: HashMap, + /// Map from validator address to peer ids. + validator_peer_ids: HashMap>, /// Only return secure websocket addresses. /// With this flag non secure websocket addresses will be stored (to still have a valid signature of the peer contact) /// but won't be returned when calling `get_addresses` @@ -329,30 +421,86 @@ pub struct PeerContactBook { allow_loopback_addresses: bool, /// Flag to indicate whether to support memory transport addresses memory_transport: bool, + /// Validator signing callback + validator_record_signer: Option< + Box TaggedSigned, SchnorrKey> + Send + Sync>, + >, + /// Validator record verifier + #[cfg(feature = "kad")] + validator_record_verifier: Arc, +} + +impl Debug for PeerContactBook { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerContactBook") + .field("own_peer_contact", &self.own_peer_contact) + .field("own_peer_id", &self.own_peer_id) + .field("peer_contacts", &self.peer_contacts) + .field("validator_peer_ids", &self.validator_peer_ids) + .field("only_secure_addresses", &self.only_secure_addresses) + .field("allow_loopback_addresses", &self.allow_loopback_addresses) + .field("memory_transport", &self.memory_transport) + .finish() + } } impl PeerContactBook { /// If a peer's age exceeds this value in seconds, it is removed (30 minutes) - pub const MAX_PEER_AGE: u64 = 30 * 60; + pub const MAX_PEER_AGE: Duration = Duration::from_secs(30 * 60); /// Creates a new `PeerContactBook` given our own peer contact information. pub fn new( - own_peer_contact: SignedPeerContact, + mut own_peer_contact: PeerContact, + key_pair: Keypair, only_secure_addresses: bool, allow_loopback_addresses: bool, memory_transport: bool, + #[cfg(feature = "kad")] validator_record_verifier: Arc, ) -> Self { - let own_peer_id = own_peer_contact.inner.peer_id(); + let own_peer_id = own_peer_contact.peer_id(); + own_peer_contact.set_current_time(); + Self { - own_peer_contact: own_peer_contact.into(), + own_peer_contact: own_peer_contact.sign(&key_pair).into(), own_peer_id, + key_pair, peer_contacts: HashMap::new(), only_secure_addresses, allow_loopback_addresses, memory_transport, + validator_peer_ids: HashMap::new(), + validator_record_signer: None, + #[cfg(feature = "kad")] + validator_record_verifier, } } + /// Obtain a list of peer ids associated to the given validator address + pub fn get_validator_peer_ids( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet { + let Some(peers) = self.validator_peer_ids.get(validator_address) else { + return HashSet::new(); + }; + + if include_unverified { + return peers.clone(); + } + + peers + .iter() + .filter(|peer_id| { + self.peer_contacts + .get(peer_id) + .map(|contact| !contact.contact.local_only) + .unwrap_or(false) + }) + .cloned() + .collect() + } + /// Insert a peer contact or update an existing one pub fn insert(&mut self, contact: SignedPeerContact) { // Don't insert our own contact into our peer contacts @@ -360,28 +508,42 @@ impl PeerContactBook { return; } - log::debug!(peer_id = %contact.peer_id(), addresses = ?contact.inner.addresses, "Adding peer contact"); + let peer_id = contact.peer_id(); + let current_ts = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); + if let Some(validator) = &contact.inner.validator_info { + self.validator_peer_ids + .entry(validator.validator_address.clone()) + .or_default() + .insert(peer_id); + } + let info = PeerContactInfo::from(contact); - let peer_id = info.peer_id; match self.peer_contacts.entry(peer_id) { - std::collections::hash_map::Entry::Occupied(mut entry) => { + Entry::Occupied(mut entry) => { let entry_value = entry.get_mut(); // Only update the contact if the timestamp is greater than the entry we have for this peer // and if the timestamp of the peer contact is not in the future if entry_value.contact().timestamp < info.contact().timestamp && info.contact().timestamp <= current_ts { - *entry_value = Arc::new(info); + *entry_value = info; } } - std::collections::hash_map::Entry::Vacant(entry) => { - entry.insert(Arc::new(info)); + Entry::Vacant(entry) => { + log::trace!( + peer_id = %info.peer_id, + services = ?info.services(), + addresses = ?info.contact.inner.addresses, + validator_address = ?info.contact.inner.validator_info.as_ref().map(|info| info.validator_address.clone()), + "Adding peer contact", + ); + entry.insert(info); } } } @@ -426,16 +588,23 @@ impl PeerContactBook { // TODO Check peer contact timestamp // Call `insert()` here instead? - let info = Arc::new(info); let is_new = self .peer_contacts - .insert(info.peer_id, Arc::clone(&info)) + .insert(info.peer_id, info.clone()) .is_none(); + if let Some(validator_info) = &info.contact.inner.validator_info { + self.validator_peer_ids + .entry(validator_info.validator_address.clone()) + .or_default() + .insert(info.peer_id); + } + if is_new { log::trace!( peer_id = %info.peer_id, services = ?info.services(), addresses = ?info.contact.inner.addresses, + validator_address = ?info.contact.inner.validator_info.as_ref().map(|info| info.validator_address.clone()), "Adding peer contact", ); } @@ -462,10 +631,16 @@ impl PeerContactBook { } } - /// Gets a peer contact if it exists given its peer_id. + /// Gets a reference to a peer contact if it exists given its peer_id. /// If the peer_id is not found, `None` is returned. - pub fn get(&self, peer_id: &PeerId) -> Option> { - self.peer_contacts.get(peer_id).cloned() + pub fn get(&self, peer_id: &PeerId) -> Option<&PeerContactInfo> { + self.peer_contacts.get(peer_id) + } + + /// Gets a mutable reference to a peer contact if it exists given its peer_id. + /// If the peer_id is not found, `None` is returned. + pub fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerContactInfo> { + self.peer_contacts.get_mut(peer_id) } /// Gets the peer contact's addresses if it exists given its peer_id. @@ -509,12 +684,16 @@ impl PeerContactBook { /// Gets a set of peer contacts given a services filter. /// Every peer contact that matches such services will be returned. - pub fn query(&self, services: Services) -> impl Iterator> + '_ { + pub fn query( + &self, + services: Services, + include_local_only: bool, + ) -> impl Iterator + '_ { // TODO: This is a naive implementation // TODO: Sort by score? self.peer_contacts.iter().filter_map(move |(_, contact)| { - if contact.matches(services) { - Some(Arc::clone(contact)) + if contact.matches(services) && (!contact.contact.local_only || include_local_only) { + Some(contact) } else { None } @@ -523,9 +702,8 @@ impl PeerContactBook { /// Updates the score of every peer in the contact book with the gossipsub /// peer score. - pub fn update_scores(&self, gossipsub: &gossipsub::Behaviour) { - let contacts = self.peer_contacts.iter(); - + pub fn update_scores(&mut self, gossipsub: &gossipsub::Behaviour) { + let contacts = self.peer_contacts.iter_mut(); for contact in contacts { if let Some(score) = gossipsub.peer_score(contact.0) { contact.1.set_score(score); @@ -536,39 +714,42 @@ impl PeerContactBook { } /// Adds a set of addresses to the list of addresses known for our own contact. - pub fn add_own_addresses>( - &mut self, - addresses: I, - keypair: &Keypair, - ) { + pub fn add_own_addresses>(&mut self, addresses: I) { let mut contact = self.own_peer_contact.contact.inner.clone(); let addresses = addresses.into_iter().collect::>(); trace!(?addresses, "Adding own addresses"); contact.add_addresses(addresses); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + self.update_own_contact(contact); } /// Removes a set of addresses from the list of addresses known for our own. - pub fn remove_own_addresses>( - &mut self, - addresses: I, - keypair: &Keypair, - ) { + pub fn remove_own_addresses>(&mut self, addresses: I) { let mut contact = self.own_peer_contact.contact.inner.clone(); let addresses = addresses.into_iter().collect::>(); contact.remove_addresses(addresses); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + self.update_own_contact(contact); } - /// Updates the timestamp of our own contact - pub fn update_own_contact(&mut self, keypair: &Keypair) { - // Not really optimal to clone here, but *shrugs* - let mut contact = self.own_peer_contact.contact.inner.clone(); + /// Updates the timestamp of our own contact. + pub fn refresh_own_contact(&mut self) { + let contact = self.own_peer_contact.contact.inner.clone(); + self.update_own_contact(contact); + } - // Update timestamp + fn update_own_contact(&mut self, mut contact: PeerContact) { + // Update timestamp. contact.set_current_time(); - self.own_peer_contact = PeerContactInfo::from(contact.sign(keypair)); + // Update validator info. + contact.validator_info = self.validator_record_signer.as_ref().and_then(|callback| { + let tagged_signed = (callback)(contact.peer_id(), contact.timestamp); + Some(ValidatorInfo { + validator_address: tagged_signed.record.validator_address.clone(), + signature: tagged_signed.signature.clone(), + }) + }); + + self.own_peer_contact = PeerContactInfo::from(contact.sign(&self.key_pair)); } /// Gets our own contact information @@ -576,33 +757,6 @@ impl PeerContactBook { &self.own_peer_contact } - /// Removes peer contacts that have already exceeded the maximum age as - /// defined in `MAX_PEER_AGE`. - pub fn house_keeping(&mut self) { - if let Ok(unix_time) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { - let delete_peers = self - .peer_contacts - .iter() - .filter_map(|(peer_id, peer_contact)| { - if peer_contact.exceeds_age( - Duration::from_secs(PeerContactBook::MAX_PEER_AGE), - unix_time, - ) { - debug!(%peer_id, "Removing peer contact because of old age"); - Some(peer_id) - } else { - None - } - }) - .cloned() - .collect::>(); - - for peer_id in delete_peers { - self.peer_contacts.remove(&peer_id); - } - } - } - /// Returns true if an address is valid for dialing. /// It performs basic checks against unsupported addresses. pub fn is_address_dialable(&self, address: &Multiaddr) -> bool { @@ -659,6 +813,71 @@ impl PeerContactBook { } } } + + /// Removes expired or invalid peer contacts from the contact books. + pub fn house_keeping(&mut self) { + let peers_to_delete = self + .peer_contacts + .iter_mut() + .filter_map(|(peer_id, peer_contact)| { + // Remove expired peers. + if peer_contact.exceeds_age(PeerContactBook::MAX_PEER_AGE) { + debug!(%peer_id, "Removing peer contact because of old age"); + return Some(peer_id.clone()); + } + + // Re-verify local_only contacts. + #[cfg(feature = "kad")] + if peer_contact.contact.local_only { + let result = peer_contact + .contact + .verify(self.validator_record_verifier.deref()); + match result { + // Verification succeeded, clear local_only flag. + Ok(_) => peer_contact.contact.local_only = false, + // State is (still) incomplete, do nothing. + Err(PeerContactError::ValidatorRecord( + ValidatorInfoError::StateIncomplete, + )) => {} + // Verification failed, delete contact. + Err(_) => return Some(peer_id.clone()), + }; + } + + None + }) + .collect::>(); + + for peer_id in peers_to_delete { + let contact = self.peer_contacts.remove(&peer_id).unwrap(); + + let Some(validator_info) = contact.contact.inner.validator_info.as_ref() else { + continue; + }; + + let entry = self + .validator_peer_ids + .entry(validator_info.validator_address.clone()); + if let Entry::Occupied(mut entry) = entry { + entry.get_mut().remove(&peer_id); + if entry.get().is_empty() { + entry.remove(); + } + } + } + } + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + pub fn register_validator_signing_callback( + &mut self, + callback: impl Fn(PeerId, u64) -> TaggedSigned, SchnorrKey> + + Send + + Sync + + 'static, + ) { + self.validator_record_signer = Some(Box::new(callback)); + self.refresh_own_contact(); + } } mod serde_public_key { diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index e60e8d471f..827a97432c 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, future::Future, pin::Pin, sync::Arc, @@ -13,6 +13,7 @@ use futures::{future::BoxFuture, ready, stream::BoxStream, Stream, StreamExt}; use libp2p::{ gossipsub, request_response::InboundRequestId, swarm::NetworkInfo, Multiaddr, PeerId, Swarm, }; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{ CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, SubscribeEvents, @@ -23,6 +24,7 @@ use nimiq_network_interface::{ InboundRequestError, Message, OutboundRequestError, Request, RequestCommon, RequestError, RequestSerialize, RequestType, }, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, Serialize}; use nimiq_time::{interval, timeout}; @@ -38,7 +40,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use crate::network_metrics::NetworkMetrics; use crate::{ dht, - discovery::peer_contacts::PeerContactBook, + discovery::peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, network_types::{GossipsubId, NetworkAction, ValidateMessage}, rate_limiting::RateLimitConfig, swarm::{new_swarm, swarm_task}, @@ -79,16 +81,23 @@ impl Network { /// pub async fn new( config: Config, - #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier + 'static, + #[cfg(feature = "kad")] verifier: impl dht::Verifier + ValidatorRecordVerifier + 'static, ) -> Self { let required_services = config.required_services; // TODO: persist to disk let own_peer_contact = config.peer_contact.clone(); + + #[cfg(feature = "kad")] + let verifier = Arc::new(verifier); + let contacts = Arc::new(RwLock::new(PeerContactBook::new( - own_peer_contact.sign(&config.keypair), + own_peer_contact, + config.keypair.clone(), config.only_secure_ws_connections, config.allow_loopback_addresses, config.memory_transport, + #[cfg(feature = "kad")] + (Arc::clone(&verifier) as Arc), ))); let params = gossipsub::PeerScoreParams { ip_colocation_factor_threshold: 20.0, @@ -100,11 +109,14 @@ impl Network { // In memory transport we don't have a mechanism that sets the DHT in server mode such as confirming an address // with Autonat. This is because Autonat v1 only works with IP addresses. let force_dht_server_mode = config.memory_transport; + let swarm = new_swarm( config, Arc::clone(&contacts), params.clone(), force_dht_server_mode, + #[cfg(feature = "kad")] + (Arc::clone(&verifier) as Arc), ); let local_peer_id = *Swarm::local_peer_id(&swarm); @@ -128,7 +140,7 @@ impl Network { update_scores, Arc::clone(&contacts), #[cfg(feature = "kad")] - dht_verifier, + (Arc::clone(&verifier) as Arc), force_dht_server_mode, dht_quorum, #[cfg(feature = "metrics")] @@ -522,6 +534,16 @@ impl NetworkInterface for Network { Ok(filtered_peers) } + fn get_peers_by_validator( + &self, + validator_address: &Address, + include_unverified: bool, + ) -> HashSet { + self.contacts + .read() + .get_validator_peer_ids(validator_address, include_unverified) + } + fn peer_provides_required_services(&self, peer_id: PeerId) -> bool { if let Some(peer_info) = self.connected_peers.read().get(&peer_id) { peer_info.get_services().contains(self.required_services) @@ -743,4 +765,16 @@ impl NetworkInterface for Network { output_rx.await? } + + fn register_validator_signing_callback( + &self, + callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair> + + Send + + Sync + + 'static, + ) { + self.contacts + .write() + .register_validator_signing_callback(callback) + } } diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index 0579b1c8bf..ce66771a23 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -15,10 +15,10 @@ use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, PubsubId, Topic}, peer_info::Services, request::{RequestError, RequestType}, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, DeserializeError}; use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index cb1646b627..70b770f388 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -45,7 +45,10 @@ use crate::network_metrics::NetworkMetrics; use crate::{ autonat::NatStatus, behaviour, dht, - discovery::{self, peer_contacts::PeerContactBook}, + discovery::{ + self, + peer_contacts::{PeerContactBook, ValidatorRecordVerifier}, + }, network_types::{ DhtBootStrapState, DhtRecord, DhtResults, GossipsubTopicInfo, NetworkAction, TaskState, ValidateMessage, @@ -63,7 +66,7 @@ struct EventInfo<'a> { connected_peers: &'a RwLock>, rate_limiting: &'a mut RateLimits, #[cfg(feature = "kad")] - dht_verifier: &'a dyn dht::Verifier, + dht_verifier: Arc, #[cfg(feature = "metrics")] metrics: &'a Arc, } @@ -73,6 +76,7 @@ pub(crate) fn new_swarm( contacts: Arc>, peer_score_params: gossipsub::PeerScoreParams, force_dht_server_mode: bool, + #[cfg(feature = "kad")] verifier: Arc, ) -> Swarm { let keypair = config.keypair.clone(); let transport = new_transport( @@ -83,8 +87,14 @@ pub(crate) fn new_swarm( ) .unwrap(); - let behaviour = - behaviour::Behaviour::new(config, contacts, peer_score_params, force_dht_server_mode); + let behaviour = behaviour::Behaviour::new( + config, + contacts, + peer_score_params, + force_dht_server_mode, + #[cfg(feature = "kad")] + verifier, + ); // TODO add proper config #[cfg(not(target_family = "wasm"))] @@ -114,7 +124,7 @@ pub(crate) async fn swarm_task( connected_peers: Arc>>, mut update_scores: Interval, contacts: Arc>, - #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier, + #[cfg(feature = "kad")] dht_verifier: Arc, force_dht_server_mode: bool, dht_quorum: NonZeroU8, #[cfg(feature = "metrics")] metrics: Arc, @@ -162,7 +172,7 @@ pub(crate) async fn swarm_task( connected_peers: &connected_peers, rate_limiting: &mut rate_limiting, #[cfg(feature = "kad")] - dht_verifier: &dht_verifier, + dht_verifier: Arc::clone(&dht_verifier), #[cfg( feature = "metrics")] metrics: &metrics, }, ); diff --git a/network-libp2p/tests/discovery.rs b/network-libp2p/tests/discovery.rs index 900dbdce01..fb5768cb7e 100644 --- a/network-libp2p/tests/discovery.rs +++ b/network-libp2p/tests/discovery.rs @@ -20,7 +20,7 @@ use nimiq_hash::Blake2bHash; use nimiq_network_interface::peer_info::Services; use nimiq_network_libp2p::discovery::{ self, - peer_contacts::{PeerContact, PeerContactBook, SignedPeerContact}, + peer_contacts::{PeerContact, PeerContactBook, SignedPeerContact, ValidatorRecordVerifier}, }; use nimiq_test_log::test; use nimiq_utils::spawn; @@ -72,18 +72,24 @@ impl TestNode { .unwrap() .as_secs(), ) - .expect("PeerContact must be creatable") - .sign(&keypair); + .expect("PeerContact must be creatable"); let peer_contact_book = Arc::new(RwLock::new(PeerContactBook::new( peer_contact, + keypair.clone(), false, true, true, + #[cfg(feature = "kad")] + (Arc::new(()) as Arc), ))); - let behaviour = - discovery::Behaviour::new(config, keypair.clone(), Arc::clone(&peer_contact_book)); + let behaviour = discovery::Behaviour::new( + config, + keypair.clone(), + Arc::clone(&peer_contact_book), + Arc::new(()), + ); let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() @@ -123,6 +129,11 @@ impl TestNode { } fn random_peer_contact(n: usize, services: Services) -> SignedPeerContact { + let (keypair, peer_contact) = random_peer_key_and_contact(n, services); + peer_contact.sign(&keypair) +} + +fn random_peer_key_and_contact(n: usize, services: Services) -> (Keypair, PeerContact) { let keypair = Keypair::generate_ed25519(); let peer_contact = PeerContact::new( @@ -140,9 +151,8 @@ fn random_peer_contact(n: usize, services: Services) -> SignedPeerContact { ) .expect("PeerContact must be creatable"); - peer_contact.sign(&keypair) + (keypair, peer_contact) } - fn test_peers_in_contact_book( peer_contact_book: &PeerContactBook, peer_contacts: &[SignedPeerContact], @@ -253,18 +263,21 @@ pub async fn test_dialing_peer_from_contacts() { #[test] fn test_housekeeping() { + let (keypair, peer_contact) = random_peer_key_and_contact(1, Services::FULL_BLOCKS); let mut peer_contact_book = PeerContactBook::new( - random_peer_contact(1, Services::FULL_BLOCKS), + peer_contact, + keypair, false, true, true, + #[cfg(feature = "kad")] + (Arc::new(()) as Arc), ); let fresh_contact = random_peer_contact(1, Services::FULL_BLOCKS); let old_contact = { let keypair = Keypair::generate_ed25519(); - let peer_contact = PeerContact::new( Some("/dns/test_old.local/tcp/443/wss".parse().unwrap()), keypair.public(), @@ -272,8 +285,8 @@ fn test_housekeeping() { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() - .as_secs() - .saturating_sub(PeerContactBook::MAX_PEER_AGE * 2), + .saturating_sub(PeerContactBook::MAX_PEER_AGE * 2) + .as_secs(), ) .expect("Peer contact must be creatable"); diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 8fec7df3ef..8d805bf264 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -12,10 +12,14 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, Topic}, peer_info::Services, + validator_record::ValidatorRecord, }; use nimiq_network_libp2p::{ dht, - discovery::{self, peer_contacts::PeerContact}, + discovery::{ + self, + peer_contacts::{PeerContact, ValidatorInfoError, ValidatorRecordVerifier}, + }, Config, Network, }; use nimiq_serde::{Deserialize, Serialize}; @@ -27,7 +31,6 @@ use nimiq_utils::{ spawn, tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}, }; -use nimiq_validator_network::validator_record::ValidatorRecord; use parking_lot::RwLock; use rand::{thread_rng, Rng}; @@ -203,6 +206,26 @@ impl Verifier { } } +impl ValidatorRecordVerifier for Verifier { + fn verify_validator_record( + &self, + signed_record: &TaggedSigned< + ValidatorRecord<::PeerId>, + KeyPair, + >, + ) -> Result<(), ValidatorInfoError> { + let keys = self.keys.read(); + let public_key = keys.get(&signed_record.record.validator_address).ok_or( + ValidatorInfoError::UnknownValidator(signed_record.record.validator_address.clone()), + )?; + + signed_record + .verify(&public_key) + .then(|| ()) + .ok_or(ValidatorInfoError::InvalidSignature) + } +} + impl dht::Verifier for Verifier { fn verify( &self, @@ -228,21 +251,15 @@ impl dht::Verifier for Verifier { let validator_address = Address::deserialize_from_vec(record.key.as_ref()) .map_err(dht::DhtVerifierError::MalformedKey)?; - let keys = self.keys.read(); - let public_key = keys - .get(&validator_address) - .ok_or(dht::DhtVerifierError::UnknownValidator(validator_address))?; - - validator_record - .verify(public_key) - .then(|| { + self.verify_validator_record(&validator_record) + .map(|_| { dht::DhtRecord::Validator( - record.publisher.unwrap(), + validator_record.record.peer_id, validator_record.record, record.clone(), ) }) - .ok_or(dht::DhtVerifierError::InvalidSignature) + .map_err(|_| dht::DhtVerifierError::ValidatorInfoError) } } diff --git a/network-mock/Cargo.toml b/network-mock/Cargo.toml index b372e240b5..eeef99bd39 100644 --- a/network-mock/Cargo.toml +++ b/network-mock/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1.43", features = [ ] } tokio-stream = "0.1" +nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-serde = { workspace = true } nimiq-time = { workspace = true } diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index aebb148ec3..89b1bfbcb7 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -8,6 +9,7 @@ use std::{ use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{ CloseReason, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents, Topic, @@ -17,10 +19,11 @@ use nimiq_network_interface::{ InboundRequestError, Message, OutboundRequestError, Request, RequestCommon, RequestError, RequestKind, RequestSerialize, RequestType, }, + validator_record::ValidatorRecord, }; use nimiq_serde::{Deserialize, DeserializeError, Serialize}; use nimiq_time::timeout; -use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable}; +use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}; use parking_lot::{Mutex, RwLock}; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -652,4 +655,20 @@ impl Network for MockNetwork { ) -> Result, MockNetworkError> { Ok(self.get_peers()) } + + fn get_peers_by_validator( + &self, + _validator_address: &Address, + _include_unverified: bool, + ) -> HashSet { + // TODO + HashSet::new() + } + + fn register_validator_signing_callback( + &self, + _callback: impl Fn(Self::PeerId, u64) -> TaggedSigned, KeyPair>, + ) { + // TODO + } } diff --git a/validator-network/src/lib.rs b/validator-network/src/lib.rs index 58a5e035bc..0471dec57f 100644 --- a/validator-network/src/lib.rs +++ b/validator-network/src/lib.rs @@ -1,7 +1,6 @@ pub mod error; pub mod network_impl; pub mod single_response_requester; -pub mod validator_record; use async_trait::async_trait; use futures::stream::BoxStream; @@ -9,8 +8,10 @@ use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{Message, Request, RequestCommon}, + validator_record::ValidatorRecord, }; use nimiq_primitives::slots_allocation::Validators; +use nimiq_utils::tagged_signing::TaggedSigned; pub use crate::error::NetworkError; @@ -98,4 +99,17 @@ pub trait ValidatorNetwork: Send + Sync { /// Returns the network peer ID for the given `validator_id` if it is known. fn get_peer_id(&self, validator_id: u16) -> Option<::PeerId>; + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn( + ::PeerId, + u64, + ) + -> TaggedSigned::PeerId>, KeyPair> + + Send + + Sync + + 'static, + ); } diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index d71cc3635c..e4fe662a4b 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -1,21 +1,28 @@ -use std::{collections::BTreeMap, error::Error, fmt::Debug, future, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + error::Error, + fmt::Debug, + future, + sync::Arc, +}; use async_trait::async_trait; -use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; +use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; use log::warn; use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{InboundRequestError, Message, Request, RequestCommon, RequestError}, + validator_record::ValidatorRecord, }; -use nimiq_primitives::slots_allocation::{Validator, Validators}; +use nimiq_primitives::slots_allocation::Validators; use nimiq_serde::{Deserialize, Serialize}; -use nimiq_utils::spawn; +use nimiq_utils::{spawn, stream::FuturesUnordered, tagged_signing::TaggedSigned}; use parking_lot::RwLock; use time::OffsetDateTime; use super::{MessageStream, NetworkError, PubsubId, ValidatorNetwork}; -use crate::validator_record::ValidatorRecord; +use crate::NetworkError::UnknownValidator; /// Validator `PeerId` cache state #[derive(Clone, Copy)] @@ -107,16 +114,6 @@ where self.own_validator_id.read().ok_or(NetworkError::NotElected) } - /// Given the Validators and a validator_id, returns the Validator represented by the id if it exists. - /// None otherwise. - fn get_validator(validators: Option<&Validators>, validator_id: u16) -> Option<&Validator> { - // Acquire read on the validators and make sure they have been set. Return None otherwise. - validators.and_then(|validators| { - (usize::from(validator_id) < validators.num_validators()) - .then(|| validators.get_validator_by_slot_band(validator_id)) - }) - } - /// Looks up the peer ID for a validator address in the DHT. async fn resolve_peer_id( network: &N, @@ -136,14 +133,11 @@ where network: &N, validator_address: &Address, ) -> Result, NetworkError> { - if let Some(record) = network + let peer_id = network .dht_get::<_, ValidatorRecord, KeyPair>(validator_address) .await? - { - Ok(Some(record.peer_id)) - } else { - Ok(None) - } + .map(|record| record.peer_id); + Ok(peer_id) } /// Looks up the peer ID for a validator address in the DHT and updates @@ -154,24 +148,25 @@ where /// /// The given `validator_id` is used for logging purposes only. async fn update_peer_id_cache(&self, validator_id: u16, validator_address: &Address) { - let cache_value = match Self::resolve_peer_id( + let result = Self::resolve_peer_id( &self.network, validator_address, Arc::clone(&self.dht_fallback), ) - .await - { + .await; + + let mut cache_value = match result { Ok(Some(peer_id)) => { - log::trace!( + log::debug!( %peer_id, validator_id, %validator_address, - "Resolved validator peer ID" + "Resolved validator peer ID from DHT" ); Ok(peer_id) } Ok(None) => { - log::debug!(validator_id, %validator_address, "Unable to resolve validator peer ID: Entry not found in DHT"); + log::debug!(validator_id, %validator_address, "Unable to resolve validator peer ID from DHT: Entry not found"); Err(()) } Err(error) => { @@ -179,12 +174,27 @@ where validator_id, ?error, %validator_address, - "Unable to resolve validator peer ID: Network error" + "Unable to resolve validator peer ID from DHT: Network error" ); Err(()) } }; + // If the DHT lookup failed, check the peer contact book for a verified peer id. + if cache_value.is_err() { + cache_value = self + .get_verified_validator_peer_id(validator_id) + .inspect(|peer_id| { + log::debug!( + %peer_id, + validator_id, + %validator_address, + "Resolved validator peer ID from contact book" + ); + }) + .ok_or(()); + } + match self .validator_peer_id_cache .write() @@ -206,12 +216,11 @@ where /// Look up the peer ID for a validator ID. fn get_validator_cache(&self, validator_id: u16) -> CacheState { - let validators = self.validators.read(); - let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + let Some(validator_address) = self.get_validator_address(validator_id) else { return CacheState::Error(None); }; - if let Some(cache_state) = self.validator_peer_id_cache.read().get(&validator.address) { + if let Some(cache_state) = self.validator_peer_id_cache.read().get(&validator_address) { match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(..) => {} @@ -228,7 +237,7 @@ where { // Re-check the validator Peer ID cache with the write lock taken and update it if necessary let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - if let Some(cache_state) = validator_peer_id_cache.get_mut(&validator.address) { + if let Some(cache_state) = validator_peer_id_cache.get_mut(&validator_address) { new_cache_state = match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(prev_peer_id) => { @@ -248,9 +257,9 @@ where } else { new_cache_state = CacheState::InProgress(None); // No cache entry for this validator ID: we are going to perform the DHT query - validator_peer_id_cache.insert(validator.address.clone(), new_cache_state); + validator_peer_id_cache.insert(validator_address.clone(), new_cache_state); log::debug!( - ?validator.address, + %validator_address, validator_id, "No cache entry found, querying DHT", ); @@ -258,7 +267,6 @@ where } let self_ = self.arc_clone(); - let validator_address = validator.address.clone(); spawn(async move { Self::update_peer_id_cache(&self_, validator_id, &validator_address).await; }); @@ -280,14 +288,13 @@ where // Fetch the validator from the validators. If it does not exist that peer_id is not // assigned in this epoch and there is no cached entry to clear. - let validators = self.validators.read(); - let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + let Some(validator_address) = self.get_validator_address(validator_id) else { return; }; // Fetch the cache. If it does not exist there is no need to clear. let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - let Some(cache_entry) = validator_peer_id_cache.get_mut(&validator.address) else { + let Some(cache_entry) = validator_peer_id_cache.get_mut(&validator_address) else { return; }; @@ -298,6 +305,45 @@ where } } } + + /// Fetches all peer ids (including unverified ones) from the contact book for the given validator_id. + fn get_validator_peer_ids(&self, validator_id: u16) -> HashSet { + let Some(validator_address) = self.get_validator_address(validator_id) else { + return HashSet::new(); + }; + self.network + .get_peers_by_validator(&validator_address, true) + } + + /// Fetches the verified peer id from the contact book for the given validator_id if it exists. + fn get_verified_validator_peer_id(&self, validator_id: u16) -> Option { + let validator_address = self.get_validator_address(validator_id)?; + let peer_ids = self + .network + .get_peers_by_validator(&validator_address, false); + + if peer_ids.len() > 1 { + warn!( + %validator_address, + num_peer_ids = peer_ids.len(), + "More than one verified peer id found for validator" + ); + } + + // TODO Pick latest peer id instead of random one. + peer_ids.iter().next().cloned() + } + + /// Fetches the validator address for the given validator_id. + fn get_validator_address(&self, validator_id: u16) -> Option
{ + let validators = self.validators.read(); + let address = validators + .as_ref()? + .get_validator_by_slot_band(validator_id) + .address + .clone(); + Some(address) + } } /// Messages sent over the validator network get augmented with the sending @@ -305,7 +351,7 @@ where /// /// This makes it easier for the recipient to check that the sender is indeed a /// currently elected validator. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] struct ValidatorMessage { validator_id: u16, inner: M, @@ -380,25 +426,38 @@ where validator_id: self.local_validator_id()?, inner: msg, }; + // Use the last known peer ID, knowing that it might be already outdated. - // The network doesn't have a way to know if a record is outdated but we mark + // The network doesn't have a way to know if a record is outdated, but we mark // them as potentially outdated when a request/response error happens. // If the cache has a potentially outdated value, it will be updated soon // and then available to use by future calls to this function. - let peer_id = self + let cached_peer_id = self .get_validator_cache(validator_id) - .potentially_outdated_peer_id() - .ok_or_else(|| NetworkError::UnknownValidator(validator_id))?; - - self.network - .message(msg, peer_id) - .map_err(|e| { + .potentially_outdated_peer_id(); + if let Some(peer_id) = cached_peer_id { + if let Err(e) = self.network.message(msg.clone(), peer_id).await { // The validator peer id might have changed and thus caused a connection failure. self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id); + } else { + return Ok(()); + } + } - NetworkError::Request(e) - }) - .await + // Try all validator peer_ids from our peer contact book. + let mut futures = FuturesUnordered::new(); + for peer_id in self.get_validator_peer_ids(validator_id) { + let network = Arc::clone(&self.network); + let msg = msg.clone(); + futures.push(async move { network.message(msg, peer_id).await }); + } + + let results = futures.collect::>>().await; + if results.iter().any(|result| result.is_ok()) { + Ok(()) + } else { + Err(UnknownValidator(validator_id)) + } } async fn request( @@ -413,19 +472,27 @@ where validator_id: self.local_validator_id()?, inner: request, }; + if let Some(peer_id) = self.get_validator_cache(validator_id).current_peer_id() { - self.network - .request(request, peer_id) - .map_err(|e| { - // The validator peer id might have changed and thus caused a connection failure. - self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id); + match self.network.request(request.clone(), peer_id).await { + Ok(response) => return Ok(response), + Err(e) => self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id), + } + } - NetworkError::Request(e) - }) - .await - } else { - Err(NetworkError::Unreachable) + // Try all validator peer_ids from our peer contact book. + let mut futures = FuturesUnordered::new(); + for peer_id in self.get_validator_peer_ids(validator_id) { + let network = Arc::clone(&self.network); + let request = request.clone(); + futures.push(async move { network.request(request, peer_id).await }); } + + futures + .filter_map(|result| future::ready(result.ok())) + .next() + .await + .ok_or(UnknownValidator(validator_id)) } fn receive(&self) -> MessageStream @@ -439,17 +506,20 @@ where .filter_map(move |(message, peer_id)| { let self_ = self_.arc_clone(); async move { - let validator_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let cached_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let peer_ids = self_.get_validator_peer_ids(message.validator_id); + // Check that each message actually comes from the peer that it // claims it comes from. Reject it otherwise. - if validator_peer_id + if cached_peer_id .as_ref() .map(|pid| *pid != peer_id) - .unwrap_or(true) + .unwrap_or(true) && !peer_ids.contains(&peer_id) { - warn!(%peer_id, ?validator_peer_id, claimed_validator_id = message.validator_id, "Dropping validator message"); + warn!(%peer_id, ?cached_peer_id, ?peer_ids, claimed_validator_id = message.validator_id, "Dropping validator message"); return None; } + Some((message.inner, message.validator_id)) } }), @@ -466,18 +536,22 @@ where .filter_map(move |(message, request_id, peer_id)| { let self_ = self_.arc_clone(); async move { - let validator_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let cached_peer_id = self_.get_validator_cache(message.validator_id).potentially_outdated_peer_id(); + let peer_ids = self_.get_validator_peer_ids(message.validator_id); + // Check that each message actually comes from the peer that it // claims it comes from. Reject it otherwise. - if validator_peer_id + if cached_peer_id .as_ref() .map(|pid| *pid != peer_id) - .unwrap_or(true) + .unwrap_or(true) && !peer_ids.contains(&peer_id) { - warn!(%peer_id, ?validator_peer_id, claimed_validator_id = message.validator_id, "Dropping validator request"); + warn!(%peer_id, ?cached_peer_id, ?peer_ids, claimed_validator_id = message.validator_id, "Dropping validator message"); return None; } + Some((message.inner, request_id, message.validator_id)) + } }) .boxed() @@ -548,4 +622,19 @@ where self.get_validator_cache(validator_id) .potentially_outdated_peer_id() } + + /// Registers a callback to produce a signed ValidatorRecord from a given peer_id and timestamp. + fn register_validator_signing_callback( + &self, + callback: impl Fn( + ::PeerId, + u64, + ) + -> TaggedSigned::PeerId>, KeyPair> + + Send + + Sync + + 'static, + ) { + self.network.register_validator_signing_callback(callback) + } } diff --git a/validator/src/validator.rs b/validator/src/validator.rs index 4c007fd451..ed051ed788 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -32,10 +32,14 @@ use nimiq_mempool_task::MempoolTask; use nimiq_network_interface::{ network::{MsgAcceptance, Network, NetworkEvent, SubscribeEvents}, request::request_handler, + validator_record::ValidatorRecord, }; use nimiq_primitives::{coin::Coin, policy::Policy}; use nimiq_transaction_builder::TransactionBuilder; -use nimiq_utils::spawn; +use nimiq_utils::{ + spawn, + tagged_signing::{TaggedKeyPair, TaggedSigned}, +}; use nimiq_validator_network::{PubsubId, ValidatorNetwork}; use parking_lot::RwLock; #[cfg(feature = "metrics")] @@ -200,6 +204,19 @@ where .await }); + let key = signing_key.clone(); + let validator_address1 = validator_address.clone(); + network.register_validator_signing_callback(move |peer_id, timestamp| { + let record = ValidatorRecord { + timestamp, + peer_id, + validator_address: validator_address1.clone(), + }; + + let signature = key.tagged_sign(&record); + TaggedSigned::new(record, signature) + }); + Self { consensus: consensus.proxy(), blockchain, @@ -366,6 +383,22 @@ where // Inform the network about the current validator ID. self.network.set_validator_id(*self.slot_band.read()); + let key = self.signing_key(); + let validator_address = self.validator_address(); + + self.network + .register_validator_signing_callback(move |peer_id, timestamp| { + let record = ValidatorRecord { + timestamp, + peer_id, + validator_address: validator_address.clone(), + }; + + let signature = key.tagged_sign(&record); + + TaggedSigned::new(record, signature) + }); + // Set the elected validators of the current epoch in the network as well. self.network.set_validators(validators); diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index 23a5b8bd81..35055e0186 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -30,7 +30,7 @@ use nimiq_validator::aggregation::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] struct SkipBlockMessage(SerializableLevelUpdate); impl RequestCommon for SkipBlockMessage {