Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor stats collection and add more cucumber tests #273

Merged
merged 5 commits on
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions integration_tests/src/p2pool_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,12 @@ pub async fn verify_peer_connected(world: &mut TariWorld, p2pool_name: String, p
)
.into());
}
if counter % 10 == 0 {
debug!(target: LOG_TARGET, "{}: waiting for '{}' to show peer connected", counter, connections_url);
if counter % 50 == 0 {
debug!(
target: LOG_TARGET,
"Iteration {}: waiting {:.2?} for '{}' to show peer connected",
counter, start.elapsed(), connections_url
);
}
counter += 1;

Expand Down
25 changes: 21 additions & 4 deletions integration_tests/tests/features/Sync.feature
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,35 @@
Feature: Sync p2pool nodes

@critical
Scenario: New node sync with peers
Scenario: New node sync with peers on startup
Given I have a base node BASE_NODE_A
And I have a p2pool seed node SEED in squad DOLPHINS connected to base node BASE_NODE_A
And I have a p2pool node NODE_A in squad DOLPHINS connected to base node BASE_NODE_A
And I add 10 blocks to p2pool node NODE_A
And p2pool node NODE_A stats is at height 10
# Add new node, it syncs
# Add new nodes, they sync
And I have a p2pool node NODE_B in squad DOLPHINS connected to base node BASE_NODE_A
And p2pool node NODE_A stats shows connected to peer NODE_B
And p2pool node NODE_B stats is at height 10
And I have a p2pool node NODE_C in squad DOLPHINS connected to base node BASE_NODE_A
And I have a p2pool node NODE_D in squad DOLPHINS connected to base node BASE_NODE_A
And p2pool node NODE_D stats shows connected to peer NODE_A
And p2pool node NODE_D stats shows connected to peer NODE_B
And p2pool node NODE_D stats shows connected to peer NODE_C
And p2pool node NODE_D stats is at height 10
Then I wait 1 seconds and stop

@critical
Scenario: Node will load up blocks from storage on startup
Given I have a base node BASE_NODE_A
And I have a p2pool seed node SEED in squad DOLPHINS connected to base node BASE_NODE_A
And I have a p2pool node NODE_A in squad DOLPHINS connected to base node BASE_NODE_A
And I add 10 blocks to p2pool node NODE_A
And p2pool node NODE_A stats is at height 10
# Stop the node
And I stop p2pool node NODE_A
# Start the node up again; it loads the blocks from storage
And I re-start p2pool node NODE_A
And p2pool node NODE_A stats is at height 10

@critical
Scenario: New node can be offline and then sync with peers
Given I have a base node BASE_NODE_A
Expand Down
1 change: 1 addition & 0 deletions p2pool/src/cli/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub struct StartArgs {

#[arg(long, default_value_t = false)]
pub randomx_disabled: bool,

#[arg(long, default_value_t = false)]
pub sha3x_disabled: bool,

Expand Down
9 changes: 5 additions & 4 deletions p2pool/src/cli/commands/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,18 @@ pub async fn server(
));
let coinbase_extras_sha3x = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));

let (stats_tx, stats_rx) = tokio::sync::broadcast::channel(1000);
let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
let stats_collector = StatsCollector::new(shutdown_signal.clone(), stats_rx);

let swarm = crate::server::p2p::setup::new_swarm(&config).await?;
let squad = config.p2p_service.squad_override.clone().unwrap_or_else(|| {
let squad_id =
(*swarm.local_peer_id().to_bytes().last().unwrap_or(&0) as usize) % max(1, config.p2p_service.num_squads);
format!("{}_{}", config.p2p_service.squad_prefix.clone(), squad_id)
});
info!(target: LOG_TARGET, "Swarm created. Our id: {}, our squad:{}", swarm.local_peer_id(), squad);

let (stats_tx, stats_rx) = tokio::sync::broadcast::channel(1000);
let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
let stats_collector = StatsCollector::new(shutdown_signal.clone(), stats_rx);

