Skip to content

Commit 34c4249

Browse files
committed
Merge #1394: Overhaul stats: Use broadcast channel for events in UDP Server
37c8f2b refactor: [#1390] change channel in UDP server from mpsc to broadcast (Jose Celano) Pull request description: This continues [this refactor](#1391) for the UDP core package. ACKs for top commit: josecelano: ACK 37c8f2b Tree-SHA512: ca04ef293a6eb6d172a1a7dbbbc213d165274f574d15d5e8a521aea81795f067fb785c1307a64dae69ca31d1b2f30279082ea6ad10beb2ca13771101ea9b0069
2 parents 6806346 + 37c8f2b commit 34c4249

File tree

8 files changed

+41
-62
lines changed

8 files changed

+41
-62
lines changed

packages/udp-tracker-server/src/handlers/announce.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ mod tests {
430430
kind: UdpRequestKind::Announce,
431431
}))
432432
.times(1)
433-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
433+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
434434
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
435435
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
436436

@@ -773,7 +773,7 @@ mod tests {
773773
kind: UdpRequestKind::Announce,
774774
}))
775775
.times(1)
776-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
776+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
777777
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
778778
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
779779

@@ -866,7 +866,7 @@ mod tests {
866866
kind: UdpRequestKind::Announce,
867867
}))
868868
.times(1)
869-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
869+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
870870
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
871871
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
872872

packages/udp-tracker-server/src/handlers/connect.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ mod tests {
208208
kind: UdpRequestKind::Connect,
209209
}))
210210
.times(1)
211-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
211+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
212212
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
213213
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
214214

@@ -249,7 +249,7 @@ mod tests {
249249
kind: UdpRequestKind::Connect,
250250
}))
251251
.times(1)
252-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
252+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
253253
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
254254
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
255255

packages/udp-tracker-server/src/handlers/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ pub(crate) mod tests {
223223
use futures::future::BoxFuture;
224224
use mockall::mock;
225225
use tokio::sync::broadcast::error::SendError;
226-
use tokio::sync::mpsc::error::SendError as MpscSendError;
227226
use torrust_tracker_clock::clock::Time;
228227
use torrust_tracker_configuration::{Configuration, Core};
229228
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
@@ -430,7 +429,7 @@ pub(crate) mod tests {
430429
mock! {
431430
pub(crate) UdpServerStatsEventSender {}
432431
impl server_statistics::event::sender::Sender for UdpServerStatsEventSender {
433-
fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<(),MpscSendError<server_statistics::event::Event> > > > ;
432+
fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<server_statistics::event::Event> > > > ;
434433
}
435434
}
436435
}

packages/udp-tracker-server/src/handlers/scrape.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ mod tests {
373373
kind: server_statistics::event::UdpRequestKind::Scrape,
374374
}))
375375
.times(1)
376-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
376+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
377377
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
378378
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
379379

@@ -422,7 +422,7 @@ mod tests {
422422
kind: server_statistics::event::UdpRequestKind::Scrape,
423423
}))
424424
.times(1)
425-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
425+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
426426
let udp_server_stats_event_sender: Arc<Option<Box<dyn server_statistics::event::sender::Sender>>> =
427427
Arc::new(Some(Box::new(udp_server_stats_event_sender_mock)));
428428

Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast;
22

33
use super::handler::handle_event;
44
use super::Event;
55
use crate::statistics::repository::Repository;
66

7-
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8-
while let Some(event) = receiver.recv().await {
9-
handle_event(event, &stats_repository).await;
7+
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
8+
loop {
9+
match receiver.recv().await {
10+
Ok(event) => handle_event(event, &stats_repository).await,
11+
Err(e) => {
12+
tracing::error!("Error receiving udp tracker server event: {:?}", e);
13+
break;
14+
}
15+
}
1016
}
1117
}

packages/udp-tracker-server/src/statistics/event/sender.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use futures::future::BoxFuture;
22
use futures::FutureExt;
33
#[cfg(test)]
44
use mockall::{automock, predicate::str};
5-
use tokio::sync::mpsc;
6-
use tokio::sync::mpsc::error::SendError;
5+
use tokio::sync::broadcast;
6+
use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

