From 965ea177f8835b2508b5b1d7b84c78186668cb76 Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Sat, 1 Mar 2025 18:40:11 +0800
Subject: [PATCH] ckb_network: prevent `EventHandler::handle_error` removed user configured public_addrs

Signed-off-by: Eval EXEC
---
 network/src/network.rs | 89 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 77 insertions(+), 12 deletions(-)

diff --git a/network/src/network.rs b/network/src/network.rs
index 853deff2f9..699acb353a 100644
--- a/network/src/network.rs
+++ b/network/src/network.rs
@@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
 use ckb_util::{Condvar, Mutex, RwLock};
 use futures::{Future, channel::mpsc::Sender};
 use ipnetwork::IpNetwork;
+use p2p::multiaddr::MultiAddr;
 use p2p::{
     SessionId, async_trait,
     builder::ServiceBuilder,
@@ -67,6 +68,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
 // After 5 minutes we consider this dial hang
 const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
 
+/// CKB node's public addresses.
+///
+/// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
+pub struct PublicAddresses {
+    /// Addresses explicitly configured by the user in the ckb.toml configuration file.
+    /// These addresses are considered static and represent the node's intended public endpoints.
+    configured: HashSet<MultiAddr>,
+
+    /// Addresses discovered dynamically at runtime through observing successful outbound connections.
+    /// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
+    /// read access while providing exclusive write access for updates. Addresses that fail to connect
+    /// are removed from this set.
+    discovered: RwLock<HashSet<MultiAddr>>,
+}
+
+impl PublicAddresses {
+    /// Builds the address book from the configured set and an initial discovered set.
+    fn new(configured: HashSet<MultiAddr>, discovered: HashSet<MultiAddr>) -> Self {
+        Self {
+            configured,
+            discovered: RwLock::new(discovered),
+        }
+    }
+
+    /// Returns a deduplicated snapshot of every configured or discovered address.
+    fn all(&self) -> Vec<MultiAddr> {
+        let discovered = self.discovered.read();
+        self.configured.union(&discovered).cloned().collect()
+    }
+
+    /// Returns `true` if `addr` is either configured or discovered.
+    fn contains(&self, addr: &MultiAddr) -> bool {
+        self.configured.contains(addr) || self.discovered.read().contains(addr)
+    }
+
+    /// Returns the number of distinct public addresses across both sets.
+    fn count(&self) -> usize {
+        let discovered = self.discovered.read();
+        self.configured.union(&discovered).count()
+    }
+
+    /// Picks one public address at random, or `None` when there are none.
+    fn random_choose(&self) -> Option<MultiAddr> {
+        // `choose` already yields `None` for an empty iterator, so no emptiness check is needed.
+        self.all().into_iter().choose(&mut rand::thread_rng())
+    }
+}
+
 /// The global shared state of the network module
 pub struct NetworkState {
     pub(crate) peer_registry: RwLock<PeerRegistry>,
@@ -76,7 +125,7 @@ pub struct NetworkState {
     dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
     /// Node public addresses,
     /// includes manually public addrs and remote peer observed addrs
-    public_addrs: RwLock<HashSet<Multiaddr>>,
+    public_addrs: PublicAddresses,
     pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
     local_private_key: secio::SecioKeyPair,
     local_peer_id: PeerId,
@@ -99,7 +148,7 @@ impl NetworkState {
         let local_private_key = config.fetch_private_key()?;
         let local_peer_id = local_private_key.peer_id();
         // set max score to public addresses
-        let public_addrs: HashSet<Multiaddr> = config
+        let configured_public_addrs: HashSet<Multiaddr> = config
             .listen_addresses
             .iter()
             .chain(config.public_addresses.iter())
@@ -114,6 +163,9 @@ impl NetworkState {
                 }
             })
             .collect();
+
+        let discovered_public_addrs = HashSet::new();
+        let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
         info!("Loading the peer store. This process may take a few seconds to complete.");
 
         let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
@@ -134,7 +186,7 @@ impl NetworkState {
             bootnodes,
             peer_registry: RwLock::new(peer_registry),
             dialing_addrs: RwLock::new(HashMap::default()),
-            public_addrs: RwLock::new(public_addrs),
+            public_addrs,
             listened_addrs: RwLock::new(Vec::new()),
             pending_observed_addrs: RwLock::new(HashSet::default()),
             local_private_key,
@@ -334,7 +386,7 @@ impl NetworkState {
 
     pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
         self.public_addrs
-            .read()
+            .all()
             .iter()
             .take(count)
             .cloned()
@@ -387,7 +439,7 @@ impl NetworkState {
             trace!("Do not dial self: {:?}, {}", peer_id, addr);
             return false;
         }
-        if self.public_addrs.read().contains(addr) {
+        if self.public_addrs.contains(addr) {
             trace!(
                 "Do not dial listened address(self): {:?}, {}",
                 peer_id, addr
@@ -499,12 +551,12 @@ impl NetworkState {
     pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
         let mut pending_observed_addrs = self.pending_observed_addrs.write();
         if pending_observed_addrs.is_empty() {
-            let addrs = self.public_addrs.read();
-            if addrs.is_empty() {
+            let addrs = &self.public_addrs;
+            if addrs.count() == 0 {
                 return;
             }
             // random get addr
-            if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
+            if let Some(addr) = addrs.random_choose() {
                 if let Err(err) = p2p_control.dial(
                     addr.clone(),
                     TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
@@ -608,7 +660,11 @@ impl ServiceHandle for EventHandler {
     async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
         match error {
             ServiceError::DialerError { address, error } => {
-                let mut public_addrs = self.network_state.public_addrs.write();
+                let mut discovered_public_addrs =
+                    self.network_state.public_addrs.discovered.write();
+
+                // `configured` is a plain set fixed at startup, so a shared borrow suffices.
+                let user_configured_public_addrs = &self.network_state.public_addrs.configured;
 
                 match error {
                     DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
@@ -617,7 +673,7 @@ impl ServiceHandle for EventHandler {
                         debug!("dial observed address success: {:?}", address);
                         if let Some(ip) = multiaddr_to_socketaddr(&address) {
                             if is_reachable(ip.ip()) {
-                                public_addrs.insert(address);
+                                discovered_public_addrs.insert(address);
                             }
                         }
                         return;
@@ -631,9 +687,18 @@ impl ServiceHandle for EventHandler {
                         debug!("DialerError({}) {}", address, error);
                     }
                 }
-                if public_addrs.remove(&address) {
+
+                if user_configured_public_addrs.contains(&address) {
+                    // Don't remove this public addr, since it is user-configured in ckb.toml.
+                    warn!(
+                        "Dial the public addr {} which is configured in ckb.toml failed, keep it.",
+                        address
+                    );
+                }
+
+                if discovered_public_addrs.remove(&address) {
                     info!(
-                        "Dial {} failed, remove it from network_state.public_addrs",
+                        "Dial {} failed, remove it from network_state.public_addrs.discovered",
                         address
                     );
                 }