From 37c8f2bc8187197e81b1d7a338e490b739fe438e Mon Sep 17 00:00:00 2001
From: Jose Celano <josecelano@gmail.com>
Date: Wed, 19 Mar 2025 09:03:10 +0000
Subject: [PATCH] refactor: [#1390] change channel in UDP server from mpsc to
 broadcast

---
 .../src/handlers/announce.rs                  |  6 +--
 .../src/handlers/connect.rs                   |  4 +-
 .../udp-tracker-server/src/handlers/mod.rs    |  3 +-
 .../udp-tracker-server/src/handlers/scrape.rs |  4 +-
 .../src/statistics/event/listener.rs          | 14 +++++--
 .../src/statistics/event/sender.rs            | 12 +++---
 .../src/statistics/keeper.rs                  | 41 +------------------
 .../src/statistics/setup.rs                   | 19 +++++++--
 8 files changed, 41 insertions(+), 62 deletions(-)

diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs
index 1988f3d79..a2cb55e59 100644
--- a/packages/udp-tracker-server/src/handlers/announce.rs
+++ b/packages/udp-tracker-server/src/handlers/announce.rs
@@ -430,7 +430,7 @@ mod tests {
                         kind: UdpRequestKind::Announce,
                     }))
                     .times(1)
-                    .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                    .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
                 let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                     Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
@@ -773,7 +773,7 @@ mod tests {
                         kind: UdpRequestKind::Announce,
                     }))
                     .times(1)
-                    .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                    .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
                 let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                     Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
@@ -866,7 +866,7 @@ mod tests {
                             kind: UdpRequestKind::Announce,
                         }))
                         .times(1)
-                        .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                        .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
                     let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                         Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs
index 7e96ce37a..992ef459d 100644
--- a/packages/udp-tracker-server/src/handlers/connect.rs
+++ b/packages/udp-tracker-server/src/handlers/connect.rs
@@ -208,7 +208,7 @@ mod tests {
                     kind: UdpRequestKind::Connect,
                 }))
                 .times(1)
-                .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
             let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                 Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
@@ -249,7 +249,7 @@ mod tests {
                     kind: UdpRequestKind::Connect,
                 }))
                 .times(1)
-                .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
             let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                 Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs
index 771147b4a..e573cc184 100644
--- a/packages/udp-tracker-server/src/handlers/mod.rs
+++ b/packages/udp-tracker-server/src/handlers/mod.rs
@@ -223,7 +223,6 @@ pub(crate) mod tests {
     use futures::future::BoxFuture;
     use mockall::mock;
     use tokio::sync::broadcast::error::SendError;
-    use tokio::sync::mpsc::error::SendError as MpscSendError;
     use torrust_tracker_clock::clock::Time;
     use torrust_tracker_configuration::{Configuration, Core};
     use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
@@ -430,7 +429,7 @@ pub(crate) mod tests {
     mock! {
         pub(crate) UdpServerStatsEventSender {}
         impl server_statistics::event::sender::Sender for UdpServerStatsEventSender {
-             fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<(),MpscSendError<server_statistics::event::Event> > > > ;
+             fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<server_statistics::event::Event> > > > ;
         }
     }
 }
diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs
index db6b4a18b..fbf2b7c43 100644
--- a/packages/udp-tracker-server/src/handlers/scrape.rs
+++ b/packages/udp-tracker-server/src/handlers/scrape.rs
@@ -373,7 +373,7 @@ mod tests {
                         kind: server_statistics::event::UdpRequestKind::Scrape,
                     }))
                     .times(1)
-                    .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                    .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
                 let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                     Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
@@ -422,7 +422,7 @@ mod tests {
                         kind: server_statistics::event::UdpRequestKind::Scrape,
                     }))
                     .times(1)
-                    .returning(|_| Box::pin(future::ready(Some(Ok(())))));
+                    .returning(|_| Box::pin(future::ready(Some(Ok(1)))));
                 let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
                     Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
 
diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs
index f1a2e25de..b755cbf18 100644
--- a/packages/udp-tracker-server/src/statistics/event/listener.rs
+++ b/packages/udp-tracker-server/src/statistics/event/listener.rs
@@ -1,11 +1,17 @@
-use tokio::sync::mpsc;
+use tokio::sync::broadcast;
 
 use super::handler::handle_event;
 use super::Event;
 use crate::statistics::repository::Repository;
 
-pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
-    while let Some(event) = receiver.recv().await {
-        handle_event(event, &stats_repository).await;
+pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
+    loop {
+        match receiver.recv().await {
+            Ok(event) => handle_event(event, &stats_repository).await,
+            Err(e) => {
+                tracing::error!("Error receiving udp tracker server event: {:?}", e);
+                break;
+            }
+        }
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/sender.rs b/packages/udp-tracker-server/src/statistics/event/sender.rs
index ca4b4e210..9092a8e0b 100644
--- a/packages/udp-tracker-server/src/statistics/event/sender.rs
+++ b/packages/udp-tracker-server/src/statistics/event/sender.rs
@@ -2,15 +2,15 @@ use futures::future::BoxFuture;
 use futures::FutureExt;
 #[cfg(test)]
 use mockall::{automock, predicate::str};
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::SendError;
+use tokio::sync::broadcast;
+use tokio::sync::broadcast::error::SendError;
 
 use super::Event;
 
 /// A trait to allow sending statistics events
 #[cfg_attr(test, automock)]
 pub trait Sender: Sync + Send {
-    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
+    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
 }
 
 /// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
@@ -19,11 +19,11 @@ pub trait Sender: Sync + Send {
 /// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
 #[allow(clippy::module_name_repetitions)]
 pub struct ChannelSender {
-    pub(crate) sender: mpsc::Sender<Event>,
+    pub(crate) sender: broadcast::Sender<Event>,
 }
 
 impl Sender for ChannelSender {
-    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
-        async move { Some(self.sender.send(event).await) }.boxed()
+    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
+        async move { Some(self.sender.send(event)) }.boxed()
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs
index 4ce832227..099e0d0aa 100644
--- a/packages/udp-tracker-server/src/statistics/keeper.rs
+++ b/packages/udp-tracker-server/src/statistics/keeper.rs
@@ -1,12 +1,9 @@
-use tokio::sync::mpsc;
+use tokio::sync::broadcast::Receiver;
 
 use super::event::listener::dispatch_events;
-use super::event::sender::{ChannelSender, Sender};
 use super::event::Event;
 use super::repository::Repository;
 
-const CHANNEL_BUFFER_SIZE: usize = 65_535;
-
 /// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
 ///
 /// It actively listen to new statistics events. When it receives a new event
@@ -29,31 +26,15 @@ impl Keeper {
         }
     }
 
-    #[must_use]
-    pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
-        let mut stats_tracker = Self::new();
-
-        let stats_event_sender = stats_tracker.run_event_listener();
-
-        (stats_event_sender, stats_tracker.repository)
-    }
-
-    pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
-        let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
-
+    pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
         let stats_repository = self.repository.clone();
 
         tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
-
-        Box::new(ChannelSender { sender })
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::net::{IpAddr, Ipv6Addr, SocketAddr};
-
-    use crate::statistics::event::{ConnectionContext, Event};
     use crate::statistics::keeper::Keeper;
     use crate::statistics::metrics::Metrics;
 
@@ -65,22 +46,4 @@ mod tests {
 
         assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests);
     }
-
-    #[tokio::test]
-    async fn should_create_an_event_sender_to_send_statistical_events() {
-        let mut stats_tracker = Keeper::new();
-
-        let event_sender = stats_tracker.run_event_listener();
-
-        let result = event_sender
-            .send_event(Event::UdpRequestReceived {
-                context: ConnectionContext::new(
-                    SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080),
-                    SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969),
-                ),
-            })
-            .await;
-
-        assert!(result.is_some());
-    }
 }
diff --git a/packages/udp-tracker-server/src/statistics/setup.rs b/packages/udp-tracker-server/src/statistics/setup.rs
index d3114a75e..a9ac751c6 100644
--- a/packages/udp-tracker-server/src/statistics/setup.rs
+++ b/packages/udp-tracker-server/src/statistics/setup.rs
@@ -1,8 +1,13 @@
 //! Setup for the tracker statistics.
 //!
 //! The [`factory`] function builds the structs needed for handling the tracker metrics.
+use tokio::sync::broadcast;
+
+use super::event::sender::ChannelSender;
 use crate::statistics;
 
+const CHANNEL_CAPACITY: usize = 1024;
+
 /// It builds the structs needed for handling the tracker metrics.
 ///
 /// It returns:
@@ -19,15 +24,21 @@ pub fn factory(
     Option<Box<dyn statistics::event::sender::Sender>>,
     statistics::repository::Repository,
 ) {
-    let mut stats_event_sender = None;
+    let mut stats_event_sender: Option<Box<dyn statistics::event::sender::Sender>> = None;
 
-    let mut stats_tracker = statistics::keeper::Keeper::new();
+    let mut keeper = statistics::keeper::Keeper::new();
 
     if tracker_usage_statistics {
-        stats_event_sender = Some(stats_tracker.run_event_listener());
+        let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
+
+        let receiver = sender.subscribe();
+
+        stats_event_sender = Some(Box::new(ChannelSender { sender }));
+
+        keeper.run_event_listener(receiver);
     }
 
-    (stats_event_sender, stats_tracker.repository)
+    (stats_event_sender, keeper.repository)
 }
 
 #[cfg(test)]