Skip to content

Commit 89bb735

Browse files
committed
refactor: reorganize UDP server mod
1 parent c121bf2 commit 89bb735

File tree

12 files changed

+621
-546
lines changed

12 files changed

+621
-546
lines changed

src/bootstrap/jobs/udp_tracker.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use tracing::debug;
1414

1515
use crate::core;
1616
use crate::servers::registar::ServiceRegistrationForm;
17-
use crate::servers::udp::server::{Spawner, UdpServer};
17+
use crate::servers::udp::server::spawner::Spawner;
18+
use crate::servers::udp::server::Server;
1819
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
1920

2021
/// It starts a new UDP server with the provided configuration.
@@ -30,7 +31,7 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
3031
pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: ServiceRegistrationForm) -> JoinHandle<()> {
3132
let bind_to = config.bind_address;
3233

33-
let server = UdpServer::new(Spawner::new(bind_to))
34+
let server = Server::new(Spawner::new(bind_to))
3435
.start(tracker, form)
3536
.await
3637
.expect("it should be able to start the udp tracker");

src/servers/udp/handlers.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use uuid::Uuid;
1717
use zerocopy::network_endian::I32;
1818

1919
use super::connection_cookie::{check, from_connection_id, into_connection_id, make};
20-
use super::UdpRequest;
20+
use super::RawRequest;
2121
use crate::core::{statistics, ScrapeData, Tracker};
2222
use crate::servers::udp::error::Error;
2323
use crate::servers::udp::logging::{log_bad_request, log_error_response, log_request, log_response};
@@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
3333
/// - Delegating the request to the correct handler depending on the request type.
3434
///
3535
/// It will return an `Error` response if the request is invalid.
36-
pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
36+
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
3737
debug!("Handling Packets: {udp_request:?}");
3838

3939
let start_time = Instant::now();
@@ -304,7 +304,7 @@ fn handle_error(e: &Error, transaction_id: TransactionId) -> Response {
304304
pub struct RequestId(Uuid);
305305

306306
impl RequestId {
307-
fn make(_request: &UdpRequest) -> RequestId {
307+
fn make(_request: &RawRequest) -> RequestId {
308308
RequestId(Uuid::new_v4())
309309
}
310310
}

src/servers/udp/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ pub type Port = u16;
660660
pub type TransactionId = i64;
661661

662662
#[derive(Clone, Debug)]
663-
pub(crate) struct UdpRequest {
663+
pub struct RawRequest {
664664
payload: Vec<u8>,
665665
from: SocketAddr,
666666
}
+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use std::fmt::Debug;
2+
use std::net::SocketAddr;
3+
use std::ops::Deref;
4+
use std::sync::Arc;
5+
6+
use url::Url;
7+
8+
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
9+
10+
/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket.
11+
pub struct BoundSocket {
12+
socket: Arc<tokio::net::UdpSocket>,
13+
}
14+
15+
impl BoundSocket {
16+
/// # Errors
17+
///
18+
/// Will return an error if the socket can't be bound the the provided address.
19+
pub async fn new(addr: SocketAddr) -> Result<Self, Box<std::io::Error>> {
20+
let bind_addr = format!("udp://{addr}");
21+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, bind_addr, "UdpSocket::new (binding)");
22+
23+
let socket = tokio::net::UdpSocket::bind(addr).await;
24+
25+
let socket = match socket {
26+
Ok(socket) => socket,
27+
Err(e) => Err(e)?,
28+
};
29+
30+
let local_addr = format!("udp://{addr}");
31+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)");
32+
33+
Ok(Self {
34+
socket: Arc::new(socket),
35+
})
36+
}
37+
38+
/// # Panics
39+
///
40+
/// Will panic if the socket can't get the address it was bound to.
41+
#[must_use]
42+
pub fn address(&self) -> SocketAddr {
43+
self.socket.local_addr().expect("it should get local address")
44+
}
45+
46+
/// # Panics
47+
///
48+
/// Will panic if the address the socket was bound to is not a valid address
49+
/// to be used in a URL.
50+
#[must_use]
51+
pub fn url(&self) -> Url {
52+
Url::parse(&format!("udp://{}", self.address())).expect("UDP socket address should be valid")
53+
}
54+
}
55+
56+
impl Deref for BoundSocket {
57+
type Target = tokio::net::UdpSocket;
58+
59+
fn deref(&self) -> &Self::Target {
60+
&self.socket
61+
}
62+
}
63+
64+
impl Debug for BoundSocket {
65+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66+
let local_addr = match self.socket.local_addr() {
67+
Ok(socket) => format!("Receiving From: {socket}"),
68+
Err(err) => format!("Socket Broken: {err}"),
69+
};
70+
71+
f.debug_struct("UdpSocket").field("addr", &local_addr).finish_non_exhaustive()
72+
}
73+
}

src/servers/udp/server/launcher.rs

+219
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
use std::io::Cursor;
2+
use std::net::SocketAddr;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use aquatic_udp_protocol::Response;
7+
use derive_more::Constructor;
8+
use futures_util::StreamExt;
9+
use tokio::select;
10+
use tokio::sync::oneshot;
11+
12+
use super::request_buffer::ActiveRequests;
13+
use super::RawRequest;
14+
use crate::bootstrap::jobs::Started;
15+
use crate::core::Tracker;
16+
use crate::servers::logging::STARTED_ON;
17+
use crate::servers::registar::ServiceHealthCheckJob;
18+
use crate::servers::signals::{shutdown_signal_with_message, Halted};
19+
use crate::servers::udp::server::bound_socket::BoundSocket;
20+
use crate::servers::udp::server::receiver::Receiver;
21+
use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET};
22+
use crate::shared::bit_torrent::tracker::udp::client::check;
23+
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;
24+
25+
/// A UDP server instance launcher.
26+
#[derive(Constructor)]
27+
pub struct Launcher;
28+
29+
impl Launcher {
30+
/// It starts the UDP server instance with graceful shutdown.
31+
///
32+
/// # Panics
33+
///
34+
/// It panics if unable to bind to udp socket, and get the address from the udp socket.
35+
/// It also panics if unable to send address of socket.
36+
pub async fn run_with_graceful_shutdown(
37+
tracker: Arc<Tracker>,
38+
bind_to: SocketAddr,
39+
tx_start: oneshot::Sender<Started>,
40+
rx_halt: oneshot::Receiver<Halted>,
41+
) {
42+
let halt_task = tokio::task::spawn(shutdown_signal_with_message(
43+
rx_halt,
44+
format!("Halting UDP Service Bound to Socket: {bind_to}"),
45+
));
46+
47+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}");
48+
49+
let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to))
50+
.await
51+
.expect("it should bind to the socket within five seconds");
52+
53+
let bound_socket = match socket {
54+
Ok(socket) => socket,
55+
Err(e) => {
56+
tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" );
57+
panic!("could not bind to socket!");
58+
}
59+
};
60+
61+
let address = bound_socket.address();
62+
let local_udp_url = bound_socket.url().to_string();
63+
64+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}");
65+
66+
let receiver = Receiver::new(bound_socket.into());
67+
68+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)");
69+
70+
let running = {
71+
let local_addr = local_udp_url.clone();
72+
tokio::task::spawn(async move {
73+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)");
74+
let () = Self::run_udp_server_main(receiver, tracker.clone()).await;
75+
})
76+
};
77+
78+
tx_start
79+
.send(Started { address })
80+
.expect("the UDP Tracker service should not be dropped");
81+
82+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)");
83+
84+
let stop = running.abort_handle();
85+
86+
select! {
87+
_ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); },
88+
_ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); }
89+
}
90+
stop.abort();
91+
92+
tokio::task::yield_now().await; // lets allow the other threads to complete.
93+
}
94+
95+
#[must_use]
96+
pub fn check(binding: &SocketAddr) -> ServiceHealthCheckJob {
97+
let binding = *binding;
98+
let info = format!("checking the udp tracker health check at: {binding}");
99+
100+
let job = tokio::spawn(async move { check(&binding).await });
101+
102+
ServiceHealthCheckJob::new(binding, info, job)
103+
}
104+
105+
async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
106+
let reqs = &mut ActiveRequests::default();
107+
108+
let addr = receiver.bound_socket_address();
109+
let local_addr = format!("udp://{addr}");
110+
111+
loop {
112+
if let Some(req) = {
113+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
114+
receiver.next().await
115+
} {
116+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)");
117+
118+
let req = match req {
119+
Ok(req) => req,
120+
Err(e) => {
121+
if e.kind() == std::io::ErrorKind::Interrupted {
122+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop (interrupted)");
123+
return;
124+
}
125+
tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop break: (got error)");
126+
break;
127+
}
128+
};
129+
130+
/* code-review:
131+
132+
Does it make sense to spawn a new request processor task when
133+
the ActiveRequests buffer is full?
134+
135+
We could store the UDP request in a secondary buffer and wait
136+
until active tasks are finished. When a active request is finished
137+
we can move a new UDP request from the pending to process requests
138+
buffer to the active requests buffer.
139+
140+
This forces us to define an explicit timeout for active requests.
141+
142+
In the current solution the timeout is dynamic, it depends on
143+
the system load. With high load we can remove tasks without
144+
giving them enough time to be processed. With low load we could
145+
keep processing running longer than a reasonable time for
146+
the client to receive the response.
147+
148+
*/
149+
150+
let abort_handle =
151+
tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
152+
.abort_handle();
153+
154+
if abort_handle.is_finished() {
155+
continue;
156+
}
157+
158+
reqs.force_push(abort_handle, &local_addr).await;
159+
} else {
160+
tokio::task::yield_now().await;
161+
// the request iterator returned `None`.
162+
tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)");
163+
break;
164+
}
165+
}
166+
}
167+
168+
async fn process_request(request: RawRequest, tracker: Arc<Tracker>, socket: Arc<BoundSocket>) {
169+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");
170+
Self::process_valid_request(tracker, socket, request).await;
171+
}
172+
173+
async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<BoundSocket>, udp_request: RawRequest) {
174+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}");
175+
let from = udp_request.from;
176+
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await;
177+
Self::send_response(&socket.clone(), from, response).await;
178+
}
179+
180+
async fn send_response(bound_socket: &Arc<BoundSocket>, to: SocketAddr, response: Response) {
181+
let response_type = match &response {
182+
Response::Connect(_) => "Connect".to_string(),
183+
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
184+
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
185+
Response::Scrape(_) => "Scrape".to_string(),
186+
Response::Error(e) => format!("Error: {e:?}"),
187+
};
188+
189+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");
190+
191+
let buffer = vec![0u8; MAX_PACKET_SIZE];
192+
let mut cursor = Cursor::new(buffer);
193+
194+
match response.write_bytes(&mut cursor) {
195+
Ok(()) => {
196+
#[allow(clippy::cast_possible_truncation)]
197+
let position = cursor.position() as usize;
198+
let inner = cursor.get_ref();
199+
200+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" );
201+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)");
202+
203+
Self::send_packet(bound_socket, &to, &inner[..position]).await;
204+
205+
tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)");
206+
}
207+
Err(e) => {
208+
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
209+
}
210+
}
211+
}
212+
213+
async fn send_packet(bound_socket: &Arc<BoundSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
214+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");
215+
216+
// doesn't matter if it reaches or not
217+
drop(bound_socket.send_to(payload, remote_addr).await);
218+
}
219+
}

0 commit comments

Comments
 (0)