Skip to content

Commit cce6b61

Browse files
committed
Merge #1400: Overhaul stats: decouple events from stats
9eba80f refactor: [#1395] rename send_stats_event to send_event (Jose Celano) 055db4e docs: [#1395] minor changes in comments (Jose Celano) d8f1696 refactor: [#1398] extract event module in UDP server (Jose Celano) ed93836 refactor: [#1397] extract event module in UDP core (Jose Celano) 7e364d1 refactor: [#1396] move event channel creation to events mod in HTTP tracker core (Jose Celano) 3d2243b refactor: [#1396] extract event module in HTTP core (Jose Celano) Pull request description: The new events are "generic" events that can be used for any purpose, not only generating stats. This will allow later to create one listener and metrics repository per server (socket address). See #1395. ACKs for top commit: josecelano: ACK 9eba80f Tree-SHA512: a88a6eeea28d08f3537704452056c086494a906da375de19fe4671ca7e74f6e00168543e4d4bdd9924da19b22b83aa307f6ff337a9fe6b33b1f3cd9c7e918bb6
2 parents 34c4249 + 9eba80f commit cce6b61

File tree

45 files changed

+474
-475
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+474
-475
lines changed

packages/axum-http-tracker-server/src/v1/handlers/scrape.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ mod tests {
103103
}
104104

105105
struct CoreHttpTrackerServices {
106-
pub http_stats_event_sender: Arc<Option<Box<dyn bittorrent_http_tracker_core::statistics::event::sender::Sender>>>,
106+
pub http_stats_event_sender: Arc<Option<Box<dyn bittorrent_http_tracker_core::event::sender::Sender>>>,
107107
}
108108

109109
fn initialize_private_tracker() -> (CoreTrackerServices, CoreHttpTrackerServices) {

packages/http-tracker-core/benches/helpers/util.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub struct CoreTrackerServices {
2626
}
2727

2828
pub struct CoreHttpTrackerServices {
29-
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
29+
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
3030
}
3131

3232
pub fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) {
@@ -105,14 +105,15 @@ pub fn sample_info_hash() -> InfoHash {
105105
.expect("String should be a valid info hash")
106106
}
107107

108-
use bittorrent_http_tracker_core::statistics;
108+
use bittorrent_http_tracker_core::event::Event;
109+
use bittorrent_http_tracker_core::{event, statistics};
109110
use futures::future::BoxFuture;
110111
use mockall::mock;
111112
use tokio::sync::broadcast::error::SendError;
112113

113114
mock! {
114115
HttpStatsEventSender {}
115-
impl statistics::event::sender::Sender for HttpStatsEventSender {
116-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
116+
impl event::sender::Sender for HttpStatsEventSender {
117+
fn send_event(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
117118
}
118119
}

packages/http-tracker-core/src/container.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use torrust_tracker_configuration::{Core, HttpTracker};
99

1010
use crate::services::announce::AnnounceService;
1111
use crate::services::scrape::ScrapeService;
12-
use crate::statistics;
12+
use crate::{event, statistics};
1313

1414
pub struct HttpTrackerCoreContainer {
1515
// todo: replace with TrackerCoreContainer
@@ -20,7 +20,7 @@ pub struct HttpTrackerCoreContainer {
2020
pub authentication_service: Arc<AuthenticationService>,
2121

2222
pub http_tracker_config: Arc<HttpTracker>,
23-
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
23+
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
2424
pub http_stats_repository: Arc<statistics::repository::Repository>,
2525
pub announce_service: Arc<AnnounceService>,
2626
pub scrape_service: Arc<ScrapeService>,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::net::{IpAddr, SocketAddr};
2+
3+
pub mod sender;
4+
5+
/// A HTTP core event.
6+
#[derive(Debug, PartialEq, Eq, Clone)]
7+
pub enum Event {
8+
TcpAnnounce { connection: ConnectionContext },
9+
TcpScrape { connection: ConnectionContext },
10+
}
11+
12+
#[derive(Debug, PartialEq, Eq, Clone)]
13+
pub struct ConnectionContext {
14+
client: ClientConnectionContext,
15+
server: ServerConnectionContext,
16+
}
17+
18+
impl ConnectionContext {
19+
#[must_use]
20+
pub fn new(client_ip_addr: IpAddr, opt_client_port: Option<u16>, server_socket_addr: SocketAddr) -> Self {
21+
Self {
22+
client: ClientConnectionContext {
23+
ip_addr: client_ip_addr,
24+
port: opt_client_port,
25+
},
26+
server: ServerConnectionContext {
27+
socket_addr: server_socket_addr,
28+
},
29+
}
30+
}
31+
32+
#[must_use]
33+
pub fn client_ip_addr(&self) -> IpAddr {
34+
self.client.ip_addr
35+
}
36+
37+
#[must_use]
38+
pub fn client_port(&self) -> Option<u16> {
39+
self.client.port
40+
}
41+
42+
#[must_use]
43+
pub fn server_socket_addr(&self) -> SocketAddr {
44+
self.server.socket_addr
45+
}
46+
}
47+
48+
#[derive(Debug, PartialEq, Eq, Clone)]
49+
pub struct ClientConnectionContext {
50+
ip_addr: IpAddr,
51+
52+
/// It's provided if you use the `torrust-axum-http-tracker-server` crate.
53+
port: Option<u16>,
54+
}
55+
56+
#[derive(Debug, PartialEq, Eq, Clone)]
57+
pub struct ServerConnectionContext {
58+
socket_addr: SocketAddr,
59+
}

packages/http-tracker-core/src/statistics/event/sender.rs packages/http-tracker-core/src/event/sender.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,35 @@ use tokio::sync::broadcast::error::SendError;
77

88
use super::Event;
99

10-
/// A trait to allow sending statistics events
10+
const CHANNEL_CAPACITY: usize = 1024;
11+
12+
/// A trait for sending sending.
1113
#[cfg_attr(test, automock)]
1214
pub trait Sender: Sync + Send {
1315
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
1416
}
1517

16-
/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
17-
///
18-
/// It uses a channel sender to send the statistic events. The channel is created by a
19-
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
20-
#[allow(clippy::module_name_repetitions)]
21-
pub struct ChannelSender {
18+
/// An event sender implementation using a broadcast channel.
19+
pub struct Broadcaster {
2220
pub(crate) sender: broadcast::Sender<Event>,
2321
}
2422

25-
impl Sender for ChannelSender {
23+
impl Sender for Broadcaster {
2624
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
2725
async move { Some(self.sender.send(event)) }.boxed()
2826
}
2927
}
28+
29+
impl Default for Broadcaster {
30+
fn default() -> Self {
31+
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
32+
Self { sender }
33+
}
34+
}
35+
36+
impl Broadcaster {
37+
#[must_use]
38+
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
39+
self.sender.subscribe()
40+
}
41+
}

packages/http-tracker-core/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod container;
2+
pub mod event;
23
pub mod services;
34
pub mod statistics;
45

packages/http-tracker-core/src/services/announce.rs

+22-20
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! It delegates the `announce` logic to the [`AnnounceHandler`] and it returns
66
//! the [`AnnounceData`].
77
//!
8-
//! It also sends an [`http_tracker_core::statistics::event::Event`]
8+
//! It also sends an [`http_tracker_core::event::Event`]
99
//! because events are specific for the HTTP tracker.
1010
use std::net::{IpAddr, SocketAddr};
1111
use std::panic::Location;
@@ -22,7 +22,8 @@ use bittorrent_tracker_core::whitelist;
2222
use torrust_tracker_configuration::Core;
2323
use torrust_tracker_primitives::core::AnnounceData;
2424

25-
use crate::statistics;
25+
use crate::event;
26+
use crate::event::Event;
2627

2728
/// The HTTP tracker `announce` service.
2829
///
@@ -35,7 +36,7 @@ pub struct AnnounceService {
3536
announce_handler: Arc<AnnounceHandler>,
3637
authentication_service: Arc<AuthenticationService>,
3738
whitelist_authorization: Arc<whitelist::authorization::WhitelistAuthorization>,
38-
opt_http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
39+
opt_http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
3940
}
4041

4142
impl AnnounceService {
@@ -45,7 +46,7 @@ impl AnnounceService {
4546
announce_handler: Arc<AnnounceHandler>,
4647
authentication_service: Arc<AuthenticationService>,
4748
whitelist_authorization: Arc<whitelist::authorization::WhitelistAuthorization>,
48-
opt_http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
49+
opt_http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
4950
) -> Self {
5051
Self {
5152
core_config,
@@ -86,7 +87,7 @@ impl AnnounceService {
8687
.announce(&announce_request.info_hash, &mut peer, &remote_client_ip, &peers_wanted)
8788
.await?;
8889

89-
self.send_stats_event(remote_client_ip, opt_remote_client_port, *server_socket_addr)
90+
self.send_event(remote_client_ip, opt_remote_client_port, *server_socket_addr)
9091
.await;
9192

9293
Ok(announce_data)
@@ -137,11 +138,11 @@ impl AnnounceService {
137138
}
138139
}
139140

140-
async fn send_stats_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option<u16>, server_socket_addr: SocketAddr) {
141+
async fn send_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option<u16>, server_socket_addr: SocketAddr) {
141142
if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() {
142143
http_stats_event_sender
143-
.send_event(statistics::event::Event::TcpAnnounce {
144-
connection: statistics::event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr),
144+
.send_event(Event::TcpAnnounce {
145+
connection: event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr),
145146
})
146147
.await;
147148
}
@@ -227,7 +228,7 @@ mod tests {
227228
}
228229

229230
struct CoreHttpTrackerServices {
230-
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
231+
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
231232
}
232233

233234
fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) {
@@ -317,13 +318,14 @@ mod tests {
317318
use mockall::mock;
318319
use tokio::sync::broadcast::error::SendError;
319320

320-
use crate::statistics;
321+
use crate::event::Event;
321322
use crate::tests::sample_info_hash;
323+
use crate::{event, statistics};
322324

323325
mock! {
324326
HttpStatsEventSender {}
325-
impl statistics::event::sender::Sender for HttpStatsEventSender {
326-
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
327+
impl event::sender::Sender for HttpStatsEventSender {
328+
fn send_event(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
327329
}
328330
}
329331

@@ -340,13 +342,13 @@ mod tests {
340342
use torrust_tracker_test_helpers::configuration;
341343

342344
use super::{sample_peer_using_ipv4, sample_peer_using_ipv6};
345+
use crate::event;
346+
use crate::event::{ConnectionContext, Event};
343347
use crate::services::announce::tests::{
344348
initialize_core_tracker_services, initialize_core_tracker_services_with_config, sample_announce_request_for_peer,
345349
sample_peer, MockHttpStatsEventSender,
346350
};
347351
use crate::services::announce::AnnounceService;
348-
use crate::statistics;
349-
use crate::statistics::event::ConnectionContext;
350352

351353
#[tokio::test]
352354
async fn it_should_return_the_announce_data() {
@@ -391,12 +393,12 @@ mod tests {
391393
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
392394
http_stats_event_sender_mock
393395
.expect_send_event()
394-
.with(eq(statistics::event::Event::TcpAnnounce {
396+
.with(eq(Event::TcpAnnounce {
395397
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr),
396398
}))
397399
.times(1)
398400
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
399-
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
401+
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
400402
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
401403

402404
let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services();
@@ -447,12 +449,12 @@ mod tests {
447449
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
448450
http_stats_event_sender_mock
449451
.expect_send_event()
450-
.with(eq(statistics::event::Event::TcpAnnounce {
452+
.with(eq(Event::TcpAnnounce {
451453
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr),
452454
}))
453455
.times(1)
454456
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
455-
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
457+
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
456458
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
457459

458460
let (core_tracker_services, mut core_http_tracker_services) =
@@ -486,7 +488,7 @@ mod tests {
486488
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
487489
http_stats_event_sender_mock
488490
.expect_send_event()
489-
.with(eq(statistics::event::Event::TcpAnnounce {
491+
.with(eq(Event::TcpAnnounce {
490492
connection: ConnectionContext::new(
491493
IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)),
492494
Some(8080),
@@ -495,7 +497,7 @@ mod tests {
495497
}))
496498
.times(1)
497499
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
498-
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
500+
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
499501
Arc::new(Some(Box::new(http_stats_event_sender_mock)));
500502

501503
let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services();

0 commit comments

Comments
 (0)