Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul stats events: merge UDP server events with a different IP version #1383

2 changes: 2 additions & 0 deletions packages/udp-tracker-core/src/statistics/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ impl ConnectionContext {
}
}

#[must_use]
pub fn client_socket_addr(&self) -> SocketAddr {
self.client_socket_addr
}

#[must_use]
pub fn server_socket_addr(&self) -> SocketAddr {
self.server_socket_addr
}
Expand Down
64 changes: 28 additions & 36 deletions packages/udp-tracker-server/src/handlers/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zerocopy::network_endian::I32;

use crate::error::Error;
use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::{ConnectionContext, UdpRequestKind};

/// It handles the `Announce` request.
///
Expand All @@ -32,7 +32,7 @@ pub async fn handle_announce(
core_config: &Arc<Core>,
opt_udp_server_stats_event_sender: &Arc<Option<Box<dyn server_statistics::event::sender::Sender>>>,
cookie_valid_range: Range<f64>,
) -> Result<Response, (Error, TransactionId)> {
) -> Result<Response, (Error, TransactionId, UdpRequestKind)> {
tracing::Span::current()
.record("transaction_id", request.transaction_id.0.to_string())
.record("connection_id", request.connection_id.0.to_string())
Expand All @@ -41,28 +41,18 @@ pub async fn handle_announce(
tracing::trace!("handle announce");

if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() {
match client_socket_addr.ip() {
IpAddr::V4(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp4Request {
kind: UdpResponseKind::Announce,
})
.await;
}
IpAddr::V6(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp6Request {
kind: UdpResponseKind::Announce,
})
.await;
}
}
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::UdpRequestAccepted {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Announce,
})
.await;
}

let announce_data = announce_service
.handle_announce(client_socket_addr, server_socket_addr, request, cookie_valid_range)
.await
.map_err(|e| (e.into(), request.transaction_id))?;
.map_err(|e| (e.into(), request.transaction_id, UdpRequestKind::Announce))?;

Ok(build_response(client_socket_addr, request, core_config, &announce_data))
}
Expand Down Expand Up @@ -226,7 +216,7 @@ mod tests {
TorrentPeerBuilder,
};
use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::UdpRequestKind;