if let Some(path) = args.export_libp2p_info.clone() {
let libp2p_info = LibP2pInfo {
peer_id: *swarm.local_peer_id(),
Expand Down
53 changes: 28 additions & 25 deletions p2pool/src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::time::Duration;
use std::{fmt::Debug, time::Duration};

use human_format::Formatter;
use libp2p::PeerId;
Expand Down Expand Up @@ -92,6 +92,7 @@ impl StatsCollector {
}
}

#[allow(clippy::too_many_lines)]
fn handle_stat(&mut self, sample: StatData) {
match sample {
StatData::InfoChanged {
Expand Down Expand Up @@ -185,6 +186,7 @@ impl StatsCollector {
}
}

#[allow(clippy::too_many_lines)]
pub(crate) async fn run(&mut self) -> Result<(), anyhow::Error> {
let mut stats_report_timer = tokio::time::interval(tokio::time::Duration::from_secs(10));
stats_report_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand All @@ -197,36 +199,37 @@ impl StatsCollector {
_ = stats_report_timer.tick() => {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
info!(
target: LOG_TARGET,
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(tot/gr/bl/non) {}/{}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64()
).unwrap_or_default())),
env!("CARGO_PKG_VERSION"),
self.last_squad.as_deref().unwrap_or("Not set"),
self.randomx_chain_height.saturating_sub(self.randomx_chain_length),
self.randomx_chain_height,
self.sha3x_chain_height.saturating_sub(self.sha3x_chain_length),
self.sha3x_chain_height,
formatter.format(self.randomx_target_difficulty.as_u64() as f64 ),
formatter.format(self.randomx_network_difficulty.as_u64() as f64),
formatter.format(self.sha_target_difficulty.as_u64() as f64),
formatter.format(self.sha_network_difficulty.as_u64() as f64),
self.miner_rx_accepted,
self.miner_sha_accepted,
self.pool_rx_accepted,
self.pool_sha_accepted,
self.total_peers,
self.total_grey_list,
self.total_black_list,
self.total_non_squad_peers,
self.established_incoming,
self.established_outgoing,
humantime::format_duration(Duration::from_secs(EpochTime::now().as_u64().checked_sub(
self.last_gossip_message.as_u64()
).unwrap_or_default())),
);
},
self.randomx_chain_height.saturating_sub(self.randomx_chain_length),
self.randomx_chain_height,
self.sha3x_chain_height.saturating_sub(self.sha3x_chain_length),
self.sha3x_chain_height,
formatter.format(self.randomx_target_difficulty.as_u64() as f64 ),
formatter.format(self.randomx_network_difficulty.as_u64() as f64),
formatter.format(self.sha_target_difficulty.as_u64() as f64),
formatter.format(self.sha_network_difficulty.as_u64() as f64),
self.miner_rx_accepted,
self.miner_sha_accepted,
self.pool_rx_accepted,
self.pool_sha_accepted,
self.total_peers,
self.total_grey_list,
self.total_black_list,
self.total_non_squad_peers,
self.established_incoming,
self.established_outgoing,
humantime::format_duration(Duration::from_secs(EpochTime::now().as_u64().checked_sub(
self.last_gossip_message.as_u64()
).unwrap_or_default())),
);
},
res = self.request_rx.recv() => {
match res {
Some(StatsRequest::GetStats(pow, tx)) => {
Expand Down
48 changes: 13 additions & 35 deletions p2pool/src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl PeerStore {
record.num_grey_listings = 0;

self.whitelist_peers.insert(peer_id.to_base58(), record);
self.update_peer_stats();
}

if let Some(entry) = self.blacklist_peers.get_mut(&peer_id.to_base58()) {
Expand Down Expand Up @@ -296,6 +297,7 @@ impl PeerStore {
peer_record.last_grey_list_reason = Some("Seed peer".to_string());

self.greylist_peers.insert(peer_id.to_base58(), peer_record);
self.update_peer_stats();
return AddPeerStatus::Greylisted;
}
if self.blacklist_peers.contains_key(&peer_id.to_base58()) {
Expand All @@ -314,14 +316,7 @@ impl PeerStore {
};
self.non_squad_peers
.insert(peer_id.to_base58(), PeerStoreRecord::new(peer_id, peer_info));
if return_type == AddPeerStatus::NonSquad {
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
self.update_peer_stats();
return return_type;
}

Expand All @@ -337,20 +332,23 @@ impl PeerStore {
new_record.last_grey_list_reason = entry.last_grey_list_reason.clone();

*entry = new_record;
// self.whitelist_peers.insert(peer_id, PeerStoreRecord::new(peer_info));
return AddPeerStatus::Existing;
}

self.whitelist_peers
.insert(peer_id.to_base58(), PeerStoreRecord::new(peer_id, peer_info));
self.update_peer_stats();
debug!(target: LOG_TARGET, "Peer NewPeer: {}", peer_id);
AddPeerStatus::NewPeer
}

fn update_peer_stats(&self) {
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
debug!(target: LOG_TARGET, "Peer NewPeer: {}", peer_id);
AddPeerStatus::NewPeer
}

pub fn clear_grey_list(&mut self) {
Expand All @@ -366,12 +364,7 @@ impl PeerStore {
self.whitelist_peers.insert(peer_id.clone(), record.clone());
}
}
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
self.update_peer_stats();
}

pub fn clear_black_list(&mut self) {
Expand All @@ -381,12 +374,7 @@ impl PeerStore {
record.num_grey_listings = 0;
self.whitelist_peers.insert(peer_id, record);
}
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
self.update_peer_stats();
}

pub fn move_to_grey_list(&mut self, peer_id: PeerId, reason: String) {
Expand All @@ -397,12 +385,7 @@ impl PeerStore {
record.last_grey_list_reason = Some(reason.clone());
record.num_grey_listings += 1;
self.greylist_peers.insert(peer_id.to_base58(), record);
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
self.update_peer_stats();
}
}
}
Expand All @@ -419,12 +402,7 @@ impl PeerStore {
if let Some(record) = record {
warn!(target: LOG_TARGET, "Blacklisting peer {} because of: {}", peer, reason);
self.blacklist_peers.insert(peer.to_base58(), record);
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
self.update_peer_stats();
}
}

Expand Down
Loading
Loading