From 618db13240019a22a45a6c235f8cc1217e5c8435 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 7 Mar 2025 11:38:52 +0100 Subject: [PATCH] Add CLI flag to subscribe to all subnets --- anchor/client/src/cli.rs | 8 ++++++++ anchor/client/src/config.rs | 2 ++ anchor/client/src/lib.rs | 8 ++++++-- anchor/network/src/config.rs | 4 ++++ anchor/network/src/network.rs | 10 ++-------- anchor/subnet_tracker/src/lib.rs | 21 ++++++++++++++++----- 6 files changed, 38 insertions(+), 15 deletions(-) diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 3adc0728..82ec8d94 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -434,6 +434,14 @@ pub struct Anchor { display_order = 0 )] pub enr_quic6_port: Option, + + #[clap( + long, + help = "Subscribe to all subnets, regardless of committee membership.", + display_order = 0, + help_heading = FLAG_HEADER, + )] + pub subscribe_all_subnets: bool, } pub fn get_color_style() -> Styles { diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 8435b53f..6850e072 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -177,6 +177,8 @@ pub fn from_cli(cli_args: &Anchor) -> Result { config.network.enr_udp6_port = cli_args.enr_udp6_port; config.network.enr_quic6_port = cli_args.enr_quic6_port; + config.network.subscribe_all_subnets = cli_args.subscribe_all_subnets; + config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 8f92f23e..7436da2b 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -143,8 +143,12 @@ impl Client { .map_err(|e| format!("Unable to open Anchor database: {e}"))?, ); - let subnet_tracker = - start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor); + let subnet_tracker = start_subnet_tracker( + database.watch(), + network::SUBNET_COUNT, + config.network.subscribe_all_subnets, + &executor, + ); // Initialize slashing protection. let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME); diff --git a/anchor/network/src/config.rs b/anchor/network/src/config.rs index 99a87d2e..94a5a33f 100644 --- a/anchor/network/src/config.rs +++ b/anchor/network/src/config.rs @@ -61,6 +61,9 @@ pub struct Config { /// Disables quic support. pub disable_quic_support: bool, + /// Subscribe to all subnets regardless of committee membership. + pub subscribe_all_subnets: bool, + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, @@ -101,6 +104,7 @@ impl Default for Config { disable_peer_scoring: false, disable_discovery: false, disable_quic_support: false, + subscribe_all_subnets: false, topics: vec![], domain_type: DomainType::default(), } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 9baec1ba..108714c3 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -215,14 +215,8 @@ impl Network { } } }, - event = self.subnet_event_receiver.recv() => { - match event { - Some(event) => self.on_subnet_tracker_event(event), - None => { - error!("subnet tracker has quit"); - return; - } - } + Some(event) = self.subnet_event_receiver.recv() => { + self.on_subnet_tracker_event(event) } event = self.message_rx.recv() => { match event { diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_tracker/src/lib.rs index 0a40a934..30779280 100644 --- a/anchor/subnet_tracker/src/lib.rs +++ b/anchor/subnet_tracker/src/lib.rs @@ -8,7 +8,7 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::{mpsc, watch}; use tokio::time::sleep; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] @@ -52,12 +52,23 @@ pub enum SubnetEvent { pub fn start_subnet_tracker( db: watch::Receiver, subnet_count: usize, + subscribe_all_subnets: bool, executor: &TaskExecutor, ) -> mpsc::Receiver { - // a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can wait. - let (tx, rx) = mpsc::channel(1); - executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker"); - rx + if !subscribe_all_subnets { + // a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can wait. + let (tx, rx) = mpsc::channel(1); + executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker"); + rx + } else { + let (tx, rx) = mpsc::channel(subnet_count); + for subnet in (0..(subnet_count as u64)).map(SubnetId) { + if let Err(err) = tx.try_send(SubnetEvent::Join(subnet)) { + error!(?err, "Impossible error while subscribing to all subnets"); + } + } + rx + } } /// The main background task: