Skip to content

Commit 72c8348

Browse files
committedJan 24, 2024
udp: handle udp requests concurrently
1 parent 444c395 commit 72c8348

File tree

11 files changed

+207
-85
lines changed

11 files changed

+207
-85
lines changed
 

‎.github/workflows/coverage.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ jobs:
5555
name: Run Build Checks
5656
run: cargo check --tests --benches --examples --workspace --all-targets --all-features
5757

58+
# Run Test Locally:
59+
# RUSTFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" RUSTDOCFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" CARGO_INCREMENTAL="0" RUST_BACKTRACE=1 cargo test --tests --benches --examples --workspace --all-targets --all-features
60+
5861
- id: test
5962
name: Run Unit Tests
6063
run: cargo test --tests --benches --examples --workspace --all-targets --all-features

‎.vscode/settings.json

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
"[rust]": {
33
"editor.formatOnSave": true
44
},
5+
"[ignore]": { "rust-analyzer.cargo.extraEnv" : {
6+
"RUSTFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests",
7+
"RUSTDOCFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests",
8+
"CARGO_INCREMENTAL": "0",
9+
"RUST_BACKTRACE": "1"
10+
}},
511
"rust-analyzer.checkOnSave": true,
612
"rust-analyzer.check.command": "clippy",
713
"rust-analyzer.check.allTargets": true,

‎Cargo.lock

+10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ serde = { version = "1", features = ["derive"] }
5656
serde_bencode = "0"
5757
serde_bytes = "0"
5858
serde_json = "1"
59+
ringbuf = "0.4.0-rc.2"
5960
serde_with = "3"
6061
serde_repr = "0"
6162
tdyne-peer-id = "1"

‎cSpell.json

+2
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,10 @@
9494
"reannounce",
9595
"Registar",
9696
"repr",
97+
"reqs",
9798
"reqwest",
9899
"rerequests",
100+
"ringbuf",
99101
"rngs",
100102
"routable",
101103
"rusqlite",

‎src/servers/udp/handlers.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use log::{debug, info};
1111
use torrust_tracker_located_error::DynError;
1212

1313
use super::connection_cookie::{check, from_connection_id, into_connection_id, make};
14+
use super::UdpRequest;
1415
use crate::core::{statistics, ScrapeData, Tracker};
1516
use crate::servers::udp::error::Error;
1617
use crate::servers::udp::peer_builder;
@@ -27,10 +28,13 @@ use crate::shared::bit_torrent::info_hash::InfoHash;
2728
/// type.
2829
///
2930
/// It will return an `Error` response if the request is invalid.
30-
pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec<u8>, tracker: &Tracker) -> Response {
31-
match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| Error::InternalServer {
32-
message: format!("{e:?}"),
33-
location: Location::caller(),
31+
pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc<Tracker>) -> Response {
32+
debug!("Handling Packets: {udp_request:?}");
33+
match Request::from_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| {
34+
Error::InternalServer {
35+
message: format!("{e:?}"),
36+
location: Location::caller(),
37+
}
3438
}) {
3539
Ok(request) => {
3640
let transaction_id = match &request {
@@ -39,7 +43,7 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec<u8>, tracker: &
3943
Request::Scrape(scrape_request) => scrape_request.transaction_id,
4044
};
4145

42-
match handle_request(request, remote_addr, tracker).await {
46+
match handle_request(request, udp_request.from, tracker).await {
4347
Ok(response) => response,
4448
Err(e) => handle_error(&e, transaction_id),
4549
}
@@ -60,6 +64,8 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec<u8>, tracker: &
6064
///
6165
/// If a error happens in the `handle_request` function, it will just return the `ServerError`.
6266
pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: &Tracker) -> Result<Response, Error> {
67+
debug!("Handling Request: {request:?} to: {remote_addr:?}");
68+
6369
match request {
6470
Request::Connect(connect_request) => handle_connect(remote_addr, &connect_request, tracker).await,
6571
Request::Announce(announce_request) => handle_announce(remote_addr, &announce_request, tracker).await,

‎src/servers/udp/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,9 @@
638638
//! documentation by [Arvid Norberg](https://github.com/arvidn) was very
639639
//! supportive in the development of this documentation. Some descriptions were
640640
//! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html).
641+
642+
use std::net::SocketAddr;
643+
641644
pub mod connection_cookie;
642645
pub mod error;
643646
pub mod handlers;
@@ -652,3 +655,9 @@ pub type Port = u16;
652655
/// The transaction id. A random number generated byt the peer that is used to
653656
/// match requests and responses.
654657
pub type TransactionId = i64;
658+
659+
#[derive(Clone, Debug)]
660+
pub(crate) struct UdpRequest {
661+
payload: Vec<u8>,
662+
from: SocketAddr,
663+
}

‎src/servers/udp/server.rs

+136-79
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@
2020
use std::io::Cursor;
2121
use std::net::SocketAddr;
2222
use std::sync::Arc;
23-
use std::time::Duration;
2423

2524
use aquatic_udp_protocol::Response;
2625
use derive_more::Constructor;
27-
use futures::pin_mut;
28-
use log::{debug, error, info};
26+
use log::{debug, error, info, trace};
27+
use ringbuf::storage::Static;
28+
use ringbuf::traits::{Consumer, Observer, RingBuffer};
29+
use ringbuf::LocalRb;
2930
use tokio::net::UdpSocket;
30-
use tokio::sync::oneshot::{Receiver, Sender};
31-
use tokio::task::JoinHandle;
31+
use tokio::sync::oneshot;
32+
use tokio::task::{AbortHandle, JoinHandle};
33+
use tokio::{select, task};
3234

35+
use super::UdpRequest;
3336
use crate::bootstrap::jobs::Started;
3437
use crate::core::Tracker;
3538
use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm};
3639
use crate::servers::signals::{shutdown_signal_with_message, Halted};
37-
use crate::servers::udp::handlers::handle_packet;
40+
use crate::servers::udp::handlers;
3841
use crate::shared::bit_torrent::tracker::udp::client::check;
3942
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;
4043

@@ -125,17 +128,8 @@ impl UdpServer<Stopped> {
125128

126129
assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");
127130

128-
let launcher = self.state.launcher;
129-
130-
let task = tokio::spawn(async move {
131-
debug!(target: "UDP Tracker", "Launcher starting ...");
132-
133-
let starting = launcher.start(tracker, tx_start, rx_halt).await;
134-
135-
starting.await.expect("UDP server should have started running");
136-
137-
launcher
138-
});
131+
// May need to wrap in a task to about a tokio bug.
132+
let task = self.state.launcher.start(tracker, tx_start, rx_halt);
139133

140134
let binding = rx_start.await.expect("it should be able to start the service").address;
141135

@@ -150,6 +144,8 @@ impl UdpServer<Stopped> {
150144
},
151145
};
152146

147+
trace!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);
148+
153149
Ok(running_udp_server)
154150
}
155151
}
@@ -182,7 +178,7 @@ impl UdpServer<Running> {
182178
}
183179
}
184180

