Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more metrics #1176

Merged
merged 6 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
//! - Tracker REST API: the tracker API can be enabled/disabled.
use std::sync::Arc;

use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::Configuration;
use tracing::instrument;

use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker};
use crate::servers::registar::Registar;
use crate::servers::udp::server::banning::BanService;
use crate::{core, servers};

/// # Panics
Expand All @@ -37,8 +39,12 @@ use crate::{core, servers};
///
/// - Can't retrieve tracker keys from database.
/// - Can't load whitelist from database.
#[instrument(skip(config, tracker))]
pub async fn start(config: &Configuration, tracker: Arc<core::Tracker>) -> Vec<JoinHandle<()>> {
#[instrument(skip(config, tracker, ban_service))]
pub async fn start(
config: &Configuration,
tracker: Arc<core::Tracker>,
ban_service: Arc<RwLock<BanService>>,
) -> Vec<JoinHandle<()>> {
if config.http_api.is_none()
&& (config.udp_trackers.is_none() || config.udp_trackers.as_ref().map_or(true, std::vec::Vec::is_empty))
&& (config.http_trackers.is_none() || config.http_trackers.as_ref().map_or(true, std::vec::Vec::is_empty))
Expand Down Expand Up @@ -75,7 +81,9 @@ pub async fn start(config: &Configuration, tracker: Arc<core::Tracker>) -> Vec<J
udp_tracker_config.bind_address
);
} else {
jobs.push(udp_tracker::start_job(udp_tracker_config, tracker.clone(), registar.give_form()).await);
jobs.push(
udp_tracker::start_job(udp_tracker_config, tracker.clone(), ban_service.clone(), registar.give_form()).await,
);
}
}
} else {
Expand Down Expand Up @@ -105,6 +113,7 @@ pub async fn start(config: &Configuration, tracker: Arc<core::Tracker>) -> Vec<J
if let Some(job) = tracker_apis::start_job(
http_api_config,
tracker.clone(),
ban_service.clone(),
registar.give_form(),
servers::apis::Version::V1,
)
Expand Down
9 changes: 7 additions & 2 deletions src/bootstrap/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! 4. Initialize the domain tracker.
use std::sync::Arc;

use tokio::sync::RwLock;
use torrust_tracker_clock::static_time;
use torrust_tracker_configuration::validator::Validator;
use torrust_tracker_configuration::Configuration;
Expand All @@ -22,6 +23,8 @@ use super::config::initialize_configuration;
use crate::bootstrap;
use crate::core::services::tracker_factory;
use crate::core::Tracker;
use crate::servers::udp::server::banning::BanService;
use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP;
use crate::shared::crypto::ephemeral_instance_keys;
use crate::shared::crypto::keys::{self, Keeper as _};

Expand All @@ -32,7 +35,7 @@ use crate::shared::crypto::keys::{self, Keeper as _};
/// Setup can file if the configuration is invalid.
#[must_use]
#[instrument(skip())]
pub fn setup() -> (Configuration, Arc<Tracker>) {
pub fn setup() -> (Configuration, Arc<Tracker>, Arc<RwLock<BanService>>) {
#[cfg(not(test))]
check_seed();

Expand All @@ -44,9 +47,11 @@ pub fn setup() -> (Configuration, Arc<Tracker>) {

let tracker = initialize_with_configuration(&configuration);

let udp_ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));

tracing::info!("Configuration:\n{}", configuration.clone().mask_secrets().to_json());

(configuration, tracker)
(configuration, tracker, udp_ban_service)
}

/// checks if the seed is the instance seed in production.
Expand Down
18 changes: 13 additions & 5 deletions src/bootstrap/jobs/tracker_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::{AccessTokens, HttpApi};
use tracing::instrument;
Expand All @@ -33,6 +34,7 @@ use crate::core;
use crate::servers::apis::server::{ApiServer, Launcher};
use crate::servers::apis::Version;
use crate::servers::registar::ServiceRegistrationForm;
use crate::servers::udp::server::banning::BanService;

/// This is the message that the "launcher" spawned task sends to the main
/// application process to notify the API server was successfully started.
Expand All @@ -54,10 +56,11 @@ pub struct ApiServerJobStarted();
/// It would panic if unable to send the `ApiServerJobStarted` notice.
///
///
#[instrument(skip(config, tracker, form))]
#[instrument(skip(config, tracker, ban_service, form))]
pub async fn start_job(
config: &HttpApi,
tracker: Arc<core::Tracker>,
ban_service: Arc<RwLock<BanService>>,
form: ServiceRegistrationForm,
version: Version,
) -> Option<JoinHandle<()>> {
Expand All @@ -70,21 +73,22 @@ pub async fn start_job(
let access_tokens = Arc::new(config.access_tokens.clone());

match version {
Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), form, access_tokens).await),
Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), ban_service.clone(), form, access_tokens).await),
}
}

