Skip to content

Commit 6806346

Browse files
committed
Merge #1393: Overhaul stats: Use broadcast channel for events in UDP Core
d2de1de refactor: [#1389] change channel in UDP core from mpsc to broadcast (Jose Celano) Pull request description: This continues [this refactor](#1391) for the UDP core package. ACKs for top commit: josecelano: ACK d2de1de Tree-SHA512: 72ca8b7c24b691188bcecc8b2de83dec147b702621c3a340e113259131b1c023b268632894714b57ebbf84fe3147e3c6f57aa0a59a2f34d755eeaf0942d992c6
2 parents 50463ae + d2de1de commit 6806346

File tree

11 files changed

+48
-67
lines changed

11 files changed

+48
-67
lines changed

packages/udp-tracker-core/benches/helpers/utils.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33
use bittorrent_udp_tracker_core::statistics;
44
use futures::future::BoxFuture;
55
use mockall::mock;
6-
use tokio::sync::mpsc::error::SendError;
6+
use tokio::sync::broadcast::error::SendError;
77

88
pub(crate) fn sample_ipv4_remote_addr() -> SocketAddr {
99
sample_ipv4_socket_address()
@@ -20,6 +20,6 @@ pub(crate) fn sample_issue_time() -> f64 {
2020
mock! {
2121
pub(crate) UdpCoreStatsEventSender {}
2222
impl statistics::event::sender::Sender for UdpCoreStatsEventSender {
23-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
23+
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
2424
}
2525
}

packages/udp-tracker-core/src/services/connect.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ mod tests {
142142
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
143143
}))
144144
.times(1)
145-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
145+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
146146
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
147147
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));
148148

@@ -165,7 +165,7 @@ mod tests {
165165
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
166166
}))
167167
.times(1)
168-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
168+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
169169
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
170170
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));
171171

packages/udp-tracker-core/src/services/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) mod tests {
1010

1111
use futures::future::BoxFuture;
1212
use mockall::mock;
13-
use tokio::sync::mpsc::error::SendError;
13+
use tokio::sync::broadcast::error::SendError;
1414

1515
use crate::connection_cookie::gen_remote_fingerprint;
1616
use crate::statistics;
@@ -46,7 +46,7 @@ pub(crate) mod tests {
4646
mock! {
4747
pub(crate) UdpCoreStatsEventSender {}
4848
impl statistics::event::sender::Sender for UdpCoreStatsEventSender {
49-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
49+
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
5050
}
5151
}
5252
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast;
22

33
use super::handler::handle_event;
44
use super::Event;
55
use crate::statistics::repository::Repository;
66

7-
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8-
while let Some(event) = receiver.recv().await {
9-
handle_event(event, &stats_repository).await;
7+
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
8+
loop {
9+
match receiver.recv().await {
10+
Ok(event) => handle_event(event, &stats_repository).await,
11+
Err(e) => {
12+
tracing::error!("Error receiving udp tracker core event: {:?}", e);
13+
break;
14+
}
15+
}
1016
}
1117
}

packages/udp-tracker-core/src/statistics/event/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ pub mod sender;
88
///
99
/// - `Udp` prefix means the event was triggered by the UDP tracker.
1010
/// - The event suffix is the type of request: `announce`, `scrape` or `connection`.
11-
#[derive(Debug, PartialEq, Eq)]
11+
#[derive(Debug, PartialEq, Eq, Clone)]
1212
pub enum Event {
1313
UdpConnect { context: ConnectionContext },
1414
UdpAnnounce { context: ConnectionContext },
1515
UdpScrape { context: ConnectionContext },
1616
}
1717

18-
#[derive(Debug, PartialEq, Eq)]
18+
#[derive(Debug, PartialEq, Eq, Clone)]
1919
pub struct ConnectionContext {
2020
client_socket_addr: SocketAddr,
2121
server_socket_addr: SocketAddr,

packages/udp-tracker-core/src/statistics/event/sender.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use futures::future::BoxFuture;
22
use futures::FutureExt;
33
#[cfg(test)]
44
use mockall::{automock, predicate::str};
5-
use tokio::sync::mpsc;
6-
use tokio::sync::mpsc::error::SendError;
5+
use tokio::sync::broadcast;
6+
use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

1010
/// A trait to allow sending statistics events
1111
#[cfg_attr(test, automock)]
1212
pub trait Sender: Sync + Send {
13-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
1414
}
1515

1616
/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
@@ -19,11 +19,11 @@ pub trait Sender: Sync + Send {
1919
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
2020
#[allow(clippy::module_name_repetitions)]
2121
pub struct ChannelSender {
22-
pub(crate) sender: mpsc::Sender<Event>,
22+
pub(crate) sender: broadcast::Sender<Event>,
2323
}
2424

2525
impl Sender for ChannelSender {
26-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27-
async move { Some(self.sender.send(event).await) }.boxed()
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
27+
async move { Some(self.sender.send(event)) }.boxed()
2828
}
2929
}
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast::Receiver;
22

33
use super::event::listener::dispatch_events;
4-
use super::event::sender::{ChannelSender, Sender};
54
use super::event::Event;
65
use super::repository::Repository;
76

8-
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9-
107
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
118
///
129
/// It actively listen to new statistics events. When it receives a new event
@@ -29,31 +26,15 @@ impl Keeper {
2926
}
3027
}
3128

32-
#[must_use]
33-
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34-
let mut stats_tracker = Self::new();
35-
36-
let stats_event_sender = stats_tracker.run_event_listener();
37-
38-
(stats_event_sender, stats_tracker.repository)
39-
}
40-
41-
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42-
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43-
29+
pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
4430
let stats_repository = self.repository.clone();
4531

4632
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47-
48-
Box::new(ChannelSender { sender })
4933
}
5034
}
5135