185-
#[derive(Constructor, Debug)]
181+
#[derive(Constructor, Copy, Clone, Debug)]
186182
pub struct Launcher {
187183
bind_to: SocketAddr,
188184
}
@@ -193,8 +189,40 @@ impl Launcher {
193189
/// # Panics
194190
///
195191
/// It would panic if unable to resolve the `local_addr` from the supplied ´socket´.
196-
pub async fn start(&self, tracker: Arc<Tracker>, tx_start: Sender<Started>, rx_halt: Receiver<Halted>) -> JoinHandle<()> {
197-
Udp::start_with_graceful_shutdown(tracker, self.bind_to, tx_start, rx_halt).await
192+
pub fn start(
193+
&self,
194+
tracker: Arc<Tracker>,
195+
tx_start: oneshot::Sender<Started>,
196+
rx_halt: oneshot::Receiver<Halted>,
197+
) -> JoinHandle<Launcher> {
198+
let launcher = Launcher::new(self.bind_to);
199+
tokio::spawn(async move {
200+
Udp::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await;
201+
launcher
202+
})
203+
}
204+
}
205+
206+
#[derive(Default)]
207+
struct ActiveRequests {
208+
rb: LocalRb<Static<AbortHandle, 50>>, // the number of requests we handle at the same time.
209+
}
210+
211+
impl std::fmt::Debug for ActiveRequests {
212+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213+
let (left, right) = &self.rb.as_slices();
214+
let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity());
215+
f.debug_struct("ActiveRequests").field("rb", &dbg).finish()
216+
}
217+
}
218+
219+
impl Drop for ActiveRequests {
220+
fn drop(&mut self) {
221+
for h in self.rb.pop_iter() {
222+
if !h.is_finished() {
223+
h.abort();
224+
}
225+
}
198226
}
199227
}
200228

