From 3d2243b9b032318568ae2dfdd311d2239bacfcc9 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 11:57:12 +0000 Subject: [PATCH 1/6] refactor: [#1396] extract event module in HTTP core --- .../src/v1/handlers/scrape.rs | 2 +- .../http-tracker-core/benches/helpers/util.rs | 9 +-- packages/http-tracker-core/src/container.rs | 4 +- packages/http-tracker-core/src/event/mod.rs | 59 +++++++++++++++++++ .../src/{statistics => }/event/sender.rs | 7 +-- packages/http-tracker-core/src/lib.rs | 1 + .../src/services/announce.rs | 38 ++++++------ .../http-tracker-core/src/services/scrape.rs | 40 ++++++------- .../src/statistics/event/handler.rs | 4 +- .../src/statistics/event/listener.rs | 2 +- .../src/statistics/event/mod.rs | 59 ------------------- .../src/statistics/keeper.rs | 2 +- .../http-tracker-core/src/statistics/setup.rs | 13 ++-- src/container.rs | 2 +- 14 files changed, 119 insertions(+), 123 deletions(-) create mode 100644 packages/http-tracker-core/src/event/mod.rs rename packages/http-tracker-core/src/{statistics => }/event/sender.rs (70%) diff --git a/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs b/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs index 1ba89eaaf..e9544c983 100644 --- a/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs +++ b/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs @@ -103,7 +103,7 @@ mod tests { } struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } fn initialize_private_tracker() -> (CoreTrackerServices, CoreHttpTrackerServices) { diff --git a/packages/http-tracker-core/benches/helpers/util.rs b/packages/http-tracker-core/benches/helpers/util.rs index 19010041e..dff516063 100644 --- a/packages/http-tracker-core/benches/helpers/util.rs +++ b/packages/http-tracker-core/benches/helpers/util.rs @@ -26,7 +26,7 @@ pub struct CoreTrackerServices { } pub struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } pub fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) { @@ -105,14 +105,15 @@ pub fn sample_info_hash() -> InfoHash { .expect("String should be a valid info hash") } -use bittorrent_http_tracker_core::statistics; +use bittorrent_http_tracker_core::event::Event; +use bittorrent_http_tracker_core::{event, statistics}; use futures::future::BoxFuture; use mockall::mock; use tokio::sync::broadcast::error::SendError; mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/packages/http-tracker-core/src/container.rs b/packages/http-tracker-core/src/container.rs index 448dce246..bb9b5014c 100644 --- a/packages/http-tracker-core/src/container.rs +++ b/packages/http-tracker-core/src/container.rs @@ -9,7 +9,7 @@ use torrust_tracker_configuration::{Core, HttpTracker}; use crate::services::announce::AnnounceService; use crate::services::scrape::ScrapeService; -use crate::statistics; +use crate::{event, statistics}; pub struct HttpTrackerCoreContainer { // todo: replace with TrackerCoreContainer @@ -20,7 +20,7 @@ pub struct HttpTrackerCoreContainer { pub authentication_service: Arc, pub http_tracker_config: Arc, - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, pub http_stats_repository: Arc, pub announce_service: Arc, pub scrape_service: Arc, diff --git a/packages/http-tracker-core/src/event/mod.rs b/packages/http-tracker-core/src/event/mod.rs new file mode 100644 index 000000000..da824c240 --- /dev/null +++ b/packages/http-tracker-core/src/event/mod.rs @@ -0,0 +1,59 @@ +use std::net::{IpAddr, SocketAddr}; + +pub mod sender; + +/// An event. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Event { + TcpAnnounce { connection: ConnectionContext }, + TcpScrape { connection: ConnectionContext }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ConnectionContext { + client: ClientConnectionContext, + server: ServerConnectionContext, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_ip_addr: IpAddr, opt_client_port: Option, server_socket_addr: SocketAddr) -> Self { + Self { + client: ClientConnectionContext { + ip_addr: client_ip_addr, + port: opt_client_port, + }, + server: ServerConnectionContext { + socket_addr: server_socket_addr, + }, + } + } + + #[must_use] + pub fn client_ip_addr(&self) -> IpAddr { + self.client.ip_addr + } + + #[must_use] + pub fn client_port(&self) -> Option { + self.client.port + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server.socket_addr + } +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ClientConnectionContext { + ip_addr: IpAddr, + + /// It's provided if you use the `torrust-axum-http-tracker-server` crate. + port: Option, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ServerConnectionContext { + socket_addr: SocketAddr, +} diff --git a/packages/http-tracker-core/src/statistics/event/sender.rs b/packages/http-tracker-core/src/event/sender.rs similarity index 70% rename from packages/http-tracker-core/src/statistics/event/sender.rs rename to packages/http-tracker-core/src/event/sender.rs index 9092a8e0b..59ab4496b 100644 --- a/packages/http-tracker-core/src/statistics/event/sender.rs +++ b/packages/http-tracker-core/src/event/sender.rs @@ -7,16 +7,13 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending statistics events +/// A trait to allow sending events. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } -/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. -/// -/// It uses a channel sender to send the statistic events. The channel is created by a -/// [`statistics::Keeper`](crate::statistics::keeper::Keeper) +/// An event sender implementation using a broadcast channel. #[allow(clippy::module_name_repetitions)] pub struct ChannelSender { pub(crate) sender: broadcast::Sender, diff --git a/packages/http-tracker-core/src/lib.rs b/packages/http-tracker-core/src/lib.rs index b42b99f8e..0b0b3ba78 100644 --- a/packages/http-tracker-core/src/lib.rs +++ b/packages/http-tracker-core/src/lib.rs @@ -1,4 +1,5 @@ pub mod container; +pub mod event; pub mod services; pub mod statistics; diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index 25fc1b861..cd7417e98 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -5,7 +5,7 @@ //! It delegates the `announce` logic to the [`AnnounceHandler`] and it returns //! the [`AnnounceData`]. //! -//! It also sends an [`http_tracker_core::statistics::event::Event`] +//! It also sends an [`http_tracker_core::event::Event`] //! because events are specific for the HTTP tracker. use std::net::{IpAddr, SocketAddr}; use std::panic::Location; @@ -22,7 +22,8 @@ use bittorrent_tracker_core::whitelist; use torrust_tracker_configuration::Core; use torrust_tracker_primitives::core::AnnounceData; -use crate::statistics; +use crate::event; +use crate::event::Event; /// The HTTP tracker `announce` service. /// @@ -35,7 +36,7 @@ pub struct AnnounceService { announce_handler: Arc, authentication_service: Arc, whitelist_authorization: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, } impl AnnounceService { @@ -45,7 +46,7 @@ impl AnnounceService { announce_handler: Arc, authentication_service: Arc, whitelist_authorization: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, ) -> Self { Self { core_config, @@ -140,8 +141,8 @@ impl AnnounceService { async fn send_stats_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option, server_socket_addr: SocketAddr) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender - .send_event(statistics::event::Event::TcpAnnounce { - connection: statistics::event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr), + .send_event(Event::TcpAnnounce { + connection: event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr), }) .await; } @@ -227,7 +228,7 @@ mod tests { } struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) { @@ -317,13 +318,14 @@ mod tests { use mockall::mock; use tokio::sync::broadcast::error::SendError; - use crate::statistics; + use crate::event::Event; use crate::tests::sample_info_hash; + use crate::{event, statistics}; mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } @@ -340,13 +342,13 @@ mod tests { use torrust_tracker_test_helpers::configuration; use super::{sample_peer_using_ipv4, sample_peer_using_ipv6}; + use crate::event; + use crate::event::{ConnectionContext, Event}; use crate::services::announce::tests::{ initialize_core_tracker_services, initialize_core_tracker_services_with_config, sample_announce_request_for_peer, sample_peer, MockHttpStatsEventSender, }; use crate::services::announce::AnnounceService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; #[tokio::test] async fn it_should_return_the_announce_data() { @@ -391,12 +393,12 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services(); @@ -447,12 +449,12 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = @@ -486,7 +488,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -495,7 +497,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services(); diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index 6341ed301..1f4c14b5a 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -19,8 +19,8 @@ use bittorrent_tracker_core::scrape_handler::ScrapeHandler; use torrust_tracker_configuration::Core; use torrust_tracker_primitives::core::ScrapeData; -use crate::statistics; -use crate::statistics::event::ConnectionContext; +use crate::event; +use crate::event::{ConnectionContext, Event}; /// The HTTP tracker `scrape` service. /// @@ -38,7 +38,7 @@ pub struct ScrapeService { core_config: Arc, scrape_handler: Arc, authentication_service: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, } impl ScrapeService { @@ -47,7 +47,7 @@ impl ScrapeService { core_config: Arc, scrape_handler: Arc, authentication_service: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, ) -> Self { Self { core_config, @@ -126,7 +126,7 @@ impl ScrapeService { ) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender - .send_event(statistics::event::Event::TcpScrape { + .send_event(Event::TcpScrape { connection: ConnectionContext::new(original_peer_ip, opt_original_peer_port, server_socket_addr), }) .await; @@ -207,7 +207,7 @@ mod tests { use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; - use crate::statistics; + use crate::event::{self, Event}; use crate::tests::sample_info_hash; struct Container { @@ -259,8 +259,8 @@ mod tests { mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } @@ -278,13 +278,13 @@ mod tests { use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_test_helpers::configuration; + use crate::event::{ConnectionContext, Event}; use crate::services::scrape::tests::{ initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender, }; use crate::services::scrape::ScrapeService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; use crate::tests::sample_info_hash; + use crate::{event, statistics}; #[tokio::test] async fn it_should_return_the_scrape_data_for_a_torrent() { @@ -351,7 +351,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), @@ -360,7 +360,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let container = initialize_services_with_configuration(&config); @@ -400,7 +400,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -409,7 +409,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let container = initialize_services_with_configuration(&config); @@ -454,13 +454,13 @@ mod tests { use torrust_tracker_primitives::core::ScrapeData; use torrust_tracker_test_helpers::configuration; + use crate::event::{ConnectionContext, Event}; use crate::services::scrape::tests::{ initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender, }; use crate::services::scrape::ScrapeService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; use crate::tests::sample_info_hash; + use crate::{event, statistics}; #[tokio::test] async fn it_should_return_the_zeroed_scrape_data_when_the_tracker_is_running_in_private_mode_and_the_peer_is_not_authenticated( @@ -521,7 +521,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), @@ -530,7 +530,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let peer_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); @@ -570,7 +570,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -579,7 +579,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let peer_ip = IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)); diff --git a/packages/http-tracker-core/src/statistics/event/handler.rs b/packages/http-tracker-core/src/statistics/event/handler.rs index b8806b9d2..700e39476 100644 --- a/packages/http-tracker-core/src/statistics/event/handler.rs +++ b/packages/http-tracker-core/src/statistics/event/handler.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; -use crate::statistics::event::Event; +use crate::event::Event; use crate::statistics::repository::Repository; /// # Panics @@ -34,8 +34,8 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::event::{ConnectionContext, Event}; use crate::statistics::event::handler::handle_event; - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::repository::Repository; #[tokio::test] diff --git a/packages/http-tracker-core/src/statistics/event/listener.rs b/packages/http-tracker-core/src/statistics/event/listener.rs index a70992a02..a03a56a21 100644 --- a/packages/http-tracker-core/src/statistics/event/listener.rs +++ b/packages/http-tracker-core/src/statistics/event/listener.rs @@ -1,7 +1,7 @@ use tokio::sync::broadcast; use super::handler::handle_event; -use super::Event; +use crate::event::Event; use crate::statistics::repository::Repository; pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { diff --git a/packages/http-tracker-core/src/statistics/event/mod.rs b/packages/http-tracker-core/src/statistics/event/mod.rs index 2964956d8..dae683398 100644 --- a/packages/http-tracker-core/src/statistics/event/mod.rs +++ b/packages/http-tracker-core/src/statistics/event/mod.rs @@ -1,61 +1,2 @@ -use std::net::{IpAddr, SocketAddr}; - pub mod handler; pub mod listener; -pub mod sender; - -/// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Event { - TcpAnnounce { connection: ConnectionContext }, - TcpScrape { connection: ConnectionContext }, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ConnectionContext { - client: ClientConnectionContext, - server: ServerConnectionContext, -} - -impl ConnectionContext { - #[must_use] - pub fn new(client_ip_addr: IpAddr, opt_client_port: Option, server_socket_addr: SocketAddr) -> Self { - Self { - client: ClientConnectionContext { - ip_addr: client_ip_addr, - port: opt_client_port, - }, - server: ServerConnectionContext { - socket_addr: server_socket_addr, - }, - } - } - - #[must_use] - pub fn client_ip_addr(&self) -> IpAddr { - self.client.ip_addr - } - - #[must_use] - pub fn client_port(&self) -> Option { - self.client.port - } - - #[must_use] - pub fn server_socket_addr(&self) -> SocketAddr { - self.server.socket_addr - } -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ClientConnectionContext { - ip_addr: IpAddr, - - /// It's provided if you use the `torrust-axum-http-tracker-server` crate. - port: Option, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ServerConnectionContext { - socket_addr: SocketAddr, -} diff --git a/packages/http-tracker-core/src/statistics/keeper.rs b/packages/http-tracker-core/src/statistics/keeper.rs index f4428ec70..01a7a1569 100644 --- a/packages/http-tracker-core/src/statistics/keeper.rs +++ b/packages/http-tracker-core/src/statistics/keeper.rs @@ -1,8 +1,8 @@ use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::Event; use super::repository::Repository; +use crate::event::Event; /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index a9ac751c6..ca31e5d52 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -3,8 +3,8 @@ //! The [`factory`] function builds the structs needed for handling the tracker metrics. use tokio::sync::broadcast; -use super::event::sender::ChannelSender; -use crate::statistics; +use crate::event::sender::ChannelSender; +use crate::{event, statistics}; const CHANNEL_CAPACITY: usize = 1024; @@ -18,13 +18,8 @@ const CHANNEL_CAPACITY: usize = 1024; /// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics /// events are sent are received but not dispatched to the handler. #[must_use] -pub fn factory( - tracker_usage_statistics: bool, -) -> ( - Option>, - statistics::repository::Repository, -) { - let mut stats_event_sender: Option> = None; +pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { + let mut stats_event_sender: Option> = None; let mut keeper = statistics::keeper::Keeper::new(); diff --git a/src/container.rs b/src/container.rs index 07c30d604..1c8c9c1d3 100644 --- a/src/container.rs +++ b/src/container.rs @@ -60,7 +60,7 @@ pub struct AppContainer { pub udp_scrape_service: Arc, // HTTP Tracker Core Services - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, pub http_stats_repository: Arc, pub http_announce_service: Arc, pub http_scrape_service: Arc, From 7e364d14ca468105054e42a92b19719209c63e38 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 16:18:17 +0000 Subject: [PATCH 2/6] refactor: [#1396] move event channel creation to events mod in HTTP tracker core --- .../http-tracker-core/src/event/sender.rs | 23 +++++++++--- .../http-tracker-core/src/statistics/setup.rs | 35 +++++++++---------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/packages/http-tracker-core/src/event/sender.rs b/packages/http-tracker-core/src/event/sender.rs index 59ab4496b..e9431abf2 100644 --- a/packages/http-tracker-core/src/event/sender.rs +++ b/packages/http-tracker-core/src/event/sender.rs @@ -7,20 +7,35 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending events. +const CHANNEL_CAPACITY: usize = 1024; + +/// A trait for sending sending. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } /// An event sender implementation using a broadcast channel. -#[allow(clippy::module_name_repetitions)] -pub struct ChannelSender { +pub struct Broadcaster { pub(crate) sender: broadcast::Sender, } -impl Sender for ChannelSender { +impl Sender for Broadcaster { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { async move { Some(self.sender.send(event)) }.boxed() } } + +impl Default for Broadcaster { + fn default() -> Self { + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { sender } + } +} + +impl Broadcaster { + #[must_use] + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index ca31e5d52..e2974e4c0 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -1,39 +1,36 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. -use tokio::sync::broadcast; - -use crate::event::sender::ChannelSender; +use crate::event::sender::Broadcaster; use crate::{event, statistics}; -const CHANNEL_CAPACITY: usize = 1024; - /// It builds the structs needed for handling the tracker metrics. /// /// It returns: /// -/// - An statistics event [`Sender`](crate::statistics::event::sender::Sender) that allows you to send events related to statistics. -/// - An statistics [`Repository`](crate::statistics::repository::Repository) which is an in-memory repository for the tracker metrics. +/// - An event [`Sender`](crate::event::sender::Sender) that allows you to send +/// events related to statistics. +/// - An statistics [`Repository`](crate::statistics::repository::Repository) +/// which is an in-memory repository for the tracker metrics. /// -/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics -/// events are sent are received but not dispatched to the handler. +/// When the input argument `tracker_usage_statistics`is false the setup does +/// not run the event listeners, consequently the statistics events are sent are +/// received but not dispatched to the handler. #[must_use] pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { - let mut stats_event_sender: Option> = None; - let mut keeper = statistics::keeper::Keeper::new(); - if tracker_usage_statistics { - let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + let opt_event_sender: Option> = if tracker_usage_statistics { + let broadcaster = Broadcaster::default(); - let receiver = sender.subscribe(); + keeper.run_event_listener(broadcaster.subscribe()); - stats_event_sender = Some(Box::new(ChannelSender { sender })); - - keeper.run_event_listener(receiver); - } + Some(Box::new(broadcaster)) + } else { + None + }; - (stats_event_sender, keeper.repository) + (opt_event_sender, keeper.repository) } #[cfg(test)] From ed9383610337492ca3d6f7f7c499fd4ba735cbc6 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 16:44:43 +0000 Subject: [PATCH 3/6] refactor: [#1397] extract event module in UDP core --- .../udp-tracker-core/benches/helpers/utils.rs | 7 +-- packages/udp-tracker-core/src/container.rs | 4 +- packages/udp-tracker-core/src/event/mod.rs | 40 +++++++++++++++++ .../src/{statistics => }/event/sender.rs | 28 ++++++++---- packages/udp-tracker-core/src/lib.rs | 1 + .../udp-tracker-core/src/services/announce.rs | 9 ++-- .../udp-tracker-core/src/services/connect.rs | 21 +++++---- packages/udp-tracker-core/src/services/mod.rs | 7 +-- .../udp-tracker-core/src/services/scrape.rs | 9 ++-- .../src/statistics/event/handler.rs | 4 +- .../src/statistics/event/listener.rs | 2 +- .../src/statistics/event/mod.rs | 40 ----------------- .../udp-tracker-core/src/statistics/keeper.rs | 2 +- .../src/statistics/services.rs | 2 +- .../udp-tracker-core/src/statistics/setup.rs | 44 ++++++++----------- .../src/handlers/announce.rs | 8 ++-- .../src/handlers/connect.rs | 14 +++--- .../udp-tracker-server/src/handlers/mod.rs | 6 +-- src/container.rs | 2 +- 19 files changed, 127 insertions(+), 123 deletions(-) create mode 100644 packages/udp-tracker-core/src/event/mod.rs rename packages/udp-tracker-core/src/{statistics => }/event/sender.rs (54%) diff --git a/packages/udp-tracker-core/benches/helpers/utils.rs b/packages/udp-tracker-core/benches/helpers/utils.rs index aed4d9542..f6c2f6fad 100644 --- a/packages/udp-tracker-core/benches/helpers/utils.rs +++ b/packages/udp-tracker-core/benches/helpers/utils.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use bittorrent_udp_tracker_core::statistics; +use bittorrent_udp_tracker_core::event; +use bittorrent_udp_tracker_core::event::Event; use futures::future::BoxFuture; use mockall::mock; use tokio::sync::broadcast::error::SendError; @@ -19,7 +20,7 @@ pub(crate) fn sample_issue_time() -> f64 { mock! { pub(crate) UdpCoreStatsEventSender {} - impl statistics::event::sender::Sender for UdpCoreStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for UdpCoreStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/packages/udp-tracker-core/src/container.rs b/packages/udp-tracker-core/src/container.rs index c4cce3dc1..aaa07f150 100644 --- a/packages/udp-tracker-core/src/container.rs +++ b/packages/udp-tracker-core/src/container.rs @@ -11,7 +11,7 @@ use crate::services::announce::AnnounceService; use crate::services::banning::BanService; use crate::services::connect::ConnectService; use crate::services::scrape::ScrapeService; -use crate::{statistics, MAX_CONNECTION_ID_ERRORS_PER_IP}; +use crate::{event, statistics, MAX_CONNECTION_ID_ERRORS_PER_IP}; pub struct UdpTrackerCoreContainer { // todo: replace with TrackerCoreContainer @@ -21,7 +21,7 @@ pub struct UdpTrackerCoreContainer { pub whitelist_authorization: Arc, pub udp_tracker_config: Arc, - pub udp_core_stats_event_sender: Arc>>, + pub udp_core_stats_event_sender: Arc>>, pub udp_core_stats_repository: Arc, pub ban_service: Arc>, pub connect_service: Arc, diff --git a/packages/udp-tracker-core/src/event/mod.rs b/packages/udp-tracker-core/src/event/mod.rs new file mode 100644 index 000000000..48a5b501b --- /dev/null +++ b/packages/udp-tracker-core/src/event/mod.rs @@ -0,0 +1,40 @@ +use std::net::SocketAddr; + +pub mod sender; + +/// An statistics event. It is used to collect tracker metrics. +/// +/// - `Udp` prefix means the event was triggered by the UDP tracker. +/// - The event suffix is the type of request: `announce`, `scrape` or `connection`. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Event { + UdpConnect { context: ConnectionContext }, + UdpAnnounce { context: ConnectionContext }, + UdpScrape { context: ConnectionContext }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ConnectionContext { + pub client_socket_addr: SocketAddr, + pub server_socket_addr: SocketAddr, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { + Self { + client_socket_addr, + server_socket_addr, + } + } + + #[must_use] + pub fn client_socket_addr(&self) -> SocketAddr { + self.client_socket_addr + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server_socket_addr + } +} diff --git a/packages/udp-tracker-core/src/statistics/event/sender.rs b/packages/udp-tracker-core/src/event/sender.rs similarity index 54% rename from packages/udp-tracker-core/src/statistics/event/sender.rs rename to packages/udp-tracker-core/src/event/sender.rs index 9092a8e0b..e9431abf2 100644 --- a/packages/udp-tracker-core/src/statistics/event/sender.rs +++ b/packages/udp-tracker-core/src/event/sender.rs @@ -7,23 +7,35 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending statistics events +const CHANNEL_CAPACITY: usize = 1024; + +/// A trait for sending sending. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } -/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. -/// -/// It uses a channel sender to send the statistic events. The channel is created by a -/// [`statistics::Keeper`](crate::statistics::keeper::Keeper) -#[allow(clippy::module_name_repetitions)] -pub struct ChannelSender { +/// An event sender implementation using a broadcast channel. +pub struct Broadcaster { pub(crate) sender: broadcast::Sender, } -impl Sender for ChannelSender { +impl Sender for Broadcaster { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { async move { Some(self.sender.send(event)) }.boxed() } } + +impl Default for Broadcaster { + fn default() -> Self { + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { sender } + } +} + +impl Broadcaster { + #[must_use] + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} diff --git a/packages/udp-tracker-core/src/lib.rs b/packages/udp-tracker-core/src/lib.rs index 5aa714d35..94ce93068 100644 --- a/packages/udp-tracker-core/src/lib.rs +++ b/packages/udp-tracker-core/src/lib.rs @@ -1,6 +1,7 @@ pub mod connection_cookie; pub mod container; pub mod crypto; +pub mod event; pub mod services; pub mod statistics; diff --git a/packages/udp-tracker-core/src/services/announce.rs b/packages/udp-tracker-core/src/services/announce.rs index f745a90fd..bba9b51fc 100644 --- a/packages/udp-tracker-core/src/services/announce.rs +++ b/packages/udp-tracker-core/src/services/announce.rs @@ -20,8 +20,7 @@ use bittorrent_udp_tracker_protocol::peer_builder; use torrust_tracker_primitives::core::AnnounceData; use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError}; -use crate::statistics; -use crate::statistics::event::ConnectionContext; +use crate::event::{self, ConnectionContext, Event}; /// The `AnnounceService` is responsible for handling the `announce` requests. /// @@ -31,7 +30,7 @@ use crate::statistics::event::ConnectionContext; pub struct AnnounceService { announce_handler: Arc, whitelist_authorization: Arc, - opt_udp_core_stats_event_sender: Arc>>, + opt_udp_core_stats_event_sender: Arc>>, } impl AnnounceService { @@ -39,7 +38,7 @@ impl AnnounceService { pub fn new( announce_handler: Arc, whitelist_authorization: Arc, - opt_udp_core_stats_event_sender: Arc>>, + opt_udp_core_stats_event_sender: Arc>>, ) -> Self { Self { announce_handler, @@ -104,7 +103,7 @@ impl AnnounceService { async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { udp_stats_event_sender - .send_event(statistics::event::Event::UdpAnnounce { + .send_event(Event::UdpAnnounce { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; diff --git a/packages/udp-tracker-core/src/services/connect.rs b/packages/udp-tracker-core/src/services/connect.rs index fb28fe70b..e543fbb1e 100644 --- a/packages/udp-tracker-core/src/services/connect.rs +++ b/packages/udp-tracker-core/src/services/connect.rs @@ -7,20 +7,19 @@ use std::sync::Arc; use aquatic_udp_protocol::ConnectionId; use crate::connection_cookie::{gen_remote_fingerprint, make}; -use crate::statistics; -use crate::statistics::event::ConnectionContext; +use crate::event::{self, ConnectionContext, Event}; /// The `ConnectService` is responsible for handling the `connect` requests. /// /// It is responsible for generating the connection cookie and sending the /// appropriate statistics events. pub struct ConnectService { - pub opt_udp_core_stats_event_sender: Arc>>, + pub opt_udp_core_stats_event_sender: Arc>>, } impl ConnectService { #[must_use] - pub fn new(opt_udp_core_stats_event_sender: Arc>>) -> Self { + pub fn new(opt_udp_core_stats_event_sender: Arc>>) -> Self { Self { opt_udp_core_stats_event_sender, } @@ -42,7 +41,7 @@ impl ConnectService { if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { udp_stats_event_sender - .send_event(statistics::event::Event::UdpConnect { + .send_event(Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; @@ -64,13 +63,13 @@ mod tests { use mockall::predicate::eq; use crate::connection_cookie::make; + use crate::event::{ConnectionContext, Event}; use crate::services::connect::ConnectService; use crate::services::tests::{ sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv4_socket_address, sample_ipv6_remote_addr, sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, }; - use crate::statistics; - use crate::statistics::event::ConnectionContext; + use crate::{event, statistics}; #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { @@ -138,12 +137,12 @@ mod tests { let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::UdpConnect { + .with(eq(Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let opt_udp_stats_event_sender: Arc>> = + let opt_udp_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_stats_event_sender_mock))); let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender)); @@ -161,12 +160,12 @@ mod tests { let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::UdpConnect { + .with(eq(Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let opt_udp_stats_event_sender: Arc>> = + let opt_udp_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_stats_event_sender_mock))); let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender)); diff --git a/packages/udp-tracker-core/src/services/mod.rs b/packages/udp-tracker-core/src/services/mod.rs index 55a533a22..ac82d71e8 100644 --- a/packages/udp-tracker-core/src/services/mod.rs +++ b/packages/udp-tracker-core/src/services/mod.rs @@ -13,7 +13,8 @@ pub(crate) mod tests { use tokio::sync::broadcast::error::SendError; use crate::connection_cookie::gen_remote_fingerprint; - use crate::statistics; + use crate::event; + use crate::event::Event; pub(crate) fn sample_ipv4_remote_addr() -> SocketAddr { sample_ipv4_socket_address() @@ -45,8 +46,8 @@ pub(crate) mod tests { mock! { pub(crate) UdpCoreStatsEventSender {} - impl statistics::event::sender::Sender for UdpCoreStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for UdpCoreStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } } diff --git a/packages/udp-tracker-core/src/services/scrape.rs b/packages/udp-tracker-core/src/services/scrape.rs index 446c1182f..9f0941c2a 100644 --- a/packages/udp-tracker-core/src/services/scrape.rs +++ b/packages/udp-tracker-core/src/services/scrape.rs @@ -18,8 +18,7 @@ use bittorrent_tracker_core::scrape_handler::ScrapeHandler; use torrust_tracker_primitives::core::ScrapeData; use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError}; -use crate::statistics; -use crate::statistics::event::ConnectionContext; +use crate::event::{self, ConnectionContext, Event}; /// The `ScrapeService` is responsible for handling the `scrape` requests. /// @@ -28,14 +27,14 @@ use crate::statistics::event::ConnectionContext; /// - The number of UDP `scrape` requests handled by the UDP tracker. pub struct ScrapeService { scrape_handler: Arc, - opt_udp_stats_event_sender: Arc>>, + opt_udp_stats_event_sender: Arc>>, } impl ScrapeService { #[must_use] pub fn new( scrape_handler: Arc, - opt_udp_stats_event_sender: Arc>>, + opt_udp_stats_event_sender: Arc>>, ) -> Self { Self { scrape_handler, @@ -86,7 +85,7 @@ impl ScrapeService { async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() { udp_stats_event_sender - .send_event(statistics::event::Event::UdpScrape { + .send_event(Event::UdpScrape { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; diff --git a/packages/udp-tracker-core/src/statistics/event/handler.rs b/packages/udp-tracker-core/src/statistics/event/handler.rs index 98860592f..a9ac0dade 100644 --- a/packages/udp-tracker-core/src/statistics/event/handler.rs +++ b/packages/udp-tracker-core/src/statistics/event/handler.rs @@ -1,4 +1,4 @@ -use crate::statistics::event::Event; +use crate::event::Event; use crate::statistics::repository::Repository; /// # Panics @@ -39,8 +39,8 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::event::{ConnectionContext, Event}; use crate::statistics::event::handler::handle_event; - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::repository::Repository; #[tokio::test] diff --git a/packages/udp-tracker-core/src/statistics/event/listener.rs b/packages/udp-tracker-core/src/statistics/event/listener.rs index 36b1e7a22..f3afafc4f 100644 --- a/packages/udp-tracker-core/src/statistics/event/listener.rs +++ b/packages/udp-tracker-core/src/statistics/event/listener.rs @@ -1,7 +1,7 @@ use tokio::sync::broadcast; use super::handler::handle_event; -use super::Event; +use crate::event::Event; use crate::statistics::repository::Repository; pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { diff --git a/packages/udp-tracker-core/src/statistics/event/mod.rs b/packages/udp-tracker-core/src/statistics/event/mod.rs index 2e8ae39a9..dae683398 100644 --- a/packages/udp-tracker-core/src/statistics/event/mod.rs +++ b/packages/udp-tracker-core/src/statistics/event/mod.rs @@ -1,42 +1,2 @@ -use std::net::SocketAddr; - pub mod handler; pub mod listener; -pub mod sender; - -/// An statistics event. It is used to collect tracker metrics. -/// -/// - `Udp` prefix means the event was triggered by the UDP tracker. -/// - The event suffix is the type of request: `announce`, `scrape` or `connection`. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Event { - UdpConnect { context: ConnectionContext }, - UdpAnnounce { context: ConnectionContext }, - UdpScrape { context: ConnectionContext }, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ConnectionContext { - client_socket_addr: SocketAddr, - server_socket_addr: SocketAddr, -} - -impl ConnectionContext { - #[must_use] - pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { - Self { - client_socket_addr, - server_socket_addr, - } - } - - #[must_use] - pub fn client_socket_addr(&self) -> SocketAddr { - self.client_socket_addr - } - - #[must_use] - pub fn server_socket_addr(&self) -> SocketAddr { - self.server_socket_addr - } -} diff --git a/packages/udp-tracker-core/src/statistics/keeper.rs b/packages/udp-tracker-core/src/statistics/keeper.rs index f06642908..16ea51aac 100644 --- a/packages/udp-tracker-core/src/statistics/keeper.rs +++ b/packages/udp-tracker-core/src/statistics/keeper.rs @@ -1,8 +1,8 @@ use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::Event; use super::repository::Repository; +use crate::event::Event; /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// diff --git a/packages/udp-tracker-core/src/statistics/services.rs b/packages/udp-tracker-core/src/statistics/services.rs index 56814f5d5..d3c1d4710 100644 --- a/packages/udp-tracker-core/src/statistics/services.rs +++ b/packages/udp-tracker-core/src/statistics/services.rs @@ -9,7 +9,7 @@ //! //! The factory function builds two structs: //! -//! - An statistics event [`Sender`](crate::statistics::event::sender::Sender) +//! - An event [`Sender`](crate::event::sender::Sender) //! - An statistics [`Repository`] //! //! ```text diff --git a/packages/udp-tracker-core/src/statistics/setup.rs b/packages/udp-tracker-core/src/statistics/setup.rs index a9ac751c6..e2974e4c0 100644 --- a/packages/udp-tracker-core/src/statistics/setup.rs +++ b/packages/udp-tracker-core/src/statistics/setup.rs @@ -1,44 +1,36 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. -use tokio::sync::broadcast; - -use super::event::sender::ChannelSender; -use crate::statistics; - -const CHANNEL_CAPACITY: usize = 1024; +use crate::event::sender::Broadcaster; +use crate::{event, statistics}; /// It builds the structs needed for handling the tracker metrics. /// /// It returns: /// -/// - An statistics event [`Sender`](crate::statistics::event::sender::Sender) that allows you to send events related to statistics. -/// - An statistics [`Repository`](crate::statistics::repository::Repository) which is an in-memory repository for the tracker metrics. +/// - An event [`Sender`](crate::event::sender::Sender) that allows you to send +/// events related to statistics. +/// - An statistics [`Repository`](crate::statistics::repository::Repository) +/// which is an in-memory repository for the tracker metrics. /// -/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics -/// events are sent are received but not dispatched to the handler. +/// When the input argument `tracker_usage_statistics`is false the setup does +/// not run the event listeners, consequently the statistics events are sent are +/// received but not dispatched to the handler. #[must_use] -pub fn factory( - tracker_usage_statistics: bool, -) -> ( - Option>, - statistics::repository::Repository, -) { - let mut stats_event_sender: Option> = None; - +pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { let mut keeper = statistics::keeper::Keeper::new(); - if tracker_usage_statistics { - let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + let opt_event_sender: Option> = if tracker_usage_statistics { + let broadcaster = Broadcaster::default(); - let receiver = sender.subscribe(); + keeper.run_event_listener(broadcaster.subscribe()); - stats_event_sender = Some(Box::new(ChannelSender { sender })); - - keeper.run_event_listener(receiver); - } + Some(Box::new(broadcaster)) + } else { + None + }; - (stats_event_sender, keeper.repository) + (opt_event_sender, keeper.repository) } #[cfg(test)] diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index a2cb55e59..a26961a05 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -811,7 +811,7 @@ mod tests { use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; use bittorrent_udp_tracker_core::services::announce::AnnounceService; - use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; + use bittorrent_udp_tracker_core::{self, event as core_event}; use mockall::predicate::eq; use crate::handlers::announce::tests::announce_request::AnnounceRequestBuilder; @@ -850,12 +850,12 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::UdpAnnounce { - context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(core_event::Event::UdpAnnounce { + context: core_event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_core_stats_event_sender: Arc>> = + let udp_core_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 992ef459d..aae9f1136 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -58,8 +58,8 @@ mod tests { use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; use bittorrent_udp_tracker_core::connection_cookie::make; + use bittorrent_udp_tracker_core::event as core_event; use bittorrent_udp_tracker_core::services::connect::ConnectService; - use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use crate::handlers::handle_connect; @@ -192,12 +192,12 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::UdpConnect { - context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(core_event::Event::UdpConnect { + context: core_event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_core_stats_event_sender: Arc>> = + let udp_core_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); @@ -233,12 +233,12 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::UdpConnect { - context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(core_event::Event::UdpConnect { + context: core_event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_core_stats_event_sender: Arc>> = + let udp_core_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index e573cc184..98f7a2fa2 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -219,7 +219,7 @@ pub(crate) mod tests { use bittorrent_udp_tracker_core::connection_cookie::gen_remote_fingerprint; use bittorrent_udp_tracker_core::services::announce::AnnounceService; use bittorrent_udp_tracker_core::services::scrape::ScrapeService; - use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; + use bittorrent_udp_tracker_core::{self, event as core_event}; use futures::future::BoxFuture; use mockall::mock; use tokio::sync::broadcast::error::SendError; @@ -421,8 +421,8 @@ pub(crate) mod tests { mock! { pub(crate) UdpCoreStatsEventSender {} - impl core_statistics::event::sender::Sender for UdpCoreStatsEventSender { - fn send_event(&self, event: core_statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl core_event::sender::Sender for UdpCoreStatsEventSender { + fn send_event(&self, event: core_event::Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/src/container.rs b/src/container.rs index 1c8c9c1d3..7822b5d61 100644 --- a/src/container.rs +++ b/src/container.rs @@ -52,7 +52,7 @@ pub struct AppContainer { pub torrents_manager: Arc, // UDP Tracker Core Services - pub udp_core_stats_event_sender: Arc>>, + pub udp_core_stats_event_sender: Arc>>, pub udp_core_stats_repository: Arc, pub udp_ban_service: Arc>, pub udp_connect_service: Arc, From d8f1696141c065ac41ae81182752da4e1c7714de Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 17:03:47 +0000 Subject: [PATCH 4/6] refactor: [#1398] extract event module in UDP server --- packages/udp-tracker-server/src/container.rs | 4 +- packages/udp-tracker-server/src/event/mod.rs | 76 +++++++++++++++++++ .../src/{statistics => }/event/sender.rs | 28 +++++-- .../src/handlers/announce.rs | 34 ++++----- .../src/handlers/connect.rs | 22 +++--- .../udp-tracker-server/src/handlers/error.rs | 7 +- .../udp-tracker-server/src/handlers/mod.rs | 10 +-- .../udp-tracker-server/src/handlers/scrape.rs | 26 +++---- packages/udp-tracker-server/src/lib.rs | 1 + .../udp-tracker-server/src/server/launcher.rs | 9 +-- .../src/server/processor.rs | 20 ++--- .../src/statistics/event/handler.rs | 20 ++--- .../src/statistics/event/listener.rs | 2 +- .../src/statistics/event/mod.rs | 76 ------------------- .../src/statistics/keeper.rs | 2 +- .../src/statistics/setup.rs | 47 +++++------- src/container.rs | 2 +- 17 files changed, 191 insertions(+), 195 deletions(-) create mode 100644 packages/udp-tracker-server/src/event/mod.rs rename packages/udp-tracker-server/src/{statistics => }/event/sender.rs (54%) diff --git a/packages/udp-tracker-server/src/container.rs b/packages/udp-tracker-server/src/container.rs index 36ad0e671..0c8039b26 100644 --- a/packages/udp-tracker-server/src/container.rs +++ b/packages/udp-tracker-server/src/container.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use torrust_tracker_configuration::Core; -use crate::statistics; +use crate::{event, statistics}; pub struct UdpTrackerServerContainer { - pub udp_server_stats_event_sender: Arc>>, + pub udp_server_stats_event_sender: Arc>>, pub udp_server_stats_repository: Arc, } diff --git a/packages/udp-tracker-server/src/event/mod.rs b/packages/udp-tracker-server/src/event/mod.rs new file mode 100644 index 000000000..adc1396cc --- /dev/null +++ b/packages/udp-tracker-server/src/event/mod.rs @@ -0,0 +1,76 @@ +use std::net::SocketAddr; +use std::time::Duration; + +pub mod sender; + +/// An statistics event. It is used to collect tracker metrics. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Event { + UdpRequestReceived { + context: ConnectionContext, + }, + UdpRequestAborted { + context: ConnectionContext, + }, + UdpRequestBanned { + context: ConnectionContext, + }, + UdpRequestAccepted { + context: ConnectionContext, + kind: UdpRequestKind, + }, + UdpResponseSent { + context: ConnectionContext, + kind: UdpResponseKind, + req_processing_time: Duration, + }, + UdpError { + context: ConnectionContext, + }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum UdpRequestKind { + Connect, + Announce, + Scrape, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum UdpResponseKind { + Ok { + req_kind: UdpRequestKind, + }, + + /// There was an error handling the request. The error contains the request + /// kind if the request was parsed successfully. + Error { + opt_req_kind: Option, + }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ConnectionContext { + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { + Self { + client_socket_addr, + server_socket_addr, + } + } + + #[must_use] + pub fn client_socket_addr(&self) -> SocketAddr { + self.client_socket_addr + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server_socket_addr + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/sender.rs b/packages/udp-tracker-server/src/event/sender.rs similarity index 54% rename from packages/udp-tracker-server/src/statistics/event/sender.rs rename to packages/udp-tracker-server/src/event/sender.rs index 9092a8e0b..e9431abf2 100644 --- a/packages/udp-tracker-server/src/statistics/event/sender.rs +++ b/packages/udp-tracker-server/src/event/sender.rs @@ -7,23 +7,35 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending statistics events +const CHANNEL_CAPACITY: usize = 1024; + +/// A trait for sending sending. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } -/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. -/// -/// It uses a channel sender to send the statistic events. The channel is created by a -/// [`statistics::Keeper`](crate::statistics::keeper::Keeper) -#[allow(clippy::module_name_repetitions)] -pub struct ChannelSender { +/// An event sender implementation using a broadcast channel. +pub struct Broadcaster { pub(crate) sender: broadcast::Sender, } -impl Sender for ChannelSender { +impl Sender for Broadcaster { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { async move { Some(self.sender.send(event)) }.boxed() } } + +impl Default for Broadcaster { + fn default() -> Self { + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { sender } + } +} + +impl Broadcaster { + #[must_use] + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index a26961a05..5df46125d 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -15,8 +15,7 @@ use tracing::{instrument, Level}; use zerocopy::network_endian::I32; use crate::error::Error; -use crate::statistics as server_statistics; -use crate::statistics::event::{ConnectionContext, UdpRequestKind}; +use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; /// It handles the `Announce` request. /// @@ -30,7 +29,7 @@ pub async fn handle_announce( server_socket_addr: SocketAddr, request: &AnnounceRequest, core_config: &Arc, - opt_udp_server_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -42,7 +41,7 @@ pub async fn handle_announce( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequestAccepted { + .send_event(Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, }) @@ -207,6 +206,7 @@ mod tests { use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; use mockall::predicate::eq; + use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::announce::tests::announce_request::AnnounceRequestBuilder; use crate::handlers::handle_announce; use crate::handlers::tests::{ @@ -215,8 +215,6 @@ mod tests { sample_issue_time, CoreTrackerServices, CoreUdpTrackerServices, MockUdpServerStatsEventSender, TorrentPeerBuilder, }; - use crate::statistics as server_statistics; - use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { @@ -425,13 +423,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { - context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(Event::UdpRequestAccepted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = @@ -532,6 +530,7 @@ mod tests { use mockall::predicate::eq; use torrust_tracker_configuration::Core; + use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::announce::tests::announce_request::AnnounceRequestBuilder; use crate::handlers::handle_announce; use crate::handlers::tests::{ @@ -539,8 +538,6 @@ mod tests { initialize_core_tracker_services_for_public_tracker, sample_cookie_valid_range, sample_ipv6_remote_addr, sample_issue_time, MockUdpServerStatsEventSender, TorrentPeerBuilder, }; - use crate::statistics as server_statistics; - use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { @@ -768,13 +765,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { - context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(Event::UdpRequestAccepted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = @@ -814,14 +811,13 @@ mod tests { use bittorrent_udp_tracker_core::{self, event as core_event}; use mockall::predicate::eq; + use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::announce::tests::announce_request::AnnounceRequestBuilder; use crate::handlers::handle_announce; use crate::handlers::tests::{ sample_cookie_valid_range, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, TrackerConfigurationBuilder, }; - use crate::statistics as server_statistics; - use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { @@ -861,13 +857,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { - context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(Event::UdpRequestAccepted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let announce_handler = Arc::new(AnnounceHandler::new( diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index aae9f1136..a0fbaead3 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -6,8 +6,7 @@ use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, ConnectionId, Respon use bittorrent_udp_tracker_core::services::connect::ConnectService; use tracing::{instrument, Level}; -use crate::statistics as server_statistics; -use crate::statistics::event::{ConnectionContext, UdpRequestKind}; +use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; /// It handles the `Connect` request. #[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] @@ -16,7 +15,7 @@ pub async fn handle_connect( server_socket_addr: SocketAddr, request: &ConnectRequest, connect_service: &Arc, - opt_udp_server_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_issue_time: f64, ) -> Response { tracing::Span::current().record("transaction_id", request.transaction_id.0.to_string()); @@ -24,7 +23,7 @@ pub async fn handle_connect( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequestAccepted { + .send_event(Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, }) @@ -62,13 +61,12 @@ mod tests { use bittorrent_udp_tracker_core::services::connect::ConnectService; use mockall::predicate::eq; + use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::handle_connect; use crate::handlers::tests::{ sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv4_socket_address, sample_ipv6_remote_addr, sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, }; - use crate::statistics as server_statistics; - use crate::statistics::event::UdpRequestKind; fn sample_connect_request() -> ConnectRequest { ConnectRequest { @@ -203,13 +201,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { - context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(Event::UdpRequestAccepted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); @@ -244,13 +242,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { - context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + .with(eq(Event::UdpRequestAccepted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index d1ffe2fd4..70c33b5ba 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -11,8 +11,7 @@ use uuid::Uuid; use zerocopy::network_endian::I32; use crate::error::Error; -use crate::statistics as server_statistics; -use crate::statistics::event::{ConnectionContext, UdpRequestKind}; +use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; #[allow(clippy::too_many_arguments)] #[instrument(fields(transaction_id), skip(opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] @@ -21,7 +20,7 @@ pub async fn handle_error( client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, request_id: Uuid, - opt_udp_server_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, e: &Error, transaction_id: Option, @@ -60,7 +59,7 @@ pub async fn handle_error( if e.1.is_some() { if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpError { + .send_event(Event::UdpError { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 98f7a2fa2..61f7bb187 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -24,7 +24,7 @@ use uuid::Uuid; use super::RawRequest; use crate::container::UdpTrackerServerContainer; use crate::error::Error; -use crate::statistics::event::UdpRequestKind; +use crate::event::UdpRequestKind; use crate::CurrentClock; #[derive(Debug, Clone, PartialEq)] @@ -228,7 +228,7 @@ pub(crate) mod tests { use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_test_helpers::configuration; - use crate::{statistics as server_statistics, CurrentClock}; + use crate::{event as server_event, CurrentClock}; pub(crate) struct CoreTrackerServices { pub core_config: Arc, @@ -244,7 +244,7 @@ pub(crate) mod tests { } pub(crate) struct ServerUdpTrackerServices { - pub udp_server_stats_event_sender: Arc>>, + pub udp_server_stats_event_sender: Arc>>, } fn default_testing_tracker_configuration() -> Configuration { @@ -428,8 +428,8 @@ pub(crate) mod tests { mock! { pub(crate) UdpServerStatsEventSender {} - impl server_statistics::event::sender::Sender for UdpServerStatsEventSender { - fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl server_event::sender::Sender for UdpServerStatsEventSender { + fn send_event(&self, event: server_event::Event) -> BoxFuture<'static,Option > > > ; } } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index fbf2b7c43..ac0faef61 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -13,8 +13,7 @@ use tracing::{instrument, Level}; use zerocopy::network_endian::I32; use crate::error::Error; -use crate::statistics as server_statistics; -use crate::statistics::event::{ConnectionContext, UdpRequestKind}; +use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; /// It handles the `Scrape` request. /// @@ -27,7 +26,7 @@ pub async fn handle_scrape( client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, request: &ScrapeRequest, - opt_udp_server_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -38,7 +37,7 @@ pub async fn handle_scrape( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequestAccepted { + .send_event(Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Scrape, }) @@ -352,13 +351,13 @@ mod tests { use mockall::predicate::eq; use super::sample_scrape_request; + use crate::event; + use crate::event::{ConnectionContext, Event, UdpRequestKind}; use crate::handlers::handle_scrape; use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, sample_cookie_valid_range, sample_ipv4_remote_addr, MockUdpServerStatsEventSender, }; - use crate::statistics as server_statistics; - use crate::statistics::event::ConnectionContext; #[tokio::test] async fn should_send_the_upd4_scrape_event() { @@ -368,13 +367,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { + .with(eq(Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: server_statistics::event::UdpRequestKind::Scrape, + kind: UdpRequestKind::Scrape, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = @@ -401,13 +400,12 @@ mod tests { use mockall::predicate::eq; use super::sample_scrape_request; + use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::handle_scrape; use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, sample_cookie_valid_range, sample_ipv6_remote_addr, MockUdpServerStatsEventSender, }; - use crate::statistics as server_statistics; - use crate::statistics::event::ConnectionContext; #[tokio::test] async fn should_send_the_upd6_scrape_event() { @@ -417,13 +415,13 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequestAccepted { + .with(eq(Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: server_statistics::event::UdpRequestKind::Scrape, + kind: UdpRequestKind::Scrape, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let udp_server_stats_event_sender: Arc>> = + let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = diff --git a/packages/udp-tracker-server/src/lib.rs b/packages/udp-tracker-server/src/lib.rs index 9e013bf81..ff53adcfb 100644 --- a/packages/udp-tracker-server/src/lib.rs +++ b/packages/udp-tracker-server/src/lib.rs @@ -637,6 +637,7 @@ pub mod container; pub mod environment; pub mod error; +pub mod event; pub mod handlers; pub mod server; pub mod statistics; diff --git a/packages/udp-tracker-server/src/server/launcher.rs b/packages/udp-tracker-server/src/server/launcher.rs index c6a105230..c98db0500 100644 --- a/packages/udp-tracker-server/src/server/launcher.rs +++ b/packages/udp-tracker-server/src/server/launcher.rs @@ -17,11 +17,10 @@ use tracing::instrument; use super::request_buffer::ActiveRequests; use crate::container::UdpTrackerServerContainer; +use crate::event::{ConnectionContext, Event}; use crate::server::bound_socket::BoundSocket; use crate::server::processor::Processor; use crate::server::receiver::Receiver; -use crate::statistics; -use crate::statistics::event::ConnectionContext; const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600; @@ -173,7 +172,7 @@ impl Launcher { if let Some(udp_server_stats_event_sender) = udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpRequestReceived { + .send_event(Event::UdpRequestReceived { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; @@ -186,7 +185,7 @@ impl Launcher { udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpRequestBanned { + .send_event(Event::UdpRequestBanned { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; @@ -228,7 +227,7 @@ impl Launcher { udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpRequestAborted { + .send_event(Event::UdpRequestAborted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index 4d1e4429a..02e084356 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -11,9 +11,9 @@ use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; use crate::container::UdpTrackerServerContainer; +use crate::event::{self, ConnectionContext, Event, UdpRequestKind}; use crate::handlers::CookieTimeValues; -use crate::statistics::event::{ConnectionContext, UdpRequestKind}; -use crate::{handlers, statistics, RawRequest}; +use crate::{handlers, RawRequest}; pub struct Processor { socket: Arc, @@ -77,16 +77,16 @@ impl Processor { }; let udp_response_kind = match &response { - Response::Connect(_) => statistics::event::UdpResponseKind::Ok { - req_kind: statistics::event::UdpRequestKind::Connect, + Response::Connect(_) => event::UdpResponseKind::Ok { + req_kind: event::UdpRequestKind::Connect, }, - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => statistics::event::UdpResponseKind::Ok { - req_kind: statistics::event::UdpRequestKind::Announce, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => event::UdpResponseKind::Ok { + req_kind: event::UdpRequestKind::Announce, }, - Response::Scrape(_) => statistics::event::UdpResponseKind::Ok { - req_kind: statistics::event::UdpRequestKind::Scrape, + Response::Scrape(_) => event::UdpResponseKind::Ok { + req_kind: event::UdpRequestKind::Scrape, }, - Response::Error(_e) => statistics::event::UdpResponseKind::Error { opt_req_kind: None }, + Response::Error(_e) => event::UdpResponseKind::Error { opt_req_kind: None }, }; let mut writer = Cursor::new(Vec::with_capacity(200)); @@ -108,7 +108,7 @@ impl Processor { self.udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpResponseSent { + .send_event(Event::UdpResponseSent { context: ConnectionContext::new(client_socket_addr, self.socket.address()), kind: udp_response_kind, req_processing_time, diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 6abf7d3c7..f65a1e567 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -1,4 +1,4 @@ -use crate::statistics::event::{Event, UdpRequestKind, UdpResponseKind}; +use crate::event::{Event, UdpRequestKind, UdpResponseKind}; use crate::statistics::repository::Repository; /// # Panics @@ -100,8 +100,8 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::event::{ConnectionContext, Event, UdpRequestKind}; use crate::statistics::event::handler::handle_event; - use crate::statistics::event::{ConnectionContext, Event, UdpRequestKind}; use crate::statistics::repository::Repository; #[tokio::test] @@ -209,7 +209,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Connect, + kind: crate::event::UdpRequestKind::Connect, }, &stats_repository, ) @@ -230,7 +230,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Announce, + kind: crate::event::UdpRequestKind::Announce, }, &stats_repository, ) @@ -251,7 +251,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Scrape, + kind: crate::event::UdpRequestKind::Scrape, }, &stats_repository, ) @@ -272,7 +272,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpResponseKind::Ok { + kind: crate::event::UdpResponseKind::Ok { req_kind: UdpRequestKind::Announce, }, req_processing_time: std::time::Duration::from_secs(1), @@ -316,7 +316,7 @@ mod tests { SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Connect, + kind: crate::event::UdpRequestKind::Connect, }, &stats_repository, ) @@ -337,7 +337,7 @@ mod tests { SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Announce, + kind: crate::event::UdpRequestKind::Announce, }, &stats_repository, ) @@ -358,7 +358,7 @@ mod tests { SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpRequestKind::Scrape, + kind: crate::event::UdpRequestKind::Scrape, }, &stats_repository, ) @@ -379,7 +379,7 @@ mod tests { SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), ), - kind: crate::statistics::event::UdpResponseKind::Ok { + kind: crate::event::UdpResponseKind::Ok { req_kind: UdpRequestKind::Announce, }, req_processing_time: std::time::Duration::from_secs(1), diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs index b755cbf18..b23260747 100644 --- a/packages/udp-tracker-server/src/statistics/event/listener.rs +++ b/packages/udp-tracker-server/src/statistics/event/listener.rs @@ -1,7 +1,7 @@ use tokio::sync::broadcast; use super::handler::handle_event; -use super::Event; +use crate::event::Event; use crate::statistics::repository::Repository; pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 1b0be960b..dae683398 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -1,78 +1,2 @@ -use std::net::SocketAddr; -use std::time::Duration; - pub mod handler; pub mod listener; -pub mod sender; - -/// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Event { - UdpRequestReceived { - context: ConnectionContext, - }, - UdpRequestAborted { - context: ConnectionContext, - }, - UdpRequestBanned { - context: ConnectionContext, - }, - UdpRequestAccepted { - context: ConnectionContext, - kind: UdpRequestKind, - }, - UdpResponseSent { - context: ConnectionContext, - kind: UdpResponseKind, - req_processing_time: Duration, - }, - UdpError { - context: ConnectionContext, - }, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum UdpRequestKind { - Connect, - Announce, - Scrape, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum UdpResponseKind { - Ok { - req_kind: UdpRequestKind, - }, - - /// There was an error handling the request. The error contains the request - /// kind if the request was parsed successfully. - Error { - opt_req_kind: Option, - }, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ConnectionContext { - client_socket_addr: SocketAddr, - server_socket_addr: SocketAddr, -} - -impl ConnectionContext { - #[must_use] - pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { - Self { - client_socket_addr, - server_socket_addr, - } - } - - #[must_use] - pub fn client_socket_addr(&self) -> SocketAddr { - self.client_socket_addr - } - - #[must_use] - pub fn server_socket_addr(&self) -> SocketAddr { - self.server_socket_addr - } -} diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs index 099e0d0aa..62216ce88 100644 --- a/packages/udp-tracker-server/src/statistics/keeper.rs +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -1,8 +1,8 @@ use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::Event; use super::repository::Repository; +use crate::event::Event; /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// diff --git a/packages/udp-tracker-server/src/statistics/setup.rs b/packages/udp-tracker-server/src/statistics/setup.rs index a9ac751c6..d8cc7bca9 100644 --- a/packages/udp-tracker-server/src/statistics/setup.rs +++ b/packages/udp-tracker-server/src/statistics/setup.rs @@ -1,44 +1,37 @@ //! Setup for the tracker statistics. //! -//! The [`factory`] function builds the structs needed for handling the tracker metrics. -use tokio::sync::broadcast; - -use super::event::sender::ChannelSender; -use crate::statistics; - -const CHANNEL_CAPACITY: usize = 1024; +//! The [`factory`] function builds the structs needed for handling the tracker +//! metrics. +use crate::event::sender::Broadcaster; +use crate::{event, statistics}; /// It builds the structs needed for handling the tracker metrics. /// /// It returns: /// -/// - An statistics event [`Sender`](crate::statistics::event::sender::Sender) that allows you to send events related to statistics. -/// - An statistics [`Repository`](crate::statistics::repository::Repository) which is an in-memory repository for the tracker metrics. +/// - An event [`Sender`](crate::event::sender::Sender) that allows you to send +/// events related to statistics. +/// - An statistics [`Repository`](crate::statistics::repository::Repository) +/// which is an in-memory repository for the tracker metrics. /// -/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics -/// events are sent are received but not dispatched to the handler. +/// When the input argument `tracker_usage_statistics`is false the setup does +/// not run the event listeners, consequently the statistics events are sent are +/// received but not dispatched to the handler. #[must_use] -pub fn factory( - tracker_usage_statistics: bool, -) -> ( - Option>, - statistics::repository::Repository, -) { - let mut stats_event_sender: Option> = None; - +pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { let mut keeper = statistics::keeper::Keeper::new(); - if tracker_usage_statistics { - let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + let opt_event_sender: Option> = if tracker_usage_statistics { + let broadcaster = Broadcaster::default(); - let receiver = sender.subscribe(); + keeper.run_event_listener(broadcaster.subscribe()); - stats_event_sender = Some(Box::new(ChannelSender { sender })); - - keeper.run_event_listener(receiver); - } + Some(Box::new(broadcaster)) + } else { + None + }; - (stats_event_sender, keeper.repository) + (opt_event_sender, keeper.repository) } #[cfg(test)] diff --git a/src/container.rs b/src/container.rs index 7822b5d61..3fcda55f0 100644 --- a/src/container.rs +++ b/src/container.rs @@ -66,7 +66,7 @@ pub struct AppContainer { pub http_scrape_service: Arc, // UDP Tracker Server Services - pub udp_server_stats_event_sender: Arc>>, + pub udp_server_stats_event_sender: Arc>>, pub udp_server_stats_repository: Arc, } From 055db4e67dd89183a9e838ba101c0567479de45c Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 17:06:18 +0000 Subject: [PATCH 5/6] docs: [#1395] minor changes in comments --- packages/http-tracker-core/src/event/mod.rs | 2 +- packages/udp-tracker-core/src/event/mod.rs | 5 +---- packages/udp-tracker-server/src/event/mod.rs | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/http-tracker-core/src/event/mod.rs b/packages/http-tracker-core/src/event/mod.rs index da824c240..3db258238 100644 --- a/packages/http-tracker-core/src/event/mod.rs +++ b/packages/http-tracker-core/src/event/mod.rs @@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr}; pub mod sender; -/// An event. +/// A HTTP core event. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { TcpAnnounce { connection: ConnectionContext }, diff --git a/packages/udp-tracker-core/src/event/mod.rs b/packages/udp-tracker-core/src/event/mod.rs index 48a5b501b..04b3170e2 100644 --- a/packages/udp-tracker-core/src/event/mod.rs +++ b/packages/udp-tracker-core/src/event/mod.rs @@ -2,10 +2,7 @@ use std::net::SocketAddr; pub mod sender; -/// An statistics event. It is used to collect tracker metrics. -/// -/// - `Udp` prefix means the event was triggered by the UDP tracker. -/// - The event suffix is the type of request: `announce`, `scrape` or `connection`. +/// A UDP core event. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { UdpConnect { context: ConnectionContext }, diff --git a/packages/udp-tracker-server/src/event/mod.rs b/packages/udp-tracker-server/src/event/mod.rs index adc1396cc..0adf29c8b 100644 --- a/packages/udp-tracker-server/src/event/mod.rs +++ b/packages/udp-tracker-server/src/event/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; pub mod sender; -/// An statistics event. It is used to collect tracker metrics. +/// A UDP server event. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { UdpRequestReceived { From 9eba80fa62f0f8e971c3c6fd726c681eb1fdd2fa Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 17:07:31 +0000 Subject: [PATCH 6/6] refactor: [#1395] rename send_stats_event to send_event Events are now generic even if they are only used for stats for now. --- packages/http-tracker-core/src/services/announce.rs | 4 ++-- packages/http-tracker-core/src/services/scrape.rs | 10 ++-------- packages/udp-tracker-core/src/services/announce.rs | 4 ++-- packages/udp-tracker-core/src/services/scrape.rs | 4 ++-- 4 files changed, 8 insertions(+), 14 deletions(-) diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index cd7417e98..f8d2e0b11 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -87,7 +87,7 @@ impl AnnounceService { .announce(&announce_request.info_hash, &mut peer, &remote_client_ip, &peers_wanted) .await?; - self.send_stats_event(remote_client_ip, opt_remote_client_port, *server_socket_addr) + self.send_event(remote_client_ip, opt_remote_client_port, *server_socket_addr) .await; Ok(announce_data) @@ -138,7 +138,7 @@ impl AnnounceService { } } - async fn send_stats_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option, server_socket_addr: SocketAddr) { + async fn send_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option, server_socket_addr: SocketAddr) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender .send_event(Event::TcpAnnounce { diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index 1f4c14b5a..c9b3182f8 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -82,8 +82,7 @@ impl ScrapeService { let (remote_client_ip, opt_client_port) = self.resolve_remote_client_ip(client_ip_sources)?; - self.send_stats_event(remote_client_ip, opt_client_port, *server_socket_addr) - .await; + self.send_event(remote_client_ip, opt_client_port, *server_socket_addr).await; Ok(scrape_data) } @@ -118,12 +117,7 @@ impl ScrapeService { Ok((ip, port)) } - async fn send_stats_event( - &self, - original_peer_ip: IpAddr, - opt_original_peer_port: Option, - server_socket_addr: SocketAddr, - ) { + async fn send_event(&self, original_peer_ip: IpAddr, opt_original_peer_port: Option, server_socket_addr: SocketAddr) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender .send_event(Event::TcpScrape { diff --git a/packages/udp-tracker-core/src/services/announce.rs b/packages/udp-tracker-core/src/services/announce.rs index bba9b51fc..d99618316 100644 --- a/packages/udp-tracker-core/src/services/announce.rs +++ b/packages/udp-tracker-core/src/services/announce.rs @@ -79,7 +79,7 @@ impl AnnounceService { .announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted) .await?; - self.send_stats_event(client_socket_addr, server_socket_addr).await; + self.send_event(client_socket_addr, server_socket_addr).await; Ok(announce_data) } @@ -100,7 +100,7 @@ impl AnnounceService { self.whitelist_authorization.authorize(info_hash).await } - async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { + async fn send_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { udp_stats_event_sender .send_event(Event::UdpAnnounce { diff --git a/packages/udp-tracker-core/src/services/scrape.rs b/packages/udp-tracker-core/src/services/scrape.rs index 9f0941c2a..3b6898311 100644 --- a/packages/udp-tracker-core/src/services/scrape.rs +++ b/packages/udp-tracker-core/src/services/scrape.rs @@ -61,7 +61,7 @@ impl ScrapeService { .scrape(&Self::convert_from_aquatic(&request.info_hashes)) .await?; - self.send_stats_event(client_socket_addr, server_socket_addr).await; + self.send_event(client_socket_addr, server_socket_addr).await; Ok(scrape_data) } @@ -82,7 +82,7 @@ impl ScrapeService { aquatic_infohashes.iter().map(|&x| x.into()).collect() } - async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { + async fn send_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() { udp_stats_event_sender .send_event(Event::UdpScrape {