Skip to content

Commit f99534a

Browse files
committedJan 31, 2025··
refactor: [torrust#1228] split statistics mod into UDO and HTTP statistics
1 parent 9318842 commit f99534a

23 files changed

+1292
-0
lines changed
 

‎src/packages/http_tracker_core/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod statistics;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use crate::packages::http_tracker_core::statistics::event::Event;
2+
use crate::packages::http_tracker_core::statistics::repository::Repository;
3+
4+
pub async fn handle_event(event: Event, stats_repository: &Repository) {
5+
match event {
6+
// TCP4
7+
Event::Tcp4Announce => {
8+
stats_repository.increase_tcp4_announces().await;
9+
stats_repository.increase_tcp4_connections().await;
10+
}
11+
Event::Tcp4Scrape => {
12+
stats_repository.increase_tcp4_scrapes().await;
13+
stats_repository.increase_tcp4_connections().await;
14+
}
15+
16+
// TCP6
17+
Event::Tcp6Announce => {
18+
stats_repository.increase_tcp6_announces().await;
19+
stats_repository.increase_tcp6_connections().await;
20+
}
21+
Event::Tcp6Scrape => {
22+
stats_repository.increase_tcp6_scrapes().await;
23+
stats_repository.increase_tcp6_connections().await;
24+
}
25+
}
26+
27+
tracing::debug!("stats: {:?}", stats_repository.get_stats().await);
28+
}
29+
30+
#[cfg(test)]
31+
mod tests {
32+
use crate::packages::http_tracker_core::statistics::event::handler::handle_event;
33+
use crate::packages::http_tracker_core::statistics::event::Event;
34+
use crate::packages::http_tracker_core::statistics::repository::Repository;
35+
36+
#[tokio::test]
37+
async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() {
38+
let stats_repository = Repository::new();
39+
40+
handle_event(Event::Tcp4Announce, &stats_repository).await;
41+
42+
let stats = stats_repository.get_stats().await;
43+
44+
assert_eq!(stats.tcp4_announces_handled, 1);
45+
}
46+
47+
#[tokio::test]
48+
async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_announce_event() {
49+
let stats_repository = Repository::new();
50+
51+
handle_event(Event::Tcp4Announce, &stats_repository).await;
52+
53+
let stats = stats_repository.get_stats().await;
54+
55+
assert_eq!(stats.tcp4_connections_handled, 1);
56+
}
57+
58+
#[tokio::test]
59+
async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() {
60+
let stats_repository = Repository::new();
61+
62+
handle_event(Event::Tcp4Scrape, &stats_repository).await;
63+
64+
let stats = stats_repository.get_stats().await;
65+
66+
assert_eq!(stats.tcp4_scrapes_handled, 1);
67+
}
68+
69+
#[tokio::test]
70+
async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_scrape_event() {
71+
let stats_repository = Repository::new();
72+
73+
handle_event(Event::Tcp4Scrape, &stats_repository).await;
74+
75+
let stats = stats_repository.get_stats().await;
76+
77+
assert_eq!(stats.tcp4_connections_handled, 1);
78+
}
79+
80+
#[tokio::test]
81+
async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() {
82+
let stats_repository = Repository::new();
83+
84+
handle_event(Event::Tcp6Announce, &stats_repository).await;
85+
86+
let stats = stats_repository.get_stats().await;
87+
88+
assert_eq!(stats.tcp6_announces_handled, 1);
89+
}
90+
91+
#[tokio::test]
92+
async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_announce_event() {
93+
let stats_repository = Repository::new();
94+
95+
handle_event(Event::Tcp6Announce, &stats_repository).await;
96+
97+
let stats = stats_repository.get_stats().await;
98+
99+
assert_eq!(stats.tcp6_connections_handled, 1);
100+
}
101+
102+
#[tokio::test]
103+
async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() {
104+
let stats_repository = Repository::new();
105+
106+
handle_event(Event::Tcp6Scrape, &stats_repository).await;
107+
108+
let stats = stats_repository.get_stats().await;
109+
110+
assert_eq!(stats.tcp6_scrapes_handled, 1);
111+
}
112+
113+
#[tokio::test]
114+
async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_scrape_event() {
115+
let stats_repository = Repository::new();
116+
117+
handle_event(Event::Tcp6Scrape, &stats_repository).await;
118+
119+
let stats = stats_repository.get_stats().await;
120+
121+
assert_eq!(stats.tcp6_connections_handled, 1);
122+
}
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::handler::handle_event;
4+
use super::Event;
5+
use crate::packages::http_tracker_core::statistics::repository::Repository;
6+
7+
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8+
while let Some(event) = receiver.recv().await {
9+
handle_event(event, &stats_repository).await;
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
pub mod handler;
2+
pub mod listener;
3+
pub mod sender;
4+
5+
/// An statistics event. It is used to collect tracker metrics.
6+
///
7+
/// - `Tcp` prefix means the event was triggered by the HTTP tracker
8+
/// - `Udp` prefix means the event was triggered by the UDP tracker
9+
/// - `4` or `6` prefixes means the IP version used by the peer
10+
/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection`
11+
///
12+
/// > NOTE: HTTP trackers do not use `connection` requests.
13+
#[derive(Debug, PartialEq, Eq)]
14+
pub enum Event {
15+
// code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 }
16+
// Attributes are enums too.
17+
Tcp4Announce,
18+
Tcp4Scrape,
19+
Tcp6Announce,
20+
Tcp6Scrape,
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use futures::future::BoxFuture;
2+
use futures::FutureExt;
3+
#[cfg(test)]
4+
use mockall::{automock, predicate::str};
5+
use tokio::sync::mpsc;
6+
use tokio::sync::mpsc::error::SendError;
7+
8+
use super::Event;
9+
10+
/// A trait to allow sending statistics events
11+
#[cfg_attr(test, automock)]
12+
pub trait Sender: Sync + Send {
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
14+
}
15+
16+
/// An [`statistics::EventSender`](crate::packages::http_tracker_core::statistics::event::sender::Sender) implementation.
17+
///
18+
/// It uses a channel sender to send the statistic events. The channel is created by a
19+
/// [`statistics::Keeper`](crate::packages::http_tracker_core::statistics::keeper::Keeper)
20+
#[allow(clippy::module_name_repetitions)]
21+
pub struct ChannelSender {
22+
pub(crate) sender: mpsc::Sender<Event>,
23+
}
24+
25+
impl Sender for ChannelSender {
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27+
async move { Some(self.sender.send(event).await) }.boxed()
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::event::listener::dispatch_events;
4+
use super::event::sender::{ChannelSender, Sender};
5+
use super::event::Event;
6+
use super::repository::Repository;
7+
8+
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9+
10+
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
11+
///
12+
/// It actively listen to new statistics events. When it receives a new event
13+
/// it accordingly increases the counters.
14+
pub struct Keeper {
15+
pub repository: Repository,
16+
}
17+
18+
impl Default for Keeper {
19+
fn default() -> Self {
20+
Self::new()
21+
}
22+
}
23+
24+
impl Keeper {
25+
#[must_use]
26+
pub fn new() -> Self {
27+
Self {
28+
repository: Repository::new(),
29+
}
30+
}
31+
32+
#[must_use]
33+
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34+
let mut stats_tracker = Self::new();
35+
36+
let stats_event_sender = stats_tracker.run_event_listener();
37+
38+
(stats_event_sender, stats_tracker.repository)
39+
}
40+
41+
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42+
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43+
44+
let stats_repository = self.repository.clone();
45+
46+
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47+
48+
Box::new(ChannelSender { sender })
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use crate::packages::http_tracker_core::statistics::event::Event;
55+
use crate::packages::http_tracker_core::statistics::keeper::Keeper;
56+
use crate::packages::http_tracker_core::statistics::metrics::Metrics;
57+
58+
#[tokio::test]
59+
async fn should_contain_the_tracker_statistics() {
60+
let stats_tracker = Keeper::new();
61+
62+
let stats = stats_tracker.repository.get_stats().await;
63+
64+
assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled);
65+
}
66+
67+
#[tokio::test]
68+
async fn should_create_an_event_sender_to_send_statistical_events() {
69+
let mut stats_tracker = Keeper::new();
70+
71+
let event_sender = stats_tracker.run_event_listener();
72+
73+
let result = event_sender.send_event(Event::Tcp4Announce).await;
74+
75+
assert!(result.is_some());
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/// Metrics collected by the tracker.
2+
///
3+
/// - Number of connections handled
4+
/// - Number of `announce` requests handled
5+
/// - Number of `scrape` request handled
6+
///
7+
/// These metrics are collected for each connection type: UDP and HTTP
8+
/// and also for each IP version used by the peers: IPv4 and IPv6.
9+
#[derive(Debug, PartialEq, Default)]
10+
pub struct Metrics {
11+
/// Total number of TCP (HTTP tracker) connections from IPv4 peers.
12+
/// Since the HTTP tracker spec does not require a handshake, this metric
13+
/// increases for every HTTP request.
14+
pub tcp4_connections_handled: u64,
15+
16+
/// Total number of TCP (HTTP tracker) `announce` requests from IPv4 peers.
17+
pub tcp4_announces_handled: u64,
18+
19+
/// Total number of TCP (HTTP tracker) `scrape` requests from IPv4 peers.
20+
pub tcp4_scrapes_handled: u64,
21+
22+
/// Total number of TCP (HTTP tracker) connections from IPv6 peers.
23+
pub tcp6_connections_handled: u64,
24+
25+
/// Total number of TCP (HTTP tracker) `announce` requests from IPv6 peers.
26+
pub tcp6_announces_handled: u64,
27+
28+
/// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers.
29+
pub tcp6_scrapes_handled: u64,
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub mod event;
2+
pub mod keeper;
3+
pub mod metrics;
4+
pub mod repository;
5+
pub mod services;
6+
pub mod setup;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::sync::Arc;
2+
3+
use tokio::sync::{RwLock, RwLockReadGuard};
4+
5+
use super::metrics::Metrics;
6+
7+
/// A repository for the tracker metrics.
8+
#[derive(Clone)]
9+
pub struct Repository {
10+
pub stats: Arc<RwLock<Metrics>>,
11+
}
12+
13+
impl Default for Repository {
14+
fn default() -> Self {
15+
Self::new()
16+
}
17+
}
18+
19+
impl Repository {
20+
#[must_use]
21+
pub fn new() -> Self {
22+
Self {
23+
stats: Arc::new(RwLock::new(Metrics::default())),
24+
}
25+
}
26+
27+
pub async fn get_stats(&self) -> RwLockReadGuard<'_, Metrics> {
28+
self.stats.read().await
29+
}
30+
31+
pub async fn increase_tcp4_announces(&self) {
32+
let mut stats_lock = self.stats.write().await;
33+
stats_lock.tcp4_announces_handled += 1;
34+
drop(stats_lock);
35+
}
36+
37+
pub async fn increase_tcp4_connections(&self) {
38+
let mut stats_lock = self.stats.write().await;
39+
stats_lock.tcp4_connections_handled += 1;
40+
drop(stats_lock);
41+
}
42+
43+
pub async fn increase_tcp4_scrapes(&self) {
44+
let mut stats_lock = self.stats.write().await;
45+
stats_lock.tcp4_scrapes_handled += 1;
46+
drop(stats_lock);
47+
}
48+
49+
pub async fn increase_tcp6_announces(&self) {
50+
let mut stats_lock = self.stats.write().await;
51+
stats_lock.tcp6_announces_handled += 1;
52+
drop(stats_lock);
53+
}
54+
55+
pub async fn increase_tcp6_connections(&self) {
56+
let mut stats_lock = self.stats.write().await;
57+
stats_lock.tcp6_connections_handled += 1;
58+
drop(stats_lock);
59+
}
60+
61+
pub async fn increase_tcp6_scrapes(&self) {
62+
let mut stats_lock = self.stats.write().await;
63+
stats_lock.tcp6_scrapes_handled += 1;
64+
drop(stats_lock);
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//! Statistics services.
2+
//!
3+
//! It includes:
4+
//!
5+
//! - A [`factory`](crate::packages::http_tracker_core::statistics::setup::factory) function to build the structs needed to collect the tracker metrics.
6+
//! - A [`get_metrics`] service to get the tracker [`metrics`](crate::packages::http_tracker_core::statistics::metrics::Metrics).
7+
//!
8+
//! Tracker metrics are collected using a Publisher-Subscribe pattern.
9+
//!
10+
//! The factory function builds two structs:
11+
//!
12+
//! - An statistics event [`Sender`](crate::packages::http_tracker_core::statistics::event::sender::Sender)
13+
//! - An statistics [`Repository`]
14+
//!
15+
//! ```text
16+
//! let (stats_event_sender, stats_repository) = factory(tracker_usage_statistics);
17+
//! ```
18+
//!
19+
//! The statistics repository is responsible for storing the metrics in memory.
20+
//! The statistics event sender allows sending events related to metrics.
21+
//! There is an event listener that is receiving all the events and processing them with an event handler.
22+
//! Then, the event handler updates the metrics depending on the received event.
23+
use std::sync::Arc;
24+
25+
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
26+
use packages::http_tracker_core::statistics::metrics::Metrics;
27+
use packages::http_tracker_core::statistics::repository::Repository;
28+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
29+
30+
use crate::packages;
31+
32+
/// All the metrics collected by the tracker.
33+
#[derive(Debug, PartialEq)]
34+
pub struct TrackerMetrics {
35+
/// Domain level metrics.
36+
///
37+
/// General metrics for all torrents (number of seeders, leechers, etcetera)
38+
pub torrents_metrics: TorrentsMetrics,
39+
40+
/// Application level metrics. Usage statistics/metrics.
41+
///
42+
/// Metrics about how the tracker is been used (number of number of http scrape requests, etcetera)
43+
pub protocol_metrics: Metrics,
44+
}
45+
46+
/// It returns all the [`TrackerMetrics`]
47+
pub async fn get_metrics(
48+
in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,
49+
stats_repository: Arc<Repository>,
50+
) -> TrackerMetrics {
51+
let torrents_metrics = in_memory_torrent_repository.get_torrents_metrics();
52+
let stats = stats_repository.get_stats().await;
53+
54+
TrackerMetrics {
55+
torrents_metrics,
56+
protocol_metrics: Metrics {
57+
// TCPv4
58+
tcp4_connections_handled: stats.tcp4_connections_handled,
59+
tcp4_announces_handled: stats.tcp4_announces_handled,
60+
tcp4_scrapes_handled: stats.tcp4_scrapes_handled,
61+
// TCPv6
62+
tcp6_connections_handled: stats.tcp6_connections_handled,
63+
tcp6_announces_handled: stats.tcp6_announces_handled,
64+
tcp6_scrapes_handled: stats.tcp6_scrapes_handled,
65+
},
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use std::sync::Arc;
72+
73+
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
74+
use bittorrent_tracker_core::{self};
75+
use torrust_tracker_configuration::Configuration;
76+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
77+
use torrust_tracker_test_helpers::configuration;
78+
79+
use crate::packages::http_tracker_core::statistics;
80+
use crate::packages::http_tracker_core::statistics::services::{get_metrics, TrackerMetrics};
81+
82+
pub fn tracker_configuration() -> Configuration {
83+
configuration::ephemeral()
84+
}
85+
86+
#[tokio::test]
87+
async fn the_statistics_service_should_return_the_tracker_metrics() {
88+
let config = tracker_configuration();
89+
90+
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
91+
let (_stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
92+
let stats_repository = Arc::new(stats_repository);
93+
94+
let tracker_metrics = get_metrics(in_memory_torrent_repository.clone(), stats_repository.clone()).await;
95+
96+
assert_eq!(
97+
tracker_metrics,
98+
TrackerMetrics {
99+
torrents_metrics: TorrentsMetrics::default(),
100+
protocol_metrics: statistics::metrics::Metrics::default(),
101+
}
102+
);
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! Setup for the tracker statistics.
2+
//!
3+
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use crate::packages::http_tracker_core::statistics;
5+
6+
/// It builds the structs needed for handling the tracker metrics.
7+
///
8+
/// It returns:
9+
///
10+
/// - An statistics event [`Sender`](crate::packages::http_tracker_core::statistics::event::sender::Sender) that allows you to send events related to statistics.
11+
/// - An statistics [`Repository`](crate::packages::http_tracker_core::statistics::repository::Repository) which is an in-memory repository for the tracker metrics.
12+
///
13+
/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics
14+
/// events are sent are received but not dispatched to the handler.
15+
#[must_use]
16+
pub fn factory(
17+
tracker_usage_statistics: bool,
18+
) -> (
19+
Option<Box<dyn statistics::event::sender::Sender>>,
20+
statistics::repository::Repository,
21+
) {
22+
let mut stats_event_sender = None;
23+
24+
let mut stats_tracker = statistics::keeper::Keeper::new();
25+
26+
if tracker_usage_statistics {
27+
stats_event_sender = Some(stats_tracker.run_event_listener());
28+
}
29+
30+
(stats_event_sender, stats_tracker.repository)
31+
}
32+
33+
#[cfg(test)]
34+
mod test {
35+
use super::factory;
36+
37+
#[tokio::test]
38+
async fn should_not_send_any_event_when_statistics_are_disabled() {
39+
let tracker_usage_statistics = false;
40+
41+
let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics);
42+
43+
assert!(stats_event_sender.is_none());
44+
}
45+
46+
#[tokio::test]
47+
async fn should_send_events_when_statistics_are_enabled() {
48+
let tracker_usage_statistics = true;
49+
50+
let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics);
51+
52+
assert!(stats_event_sender.is_some());
53+
}
54+
}

‎src/packages/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1+
//! This module contains logic pending to be extracted into workspace packages.
2+
//!
3+
//! It will be moved to the directory `packages`.
4+
pub mod http_tracker_core;
15
pub mod statistics;
6+
pub mod udp_tracker_core;

‎src/packages/udp_tracker_core/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod statistics;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
use crate::packages::udp_tracker_core::statistics::event::{Event, UdpResponseKind};
2+
use crate::packages::udp_tracker_core::statistics::repository::Repository;
3+
4+
pub async fn handle_event(event: Event, stats_repository: &Repository) {
5+
match event {
6+
// UDP
7+
Event::UdpRequestAborted => {
8+
stats_repository.increase_udp_requests_aborted().await;
9+
}
10+
Event::UdpRequestBanned => {
11+
stats_repository.increase_udp_requests_banned().await;
12+
}
13+
14+
// UDP4
15+
Event::Udp4Request => {
16+
stats_repository.increase_udp4_requests().await;
17+
}
18+
Event::Udp4Connect => {
19+
stats_repository.increase_udp4_connections().await;
20+
}
21+
Event::Udp4Announce => {
22+
stats_repository.increase_udp4_announces().await;
23+
}
24+
Event::Udp4Scrape => {
25+
stats_repository.increase_udp4_scrapes().await;
26+
}
27+
Event::Udp4Response {
28+
kind,
29+
req_processing_time,
30+
} => {
31+
stats_repository.increase_udp4_responses().await;
32+
33+
match kind {
34+
UdpResponseKind::Connect => {
35+
stats_repository
36+
.recalculate_udp_avg_connect_processing_time_ns(req_processing_time)
37+
.await;
38+
}
39+
UdpResponseKind::Announce => {
40+
stats_repository
41+
.recalculate_udp_avg_announce_processing_time_ns(req_processing_time)
42+
.await;
43+
}
44+
UdpResponseKind::Scrape => {
45+
stats_repository
46+
.recalculate_udp_avg_scrape_processing_time_ns(req_processing_time)
47+
.await;
48+
}
49+
UdpResponseKind::Error => {}
50+
}
51+
}
52+
Event::Udp4Error => {
53+
stats_repository.increase_udp4_errors().await;
54+
}
55+
56+
// UDP6
57+
Event::Udp6Request => {
58+
stats_repository.increase_udp6_requests().await;
59+
}
60+
Event::Udp6Connect => {
61+
stats_repository.increase_udp6_connections().await;
62+
}
63+
Event::Udp6Announce => {
64+
stats_repository.increase_udp6_announces().await;
65+
}
66+
Event::Udp6Scrape => {
67+
stats_repository.increase_udp6_scrapes().await;
68+
}
69+
Event::Udp6Response {
70+
kind: _,
71+
req_processing_time: _,
72+
} => {
73+
stats_repository.increase_udp6_responses().await;
74+
}
75+
Event::Udp6Error => {
76+
stats_repository.increase_udp6_errors().await;
77+
}
78+
}
79+
80+
tracing::debug!("stats: {:?}", stats_repository.get_stats().await);
81+
}
82+
83+
#[cfg(test)]
84+
mod tests {
85+
use crate::packages::udp_tracker_core::statistics::event::handler::handle_event;
86+
use crate::packages::udp_tracker_core::statistics::event::Event;
87+
use crate::packages::udp_tracker_core::statistics::repository::Repository;
88+
89+
#[tokio::test]
90+
async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() {
91+
let stats_repository = Repository::new();
92+
93+
handle_event(Event::Udp4Connect, &stats_repository).await;
94+
95+
let stats = stats_repository.get_stats().await;
96+
97+
assert_eq!(stats.udp4_connections_handled, 1);
98+
}
99+
100+
#[tokio::test]
101+
async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() {
102+
let stats_repository = Repository::new();
103+
104+
handle_event(Event::Udp4Announce, &stats_repository).await;
105+
106+
let stats = stats_repository.get_stats().await;
107+
108+
assert_eq!(stats.udp4_announces_handled, 1);
109+
}
110+
111+
#[tokio::test]
112+
async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() {
113+
let stats_repository = Repository::new();
114+
115+
handle_event(Event::Udp4Scrape, &stats_repository).await;
116+
117+
let stats = stats_repository.get_stats().await;
118+
119+
assert_eq!(stats.udp4_scrapes_handled, 1);
120+
}
121+
122+
#[tokio::test]
123+
async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() {
124+
let stats_repository = Repository::new();
125+
126+
handle_event(Event::Udp6Connect, &stats_repository).await;
127+
128+
let stats = stats_repository.get_stats().await;
129+
130+
assert_eq!(stats.udp6_connections_handled, 1);
131+
}
132+
133+
#[tokio::test]
134+
async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() {
135+
let stats_repository = Repository::new();
136+
137+
handle_event(Event::Udp6Announce, &stats_repository).await;
138+
139+
let stats = stats_repository.get_stats().await;
140+
141+
assert_eq!(stats.udp6_announces_handled, 1);
142+
}
143+
144+
#[tokio::test]
145+
async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() {
146+
let stats_repository = Repository::new();
147+
148+
handle_event(Event::Udp6Scrape, &stats_repository).await;
149+
150+
let stats = stats_repository.get_stats().await;
151+
152+
assert_eq!(stats.udp6_scrapes_handled, 1);
153+
}
154+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::handler::handle_event;
4+
use super::Event;
5+
use crate::packages::udp_tracker_core::statistics::repository::Repository;
6+
7+
pub async fn dispatch_events(mut receiver: mpsc::Receiver<Event>, stats_repository: Repository) {
8+
while let Some(event) = receiver.recv().await {
9+
handle_event(event, &stats_repository).await;
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::time::Duration;
2+
3+
pub mod handler;
4+
pub mod listener;
5+
pub mod sender;
6+
7+
/// An statistics event. It is used to collect tracker metrics.
8+
///
9+
/// - `Tcp` prefix means the event was triggered by the HTTP tracker
10+
/// - `Udp` prefix means the event was triggered by the UDP tracker
11+
/// - `4` or `6` prefixes means the IP version used by the peer
12+
/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection`
13+
///
14+
/// > NOTE: HTTP trackers do not use `connection` requests.
15+
#[derive(Debug, PartialEq, Eq)]
16+
pub enum Event {
17+
// code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 }
18+
// Attributes are enums too.
19+
UdpRequestAborted,
20+
UdpRequestBanned,
21+
Udp4Request,
22+
Udp4Connect,
23+
Udp4Announce,
24+
Udp4Scrape,
25+
Udp4Response {
26+
kind: UdpResponseKind,
27+
req_processing_time: Duration,
28+
},
29+
Udp4Error,
30+
Udp6Request,
31+
Udp6Connect,
32+
Udp6Announce,
33+
Udp6Scrape,
34+
Udp6Response {
35+
kind: UdpResponseKind,
36+
req_processing_time: Duration,
37+
},
38+
Udp6Error,
39+
}
40+
41+
#[derive(Debug, PartialEq, Eq)]
42+
pub enum UdpResponseKind {
43+
Connect,
44+
Announce,
45+
Scrape,
46+
Error,
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use futures::future::BoxFuture;
2+
use futures::FutureExt;
3+
#[cfg(test)]
4+
use mockall::{automock, predicate::str};
5+
use tokio::sync::mpsc;
6+
use tokio::sync::mpsc::error::SendError;
7+
8+
use super::Event;
9+
10+
/// A trait to allow sending statistics events
11+
#[cfg_attr(test, automock)]
12+
pub trait Sender: Sync + Send {
13+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
14+
}
15+
16+
/// An [`statistics::EventSender`](crate::packages::udp_tracker_core::statistics::event::sender::Sender) implementation.
17+
///
18+
/// It uses a channel sender to send the statistic events. The channel is created by a
19+
/// [`statistics::Keeper`](crate::packages::udp_tracker_core::statistics::keeper::Keeper)
20+
#[allow(clippy::module_name_repetitions)]
21+
pub struct ChannelSender {
22+
pub(crate) sender: mpsc::Sender<Event>,
23+
}
24+
25+
impl Sender for ChannelSender {
26+
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
27+
async move { Some(self.sender.send(event).await) }.boxed()
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use tokio::sync::mpsc;
2+
3+
use super::event::listener::dispatch_events;
4+
use super::event::sender::{ChannelSender, Sender};
5+
use super::event::Event;
6+
use super::repository::Repository;
7+
8+
const CHANNEL_BUFFER_SIZE: usize = 65_535;
9+
10+
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
11+
///
12+
/// It actively listen to new statistics events. When it receives a new event
13+
/// it accordingly increases the counters.
14+
pub struct Keeper {
15+
pub repository: Repository,
16+
}
17+
18+
impl Default for Keeper {
19+
fn default() -> Self {
20+
Self::new()
21+
}
22+
}
23+
24+
impl Keeper {
25+
#[must_use]
26+
pub fn new() -> Self {
27+
Self {
28+
repository: Repository::new(),
29+
}
30+
}
31+
32+
#[must_use]
33+
pub fn new_active_instance() -> (Box<dyn Sender>, Repository) {
34+
let mut stats_tracker = Self::new();
35+
36+
let stats_event_sender = stats_tracker.run_event_listener();
37+
38+
(stats_event_sender, stats_tracker.repository)
39+
}
40+
41+
pub fn run_event_listener(&mut self) -> Box<dyn Sender> {
42+
let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
43+
44+
let stats_repository = self.repository.clone();
45+
46+
tokio::spawn(async move { dispatch_events(receiver, stats_repository).await });
47+
48+
Box::new(ChannelSender { sender })
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use crate::packages::udp_tracker_core::statistics::event::Event;
55+
use crate::packages::udp_tracker_core::statistics::keeper::Keeper;
56+
use crate::packages::udp_tracker_core::statistics::metrics::Metrics;
57+
58+
#[tokio::test]
59+
async fn should_contain_the_tracker_statistics() {
60+
let stats_tracker = Keeper::new();
61+
62+
let stats = stats_tracker.repository.get_stats().await;
63+
64+
assert_eq!(stats.udp4_announces_handled, Metrics::default().udp4_announces_handled);
65+
}
66+
67+
#[tokio::test]
68+
async fn should_create_an_event_sender_to_send_statistical_events() {
69+
let mut stats_tracker = Keeper::new();
70+
71+
let event_sender = stats_tracker.run_event_listener();
72+
73+
let result = event_sender.send_event(Event::Udp4Connect).await;
74+
75+
assert!(result.is_some());
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/// Metrics collected by the tracker.
2+
///
3+
/// - Number of connections handled
4+
/// - Number of `announce` requests handled
5+
/// - Number of `scrape` request handled
6+
///
7+
/// These metrics are collected for each connection type: UDP and HTTP
8+
/// and also for each IP version used by the peers: IPv4 and IPv6.
9+
#[derive(Debug, PartialEq, Default)]
10+
pub struct Metrics {
11+
// UDP
12+
/// Total number of UDP (UDP tracker) requests aborted.
13+
pub udp_requests_aborted: u64,
14+
15+
/// Total number of UDP (UDP tracker) requests banned.
16+
pub udp_requests_banned: u64,
17+
18+
/// Total number of banned IPs.
19+
pub udp_banned_ips_total: u64,
20+
21+
/// Average rounded time spent processing UDP connect requests.
22+
pub udp_avg_connect_processing_time_ns: u64,
23+
24+
/// Average rounded time spent processing UDP announce requests.
25+
pub udp_avg_announce_processing_time_ns: u64,
26+
27+
/// Average rounded time spent processing UDP scrape requests.
28+
pub udp_avg_scrape_processing_time_ns: u64,
29+
30+
// UDPv4
31+
/// Total number of UDP (UDP tracker) requests from IPv4 peers.
32+
pub udp4_requests: u64,
33+
34+
/// Total number of UDP (UDP tracker) connections from IPv4 peers.
35+
pub udp4_connections_handled: u64,
36+
37+
/// Total number of UDP (UDP tracker) `announce` requests from IPv4 peers.
38+
pub udp4_announces_handled: u64,
39+
40+
/// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers.
41+
pub udp4_scrapes_handled: u64,
42+
43+
/// Total number of UDP (UDP tracker) responses from IPv4 peers.
44+
pub udp4_responses: u64,
45+
46+
/// Total number of UDP (UDP tracker) `error` requests from IPv4 peers.
47+
pub udp4_errors_handled: u64,
48+
49+
// UDPv6
50+
/// Total number of UDP (UDP tracker) requests from IPv6 peers.
51+
pub udp6_requests: u64,
52+
53+
/// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers.
54+
pub udp6_connections_handled: u64,
55+
56+
/// Total number of UDP (UDP tracker) `announce` requests from IPv6 peers.
57+
pub udp6_announces_handled: u64,
58+
59+
/// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers.
60+
pub udp6_scrapes_handled: u64,
61+
62+
/// Total number of UDP (UDP tracker) responses from IPv6 peers.
63+
pub udp6_responses: u64,
64+
65+
/// Total number of UDP (UDP tracker) `error` requests from IPv6 peers.
66+
pub udp6_errors_handled: u64,
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub mod event;
2+
pub mod keeper;
3+
pub mod metrics;
4+
pub mod repository;
5+
pub mod services;
6+
pub mod setup;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use tokio::sync::{RwLock, RwLockReadGuard};
5+
6+
use super::metrics::Metrics;
7+
8+
/// A repository for the tracker metrics.
9+
#[derive(Clone)]
10+
pub struct Repository {
11+
pub stats: Arc<RwLock<Metrics>>,
12+
}
13+
14+
impl Default for Repository {
15+
fn default() -> Self {
16+
Self::new()
17+
}
18+
}
19+
20+
impl Repository {
21+
#[must_use]
22+
pub fn new() -> Self {
23+
Self {
24+
stats: Arc::new(RwLock::new(Metrics::default())),
25+
}
26+
}
27+
28+
pub async fn get_stats(&self) -> RwLockReadGuard<'_, Metrics> {
29+
self.stats.read().await
30+
}
31+
32+
pub async fn increase_udp_requests_aborted(&self) {
33+
let mut stats_lock = self.stats.write().await;
34+
stats_lock.udp_requests_aborted += 1;
35+
drop(stats_lock);
36+
}
37+
38+
pub async fn increase_udp_requests_banned(&self) {
39+
let mut stats_lock = self.stats.write().await;
40+
stats_lock.udp_requests_banned += 1;
41+
drop(stats_lock);
42+
}
43+
44+
pub async fn increase_udp4_requests(&self) {
45+
let mut stats_lock = self.stats.write().await;
46+
stats_lock.udp4_requests += 1;
47+
drop(stats_lock);
48+
}
49+
50+
pub async fn increase_udp4_connections(&self) {
51+
let mut stats_lock = self.stats.write().await;
52+
stats_lock.udp4_connections_handled += 1;
53+
drop(stats_lock);
54+
}
55+
56+
pub async fn increase_udp4_announces(&self) {
57+
let mut stats_lock = self.stats.write().await;
58+
stats_lock.udp4_announces_handled += 1;
59+
drop(stats_lock);
60+
}
61+
62+
pub async fn increase_udp4_scrapes(&self) {
63+
let mut stats_lock = self.stats.write().await;
64+
stats_lock.udp4_scrapes_handled += 1;
65+
drop(stats_lock);
66+
}
67+
68+
pub async fn increase_udp4_responses(&self) {
69+
let mut stats_lock = self.stats.write().await;
70+
stats_lock.udp4_responses += 1;
71+
drop(stats_lock);
72+
}
73+
74+
pub async fn increase_udp4_errors(&self) {
75+
let mut stats_lock = self.stats.write().await;
76+
stats_lock.udp4_errors_handled += 1;
77+
drop(stats_lock);
78+
}
79+
80+
#[allow(clippy::cast_precision_loss)]
81+
#[allow(clippy::cast_possible_truncation)]
82+
#[allow(clippy::cast_sign_loss)]
83+
pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) {
84+
let mut stats_lock = self.stats.write().await;
85+
86+
let req_processing_time = req_processing_time.as_nanos() as f64;
87+
let udp_connections_handled = (stats_lock.udp4_connections_handled + stats_lock.udp6_connections_handled) as f64;
88+
89+
let previous_avg = stats_lock.udp_avg_connect_processing_time_ns;
90+
91+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
92+
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled;
93+
94+
stats_lock.udp_avg_connect_processing_time_ns = new_avg.ceil() as u64;
95+
96+
drop(stats_lock);
97+
}
98+
99+
#[allow(clippy::cast_precision_loss)]
100+
#[allow(clippy::cast_possible_truncation)]
101+
#[allow(clippy::cast_sign_loss)]
102+
pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) {
103+
let mut stats_lock = self.stats.write().await;
104+
105+
let req_processing_time = req_processing_time.as_nanos() as f64;
106+
107+
let udp_announces_handled = (stats_lock.udp4_announces_handled + stats_lock.udp6_announces_handled) as f64;
108+
109+
let previous_avg = stats_lock.udp_avg_announce_processing_time_ns;
110+
111+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
112+
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled;
113+
114+
stats_lock.udp_avg_announce_processing_time_ns = new_avg.ceil() as u64;
115+
116+
drop(stats_lock);
117+
}
118+
119+
#[allow(clippy::cast_precision_loss)]
120+
#[allow(clippy::cast_possible_truncation)]
121+
#[allow(clippy::cast_sign_loss)]
122+
pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) {
123+
let mut stats_lock = self.stats.write().await;
124+
125+
let req_processing_time = req_processing_time.as_nanos() as f64;
126+
let udp_scrapes_handled = (stats_lock.udp4_scrapes_handled + stats_lock.udp6_scrapes_handled) as f64;
127+
128+
let previous_avg = stats_lock.udp_avg_scrape_processing_time_ns;
129+
130+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
131+
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled;
132+
133+
stats_lock.udp_avg_scrape_processing_time_ns = new_avg.ceil() as u64;
134+
135+
drop(stats_lock);
136+
}
137+
138+
pub async fn increase_udp6_requests(&self) {
139+
let mut stats_lock = self.stats.write().await;
140+
stats_lock.udp6_requests += 1;
141+
drop(stats_lock);
142+
}
143+
144+
pub async fn increase_udp6_connections(&self) {
145+
let mut stats_lock = self.stats.write().await;
146+
stats_lock.udp6_connections_handled += 1;
147+
drop(stats_lock);
148+
}
149+
150+
pub async fn increase_udp6_announces(&self) {
151+
let mut stats_lock = self.stats.write().await;
152+
stats_lock.udp6_announces_handled += 1;
153+
drop(stats_lock);
154+
}
155+
156+
pub async fn increase_udp6_scrapes(&self) {
157+
let mut stats_lock = self.stats.write().await;
158+
stats_lock.udp6_scrapes_handled += 1;
159+
drop(stats_lock);
160+
}
161+
162+
pub async fn increase_udp6_responses(&self) {
163+
let mut stats_lock = self.stats.write().await;
164+
stats_lock.udp6_responses += 1;
165+
drop(stats_lock);
166+
}
167+
168+
pub async fn increase_udp6_errors(&self) {
169+
let mut stats_lock = self.stats.write().await;
170+
stats_lock.udp6_errors_handled += 1;
171+
drop(stats_lock);
172+
}
173+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//! Statistics services.
2+
//!
3+
//! It includes:
4+
//!
5+
//! - A [`factory`](crate::packages::udp_tracker_core::statistics::setup::factory) function to build the structs needed to collect the tracker metrics.
6+
//! - A [`get_metrics`] service to get the tracker [`metrics`](crate::packages::udp_tracker_core::statistics::metrics::Metrics).
7+
//!
8+
//! Tracker metrics are collected using a Publisher-Subscribe pattern.
9+
//!
10+
//! The factory function builds two structs:
11+
//!
12+
//! - An statistics event [`Sender`](crate::packages::udp_tracker_core::statistics::event::sender::Sender)
13+
//! - An statistics [`Repository`]
14+
//!
15+
//! ```text
16+
//! let (stats_event_sender, stats_repository) = factory(tracker_usage_statistics);
17+
//! ```
18+
//!
19+
//! The statistics repository is responsible for storing the metrics in memory.
20+
//! The statistics event sender allows sending events related to metrics.
21+
//! There is an event listener that is receiving all the events and processing them with an event handler.
22+
//! Then, the event handler updates the metrics depending on the received event.
23+
//!
24+
//! For example, if you send the event [`Event::Udp4Connect`](crate::packages::udp_tracker_core::statistics::event::Event::Udp4Connect):
25+
//!
26+
//! ```text
27+
//! let result = event_sender.send_event(Event::Udp4Connect).await;
28+
//! ```
29+
//!
30+
//! Eventually the counter for UDP connections from IPv4 peers will be increased.
31+
//!
32+
//! ```rust,no_run
33+
//! pub struct Metrics {
34+
//! // ...
35+
//! pub udp4_connections_handled: u64, // This will be incremented
36+
//! // ...
37+
//! }
38+
//! ```
39+
use std::sync::Arc;
40+
41+
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
42+
use packages::udp_tracker_core::statistics::metrics::Metrics;
43+
use packages::udp_tracker_core::statistics::repository::Repository;
44+
use tokio::sync::RwLock;
45+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
46+
47+
use crate::packages;
48+
use crate::servers::udp::server::banning::BanService;
49+
50+
/// All the metrics collected by the tracker.
51+
#[derive(Debug, PartialEq)]
52+
pub struct TrackerMetrics {
53+
/// Domain level metrics.
54+
///
55+
/// General metrics for all torrents (number of seeders, leechers, etcetera)
56+
pub torrents_metrics: TorrentsMetrics,
57+
58+
/// Application level metrics. Usage statistics/metrics.
59+
///
60+
/// Metrics about how the tracker is been used (number of udp announce requests, etcetera)
61+
pub protocol_metrics: Metrics,
62+
}
63+
64+
/// It returns all the [`TrackerMetrics`]
65+
pub async fn get_metrics(
66+
in_memory_torrent_repository: Arc<InMemoryTorrentRepository>,
67+
ban_service: Arc<RwLock<BanService>>,
68+
stats_repository: Arc<Repository>,
69+
) -> TrackerMetrics {
70+
let torrents_metrics = in_memory_torrent_repository.get_torrents_metrics();
71+
let stats = stats_repository.get_stats().await;
72+
let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total();
73+
74+
TrackerMetrics {
75+
torrents_metrics,
76+
protocol_metrics: Metrics {
77+
// UDP
78+
udp_requests_aborted: stats.udp_requests_aborted,
79+
udp_requests_banned: stats.udp_requests_banned,
80+
udp_banned_ips_total: udp_banned_ips_total as u64,
81+
udp_avg_connect_processing_time_ns: stats.udp_avg_connect_processing_time_ns,
82+
udp_avg_announce_processing_time_ns: stats.udp_avg_announce_processing_time_ns,
83+
udp_avg_scrape_processing_time_ns: stats.udp_avg_scrape_processing_time_ns,
84+
// UDPv4
85+
udp4_requests: stats.udp4_requests,
86+
udp4_connections_handled: stats.udp4_connections_handled,
87+
udp4_announces_handled: stats.udp4_announces_handled,
88+
udp4_scrapes_handled: stats.udp4_scrapes_handled,
89+
udp4_responses: stats.udp4_responses,
90+
udp4_errors_handled: stats.udp4_errors_handled,
91+
// UDPv6
92+
udp6_requests: stats.udp6_requests,
93+
udp6_connections_handled: stats.udp6_connections_handled,
94+
udp6_announces_handled: stats.udp6_announces_handled,
95+
udp6_scrapes_handled: stats.udp6_scrapes_handled,
96+
udp6_responses: stats.udp6_responses,
97+
udp6_errors_handled: stats.udp6_errors_handled,
98+
},
99+
}
100+
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use std::sync::Arc;
105+
106+
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
107+
use bittorrent_tracker_core::{self};
108+
use tokio::sync::RwLock;
109+
use torrust_tracker_configuration::Configuration;
110+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
111+
use torrust_tracker_test_helpers::configuration;
112+
113+
use crate::packages::udp_tracker_core::statistics;
114+
use crate::packages::udp_tracker_core::statistics::services::{get_metrics, TrackerMetrics};
115+
use crate::servers::udp::server::banning::BanService;
116+
use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP;
117+
118+
pub fn tracker_configuration() -> Configuration {
119+
configuration::ephemeral()
120+
}
121+
122+
#[tokio::test]
123+
async fn the_statistics_service_should_return_the_tracker_metrics() {
124+
let config = tracker_configuration();
125+
126+
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
127+
let (_stats_event_sender, stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
128+
let stats_repository = Arc::new(stats_repository);
129+
let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP)));
130+
131+
let tracker_metrics = get_metrics(
132+
in_memory_torrent_repository.clone(),
133+
ban_service.clone(),
134+
stats_repository.clone(),
135+
)
136+
.await;
137+
138+
assert_eq!(
139+
tracker_metrics,
140+
TrackerMetrics {
141+
torrents_metrics: TorrentsMetrics::default(),
142+
protocol_metrics: statistics::metrics::Metrics::default(),
143+
}
144+
);
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! Setup for the tracker statistics.
2+
//!
3+
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use crate::packages::udp_tracker_core::statistics;
5+
6+
/// It builds the structs needed for handling the tracker metrics.
7+
///
8+
/// It returns:
9+
///
10+
/// - An statistics event [`Sender`](crate::packages::udp_tracker_core::statistics::event::sender::Sender) that allows you to send events related to statistics.
11+
/// - An statistics [`Repository`](crate::packages::udp_tracker_core::statistics::repository::Repository) which is an in-memory repository for the tracker metrics.
12+
///
13+
/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics
14+
/// events are sent are received but not dispatched to the handler.
15+
#[must_use]
16+
pub fn factory(
17+
tracker_usage_statistics: bool,
18+
) -> (
19+
Option<Box<dyn statistics::event::sender::Sender>>,
20+
statistics::repository::Repository,
21+
) {
22+
let mut stats_event_sender = None;
23+
24+
let mut stats_tracker = statistics::keeper::Keeper::new();
25+
26+
if tracker_usage_statistics {
27+
stats_event_sender = Some(stats_tracker.run_event_listener());
28+
}
29+
30+
(stats_event_sender, stats_tracker.repository)
31+
}
32+
33+
#[cfg(test)]
34+
mod test {
35+
use super::factory;
36+
37+
#[tokio::test]
38+
async fn should_not_send_any_event_when_statistics_are_disabled() {
39+
let tracker_usage_statistics = false;
40+
41+
let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics);
42+
43+
assert!(stats_event_sender.is_none());
44+
}
45+
46+
#[tokio::test]
47+
async fn should_send_events_when_statistics_are_enabled() {
48+
let tracker_usage_statistics = true;
49+
50+
let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics);
51+
52+
assert!(stats_event_sender.is_some());
53+
}
54+
}

0 commit comments

Comments
 (0)
Please sign in to comment.