Skip to content

Commit 4e3260a

Browse files
committed
refactor: [#1388] change channel in HTTP core from mpsc to broadcast
Stats events were introduced to colelct tracker metrics. WE only have global metrics (aggregate metrics for all UDP and HTTP trackers). This will change in the future: - Seggregated metrics: one listeners per tracker (per socket). - Generic events: there could be other event consumers. Events will be decoupled from stats. This change allows multiple receivers in the channel. For now, we one use one listener but with this change it will be easy to add more.
1 parent 64c7b21 commit 4e3260a

File tree

7 files changed

+44
-64
lines changed

7 files changed

+44
-64
lines changed

packages/http-tracker-core/benches/helpers/util.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ pub fn sample_info_hash() -> InfoHash {
108108
use bittorrent_http_tracker_core::statistics;
109109
use futures::future::BoxFuture;
110110
use mockall::mock;
111-
use tokio::sync::mpsc::error::SendError;
111+
use tokio::sync::broadcast::error::SendError;
112112

113113
mock! {
114114
HttpStatsEventSender {}
115115
impl statistics::event::sender::Sender for HttpStatsEventSender {
116-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
116+
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
117117
}
118118
}

packages/http-tracker-core/src/services/announce.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -315,15 +315,15 @@ mod tests {
315315

316316
use futures::future::BoxFuture;
317317
use mockall::mock;
318-
use tokio::sync::mpsc::error::SendError;
318+
use tokio::sync::broadcast::error::SendError;
319319

320320
use crate::statistics;
321321
use crate::tests::sample_info_hash;
322322

323323
mock! {
324324
HttpStatsEventSender {}
325325
impl statistics::event::sender::Sender for HttpStatsEventSender {
326-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
326+
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
327327
}
328328
}
329329

@@ -395,7 +395,7 @@ mod tests {
395395
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr),
396396
}))
397397
.times(1)
398-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
398+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
399399
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
400400
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
401401

@@ -451,7 +451,7 @@ mod tests {
451451
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr),
452452
}))
453453
.times(1)
454-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
454+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
455455
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
456456
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
457457

@@ -494,7 +494,7 @@ mod tests {
494494
),
495495
}))
496496
.times(1)
497-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
497+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
498498
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
499499
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
500500

packages/http-tracker-core/src/services/scrape.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ mod tests {
203203
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
204204
use futures::future::BoxFuture;
205205
use mockall::mock;
206-
use tokio::sync::mpsc::error::SendError;
206+
use tokio::sync::broadcast::error::SendError;
207207
use torrust_tracker_configuration::Configuration;
208208
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
209209

@@ -260,7 +260,7 @@ mod tests {
260260
mock! {
261261
HttpStatsEventSender {}
262262
impl statistics::event::sender::Sender for HttpStatsEventSender {
263-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<(),SendError<statistics::event::Event> > > > ;
263+
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
264264
}
265265
}
266266

@@ -359,7 +359,7 @@ mod tests {
359359
),
360360
}))
361361
.times(1)
362-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
362+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
363363
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
364364
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
365365

@@ -408,7 +408,7 @@ mod tests {
408408
),
409409
}))
410410
.times(1)
411-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
411+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
412412
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
413413
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
414414

@@ -529,7 +529,7 @@ mod tests {
529529
),
530530
}))
531531
.times(1)
532-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
532+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
533533
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
534534
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
535535

@@ -578,7 +578,7 @@ mod tests {
578578
),
579579
}))
580580
.times(1)
581-
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
581+
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
582582
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
583583
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
584584

Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast;
22

33
use super::handler::handle_event;
44
use super::Event;
55
use crate::statistics::repository::Repository;
66

7-
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8-
while let Some(event) = receiver.recv().await {
9-
handle_event(event, &stats_repository).await;
7+
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
8+
loop {
9+
match receiver.recv().await {
10+
Ok(event) => handle_event(event, &stats_repository).await,
11+
Err(e) => {
12+
tracing::error!("Error receiving http tracker core event: {:?}", e);
13+
break;
14+
}
15+
}
1016
}
1117
}

