diff --git a/packages/http-tracker-core/benches/helpers/util.rs b/packages/http-tracker-core/benches/helpers/util.rs index 169c4a56..19010041 100644 --- a/packages/http-tracker-core/benches/helpers/util.rs +++ b/packages/http-tracker-core/benches/helpers/util.rs @@ -108,11 +108,11 @@ pub fn sample_info_hash() -> InfoHash { use bittorrent_http_tracker_core::statistics; use futures::future::BoxFuture; use mockall::mock; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast::error::SendError; mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index 6b8b700c..25fc1b86 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -315,7 +315,7 @@ mod tests { use futures::future::BoxFuture; use mockall::mock; - use tokio::sync::mpsc::error::SendError; + use tokio::sync::broadcast::error::SendError; use crate::statistics; use crate::tests::sample_info_hash; @@ -323,7 +323,7 @@ mod tests { mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } @@ -395,7 +395,7 @@ mod tests { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -451,7 +451,7 @@ mod tests { connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -494,7 +494,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index ed927efc..6341ed30 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -203,7 +203,7 @@ mod tests { use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use futures::future::BoxFuture; use mockall::mock; - use tokio::sync::mpsc::error::SendError; + use tokio::sync::broadcast::error::SendError; use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -260,7 +260,7 @@ mod tests { mock! { HttpStatsEventSender {} impl statistics::event::sender::Sender for HttpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } @@ -359,7 +359,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -408,7 +408,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -529,7 +529,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); @@ -578,7 +578,7 @@ mod tests { ), })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let http_stats_event_sender: Arc>> = Arc::new(Some(Box::new(http_stats_event_sender_mock))); diff --git a/packages/http-tracker-core/src/statistics/event/listener.rs b/packages/http-tracker-core/src/statistics/event/listener.rs index f1a2e25d..a70992a0 100644 --- a/packages/http-tracker-core/src/statistics/event/listener.rs +++ b/packages/http-tracker-core/src/statistics/event/listener.rs @@ -1,11 +1,17 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast; use super::handler::handle_event; use super::Event; use crate::statistics::repository::Repository; -pub async fn dispatch_events(mut receiver: mpsc::Receiver, stats_repository: Repository) { - while let Some(event) = receiver.recv().await { - handle_event(event, &stats_repository).await; +pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { + loop { + match receiver.recv().await { + Ok(event) => handle_event(event, &stats_repository).await, + Err(e) => { + tracing::error!("Error receiving http tracker core event: {:?}", e); + break; + } + } } } diff --git a/packages/http-tracker-core/src/statistics/event/mod.rs b/packages/http-tracker-core/src/statistics/event/mod.rs index 7520e1a9..2964956d 100644 --- a/packages/http-tracker-core/src/statistics/event/mod.rs +++ b/packages/http-tracker-core/src/statistics/event/mod.rs @@ -5,13 +5,13 @@ pub mod listener; pub mod sender; /// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { TcpAnnounce { connection: ConnectionContext }, TcpScrape { connection: ConnectionContext }, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ConnectionContext { client: ClientConnectionContext, server: ServerConnectionContext, @@ -35,9 +35,19 @@ impl ConnectionContext { pub fn client_ip_addr(&self) -> IpAddr { self.client.ip_addr } + + #[must_use] + pub fn client_port(&self) -> Option { + self.client.port + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server.socket_addr + } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ClientConnectionContext { ip_addr: IpAddr, @@ -45,7 +55,7 @@ pub struct ClientConnectionContext { port: Option, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ServerConnectionContext { socket_addr: SocketAddr, } diff --git a/packages/http-tracker-core/src/statistics/event/sender.rs b/packages/http-tracker-core/src/statistics/event/sender.rs index ca4b4e21..9092a8e0 100644 --- a/packages/http-tracker-core/src/statistics/event/sender.rs +++ b/packages/http-tracker-core/src/statistics/event/sender.rs @@ -2,15 +2,15 @@ use futures::future::BoxFuture; use futures::FutureExt; #[cfg(test)] use mockall::{automock, predicate::str}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::SendError; use super::Event; /// A trait to allow sending statistics events #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } /// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. @@ -19,11 +19,11 @@ pub trait Sender: Sync + Send { /// [`statistics::Keeper`](crate::statistics::keeper::Keeper) #[allow(clippy::module_name_repetitions)] pub struct ChannelSender { - pub(crate) sender: mpsc::Sender, + pub(crate) sender: broadcast::Sender, } impl Sender for ChannelSender { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { - async move { Some(self.sender.send(event).await) }.boxed() + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { + async move { Some(self.sender.send(event)) }.boxed() } } diff --git a/packages/http-tracker-core/src/statistics/keeper.rs b/packages/http-tracker-core/src/statistics/keeper.rs index 783309ef..f4428ec7 100644 --- a/packages/http-tracker-core/src/statistics/keeper.rs +++ b/packages/http-tracker-core/src/statistics/keeper.rs @@ -1,12 +1,9 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::sender::{ChannelSender, Sender}; use super::event::Event; use super::repository::Repository; -const CHANNEL_BUFFER_SIZE: usize = 65_535; - /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// /// It actively listen to new statistics events. When it receives a new event @@ -29,31 +26,16 @@ impl Keeper { } } - #[must_use] - pub fn new_active_instance() -> (Box, Repository) { - let mut stats_tracker = Self::new(); - - let stats_event_sender = stats_tracker.run_event_listener(); - - (stats_event_sender, stats_tracker.repository) - } - - pub fn run_event_listener(&mut self) -> Box { - let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - + pub fn run_event_listener(&mut self, receiver: Receiver) { let stats_repository = self.repository.clone(); tokio::spawn(async move { dispatch_events(receiver, stats_repository).await }); - - Box::new(ChannelSender { sender }) } } #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::keeper::Keeper; use crate::statistics::metrics::Metrics; @@ -65,23 +47,4 @@ mod tests { assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled); } - - #[tokio::test] - async fn should_create_an_event_sender_to_send_statistical_events() { - let mut stats_tracker = Keeper::new(); - - let event_sender = stats_tracker.run_event_listener(); - - let result = event_sender - .send_event(Event::TcpAnnounce { - connection: ConnectionContext::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), - Some(8080), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7070), - ), - }) - .await; - - assert!(result.is_some()); - } } diff --git a/packages/http-tracker-core/src/statistics/setup.rs b/packages/http-tracker-core/src/statistics/setup.rs index d3114a75..a9ac751c 100644 --- a/packages/http-tracker-core/src/statistics/setup.rs +++ b/packages/http-tracker-core/src/statistics/setup.rs @@ -1,8 +1,13 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. +use tokio::sync::broadcast; + +use super::event::sender::ChannelSender; use crate::statistics; +const CHANNEL_CAPACITY: usize = 1024; + /// It builds the structs needed for handling the tracker metrics. /// /// It returns: @@ -19,15 +24,21 @@ pub fn factory( Option>, statistics::repository::Repository, ) { - let mut stats_event_sender = None; + let mut stats_event_sender: Option> = None; - let mut stats_tracker = statistics::keeper::Keeper::new(); + let mut keeper = statistics::keeper::Keeper::new(); if tracker_usage_statistics { - stats_event_sender = Some(stats_tracker.run_event_listener()); + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + + let receiver = sender.subscribe(); + + stats_event_sender = Some(Box::new(ChannelSender { sender })); + + keeper.run_event_listener(receiver); } - (stats_event_sender, stats_tracker.repository) + (stats_event_sender, keeper.repository) } #[cfg(test)]