Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul stats events: merge UDP core events with a different IP version #1381

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/udp-tracker-core/benches/helpers/sync.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -8,13 +9,16 @@ use crate::helpers::utils::{sample_ipv4_remote_addr, sample_issue_time};

#[allow(clippy::unused_async)]
pub async fn connect_once(samples: u64) -> Duration {
let client_socket_addr = sample_ipv4_remote_addr();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);
let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));
let start = Instant::now();

for _ in 0..samples {
let _response = connect_service.handle_connect(sample_ipv4_remote_addr(), sample_issue_time());
let _response = connect_service.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time());
}

start.elapsed()
Expand Down
25 changes: 13 additions & 12 deletions packages/udp-tracker-core/src/services/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//!
//! It also sends an [`udp_tracker_core::statistics::event::Event`]
//! because events are specific for the HTTP tracker.
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -21,6 +21,7 @@ use torrust_tracker_primitives::core::AnnounceData;

use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
use crate::statistics;
use crate::statistics::event::ConnectionContext;

/// The `AnnounceService` is responsible for handling the `announce` requests.
///
Expand Down Expand Up @@ -57,17 +58,18 @@ impl AnnounceService {
/// whitelist.
pub async fn handle_announce(
&self,
remote_addr: SocketAddr,
client_socket_addr: SocketAddr,
server_socket_addr: SocketAddr,
request: &AnnounceRequest,
cookie_valid_range: Range<f64>,
) -> Result<AnnounceData, UdpAnnounceError> {
Self::authenticate(remote_addr, request, cookie_valid_range)?;
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;

let info_hash = request.info_hash.into();

self.authorize(&info_hash).await?;

let remote_client_ip = remote_addr.ip();
let remote_client_ip = client_socket_addr.ip();

let mut peer = peer_builder::from_request(request, &remote_client_ip);

Expand All @@ -78,7 +80,7 @@ impl AnnounceService {
.announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted)
.await?;

self.send_stats_event(remote_client_ip).await;
self.send_stats_event(client_socket_addr, server_socket_addr).await;

Ok(announce_data)
}
Expand All @@ -99,14 +101,13 @@ impl AnnounceService {
self.whitelist_authorization.authorize(info_hash).await
}

async fn send_stats_event(&self, peer_ip: IpAddr) {
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
let event = match peer_ip {
IpAddr::V4(_) => statistics::event::Event::Udp4Announce,
IpAddr::V6(_) => statistics::event::Event::Udp6Announce,
};

udp_stats_event_sender.send_event(event).await;
udp_stats_event_sender
.send_event(statistics::event::Event::UdpAnnounce {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
})
.await;
}
}
}
Expand Down
59 changes: 40 additions & 19 deletions packages/udp-tracker-core/src/services/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use aquatic_udp_protocol::ConnectionId;

use crate::connection_cookie::{gen_remote_fingerprint, make};
use crate::statistics;
use crate::statistics::event::ConnectionContext;