packages/http-tracker-core/src/statistics/event/sender.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use futures::future::BoxFuture;
22
use futures::FutureExt;
33
#[cfg(test)]
44
use mockall::{automock, predicate::str};
5-
use tokio::sync::mpsc;
6-
use tokio::sync::mpsc::error::SendError;
5+
use tokio::sync::broadcast;
6+
use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

1010
/// A trait to allow sending statistics events
1111
#[cfg_attr(test, automock)]
1212
pub trait Sender: Sync + Send {
13-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
1414
}
1515

1616
/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
@@ -19,11 +19,11 @@ pub trait Sender: Sync + Send {
1919
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
2020
#[allow(clippy::module_name_repetitions)]
2121
pub struct ChannelSender {
22-
pub(crate) sender: mpsc::Sender<Event>,
22+
pub(crate) sender: broadcast::Sender<Event>,
2323
}
2424

2525
impl Sender for ChannelSender {
26-
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27-
async move { Some(self.sender.send(event).await) }.boxed()
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
27+
async move { Some(self.sender.send(event)) }.boxed()
2828
}
2929
}
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use tokio::sync::mpsc;
1+
use tokio::sync::broadcast::Receiver;
22

33
use super::event::listener::dispatch_events;
4-
use super::event::sender::{ChannelSender, Sender};
54
use super::event::Event;
65
use super::repository::Repository;
76

8-
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9-
107
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
118
///
129
/// It actively listen to new statistics events. When it receives a new event
@@ -29,31 +26,16 @@ impl Keeper {
2926
}
3027
}
3128

32-
#[must_use]
33-
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34-
let mut stats_tracker = Self::new();
35-
36-
let stats_event_sender = stats_tracker.run_event_listener();
37-
38-
(stats_event_sender, stats_tracker.repository)
39-
}
40-
41-
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42-
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43-
29+
pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
4430
let stats_repository = self.repository.clone();
4531

4632
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47-
48-
Box::new(ChannelSender { sender })
4933
}
5034
}
5135

5236
#[cfg(test)]
5337
mod tests {
54-
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
5538

56-
use crate::statistics::event::{ConnectionContext, Event};
5739
use crate::statistics::keeper::Keeper;
5840
use crate::statistics::metrics::Metrics;
5941

@@ -65,23 +47,4 @@ mod tests {
6547

6648
assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled);
6749
}
68-
69-
#[tokio::test]
70-
async fn should_create_an_event_sender_to_send_statistical_events() {
71-
let mut stats_tracker = Keeper::new();
72-
73-
let event_sender = stats_tracker.run_event_listener();
74-
75-
let result = event_sender
76-
.send_event(Event::TcpAnnounce {
77-
connection: ConnectionContext::new(
78-
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
79-
Some(8080),
80-
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7070),
81-
),
82-
})
83-
.await;
84-
85-
assert!(result.is_some());
86-
}
8750
}

packages/http-tracker-core/src/statistics/setup.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
//! Setup for the tracker statistics.
22
//!
33
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use tokio::sync::broadcast;
5+
6+
use super::event::sender::ChannelSender;
47
use crate::statistics;
58

9+
const CHANNEL_CAPACITY: usize = 1024;
10+
611
/// It builds the structs needed for handling the tracker metrics.
712
///
813
/// It returns:
@@ -19,12 +24,18 @@ pub fn factory(
1924
Option<Box<dyn statistics::event::sender::Sender>>,
2025
statistics::repository::Repository,
2126
) {
22-
let mut stats_event_sender = None;
27+
let mut stats_event_sender: Option<Box<dyn statistics::event::sender::Sender>> = None;
2328

2429
let mut stats_tracker = statistics::keeper::Keeper::new();
2530

2631
if tracker_usage_statistics {
27-
stats_event_sender = Some(stats_tracker.run_event_listener());
32+
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
33+
34+
let receiver = sender.subscribe();
35+
36+
stats_event_sender = Some(Box::new(ChannelSender { sender }));
37+
38+
stats_tracker.run_event_listener(receiver);
2839
}
2940

3041
(stats_event_sender, stats_tracker.repository)

0 commit comments

Comments
 (0)