Skip to content

Commit

Permalink
ckb_network: prevent EventHandler::handle_error removed user config…
Browse files Browse the repository at this point in the history
…ured public_addrs

Signed-off-by: Eval EXEC <execvy@gmail.com>
  • Loading branch information
eval-exec committed Mar 6, 2025
1 parent 2d30744 commit 965ea17
Showing 1 changed file with 77 additions and 12 deletions.
89 changes: 77 additions & 12 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
use ckb_util::{Condvar, Mutex, RwLock};
use futures::{Future, channel::mpsc::Sender};
use ipnetwork::IpNetwork;
use p2p::multiaddr::MultiAddr;
use p2p::{
SessionId, async_trait,
builder::ServiceBuilder,
Expand Down Expand Up @@ -67,6 +68,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
// After 5 minutes we consider this dial hang
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);

/// CKB node's public addresses:
///
/// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
pub struct PublicAddresses {
/// Addresses explicitly configured by the user in the ckb.toml configuration file.
/// These addresses are considered static and represent the node's intended public endpoints.
configured: HashSet<MultiAddr>,

/// Addresses discovered dynamically at runtime through observing successful outbound connections.
/// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
/// read access while providing exclusive write access for updates. Addresses that fail to connect
/// are removed from this set.
discovered: RwLock<HashSet<Multiaddr>>,
}

impl PublicAddresses {
fn new(configured: HashSet<MultiAddr>, discovered: HashSet<Multiaddr>) -> Self {
Self {
configured,
discovered: RwLock::new(discovered),
}
}

fn all(&self) -> Vec<MultiAddr> {
self.configured
.iter()
.chain(self.discovered.read().iter())
.cloned()
.collect()
}

fn contains(&self, addr: &MultiAddr) -> bool {
self.discovered.read().contains(addr) || self.configured.contains(addr)
}

fn count(&self) -> usize {
self.configured.len() + self.discovered.read().len()
}

fn random_choose(&self) -> Option<MultiAddr> {
let addrs = self.all();
if addrs.is_empty() {
return None;
}
addrs.into_iter().choose(&mut rand::thread_rng())
}
}

/// The global shared state of the network module
pub struct NetworkState {
pub(crate) peer_registry: RwLock<PeerRegistry>,
Expand All @@ -76,7 +125,7 @@ pub struct NetworkState {
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
/// Node public addresses,
/// includes manually public addrs and remote peer observed addrs
public_addrs: RwLock<HashSet<Multiaddr>>,
public_addrs: PublicAddresses,
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
Expand All @@ -99,7 +148,7 @@ impl NetworkState {
let local_private_key = config.fetch_private_key()?;
let local_peer_id = local_private_key.peer_id();
// set max score to public addresses
let public_addrs: HashSet<Multiaddr> = config
let configured_public_addrs: HashSet<Multiaddr> = config
.listen_addresses
.iter()
.chain(config.public_addresses.iter())
Expand All @@ -114,6 +163,9 @@ impl NetworkState {
}
})
.collect();

let discovered_public_addrs = HashSet::new();
let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
info!("Loading the peer store. This process may take a few seconds to complete.");

let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
Expand All @@ -134,7 +186,7 @@ impl NetworkState {
bootnodes,
peer_registry: RwLock::new(peer_registry),
dialing_addrs: RwLock::new(HashMap::default()),
public_addrs: RwLock::new(public_addrs),
public_addrs,
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
local_private_key,
Expand Down Expand Up @@ -334,7 +386,7 @@ impl NetworkState {

pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
self.public_addrs
.read()
.all()
.iter()
.take(count)
.cloned()
Expand Down Expand Up @@ -387,7 +439,7 @@ impl NetworkState {
trace!("Do not dial self: {:?}, {}", peer_id, addr);
return false;
}
if self.public_addrs.read().contains(addr) {
if self.public_addrs.contains(addr) {
trace!(
"Do not dial listened address(self): {:?}, {}",
peer_id, addr
Expand Down Expand Up @@ -499,12 +551,12 @@ impl NetworkState {
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
let mut pending_observed_addrs = self.pending_observed_addrs.write();
if pending_observed_addrs.is_empty() {
let addrs = self.public_addrs.read();
if addrs.is_empty() {
let addrs = &self.public_addrs;
if addrs.count() == 0 {
return;
}
// random get addr
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
if let Some(addr) = addrs.random_choose() {
if let Err(err) = p2p_control.dial(
addr.clone(),
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
Expand Down Expand Up @@ -608,7 +660,11 @@ impl ServiceHandle for EventHandler {
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
match error {
ServiceError::DialerError { address, error } => {
let mut public_addrs = self.network_state.public_addrs.write();
let mut discovered_public_addrs =
self.network_state.public_addrs.discovered.write();

let mut user_configured_public_addrs =
self.network_state.public_addrs.configured.write();

match error {
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
Expand All @@ -617,7 +673,7 @@ impl ServiceHandle for EventHandler {
debug!("dial observed address success: {:?}", address);
if let Some(ip) = multiaddr_to_socketaddr(&address) {
if is_reachable(ip.ip()) {
public_addrs.insert(address);
discovered_public_addrs.insert(address);
}
}
return;
Expand All @@ -631,9 +687,18 @@ impl ServiceHandle for EventHandler {
debug!("DialerError({}) {}", address, error);
}
}
if public_addrs.remove(&address) {

if user_configured_public_addrs.contains(&address) {
// don't remove the public_addr, sicne its user configred in ckb.toml
warn!(
"Dial the public addr {} which is configured in ckb.toml failed, keep it.",
address
);
}

if discovered_public_addrs.remove(&address) {
info!(
"Dial {} failed, remove it from network_state.public_addrs",
"Dial {} failed, remove it from network_state.public_addrs.discovered",
address
);
}
Expand Down

0 comments on commit 965ea17

Please sign in to comment.