From 2be682e1779088432126e7a2c6ee39dd4f4b7094 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 17 Mar 2025 17:07:55 +0000 Subject: [PATCH 1/2] refactor: [#1380] refactor: [#1371] add connection context to UDP core events --- .../udp-tracker-core/benches/helpers/sync.rs | 6 +- .../udp-tracker-core/src/services/announce.rs | 22 ++- .../udp-tracker-core/src/services/connect.rs | 60 ++++++-- .../udp-tracker-core/src/services/scrape.rs | 20 ++- .../src/statistics/event/handler.rs | 139 ++++++++++++++---- .../src/statistics/event/mod.rs | 35 +++-- .../udp-tracker-core/src/statistics/keeper.rs | 13 +- .../src/handlers/announce.rs | 135 ++++++++++------- .../src/handlers/connect.rs | 38 ++++- .../udp-tracker-server/src/handlers/mod.rs | 22 ++- .../udp-tracker-server/src/handlers/scrape.rs | 70 +++++---- 11 files changed, 401 insertions(+), 159 deletions(-) diff --git a/packages/udp-tracker-core/benches/helpers/sync.rs b/packages/udp-tracker-core/benches/helpers/sync.rs index b7d8e848d..ca459c640 100644 --- a/packages/udp-tracker-core/benches/helpers/sync.rs +++ b/packages/udp-tracker-core/benches/helpers/sync.rs @@ -1,3 +1,4 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -8,13 +9,16 @@ use crate::helpers::utils::{sample_ipv4_remote_addr, sample_issue_time}; #[allow(clippy::unused_async)] pub async fn connect_once(samples: u64) -> Duration { + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); let start = Instant::now(); for _ in 0..samples { - let _response = connect_service.handle_connect(sample_ipv4_remote_addr(), sample_issue_time()); + let _response = connect_service.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time()); } start.elapsed() diff --git a/packages/udp-tracker-core/src/services/announce.rs b/packages/udp-tracker-core/src/services/announce.rs index 698f5fba6..22bc05a9e 100644 --- a/packages/udp-tracker-core/src/services/announce.rs +++ b/packages/udp-tracker-core/src/services/announce.rs @@ -21,6 +21,7 @@ use torrust_tracker_primitives::core::AnnounceData; use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError}; use crate::statistics; +use crate::statistics::event::ConnectionContext; /// The `AnnounceService` is responsible for handling the `announce` requests. /// @@ -57,17 +58,18 @@ impl AnnounceService { /// whitelist. pub async fn handle_announce( &self, - remote_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request: &AnnounceRequest, cookie_valid_range: Range, ) -> Result { - Self::authenticate(remote_addr, request, cookie_valid_range)?; + Self::authenticate(client_socket_addr, request, cookie_valid_range)?; let info_hash = request.info_hash.into(); self.authorize(&info_hash).await?; - let remote_client_ip = remote_addr.ip(); + let remote_client_ip = client_socket_addr.ip(); let mut peer = peer_builder::from_request(request, &remote_client_ip); @@ -78,7 +80,7 @@ impl AnnounceService { .announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted) .await?; - self.send_stats_event(remote_client_ip).await; + self.send_stats_event(client_socket_addr, server_socket_addr).await; Ok(announce_data) } @@ -99,11 +101,15 @@ impl AnnounceService { self.whitelist_authorization.authorize(info_hash).await } - async fn send_stats_event(&self, peer_ip: IpAddr) { + async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { - let event = match peer_ip { - IpAddr::V4(_) => statistics::event::Event::Udp4Announce, - IpAddr::V6(_) => statistics::event::Event::Udp6Announce, + let event = match client_socket_addr.ip() { + IpAddr::V4(_) => statistics::event::Event::Udp4Announce { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }, + IpAddr::V6(_) => statistics::event::Event::Udp6Announce { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }, }; udp_stats_event_sender.send_event(event).await; diff --git a/packages/udp-tracker-core/src/services/connect.rs b/packages/udp-tracker-core/src/services/connect.rs index 14a3068e4..5309a79d3 100644 --- a/packages/udp-tracker-core/src/services/connect.rs +++ b/packages/udp-tracker-core/src/services/connect.rs @@ -8,6 +8,7 @@ use aquatic_udp_protocol::ConnectionId; use crate::connection_cookie::{gen_remote_fingerprint, make}; use crate::statistics; +use crate::statistics::event::ConnectionContext; /// The `ConnectService` is responsible for handling the `connect` requests. /// @@ -30,16 +31,30 @@ impl ConnectService { /// # Panics /// /// It will panic if there was an error making the connection cookie. - pub async fn handle_connect(&self, remote_addr: SocketAddr, cookie_issue_time: f64) -> ConnectionId { - let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value"); + pub async fn handle_connect( + &self, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, + cookie_issue_time: f64, + ) -> ConnectionId { + let connection_id = + make(gen_remote_fingerprint(&client_socket_addr), cookie_issue_time).expect("it should be a normal value"); if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { - match remote_addr { + match client_socket_addr { SocketAddr::V4(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await; + udp_stats_event_sender + .send_event(statistics::event::Event::Udp4Connect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) + .await; } SocketAddr::V6(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await; + udp_stats_event_sender + .send_event(statistics::event::Event::Udp6Connect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) + .await; } } } @@ -54,6 +69,7 @@ mod tests { mod connect_request { use std::future; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use mockall::predicate::eq; @@ -65,16 +81,19 @@ mod tests { sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, }; use crate::statistics; + use crate::statistics::event::ConnectionContext; #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); let response = connect_service - .handle_connect(sample_ipv4_remote_addr(), sample_issue_time()) + .handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time()) .await; assert_eq!( @@ -85,13 +104,15 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id() { + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); let response = connect_service - .handle_connect(sample_ipv4_remote_addr(), sample_issue_time()) + .handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time()) .await; assert_eq!( @@ -102,13 +123,16 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id_ipv6() { + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); let response = connect_service - .handle_connect(sample_ipv6_remote_addr(), sample_issue_time()) + .handle_connect(client_socket_addr, server_socket_addr, sample_issue_time()) .await; assert_eq!( @@ -119,30 +143,38 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { + let client_socket_addr = sample_ipv4_socket_address(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp4Connect)) + .with(eq(statistics::event::Event::Udp4Connect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let opt_udp_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_stats_event_sender_mock))); - let client_socket_address = sample_ipv4_socket_address(); - let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender)); connect_service - .handle_connect(client_socket_address, sample_issue_time()) + .handle_connect(client_socket_addr, server_socket_addr, sample_issue_time()) .await; } #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Connect)) + .with(eq(statistics::event::Event::Udp6Connect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let opt_udp_stats_event_sender: Arc>> = @@ -151,7 +183,7 @@ mod tests { let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender)); connect_service - .handle_connect(sample_ipv6_remote_addr(), sample_issue_time()) + .handle_connect(client_socket_addr, server_socket_addr, sample_issue_time()) .await; } } diff --git a/packages/udp-tracker-core/src/services/scrape.rs b/packages/udp-tracker-core/src/services/scrape.rs index 61301cd43..0f1ab14d8 100644 --- a/packages/udp-tracker-core/src/services/scrape.rs +++ b/packages/udp-tracker-core/src/services/scrape.rs @@ -19,6 +19,7 @@ use torrust_tracker_primitives::core::ScrapeData; use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError}; use crate::statistics; +use crate::statistics::event::ConnectionContext; /// The `ScrapeService` is responsible for handling the `scrape` requests. /// @@ -49,18 +50,19 @@ impl ScrapeService { /// It will return an error if the tracker core scrape handler returns an error. pub async fn handle_scrape( &self, - remote_client_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request: &ScrapeRequest, cookie_valid_range: Range, ) -> Result { - Self::authenticate(remote_client_addr, request, cookie_valid_range)?; + Self::authenticate(client_socket_addr, request, cookie_valid_range)?; let scrape_data = self .scrape_handler .scrape(&Self::convert_from_aquatic(&request.info_hashes)) .await?; - self.send_stats_event(remote_client_addr).await; + self.send_stats_event(client_socket_addr, server_socket_addr).await; Ok(scrape_data) } @@ -81,11 +83,15 @@ impl ScrapeService { aquatic_infohashes.iter().map(|&x| x.into()).collect() } - async fn send_stats_event(&self, remote_addr: SocketAddr) { + async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() { - let event = match remote_addr { - SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape, - SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape, + let event = match client_socket_addr { + SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }, + SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }, }; udp_stats_event_sender.send_event(event).await; } diff --git a/packages/udp-tracker-core/src/statistics/event/handler.rs b/packages/udp-tracker-core/src/statistics/event/handler.rs index 096059b91..1f8a64a88 100644 --- a/packages/udp-tracker-core/src/statistics/event/handler.rs +++ b/packages/udp-tracker-core/src/statistics/event/handler.rs @@ -1,29 +1,62 @@ use crate::statistics::event::Event; use crate::statistics::repository::Repository; +/// # Panics +/// +/// This function panics if the IP version does not match the event type. pub async fn handle_event(event: Event, stats_repository: &Repository) { match event { // UDP4 - Event::Udp4Connect => { - stats_repository.increase_udp4_connections().await; - } - Event::Udp4Announce => { - stats_repository.increase_udp4_announces().await; - } - Event::Udp4Scrape => { - stats_repository.increase_udp4_scrapes().await; - } + Event::Udp4Connect { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_connections().await; + } + std::net::IpAddr::V6(_) => { + panic!("IP Version 6 does not match the event type for connect"); + } + }, + Event::Udp4Announce { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_announces().await; + } + std::net::IpAddr::V6(_) => { + panic!("IP Version 6 does not match the event type for announce"); + } + }, + Event::Udp4Scrape { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_scrapes().await; + } + std::net::IpAddr::V6(_) => { + panic!("IP Version 6 does not match the event type for scrape"); + } + }, // UDP6 - Event::Udp6Connect => { - stats_repository.increase_udp6_connections().await; - } - Event::Udp6Announce => { - stats_repository.increase_udp6_announces().await; - } - Event::Udp6Scrape => { - stats_repository.increase_udp6_scrapes().await; - } + Event::Udp6Connect { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + panic!("IP Version 4 does not match the event type for connect"); + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_connections().await; + } + }, + Event::Udp6Announce { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + panic!("IP Version 4 does not match the event type for announce"); + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_announces().await; + } + }, + Event::Udp6Scrape { context } => match context.client_socket_addr.ip() { + std::net::IpAddr::V4(_) => { + panic!("IP Version 4 does not match the event type for scrape"); + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_scrapes().await; + } + }, } tracing::debug!("stats: {:?}", stats_repository.get_stats().await); @@ -31,15 +64,26 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { #[cfg(test)] mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::statistics::event::handler::handle_event; - use crate::statistics::event::Event; + use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::repository::Repository; #[tokio::test] async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp4Connect, &stats_repository).await; + handle_event( + Event::Udp4Connect { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -50,7 +94,16 @@ mod tests { async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp4Announce, &stats_repository).await; + handle_event( + Event::Udp4Announce { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -61,7 +114,16 @@ mod tests { async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp4Scrape, &stats_repository).await; + handle_event( + Event::Udp4Scrape { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -72,7 +134,16 @@ mod tests { async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp6Connect, &stats_repository).await; + handle_event( + Event::Udp6Connect { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -83,7 +154,16 @@ mod tests { async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp6Announce, &stats_repository).await; + handle_event( + Event::Udp6Announce { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -94,7 +174,16 @@ mod tests { async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp6Scrape, &stats_repository).await; + handle_event( + Event::Udp6Scrape { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; diff --git a/packages/udp-tracker-core/src/statistics/event/mod.rs b/packages/udp-tracker-core/src/statistics/event/mod.rs index bfc733657..f460f0113 100644 --- a/packages/udp-tracker-core/src/statistics/event/mod.rs +++ b/packages/udp-tracker-core/src/statistics/event/mod.rs @@ -1,23 +1,36 @@ +use std::net::SocketAddr; + pub mod handler; pub mod listener; pub mod sender; /// An statistics event. It is used to collect tracker metrics. /// -/// - `Tcp` prefix means the event was triggered by the HTTP tracker /// - `Udp` prefix means the event was triggered by the UDP tracker /// - `4` or `6` prefixes means the IP version used by the peer /// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection` -/// -/// > NOTE: HTTP trackers do not use `connection` requests. #[derive(Debug, PartialEq, Eq)] pub enum Event { - // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 } - // Attributes are enums too. - Udp4Connect, - Udp4Announce, - Udp4Scrape, - Udp6Connect, - Udp6Announce, - Udp6Scrape, + Udp4Connect { context: ConnectionContext }, + Udp4Announce { context: ConnectionContext }, + Udp4Scrape { context: ConnectionContext }, + Udp6Connect { context: ConnectionContext }, + Udp6Announce { context: ConnectionContext }, + Udp6Scrape { context: ConnectionContext }, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct ConnectionContext { + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { + Self { + client_socket_addr, + server_socket_addr, + } + } } diff --git a/packages/udp-tracker-core/src/statistics/keeper.rs b/packages/udp-tracker-core/src/statistics/keeper.rs index dac7e7541..9d0768e31 100644 --- a/packages/udp-tracker-core/src/statistics/keeper.rs +++ b/packages/udp-tracker-core/src/statistics/keeper.rs @@ -51,7 +51,9 @@ impl Keeper { #[cfg(test)] mod tests { - use crate::statistics::event::Event; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::keeper::Keeper; use crate::statistics::metrics::Metrics; @@ -70,7 +72,14 @@ mod tests { let event_sender = stats_tracker.run_event_listener(); - let result = event_sender.send_event(Event::Udp4Connect).await; + let result = event_sender + .send_event(Event::Udp4Connect { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }) + .await; assert!(result.is_some()); } diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index e56e1d831..d18a81329 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -26,7 +26,8 @@ use crate::statistics::event::UdpResponseKind; #[instrument(fields(transaction_id, connection_id, info_hash), skip(announce_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_announce( announce_service: &Arc, - remote_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request: &AnnounceRequest, core_config: &Arc, opt_udp_server_stats_event_sender: &Arc>>, @@ -40,7 +41,7 @@ pub async fn handle_announce( tracing::trace!("handle announce"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match remote_addr.ip() { + match client_socket_addr.ip() { IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { @@ -59,11 +60,11 @@ pub async fn handle_announce( } let announce_data = announce_service - .handle_announce(remote_addr, request, cookie_valid_range) + .handle_announce(client_socket_addr, server_socket_addr, request, cookie_valid_range) .await .map_err(|e| (e.into(), request.transaction_id))?; - Ok(build_response(remote_addr, request, core_config, &announce_data)) + Ok(build_response(client_socket_addr, request, core_config, &announce_data)) } fn build_response( @@ -237,10 +238,11 @@ mod tests { let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); - let remote_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + let client_socket_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(client_ip) @@ -249,7 +251,8 @@ mod tests { handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -276,15 +279,17 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); - let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + let client_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); let response = handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -325,10 +330,11 @@ mod tests { let remote_client_port = 8081; let peer_address = Ipv4Addr::new(126, 0, 0, 2); - let remote_addr = SocketAddr::new(IpAddr::V4(remote_client_ip), remote_client_port); + let client_socket_addr = SocketAddr::new(IpAddr::V4(remote_client_ip), remote_client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(peer_address) @@ -337,7 +343,8 @@ mod tests { handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -381,14 +388,17 @@ mod tests { let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); - let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + let client_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &udp_server_stats_event_sender, @@ -433,9 +443,13 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); + let client_socket_addr = sample_ipv4_socket_address(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + handle_announce( &core_udp_tracker_services.announce_service, - sample_ipv4_socket_address(), + client_socket_addr, + server_socket_addr, &AnnounceRequestBuilder::default().into(), &core_tracker_services.core_config, &udp_server_stats_event_sender, @@ -469,10 +483,11 @@ mod tests { let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); - let remote_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + let client_socket_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(client_ip) @@ -481,7 +496,8 @@ mod tests { handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -510,7 +526,7 @@ mod tests { mod using_ipv6 { use std::future; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::{ @@ -546,10 +562,11 @@ mod tests { let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); - let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + let client_socket_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(client_ip_v4) @@ -558,7 +575,8 @@ mod tests { handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -588,15 +606,17 @@ mod tests { let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); - let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), 8080); + let client_socket_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), 8080); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); let response = handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_services.udp_server_stats_event_sender, @@ -637,10 +657,11 @@ mod tests { let remote_client_port = 8081; let peer_address = "126.0.0.1".parse().unwrap(); - let remote_addr = SocketAddr::new(IpAddr::V6(remote_client_ip), remote_client_port); + let client_socket_addr = SocketAddr::new(IpAddr::V6(remote_client_ip), remote_client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(peer_address) @@ -649,7 +670,8 @@ mod tests { handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_tracker_services.core_config, &server_udp_tracker_service.udp_server_stats_event_sender, @@ -697,9 +719,12 @@ mod tests { let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); let client_port = 8080; - let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + + let client_socket_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); + let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); let announce_service = Arc::new(AnnounceService::new( @@ -710,7 +735,8 @@ mod tests { handle_announce( &announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_config, &udp_server_stats_event_sender, @@ -759,15 +785,17 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); - let remote_addr = sample_ipv6_remote_addr(); + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let announce_request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); handle_announce( &core_udp_tracker_services.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &announce_request, &core_tracker_services.core_config, &udp_server_stats_event_sender, @@ -791,6 +819,7 @@ mod tests { use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; use bittorrent_udp_tracker_core::services::announce::AnnounceService; + use bittorrent_udp_tracker_core::statistics::event::ConnectionContext; use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; use mockall::predicate::eq; @@ -807,6 +836,19 @@ mod tests { async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let config = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); + let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); + let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); + + let client_ip_v4 = loopback_ipv4; + let client_ip_v6 = loopback_ipv6; + let client_port = 8080; + + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + + let client_socket_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + let server_socket_addr = config.udp_trackers.clone().unwrap()[0].bind_address; + let database = initialize_database(&config.core); let in_memory_whitelist = Arc::new(InMemoryWhitelist::default()); let whitelist_authorization = @@ -817,7 +859,9 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp6Announce)) + .with(eq(core_statistics::event::Event::Udp6Announce { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let udp_core_stats_event_sender: Arc>> = @@ -841,20 +885,8 @@ mod tests { &db_torrent_repository, )); - let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); - let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); - - let client_ip_v4 = loopback_ipv4; - let client_ip_v6 = loopback_ipv6; - let client_port = 8080; - - let info_hash = AquaticInfoHash([0u8; 20]); - let peer_id = AquaticPeerId([255u8; 20]); - - let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); - let request = AnnounceRequestBuilder::default() - .with_connection_id(make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap()) + .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .with_info_hash(info_hash) .with_peer_id(peer_id) .with_ip_address(client_ip_v4) @@ -871,7 +903,8 @@ mod tests { handle_announce( &announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &core_config, &udp_server_stats_event_sender, diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 93d3bb6f1..e3070264d 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -13,6 +13,7 @@ use crate::statistics::event::UdpResponseKind; #[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_connect( remote_addr: SocketAddr, + server_addr: SocketAddr, request: &ConnectRequest, connect_service: &Arc, opt_udp_server_stats_event_sender: &Arc>>, @@ -40,7 +41,9 @@ pub async fn handle_connect( } } - let connection_id = connect_service.handle_connect(remote_addr, cookie_issue_time).await; + let connection_id = connect_service + .handle_connect(remote_addr, server_addr, cookie_issue_time) + .await; build_response(*request, connection_id) } @@ -60,12 +63,14 @@ mod tests { mod connect_request { use std::future; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; use bittorrent_udp_tracker_core::connection_cookie::make; use bittorrent_udp_tracker_core::services::connect::ConnectService; use bittorrent_udp_tracker_core::statistics as core_statistics; + use bittorrent_udp_tracker_core::statistics::event::ConnectionContext; use mockall::predicate::eq; use crate::handlers::handle_connect; @@ -84,6 +89,8 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); @@ -99,6 +106,7 @@ mod tests { let response = handle_connect( sample_ipv4_remote_addr(), + server_socket_addr, &request, &connect_service, &udp_server_stats_event_sender, @@ -117,6 +125,8 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id() { + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); @@ -132,6 +142,7 @@ mod tests { let response = handle_connect( sample_ipv4_remote_addr(), + server_socket_addr, &request, &connect_service, &udp_server_stats_event_sender, @@ -150,6 +161,8 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id_ipv6() { + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); @@ -165,6 +178,7 @@ mod tests { let response = handle_connect( sample_ipv6_remote_addr(), + server_socket_addr, &request, &connect_service, &udp_server_stats_event_sender, @@ -183,10 +197,15 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { + let client_socket_addr = sample_ipv4_socket_address(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp4Connect)) + .with(eq(core_statistics::event::Event::Udp4Connect { + context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let udp_core_stats_event_sender: Arc>> = @@ -203,12 +222,11 @@ mod tests { let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let client_socket_address = sample_ipv4_socket_address(); - let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); handle_connect( - client_socket_address, + client_socket_addr, + server_socket_addr, &sample_connect_request(), &connect_service, &udp_server_stats_event_sender, @@ -219,10 +237,15 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp6Connect)) + .with(eq(core_statistics::event::Event::Udp6Connect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let udp_core_stats_event_sender: Arc>> = @@ -242,7 +265,8 @@ mod tests { let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender)); handle_connect( - sample_ipv6_remote_addr(), + client_socket_addr, + server_socket_addr, &sample_connect_request(), &connect_service, &udp_server_stats_event_sender, diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 165b307e0..162af3020 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -58,7 +58,7 @@ pub(crate) async fn handle_packet( udp_request: RawRequest, udp_tracker_core_container: Arc, udp_tracker_server_container: Arc, - local_addr: SocketAddr, + server_socket_addr: SocketAddr, cookie_time_values: CookieTimeValues, ) -> Response { let request_id = Uuid::new_v4(); @@ -73,6 +73,7 @@ pub(crate) async fn handle_packet( Ok(request) => match handle_request( request, udp_request.from, + server_socket_addr, udp_tracker_core_container.clone(), udp_tracker_server_container.clone(), cookie_time_values.clone(), @@ -92,7 +93,7 @@ pub(crate) async fn handle_packet( handle_error( udp_request.from, - local_addr, + server_socket_addr, request_id, &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range.clone(), @@ -105,7 +106,7 @@ pub(crate) async fn handle_packet( Err(e) => { handle_error( udp_request.from, - local_addr, + server_socket_addr, request_id, &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range.clone(), @@ -129,14 +130,16 @@ pub(crate) async fn handle_packet( /// If a error happens in the `handle_request` function, it will just return the `ServerError`. #[instrument(skip( request, - remote_addr, + client_socket_addr, + server_socket_addr, udp_tracker_core_container, udp_tracker_server_container, cookie_time_values ))] pub async fn handle_request( request: Request, - remote_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, udp_tracker_core_container: Arc, udp_tracker_server_container: Arc, cookie_time_values: CookieTimeValues, @@ -145,7 +148,8 @@ pub async fn handle_request( match request { Request::Connect(connect_request) => Ok(handle_connect( - remote_addr, + client_socket_addr, + server_socket_addr, &connect_request, &udp_tracker_core_container.connect_service, &udp_tracker_server_container.udp_server_stats_event_sender, @@ -155,7 +159,8 @@ pub async fn handle_request( Request::Announce(announce_request) => { handle_announce( &udp_tracker_core_container.announce_service, - remote_addr, + client_socket_addr, + server_socket_addr, &announce_request, &udp_tracker_core_container.core_config, &udp_tracker_server_container.udp_server_stats_event_sender, @@ -166,7 +171,8 @@ pub async fn handle_request( Request::Scrape(scrape_request) => { handle_scrape( &udp_tracker_core_container.scrape_service, - remote_addr, + client_socket_addr, + server_socket_addr, &scrape_request, &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range, diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index c385718a2..e820b2e96 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -24,7 +24,8 @@ use crate::statistics::event::UdpResponseKind; #[instrument(fields(transaction_id, connection_id), skip(scrape_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_scrape( scrape_service: &Arc, - remote_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request: &ScrapeRequest, opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, @@ -36,7 +37,7 @@ pub async fn handle_scrape( tracing::trace!("handle scrape"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match remote_addr.ip() { + match client_socket_addr.ip() { IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { @@ -55,7 +56,7 @@ pub async fn handle_scrape( } let scrape_data = scrape_service - .handle_scrape(remote_addr, request, cookie_valid_range) + .handle_scrape(client_socket_addr, server_socket_addr, request, cookie_valid_range) .await .map_err(|e| (e.into(), request.transaction_id))?; @@ -92,7 +93,7 @@ fn build_response(request: &ScrapeRequest, scrape_data: &ScrapeData) -> Response mod tests { mod scrape_request { - use std::net::SocketAddr; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::{ @@ -121,20 +122,22 @@ mod tests { let (_core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); - let remote_addr = sample_ipv4_remote_addr(); + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); let info_hash = InfoHash([0u8; 20]); let info_hashes = vec![info_hash]; let request = ScrapeRequest { - connection_id: make(gen_remote_fingerprint(&remote_addr), sample_issue_time()).unwrap(), + connection_id: make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap(), transaction_id: TransactionId(0i32.into()), info_hashes, }; let response = handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), @@ -186,21 +189,24 @@ mod tests { let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); - let remote_addr = sample_ipv4_remote_addr(); + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let info_hash = InfoHash([0u8; 20]); add_a_seeder( core_tracker_services.in_memory_torrent_repository.clone(), - &remote_addr, + &client_socket_addr, &info_hash, ) .await; - let request = build_scrape_request(&remote_addr, &info_hash); + let request = build_scrape_request(&client_socket_addr, &info_hash); handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &udp_server_stats_event_sender, sample_cookie_valid_range(), @@ -242,6 +248,8 @@ mod tests { } mod with_a_whitelisted_tracker { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use aquatic_udp_protocol::{InfoHash, NumberOfDownloads, NumberOfPeers, TorrentScrapeStatistics}; use crate::handlers::handle_scrape; @@ -257,24 +265,27 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = initialize_core_tracker_services_for_listed_tracker(); - let remote_addr = sample_ipv4_remote_addr(); + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let info_hash = InfoHash([0u8; 20]); add_a_seeder( core_tracker_services.in_memory_torrent_repository.clone(), - &remote_addr, + &client_socket_addr, &info_hash, ) .await; core_tracker_services.in_memory_whitelist.add(&info_hash.0.into()).await; - let request = build_scrape_request(&remote_addr, &info_hash); + let request = build_scrape_request(&client_socket_addr, &info_hash); let torrent_stats = match_scrape_response( handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), @@ -298,22 +309,25 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = initialize_core_tracker_services_for_listed_tracker(); - let remote_addr = sample_ipv4_remote_addr(); + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let info_hash = InfoHash([0u8; 20]); add_a_seeder( core_tracker_services.in_memory_torrent_repository.clone(), - &remote_addr, + &client_socket_addr, &info_hash, ) .await; - let request = build_scrape_request(&remote_addr, &info_hash); + let request = build_scrape_request(&client_socket_addr, &info_hash); let torrent_stats = match_scrape_response( handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, + client_socket_addr, + server_socket_addr, &request, &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), @@ -342,6 +356,7 @@ mod tests { mod using_ipv4 { use std::future; + use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use mockall::predicate::eq; @@ -367,15 +382,17 @@ mod tests { let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let remote_addr = sample_ipv4_remote_addr(); + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, - &sample_scrape_request(&remote_addr), + client_socket_addr, + server_socket_addr, + &sample_scrape_request(&client_socket_addr), &udp_server_stats_event_sender, sample_cookie_valid_range(), ) @@ -386,6 +403,7 @@ mod tests { mod using_ipv6 { use std::future; + use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use mockall::predicate::eq; @@ -411,15 +429,17 @@ mod tests { let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let remote_addr = sample_ipv6_remote_addr(); + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); handle_scrape( &core_udp_tracker_services.scrape_service, - remote_addr, - &sample_scrape_request(&remote_addr), + client_socket_addr, + server_socket_addr, + &sample_scrape_request(&client_socket_addr), &udp_server_stats_event_sender, sample_cookie_valid_range(), ) From 8603f8b871bb10c3d86449d9ff471d1af5d26c92 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 17 Mar 2025 17:25:41 +0000 Subject: [PATCH 2/2] refactor: [#1380] refactor: [#1373] merge UDP stats events with different IP version --- .../udp-tracker-core/src/services/announce.rs | 15 ++---- .../udp-tracker-core/src/services/connect.rs | 25 +++------- .../udp-tracker-core/src/services/scrape.rs | 12 ++--- .../src/statistics/event/handler.rs | 49 +++++-------------- .../src/statistics/event/mod.rs | 22 +++++---- .../udp-tracker-core/src/statistics/keeper.rs | 2 +- .../src/handlers/announce.rs | 2 +- .../src/handlers/connect.rs | 4 +- 8 files changed, 44 insertions(+), 87 deletions(-) diff --git a/packages/udp-tracker-core/src/services/announce.rs b/packages/udp-tracker-core/src/services/announce.rs index 22bc05a9e..f745a90fd 100644 --- a/packages/udp-tracker-core/src/services/announce.rs +++ b/packages/udp-tracker-core/src/services/announce.rs @@ -7,7 +7,7 @@ //! //! It also sends an [`udp_tracker_core::statistics::event::Event`] //! because events are specific for the HTTP tracker. -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::ops::Range; use std::sync::Arc; @@ -103,16 +103,11 @@ impl AnnounceService { async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { - let event = match client_socket_addr.ip() { - IpAddr::V4(_) => statistics::event::Event::Udp4Announce { + udp_stats_event_sender + .send_event(statistics::event::Event::UdpAnnounce { context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }, - IpAddr::V6(_) => statistics::event::Event::Udp6Announce { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }, - }; - - udp_stats_event_sender.send_event(event).await; + }) + .await; } } } diff --git a/packages/udp-tracker-core/src/services/connect.rs b/packages/udp-tracker-core/src/services/connect.rs index 5309a79d3..c3c2459cd 100644 --- a/packages/udp-tracker-core/src/services/connect.rs +++ b/packages/udp-tracker-core/src/services/connect.rs @@ -41,22 +41,11 @@ impl ConnectService { make(gen_remote_fingerprint(&client_socket_addr), cookie_issue_time).expect("it should be a normal value"); if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() { - match client_socket_addr { - SocketAddr::V4(_) => { - udp_stats_event_sender - .send_event(statistics::event::Event::Udp4Connect { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - SocketAddr::V6(_) => { - udp_stats_event_sender - .send_event(statistics::event::Event::Udp6Connect { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - } + udp_stats_event_sender + .send_event(statistics::event::Event::UdpConnect { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) + .await; } connection_id @@ -149,7 +138,7 @@ mod tests { let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp4Connect { + .with(eq(statistics::event::Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) @@ -172,7 +161,7 @@ mod tests { let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Connect { + .with(eq(statistics::event::Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) diff --git a/packages/udp-tracker-core/src/services/scrape.rs b/packages/udp-tracker-core/src/services/scrape.rs index 0f1ab14d8..446c1182f 100644 --- a/packages/udp-tracker-core/src/services/scrape.rs +++ b/packages/udp-tracker-core/src/services/scrape.rs @@ -85,15 +85,11 @@ impl ScrapeService { async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) { if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() { - let event = match client_socket_addr { - SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape { + udp_stats_event_sender + .send_event(statistics::event::Event::UdpScrape { context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }, - SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }, - }; - udp_stats_event_sender.send_event(event).await; + }) + .await; } } } diff --git a/packages/udp-tracker-core/src/statistics/event/handler.rs b/packages/udp-tracker-core/src/statistics/event/handler.rs index 1f8a64a88..98860592f 100644 --- a/packages/udp-tracker-core/src/statistics/event/handler.rs +++ b/packages/udp-tracker-core/src/statistics/event/handler.rs @@ -6,52 +6,25 @@ use crate::statistics::repository::Repository; /// This function panics if the IP version does not match the event type. pub async fn handle_event(event: Event, stats_repository: &Repository) { match event { - // UDP4 - Event::Udp4Connect { context } => match context.client_socket_addr.ip() { + Event::UdpConnect { context } => match context.client_socket_addr.ip() { std::net::IpAddr::V4(_) => { stats_repository.increase_udp4_connections().await; } - std::net::IpAddr::V6(_) => { - panic!("IP Version 6 does not match the event type for connect"); - } - }, - Event::Udp4Announce { context } => match context.client_socket_addr.ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_announces().await; - } - std::net::IpAddr::V6(_) => { - panic!("IP Version 6 does not match the event type for announce"); - } - }, - Event::Udp4Scrape { context } => match context.client_socket_addr.ip() { - std::net::IpAddr::V4(_) => { - stats_repository.increase_udp4_scrapes().await; - } - std::net::IpAddr::V6(_) => { - panic!("IP Version 6 does not match the event type for scrape"); - } - }, - - // UDP6 - Event::Udp6Connect { context } => match context.client_socket_addr.ip() { - std::net::IpAddr::V4(_) => { - panic!("IP Version 4 does not match the event type for connect"); - } std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_connections().await; } }, - Event::Udp6Announce { context } => match context.client_socket_addr.ip() { + Event::UdpAnnounce { context } => match context.client_socket_addr.ip() { std::net::IpAddr::V4(_) => { - panic!("IP Version 4 does not match the event type for announce"); + stats_repository.increase_udp4_announces().await; } std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_announces().await; } }, - Event::Udp6Scrape { context } => match context.client_socket_addr.ip() { + Event::UdpScrape { context } => match context.client_socket_addr.ip() { std::net::IpAddr::V4(_) => { - panic!("IP Version 4 does not match the event type for scrape"); + stats_repository.increase_udp4_scrapes().await; } std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_scrapes().await; @@ -75,7 +48,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Connect { + Event::UdpConnect { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -95,7 +68,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Announce { + Event::UdpAnnounce { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -115,7 +88,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Scrape { + Event::UdpScrape { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -135,7 +108,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Connect { + Event::UdpConnect { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -155,7 +128,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Announce { + Event::UdpAnnounce { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -175,7 +148,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Scrape { + Event::UdpScrape { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), diff --git a/packages/udp-tracker-core/src/statistics/event/mod.rs b/packages/udp-tracker-core/src/statistics/event/mod.rs index f460f0113..05de5d118 100644 --- a/packages/udp-tracker-core/src/statistics/event/mod.rs +++ b/packages/udp-tracker-core/src/statistics/event/mod.rs @@ -6,17 +6,13 @@ pub mod sender; /// An statistics event. It is used to collect tracker metrics. /// -/// - `Udp` prefix means the event was triggered by the UDP tracker -/// - `4` or `6` prefixes means the IP version used by the peer -/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection` +/// - `Udp` prefix means the event was triggered by the UDP tracker. +/// - The event suffix is the type of request: `announce`, `scrape` or `connection`. #[derive(Debug, PartialEq, Eq)] pub enum Event { - Udp4Connect { context: ConnectionContext }, - Udp4Announce { context: ConnectionContext }, - Udp4Scrape { context: ConnectionContext }, - Udp6Connect { context: ConnectionContext }, - Udp6Announce { context: ConnectionContext }, - Udp6Scrape { context: ConnectionContext }, + UdpConnect { context: ConnectionContext }, + UdpAnnounce { context: ConnectionContext }, + UdpScrape { context: ConnectionContext }, } #[derive(Debug, PartialEq, Eq)] @@ -33,4 +29,12 @@ impl ConnectionContext { server_socket_addr, } } + + pub fn client_socket_addr(&self) -> SocketAddr { + self.client_socket_addr + } + + pub fn server_socket_addr(&self) -> SocketAddr { + self.server_socket_addr + } } diff --git a/packages/udp-tracker-core/src/statistics/keeper.rs b/packages/udp-tracker-core/src/statistics/keeper.rs index 9d0768e31..e46e634e8 100644 --- a/packages/udp-tracker-core/src/statistics/keeper.rs +++ b/packages/udp-tracker-core/src/statistics/keeper.rs @@ -73,7 +73,7 @@ mod tests { let event_sender = stats_tracker.run_event_listener(); let result = event_sender - .send_event(Event::Udp4Connect { + .send_event(Event::UdpConnect { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index d18a81329..a0aabb765 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -859,7 +859,7 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp6Announce { + .with(eq(core_statistics::event::Event::UdpAnnounce { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index e3070264d..bac3d7961 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -203,7 +203,7 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp4Connect { + .with(eq(core_statistics::event::Event::UdpConnect { context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) @@ -243,7 +243,7 @@ mod tests { let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(core_statistics::event::Event::Udp6Connect { + .with(eq(core_statistics::event::Event::UdpConnect { context: ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1)