Skip to content

Commit

Permalink
Basic peer manager based on libp2p peer_store and `connection_limit…
Browse files Browse the repository at this point in the history
…s` (#126)
  • Loading branch information
dknopik authored Feb 20, 2025
1 parent f027e46 commit 8d2ae13
Show file tree
Hide file tree
Showing 9 changed files with 1,040 additions and 554 deletions.
1,028 changes: 585 additions & 443 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 22 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ ssv_types = { path = "anchor/common/ssv_types" }
subnet_tracker = { path = "anchor/subnet_tracker" }
version = { path = "anchor/common/version" }

beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
bls = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2 = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
lighthouse_network = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
safe_arith = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
sensitive_url = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
slashing_protection = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
task_executor = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0", default-features = false, features = [
beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
bls = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2 = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2_config = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
lighthouse_network = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
safe_arith = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
sensitive_url = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
slashing_protection = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
task_executor = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2", default-features = false, features = [
"tracing",
] }
types = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
unused_port = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_services = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_store = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
types = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
unused_port = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_services = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_store = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }

alloy = { version = "0.11.0", features = [
"sol-types",
Expand Down Expand Up @@ -121,6 +121,10 @@ zeroize = "1.8.1"
[patch.crates-io]
# todo: remove when https://github.com/supranational/blst/pull/248 is merged
blst = { git = "https://github.com/dknopik/blst", branch = "sk-conversion" }
# todo: remove when libp2p versions are aligned again. this is only needed because cargo audit crashes otherwise
libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
libp2p-mplex = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
quick-protobuf-codec = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }

[profile.maxperf]
inherits = "release"
Expand Down
7 changes: 4 additions & 3 deletions anchor/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
futures = { workspace = true }
hex = "0.4.3"
libp2p = { version = "0.54", default-features = false, features = [
libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", default-features = false, features = [
"identify",
"yamux",
"noise",
Expand All @@ -23,6 +23,7 @@ libp2p = { version = "0.54", default-features = false, features = [
"gossipsub",
"quic",
"ping",
"peer-store",
"request-response",
] }
lighthouse_network = { workspace = true }
Expand All @@ -40,7 +41,7 @@ version = { workspace = true }

[dev-dependencies]
async-channel = { workspace = true }
libp2p-swarm = { version = "0.45.1", features = ["macros"] }
libp2p-swarm-test = { version = "0.4.0" }
libp2p-swarm = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", features = ["macros"] }
libp2p-swarm-test = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tracing-subscriber = { workspace = true }
3 changes: 3 additions & 0 deletions anchor/network/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::discovery::Discovery;
use crate::handshake;
use crate::peer_manager::PeerManager;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{gossipsub, identify, ping};

Expand All @@ -13,6 +14,8 @@ pub struct AnchorBehaviour {
pub gossipsub: gossipsub::Behaviour,
/// Discv5 Discovery protocol.
pub discovery: Discovery,
/// Anchor peer manager, wrapping libp2p behaviours with minimal added logic for peer selection.
pub peer_manager: PeerManager,

pub handshake: handshake::Behaviour,
}
11 changes: 5 additions & 6 deletions anchor/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use futures::StreamExt;
use libp2p::bytes::Bytes;
use libp2p::core::transport::PortUse;
use libp2p::core::Endpoint;
use libp2p::swarm::dummy::ConnectionHandler;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
dummy, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY};
Expand Down Expand Up @@ -432,7 +431,7 @@ impl Discovery {

impl NetworkBehaviour for Discovery {
// Discovery is not a real NetworkBehaviour...
type ConnectionHandler = ConnectionHandler;
type ConnectionHandler = dummy::ConnectionHandler;
type ToSwarm = DiscoveredPeers;

fn handle_established_inbound_connection(
Expand All @@ -442,7 +441,7 @@ impl NetworkBehaviour for Discovery {
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
Ok(dummy::ConnectionHandler)
}

fn handle_established_outbound_connection(
Expand All @@ -453,7 +452,7 @@ impl NetworkBehaviour for Discovery {
_role_override: Endpoint,
_port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
Ok(dummy::ConnectionHandler)
}

fn on_swarm_event(&mut self, event: FromSwarm) {
Expand Down Expand Up @@ -592,7 +591,7 @@ pub fn subnet_predicate(subnets: Vec<SubnetId>) -> impl Fn(&Enr) -> bool + Send
.any(|&s| committee_bitfield.get(*s as usize).unwrap_or(false));

if !predicate {
debug!(
trace!(
peer_id = %enr.peer_id(),
"Peer found but not on any of the desired subnets",
);
Expand Down
29 changes: 8 additions & 21 deletions anchor/network/src/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ pub fn handle_event(
match event {
Event::Message {
peer,
message:
Message::Request {
request_id: _,
request,
channel,
},
message: Message::Request {
request, channel, ..
},
..
} => Some(handle_request(
our_node_info,
behaviour,
Expand All @@ -89,25 +87,14 @@ pub fn handle_event(
)),
Event::Message {
peer,
message:
Message::Response {
request_id: _,
response,
},
message: Message::Response { response, .. },
..
} => Some(handle_response(our_node_info, peer, response)),
Event::OutboundFailure {
peer,
request_id: _,
error,
} => Some(Err(Failed {
Event::OutboundFailure { peer, error, .. } => Some(Err(Failed {
peer_id: peer,
error: Box::new(Error::Outbound(error)),
})),
Event::InboundFailure {
peer,
request_id: _,
error,
} => Some(Err(Failed {
Event::InboundFailure { peer, error, .. } => Some(Err(Failed {
peer_id: peer,
error: Box::new(Error::Inbound(error)),
})),
Expand Down
2 changes: 2 additions & 0 deletions anchor/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod discovery;
mod handshake;
mod keypair_utils;
mod network;
mod peer_manager;
mod transport;
pub use config::Config;
pub use lighthouse_network::{ListenAddr, ListenAddress};
Expand All @@ -14,3 +15,4 @@ pub use network::Network;
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;

pub const SUBNET_COUNT: usize = 128;
type SubnetBits = [u8; SUBNET_COUNT / 8];
104 changes: 41 additions & 63 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::num::{NonZeroU8, NonZeroUsize};
use std::pin::Pin;
use std::time::Duration;
use std::time::{Duration, Instant};

use futures::StreamExt;
use libp2p::core::muxing::StreamMuxerBox;
Expand All @@ -15,21 +16,21 @@ use libp2p::{
};
use lighthouse_network::discovery::DiscoveredPeers;
use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256};
use lighthouse_network::EnrExt;
use ssv_types::message::SignedSSVMessage;
use ssz::Decode;
use subnet_tracker::{SubnetEvent, SubnetId};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info};

use crate::behaviour::AnchorBehaviour;
use crate::behaviour::AnchorBehaviourEvent;
use crate::discovery::{Discovery, DiscoveryError, FIND_NODE_QUERY_CLOSEST_PEERS};
use crate::handshake::node_info::{NodeInfo, NodeMetadata};
use crate::keypair_utils::load_private_key;
use crate::peer_manager::{PeerManager, SubnetConnectActions};
use crate::transport::build_transport;
use crate::{handshake, Config};
use crate::{handshake, Config, Enr};

use crate::network::NetworkError::{Gossipsub, SwarmConfig};
use thiserror::Error;
Expand Down Expand Up @@ -135,17 +136,6 @@ impl Network {

/// Main loop for polling and handling swarm and channels.
pub async fn run(mut self) {
let topic = IdentTopic::new("ssv.v2.9");

match self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
Err(e) => {
warn!(topic = %topic, "error" = ?e, "Failed to subscribe to topic");
}
Ok(_) => {
debug!(topic = %topic, "Subscribed to topic");
}
}

loop {
tokio::select! {
swarm_message = self.swarm.select_next_some() => {
Expand Down Expand Up @@ -179,21 +169,8 @@ impl Network {
}
// TODO handle gossipsub events
},
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
//self.peer_manager_mut().peers_discovered(peers);
debug!(peers = ?peers, "Peers discovered");
for (enr, _) in peers {
for tcp in enr.multiaddr_tcp() {
trace!(address = ?tcp, "Dialing peer");
if let Err(e) = self.swarm.dial(tcp.clone()) {
error!(address = ?tcp, error = ?e, "Error dialing peer");
}
}
}
self.on_discovered_peers(peers);
}
AnchorBehaviourEvent::Handshake(event) => {
if let Some(result) = handshake::handle_event(
Expand Down Expand Up @@ -240,6 +217,19 @@ impl Network {
}
}

fn on_discovered_peers(&mut self, peers: HashMap<Enr, Option<Instant>>) {
debug!(peers = ?peers, "Peers discovered");
let manager = self.peer_manager();
// need to collect to avoid double borrow
let to_dial = peers
.into_iter()
.filter_map(|(enr, _)| manager.discovered_peer(enr))
.collect::<Vec<_>>();
for dial in to_dial {
let _ = self.swarm.dial(dial);
}
}

fn on_subnet_tracker_event(&mut self, event: SubnetEvent) {
match event {
SubnetEvent::Join(subnet) => {
Expand All @@ -251,24 +241,31 @@ impl Network {
{
error!(?err, subnet = *subnet, "can't subscribe");
}
self.swarm
.behaviour_mut()
.discovery
.start_subnet_query(vec![subnet]);
let SubnetConnectActions { dial, discover } =
self.peer_manager().join_subnet(subnet);
for peer in dial {
let _ = self.swarm.dial(peer);
}
if discover {
self.swarm
.behaviour_mut()
.discovery
.start_subnet_query(vec![subnet]);
}
}
SubnetEvent::Leave(subnet) => {
if let Err(err) = self
.swarm
self.swarm
.behaviour_mut()
.gossipsub
.unsubscribe(&subnet_to_topic(subnet))
{
error!(?err, subnet = *subnet, "can't unsubscribe");
}
.unsubscribe(&subnet_to_topic(subnet));
}
}
}

fn peer_manager(&mut self) -> &mut PeerManager {
&mut self.swarm.behaviour_mut().peer_manager
}

fn handle_handshake_result(&mut self, result: Result<handshake::Completed, handshake::Failed>) {
match result {
Ok(handshake::Completed {
Expand All @@ -286,7 +283,7 @@ impl Network {
}

fn subnet_to_topic(subnet: SubnetId) -> IdentTopic {
IdentTopic::new(format!("ssv.{}", *subnet))
IdentTopic::new(format!("ssv.v2.{}", *subnet))
}

async fn build_anchor_behaviour(
Expand Down Expand Up @@ -339,13 +336,16 @@ async fn build_anchor_behaviour(
discovery
};

let handshake = handshake::create_behaviour(local_keypair.clone());
let peer_manager = PeerManager::new(network_config);

let handshake = handshake::create_behaviour(local_keypair);

Ok(AnchorBehaviour {
identify,
ping: ping::Behaviour::default(),
gossipsub,
discovery,
peer_manager,
handshake,
})
}
Expand All @@ -370,28 +370,6 @@ fn build_swarm(
let dial_concurrency_factor = NonZeroU8::new(1)
.ok_or_else(|| SwarmConfig("dial_concurrency_factor cannot be 0".to_string()))?;

// TODO: revisit once peer manager is integrated
// let connection_limits = {
// let limits = libp2p::connection_limits::ConnectionLimits::default()
// .with_max_pending_incoming(Some(5))
// .with_max_pending_outgoing(Some(16))
// .with_max_established_incoming(Some(
// (config.target_peers as f32
// * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
// .ceil() as u32,
// ))
// .with_max_established_outgoing(Some(
// (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
// ))
// .with_max_established(Some(
// (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
// .ceil() as u32,
// ))
// .with_max_established_per_peer(Some(1));
//
// libp2p::connection_limits::Behaviour::new(limits)
// };

let swarm_config = libp2p::swarm::Config::with_executor(Executor(executor))
.with_notify_handler_buffer_size(notify_handler_buffer_size)
.with_per_connection_event_buffer_size(4)
Expand Down
Loading

0 comments on commit 8d2ae13

Please sign in to comment.