#[tokio::test]
async fn an_announced_peer_should_be_added_to_the_tracker() {
Expand Down Expand Up @@ -429,11 +419,15 @@ mod tests {

#[tokio::test]
async fn should_send_the_upd4_announce_event() {
let client_socket_addr = sample_ipv4_socket_address();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
udp_server_stats_event_sender_mock
.expect_send_event()
.with(eq(server_statistics::event::Event::Udp4Request {
kind: UdpResponseKind::Announce,
.with(eq(server_statistics::event::Event::UdpRequestAccepted {
context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Announce,
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand All @@ -443,9 +437,6 @@ mod tests {
let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) =
initialize_core_tracker_services_for_default_tracker_configuration();

let client_socket_addr = sample_ipv4_socket_address();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

handle_announce(
&core_udp_tracker_services.announce_service,
client_socket_addr,
Expand Down Expand Up @@ -549,7 +540,7 @@ mod tests {
sample_issue_time, MockUdpServerStatsEventSender, TorrentPeerBuilder,
};
use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::UdpRequestKind;

#[tokio::test]
async fn an_announced_peer_should_be_added_to_the_tracker() {
Expand Down Expand Up @@ -771,11 +762,15 @@ mod tests {

#[tokio::test]
async fn should_send_the_upd6_announce_event() {
let client_socket_addr = sample_ipv6_remote_addr();
let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969);

let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
udp_server_stats_event_sender_mock
.expect_send_event()
.with(eq(server_statistics::event::Event::Udp6Request {
kind: UdpResponseKind::Announce,
.with(eq(server_statistics::event::Event::UdpRequestAccepted {
context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Announce,
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand All @@ -785,9 +780,6 @@ mod tests {
let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) =
initialize_core_tracker_services_for_default_tracker_configuration();

let client_socket_addr = sample_ipv6_remote_addr();
let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969);

let announce_request = AnnounceRequestBuilder::default()
.with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap())
.into();
Expand Down Expand Up @@ -819,7 +811,6 @@ mod tests {
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make};
use bittorrent_udp_tracker_core::services::announce::AnnounceService;
use bittorrent_udp_tracker_core::statistics::event::ConnectionContext;
use bittorrent_udp_tracker_core::{self, statistics as core_statistics};
use mockall::predicate::eq;

Expand All @@ -830,7 +821,7 @@ mod tests {
TrackerConfigurationBuilder,
};
use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::UdpRequestKind;

#[tokio::test]
async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() {
Expand Down Expand Up @@ -860,7 +851,7 @@ mod tests {
udp_core_stats_event_sender_mock
.expect_send_event()
.with(eq(core_statistics::event::Event::UdpAnnounce {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand All @@ -870,8 +861,9 @@ mod tests {
let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
udp_server_stats_event_sender_mock
.expect_send_event()
.with(eq(server_statistics::event::Event::Udp6Request {
kind: UdpResponseKind::Announce,
.with(eq(server_statistics::event::Event::UdpRequestAccepted {
context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Announce,
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand Down
47 changes: 19 additions & 28 deletions packages/udp-tracker-server/src/handlers/connect.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! UDP tracker connect handler.
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;

use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, ConnectionId, Response};
use bittorrent_udp_tracker_core::services::connect::ConnectService;
use tracing::{instrument, Level};

use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::{ConnectionContext, UdpRequestKind};

/// It handles the `Connect` request.
#[instrument(fields(transaction_id), skip(connect_service, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))]
pub async fn handle_connect(
remote_addr: SocketAddr,
server_addr: SocketAddr,
client_socket_addr: SocketAddr,
server_socket_addr: SocketAddr,
request: &ConnectRequest,
connect_service: &Arc<ConnectService>,
opt_udp_server_stats_event_sender: &Arc<Option<Box<dyn server_statistics::event::sender::Sender>>>,
Expand All @@ -23,26 +23,16 @@ pub async fn handle_connect(
tracing::trace!("handle connect");

if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() {
match remote_addr.ip() {
IpAddr::V4(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp4Request {
kind: UdpResponseKind::Connect,
})
.await;
}
IpAddr::V6(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp6Request {
kind: UdpResponseKind::Connect,
})
.await;
}
}
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::UdpRequestAccepted {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Connect,
})
.await;
}

let connection_id = connect_service
.handle_connect(remote_addr, server_addr, cookie_issue_time)
.handle_connect(client_socket_addr, server_socket_addr, cookie_issue_time)
.await;

build_response(*request, connection_id)
Expand Down Expand Up @@ -70,7 +60,6 @@ mod tests {
use bittorrent_udp_tracker_core::connection_cookie::make;
use bittorrent_udp_tracker_core::services::connect::ConnectService;
use bittorrent_udp_tracker_core::statistics as core_statistics;
use bittorrent_udp_tracker_core::statistics::event::ConnectionContext;
use mockall::predicate::eq;

use crate::handlers::handle_connect;
Expand All @@ -79,7 +68,7 @@ mod tests {
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender,
};
use crate::statistics as server_statistics;
use crate::statistics::event::UdpResponseKind;
use crate::statistics::event::UdpRequestKind;

fn sample_connect_request() -> ConnectRequest {
ConnectRequest {
Expand Down Expand Up @@ -214,8 +203,9 @@ mod tests {
let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
udp_server_stats_event_sender_mock
.expect_send_event()
.with(eq(server_statistics::event::Event::Udp4Request {
kind: UdpResponseKind::Connect,
.with(eq(server_statistics::event::Event::UdpRequestAccepted {
context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Connect,
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand Down Expand Up @@ -244,7 +234,7 @@ mod tests {
udp_core_stats_event_sender_mock
.expect_send_event()
.with(eq(core_statistics::event::Event::UdpConnect {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
context: core_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand All @@ -254,8 +244,9 @@ mod tests {
let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
udp_server_stats_event_sender_mock
.expect_send_event()
.with(eq(server_statistics::event::Event::Udp6Request {
kind: UdpResponseKind::Connect,
.with(eq(server_statistics::event::Event::UdpRequestAccepted {
context: server_statistics::event::ConnectionContext::new(client_socket_addr, server_socket_addr),
kind: UdpRequestKind::Connect,
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
Expand Down
29 changes: 12 additions & 17 deletions packages/udp-tracker-server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use zerocopy::network_endian::I32;

use crate::error::Error;
use crate::statistics as server_statistics;
use crate::statistics::event::{ConnectionContext, UdpRequestKind};

#[allow(clippy::too_many_arguments)]
#[instrument(fields(transaction_id), skip(opt_udp_server_stats_event_sender), ret(level = Level::TRACE))]
pub async fn handle_error(
remote_addr: SocketAddr,
local_addr: SocketAddr,
req_kind: Option<UdpRequestKind>,
client_socket_addr: SocketAddr,
server_socket_addr: SocketAddr,
request_id: Uuid,
opt_udp_server_stats_event_sender: &Arc<Option<Box<dyn server_statistics::event::sender::Sender>>>,
cookie_valid_range: Range<f64>,
Expand All @@ -29,10 +31,10 @@ pub async fn handle_error(
match transaction_id {
Some(transaction_id) => {
let transaction_id = transaction_id.0.to_string();
tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %remote_addr, %local_addr, %request_id, %transaction_id, "response error");
tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, %transaction_id, "response error");
}
None => {
tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %remote_addr, %local_addr, %request_id, "response error");
tracing::error!(target: UDP_TRACKER_LOG_TARGET, error = %e, %client_socket_addr, %server_socket_addr, %request_id, "response error");
}
}

Expand All @@ -43,7 +45,7 @@ pub async fn handle_error(
transaction_id,
err,
} => {
if let Err(e) = check(connection_id, gen_remote_fingerprint(&remote_addr), cookie_valid_range) {
if let Err(e) = check(connection_id, gen_remote_fingerprint(&client_socket_addr), cookie_valid_range) {
(e.to_string(), Some(*transaction_id))
} else {
((*err).to_string(), Some(*transaction_id))
Expand All @@ -57,18 +59,11 @@ pub async fn handle_error(

if e.1.is_some() {
if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() {
match remote_addr {
SocketAddr::V4(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp4Error)
.await;
}
SocketAddr::V6(_) => {
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::Udp6Error)
.await;
}
}
udp_server_stats_event_sender
.send_event(server_statistics::event::Event::UdpError {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
})
.await;
}
}

Expand Down
Loading