5236
#[cfg(test)]
5337
mod tests {
54-
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
55-
56-
use crate::statistics::event::{ConnectionContext, Event};
5738
use crate::statistics::keeper::Keeper;
5839
use crate::statistics::metrics::Metrics;
5940

@@ -65,22 +46,4 @@ mod tests {
6546

6647
assert_eq!(stats.udp4_announces_handled, Metrics::default().udp4_announces_handled);
6748
}
68-
69-
#[tokio::test]
70-
async fn should_create_an_event_sender_to_send_statistical_events() {
71-
let mut stats_tracker = Keeper::new();
72-
73-
let event_sender = stats_tracker.run_event_listener();
74-
75-
let result = event_sender
76-
.send_event(Event::UdpConnect {
77-
context: ConnectionContext::new(
78-
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080),
79-
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969),
80-
),
81-
})
82-
.await;
83-
84-
assert!(result.is_some());
85-
}
8649
}

packages/udp-tracker-core/src/statistics/setup.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
//! Setup for the tracker statistics.
22
//!
33
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use tokio::sync::broadcast;
5+
6+
use super::event::sender::ChannelSender;
47
use crate::statistics;
58

9+
const CHANNEL_CAPACITY: usize = 1024;
10+
611
/// It builds the structs needed for handling the tracker metrics.
712
///
813
/// It returns:
@@ -19,15 +24,21 @@ pub fn factory(
1924
Option<Box<dyn statistics::event::sender::Sender>>,
2025
statistics::repository::Repository,
2126
) {
22-
let mut stats_event_sender = None;
27+
let mut stats_event_sender: Option<Box<dyn statistics::event::sender::Sender>> = None;
2328

24-
let mut stats_tracker = statistics::keeper::Keeper::new();
29+
let mut keeper = statistics::keeper::Keeper::new();
2530

2631
if tracker_usage_statistics {
27-
stats_event_sender = Some(stats_tracker.run_event_listener());
32+
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
33+
34+
let receiver = sender.subscribe();
35+
36+
stats_event_sender = Some(Box::new(ChannelSender { sender }));
37+
38+
keeper.run_event_listener(receiver);
2839
}
2940

30-
(stats_event_sender, stats_tracker.repository)
41+
(stats_event_sender, keeper.repository)
3142
}
3243

3344
#[cfg(test)]

packages/udp-tracker-server/src/handlers/announce.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ mod tests {
854854
context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
855855
}))
856856
.times(1)
857-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
857+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
858858
let udp_core_stats_event_sender: Arc<Option<Box<dyn core_statistics::event::sender::Sender>>> =
859859
Arc::new(Some(Box::new(udp_core_stats_event_sender_mock)));
860860

packages/udp-tracker-server/src/handlers/connect.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ mod tests {
196196
context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
197197
}))
198198
.times(1)
199-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
199+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
200200
let udp_core_stats_event_sender: Arc<Option<Box<dyn core_statistics::event::sender::Sender>>> =
201201
Arc::new(Some(Box::new(udp_core_stats_event_sender_mock)));
202202

@@ -237,7 +237,7 @@ mod tests {
237237
context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
238238
}))
239239
.times(1)
240-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
240+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
241241
let udp_core_stats_event_sender: Arc<Option<Box<dyn core_statistics::event::sender::Sender>>> =
242242
Arc::new(Some(Box::new(udp_core_stats_event_sender_mock)));
243243

packages/udp-tracker-server/src/handlers/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ pub(crate) mod tests {
222222
use bittorrent_udp_tracker_core::{self, statistics as core_statistics};
223223
use futures::future::BoxFuture;
224224
use mockall::mock;
225-
use tokio::sync::mpsc::error::SendError;
225+
use tokio::sync::broadcast::error::SendError;
226+
use tokio::sync::mpsc::error::SendError as MpscSendError;
226227
use torrust_tracker_clock::clock::Time;
227228
use torrust_tracker_configuration::{Configuration, Core};
228229
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
@@ -422,14 +423,14 @@ pub(crate) mod tests {
422423
mock! {
423424
pub(crate) UdpCoreStatsEventSender {}
424425
impl core_statistics::event::sender::Sender for UdpCoreStatsEventSender {
425-
fn send_event(&self, event: core_statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<core_statistics::event::Event> > > > ;
426+
fn send_event(&self, event: core_statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<core_statistics::event::Event> > > > ;
426427
}
427428
}
428429

429430
mock! {
430431
pub(crate) UdpServerStatsEventSender {}
431432
impl server_statistics::event::sender::Sender for UdpServerStatsEventSender {
432-
fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<server_statistics::event::Event> > > > ;
433+
fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option<Result<(),MpscSendError<server_statistics::event::Event> > > > ;
433434
}
434435
}
435436
}

0 commit comments

Comments
 (0)