Skip to content

Commit

Permalink
Add CLI flag to subscribe to all subnets
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik committed Mar 7, 2025
1 parent f030b75 commit 618db13
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 15 deletions.
8 changes: 8 additions & 0 deletions anchor/client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,14 @@ pub struct Anchor {
display_order = 0
)]
pub enr_quic6_port: Option<NonZeroU16>,

#[clap(
long,
help = "Subscribe to all subnets, regardless of committee membership.",
display_order = 0,
help_heading = FLAG_HEADER,
)]
pub subscribe_all_subnets: bool,
}

pub fn get_color_style() -> Styles {
Expand Down
2 changes: 2 additions & 0 deletions anchor/client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub fn from_cli(cli_args: &Anchor) -> Result<Config, String> {
config.network.enr_udp6_port = cli_args.enr_udp6_port;
config.network.enr_quic6_port = cli_args.enr_quic6_port;

config.network.subscribe_all_subnets = cli_args.subscribe_all_subnets;

config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone();
config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone();

Expand Down
8 changes: 6 additions & 2 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ impl Client {
.map_err(|e| format!("Unable to open Anchor database: {e}"))?,
);

let subnet_tracker =
start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor);
let subnet_tracker = start_subnet_tracker(
database.watch(),
network::SUBNET_COUNT,
config.network.subscribe_all_subnets,
&executor,
);

// Initialize slashing protection.
let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME);
Expand Down
4 changes: 4 additions & 0 deletions anchor/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub struct Config {
/// Disables quic support.
pub disable_quic_support: bool,

/// Subscribe to all subnets regardless of committee membership.
pub subscribe_all_subnets: bool,

/// List of extra topics to initially subscribe to as strings.
pub topics: Vec<GossipKind>,

Expand Down Expand Up @@ -101,6 +104,7 @@ impl Default for Config {
disable_peer_scoring: false,
disable_discovery: false,
disable_quic_support: false,
subscribe_all_subnets: false,
topics: vec![],
domain_type: DomainType::default(),
}
Expand Down
10 changes: 2 additions & 8 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,8 @@ impl<V: ValidatorService> Network<V> {
}
}
},
event = self.subnet_event_receiver.recv() => {
match event {
Some(event) => self.on_subnet_tracker_event(event),
None => {
error!("subnet tracker has quit");
return;
}
}
Some(event) = self.subnet_event_receiver.recv() => {
self.on_subnet_tracker_event(event)
}
event = self.message_rx.recv() => {
match event {
Expand Down
21 changes: 16 additions & 5 deletions anchor/subnet_tracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::{mpsc, watch};
use tokio::time::sleep;
use tracing::{debug, warn};
use tracing::{debug, error, warn};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
Expand Down Expand Up @@ -52,12 +52,23 @@ pub enum SubnetEvent {
pub fn start_subnet_tracker(
db: watch::Receiver<NetworkState>,
subnet_count: usize,
subscribe_all_subnets: bool,
executor: &TaskExecutor,
) -> mpsc::Receiver<SubnetEvent> {
// a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can wait.
let (tx, rx) = mpsc::channel(1);
executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker");
rx
if !subscribe_all_subnets {
// a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can wait.
let (tx, rx) = mpsc::channel(1);
executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker");
rx
} else {
let (tx, rx) = mpsc::channel(subnet_count);
for subnet in (0..(subnet_count as u64)).map(SubnetId) {
if let Err(err) = tx.try_send(SubnetEvent::Join(subnet)) {
error!(?err, "Impossible error while subscribing to all subnets");
}
}
rx
}
}

/// The main background task:
Expand Down

0 comments on commit 618db13

Please sign in to comment.