@@ -209,80 +237,103 @@ impl Udp {
209237
///
210238
/// It panics if unable to bind to udp socket, and get the address from the udp socket.
211239
/// It also panics if unable to send address of socket.
212-
async fn start_with_graceful_shutdown(
240+
async fn run_with_graceful_shutdown(
213241
tracker: Arc<Tracker>,
214242
bind_to: SocketAddr,
215-
tx_start: Sender<Started>,
216-
rx_halt: Receiver<Halted>,
217-
) -> JoinHandle<()> {
243+
tx_start: oneshot::Sender<Started>,
244+
rx_halt: oneshot::Receiver<Halted>,
245+
) {
218246
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
219247
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");
248+
let halt = shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}"));
220249

221250
info!(target: "UDP Tracker", "Starting on: udp://{}", address);
222251

223252
let running = tokio::task::spawn(async move {
224-
let halt = tokio::task::spawn(async move {
225-
debug!(target: "UDP Tracker", "Waiting for halt signal for socket address: udp://{address} ...");
226-
227-
shutdown_signal_with_message(
228-
rx_halt,
229-
format!("Shutting down UDP server on socket address: udp://{address}"),
230-
)
231-
.await;
232-
});
233-
234-
let listen = async move {
235-
debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");
236-
237-
loop {
238-
let mut data = [0; MAX_PACKET_SIZE];
239-
let socket_clone = socket.clone();
240-
241-
match socket_clone.recv_from(&mut data).await {
242-
Ok((valid_bytes, remote_addr)) => {
243-
let payload = data[..valid_bytes].to_vec();
244-
245-
debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
246-
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
247-
debug!(target: "UDP Tracker", "Payload: {:?}", payload);
248-
249-
let response_fut = handle_packet(remote_addr, payload, &tracker);
250-
251-
match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
252-
Ok(response) => {
253-
Udp::send_response(socket_clone, remote_addr, response).await;
254-
}
255-
Err(_) => {
256-
error!("Timeout occurred while processing the UDP request.");
257-
}
258-
}
259-
}
260-
Err(err) => {
261-
error!("Error reading UDP datagram from socket. Error: {:?}", err);
262-
}
253+
debug!(target: "UDP Tracker", "Started: Waiting for packets on socket address: udp://{address} ...");
254+
255+
let tracker = tracker.clone();
256+
let socket = socket.clone();
257+
258+
let reqs = &mut ActiveRequests::default();
259+
260+
// Main Waiting Loop, awaits on async [`receive_request`].
261+
loop {
262+
if let Some(h) = reqs.rb.push_overwrite(
263+
Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(),
264+
) {
265+
if !h.is_finished() {
266+
// the task is still running, lets yield and give it a chance to flush.
267+
tokio::task::yield_now().await;
268+
h.abort();
263269
}
264270
}
265-
};
271+
}
272+
});
273+
274+
tx_start
275+
.send(Started { address })
276+
.expect("the UDP Tracker service should not be dropped");
277+
278+
debug!(target: "UDP Tracker", "Started on: udp://{}", address);
266279

267-
pin_mut!(halt);
268-
pin_mut!(listen);
280+
let stop = running.abort_handle();
269281

270-
tx_start
271-
.send(Started { address })
272-
.expect("the UDP Tracker service should not be dropped");
282+
select! {
283+
_ = running => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
284+
() = halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); }
285+
}
286+
stop.abort();
287+
288+
task::yield_now().await; // lets allow the other threads to complete.
289+
}
273290

274-
tokio::select! {
275-
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
276-
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
291+
async fn receive_request(socket: Arc<UdpSocket>) -> Result<UdpRequest, Box<std::io::Error>> {
292+
// Wait for the socket to be readable
293+
socket.readable().await?;
294+
295+
let mut buf = Vec::with_capacity(MAX_PACKET_SIZE);
296+
297+
match socket.recv_buf_from(&mut buf).await {
298+
Ok((n, from)) => {
299+
Vec::truncate(&mut buf, n);
300+
trace!("GOT {buf:?}");
301+
Ok(UdpRequest { payload: buf, from })
277302
}
278-
});
279303

280-
info!(target: "UDP Tracker", "Started on: udp://{}", address);
304+
Err(e) => Err(Box::new(e)),
305+
}
306+
}
281307

282-
running
308+
fn do_request(
309+
result: Result<UdpRequest, Box<std::io::Error>>,
310+
tracker: Arc<Tracker>,
311+
socket: Arc<UdpSocket>,
312+
) -> JoinHandle<()> {
313+
// timeout not needed, as udp is non-blocking.
314+
tokio::task::spawn(async move {
315+
match result {
316+
Ok(udp_request) => {
317+
trace!("Received Request from: {}", udp_request.from);
318+
Self::make_response(tracker.clone(), socket.clone(), udp_request).await;
319+
}
320+
Err(error) => {
321+
debug!("error: {error}");
322+
}
323+
}
324+
})
283325
}
284326

