Skip to content

Commit df507a8

Browse files
committed
Merge #1381: Overhaul stats events: merge UDP core events with a different IP version
8603f8b refactor: [#1380] refactor: [#1373] merge UDP stats events with different IP version (Jose Celano) 2be682e refactor: [#1380] refactor: [#1371] add connection context to UDP core events (Jose Celano) Pull request description: Change events from this: `bittorrent_udp_tracker_core::statistics::event::Event`: ```rust pub enum Event { Udp4Connect, Udp4Announce, Udp4Scrape, Udp6Connect, Udp6Announce, Udp6Scrape, } ``` To this: ```rust pub enum Event { UdpConnect { context: ConnectionContext }, UdpAnnounce { context: ConnectionContext }, UdpScrape { context: ConnectionContext }, } pub struct ConnectionContext { client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, } ``` This have also be done for [HTTP core events](#1374). ### Sub-tasks - [x] Add `ConnectionContext` to events. - [x] Merge events with the same request type (`connect`, `announce` and `scrape`). ACKs for top commit: josecelano: ACK 8603f8b Tree-SHA512: dde61535ec1f6c768ae2ad4d911e4ecece9be94c2546fd7b06a0d8d45d122caa6fcbf370cfa5de61a0d10424927cef17f7faf0a351252ab28ccb17e38058269e
2 parents 0428972 + 8603f8b commit df507a8

File tree

11 files changed

+375
-176
lines changed

11 files changed

+375
-176
lines changed

packages/udp-tracker-core/benches/helpers/sync.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
12
use std::sync::Arc;
23
use std::time::{Duration, Instant};
34

@@ -8,13 +9,16 @@ use crate::helpers::utils::{sample_ipv4_remote_addr, sample_issue_time};
89

910
#[allow(clippy::unused_async)]
1011
pub async fn connect_once(samples: u64) -> Duration {
12+
let client_socket_addr = sample_ipv4_remote_addr();
13+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
14+
1115
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
1216
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
1317
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
1418
let start = Instant::now();
1519

1620
for _ in 0..samples {
17-
let _response = connect_service.handle_connect(sample_ipv4_remote_addr(), sample_issue_time());
21+
let _response = connect_service.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time());
1822
}
1923

2024
start.elapsed()

packages/udp-tracker-core/src/services/announce.rs

+13-12
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//!
88
//! It also sends an [`udp_tracker_core::statistics::event::Event`]
99
//! because events are specific for the HTTP tracker.
10-
use std::net::{IpAddr, SocketAddr};
10+
use std::net::SocketAddr;
1111
use std::ops::Range;
1212
use std::sync::Arc;
1313

@@ -21,6 +21,7 @@ use torrust_tracker_primitives::core::AnnounceData;
2121

2222
use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
2323
use crate::statistics;
24+
use crate::statistics::event::ConnectionContext;
2425

2526
/// The `AnnounceService` is responsible for handling the `announce` requests.
2627
///
@@ -57,17 +58,18 @@ impl AnnounceService {
5758
/// whitelist.
5859
pub async fn handle_announce(
5960
&self,
60-
remote_addr: SocketAddr,
61+
client_socket_addr: SocketAddr,
62+
server_socket_addr: SocketAddr,
6163
request: &AnnounceRequest,
6264
cookie_valid_range: Range<f64>,
6365
) -> Result<AnnounceData, UdpAnnounceError> {
64-
Self::authenticate(remote_addr, request, cookie_valid_range)?;
66+
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;
6567

6668
let info_hash = request.info_hash.into();
6769

6870
self.authorize(&info_hash).await?;
6971

70-
let remote_client_ip = remote_addr.ip();
72+
let remote_client_ip = client_socket_addr.ip();
7173

7274
let mut peer = peer_builder::from_request(request, &remote_client_ip);
7375

@@ -78,7 +80,7 @@ impl AnnounceService {
7880
.announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted)
7981
.await?;
8082

81-
self.send_stats_event(remote_client_ip).await;
83+
self.send_stats_event(client_socket_addr, server_socket_addr).await;
8284

8385
Ok(announce_data)
8486
}
@@ -99,14 +101,13 @@ impl AnnounceService {
99101
self.whitelist_authorization.authorize(info_hash).await
100102
}
101103

102-
async fn send_stats_event(&self, peer_ip: IpAddr) {
104+
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
103105
if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
104-
let event = match peer_ip {
105-
IpAddr::V4(_) => statistics::event::Event::Udp4Announce,
106-
IpAddr::V6(_) => statistics::event::Event::Udp6Announce,
107-
};
108-
109-
udp_stats_event_sender.send_event(event).await;
106+
udp_stats_event_sender
107+
.send_event(statistics::event::Event::UdpAnnounce {
108+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
109+
})
110+
.await;
110111
}
111112
}
112113
}

packages/udp-tracker-core/src/services/connect.rs

+40-19
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use aquatic_udp_protocol::ConnectionId;
88

99
use crate::connection_cookie::{gen_remote_fingerprint, make};
1010
use crate::statistics;
11+
use crate::statistics::event::ConnectionContext;
1112

