From a9b1c14b8436d4eff2819ccc1a191f818d73c699 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Mar 2025 13:07:12 +0000 Subject: [PATCH] feat: [#1403] collect http tracker instance metrics It creates services per HTTP tracker instance (server bound to a socket address) to collect metrics only for that instace. There should be test failing becuase any server can use only one event sender and metrics repository. Therefore global metrics should not be updated. I think the problem is there are no E2E tests for statistics. There are only integration tests at the HTTP Core pacakge level. --- src/app.rs | 4 +- src/console/profiling.rs | 5 +-- src/container.rs | 87 ++++++++++++++++++++++++++++++++++++++-- src/main.rs | 6 +-- 4 files changed, 88 insertions(+), 14 deletions(-) diff --git a/src/app.rs b/src/app.rs index 60e907a88..0713bc659 100644 --- a/src/app.rs +++ b/src/app.rs @@ -38,7 +38,7 @@ use crate::container::AppContainer; /// - Can't retrieve tracker keys from database. /// - Can't load whitelist from database. #[instrument(skip(config, app_container))] -pub async fn start(config: &Configuration, app_container: &Arc) -> Vec> { +pub async fn start(config: &Configuration, mut app_container: AppContainer) -> Vec> { if config.http_api.is_none() && (config.udp_trackers.is_none() || config.udp_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) && (config.http_trackers.is_none() || config.http_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) @@ -94,7 +94,7 @@ pub async fn start(config: &Configuration, app_container: &Arc) -> if let Some(http_trackers) = &config.http_trackers { for http_tracker_config in http_trackers { let http_tracker_config = Arc::new(http_tracker_config.clone()); - let http_tracker_container = Arc::new(app_container.http_tracker_container(&http_tracker_config)); + let http_tracker_container = Arc::new(app_container.http_tracker_container(&http_tracker_config).await); if let Some(job) = http_tracker::start_job( http_tracker_container, diff --git a/src/console/profiling.rs b/src/console/profiling.rs index f3829c073..f30cab8b8 100644 --- a/src/console/profiling.rs +++ b/src/console/profiling.rs @@ -157,7 +157,6 @@ //! kcachegrind callgrind.out //! ``` use std::env; -use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; @@ -182,9 +181,7 @@ pub async fn run() { let (config, app_container) = bootstrap::app::setup(); - let app_container = Arc::new(app_container); - - let jobs = app::start(&config, &app_container).await; + let jobs = app::start(&config, app_container).await; // Run the tracker for a fixed duration let run_duration = sleep(Duration::from_secs(duration_secs)); diff --git a/src/container.rs b/src/container.rs index 3fcda55f0..2da4cdcd3 100644 --- a/src/container.rs +++ b/src/container.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::Arc; use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer; @@ -65,6 +67,9 @@ pub struct AppContainer { pub http_announce_service: Arc, pub http_scrape_service: Arc, + // HTTP Tracker Server Containers (one container per HTTP Tracker) + pub http_server_instance_containers: Arc>, + // UDP Tracker Server Services pub udp_server_stats_event_sender: Arc>>, pub udp_server_stats_repository: Arc, @@ -96,6 +101,9 @@ impl AppContainer { http_stats_event_sender.clone(), )); + // HTTP Tracker Server Containers (one container per HTTP Tracker) + let http_server_instance_containers = Arc::new(RwLock::new(HttpTrackerInstanceContainers::default())); + // UDP Tracker Core Services let (udp_core_stats_event_sender, udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics); @@ -150,6 +158,9 @@ impl AppContainer { http_announce_service, http_scrape_service, + // HTTP Tracker Server Containers + http_server_instance_containers, + // UDP Tracker Server Services udp_server_stats_event_sender, udp_server_stats_repository, @@ -157,7 +168,27 @@ impl AppContainer { } #[must_use] - pub fn http_tracker_container(&self, http_tracker_config: &Arc) -> HttpTrackerCoreContainer { + pub async fn http_tracker_container(&mut self, http_tracker_config: &Arc) -> HttpTrackerCoreContainer { + let http_tracker_instance_container = if let Some(http_tracker_instance_container) = self + .http_server_instance_containers + .read() + .await + .get(&http_tracker_config.bind_address) + .await + { + http_tracker_instance_container + } else { + let http_server_instance_container = Arc::new(HttpTrackerInstanceContainer::initialize(http_tracker_config)); + + self.http_server_instance_containers + .write() + .await + .insert(http_tracker_config, http_server_instance_container.clone()) + .await; + + http_server_instance_container + }; + HttpTrackerCoreContainer { core_config: self.core_config.clone(), announce_handler: self.announce_handler.clone(), @@ -166,8 +197,8 @@ impl AppContainer { authentication_service: self.authentication_service.clone(), http_tracker_config: http_tracker_config.clone(), - http_stats_event_sender: self.http_stats_event_sender.clone(), - http_stats_repository: self.http_stats_repository.clone(), + http_stats_event_sender: http_tracker_instance_container.http_core_stats_event_sender.clone(), + http_stats_repository: http_tracker_instance_container.http_core_stats_repository.clone(), announce_service: self.http_announce_service.clone(), scrape_service: self.http_scrape_service.clone(), } @@ -214,3 +245,53 @@ impl AppContainer { } } } + +/// Container for each HTTP Tracker Server instance. +/// +/// Each instance runs on a different socket address. These services are not +/// shared between instances. +#[derive(Default)] +pub struct HttpTrackerInstanceContainers { + instances: RwLock>>, +} + +impl HttpTrackerInstanceContainers { + pub async fn insert( + &mut self, + http_tracker_config: &Arc, + http_server_instance_container: Arc, + ) { + self.instances + .write() + .await + .insert(http_tracker_config.bind_address, http_server_instance_container); + } + + #[must_use] + pub async fn get(&self, socket_addr: &SocketAddr) -> Option> { + self.instances.read().await.get(socket_addr).cloned() + } +} + +/// Container for HTTP Tracker Server instances. +#[derive(Clone, Default)] +pub struct HttpTrackerInstanceContainer { + pub http_core_stats_event_sender: Arc>>, + pub http_core_stats_repository: Arc, +} + +impl HttpTrackerInstanceContainer { + #[must_use] + pub fn initialize(configuration: &HttpTracker) -> Self { + let (http_core_stats_event_sender, http_core_stats_repository) = + bittorrent_http_tracker_core::statistics::setup::factory(configuration.tracker_usage_statistics); + + let http_core_stats_event_sender = Arc::new(http_core_stats_event_sender); + let http_core_stats_repository = Arc::new(http_core_stats_repository); + + Self { + http_core_stats_event_sender, + http_core_stats_repository, + } + } +} diff --git a/src/main.rs b/src/main.rs index 77f6e32a3..a132f42b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,10 @@ -use std::sync::Arc; - use torrust_tracker_lib::{app, bootstrap}; #[tokio::main] async fn main() { let (config, app_container) = bootstrap::app::setup(); - let app_container = Arc::new(app_container); - - let jobs = app::start(&config, &app_container).await; + let jobs = app::start(&config, app_container).await; // handle the signals tokio::select! {