/// The `ConnectService` is responsible for handling the `connect` requests.
///
Expand All @@ -30,18 +31,21 @@ impl ConnectService {
/// # Panics
///
/// It will panic if there was an error making the connection cookie.
pub async fn handle_connect(&self, remote_addr: SocketAddr, cookie_issue_time: f64) -> ConnectionId {
let connection_id = make(gen_remote_fingerprint(&remote_addr), cookie_issue_time).expect("it should be a normal value");
pub async fn handle_connect(
&self,
client_socket_addr: SocketAddr,
server_socket_addr: SocketAddr,
cookie_issue_time: f64,
) -> ConnectionId {
let connection_id =
make(gen_remote_fingerprint(&client_socket_addr), cookie_issue_time).expect("it should be a normal value");

if let Some(udp_stats_event_sender) = self.opt_udp_core_stats_event_sender.as_deref() {
match remote_addr {
SocketAddr::V4(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp4Connect).await;
}
SocketAddr::V6(_) => {
udp_stats_event_sender.send_event(statistics::event::Event::Udp6Connect).await;
}
}
udp_stats_event_sender
.send_event(statistics::event::Event::UdpConnect {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
})
.await;
}

connection_id
Expand All @@ -54,6 +58,7 @@ mod tests {
mod connect_request {

use std::future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use mockall::predicate::eq;
Expand All @@ -65,16 +70,19 @@ mod tests {
sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender,
};
use crate::statistics;
use crate::statistics::event::ConnectionContext;

#[tokio::test]
async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() {
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
.await;

assert_eq!(
Expand All @@ -85,13 +93,15 @@ mod tests {

#[tokio::test]
async fn a_connect_response_should_contain_a_new_connection_id() {
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv4_remote_addr(), sample_issue_time())
.handle_connect(sample_ipv4_remote_addr(), server_socket_addr, sample_issue_time())
.await;

assert_eq!(
Expand All @@ -102,13 +112,16 @@ mod tests {

#[tokio::test]
async fn a_connect_response_should_contain_a_new_connection_id_ipv6() {
let client_socket_addr = sample_ipv6_remote_addr();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false);
let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender);

let connect_service = Arc::new(ConnectService::new(udp_core_stats_event_sender));

let response = connect_service
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
.await;

assert_eq!(
Expand All @@ -119,30 +132,38 @@ mod tests {

#[tokio::test]
async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() {
let client_socket_addr = sample_ipv4_socket_address();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
udp_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::Udp4Connect))
.with(eq(statistics::event::Event::UdpConnect {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
Arc::new(Some(Box::new(udp_stats_event_sender_mock)));

let client_socket_address = sample_ipv4_socket_address();

let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));

connect_service
.handle_connect(client_socket_address, sample_issue_time())
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
.await;
}

#[tokio::test]
async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() {
let client_socket_addr = sample_ipv6_remote_addr();
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);

let mut udp_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
udp_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::Udp6Connect))
.with(eq(statistics::event::Event::UdpConnect {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(())))));
let opt_udp_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
Expand All @@ -151,7 +172,7 @@ mod tests {
let connect_service = Arc::new(ConnectService::new(opt_udp_stats_event_sender));

connect_service
.handle_connect(sample_ipv6_remote_addr(), sample_issue_time())
.handle_connect(client_socket_addr, server_socket_addr, sample_issue_time())
.await;
}
}
Expand Down
20 changes: 11 additions & 9 deletions packages/udp-tracker-core/src/services/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use torrust_tracker_primitives::core::ScrapeData;

use crate::connection_cookie::{check, gen_remote_fingerprint, ConnectionCookieError};
use crate::statistics;
use crate::statistics::event::ConnectionContext;

/// The `ScrapeService` is responsible for handling the `scrape` requests.
///
Expand Down Expand Up @@ -49,18 +50,19 @@ impl ScrapeService {
/// It will return an error if the tracker core scrape handler returns an error.
pub async fn handle_scrape(
&self,
remote_client_addr: SocketAddr,
client_socket_addr: SocketAddr,
server_socket_addr: SocketAddr,
request: &ScrapeRequest,
cookie_valid_range: Range<f64>,
) -> Result<ScrapeData, UdpScrapeError> {
Self::authenticate(remote_client_addr, request, cookie_valid_range)?;
Self::authenticate(client_socket_addr, request, cookie_valid_range)?;

let scrape_data = self
.scrape_handler
.scrape(&Self::convert_from_aquatic(&request.info_hashes))
.await?;

self.send_stats_event(remote_client_addr).await;
self.send_stats_event(client_socket_addr, server_socket_addr).await;

Ok(scrape_data)
}
Expand All @@ -81,13 +83,13 @@ impl ScrapeService {
aquatic_infohashes.iter().map(|&x| x.into()).collect()
}

async fn send_stats_event(&self, remote_addr: SocketAddr) {
async fn send_stats_event(&self, client_socket_addr: SocketAddr, server_socket_addr: SocketAddr) {
if let Some(udp_stats_event_sender) = self.opt_udp_stats_event_sender.as_deref() {
let event = match remote_addr {
SocketAddr::V4(_) => statistics::event::Event::Udp4Scrape,
SocketAddr::V6(_) => statistics::event::Event::Udp6Scrape,
};
udp_stats_event_sender.send_event(event).await;
udp_stats_event_sender
.send_event(statistics::event::Event::UdpScrape {
context: ConnectionContext::new(client_socket_addr, server_socket_addr),
})
.await;
}
}
}
Expand Down
Loading