Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp: processor for requests #955

Merged
merged 1 commit into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/servers/udp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
/// - Delegating the request to the correct handler depending on the request type.
///
/// It will return an `Error` response if the request is invalid.
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Tracker, local_addr: SocketAddr) -> Response {
debug!("Handling Packets: {udp_request:?}");

let start_time = Instant::now();
Expand All @@ -47,7 +47,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker
}
}) {
Ok(request) => {
log_request(&request, &request_id, &addr);
log_request(&request, &request_id, &local_addr);

let transaction_id = match &request {
Request::Connect(connect_request) => connect_request.transaction_id,
Expand All @@ -62,7 +62,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker

let latency = start_time.elapsed();

log_response(&response, &transaction_id, &request_id, &addr, latency);
log_response(&response, &transaction_id, &request_id, &local_addr, latency);

response
}
Expand Down
7 changes: 2 additions & 5 deletions src/servers/udp/server/bound_socket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::Arc;

use url::Url;

use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket.
pub struct BoundSocket {
socket: Arc<tokio::net::UdpSocket>,
socket: tokio::net::UdpSocket,
}

impl BoundSocket {
Expand All @@ -30,9 +29,7 @@ impl BoundSocket {
let local_addr = format!("udp://{addr}");
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)");

Ok(Self {
socket: Arc::new(socket),
})
Ok(Self { socket })
}

/// # Panics
Expand Down
65 changes: 5 additions & 60 deletions src/servers/udp/server/launcher.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use aquatic_udp_protocol::Response;
use derive_more::Constructor;
use futures_util::StreamExt;
use tokio::select;
use tokio::sync::oneshot;

use super::request_buffer::ActiveRequests;
use super::RawRequest;
use crate::bootstrap::jobs::Started;
use crate::core::Tracker;
use crate::servers::logging::STARTED_ON;
use crate::servers::registar::ServiceHealthCheckJob;
use crate::servers::signals::{shutdown_signal_with_message, Halted};
use crate::servers::udp::server::bound_socket::BoundSocket;
use crate::servers::udp::server::processor::Processor;
use crate::servers::udp::server::receiver::Receiver;
use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET};
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
use crate::shared::bit_torrent::tracker::udp::client::check;
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;

/// A UDP server instance launcher.
#[derive(Constructor)]
Expand Down Expand Up @@ -109,6 +106,8 @@ impl Launcher {
let local_addr = format!("udp://{addr}");

loop {
let processor = Processor::new(receiver.socket.clone(), tracker.clone());

if let Some(req) = {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
receiver.next().await
Expand Down Expand Up @@ -138,9 +137,7 @@ impl Launcher {
// are only adding and removing tasks without given them the
// chance to finish. However, the buffer is yielding before
// aborting one tasks, giving it the chance to finish.
let abort_handle: tokio::task::AbortHandle =
tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
.abort_handle();
let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();

if abort_handle.is_finished() {
continue;
Expand All @@ -156,56 +153,4 @@ impl Launcher {
}
}
}

async fn process_request(request: RawRequest, tracker: Arc<Tracker>, socket: Arc<BoundSocket>) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");
Self::process_valid_request(tracker, socket, request).await;
}

async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<BoundSocket>, udp_request: RawRequest) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}");
let from = udp_request.from;
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await;
Self::send_response(&socket.clone(), from, response).await;
}

async fn send_response(bound_socket: &Arc<BoundSocket>, to: SocketAddr, response: Response) {
let response_type = match &response {
Response::Connect(_) => "Connect".to_string(),
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
Response::Scrape(_) => "Scrape".to_string(),
Response::Error(e) => format!("Error: {e:?}"),
};

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");

let buffer = vec![0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer);

match response.write_bytes(&mut cursor) {
Ok(()) => {
#[allow(clippy::cast_possible_truncation)]
let position = cursor.position() as usize;
let inner = cursor.get_ref();

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" );
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)");

Self::send_packet(bound_socket, &to, &inner[..position]).await;

tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)");
}
Err(e) => {
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
}
}
}

