From 72c8348559f45f936266823d576f8d8798b32d66 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Wed, 24 Jan 2024 18:17:10 +0800 Subject: [PATCH] udp: handle udp requests concurrently --- .github/workflows/coverage.yaml | 3 + .vscode/settings.json | 6 + Cargo.lock | 10 + Cargo.toml | 1 + cSpell.json | 2 + src/servers/udp/handlers.rs | 16 +- src/servers/udp/mod.rs | 9 + src/servers/udp/server.rs | 215 ++++++++++++------- src/shared/bit_torrent/tracker/udp/client.rs | 2 + tests/servers/udp/contract.rs | 8 + tests/servers/udp/environment.rs | 20 +- 11 files changed, 207 insertions(+), 85 deletions(-) diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml index 7f5bf2946..06529d53d 100644 --- a/.github/workflows/coverage.yaml +++ b/.github/workflows/coverage.yaml @@ -55,6 +55,9 @@ jobs: name: Run Build Checks run: cargo check --tests --benches --examples --workspace --all-targets --all-features + # Run Test Locally: + # RUSTFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" RUSTDOCFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" CARGO_INCREMENTAL="0" RUST_BACKTRACE=1 cargo test --tests --benches --examples --workspace --all-targets --all-features + - id: test name: Run Unit Tests run: cargo test --tests --benches --examples --workspace --all-targets --all-features diff --git a/.vscode/settings.json b/.vscode/settings.json index 038da4c18..701e89ccf 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,12 @@ "[rust]": { "editor.formatOnSave": true }, + "[ignore]": { "rust-analyzer.cargo.extraEnv" : { + "RUSTFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + "RUSTDOCFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + "CARGO_INCREMENTAL": "0", + "RUST_BACKTRACE": "1" + }}, "rust-analyzer.checkOnSave": true, "rust-analyzer.check.command": "clippy", "rust-analyzer.check.allTargets": true, diff --git a/Cargo.lock b/Cargo.lock index 1f49aa986..63dfab1c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2621,6 +2621,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ringbuf" +version = "0.4.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b8f7d58e4f67752d63318605656be063e333154aa35b70126075e9d05552979" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "rkyv" version = "0.7.43" @@ -3448,6 +3457,7 @@ dependencies = [ "r2d2_sqlite", "rand", "reqwest", + "ringbuf", "serde", "serde_bencode", "serde_bytes", diff --git a/Cargo.toml b/Cargo.toml index 9b7e71905..6fd542c2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ serde = { version = "1", features = ["derive"] } serde_bencode = "0" serde_bytes = "0" serde_json = "1" +ringbuf = "0.4.0-rc.2" serde_with = "3" serde_repr = "0" tdyne-peer-id = "1" diff --git a/cSpell.json b/cSpell.json index e02c6ed87..acd46284c 100644 --- a/cSpell.json +++ b/cSpell.json @@ -94,8 +94,10 @@ "reannounce", "Registar", "repr", + "reqs", "reqwest", "rerequests", + "ringbuf", "rngs", "routable", "rusqlite", diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index b77cd3a42..65e3f5b20 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -11,6 +11,7 @@ use log::{debug, info}; use torrust_tracker_located_error::DynError; use super::connection_cookie::{check, from_connection_id, into_connection_id, make}; +use super::UdpRequest; use crate::core::{statistics, ScrapeData, Tracker}; use crate::servers::udp::error::Error; use crate::servers::udp::peer_builder; @@ -27,10 +28,13 @@ use crate::shared::bit_torrent::info_hash::InfoHash; /// type. /// /// It will return an `Error` response if the request is invalid. -pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: &Tracker) -> Response { - match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| Error::InternalServer { - message: format!("{e:?}"), - location: Location::caller(), +pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc) -> Response { + debug!("Handling Packets: {udp_request:?}"); + match Request::from_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| { + Error::InternalServer { + message: format!("{e:?}"), + location: Location::caller(), + } }) { Ok(request) => { let transaction_id = match &request { @@ -39,7 +43,7 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: & Request::Scrape(scrape_request) => scrape_request.transaction_id, }; - match handle_request(request, remote_addr, tracker).await { + match handle_request(request, udp_request.from, tracker).await { Ok(response) => response, Err(e) => handle_error(&e, transaction_id), } @@ -60,6 +64,8 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: & /// /// If a error happens in the `handle_request` function, it will just return the `ServerError`. pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: &Tracker) -> Result { + debug!("Handling Request: {request:?} to: {remote_addr:?}"); + match request { Request::Connect(connect_request) => handle_connect(remote_addr, &connect_request, tracker).await, Request::Announce(announce_request) => handle_announce(remote_addr, &announce_request, tracker).await, diff --git a/src/servers/udp/mod.rs b/src/servers/udp/mod.rs index 985c1cec7..3b22aeab5 100644 --- a/src/servers/udp/mod.rs +++ b/src/servers/udp/mod.rs @@ -638,6 +638,9 @@ //! documentation by [Arvid Norberg](https://github.com/arvidn) was very //! supportive in the development of this documentation. Some descriptions were //! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html). + +use std::net::SocketAddr; + pub mod connection_cookie; pub mod error; pub mod handlers; @@ -652,3 +655,9 @@ pub type Port = u16; /// The transaction id. A random number generated byt the peer that is used to /// match requests and responses. pub type TransactionId = i64; + +#[derive(Clone, Debug)] +pub(crate) struct UdpRequest { + payload: Vec, + from: SocketAddr, +} diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 5a1977d01..0ab50d3bd 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -20,21 +20,24 @@ use std::io::Cursor; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; use aquatic_udp_protocol::Response; use derive_more::Constructor; -use futures::pin_mut; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; +use ringbuf::storage::Static; +use ringbuf::traits::{Consumer, Observer, RingBuffer}; +use ringbuf::LocalRb; use tokio::net::UdpSocket; -use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::task::JoinHandle; +use tokio::sync::oneshot; +use tokio::task::{AbortHandle, JoinHandle}; +use tokio::{select, task}; +use super::UdpRequest; use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{shutdown_signal_with_message, Halted}; -use crate::servers::udp::handlers::handle_packet; +use crate::servers::udp::handlers; use crate::shared::bit_torrent::tracker::udp::client::check; use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; @@ -125,17 +128,8 @@ impl UdpServer { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); - let launcher = self.state.launcher; - - let task = tokio::spawn(async move { - debug!(target: "UDP Tracker", "Launcher starting ..."); - - let starting = launcher.start(tracker, tx_start, rx_halt).await; - - starting.await.expect("UDP server should have started running"); - - launcher - }); + // May need to wrap in a task to about a tokio bug. + let task = self.state.launcher.start(tracker, tx_start, rx_halt); let binding = rx_start.await.expect("it should be able to start the service").address; @@ -150,6 +144,8 @@ impl UdpServer { }, }; + trace!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding); + Ok(running_udp_server) } } @@ -182,7 +178,7 @@ impl UdpServer { } } -#[derive(Constructor, Debug)] +#[derive(Constructor, Copy, Clone, Debug)] pub struct Launcher { bind_to: SocketAddr, } @@ -193,8 +189,40 @@ impl Launcher { /// # Panics /// /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. - pub async fn start(&self, tracker: Arc, tx_start: Sender, rx_halt: Receiver) -> JoinHandle<()> { - Udp::start_with_graceful_shutdown(tracker, self.bind_to, tx_start, rx_halt).await + pub fn start( + &self, + tracker: Arc, + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) -> JoinHandle { + let launcher = Launcher::new(self.bind_to); + tokio::spawn(async move { + Udp::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; + launcher + }) + } +} + +#[derive(Default)] +struct ActiveRequests { + rb: LocalRb>, // the number of requests we handle at the same time. +} + +impl std::fmt::Debug for ActiveRequests { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (left, right) = &self.rb.as_slices(); + let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity()); + f.debug_struct("ActiveRequests").field("rb", &dbg).finish() + } +} + +impl Drop for ActiveRequests { + fn drop(&mut self) { + for h in self.rb.pop_iter() { + if !h.is_finished() { + h.abort(); + } + } } } @@ -209,80 +237,103 @@ impl Udp { /// /// It panics if unable to bind to udp socket, and get the address from the udp socket. /// It also panics if unable to send address of socket. - async fn start_with_graceful_shutdown( + async fn run_with_graceful_shutdown( tracker: Arc, bind_to: SocketAddr, - tx_start: Sender, - rx_halt: Receiver, - ) -> JoinHandle<()> { + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) { let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}.")); let address = socket.local_addr().expect("Could not get local_addr from {binding}."); + let halt = shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")); info!(target: "UDP Tracker", "Starting on: udp://{}", address); let running = tokio::task::spawn(async move { - let halt = tokio::task::spawn(async move { - debug!(target: "UDP Tracker", "Waiting for halt signal for socket address: udp://{address} ..."); - - shutdown_signal_with_message( - rx_halt, - format!("Shutting down UDP server on socket address: udp://{address}"), - ) - .await; - }); - - let listen = async move { - debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ..."); - - loop { - let mut data = [0; MAX_PACKET_SIZE]; - let socket_clone = socket.clone(); - - match socket_clone.recv_from(&mut data).await { - Ok((valid_bytes, remote_addr)) => { - let payload = data[..valid_bytes].to_vec(); - - debug!(target: "UDP Tracker", "Received {} bytes", payload.len()); - debug!(target: "UDP Tracker", "From: {}", &remote_addr); - debug!(target: "UDP Tracker", "Payload: {:?}", payload); - - let response_fut = handle_packet(remote_addr, payload, &tracker); - - match tokio::time::timeout(Duration::from_secs(5), response_fut).await { - Ok(response) => { - Udp::send_response(socket_clone, remote_addr, response).await; - } - Err(_) => { - error!("Timeout occurred while processing the UDP request."); - } - } - } - Err(err) => { - error!("Error reading UDP datagram from socket. Error: {:?}", err); - } + debug!(target: "UDP Tracker", "Started: Waiting for packets on socket address: udp://{address} ..."); + + let tracker = tracker.clone(); + let socket = socket.clone(); + + let reqs = &mut ActiveRequests::default(); + + // Main Waiting Loop, awaits on async [`receive_request`]. + loop { + if let Some(h) = reqs.rb.push_overwrite( + Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(), + ) { + if !h.is_finished() { + // the task is still running, lets yield and give it a chance to flush. + tokio::task::yield_now().await; + h.abort(); } } - }; + } + }); + + tx_start + .send(Started { address }) + .expect("the UDP Tracker service should not be dropped"); + + debug!(target: "UDP Tracker", "Started on: udp://{}", address); - pin_mut!(halt); - pin_mut!(listen); + let stop = running.abort_handle(); - tx_start - .send(Started { address }) - .expect("the UDP Tracker service should not be dropped"); + select! { + _ = running => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); }, + () = halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); } + } + stop.abort(); + + task::yield_now().await; // lets allow the other threads to complete. + } - tokio::select! { - _ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); }, - () = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); }, + async fn receive_request(socket: Arc) -> Result> { + // Wait for the socket to be readable + socket.readable().await?; + + let mut buf = Vec::with_capacity(MAX_PACKET_SIZE); + + match socket.recv_buf_from(&mut buf).await { + Ok((n, from)) => { + Vec::truncate(&mut buf, n); + trace!("GOT {buf:?}"); + Ok(UdpRequest { payload: buf, from }) } - }); - info!(target: "UDP Tracker", "Started on: udp://{}", address); + Err(e) => Err(Box::new(e)), + } + } - running + fn do_request( + result: Result>, + tracker: Arc, + socket: Arc, + ) -> JoinHandle<()> { + // timeout not needed, as udp is non-blocking. + tokio::task::spawn(async move { + match result { + Ok(udp_request) => { + trace!("Received Request from: {}", udp_request.from); + Self::make_response(tracker.clone(), socket.clone(), udp_request).await; + } + Err(error) => { + debug!("error: {error}"); + } + } + }) } - async fn send_response(socket: Arc, remote_addr: SocketAddr, response: Response) { + async fn make_response(tracker: Arc, socket: Arc, udp_request: UdpRequest) { + trace!("Making Response to {udp_request:?}"); + let from = udp_request.from; + let response = handlers::handle_packet(udp_request, &tracker.clone()).await; + Self::send_response(&socket.clone(), from, response).await; + } + + async fn send_response(socket: &Arc, to: SocketAddr, response: Response) { + trace!("Sending Response: {response:?} to: {to:?}"); + let buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer); @@ -293,10 +344,10 @@ impl Udp { let inner = cursor.get_ref(); debug!("Sending {} bytes ...", &inner[..position].len()); - debug!("To: {:?}", &remote_addr); + debug!("To: {:?}", &to); debug!("Payload: {:?}", &inner[..position]); - Udp::send_packet(socket, &remote_addr, &inner[..position]).await; + Self::send_packet(socket, &to, &inner[..position]).await; debug!("{} bytes sent", &inner[..position].len()); } @@ -306,7 +357,9 @@ impl Udp { } } - async fn send_packet(socket: Arc, remote_addr: &SocketAddr, payload: &[u8]) { + async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { + trace!("Sending Packets: {payload:?} to: {remote_addr:?}"); + // doesn't matter if it reaches or not drop(socket.send_to(payload, remote_addr).await); } @@ -324,7 +377,9 @@ impl Udp { #[cfg(test)] mod tests { use std::sync::Arc; + use std::time::Duration; + use tokio::time::sleep; use torrust_tracker_test_helpers::configuration::ephemeral_mode_public; use crate::bootstrap::app::initialize_with_configuration; @@ -351,6 +406,8 @@ mod tests { .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); + sleep(Duration::from_secs(1)).await; + assert_eq!(stopped.state.launcher.bind_to, bind_to); } } diff --git a/src/shared/bit_torrent/tracker/udp/client.rs b/src/shared/bit_torrent/tracker/udp/client.rs index 959001e82..23b718472 100644 --- a/src/shared/bit_torrent/tracker/udp/client.rs +++ b/src/shared/bit_torrent/tracker/udp/client.rs @@ -143,6 +143,8 @@ pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTracke /// /// # Panics pub async fn check(binding: &SocketAddr) -> Result { + debug!("Checking Service (detail): {binding:?}."); + let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await; let connect_request = ConnectRequest { diff --git a/tests/servers/udp/contract.rs b/tests/servers/udp/contract.rs index 0eea650b8..91dca4d42 100644 --- a/tests/servers/udp/contract.rs +++ b/tests/servers/udp/contract.rs @@ -47,6 +47,8 @@ async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_req let response = Response::from_bytes(&buffer, true).unwrap(); assert!(is_error_response(&response, "bad request")); + + env.stop().await; } mod receiving_a_connection_request { @@ -72,6 +74,8 @@ mod receiving_a_connection_request { let response = client.receive().await; assert!(is_connect_response(&response, TransactionId(123))); + + env.stop().await; } } @@ -121,6 +125,8 @@ mod receiving_an_announce_request { println!("test response {response:?}"); assert!(is_ipv4_announce_response(&response)); + + env.stop().await; } } @@ -158,5 +164,7 @@ mod receiving_an_scrape_request { let response = client.receive().await; assert!(is_scrape_response(&response)); + + env.stop().await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 26a47987e..da7705016 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -68,7 +68,7 @@ impl Environment { config: self.config, tracker: self.tracker, registar: Registar::default(), - server: self.server.stop().await.unwrap(), + server: self.server.stop().await.expect("it stop the udp tracker service"), } } @@ -76,3 +76,21 @@ impl Environment { self.server.state.binding } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::sleep; + use torrust_tracker_test_helpers::configuration; + + use crate::servers::udp::Started; + + #[tokio::test] + async fn it_should_make_and_stop_udp_server() { + let env = Started::new(&configuration::ephemeral().into()).await; + sleep(Duration::from_secs(1)).await; + env.stop().await; + sleep(Duration::from_secs(1)).await; + } +}