Skip to content

Commit 3f55b9d

Browse files
committed
refactor: [torrust#1319] add UDP server events
We will splot UDP stats events into: - UDP core events - UDP server event This is step 1 in the refactor: - Step 1. Create UDP server events. - Step 2. Remove UDP server events from core events.
1 parent 47aae95 commit 3f55b9d

File tree

11 files changed

+788
-0
lines changed

11 files changed

+788
-0
lines changed

packages/udp-tracker-server/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ pub mod environment;
638638
pub mod error;
639639
pub mod handlers;
640640
pub mod server;
641+
pub mod statistics;
641642

642643
use std::net::SocketAddr;
643644

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
use crate::statistics::event::{Event, UdpResponseKind};
2+
use crate::statistics::repository::Repository;
3+
4+
pub async fn handle_event(event: Event, stats_repository: &Repository) {
5+
match event {
6+
// UDP
7+
Event::UdpRequestAborted => {
8+
stats_repository.increase_udp_requests_aborted().await;
9+
}
10+
Event::UdpRequestBanned => {
11+
stats_repository.increase_udp_requests_banned().await;
12+
}
13+
14+
// UDP4
15+
Event::Udp4Request { kind } => {
16+
stats_repository.increase_udp4_requests().await;
17+
match kind {
18+
UdpResponseKind::Connect => {
19+
stats_repository.increase_udp4_connections().await;
20+
}
21+
UdpResponseKind::Announce => {
22+
stats_repository.increase_udp4_announces().await;
23+
}
24+
UdpResponseKind::Scrape => {
25+
stats_repository.increase_udp4_scrapes().await;
26+
}
27+
UdpResponseKind::Error => {}
28+
}
29+
}
30+
Event::Udp4Response {
31+
kind,
32+
req_processing_time,
33+
} => {
34+
stats_repository.increase_udp4_responses().await;
35+
36+
match kind {
37+
UdpResponseKind::Connect => {
38+
stats_repository
39+
.recalculate_udp_avg_connect_processing_time_ns(req_processing_time)
40+
.await;
41+
}
42+
UdpResponseKind::Announce => {
43+
stats_repository
44+
.recalculate_udp_avg_announce_processing_time_ns(req_processing_time)
45+
.await;
46+
}
47+
UdpResponseKind::Scrape => {
48+
stats_repository
49+
.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time)
50+
.await;
51+
}
52+
UdpResponseKind::Error => {}
53+
}
54+
}
55+
Event::Udp4Error => {
56+
stats_repository.increase_udp4_errors().await;
57+
}
58+
59+
// UDP6
60+
Event::Udp6Request => {
61+
stats_repository.increase_udp6_requests().await;
62+
}
63+
Event::Udp6Response {
64+
kind: _,
65+
req_processing_time: _,
66+
} => {
67+
stats_repository.increase_udp6_responses().await;
68+
}
69+
Event::Udp6Error => {
70+
stats_repository.increase_udp6_errors().await;
71+
}
72+
}
73+
74+
tracing::debug!("stats: {:?}", stats_repository.get_stats().await);
75+
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use crate::statistics::event::handler::handle_event;
80+
use crate::statistics::event::{Event, UdpResponseKind};
81+
use crate::statistics::repository::Repository;
82+
83+
#[tokio::test]
84+
async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() {
85+
let stats_repository = Repository::new();
86+
87+
handle_event(Event::UdpRequestAborted, &stats_repository).await;
88+
let stats = stats_repository.get_stats().await;
89+
assert_eq!(stats.udp_requests_aborted, 1);
90+
}
91+
#[tokio::test]
92+
async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() {
93+
let stats_repository = Repository::new();
94+
95+
handle_event(Event::UdpRequestBanned, &stats_repository).await;
96+
let stats = stats_repository.get_stats().await;
97+
assert_eq!(stats.udp_requests_banned, 1);
98+
}
99+
100+
#[tokio::test]
101+
async fn should_increase_the_udp4_requests_counter_when_it_receives_a_udp4_request_event() {
102+
let stats_repository = Repository::new();
103+
104+
handle_event(
105+
Event::Udp4Request {
106+
kind: UdpResponseKind::Connect,
107+
},
108+
&stats_repository,
109+
)
110+
.await;
111+
112+
let stats = stats_repository.get_stats().await;
113+
114+
assert_eq!(stats.udp4_requests, 1);
115+
}
116+
117+
#[tokio::test]
118+
async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() {
119+
let stats_repository = Repository::new();
120+
121+
handle_event(
122+
Event::Udp4Response {
123+
kind: crate::statistics::event::UdpResponseKind::Announce,
124+
req_processing_time: std::time::Duration::from_secs(1),
125+
},
126+
&stats_repository,
127+
)
128+
.await;
129+
130+
let stats = stats_repository.get_stats().await;
131+
132+
assert_eq!(stats.udp4_responses, 1);
133+
}
134+
135+
#[tokio::test]
136+
async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() {
137+
let stats_repository = Repository::new();
138+
139+
handle_event(Event::Udp4Error, &stats_repository).await;
140+
141+
let stats = stats_repository.get_stats().await;
142+
143+
assert_eq!(stats.udp4_errors_handled, 1);
144+
}
145+
146+
#[tokio::test]
147+
async fn should_increase_the_udp6_requests_counter_when_it_receives_a_udp6_request_event() {
148+
let stats_repository = Repository::new();
149+
150+
handle_event(Event::Udp6Request, &stats_repository).await;
151+
152+
let stats = stats_repository.get_stats().await;
153+
154+
assert_eq!(stats.udp6_requests, 1);
155+
}
156+
157+
#[tokio::test]
158+
async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() {
159+
let stats_repository = Repository::new();
160+
161+
handle_event(
162+
Event::Udp6Response {
163+
kind: crate::statistics::event::UdpResponseKind::Announce,
164+
req_processing_time: std::time::Duration::from_secs(1),
165+
},
166+
&stats_repository,
167+
)
168+
.await;
169+
170+
let stats = stats_repository.get_stats().await;
171+
172+
assert_eq!(stats.udp6_responses, 1);
173+
}
174+
#[tokio::test]
175+
async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() {
176+
let stats_repository = Repository::new();
177+
178+
handle_event(Event::Udp6Error, &stats_repository).await;
179+
180+
let stats = stats_repository.get_stats().await;
181+
182+
assert_eq!(stats.udp6_errors_handled, 1);
183+
}
184+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::handler::handle_event;
4+
use super::Event;
5+
use crate::statistics::repository::Repository;
6+
7+
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8+
while let Some(event) = receiver.recv().await {
9+
handle_event(event, &stats_repository).await;
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::time::Duration;
2+
3+
pub mod handler;
4+
pub mod listener;
5+
pub mod sender;
6+
7+
/// An statistics event. It is used to collect tracker metrics.
8+
///
9+
/// - `Tcp` prefix means the event was triggered by the HTTP tracker
10+
/// - `Udp` prefix means the event was triggered by the UDP tracker
11+
/// - `4` or `6` prefixes means the IP version used by the peer
12+
/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection`
13+
///
14+
/// > NOTE: HTTP trackers do not use `connection` requests.
15+
#[derive(Debug, PartialEq, Eq)]
16+
pub enum Event {
17+
// code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 }
18+
// Attributes are enums too.
19+
UdpRequestAborted,
20+
UdpRequestBanned,
21+
Udp4Request {
22+
kind: UdpResponseKind,
23+
},
24+
Udp4Response {
25+
kind: UdpResponseKind,
26+
req_processing_time: Duration,
27+
},
28+
Udp4Error,
29+
Udp6Request,
30+
Udp6Response {
31+
kind: UdpResponseKind,
32+
req_processing_time: Duration,
33+
},
34+
Udp6Error,
35+
}
36+
37+
#[derive(Debug, PartialEq, Eq)]
38+
pub enum UdpResponseKind {
39+
Connect,
40+
Announce,
41+
Scrape,
42+
Error,
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use futures::future::BoxFuture;
2+
use futures::FutureExt;
3+
#[cfg(test)]
4+
use mockall::{automock, predicate::str};
5+
use tokio::sync::mpsc;
6+
use tokio::sync::mpsc::error::SendError;
7+
8+
use super::Event;
9+
10+
/// A trait to allow sending statistics events
11+
#[cfg_attr(test, automock)]
12+
pub trait Sender: Sync + Send {
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
14+
}
15+
16+
/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
17+
///
18+
/// It uses a channel sender to send the statistic events. The channel is created by a
19+
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
20+
#[allow(clippy::module_name_repetitions)]
21+
pub struct ChannelSender {
22+
pub(crate) sender: mpsc::Sender<Event>,
23+
}
24+
25+
impl Sender for ChannelSender {
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27+
async move { Some(self.sender.send(event).await) }.boxed()
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::event::listener::dispatch_events;
4+
use super::event::sender::{ChannelSender, Sender};
5+
use super::event::Event;
6+
use super::repository::Repository;
7+
8+
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9+
10+
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
11+
///
12+
/// It actively listen to new statistics events. When it receives a new event
13+
/// it accordingly increases the counters.
14+
pub struct Keeper {
15+
pub repository: Repository,
16+
}
17+
18+
impl Default for Keeper {
19+
fn default() -> Self {
20+
Self::new()
21+
}
22+
}
23+
24+
impl Keeper {
25+
#[must_use]
26+
pub fn new() -> Self {
27+
Self {
28+
repository: Repository::new(),
29+
}
30+
}
31+
32+
#[must_use]
33+
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34+
let mut stats_tracker = Self::new();
35+
36+
let stats_event_sender = stats_tracker.run_event_listener();
37+
38+
(stats_event_sender, stats_tracker.repository)
39+
}
40+
41+
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42+
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43+
44+
let stats_repository = self.repository.clone();
45+
46+
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47+
48+
Box::new(ChannelSender { sender })
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use crate::statistics::event::{Event, UdpResponseKind};
55+
use crate::statistics::keeper::Keeper;
56+
use crate::statistics::metrics::Metrics;
57+
58+
#[tokio::test]
59+
async fn should_contain_the_tracker_statistics() {
60+
let stats_tracker = Keeper::new();
61+
62+
let stats = stats_tracker.repository.get_stats().await;
63+
64+
assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests);
65+
}
66+
67+
#[tokio::test]
68+
async fn should_create_an_event_sender_to_send_statistical_events() {
69+
let mut stats_tracker = Keeper::new();
70+
71+
let event_sender = stats_tracker.run_event_listener();
72+
73+
let result = event_sender
74+
.send_event(Event::Udp4Request {
75+
kind: UdpResponseKind::Connect,
76+
})
77+
.await;
78+
79+
assert!(result.is_some());
80+
}
81+
}

0 commit comments

Comments
 (0)