async fn send_packet(bound_socket: &Arc<BoundSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");

// doesn't matter if it reaches or not
drop(bound_socket.send_to(payload, remote_addr).await);
}
}
1 change: 1 addition & 0 deletions src/servers/udp/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::RawRequest;

pub mod bound_socket;
pub mod launcher;
pub mod processor;
pub mod receiver;
pub mod request_buffer;
pub mod spawner;
Expand Down
66 changes: 66 additions & 0 deletions src/servers/udp/server/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;

use aquatic_udp_protocol::Response;

use super::bound_socket::BoundSocket;
use crate::core::Tracker;
use crate::servers::udp::{handlers, RawRequest, UDP_TRACKER_LOG_TARGET};

pub struct Processor {
socket: Arc<BoundSocket>,
tracker: Arc<Tracker>,
}

impl Processor {
pub fn new(socket: Arc<BoundSocket>, tracker: Arc<Tracker>) -> Self {
Self { socket, tracker }
}

pub async fn process_request(self, request: RawRequest) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");

let from = request.from;
let response = handlers::handle_packet(request, &self.tracker, self.socket.address()).await;
self.send_response(from, response).await;
}

async fn send_response(self, to: SocketAddr, response: Response) {
let response_type = match &response {
Response::Connect(_) => "Connect".to_string(),
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),

Check warning on line 33 in src/servers/udp/server/processor.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server/processor.rs#L33

Added line #L33 was not covered by tests
Response::Scrape(_) => "Scrape".to_string(),
Response::Error(e) => format!("Error: {e:?}"),
};

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");

let mut writer = Cursor::new(Vec::with_capacity(200));

match response.write_bytes(&mut writer) {
Ok(()) => {
let bytes_count = writer.get_ref().len();
let payload = writer.get_ref();

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sending...)" );
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, ?payload, "Udp::send_response (sending...)");

self.send_packet(&to, payload).await;

tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sent)");
}
Err(e) => {
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
}

Check warning on line 56 in src/servers/udp/server/processor.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server/processor.rs#L54-L56

Added lines #L54 - L56 were not covered by tests
}
}

async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) {
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");

// doesn't matter if it reaches or not
drop(self.socket.send_to(payload, remote_addr).await);
}
}
8 changes: 4 additions & 4 deletions src/servers/udp/server/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ use super::RawRequest;
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;

pub struct Receiver {
pub bound_socket: Arc<BoundSocket>,
pub socket: Arc<BoundSocket>,
data: RefCell<[u8; MAX_PACKET_SIZE]>,
}

impl Receiver {
#[must_use]
pub fn new(bound_socket: Arc<BoundSocket>) -> Self {
Receiver {
bound_socket,
socket: bound_socket,
data: RefCell::new([0; MAX_PACKET_SIZE]),
}
}

pub fn bound_socket_address(&self) -> SocketAddr {
self.bound_socket.address()
self.socket.address()
}
}

Expand All @@ -36,7 +36,7 @@ impl Stream for Receiver {
let mut buf = *self.data.borrow_mut();
let mut buf = tokio::io::ReadBuf::new(&mut buf);

let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else {
let Poll::Ready(ready) = self.socket.poll_recv_from(cx, &mut buf) else {
return Poll::Pending;
};

Expand Down
12 changes: 0 additions & 12 deletions src/shared/bit_torrent/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! `BitTorrent` protocol primitive types
//!
//! [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html)
use serde::{Deserialize, Serialize};

/// The maximum number of torrents that can be returned in an `scrape` response.
///
Expand All @@ -21,14 +20,3 @@ pub const MAX_SCRAPE_TORRENTS: u8 = 74;
/// See function to [`generate`](crate::core::auth::generate) the
/// [`ExpiringKeys`](crate::core::auth::ExpiringKey) for more information.
pub const AUTH_KEY_LENGTH: usize = 32;

#[repr(u32)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
enum Actions {
// todo: it seems this enum is not used anywhere. Values match the ones in
// aquatic_udp_protocol::request::Request::from_bytes.
Connect = 0,
Announce = 1,
Scrape = 2,
Error = 3,
}
Loading