285-
async fn send_response(socket: Arc<UdpSocket>, remote_addr: SocketAddr, response: Response) {
327+
async fn make_response(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
328+
trace!("Making Response to {udp_request:?}");
329+
let from = udp_request.from;
330+
let response = handlers::handle_packet(udp_request, &tracker.clone()).await;
331+
Self::send_response(&socket.clone(), from, response).await;
332+
}
333+
334+
async fn send_response(socket: &Arc<UdpSocket>, to: SocketAddr, response: Response) {
335+
trace!("Sending Response: {response:?} to: {to:?}");
336+
286337
let buffer = vec![0u8; MAX_PACKET_SIZE];
287338
let mut cursor = Cursor::new(buffer);
288339

@@ -293,10 +344,10 @@ impl Udp {
293344
let inner = cursor.get_ref();
294345

295346
debug!("Sending {} bytes ...", &inner[..position].len());
296-
debug!("To: {:?}", &remote_addr);
347+
debug!("To: {:?}", &to);
297348
debug!("Payload: {:?}", &inner[..position]);
298349

299-
Udp::send_packet(socket, &remote_addr, &inner[..position]).await;
350+
Self::send_packet(socket, &to, &inner[..position]).await;
300351

301352
debug!("{} bytes sent", &inner[..position].len());
302353
}
@@ -306,7 +357,9 @@ impl Udp {
306357
}
307358
}
308359

309-
async fn send_packet(socket: Arc<UdpSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
360+
async fn send_packet(socket: &Arc<UdpSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
361+
trace!("Sending Packets: {payload:?} to: {remote_addr:?}");
362+
310363
// doesn't matter if it reaches or not
311364
drop(socket.send_to(payload, remote_addr).await);
312365
}
@@ -324,7 +377,9 @@ impl Udp {
324377
#[cfg(test)]
325378
mod tests {
326379
use std::sync::Arc;
380+
use std::time::Duration;
327381

382+
use tokio::time::sleep;
328383
use torrust_tracker_test_helpers::configuration::ephemeral_mode_public;
329384

330385
use crate::bootstrap::app::initialize_with_configuration;
@@ -351,6 +406,8 @@ mod tests {
351406
.expect("it should start the server");
352407
let stopped = started.stop().await.expect("it should stop the server");
353408

409+
sleep(Duration::from_secs(1)).await;
410+
354411
assert_eq!(stopped.state.launcher.bind_to, bind_to);
355412
}
356413
}

‎src/shared/bit_torrent/tracker/udp/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTracke
143143
///
144144
/// # Panics
145145
pub async fn check(binding: &SocketAddr) -> Result<String, String> {
146+
debug!("Checking Service (detail): {binding:?}.");
147+
146148
let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await;
147149

148150
let connect_request = ConnectRequest {

‎tests/servers/udp/contract.rs

+8
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_req
4747
let response = Response::from_bytes(&buffer, true).unwrap();
4848

4949
assert!(is_error_response(&response, "bad request"));
50+
51+
env.stop().await;
5052
}
5153

5254
mod receiving_a_connection_request {
@@ -72,6 +74,8 @@ mod receiving_a_connection_request {
7274
let response = client.receive().await;
7375

7476
assert!(is_connect_response(&response, TransactionId(123)));
77+
78+
env.stop().await;
7579
}
7680
}
7781

@@ -121,6 +125,8 @@ mod receiving_an_announce_request {
121125
println!("test response {response:?}");
122126

123127
assert!(is_ipv4_announce_response(&response));
128+
129+
env.stop().await;
124130
}
125131
}
126132

@@ -158,5 +164,7 @@ mod receiving_an_scrape_request {
158164
let response = client.receive().await;
159165

160166
assert!(is_scrape_response(&response));
167+
168+
env.stop().await;
161169
}
162170
}

‎tests/servers/udp/environment.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,29 @@ impl Environment<Running> {
6868
config: self.config,
6969
tracker: self.tracker,
7070
registar: Registar::default(),
71-
server: self.server.stop().await.unwrap(),
71+
server: self.server.stop().await.expect("it stop the udp tracker service"),
7272
}
7373
}
7474

7575
pub fn bind_address(&self) -> SocketAddr {
7676
self.server.state.binding
7777
}
7878
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use std::time::Duration;
83+
84+
use tokio::time::sleep;
85+
use torrust_tracker_test_helpers::configuration;
86+
87+
use crate::servers::udp::Started;
88+
89+
#[tokio::test]
90+
async fn it_should_make_and_stop_udp_server() {
91+
let env = Started::new(&configuration::ephemeral().into()).await;
92+
sleep(Duration::from_secs(1)).await;
93+
env.stop().await;
94+
sleep(Duration::from_secs(1)).await;
95+
}
96+
}

0 commit comments

Comments
 (0)
Please sign in to comment.