From 3d2243b9b032318568ae2dfdd311d2239bacfcc9 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 11:57:12 +0000 Subject: [PATCH 1/2] refactor: [#1396] extract event module in HTTP core --- .../src/v1/handlers/scrape.rs | 2 +- .../http-tracker-core/benches/helpers/util.rs | 9 +-- packages/http-tracker-core/src/container.rs | 4 +- packages/http-tracker-core/src/event/mod.rs | 59 +++++++++++++++++++ .../src/{statistics => }/event/sender.rs | 7 +-- packages/http-tracker-core/src/lib.rs | 1 + .../src/services/announce.rs | 38 ++++++------ .../http-tracker-core/src/services/scrape.rs | 40 ++++++------- .../src/statistics/event/handler.rs | 4 +- .../src/statistics/event/listener.rs | 2 +- .../src/statistics/event/mod.rs | 59 ------------------- .../src/statistics/keeper.rs | 2 +- .../http-tracker-core/src/statistics/setup.rs | 13 ++-- src/container.rs | 2 +- 14 files changed, 119 insertions(+), 123 deletions(-) create mode 100644 packages/http-tracker-core/src/event/mod.rs rename packages/http-tracker-core/src/{statistics => }/event/sender.rs (70%) diff --git a/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs b/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs index 1ba89eaaf..e9544c983 100644 --- a/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs +++ b/packages/axum-http-tracker-server/src/v1/handlers/scrape.rs @@ -103,7 +103,7 @@ mod tests { } struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } fn initialize_private_tracker() -> (CoreTrackerServices, CoreHttpTrackerServices) { diff --git a/packages/http-tracker-core/benches/helpers/util.rs b/packages/http-tracker-core/benches/helpers/util.rs index 19010041e..dff516063 100644 --- a/packages/http-tracker-core/benches/helpers/util.rs +++ b/packages/http-tracker-core/benches/helpers/util.rs @@ -26,7 +26,7 @@ pub struct CoreTrackerServices { } pub struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } pub fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) { @@ -105,14 +105,15 @@ pub fn sample_info_hash() -> InfoHash { .expect("String should be a valid info hash") } -use bittorrent_http_tracker_core::statistics; +use bittorrent_http_tracker_core::event::Event; +use bittorrent_http_tracker_core::{event, statistics}; use futures::future::BoxFuture; use mockall::mock; use tokio::sync::broadcast::error::SendError; mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/packages/http-tracker-core/src/container.rs b/packages/http-tracker-core/src/container.rs index 448dce246..bb9b5014c 100644 --- a/packages/http-tracker-core/src/container.rs +++ b/packages/http-tracker-core/src/container.rs @@ -9,7 +9,7 @@ use torrust_tracker_configuration::{Core, HttpTracker}; use crate::services::announce::AnnounceService; use crate::services::scrape::ScrapeService; -use crate::statistics; +use crate::{event, statistics}; pub struct HttpTrackerCoreContainer { // todo: replace with TrackerCoreContainer @@ -20,7 +20,7 @@ pub struct HttpTrackerCoreContainer { pub authentication_service: Arc, pub http_tracker_config: Arc, - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, pub http_stats_repository: Arc, pub announce_service: Arc, pub scrape_service: Arc, diff --git a/packages/http-tracker-core/src/event/mod.rs b/packages/http-tracker-core/src/event/mod.rs new file mode 100644 index 000000000..da824c240 --- /dev/null +++ b/packages/http-tracker-core/src/event/mod.rs @@ -0,0 +1,59 @@ +use std::net::{IpAddr, SocketAddr}; + +pub mod sender; + +/// An event. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Event { + TcpAnnounce { connection: ConnectionContext }, + TcpScrape { connection: ConnectionContext }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ConnectionContext { + client: ClientConnectionContext, + server: ServerConnectionContext, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_ip_addr: IpAddr, opt_client_port: Option, server_socket_addr: SocketAddr) -> Self { + Self { + client: ClientConnectionContext { + ip_addr: client_ip_addr, + port: opt_client_port, + }, + server: ServerConnectionContext { + socket_addr: server_socket_addr, + }, + } + } + + #[must_use] + pub fn client_ip_addr(&self) -> IpAddr { + self.client.ip_addr + } + + #[must_use] + pub fn client_port(&self) -> Option { + self.client.port + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server.socket_addr + } +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ClientConnectionContext { + ip_addr: IpAddr, + + /// It's provided if you use the `torrust-axum-http-tracker-server` crate. + port: Option, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct ServerConnectionContext { + socket_addr: SocketAddr, +} diff --git a/packages/http-tracker-core/src/statistics/event/sender.rs b/packages/http-tracker-core/src/event/sender.rs similarity index 70% rename from packages/http-tracker-core/src/statistics/event/sender.rs rename to packages/http-tracker-core/src/event/sender.rs index 9092a8e0b..59ab4496b 100644 --- a/packages/http-tracker-core/src/statistics/event/sender.rs +++ b/packages/http-tracker-core/src/event/sender.rs @@ -7,16 +7,13 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending statistics events +/// A trait to allow sending events. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } -/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. -/// -/// It uses a channel sender to send the statistic events. The channel is created by a -/// [`statistics::Keeper`](crate::statistics::keeper::Keeper) +/// An event sender implementation using a broadcast channel. #[allow(clippy::module_name_repetitions)] pub struct ChannelSender { pub(crate) sender: broadcast::Sender, diff --git a/packages/http-tracker-core/src/lib.rs b/packages/http-tracker-core/src/lib.rs index b42b99f8e..0b0b3ba78 100644 --- a/packages/http-tracker-core/src/lib.rs +++ b/packages/http-tracker-core/src/lib.rs @@ -1,4 +1,5 @@ pub mod container; +pub mod event; pub mod services; pub mod statistics; diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index 25fc1b861..cd7417e98 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -5,7 +5,7 @@ //! It delegates the `announce` logic to the [`AnnounceHandler`] and it returns //! the [`AnnounceData`]. //! -//! It also sends an [`http_tracker_core::statistics::event::Event`] +//! It also sends an [`http_tracker_core::event::Event`] //! because events are specific for the HTTP tracker. use std::net::{IpAddr, SocketAddr}; use std::panic::Location; @@ -22,7 +22,8 @@ use bittorrent_tracker_core::whitelist; use torrust_tracker_configuration::Core; use torrust_tracker_primitives::core::AnnounceData; -use crate::statistics; +use crate::event; +use crate::event::Event; /// The HTTP tracker `announce` service. /// @@ -35,7 +36,7 @@ pub struct AnnounceService { announce_handler: Arc, authentication_service: Arc, whitelist_authorization: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, } impl AnnounceService { @@ -45,7 +46,7 @@ impl AnnounceService { announce_handler: Arc, authentication_service: Arc, whitelist_authorization: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, ) -> Self { Self { core_config, @@ -140,8 +141,8 @@ impl AnnounceService { async fn send_stats_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option, server_socket_addr: SocketAddr) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender - .send_event(statistics::event::Event::TcpAnnounce { - connection: statistics::event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr), + .send_event(Event::TcpAnnounce { + connection: event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr), }) .await; } @@ -227,7 +228,7 @@ mod tests { } struct CoreHttpTrackerServices { - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, } fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) { @@ -317,13 +318,14 @@ mod tests { use mockall::mock; use tokio::sync::broadcast::error::SendError; - use crate::statistics; + use crate::event::Event; use crate::tests::sample_info_hash; + use crate::{event, statistics}; mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } @@ -340,13 +342,13 @@ mod tests { use torrust_tracker_test_helpers::configuration; use super::{sample_peer_using_ipv4, sample_peer_using_ipv6}; + use crate::event; + use crate::event::{ConnectionContext, Event}; use crate::services::announce::tests::{ initialize_core_tracker_services, initialize_core_tracker_services_with_config, sample_announce_request_for_peer, sample_peer, MockHttpStatsEventSender, }; use crate::services::announce::AnnounceService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; #[tokio::test] async fn it_should_return_the_announce_data() { @@ -391,12 +393,12 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services(); @@ -447,12 +449,12 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = @@ -486,7 +488,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpAnnounce { + .with(eq(Event::TcpAnnounce { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -495,7 +497,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services(); diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index 6341ed301..1f4c14b5a 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -19,8 +19,8 @@ use bittorrent_tracker_core::scrape_handler::ScrapeHandler; use torrust_tracker_configuration::Core; use torrust_tracker_primitives::core::ScrapeData; -use crate::statistics; -use crate::statistics::event::ConnectionContext; +use crate::event; +use crate::event::{ConnectionContext, Event}; /// The HTTP tracker `scrape` service. /// @@ -38,7 +38,7 @@ pub struct ScrapeService { core_config: Arc, scrape_handler: Arc, authentication_service: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, } impl ScrapeService { @@ -47,7 +47,7 @@ impl ScrapeService { core_config: Arc, scrape_handler: Arc, authentication_service: Arc, - opt_http_stats_event_sender: Arc>>, + opt_http_stats_event_sender: Arc>>, ) -> Self { Self { core_config, @@ -126,7 +126,7 @@ impl ScrapeService { ) { if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() { http_stats_event_sender - .send_event(statistics::event::Event::TcpScrape { + .send_event(Event::TcpScrape { connection: ConnectionContext::new(original_peer_ip, opt_original_peer_port, server_socket_addr), }) .await; @@ -207,7 +207,7 @@ mod tests { use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; - use crate::statistics; + use crate::event::{self, Event}; use crate::tests::sample_info_hash; struct Container { @@ -259,8 +259,8 @@ mod tests { mock! { HttpStatsEventSender {} - impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + impl event::sender::Sender for HttpStatsEventSender { + fn send_event(&self, event: Event) -> BoxFuture<'static,Option > > > ; } } @@ -278,13 +278,13 @@ mod tests { use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_test_helpers::configuration; + use crate::event::{ConnectionContext, Event}; use crate::services::scrape::tests::{ initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender, }; use crate::services::scrape::ScrapeService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; use crate::tests::sample_info_hash; + use crate::{event, statistics}; #[tokio::test] async fn it_should_return_the_scrape_data_for_a_torrent() { @@ -351,7 +351,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), @@ -360,7 +360,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let container = initialize_services_with_configuration(&config); @@ -400,7 +400,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -409,7 +409,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let container = initialize_services_with_configuration(&config); @@ -454,13 +454,13 @@ mod tests { use torrust_tracker_primitives::core::ScrapeData; use torrust_tracker_test_helpers::configuration; + use crate::event::{ConnectionContext, Event}; use crate::services::scrape::tests::{ initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender, }; use crate::services::scrape::ScrapeService; - use crate::statistics; - use crate::statistics::event::ConnectionContext; use crate::tests::sample_info_hash; + use crate::{event, statistics}; #[tokio::test] async fn it_should_return_the_zeroed_scrape_data_when_the_tracker_is_running_in_private_mode_and_the_peer_is_not_authenticated( @@ -521,7 +521,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), @@ -530,7 +530,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let peer_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); @@ -570,7 +570,7 @@ mod tests { let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new(); http_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::TcpScrape { + .with(eq(Event::TcpScrape { connection: ConnectionContext::new( IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)), Some(8080), @@ -579,7 +579,7 @@ mod tests { })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(1))))); - let http_stats_event_sender: Arc>> = + let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); let peer_ip = IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)); diff --git a/packages/http-tracker-core/src/statistics/event/handler.rs b/packages/http-tracker-core/src/statistics/event/handler.rs index b8806b9d2..700e39476 100644 --- a/packages/http-tracker-core/src/statistics/event/handler.rs +++ b/packages/http-tracker-core/src/statistics/event/handler.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; -use crate::statistics::event::Event; +use crate::event::Event; use crate::statistics::repository::Repository; /// # Panics @@ -34,8 +34,8 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::event::{ConnectionContext, Event}; use crate::statistics::event::handler::handle_event; - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::repository::Repository; #[tokio::test] diff --git a/packages/http-tracker-core/src/statistics/event/listener.rs b/packages/http-tracker-core/src/statistics/event/listener.rs index a70992a02..a03a56a21 100644 --- a/packages/http-tracker-core/src/statistics/event/listener.rs +++ b/packages/http-tracker-core/src/statistics/event/listener.rs @@ -1,7 +1,7 @@ use tokio::sync::broadcast; use super::handler::handle_event; -use super::Event; +use crate::event::Event; use crate::statistics::repository::Repository; pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { diff --git a/packages/http-tracker-core/src/statistics/event/mod.rs b/packages/http-tracker-core/src/statistics/event/mod.rs index 2964956d8..dae683398 100644 --- a/packages/http-tracker-core/src/statistics/event/mod.rs +++ b/packages/http-tracker-core/src/statistics/event/mod.rs @@ -1,61 +1,2 @@ -use std::net::{IpAddr, SocketAddr}; - pub mod handler; pub mod listener; -pub mod sender; - -/// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Event { - TcpAnnounce { connection: ConnectionContext }, - TcpScrape { connection: ConnectionContext }, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ConnectionContext { - client: ClientConnectionContext, - server: ServerConnectionContext, -} - -impl ConnectionContext { - #[must_use] - pub fn new(client_ip_addr: IpAddr, opt_client_port: Option, server_socket_addr: SocketAddr) -> Self { - Self { - client: ClientConnectionContext { - ip_addr: client_ip_addr, - port: opt_client_port, - }, - server: ServerConnectionContext { - socket_addr: server_socket_addr, - }, - } - } - - #[must_use] - pub fn client_ip_addr(&self) -> IpAddr { - self.client.ip_addr - } - - #[must_use] - pub fn client_port(&self) -> Option { - self.client.port - } - - #[must_use] - pub fn server_socket_addr(&self) -> SocketAddr { - self.server.socket_addr - } -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ClientConnectionContext { - ip_addr: IpAddr, - - /// It's provided if you use the `torrust-axum-http-tracker-server` crate. - port: Option, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct ServerConnectionContext { - socket_addr: SocketAddr, -} diff --git a/packages/http-tracker-core/src/statistics/keeper.rs b/packages/http-tracker-core/src/statistics/keeper.rs index f4428ec70..01a7a1569 100644 --- a/packages/http-tracker-core/src/statistics/keeper.rs +++ b/packages/http-tracker-core/src/statistics/keeper.rs @@ -1,8 +1,8 @@ use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::Event; use super::repository::Repository; +use crate::event::Event; /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index a9ac751c6..ca31e5d52 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -3,8 +3,8 @@ //! The [`factory`] function builds the structs needed for handling the tracker metrics. use tokio::sync::broadcast; -use super::event::sender::ChannelSender; -use crate::statistics; +use crate::event::sender::ChannelSender; +use crate::{event, statistics}; const CHANNEL_CAPACITY: usize = 1024; @@ -18,13 +18,8 @@ const CHANNEL_CAPACITY: usize = 1024; /// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics /// events are sent are received but not dispatched to the handler. #[must_use] -pub fn factory( - tracker_usage_statistics: bool, -) -> ( - Option>, - statistics::repository::Repository, -) { - let mut stats_event_sender: Option> = None; +pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { + let mut stats_event_sender: Option> = None; let mut keeper = statistics::keeper::Keeper::new(); diff --git a/src/container.rs b/src/container.rs index 07c30d604..1c8c9c1d3 100644 --- a/src/container.rs +++ b/src/container.rs @@ -60,7 +60,7 @@ pub struct AppContainer { pub udp_scrape_service: Arc, // HTTP Tracker Core Services - pub http_stats_event_sender: Arc>>, + pub http_stats_event_sender: Arc>>, pub http_stats_repository: Arc, pub http_announce_service: Arc, pub http_scrape_service: Arc, From 7e364d14ca468105054e42a92b19719209c63e38 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 19 Mar 2025 16:18:17 +0000 Subject: [PATCH 2/2] refactor: [#1396] move event channel creation to events mod in HTTP tracker core --- .../http-tracker-core/src/event/sender.rs | 23 +++++++++--- .../http-tracker-core/src/statistics/setup.rs | 35 +++++++++---------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/packages/http-tracker-core/src/event/sender.rs b/packages/http-tracker-core/src/event/sender.rs index 59ab4496b..e9431abf2 100644 --- a/packages/http-tracker-core/src/event/sender.rs +++ b/packages/http-tracker-core/src/event/sender.rs @@ -7,20 +7,35 @@ use tokio::sync::broadcast::error::SendError; use super::Event; -/// A trait to allow sending events. +const CHANNEL_CAPACITY: usize = 1024; + +/// A trait for sending sending. #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } /// An event sender implementation using a broadcast channel. -#[allow(clippy::module_name_repetitions)] -pub struct ChannelSender { +pub struct Broadcaster { pub(crate) sender: broadcast::Sender, } -impl Sender for ChannelSender { +impl Sender for Broadcaster { fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { async move { Some(self.sender.send(event)) }.boxed() } } + +impl Default for Broadcaster { + fn default() -> Self { + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { sender } + } +} + +impl Broadcaster { + #[must_use] + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index ca31e5d52..e2974e4c0 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -1,39 +1,36 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. -use tokio::sync::broadcast; - -use crate::event::sender::ChannelSender; +use crate::event::sender::Broadcaster; use crate::{event, statistics}; -const CHANNEL_CAPACITY: usize = 1024; - /// It builds the structs needed for handling the tracker metrics. /// /// It returns: /// -/// - An statistics event [`Sender`](crate::statistics::event::sender::Sender) that allows you to send events related to statistics. -/// - An statistics [`Repository`](crate::statistics::repository::Repository) which is an in-memory repository for the tracker metrics. +/// - An event [`Sender`](crate::event::sender::Sender) that allows you to send +/// events related to statistics. +/// - An statistics [`Repository`](crate::statistics::repository::Repository) +/// which is an in-memory repository for the tracker metrics. /// -/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics -/// events are sent are received but not dispatched to the handler. +/// When the input argument `tracker_usage_statistics`is false the setup does +/// not run the event listeners, consequently the statistics events are sent are +/// received but not dispatched to the handler. #[must_use] pub fn factory(tracker_usage_statistics: bool) -> (Option>, statistics::repository::Repository) { - let mut stats_event_sender: Option> = None; - let mut keeper = statistics::keeper::Keeper::new(); - if tracker_usage_statistics { - let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + let opt_event_sender: Option> = if tracker_usage_statistics { + let broadcaster = Broadcaster::default(); - let receiver = sender.subscribe(); + keeper.run_event_listener(broadcaster.subscribe()); - stats_event_sender = Some(Box::new(ChannelSender { sender })); - - keeper.run_event_listener(receiver); - } + Some(Box::new(broadcaster)) + } else { + None + }; - (stats_event_sender, keeper.repository) + (opt_event_sender, keeper.repository) } #[cfg(test)]