From 74ffa4cfd2549fdc28484f3c4cb8844eaf6c61cf Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 17 Mar 2025 18:30:09 +0000 Subject: [PATCH 1/6] refactor: [#1382] error request kind in UDP req does not make sense --- .../src/statistics/event/mod.rs | 2 + .../src/handlers/announce.rs | 18 ++--- .../src/handlers/connect.rs | 12 ++-- .../udp-tracker-server/src/handlers/scrape.rs | 10 +-- .../src/server/processor.rs | 12 +++- .../src/statistics/event/handler.rs | 70 ++++++++++--------- .../src/statistics/event/mod.rs | 13 ++-- 7 files changed, 77 insertions(+), 60 deletions(-) diff --git a/packages/udp-tracker-core/src/statistics/event/mod.rs b/packages/udp-tracker-core/src/statistics/event/mod.rs index 05de5d118..216562506 100644 --- a/packages/udp-tracker-core/src/statistics/event/mod.rs +++ b/packages/udp-tracker-core/src/statistics/event/mod.rs @@ -30,10 +30,12 @@ impl ConnectionContext { } } + #[must_use] pub fn client_socket_addr(&self) -> SocketAddr { self.client_socket_addr } + #[must_use] pub fn server_socket_addr(&self) -> SocketAddr { self.server_socket_addr } diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index a0aabb765..38fe5acc6 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -16,7 +16,7 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; -use crate::statistics::event::UdpResponseKind; +use crate::statistics::event::UdpRequestKind; /// It handles the `Announce` request. /// @@ -45,14 +45,14 @@ pub async fn handle_announce( IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { - kind: UdpResponseKind::Announce, + kind: UdpRequestKind::Announce, }) .await; } IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Announce, + kind: UdpRequestKind::Announce, }) .await; } @@ -226,7 +226,7 @@ mod tests { TorrentPeerBuilder, }; use crate::statistics as server_statistics; - use crate::statistics::event::UdpResponseKind; + use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { @@ -433,7 +433,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { - kind: UdpResponseKind::Announce, + kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -549,7 +549,7 @@ mod tests { sample_issue_time, MockUdpServerStatsEventSender, TorrentPeerBuilder, }; use crate::statistics as server_statistics; - use crate::statistics::event::UdpResponseKind; + use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { @@ -775,7 +775,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Announce, + kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -830,7 +830,7 @@ mod tests { TrackerConfigurationBuilder, }; use crate::statistics as server_statistics; - use crate::statistics::event::UdpResponseKind; + use crate::statistics::event::UdpRequestKind; #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { @@ -871,7 +871,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Announce, + kind: UdpRequestKind::Announce, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index bac3d7961..2111e7584 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -7,7 +7,7 @@ use bittorrent_udp_tracker_core::services::connect::ConnectService; use tracing::{instrument, Level}; use crate::statistics as server_statistics; -use crate::statistics::event::UdpResponseKind; +use crate::statistics::event::UdpRequestKind; /// It handles the `Connect` request. #[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] @@ -27,14 +27,14 @@ pub async fn handle_connect( IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { - kind: UdpResponseKind::Connect, + kind: UdpRequestKind::Connect, }) .await; } IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Connect, + kind: UdpRequestKind::Connect, }) .await; } @@ -79,7 +79,7 @@ mod tests { sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, }; use crate::statistics as server_statistics; - use crate::statistics::event::UdpResponseKind; + use crate::statistics::event::UdpRequestKind; fn sample_connect_request() -> ConnectRequest { ConnectRequest { @@ -215,7 +215,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { - kind: UdpResponseKind::Connect, + kind: UdpRequestKind::Connect, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -255,7 +255,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Connect, + kind: UdpRequestKind::Connect, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index e820b2e96..137c8a3cb 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -14,7 +14,7 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; -use crate::statistics::event::UdpResponseKind; +use crate::statistics::event::UdpRequestKind; /// It handles the `Scrape` request. /// @@ -41,14 +41,14 @@ pub async fn handle_scrape( IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { - kind: UdpResponseKind::Scrape, + kind: UdpRequestKind::Scrape, }) .await; } IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { - kind: UdpResponseKind::Scrape, + kind: UdpRequestKind::Scrape, }) .await; } @@ -375,7 +375,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { - kind: server_statistics::event::UdpResponseKind::Scrape, + kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -422,7 +422,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { - kind: server_statistics::event::UdpResponseKind::Scrape, + kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index 44b543571..52188c4c2 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -69,9 +69,15 @@ impl Processor { }; let udp_response_kind = match &response { - Response::Connect(_) => statistics::event::UdpResponseKind::Connect, - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => statistics::event::UdpResponseKind::Announce, - Response::Scrape(_) => statistics::event::UdpResponseKind::Scrape, + Response::Connect(_) => statistics::event::UdpResponseKind::Ok { + req_kind: statistics::event::UdpRequestKind::Connect, + }, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => statistics::event::UdpResponseKind::Ok { + req_kind: statistics::event::UdpRequestKind::Announce, + }, + Response::Scrape(_) => statistics::event::UdpResponseKind::Ok { + req_kind: statistics::event::UdpRequestKind::Scrape, + }, Response::Error(_e) => statistics::event::UdpResponseKind::Error, }; diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 5ce9f6307..7c7e4a8e7 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -1,4 +1,4 @@ -use crate::statistics::event::{Event, UdpResponseKind}; +use crate::statistics::event::{Event, UdpRequestKind, UdpResponseKind}; use crate::statistics::repository::Repository; pub async fn handle_event(event: Event, stats_repository: &Repository) { @@ -16,16 +16,15 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { stats_repository.increase_udp4_requests().await; } Event::Udp4Request { kind } => match kind { - UdpResponseKind::Connect => { + UdpRequestKind::Connect => { stats_repository.increase_udp4_connections().await; } - UdpResponseKind::Announce => { + UdpRequestKind::Announce => { stats_repository.increase_udp4_announces().await; } - UdpResponseKind::Scrape => { + UdpRequestKind::Scrape => { stats_repository.increase_udp4_scrapes().await; } - UdpResponseKind::Error => {} }, Event::Udp4Response { kind, @@ -34,21 +33,23 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { stats_repository.increase_udp4_responses().await; match kind { - UdpResponseKind::Connect => { - stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - } - UdpResponseKind::Announce => { - stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - } - UdpResponseKind::Scrape => { - stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - } + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + }, UdpResponseKind::Error => {} } } @@ -61,16 +62,15 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { stats_repository.increase_udp6_requests().await; } Event::Udp6Request { kind } => match kind { - UdpResponseKind::Connect => { + UdpRequestKind::Connect => { stats_repository.increase_udp6_connections().await; } - UdpResponseKind::Announce => { + UdpRequestKind::Announce => { stats_repository.increase_udp6_announces().await; } - UdpResponseKind::Scrape => { + UdpRequestKind::Scrape => { stats_repository.increase_udp6_scrapes().await; } - UdpResponseKind::Error => {} }, Event::Udp6Response { kind: _, @@ -89,7 +89,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { #[cfg(test)] mod tests { use crate::statistics::event::handler::handle_event; - use crate::statistics::event::Event; + use crate::statistics::event::{Event, UdpRequestKind}; use crate::statistics::repository::Repository; #[tokio::test] @@ -148,7 +148,7 @@ mod tests { handle_event( Event::Udp4Request { - kind: crate::statistics::event::UdpResponseKind::Connect, + kind: crate::statistics::event::UdpRequestKind::Connect, }, &stats_repository, ) @@ -165,7 +165,7 @@ mod tests { handle_event( Event::Udp4Request { - kind: crate::statistics::event::UdpResponseKind::Announce, + kind: crate::statistics::event::UdpRequestKind::Announce, }, &stats_repository, ) @@ -182,7 +182,7 @@ mod tests { handle_event( Event::Udp4Request { - kind: crate::statistics::event::UdpResponseKind::Scrape, + kind: crate::statistics::event::UdpRequestKind::Scrape, }, &stats_repository, ) @@ -199,7 +199,9 @@ mod tests { handle_event( Event::Udp4Response { - kind: crate::statistics::event::UdpResponseKind::Announce, + kind: crate::statistics::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, req_processing_time: std::time::Duration::from_secs(1), }, &stats_repository, @@ -228,7 +230,7 @@ mod tests { handle_event( Event::Udp6Request { - kind: crate::statistics::event::UdpResponseKind::Connect, + kind: crate::statistics::event::UdpRequestKind::Connect, }, &stats_repository, ) @@ -245,7 +247,7 @@ mod tests { handle_event( Event::Udp6Request { - kind: crate::statistics::event::UdpResponseKind::Announce, + kind: crate::statistics::event::UdpRequestKind::Announce, }, &stats_repository, ) @@ -262,7 +264,7 @@ mod tests { handle_event( Event::Udp6Request { - kind: crate::statistics::event::UdpResponseKind::Scrape, + kind: crate::statistics::event::UdpRequestKind::Scrape, }, &stats_repository, ) @@ -279,7 +281,9 @@ mod tests { handle_event( Event::Udp6Response { - kind: crate::statistics::event::UdpResponseKind::Announce, + kind: crate::statistics::event::UdpResponseKind::Ok { + req_kind: UdpRequestKind::Announce, + }, req_processing_time: std::time::Duration::from_secs(1), }, &stats_repository, diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 6a48b9449..3b14806aa 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -22,7 +22,7 @@ pub enum Event { // UDP4 Udp4IncomingRequest, Udp4Request { - kind: UdpResponseKind, + kind: UdpRequestKind, }, Udp4Response { kind: UdpResponseKind, @@ -33,7 +33,7 @@ pub enum Event { // UDP6 Udp6IncomingRequest, Udp6Request { - kind: UdpResponseKind, + kind: UdpRequestKind, }, Udp6Response { kind: UdpResponseKind, @@ -43,9 +43,14 @@ pub enum Event { } #[derive(Debug, PartialEq, Eq)] -pub enum UdpResponseKind { +pub enum UdpRequestKind { Connect, Announce, Scrape, - Error, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum UdpResponseKind { + Ok { req_kind: UdpRequestKind }, + Error, // todo: add the request kind `{ req_kind: Option(UdpRequestKind) }` when we know it. } From e4c6000645bca78ba6d69900574931603b4581f1 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 08:46:29 +0000 Subject: [PATCH 2/6] refactor: [#1382] add connection context to UDP server events --- .../src/handlers/announce.rs | 22 +- .../src/handlers/connect.rs | 17 +- .../udp-tracker-server/src/handlers/error.rs | 21 +- .../udp-tracker-server/src/handlers/scrape.rs | 20 +- .../udp-tracker-server/src/server/launcher.rs | 23 +- .../src/server/processor.rs | 13 +- .../src/statistics/event/handler.rs | 273 ++++++++++++++---- .../src/statistics/event/mod.rs | 60 +++- .../src/statistics/keeper.rs | 13 +- 9 files changed, 357 insertions(+), 105 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 38fe5acc6..41e40695d 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -16,7 +16,7 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; -use crate::statistics::event::UdpRequestKind; +use crate::statistics::event::{ConnectionContext, UdpRequestKind}; /// It handles the `Announce` request. /// @@ -45,6 +45,7 @@ pub async fn handle_announce( IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, }) .await; @@ -52,6 +53,7 @@ pub async fn handle_announce( IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, }) .await; @@ -429,10 +431,14 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { + let client_socket_addr = sample_ipv4_socket_address(); + let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { + context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) @@ -443,9 +449,6 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); - let client_socket_addr = sample_ipv4_socket_address(); - let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969); - handle_announce( &core_udp_tracker_services.announce_service, client_socket_addr, @@ -771,10 +774,14 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { + context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) @@ -785,9 +792,6 @@ mod tests { let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); - let client_socket_addr = sample_ipv6_remote_addr(); - let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); - let announce_request = AnnounceRequestBuilder::default() .with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap()) .into(); @@ -819,7 +823,6 @@ mod tests { use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; use bittorrent_udp_tracker_core::services::announce::AnnounceService; - use bittorrent_udp_tracker_core::statistics::event::ConnectionContext; use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; use mockall::predicate::eq; @@ -860,7 +863,7 @@ mod tests { udp_core_stats_event_sender_mock .expect_send_event() .with(eq(core_statistics::event::Event::UdpAnnounce { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), + context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -871,6 +874,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { + context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) .times(1) diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 2111e7584..3e0012d7d 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -7,13 +7,13 @@ use bittorrent_udp_tracker_core::services::connect::ConnectService; use tracing::{instrument, Level}; use crate::statistics as server_statistics; -use crate::statistics::event::UdpRequestKind; +use crate::statistics::event::{ConnectionContext, UdpRequestKind}; /// It handles the `Connect` request. #[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_connect( - remote_addr: SocketAddr, - server_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request: &ConnectRequest, connect_service: &Arc, opt_udp_server_stats_event_sender: &Arc>>, @@ -23,10 +23,11 @@ pub async fn handle_connect( tracing::trace!("handle connect"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match remote_addr.ip() { + match client_socket_addr.ip() { IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, }) .await; @@ -34,6 +35,7 @@ pub async fn handle_connect( IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, }) .await; @@ -42,7 +44,7 @@ pub async fn handle_connect( } let connection_id = connect_service - .handle_connect(remote_addr, server_addr, cookie_issue_time) + .handle_connect(client_socket_addr, server_socket_addr, cookie_issue_time) .await; build_response(*request, connection_id) @@ -70,7 +72,6 @@ mod tests { use bittorrent_udp_tracker_core::connection_cookie::make; use bittorrent_udp_tracker_core::services::connect::ConnectService; use bittorrent_udp_tracker_core::statistics as core_statistics; - use bittorrent_udp_tracker_core::statistics::event::ConnectionContext; use mockall::predicate::eq; use crate::handlers::handle_connect; @@ -215,6 +216,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { + context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) .times(1) @@ -244,7 +246,7 @@ mod tests { udp_core_stats_event_sender_mock .expect_send_event() .with(eq(core_statistics::event::Event::UdpConnect { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), + context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); @@ -255,6 +257,7 @@ mod tests { udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { + context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) .times(1) diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index e4bd382da..df553be9f 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -12,12 +12,13 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; +use crate::statistics::event::ConnectionContext; #[allow(clippy::too_many_arguments)] #[instrument(fields(transaction_id), skip(opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_error( - remote_addr: SocketAddr, - local_addr: SocketAddr, + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, request_id: Uuid, opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, @@ -29,10 +30,10 @@ pub async fn handle_error( match transaction_id { Some(transaction_id) => { let transaction_id = transaction_id.0.to_string(); - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %remote_addr, %local_addr, %request_id, %transaction_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error"); } None => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %remote_addr, %local_addr, %request_id, "response error"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, "response error"); } } @@ -43,7 +44,7 @@ pub async fn handle_error( transaction_id, err, } => { - if let Err(e) = check(connection_id, gen_remote_fingerprint(&remote_addr), cookie_valid_range) { + if let Err(e) = check(connection_id, gen_remote_fingerprint(&client_socket_addr), cookie_valid_range) { (e.to_string(), Some(*transaction_id)) } else { ((*err).to_string(), Some(*transaction_id)) @@ -57,15 +58,19 @@ pub async fn handle_error( if e.1.is_some() { if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match remote_addr { + match client_socket_addr { SocketAddr::V4(_) => { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp4Error) + .send_event(server_statistics::event::Event::Udp4Error { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } SocketAddr::V6(_) => { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp6Error) + .send_event(server_statistics::event::Event::Udp6Error { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index 137c8a3cb..5f33f55ad 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -14,7 +14,7 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; -use crate::statistics::event::UdpRequestKind; +use crate::statistics::event::{ConnectionContext, UdpRequestKind}; /// It handles the `Scrape` request. /// @@ -41,6 +41,7 @@ pub async fn handle_scrape( IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp4Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Scrape, }) .await; @@ -48,6 +49,7 @@ pub async fn handle_scrape( IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(server_statistics::event::Event::Udp6Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Scrape, }) .await; @@ -368,13 +370,18 @@ mod tests { sample_ipv4_remote_addr, MockUdpServerStatsEventSender, }; use crate::statistics as server_statistics; + use crate::statistics::event::ConnectionContext; #[tokio::test] async fn should_send_the_upd4_scrape_event() { + let client_socket_addr = sample_ipv4_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp4Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) @@ -382,9 +389,6 @@ mod tests { let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let client_socket_addr = sample_ipv4_remote_addr(); - let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); - let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); @@ -415,13 +419,18 @@ mod tests { sample_ipv6_remote_addr, MockUdpServerStatsEventSender, }; use crate::statistics as server_statistics; + use crate::statistics::event::ConnectionContext; #[tokio::test] async fn should_send_the_upd6_scrape_event() { + let client_socket_addr = sample_ipv6_remote_addr(); + let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() .with(eq(server_statistics::event::Event::Udp6Request { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) .times(1) @@ -429,9 +438,6 @@ mod tests { let udp_server_stats_event_sender: Arc>> = Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let client_socket_addr = sample_ipv6_remote_addr(); - let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969); - let (_core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); diff --git a/packages/udp-tracker-server/src/server/launcher.rs b/packages/udp-tracker-server/src/server/launcher.rs index acd214ab0..0dfbba174 100644 --- a/packages/udp-tracker-server/src/server/launcher.rs +++ b/packages/udp-tracker-server/src/server/launcher.rs @@ -21,6 +21,7 @@ use crate::server::bound_socket::BoundSocket; use crate::server::processor::Processor; use crate::server::receiver::Receiver; use crate::statistics; +use crate::statistics::event::ConnectionContext; const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600; @@ -129,9 +130,9 @@ impl Launcher { ) { let active_requests = &mut ActiveRequests::default(); - let addr = receiver.bound_socket_address(); + let server_socket_addr = receiver.bound_socket_address(); - let local_addr = format!("udp://{addr}"); + let local_addr = format!("udp://{server_socket_addr}"); let cookie_lifetime = cookie_lifetime.as_secs_f64(); @@ -167,17 +168,23 @@ impl Launcher { } }; + let client_socket_addr = req.from; + if let Some(udp_server_stats_event_sender) = udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { match req.from.ip() { IpAddr::V4(_) => { udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp4IncomingRequest) + .send_event(statistics::event::Event::Udp4IncomingRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } IpAddr::V6(_) => { udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp6IncomingRequest) + .send_event(statistics::event::Event::Udp6IncomingRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } } @@ -190,7 +197,9 @@ impl Launcher { udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpRequestBanned) + .send_event(statistics::event::Event::UdpRequestBanned { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } @@ -230,7 +239,9 @@ impl Launcher { udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpRequestAborted) + .send_event(statistics::event::Event::UdpRequestAborted { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) .await; } } diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index 52188c4c2..999d74d00 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -12,6 +12,7 @@ use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; use crate::container::UdpTrackerServerContainer; use crate::handlers::CookieTimeValues; +use crate::statistics::event::ConnectionContext; use crate::{handlers, statistics, RawRequest}; pub struct Processor { @@ -38,7 +39,7 @@ impl Processor { #[instrument(skip(self, request))] pub async fn process_request(self, request: RawRequest) { - let from = request.from; + let client_socket_addr = request.from; let start_time = Instant::now(); @@ -53,11 +54,11 @@ impl Processor { let elapsed_time = start_time.elapsed(); - self.send_response(from, response, elapsed_time).await; + self.send_response(client_socket_addr, response, elapsed_time).await; } #[instrument(skip(self))] - async fn send_response(self, target: SocketAddr, response: Response, req_processing_time: Duration) { + async fn send_response(self, client_socket_addr: SocketAddr, response: Response, req_processing_time: Duration) { tracing::debug!("send response"); let response_type = match &response { @@ -88,7 +89,7 @@ impl Processor { let bytes_count = writer.get_ref().len(); let payload = writer.get_ref(); - let () = match self.send_packet(&target, payload).await { + let () = match self.send_packet(&client_socket_addr, payload).await { Ok(sent_bytes) => { if tracing::event_enabled!(Level::TRACE) { tracing::debug!(%bytes_count, %sent_bytes, ?payload, "sent {response_type}"); @@ -99,10 +100,11 @@ impl Processor { if let Some(udp_server_stats_event_sender) = self.udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { - match target.ip() { + match client_socket_addr.ip() { IpAddr::V4(_) => { udp_server_stats_event_sender .send_event(statistics::event::Event::Udp4Response { + context: ConnectionContext::new(client_socket_addr, self.socket.address()), kind: udp_response_kind, req_processing_time, }) @@ -111,6 +113,7 @@ impl Processor { IpAddr::V6(_) => { udp_server_stats_event_sender .send_event(statistics::event::Event::Udp6Response { + context: ConnectionContext::new(client_socket_addr, self.socket.address()), kind: udp_response_kind, req_processing_time, }) diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 7c7e4a8e7..bda07f678 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -1,85 +1,161 @@ use crate::statistics::event::{Event, UdpRequestKind, UdpResponseKind}; use crate::statistics::repository::Repository; +/// # Panics +/// +/// This function panics if the client IP version does not match the expected +/// version. +#[allow(clippy::too_many_lines)] pub async fn handle_event(event: Event, stats_repository: &Repository) { match event { // UDP - Event::UdpRequestAborted => { + Event::UdpRequestAborted { .. } => { stats_repository.increase_udp_requests_aborted().await; } - Event::UdpRequestBanned => { + Event::UdpRequestBanned { .. } => { stats_repository.increase_udp_requests_banned().await; } // UDP4 - Event::Udp4IncomingRequest => { - stats_repository.increase_udp4_requests().await; + Event::Udp4IncomingRequest { context } => { + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_requests().await; + } else { + panic!("Client IP version does not match the expected version IPv4 for incoming request"); + } } - Event::Udp4Request { kind } => match kind { + Event::Udp4Request { context, kind } => match kind { UdpRequestKind::Connect => { - stats_repository.increase_udp4_connections().await; + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_connections().await; + } else { + panic!("Client IP version does not match the expected version IPv4 for connect request"); + } } UdpRequestKind::Announce => { - stats_repository.increase_udp4_announces().await; + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_announces().await; + } else { + panic!("Client IP version does not match the expected version IPv4 for announce request"); + } } UdpRequestKind::Scrape => { - stats_repository.increase_udp4_scrapes().await; + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_scrapes().await; + } else { + panic!("Client IP version does not match the expected version IPv4 for scrape request"); + } } }, Event::Udp4Response { + context, kind, req_processing_time, } => { - stats_repository.increase_udp4_responses().await; - - match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Announce => { - stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Scrape => { - stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - } - }, - UdpResponseKind::Error => {} + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_responses().await; + + match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + }, + UdpResponseKind::Error => {} + } + } else { + panic!("Client IP version does not match the expected version IPv4 for response"); } } - Event::Udp4Error => { - stats_repository.increase_udp4_errors().await; + Event::Udp4Error { context } => { + if context.client_socket_addr.is_ipv4() { + stats_repository.increase_udp4_errors().await; + } else { + panic!("Client IP version does not match the expected version IPv4 for error"); + } } // UDP6 - Event::Udp6IncomingRequest => { - stats_repository.increase_udp6_requests().await; + Event::Udp6IncomingRequest { context } => { + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_requests().await; + } else { + panic!("Client IP version does not match the expected version IPv6 for incoming request"); + } } - Event::Udp6Request { kind } => match kind { + Event::Udp6Request { context, kind } => match kind { UdpRequestKind::Connect => { - stats_repository.increase_udp6_connections().await; + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_connections().await; + } else { + panic!("Client IP version does not match the expected version IPv6 for connect request"); + } } UdpRequestKind::Announce => { - stats_repository.increase_udp6_announces().await; + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_announces().await; + } else { + panic!("Client IP version does not match the expected version IPv6 for announce request"); + } } UdpRequestKind::Scrape => { - stats_repository.increase_udp6_scrapes().await; + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_scrapes().await; + } else { + panic!("Client IP version does not match the expected version IPv6 for scrape request"); + } } }, Event::Udp6Response { - kind: _, - req_processing_time: _, + context, + kind, + req_processing_time, } => { - stats_repository.increase_udp6_responses().await; + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_responses().await; + + match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + }, + UdpResponseKind::Error => {} + } + } else { + panic!("Client IP version does not match the expected version IPv6 for response"); + } } - Event::Udp6Error => { - stats_repository.increase_udp6_errors().await; + Event::Udp6Error { context } => { + if context.client_socket_addr.is_ipv6() { + stats_repository.increase_udp6_errors().await; + } else { + panic!("Client IP version does not match the expected version IPv6 for error"); + } } } @@ -88,15 +164,26 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { #[cfg(test)] mod tests { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use crate::statistics::event::handler::handle_event; - use crate::statistics::event::{Event, UdpRequestKind}; + use crate::statistics::event::{ConnectionContext, Event, UdpRequestKind}; use crate::statistics::repository::Repository; #[tokio::test] async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() { let stats_repository = Repository::new(); - handle_event(Event::UdpRequestAborted, &stats_repository).await; + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -107,7 +194,16 @@ mod tests { async fn should_increase_the_number_of_banned_requests_when_it_receives_a_udp_request_banned_event() { let stats_repository = Repository::new(); - handle_event(Event::UdpRequestBanned, &stats_repository).await; + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -118,7 +214,16 @@ mod tests { async fn should_increase_the_number_of_incoming_requests_when_it_receives_a_udp4_incoming_request_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp4IncomingRequest, &stats_repository).await; + handle_event( + Event::Udp4IncomingRequest { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -129,7 +234,16 @@ mod tests { async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { let stats_repository = Repository::new(); - handle_event(Event::UdpRequestAborted, &stats_repository).await; + handle_event( + Event::UdpRequestAborted { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; assert_eq!(stats.udp_requests_aborted, 1); } @@ -137,7 +251,16 @@ mod tests { async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { let stats_repository = Repository::new(); - handle_event(Event::UdpRequestBanned, &stats_repository).await; + handle_event( + Event::UdpRequestBanned { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; assert_eq!(stats.udp_requests_banned, 1); } @@ -148,6 +271,10 @@ mod tests { handle_event( Event::Udp4Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Connect, }, &stats_repository, @@ -165,6 +292,10 @@ mod tests { handle_event( Event::Udp4Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Announce, }, &stats_repository, @@ -182,6 +313,10 @@ mod tests { handle_event( Event::Udp4Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Scrape, }, &stats_repository, @@ -199,6 +334,10 @@ mod tests { handle_event( Event::Udp4Response { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpResponseKind::Ok { req_kind: UdpRequestKind::Announce, }, @@ -217,7 +356,16 @@ mod tests { async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp4Error, &stats_repository).await; + handle_event( + Event::Udp4Error { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; @@ -230,6 +378,10 @@ mod tests { handle_event( Event::Udp6Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Connect, }, &stats_repository, @@ -247,6 +399,10 @@ mod tests { handle_event( Event::Udp6Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Announce, }, &stats_repository, @@ -264,6 +420,10 @@ mod tests { handle_event( Event::Udp6Request { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpRequestKind::Scrape, }, &stats_repository, @@ -281,6 +441,10 @@ mod tests { handle_event( Event::Udp6Response { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), kind: crate::statistics::event::UdpResponseKind::Ok { req_kind: UdpRequestKind::Announce, }, @@ -298,7 +462,16 @@ mod tests { async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { let stats_repository = Repository::new(); - handle_event(Event::Udp6Error, &stats_repository).await; + handle_event( + Event::Udp6Error { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), + }, + &stats_repository, + ) + .await; let stats = stats_repository.get_stats().await; diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 3b14806aa..64e2cb9c1 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::time::Duration; pub mod handler; @@ -6,40 +7,51 @@ pub mod sender; /// An statistics event. It is used to collect tracker metrics. /// -/// - `Tcp` prefix means the event was triggered by the HTTP tracker /// - `Udp` prefix means the event was triggered by the UDP tracker /// - `4` or `6` prefixes means the IP version used by the peer /// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection` -/// -/// > NOTE: HTTP trackers do not use `connection` requests. #[derive(Debug, PartialEq, Eq)] pub enum Event { - // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 } - // Attributes are enums too. - UdpRequestAborted, - UdpRequestBanned, + UdpRequestAborted { + context: ConnectionContext, + }, + UdpRequestBanned { + context: ConnectionContext, + }, // UDP4 - Udp4IncomingRequest, + Udp4IncomingRequest { + context: ConnectionContext, + }, Udp4Request { + context: ConnectionContext, kind: UdpRequestKind, }, Udp4Response { + context: ConnectionContext, kind: UdpResponseKind, req_processing_time: Duration, }, - Udp4Error, + Udp4Error { + context: ConnectionContext, + }, // UDP6 - Udp6IncomingRequest, + Udp6IncomingRequest { + context: ConnectionContext, + }, Udp6Request { + context: ConnectionContext, kind: UdpRequestKind, }, Udp6Response { + context: ConnectionContext, kind: UdpResponseKind, req_processing_time: Duration, }, - Udp6Error, + Udp6Error { + context: ConnectionContext, + }, } #[derive(Debug, PartialEq, Eq)] @@ -54,3 +66,29 @@ pub enum UdpResponseKind { Ok { req_kind: UdpRequestKind }, Error, // todo: add the request kind `{ req_kind: Option(UdpRequestKind) }` when we know it. } + +#[derive(Debug, PartialEq, Eq)] +pub struct ConnectionContext { + client_socket_addr: SocketAddr, + server_socket_addr: SocketAddr, +} + +impl ConnectionContext { + #[must_use] + pub fn new(client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) -> Self { + Self { + client_socket_addr, + server_socket_addr, + } + } + + #[must_use] + pub fn client_socket_addr(&self) -> SocketAddr { + self.client_socket_addr + } + + #[must_use] + pub fn server_socket_addr(&self) -> SocketAddr { + self.server_socket_addr + } +} diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs index ae80e7970..a6e6dde70 100644 --- a/packages/udp-tracker-server/src/statistics/keeper.rs +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -51,7 +51,9 @@ impl Keeper { #[cfg(test)] mod tests { - use crate::statistics::event::Event; + use std::net::{IpAddr, Ipv6Addr, SocketAddr}; + + use crate::statistics::event::{ConnectionContext, Event}; use crate::statistics::keeper::Keeper; use crate::statistics::metrics::Metrics; @@ -70,7 +72,14 @@ mod tests { let event_sender = stats_tracker.run_event_listener(); - let result = event_sender.send_event(Event::Udp4IncomingRequest).await; + let result = event_sender + .send_event(Event::Udp4IncomingRequest { + context: ConnectionContext::new( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), + ), + }) + .await; assert!(result.is_some()); } From 203a1b45e4e34103b9788393533e79613aab3dfa Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 10:36:50 +0000 Subject: [PATCH 3/6] refactor: [#1382] merge UDP server stats events with different IP version --- .../src/handlers/announce.rs | 30 +-- .../src/handlers/connect.rs | 30 +-- .../udp-tracker-server/src/handlers/error.rs | 21 +- .../udp-tracker-server/src/handlers/scrape.rs | 30 +-- .../udp-tracker-server/src/server/launcher.rs | 23 +- .../src/server/processor.rs | 29 +-- .../src/statistics/event/handler.rs | 200 ++++++------------ .../src/statistics/event/mod.rs | 35 +-- .../src/statistics/keeper.rs | 2 +- 9 files changed, 120 insertions(+), 280 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 41e40695d..6b5cbb42b 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -41,24 +41,12 @@ pub async fn handle_announce( tracing::trace!("handle announce"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match client_socket_addr.ip() { - IpAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp4Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Announce, - }) - .await; - } - IpAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp6Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Announce, - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::UdpRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + kind: UdpRequestKind::Announce, + }) + .await; } let announce_data = announce_service @@ -437,7 +425,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp4Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) @@ -780,7 +768,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp6Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) @@ -873,7 +861,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp6Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 3e0012d7d..7d96f4cbd 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -1,5 +1,5 @@ //! UDP tracker connect handler. -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, ConnectionId, Response}; @@ -23,24 +23,12 @@ pub async fn handle_connect( tracing::trace!("handle connect"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match client_socket_addr.ip() { - IpAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp4Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Connect, - }) - .await; - } - IpAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp6Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Connect, - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::UdpRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + kind: UdpRequestKind::Connect, + }) + .await; } let connection_id = connect_service @@ -215,7 +203,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp4Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) @@ -256,7 +244,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp6Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index df553be9f..cb341bc5c 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -58,22 +58,11 @@ pub async fn handle_error( if e.1.is_some() { if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match client_socket_addr { - SocketAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp4Error { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - SocketAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp6Error { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::UdpError { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) + .await; } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index 5f33f55ad..7597c9b8e 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -1,5 +1,5 @@ //! UDP tracker scrape handler. -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::ops::Range; use std::sync::Arc; @@ -37,24 +37,12 @@ pub async fn handle_scrape( tracing::trace!("handle scrape"); if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { - match client_socket_addr.ip() { - IpAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp4Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Scrape, - }) - .await; - } - IpAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(server_statistics::event::Event::Udp6Request { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - kind: UdpRequestKind::Scrape, - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::UdpRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + kind: UdpRequestKind::Scrape, + }) + .await; } let scrape_data = scrape_service @@ -380,7 +368,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp4Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) @@ -429,7 +417,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::Udp6Request { + .with(eq(server_statistics::event::Event::UdpRequest { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) diff --git a/packages/udp-tracker-server/src/server/launcher.rs b/packages/udp-tracker-server/src/server/launcher.rs index 0dfbba174..a3da6a2a8 100644 --- a/packages/udp-tracker-server/src/server/launcher.rs +++ b/packages/udp-tracker-server/src/server/launcher.rs @@ -1,4 +1,4 @@ -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -172,22 +172,11 @@ impl Launcher { if let Some(udp_server_stats_event_sender) = udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { - match req.from.ip() { - IpAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp4IncomingRequest { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - IpAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp6IncomingRequest { - context: ConnectionContext::new(client_socket_addr, server_socket_addr), - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(statistics::event::Event::UdpIncomingRequest { + context: ConnectionContext::new(client_socket_addr, server_socket_addr), + }) + .await; } if udp_tracker_core_container.ban_service.read().await.is_banned(&req.from.ip()) { diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index 999d74d00..acf8e8ae3 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -1,5 +1,5 @@ use std::io::Cursor; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -100,26 +100,13 @@ impl Processor { if let Some(udp_server_stats_event_sender) = self.udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { - match client_socket_addr.ip() { - IpAddr::V4(_) => { - udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp4Response { - context: ConnectionContext::new(client_socket_addr, self.socket.address()), - kind: udp_response_kind, - req_processing_time, - }) - .await; - } - IpAddr::V6(_) => { - udp_server_stats_event_sender - .send_event(statistics::event::Event::Udp6Response { - context: ConnectionContext::new(client_socket_addr, self.socket.address()), - kind: udp_response_kind, - req_processing_time, - }) - .await; - } - } + udp_server_stats_event_sender + .send_event(statistics::event::Event::UdpResponse { + context: ConnectionContext::new(client_socket_addr, self.socket.address()), + kind: udp_response_kind, + req_processing_time, + }) + .await; } } Err(error) => tracing::warn!(%bytes_count, %error, ?payload, "failed to send"), diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index bda07f678..5200561c7 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -8,155 +8,89 @@ use crate::statistics::repository::Repository; #[allow(clippy::too_many_lines)] pub async fn handle_event(event: Event, stats_repository: &Repository) { match event { - // UDP Event::UdpRequestAborted { .. } => { stats_repository.increase_udp_requests_aborted().await; } Event::UdpRequestBanned { .. } => { stats_repository.increase_udp_requests_banned().await; } - - // UDP4 - Event::Udp4IncomingRequest { context } => { - if context.client_socket_addr.is_ipv4() { + Event::UdpIncomingRequest { context } => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { stats_repository.increase_udp4_requests().await; - } else { - panic!("Client IP version does not match the expected version IPv4 for incoming request"); - } - } - Event::Udp4Request { context, kind } => match kind { - UdpRequestKind::Connect => { - if context.client_socket_addr.is_ipv4() { - stats_repository.increase_udp4_connections().await; - } else { - panic!("Client IP version does not match the expected version IPv4 for connect request"); - } } - UdpRequestKind::Announce => { - if context.client_socket_addr.is_ipv4() { - stats_repository.increase_udp4_announces().await; - } else { - panic!("Client IP version does not match the expected version IPv4 for announce request"); - } - } - UdpRequestKind::Scrape => { - if context.client_socket_addr.is_ipv4() { - stats_repository.increase_udp4_scrapes().await; - } else { - panic!("Client IP version does not match the expected version IPv4 for scrape request"); - } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_requests().await; } }, - Event::Udp4Response { - context, - kind, - req_processing_time, - } => { - if context.client_socket_addr.is_ipv4() { - stats_repository.increase_udp4_responses().await; - - match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Announce => { - stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Scrape => { - stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - } - }, - UdpResponseKind::Error => {} + Event::UdpRequest { context, kind } => match kind { + UdpRequestKind::Connect => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_connections().await; } - } else { - panic!("Client IP version does not match the expected version IPv4 for response"); - } - } - Event::Udp4Error { context } => { - if context.client_socket_addr.is_ipv4() { - stats_repository.increase_udp4_errors().await; - } else { - panic!("Client IP version does not match the expected version IPv4 for error"); - } - } - - // UDP6 - Event::Udp6IncomingRequest { context } => { - if context.client_socket_addr.is_ipv6() { - stats_repository.increase_udp6_requests().await; - } else { - panic!("Client IP version does not match the expected version IPv6 for incoming request"); - } - } - Event::Udp6Request { context, kind } => match kind { - UdpRequestKind::Connect => { - if context.client_socket_addr.is_ipv6() { + std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_connections().await; - } else { - panic!("Client IP version does not match the expected version IPv6 for connect request"); } - } - UdpRequestKind::Announce => { - if context.client_socket_addr.is_ipv6() { + }, + UdpRequestKind::Announce => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_announces().await; + } + std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_announces().await; - } else { - panic!("Client IP version does not match the expected version IPv6 for announce request"); } - } - UdpRequestKind::Scrape => { - if context.client_socket_addr.is_ipv6() { + }, + UdpRequestKind::Scrape => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_scrapes().await; + } + std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_scrapes().await; - } else { - panic!("Client IP version does not match the expected version IPv6 for scrape request"); } - } + }, }, - Event::Udp6Response { + Event::UdpResponse { context, kind, req_processing_time, } => { - if context.client_socket_addr.is_ipv6() { - stats_repository.increase_udp6_responses().await; - - match kind { - UdpResponseKind::Ok { req_kind } => match req_kind { - UdpRequestKind::Connect => { - stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Announce => { - stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - } - UdpRequestKind::Scrape => { - stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - } - }, - UdpResponseKind::Error => {} + match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_responses().await; + } + std::net::IpAddr::V6(_) => { + stats_repository.increase_udp6_responses().await; } - } else { - panic!("Client IP version does not match the expected version IPv6 for response"); + } + + match kind { + UdpResponseKind::Ok { req_kind } => match req_kind { + UdpRequestKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpRequestKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + }, + UdpResponseKind::Error => {} } } - Event::Udp6Error { context } => { - if context.client_socket_addr.is_ipv6() { + Event::UdpError { context } => match context.client_socket_addr().ip() { + std::net::IpAddr::V4(_) => { + stats_repository.increase_udp4_errors().await; + } + std::net::IpAddr::V6(_) => { stats_repository.increase_udp6_errors().await; - } else { - panic!("Client IP version does not match the expected version IPv6 for error"); } - } + }, } tracing::debug!("stats: {:?}", stats_repository.get_stats().await); @@ -215,7 +149,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4IncomingRequest { + Event::UdpIncomingRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -270,7 +204,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -291,7 +225,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -312,7 +246,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -333,7 +267,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Response { + Event::UdpResponse { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -357,7 +291,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp4Error { + Event::UdpError { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -377,7 +311,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -398,7 +332,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -419,7 +353,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Request { + Event::UdpRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -440,7 +374,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Response { + Event::UdpResponse { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -463,7 +397,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::Udp6Error { + Event::UdpError { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 64e2cb9c1..b22cd455d 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -6,50 +6,27 @@ pub mod listener; pub mod sender; /// An statistics event. It is used to collect tracker metrics. -/// -/// - `Udp` prefix means the event was triggered by the UDP tracker -/// - `4` or `6` prefixes means the IP version used by the peer -/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection` #[derive(Debug, PartialEq, Eq)] pub enum Event { - UdpRequestAborted { + UdpIncomingRequest { context: ConnectionContext, }, - UdpRequestBanned { - context: ConnectionContext, - }, - - // UDP4 - Udp4IncomingRequest { - context: ConnectionContext, - }, - Udp4Request { - context: ConnectionContext, - kind: UdpRequestKind, - }, - Udp4Response { - context: ConnectionContext, - kind: UdpResponseKind, - req_processing_time: Duration, - }, - Udp4Error { + UdpRequestAborted { context: ConnectionContext, }, - - // UDP6 - Udp6IncomingRequest { + UdpRequestBanned { context: ConnectionContext, }, - Udp6Request { + UdpRequest { context: ConnectionContext, kind: UdpRequestKind, }, - Udp6Response { + UdpResponse { context: ConnectionContext, kind: UdpResponseKind, req_processing_time: Duration, }, - Udp6Error { + UdpError { context: ConnectionContext, }, } diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs index a6e6dde70..c29dcb1b2 100644 --- a/packages/udp-tracker-server/src/statistics/keeper.rs +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -73,7 +73,7 @@ mod tests { let event_sender = stats_tracker.run_event_listener(); let result = event_sender - .send_event(Event::Udp4IncomingRequest { + .send_event(Event::UdpIncomingRequest { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), From 625d20adf7a502ecbab01b21bbaf5002635418f6 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 10:50:33 +0000 Subject: [PATCH 4/6] refactor: [#1382] rename torrust_udp_tracker_server::statistics::event::Event::UdpRequest --- .../udp-tracker-server/src/handlers/announce.rs | 8 ++++---- .../udp-tracker-server/src/handlers/connect.rs | 6 +++--- packages/udp-tracker-server/src/handlers/scrape.rs | 6 +++--- .../src/statistics/event/handler.rs | 14 +++++++------- .../udp-tracker-server/src/statistics/event/mod.rs | 2 +- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 6b5cbb42b..32c9e0cbd 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -42,7 +42,7 @@ pub async fn handle_announce( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequest { + .send_event(server_statistics::event::Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, }) @@ -425,7 +425,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) @@ -768,7 +768,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) @@ -861,7 +861,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Announce, })) diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index 7d96f4cbd..c38eb56e5 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -24,7 +24,7 @@ pub async fn handle_connect( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequest { + .send_event(server_statistics::event::Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, }) @@ -203,7 +203,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) @@ -244,7 +244,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Connect, })) diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index 7597c9b8e..aeca7bd12 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -38,7 +38,7 @@ pub async fn handle_scrape( if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(server_statistics::event::Event::UdpRequest { + .send_event(server_statistics::event::Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: UdpRequestKind::Scrape, }) @@ -368,7 +368,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) @@ -417,7 +417,7 @@ mod tests { let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(server_statistics::event::Event::UdpRequest { + .with(eq(server_statistics::event::Event::UdpRequestAccepted { context: ConnectionContext::new(client_socket_addr, server_socket_addr), kind: server_statistics::event::UdpRequestKind::Scrape, })) diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 5200561c7..03bfeae65 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -22,7 +22,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { stats_repository.increase_udp6_requests().await; } }, - Event::UdpRequest { context, kind } => match kind { + Event::UdpRequestAccepted { context, kind } => match kind { UdpRequestKind::Connect => match context.client_socket_addr().ip() { std::net::IpAddr::V4(_) => { stats_repository.increase_udp4_connections().await; @@ -204,7 +204,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -225,7 +225,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -246,7 +246,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -311,7 +311,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -332,7 +332,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), @@ -353,7 +353,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpRequest { + Event::UdpRequestAccepted { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index b22cd455d..207916846 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -17,7 +17,7 @@ pub enum Event { UdpRequestBanned { context: ConnectionContext, }, - UdpRequest { + UdpRequestAccepted { context: ConnectionContext, kind: UdpRequestKind, }, From 27e2db4b8f7515cf9ae5c08232431ae5719f8b7a Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 11:31:33 +0000 Subject: [PATCH 5/6] refactor: [#1382] include req kin in UDP error response if it's known It could be unkown if the request couldb be parsed succesfully. --- .../src/handlers/announce.rs | 4 +- .../udp-tracker-server/src/handlers/error.rs | 3 +- .../udp-tracker-server/src/handlers/mod.rs | 60 ++++++++++++------- .../udp-tracker-server/src/handlers/scrape.rs | 4 +- .../src/server/processor.rs | 17 ++++-- .../src/statistics/event/handler.rs | 2 +- .../src/statistics/event/mod.rs | 19 ++++-- 7 files changed, 71 insertions(+), 38 deletions(-) diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 32c9e0cbd..5cc3cf3c8 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -32,7 +32,7 @@ pub async fn handle_announce( core_config: &Arc, opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, -) -> Result { +) -> Result { tracing::Span::current() .record("transaction_id", request.transaction_id.0.to_string()) .record("connection_id", request.connection_id.0.to_string()) @@ -52,7 +52,7 @@ pub async fn handle_announce( let announce_data = announce_service .handle_announce(client_socket_addr, server_socket_addr, request, cookie_valid_range) .await - .map_err(|e| (e.into(), request.transaction_id))?; + .map_err(|e| (e.into(), request.transaction_id, UdpRequestKind::Announce))?; Ok(build_response(client_socket_addr, request, core_config, &announce_data)) } diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index cb341bc5c..d1ffe2fd4 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -12,11 +12,12 @@ use zerocopy::network_endian::I32; use crate::error::Error; use crate::statistics as server_statistics; -use crate::statistics::event::ConnectionContext; +use crate::statistics::event::{ConnectionContext, UdpRequestKind}; #[allow(clippy::too_many_arguments)] #[instrument(fields(transaction_id), skip(opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_error( + req_kind: Option, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, request_id: Uuid, diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 162af3020..e346d1953 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -24,6 +24,7 @@ use uuid::Uuid; use super::RawRequest; use crate::container::UdpTrackerServerContainer; use crate::error::Error; +use crate::statistics::event::UdpRequestKind; use crate::CurrentClock; #[derive(Debug, Clone, PartialEq)] @@ -60,7 +61,7 @@ pub(crate) async fn handle_packet( udp_tracker_server_container: Arc, server_socket_addr: SocketAddr, cookie_time_values: CookieTimeValues, -) -> Response { +) -> (Response, Option) { let request_id = Uuid::new_v4(); tracing::Span::current().record("request_id", request_id.to_string()); @@ -68,7 +69,7 @@ pub(crate) async fn handle_packet( let start_time = Instant::now(); - let response = + let (response, opt_req_kind) = match Request::parse_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(Error::from) { Ok(request) => match handle_request( request, @@ -80,8 +81,8 @@ pub(crate) async fn handle_packet( ) .await { - Ok(response) => return response, - Err((error, transaction_id)) => { + Ok((response, req_kid)) => return (response, Some(req_kid)), + Err((error, transaction_id, req_kind)) => { if let Error::UdpAnnounceError { source: UdpAnnounceError::ConnectionCookieError { .. }, } = error @@ -91,7 +92,8 @@ pub(crate) async fn handle_packet( ban_service.increase_counter(&udp_request.from.ip()); } - handle_error( + let response = handle_error( + Some(req_kind.clone()), udp_request.from, server_socket_addr, request_id, @@ -100,11 +102,14 @@ pub(crate) async fn handle_packet( &error, Some(transaction_id), ) - .await + .await; + + (response, Some(req_kind)) } }, Err(e) => { - handle_error( + let response = handle_error( + None, udp_request.from, server_socket_addr, request_id, @@ -113,14 +118,16 @@ pub(crate) async fn handle_packet( &e, None, ) - .await + .await; + + (response, None) } }; let latency = start_time.elapsed(); tracing::trace!(?latency, "responded"); - response + (response, opt_req_kind) } /// It dispatches the request to the correct handler. @@ -143,21 +150,24 @@ pub async fn handle_request( udp_tracker_core_container: Arc, udp_tracker_server_container: Arc, cookie_time_values: CookieTimeValues, -) -> Result { +) -> Result<(Response, UdpRequestKind), (Error, TransactionId, UdpRequestKind)> { tracing::trace!("handle request"); match request { - Request::Connect(connect_request) => Ok(handle_connect( - client_socket_addr, - server_socket_addr, - &connect_request, - &udp_tracker_core_container.connect_service, - &udp_tracker_server_container.udp_server_stats_event_sender, - cookie_time_values.issue_time, - ) - .await), + Request::Connect(connect_request) => Ok(( + handle_connect( + client_socket_addr, + server_socket_addr, + &connect_request, + &udp_tracker_core_container.connect_service, + &udp_tracker_server_container.udp_server_stats_event_sender, + cookie_time_values.issue_time, + ) + .await, + UdpRequestKind::Connect, + )), Request::Announce(announce_request) => { - handle_announce( + match handle_announce( &udp_tracker_core_container.announce_service, client_socket_addr, server_socket_addr, @@ -167,9 +177,13 @@ pub async fn handle_request( cookie_time_values.valid_range, ) .await + { + Ok(response) => Ok((response, UdpRequestKind::Announce)), + Err(err) => Err(err), + } } Request::Scrape(scrape_request) => { - handle_scrape( + match handle_scrape( &udp_tracker_core_container.scrape_service, client_socket_addr, server_socket_addr, @@ -178,6 +192,10 @@ pub async fn handle_request( cookie_time_values.valid_range, ) .await + { + Ok(response) => Ok((response, UdpRequestKind::Scrape)), + Err(err) => Err(err), + } } } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index aeca7bd12..db6b4a18b 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -29,7 +29,7 @@ pub async fn handle_scrape( request: &ScrapeRequest, opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, -) -> Result { +) -> Result { tracing::Span::current() .record("transaction_id", request.transaction_id.0.to_string()) .record("connection_id", request.connection_id.0.to_string()); @@ -48,7 +48,7 @@ pub async fn handle_scrape( let scrape_data = scrape_service .handle_scrape(client_socket_addr, server_socket_addr, request, cookie_valid_range) .await - .map_err(|e| (e.into(), request.transaction_id))?; + .map_err(|e| (e.into(), request.transaction_id, UdpRequestKind::Scrape))?; Ok(build_response(request, &scrape_data)) } diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index acf8e8ae3..59d21673f 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -12,7 +12,7 @@ use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; use crate::container::UdpTrackerServerContainer; use crate::handlers::CookieTimeValues; -use crate::statistics::event::ConnectionContext; +use crate::statistics::event::{ConnectionContext, UdpRequestKind}; use crate::{handlers, statistics, RawRequest}; pub struct Processor { @@ -43,7 +43,7 @@ impl Processor { let start_time = Instant::now(); - let response = handlers::handle_packet( + let (response, opt_req_kind) = handlers::handle_packet( request, self.udp_tracker_core_container.clone(), self.udp_tracker_server_container.clone(), @@ -54,11 +54,18 @@ impl Processor { let elapsed_time = start_time.elapsed(); - self.send_response(client_socket_addr, response, elapsed_time).await; + self.send_response(client_socket_addr, response, opt_req_kind, elapsed_time) + .await; } #[instrument(skip(self))] - async fn send_response(self, client_socket_addr: SocketAddr, response: Response, req_processing_time: Duration) { + async fn send_response( + self, + client_socket_addr: SocketAddr, + response: Response, + opt_req_kind: Option, + req_processing_time: Duration, + ) { tracing::debug!("send response"); let response_type = match &response { @@ -79,7 +86,7 @@ impl Processor { Response::Scrape(_) => statistics::event::UdpResponseKind::Ok { req_kind: statistics::event::UdpRequestKind::Scrape, }, - Response::Error(_e) => statistics::event::UdpResponseKind::Error, + Response::Error(_e) => statistics::event::UdpResponseKind::Error { opt_req_kind: None }, }; let mut writer = Cursor::new(Vec::with_capacity(200)); diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 03bfeae65..75441f7e4 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -80,7 +80,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { .await; } }, - UdpResponseKind::Error => {} + UdpResponseKind::Error { opt_req_kind: _ } => {} } } Event::UdpError { context } => match context.client_socket_addr().ip() { diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 207916846..1516d79c3 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -6,7 +6,7 @@ pub mod listener; pub mod sender; /// An statistics event. It is used to collect tracker metrics. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { UdpIncomingRequest { context: ConnectionContext, @@ -31,20 +31,27 @@ pub enum Event { }, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum UdpRequestKind { Connect, Announce, Scrape, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum UdpResponseKind { - Ok { req_kind: UdpRequestKind }, - Error, // todo: add the request kind `{ req_kind: Option(UdpRequestKind) }` when we know it. + Ok { + req_kind: UdpRequestKind, + }, + + /// There was an error handling the requests. The error contains the request + /// kind if the request was parsed successfully. + Error { + opt_req_kind: Option, + }, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ConnectionContext { client_socket_addr: SocketAddr, server_socket_addr: SocketAddr, From 9a8a0dc0e575c127c032649de9e6b9155e1cc329 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 18 Mar 2025 11:46:17 +0000 Subject: [PATCH 6/6] refactor: [#1382] rename UDP server event enum variants --- packages/udp-tracker-server/src/server/launcher.rs | 2 +- packages/udp-tracker-server/src/server/processor.rs | 2 +- .../udp-tracker-server/src/statistics/event/handler.rs | 10 +++++----- .../udp-tracker-server/src/statistics/event/mod.rs | 6 +++--- packages/udp-tracker-server/src/statistics/keeper.rs | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/udp-tracker-server/src/server/launcher.rs b/packages/udp-tracker-server/src/server/launcher.rs index a3da6a2a8..c6a105230 100644 --- a/packages/udp-tracker-server/src/server/launcher.rs +++ b/packages/udp-tracker-server/src/server/launcher.rs @@ -173,7 +173,7 @@ impl Launcher { if let Some(udp_server_stats_event_sender) = udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpIncomingRequest { + .send_event(statistics::event::Event::UdpRequestReceived { context: ConnectionContext::new(client_socket_addr, server_socket_addr), }) .await; diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index 59d21673f..4d1e4429a 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -108,7 +108,7 @@ impl Processor { self.udp_tracker_server_container.udp_server_stats_event_sender.as_deref() { udp_server_stats_event_sender - .send_event(statistics::event::Event::UdpResponse { + .send_event(statistics::event::Event::UdpResponseSent { context: ConnectionContext::new(client_socket_addr, self.socket.address()), kind: udp_response_kind, req_processing_time, diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs index 75441f7e4..6abf7d3c7 100644 --- a/packages/udp-tracker-server/src/statistics/event/handler.rs +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -14,7 +14,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { Event::UdpRequestBanned { .. } => { stats_repository.increase_udp_requests_banned().await; } - Event::UdpIncomingRequest { context } => match context.client_socket_addr().ip() { + Event::UdpRequestReceived { context } => match context.client_socket_addr().ip() { std::net::IpAddr::V4(_) => { stats_repository.increase_udp4_requests().await; } @@ -48,7 +48,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { } }, }, - Event::UdpResponse { + Event::UdpResponseSent { context, kind, req_processing_time, @@ -149,7 +149,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpIncomingRequest { + Event::UdpRequestReceived { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -267,7 +267,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpResponse { + Event::UdpResponseSent { context: ConnectionContext::new( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969), @@ -374,7 +374,7 @@ mod tests { let stats_repository = Repository::new(); handle_event( - Event::UdpResponse { + Event::UdpResponseSent { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969), diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs index 1516d79c3..1b0be960b 100644 --- a/packages/udp-tracker-server/src/statistics/event/mod.rs +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -8,7 +8,7 @@ pub mod sender; /// An statistics event. It is used to collect tracker metrics. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { - UdpIncomingRequest { + UdpRequestReceived { context: ConnectionContext, }, UdpRequestAborted { @@ -21,7 +21,7 @@ pub enum Event { context: ConnectionContext, kind: UdpRequestKind, }, - UdpResponse { + UdpResponseSent { context: ConnectionContext, kind: UdpResponseKind, req_processing_time: Duration, @@ -44,7 +44,7 @@ pub enum UdpResponseKind { req_kind: UdpRequestKind, }, - /// There was an error handling the requests. The error contains the request + /// There was an error handling the request. The error contains the request /// kind if the request was parsed successfully. Error { opt_req_kind: Option, diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs index c29dcb1b2..4ce832227 100644 --- a/packages/udp-tracker-server/src/statistics/keeper.rs +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -73,7 +73,7 @@ mod tests { let event_sender = stats_tracker.run_event_listener(); let result = event_sender - .send_event(Event::UdpIncomingRequest { + .send_event(Event::UdpRequestReceived { context: ConnectionContext::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 195)), 8080), SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969),