#[allow(clippy::async_yields_async)]
#[instrument(skip(socket, tls, tracker, form, access_tokens))]
#[instrument(skip(socket, tls, tracker, ban_service, form, access_tokens))]
async fn start_v1(
socket: SocketAddr,
tls: Option<RustlsConfig>,
tracker: Arc<core::Tracker>,
ban_service: Arc<RwLock<BanService>>,
form: ServiceRegistrationForm,
access_tokens: Arc<AccessTokens>,
) -> JoinHandle<()> {
let server = ApiServer::new(Launcher::new(socket, tls))
.start(tracker, form, access_tokens)
.start(tracker, ban_service, form, access_tokens)
.await
.expect("it should be able to start to the tracker api");

Expand All @@ -98,21 +102,25 @@ async fn start_v1(
mod tests {
use std::sync::Arc;

use tokio::sync::RwLock;
use torrust_tracker_test_helpers::configuration::ephemeral_public;

use crate::bootstrap::app::initialize_with_configuration;
use crate::bootstrap::jobs::tracker_apis::start_job;
use crate::servers::apis::Version;
use crate::servers::registar::Registar;
use crate::servers::udp::server::banning::BanService;
use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP;

#[tokio::test]
async fn it_should_start_http_tracker() {
let cfg = Arc::new(ephemeral_public());
let config = &cfg.http_api.clone().unwrap();
let tracker = initialize_with_configuration(&cfg);
let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));
let version = Version::V1;

start_job(config, tracker, Registar::default().give_form(), version)
start_job(config, tracker, ban_service, Registar::default().give_form(), version)
.await
.expect("it should be able to join to the tracker api start-job");
}
Expand Down
13 changes: 10 additions & 3 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
//! > for the configuration options.
use std::sync::Arc;

use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::UdpTracker;
use tracing::instrument;

use crate::core;
use crate::servers::registar::ServiceRegistrationForm;
use crate::servers::udp::server::banning::BanService;
use crate::servers::udp::server::spawner::Spawner;
use crate::servers::udp::server::Server;
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
Expand All @@ -29,13 +31,18 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
/// It will panic if the task did not finish successfully.
#[must_use]
#[allow(clippy::async_yields_async)]
#[instrument(skip(config, tracker, form))]
pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: ServiceRegistrationForm) -> JoinHandle<()> {
#[instrument(skip(config, tracker, ban_service, form))]
pub async fn start_job(
config: &UdpTracker,
tracker: Arc<core::Tracker>,
ban_service: Arc<RwLock<BanService>>,
form: ServiceRegistrationForm,
) -> JoinHandle<()> {
let bind_to = config.bind_address;
let cookie_lifetime = config.cookie_lifetime;

let server = Server::new(Spawner::new(bind_to))
.start(tracker, form, cookie_lifetime)
.start(tracker, ban_service, form, cookie_lifetime)
.await
.expect("it should be able to start the udp tracker");

Expand Down
4 changes: 2 additions & 2 deletions src/console/profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ pub async fn run() {
return;
};

let (config, tracker) = bootstrap::app::setup();
let (config, tracker, ban_service) = bootstrap::app::setup();

let jobs = app::start(&config, tracker).await;
let jobs = app::start(&config, tracker, ban_service).await;

// Run the tracker for a fixed duration
let run_duration = sleep(Duration::from_secs(duration_secs));
Expand Down
21 changes: 18 additions & 3 deletions src/core/services/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ pub mod setup;

use std::sync::Arc;

use tokio::sync::RwLock;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;

use crate::core::statistics::metrics::Metrics;
use crate::core::Tracker;
use crate::servers::udp::server::banning::BanService;

/// All the metrics collected by the tracker.
#[derive(Debug, PartialEq)]
Expand All @@ -60,28 +62,37 @@ pub struct TrackerMetrics {
}

/// It returns all the [`TrackerMetrics`]
pub async fn get_metrics(tracker: Arc<Tracker>) -> TrackerMetrics {
pub async fn get_metrics(tracker: Arc<Tracker>, ban_service: Arc<RwLock<BanService>>) -> TrackerMetrics {
let torrents_metrics = tracker.get_torrents_metrics();
let stats = tracker.get_stats().await;
let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total();

TrackerMetrics {
torrents_metrics,
protocol_metrics: Metrics {
// TCP
// TCPv4
tcp4_connections_handled: stats.tcp4_connections_handled,
tcp4_announces_handled: stats.tcp4_announces_handled,
tcp4_scrapes_handled: stats.tcp4_scrapes_handled,
// TCPv6
tcp6_connections_handled: stats.tcp6_connections_handled,
tcp6_announces_handled: stats.tcp6_announces_handled,
tcp6_scrapes_handled: stats.tcp6_scrapes_handled,
// UDP
udp_requests_aborted: stats.udp_requests_aborted,
udp_requests_banned: stats.udp_requests_banned,
udp_banned_ips_total: udp_banned_ips_total as u64,
udp_avg_connect_processing_time_ns: stats.udp_avg_connect_processing_time_ns,
udp_avg_announce_processing_time_ns: stats.udp_avg_announce_processing_time_ns,
udp_avg_scrape_processing_time_ns: stats.udp_avg_scrape_processing_time_ns,
// UDPv4
udp4_requests: stats.udp4_requests,
udp4_connections_handled: stats.udp4_connections_handled,
udp4_announces_handled: stats.udp4_announces_handled,
udp4_scrapes_handled: stats.udp4_scrapes_handled,
udp4_responses: stats.udp4_responses,
udp4_errors_handled: stats.udp4_errors_handled,
// UDPv6
udp6_requests: stats.udp6_requests,
udp6_connections_handled: stats.udp6_connections_handled,
udp6_announces_handled: stats.udp6_announces_handled,
Expand All @@ -96,13 +107,16 @@ pub async fn get_metrics(tracker: Arc<Tracker>) -> TrackerMetrics {
mod tests {
use std::sync::Arc;

use tokio::sync::RwLock;
use torrust_tracker_configuration::Configuration;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_test_helpers::configuration;

use crate::core;
use crate::core::services::statistics::{get_metrics, TrackerMetrics};
use crate::core::services::tracker_factory;
use crate::servers::udp::server::banning::BanService;
use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP;

pub fn tracker_configuration() -> Configuration {
configuration::ephemeral()
Expand All @@ -111,8 +125,9 @@ mod tests {
#[tokio::test]
async fn the_statistics_service_should_return_the_tracker_metrics() {
let tracker = Arc::new(tracker_factory(&tracker_configuration()));
let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));

let tracker_metrics = get_metrics(tracker.clone()).await;
let tracker_metrics = get_metrics(tracker.clone(), ban_service.clone()).await;

assert_eq!(
tracker_metrics,
Expand Down
36 changes: 32 additions & 4 deletions src/core/statistics/event/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::core::statistics::event::Event;
use crate::core::statistics::event::{Event, UdpResponseKind};
use crate::core::statistics::repository::Repository;

pub async fn handle_event(event: Event, stats_repository: &Repository) {
Expand All @@ -24,9 +24,12 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) {
}

// UDP
Event::Udp4RequestAborted => {
Event::UdpRequestAborted => {
stats_repository.increase_udp_requests_aborted().await;
}
Event::UdpRequestBanned => {
stats_repository.increase_udp_requests_banned().await;
}

// UDP4
Event::Udp4Request => {
Expand All @@ -41,8 +44,30 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) {
Event::Udp4Scrape => {
stats_repository.increase_udp4_scrapes().await;
}
Event::Udp4Response => {
Event::Udp4Response {
kind,
req_processing_time,
} => {
stats_repository.increase_udp4_responses().await;

match kind {
UdpResponseKind::Connect => {
stats_repository
.recalculate_udp_avg_connect_processing_time_ns(req_processing_time)
.await;
}
UdpResponseKind::Announce => {
stats_repository
.recalculate_udp_avg_announce_processing_time_ns(req_processing_time)
.await;
}
UdpResponseKind::Scrape => {
stats_repository
.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time)
.await;
}
UdpResponseKind::Error => {}
}
}
Event::Udp4Error => {
stats_repository.increase_udp4_errors().await;
Expand All @@ -61,7 +86,10 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) {
Event::Udp6Scrape => {
stats_repository.increase_udp6_scrapes().await;
}
Event::Udp6Response => {
Event::Udp6Response {
kind: _,
req_processing_time: _,
} => {
stats_repository.increase_udp6_responses().await;
}
Event::Udp6Error => {
Expand Down
Loading
Loading