1213
/// The `ConnectService` is responsible for handling the `connect` requests.
1314
///
@@ -30,18 +31,21 @@ impl ConnectService {
3031
/// # Panics
3132
///
3233
/// It will panic if there was an error making the connection cookie.
33-
pub async fn handle_connect(&self, remote_addr: SocketAddr, cookie_issue_time: f64) -> ConnectionId {
34-
let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value");
34+
pub async fn handle_connect(
35+
&self,
36+
client_socket_addr: SocketAddr,
37+
server_socket_addr: SocketAddr,
38+
cookie_issue_time: f64,
39+
) -> ConnectionId {
40+
let connection_id =
41+
make(gen_remote_fingerprint(&client_socket_addr), cookie_issue_time).expect("it should be a normal value");
3542

3643
if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
37-
match remote_addr {
38-
SocketAddr::V4(_) => {
39-
udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await;
40-
}
41-
SocketAddr::V6(_) => {
42-
udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await;
43-
}
44-
}
44+
udp_stats_event_sender
45+
.send_event(statistics::event::Event::UdpConnect {
46+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
47+
})
48+
.await;
4549
}
4650

4751
connection_id
@@ -54,6 +58,7 @@ mod tests {
5458
mod connect_request {
5559

5660
use std::future;
61+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
5762
use std::sync::Arc;
5863

5964
use mockall::predicate::eq;
@@ -65,16 +70,19 @@ mod tests {
6570
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender,
6671
};
6772
use crate::statistics;
73+
use crate::statistics::event::ConnectionContext;
6874

6975
#[tokio::test]
7076
async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() {
77+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
78+
7179
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
7280
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
7381

7482
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
7583

7684
let response = connect_service
77-
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
85+
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
7886
.await;
7987

8088
assert_eq!(
@@ -85,13 +93,15 @@ mod tests {
8593

8694
#[tokio::test]
8795
async fn a_connect_response_should_contain_a_new_connection_id() {
96+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
97+
8898
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
8999
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
90100

91101
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
92102

93103
let response = connect_service
94-
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
104+
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
95105
.await;
96106

97107
assert_eq!(
@@ -102,13 +112,16 @@ mod tests {
102112

103113
#[tokio::test]
104114
async fn a_connect_response_should_contain_a_new_connection_id_ipv6() {
115+
let client_socket_addr = sample_ipv6_remote_addr();
116+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
117+
105118
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
106119
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
107120

108121
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
109122

110123
let response = connect_service
111-
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
124+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
112125
.await;
113126

114127
assert_eq!(
@@ -119,30 +132,38 @@ mod tests {
119132

120133
#[tokio::test]
121134
async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() {
135+
let client_socket_addr = sample_ipv4_socket_address();
136+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
137+
122138
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
123139
udp_stats_event_sender_mock
124140
.expect_send_event()
125-
.with(eq(statistics::event::Event::Udp4Connect))
141+
.with(eq(statistics::event::Event::UdpConnect {
142+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
143+
}))
126144
.times(1)
127145
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
128146
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
129147
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));
130148

131-
let client_socket_address = sample_ipv4_socket_address();
132-
133149
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));
134150

135151
connect_service
136-
.handle_connect(client_socket_address, sample_issue_time())
152+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
137153
.await;
138154
}
139155

140156
#[tokio::test]
141157
async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() {
158+
let client_socket_addr = sample_ipv6_remote_addr();
159+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
160+
142161
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
143162
udp_stats_event_sender_mock
144163
.expect_send_event()
145-
.with(eq(statistics::event::Event::Udp6Connect))
164+
.with(eq(statistics::event::Event::UdpConnect {
165+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
166+
}))
146167
.times(1)
147168
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
148169
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
@@ -151,7 +172,7 @@ mod tests {
151172
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));
152173

153174
connect_service
154-
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
175+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
155176
.await;
156177
}
157178
}

packages/udp-tracker-core/src/services/scrape.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use torrust_tracker_primitives::core::ScrapeData;
1919

2020
use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
2121
use crate::statistics;
22+
use crate::statistics::event::ConnectionContext;
2223

2324
/// The `ScrapeService` is responsible for handling the `scrape` requests.
2425
///
@@ -49,18 +50,19 @@ impl ScrapeService {
4950
/// It will return an error if the tracker core scrape handler returns an error.
5051
pub async fn handle_scrape(
5152
&self,
52-
remote_client_addr: SocketAddr,
53+
client_socket_addr: SocketAddr,
54+
server_socket_addr: SocketAddr,
5355
request: &ScrapeRequest,
5456
cookie_valid_range: Range<f64>,
5557
) -> Result<ScrapeData, UdpScrapeError> {
56-
Self::authenticate(remote_client_addr, request, cookie_valid_range)?;
58+
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;
5759

5860
let scrape_data = self
5961
.scrape_handler
6062
.scrape(&Self::convert_from_aquatic(&request.info_hashes))
6163
.await?;
6264

63-
self.send_stats_event(remote_client_addr).await;
65+
self.send_stats_event(client_socket_addr, server_socket_addr).await;
6466

6567
Ok(scrape_data)
6668
}
@@ -81,13 +83,13 @@ impl ScrapeService {
8183
aquatic_infohashes.iter().map(|&x| x.into()).collect()
8284
}
8385

84-
async fn send_stats_event(&self, remote_addr: SocketAddr) {
86+
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
8587
if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() {
86-
let event = match remote_addr {
87-
SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape,
88-
SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape,
89-
};
90-
udp_stats_event_sender.send_event(event).await;
88+
udp_stats_event_sender
89+
.send_event(statistics::event::Event::UdpScrape {
90+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
91+
})
92+
.await;
9193
}
9294
}
9395
}

0 commit comments

Comments
 (0)