Skip to content

Commit 2be682e

Browse files
committed
refactor: [#1380] refactor: [#1371] add connection context to UDP core events
1 parent 0428972 commit 2be682e

File tree

11 files changed

+401
-159
lines changed

11 files changed

+401
-159
lines changed

packages/udp-tracker-core/benches/helpers/sync.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
12
use std::sync::Arc;
23
use std::time::{Duration, Instant};
34

@@ -8,13 +9,16 @@ use crate::helpers::utils::{sample_ipv4_remote_addr, sample_issue_time};
89

910
#[allow(clippy::unused_async)]
1011
pub async fn connect_once(samples: u64) -> Duration {
12+
let client_socket_addr = sample_ipv4_remote_addr();
13+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
14+
1115
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
1216
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
1317
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
1418
let start = Instant::now();
1519

1620
for _ in 0..samples {
17-
let _response = connect_service.handle_connect(sample_ipv4_remote_addr(), sample_issue_time());
21+
let _response = connect_service.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time());
1822
}
1923

2024
start.elapsed()

packages/udp-tracker-core/src/services/announce.rs

+14-8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use torrust_tracker_primitives::core::AnnounceData;
2121

2222
use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
2323
use crate::statistics;
24+
use crate::statistics::event::ConnectionContext;
2425

2526
/// The `AnnounceService` is responsible for handling the `announce` requests.
2627
///
@@ -57,17 +58,18 @@ impl AnnounceService {
5758
/// whitelist.
5859
pub async fn handle_announce(
5960
&self,
60-
remote_addr: SocketAddr,
61+
client_socket_addr: SocketAddr,
62+
server_socket_addr: SocketAddr,
6163
request: &AnnounceRequest,
6264
cookie_valid_range: Range<f64>,
6365
) -> Result<AnnounceData, UdpAnnounceError> {
64-
Self::authenticate(remote_addr, request, cookie_valid_range)?;
66+
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;
6567

6668
let info_hash = request.info_hash.into();
6769

6870
self.authorize(&info_hash).await?;
6971

70-
let remote_client_ip = remote_addr.ip();
72+
let remote_client_ip = client_socket_addr.ip();
7173

7274
let mut peer = peer_builder::from_request(request, &remote_client_ip);
7375

@@ -78,7 +80,7 @@ impl AnnounceService {
7880
.announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted)
7981
.await?;
8082

81-
self.send_stats_event(remote_client_ip).await;
83+
self.send_stats_event(client_socket_addr, server_socket_addr).await;
8284

8385
Ok(announce_data)
8486
}
@@ -99,11 +101,15 @@ impl AnnounceService {
99101
self.whitelist_authorization.authorize(info_hash).await
100102
}
101103

102-
async fn send_stats_event(&self, peer_ip: IpAddr) {
104+
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
103105
if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
104-
let event = match peer_ip {
105-
IpAddr::V4(_) => statistics::event::Event::Udp4Announce,
106-
IpAddr::V6(_) => statistics::event::Event::Udp6Announce,
106+
let event = match client_socket_addr.ip() {
107+
IpAddr::V4(_) => statistics::event::Event::Udp4Announce {
108+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
109+
},
110+
IpAddr::V6(_) => statistics::event::Event::Udp6Announce {
111+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
112+
},
107113
};
108114

109115
udp_stats_event_sender.send_event(event).await;

packages/udp-tracker-core/src/services/connect.rs

+46-14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use aquatic_udp_protocol::ConnectionId;
88

99
use crate::connection_cookie::{gen_remote_fingerprint, make};
1010
use crate::statistics;
11+
use crate::statistics::event::ConnectionContext;
1112

1213
/// The `ConnectService` is responsible for handling the `connect` requests.
1314
///
@@ -30,16 +31,30 @@ impl ConnectService {
3031
/// # Panics
3132
///
3233
/// It will panic if there was an error making the connection cookie.
33-
pub async fn handle_connect(&self, remote_addr: SocketAddr, cookie_issue_time: f64) -> ConnectionId {
34-
let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value");
34+
pub async fn handle_connect(
35+
&self,
36+
client_socket_addr: SocketAddr,
37+
server_socket_addr: SocketAddr,
38+
cookie_issue_time: f64,
39+
) -> ConnectionId {
40+
let connection_id =
41+
make(gen_remote_fingerprint(&client_socket_addr), cookie_issue_time).expect("it should be a normal value");
3542

3643
if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
37-
match remote_addr {
44+
match client_socket_addr {
3845
SocketAddr::V4(_) => {
39-
udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await;
46+
udp_stats_event_sender
47+
.send_event(statistics::event::Event::Udp4Connect {
48+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
49+
})
50+
.await;
4051
}
4152
SocketAddr::V6(_) => {
42-
udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await;
53+
udp_stats_event_sender
54+
.send_event(statistics::event::Event::Udp6Connect {
55+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
56+
})
57+
.await;
4358
}
4459
}
4560
}
@@ -54,6 +69,7 @@ mod tests {
5469
mod connect_request {
5570

5671
use std::future;
72+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
5773
use std::sync::Arc;
5874

5975
use mockall::predicate::eq;
@@ -65,16 +81,19 @@ mod tests {
6581
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender,
6682
};
6783
use crate::statistics;
84+
use crate::statistics::event::ConnectionContext;
6885

6986
#[tokio::test]
7087
async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() {
88+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
89+
7190
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
7291
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
7392

7493
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
7594

7695
let response = connect_service
77-
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
96+
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
7897
.await;
7998

8099
assert_eq!(
@@ -85,13 +104,15 @@ mod tests {
85104

86105
#[tokio::test]
87106
async fn a_connect_response_should_contain_a_new_connection_id() {
107+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
108+
88109
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
89110
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
90111

91112
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
92113

93114
let response = connect_service
94-
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
115+
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
95116
.await;
96117

97118
assert_eq!(
@@ -102,13 +123,16 @@ mod tests {
102123

103124
#[tokio::test]
104125
async fn a_connect_response_should_contain_a_new_connection_id_ipv6() {
126+
let client_socket_addr = sample_ipv6_remote_addr();
127+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
128+
105129
let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
106130
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
107131

108132
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
109133

110134
let response = connect_service
111-
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
135+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
112136
.await;
113137

114138
assert_eq!(
@@ -119,30 +143,38 @@ mod tests {
119143

120144
#[tokio::test]
121145
async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() {
146+
let client_socket_addr = sample_ipv4_socket_address();
147+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
148+
122149
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
123150
udp_stats_event_sender_mock
124151
.expect_send_event()
125-
.with(eq(statistics::event::Event::Udp4Connect))
152+
.with(eq(statistics::event::Event::Udp4Connect {
153+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
154+
}))
126155
.times(1)
127156
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
128157
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
129158
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));
130159

131-
let client_socket_address = sample_ipv4_socket_address();
132-
133160
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));
134161

135162
connect_service
136-
.handle_connect(client_socket_address, sample_issue_time())
163+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
137164
.await;
138165
}
139166

140167
#[tokio::test]
141168
async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() {
169+
let client_socket_addr = sample_ipv6_remote_addr();
170+
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
171+
142172
let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
143173
udp_stats_event_sender_mock
144174
.expect_send_event()
145-
.with(eq(statistics::event::Event::Udp6Connect))
175+
.with(eq(statistics::event::Event::Udp6Connect {
176+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
177+
}))
146178
.times(1)
147179
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
148180
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
@@ -151,7 +183,7 @@ mod tests {
151183
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));
152184

153185
connect_service
154-
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
186+
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
155187
.await;
156188
}
157189
}

packages/udp-tracker-core/src/services/scrape.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use torrust_tracker_primitives::core::ScrapeData;
1919

2020
use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
2121
use crate::statistics;
22+
use crate::statistics::event::ConnectionContext;
2223

2324
/// The `ScrapeService` is responsible for handling the `scrape` requests.
2425
///
@@ -49,18 +50,19 @@ impl ScrapeService {
4950
/// It will return an error if the tracker core scrape handler returns an error.
5051
pub async fn handle_scrape(
5152
&self,
52-
remote_client_addr: SocketAddr,
53+
client_socket_addr: SocketAddr,
54+
server_socket_addr: SocketAddr,
5355
request: &ScrapeRequest,
5456
cookie_valid_range: Range<f64>,
5557
) -> Result<ScrapeData, UdpScrapeError> {
56-
Self::authenticate(remote_client_addr, request, cookie_valid_range)?;
58+
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;
5759

5860
let scrape_data = self
5961
.scrape_handler
6062
.scrape(&Self::convert_from_aquatic(&request.info_hashes))
6163
.await?;
6264

63-
self.send_stats_event(remote_client_addr).await;
65+
self.send_stats_event(client_socket_addr, server_socket_addr).await;
6466

6567
Ok(scrape_data)
6668
}
@@ -81,11 +83,15 @@ impl ScrapeService {
8183
aquatic_infohashes.iter().map(|&x| x.into()).collect()
8284
}
8385

84-
async fn send_stats_event(&self, remote_addr: SocketAddr) {
86+
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
8587
if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() {
86-
let event = match remote_addr {
87-
SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape,
88-
SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape,
88+
let event = match client_socket_addr {
89+
SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape {
90+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
91+
},
92+
SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape {
93+
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
94+
},
8995
};
9096
udp_stats_event_sender.send_event(event).await;
9197
}

0 commit comments

Comments
 (0)