1010
/// A trait to allow sending statistics events
1111
#[cfg_attr(test, automock)]
1212
pub trait Sender: Sync + Send {
13-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
1414
}
1515

1616
/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
@@ -19,11 +19,11 @@ pub trait Sender: Sync + Send {
1919
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
2020
#[allow(clippy::module_name_repetitions)]
2121
pub struct ChannelSender {
22-
pub(crate) sender: mpsc::Sender<Event>,
22+
pub(crate) sender: broadcast::Sender<Event>,
2323
}
2424

2525
impl Sender for ChannelSender {
26-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27-
async move { Some(self.sender.send(event).await) }.boxed()
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
27+
async move { Some(self.sender.send(event)) }.boxed()
2828
}
2929
}
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast::Receiver;
22

33
use super::event::listener::dispatch_events;
4-
use super::event::sender::{ChannelSender, Sender};
54
use super::event::Event;
65
use super::repository::Repository;
76

8-
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9-
107
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
118
///
129
/// It actively listen to new statistics events. When it receives a new event
@@ -29,31 +26,15 @@ impl Keeper {
2926
}
3027
}
3128

32-
#[must_use]
33-
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34-
let mut stats_tracker = Self::new();
35-
36-
let stats_event_sender = stats_tracker.run_event_listener();
37-
38-
(stats_event_sender, stats_tracker.repository)
39-
}
40-
41-
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42-
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43-
29+
pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
4430
let stats_repository = self.repository.clone();
4531

4632
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47-
48-
Box::new(ChannelSender { sender })
4933
}
5034
}
5135

5236
#[cfg(test)]
5337
mod tests {
54-
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
55-
56-
use crate::statistics::event::{ConnectionContext, Event};
5738
use crate::statistics::keeper::Keeper;
5839
use crate::statistics::metrics::Metrics;
5940

@@ -65,22 +46,4 @@ mod tests {
6546

6647
assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests);
6748
}
68-
69-
#[tokio::test]
70-
async fn should_create_an_event_sender_to_send_statistical_events() {
71-
let mut stats_tracker = Keeper::new();
72-
73-
let event_sender = stats_tracker.run_event_listener();
74-
75-
let result = event_sender
76-
.send_event(Event::UdpRequestReceived {
77-
context: ConnectionContext::new(
78-
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080),
79-
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969),
80-
),
81-
})
82-
.await;
83-
84-
assert!(result.is_some());
85-
}
8649
}

packages/udp-tracker-server/src/statistics/setup.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
//! Setup for the tracker statistics.
22
//!
33
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use tokio::sync::broadcast;
5+
6+
use super::event::sender::ChannelSender;
47
use crate::statistics;
58

9+
const CHANNEL_CAPACITY: usize = 1024;
10+
611
/// It builds the structs needed for handling the tracker metrics.
712
///
813
/// It returns:
@@ -19,15 +24,21 @@ pub fn factory(
1924
Option<Box<dyn statistics::event::sender::Sender>>,
2025
statistics::repository::Repository,
2126
) {
22-
let mut stats_event_sender = None;
27+
let mut stats_event_sender: Option<Box<dyn statistics::event::sender::Sender>> = None;
2328

24-
let mut stats_tracker = statistics::keeper::Keeper::new();
29+
let mut keeper = statistics::keeper::Keeper::new();
2530

2631
if tracker_usage_statistics {
27-
stats_event_sender = Some(stats_tracker.run_event_listener());
32+
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
33+
34+
let receiver = sender.subscribe();
35+
36+
stats_event_sender = Some(Box::new(ChannelSender { sender }));
37+
38+
keeper.run_event_listener(receiver);
2839
}
2940

30-
(stats_event_sender, stats_tracker.repository)
41+
(stats_event_sender, keeper.repository)
3142
}
3243

3344
#[cfg(test)]

0 commit comments

Comments
 (0)