diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 1988f3d7..a2cb55e5 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -430,7 +430,7 @@ mod tests { kind: UdpRequestKind::Announce, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); @@ -773,7 +773,7 @@ mod tests { kind: UdpRequestKind::Announce, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); @@ -866,7 +866,7 @@ mod tests { kind: UdpRequestKind::Announce, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 7e96ce37..992ef459 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -208,7 +208,7 @@ mod tests { kind: UdpRequestKind::Connect, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); @@ -249,7 +249,7 @@ mod tests { kind: UdpRequestKind::Connect, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 771147b4..e573cc18 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -223,7 +223,6 @@ pub(crate) mod tests { use futures::future::BoxFuture; use mockall::mock; use tokio::sync::broadcast::error::SendError; - use tokio::sync::mpsc::error::SendError as MpscSendError; use torrust_tracker_clock::clock::Time; use torrust_tracker_configuration::{Configuration, Core}; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -430,7 +429,7 @@ pub(crate) mod tests { mock! { pub(crate) UdpServerStatsEventSender {} impl server_statistics::event::sender::Sender for UdpServerStatsEventSender { - fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option > > > ; + fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index db6b4a18..fbf2b7c4 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -373,7 +373,7 @@ mod tests { kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); @@ -422,7 +422,7 @@ mod tests { kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) - .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + .returning(|_| Box::pin(future::ready(Some(Ok(1))))); let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs index f1a2e25d..b755cbf1 100644 --- a/packages/udp-tracker-server/src/statistics/event/listener.rs +++ b/packages/udp-tracker-server/src/statistics/event/listener.rs @@ -1,11 +1,17 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast; use super::handler::handle_event; use super::Event; use crate::statistics::repository::Repository; -pub async fn dispatch_events(mut receiver: mpsc::Receiver, stats_repository: Repository) { - while let Some(event) = receiver.recv().await { - handle_event(event, &stats_repository).await; +pub async fn dispatch_events(mut receiver: broadcast::Receiver, stats_repository: Repository) { + loop { + match receiver.recv().await { + Ok(event) => handle_event(event, &stats_repository).await, + Err(e) => { + tracing::error!("Error receiving udp tracker server event: {:?}", e); + break; + } + } } } diff --git a/packages/udp-tracker-server/src/statistics/event/sender.rs b/packages/udp-tracker-server/src/statistics/event/sender.rs index ca4b4e21..9092a8e0 100644 --- a/packages/udp-tracker-server/src/statistics/event/sender.rs +++ b/packages/udp-tracker-server/src/statistics/event/sender.rs @@ -2,15 +2,15 @@ use futures::future::BoxFuture; use futures::FutureExt; #[cfg(test)] use mockall::{automock, predicate::str}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::SendError; use super::Event; /// A trait to allow sending statistics events #[cfg_attr(test, automock)] pub trait Sender: Sync + Send { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; } /// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. @@ -19,11 +19,11 @@ pub trait Sender: Sync + Send { /// [`statistics::Keeper`](crate::statistics::keeper::Keeper) #[allow(clippy::module_name_repetitions)] pub struct ChannelSender { - pub(crate) sender: mpsc::Sender, + pub(crate) sender: broadcast::Sender, } impl Sender for ChannelSender { - fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { - async move { Some(self.sender.send(event).await) }.boxed() + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { + async move { Some(self.sender.send(event)) }.boxed() } } diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs index 4ce83222..099e0d0a 100644 --- a/packages/udp-tracker-server/src/statistics/keeper.rs +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -1,12 +1,9 @@ -use tokio::sync::mpsc; +use tokio::sync::broadcast::Receiver; use super::event::listener::dispatch_events; -use super::event::sender::{ChannelSender, Sender}; use super::event::Event; use super::repository::Repository; -const CHANNEL_BUFFER_SIZE: usize = 65_535; - /// The service responsible for keeping tracker metrics (listening to statistics events and handle them). /// /// It actively listen to new statistics events. When it receives a new event @@ -29,31 +26,15 @@ impl Keeper { } } - #[must_use] - pub fn new_active_instance() -> (Box, Repository) { - let mut stats_tracker = Self::new(); - - let stats_event_sender = stats_tracker.run_event_listener(); - - (stats_event_sender, stats_tracker.repository) - } - - pub fn run_event_listener(&mut self) -> Box { - let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - + pub fn run_event_listener(&mut self, receiver: Receiver) { let stats_repository = self.repository.clone(); tokio::spawn(async move { dispatch_events(receiver, stats_repository).await }); - - Box::new(ChannelSender { sender }) } } #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv6Addr, SocketAddr}; - - use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::keeper::Keeper; use crate::statistics::metrics::Metrics; @@ -65,22 +46,4 @@ mod tests { assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests); } - - #[tokio::test] - async fn should_create_an_event_sender_to_send_statistical_events() { - let mut stats_tracker = Keeper::new(); - - let event_sender = stats_tracker.run_event_listener(); - - let result = event_sender - .send_event(Event::UdpRequestReceived { - context: ConnectionContext::new( - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), - ), - }) - .await; - - assert!(result.is_some()); - } } diff --git a/packages/udp-tracker-server/src/statistics/setup.rs b/packages/udp-tracker-server/src/statistics/setup.rs index d3114a75..a9ac751c 100644 --- a/packages/udp-tracker-server/src/statistics/setup.rs +++ b/packages/udp-tracker-server/src/statistics/setup.rs @@ -1,8 +1,13 @@ //! Setup for the tracker statistics. //! //! The [`factory`] function builds the structs needed for handling the tracker metrics. +use tokio::sync::broadcast; + +use super::event::sender::ChannelSender; use crate::statistics; +const CHANNEL_CAPACITY: usize = 1024; + /// It builds the structs needed for handling the tracker metrics. /// /// It returns: @@ -19,15 +24,21 @@ pub fn factory( Option>, statistics::repository::Repository, ) { - let mut stats_event_sender = None; + let mut stats_event_sender: Option> = None; - let mut stats_tracker = statistics::keeper::Keeper::new(); + let mut keeper = statistics::keeper::Keeper::new(); if tracker_usage_statistics { - stats_event_sender = Some(stats_tracker.run_event_listener()); + let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); + + let receiver = sender.subscribe(); + + stats_event_sender = Some(Box::new(ChannelSender { sender })); + + keeper.run_event_listener(receiver); } - (stats_event_sender, stats_tracker.repository) + (stats_event_sender, keeper.repository) } #[cfg(test)]