From 64c7b21fb812fd14e4865de1500a718b8bbd4929 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 16:19:29 +0000 Subject: [PATCH 1/2] refactor: [#1388] minor changes to HTTP core events --- .../src/statistics/event/mod.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/http-tracker-core/src/statistics/event/mod.rs b/packages/http-tracker-core/src/statistics/event/mod.rs index 7520e1a9..2964956d 100644 --- a/packages/http-tracker-core/src/statistics/event/mod.rs +++ b/packages/http-tracker-core/src/statistics/event/mod.rs @@ -5,13 +5,13 @@ pub mod listener; pub mod sender; /// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { TcpAnnounce { connection: ConnectionContext }, TcpScrape { connection: ConnectionContext }, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ConnectionContext { client: ClientConnectionContext, server: ServerConnectionContext, @@ -35,9 +35,19 @@ impl ConnectionContext { pub fn client_ip_addr(&self) -> IpAddr { self.client.ip_addr } + + #[must_use] + pub fn client_port(&self) -> Option { + self.client.port + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server.socket_addr + } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ClientConnectionContext { ip_addr: IpAddr, @@ -45,7 +55,7 @@ pub struct ClientConnectionContext { port: Option, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ServerConnectionContext { socket_addr: SocketAddr, } From 5f9c4d3f2a04289055e6ccfc2f530cb82d3f47ea Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 17:04:55 +0000 Subject: [PATCH 2/2] refactor: [#1388] change channel in HTTP core from mpsc to broadcast Stats events were introduced to collect tracker metrics. We only have global metrics (aggregate metrics for all UDP and HTTP trackers). This will change in the future. We will have: - Segregated metrics: one listeners per tracker (per socket). - Generic events: there could be other event consumers. Events will be decoupled from stats. This change allows multiple receivers in the channel. For now, we one use one listener but with this change will be easy to add more. --- .../http-tracker-core/benches/helpers/util.rs | 4 +- .../src/services/announce.rs | 10 ++--- .../http-tracker-core/src/services/scrape.rs | 12 +++--- .../src/statistics/event/listener.rs | 14 +++++-- .../src/statistics/event/sender.rs | 12 +++--- .../src/statistics/keeper.rs | 41 +------------------ .../http-tracker-core/src/statistics/setup.rs | 19 +++++++-- 7 files changed, 46 insertions(+), 66 deletions(-) diff --git a/packages/http-tracker-core/benches/helpers/util.rs b/packages/http-tracker-core/benches/helpers/util.rs index 169c4a56..19010041 100644 --- a/packages/http-tracker-core/benches/helpers/util.rs +++ b/packages/http-tracker-core/benches/helpers/util.rs @@ -108,11 +108,11 @@ pub fn sample_info_hash() -> InfoHash { use bittorrent_http_tracker_core::statistics; use futures::future::BoxFuture; use mockall::mock; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast::error::SendError; mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index 6b8b700c..25fc1b86 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -315,7 +315,7 @@ mod tests { use futures::future::BoxFuture; use mockall::mock; - use tokio::sync::mpsc::error::SendError; + use tokio::sync::broadcast::error::SendError; use crate::statistics; use crate::tests::sample_info_hash; @@ -323,7 +323,7 @@ mod tests { mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } @@ -395,7 +395,7 @@ mod tests { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -451,7 +451,7 @@ mod tests { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -494,7 +494,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index ed927efc..6341ed30 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -203,7 +203,7 @@ mod tests { use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use futures::future::BoxFuture; use mockall::mock; - use tokio::sync::mpsc::error::SendError; + use tokio::sync::broadcast::error::SendError; use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -260,7 +260,7 @@ mod tests { mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } @@ -359,7 +359,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -408,7 +408,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -529,7 +529,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -578,7 +578,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); diff --git a/packages/http-tracker-core/src/statistics/event/listener.rs b/packages/http-tracker-core/src/statistics/event/listener.rs index f1a2e25d..a70992a0 100644 --- a/packages/http-tracker-core/src/statistics/event/listener.rs +++ b/packages/http-tracker-core/src/statistics/event/listener.rs @@ -1,11 +1,17 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast; use super::handler::handle_event; use super::Event; use crate::statistics::repository::Repository; -pub async fn dispatch_events(mut receiver: mpsc::Receiver, stats_repository: Repository) { - while let Some(event) = receiver.recv().await { - handle_event(event, &stats_repository).await; +pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { + loop { + match receiver.recv().await { + Ok(event) => handle_event(event, &stats_repository).await, + Err(e) => { + tracing::error!("Error receiving http tracker core event: {:?}", e); + break; + } + } } } diff --git a/packages/http-tracker-core/src/statistics/event/sender.rs b/packages/http-tracker-core/src/statistics/event/sender.rs index ca4b4e21..9092a8e0 100644 --- a/packages/http-tracker-core/src/statistics/event/sender.rs +++ b/packages/http-tracker-core/src/statistics/event/sender.rs @@ -2,15 +2,15 @@ use futures::future::BoxFuture; use futures::FutureExt; #[cfg(test)] use mockall::{automock, predicate::str}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::SendError; use super::Event; /// A trait to allow sending statistics events #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } /// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. @@ -19,11 +19,11 @@ pub trait Sender: Sync + Send { /// [`statistics::Keeper`](crate::statistics::keeper::Keeper) #[allow(clippy::module_name_repetitions)] pub struct ChannelSender { - pub(crate) sender: mpsc::Sender, + pub(crate) sender: broadcast::Sender, } impl Sender for ChannelSender { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { - async move { Some(self.sender.send(event).await) }.boxed() + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { + async move { Some(self.sender.send(event)) }.boxed() } } diff --git a/packages/http-tracker-core/src/statistics/keeper.rs b/packages/http-tracker-core/src/statistics/keeper.rs index 783309ef..f4428ec7 100644 --- a/packages/http-tracker-core/src/statistics/keeper.rs +++ b/packages/http-tracker-core/src/statistics/keeper.rs @@ -1,12 +1,9 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::sender::{ChannelSender, Sender}; use super::event::Event; use super::repository::Repository; -const CHANNEL_BUFFER_SIZE: usize = 65_535; - /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// /// It actively listen to new statistics events. When it receives a new event @@ -29,31 +26,16 @@ impl Keeper { } } - #[must_use] - pub fn new_active_instance() -> (Box, Repository) { - let mut stats_tracker = Self::new(); - - let stats_event_sender = stats_tracker.run_event_listener(); - - (stats_event_sender, stats_tracker.repository) - } - - pub fn run_event_listener(&mut self) -> Box { - let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - + pub fn run_event_listener(&mut self, receiver: Receiver) { let stats_repository = self.repository.clone(); tokio::spawn(async move { dispatch_events(receiver, stats_repository).await }); - - Box::new(ChannelSender { sender }) } } #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::keeper::Keeper; use crate::statistics::metrics::Metrics; @@ -65,23 +47,4 @@ mod tests { assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled); } - - #[tokio::test] - async fn should_create_an_event_sender_to_send_statistical_events() { - let mut stats_tracker = Keeper::new(); - - let event_sender = stats_tracker.run_event_listener(); - - let result = event_sender - .send_event(Event::TcpAnnounce { - connection: ConnectionContext::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), - Some(8080), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7070), - ), - }) - .await; - - assert!(result.is_some()); - } } diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index d3114a75..a9ac751c 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -1,8 +1,13 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. +use tokio::sync::broadcast; + +use super::event::sender::ChannelSender; use crate::statistics; +const CHANNEL_CAPACITY: usize = 1024; + /// It builds the structs needed for handling the tracker metrics. /// /// It returns: @@ -19,15 +24,21 @@ pub fn factory( Option>, statistics::repository::Repository, ) { - let mut stats_event_sender = None; + let mut stats_event_sender: Option> = None; - let mut stats_tracker = statistics::keeper::Keeper::new(); + let mut keeper = statistics::keeper::Keeper::new(); if tracker_usage_statistics { - stats_event_sender = Some(stats_tracker.run_event_listener()); + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + + let receiver = sender.subscribe(); + + stats_event_sender = Some(Box::new(ChannelSender { sender })); + + keeper.run_event_listener(receiver); } - (stats_event_sender, stats_tracker.repository) + (stats_event_sender, keeper.repository) } #[cfg(test)]