Skip to content

Commit f2629ce

Browse files
committed
Merge torrust#955: udp: processor for requests
019cf9f udp: processor for requests (Cameron Garnham) Pull request description: Little work to cleanup the server udp launcher module. ACKs for top commit: da2ce7: ACK 019cf9f Tree-SHA512: fc32f73f7969d6c6b7f112306b2132cb853fbbe120979ea5826f835b570dff646fc73df66585a33c3223c85a3fd73373d9afcad6761ec5b67d7dca6a4688547d
2 parents 035d630 + 019cf9f commit f2629ce

File tree

7 files changed

+81
-84
lines changed

7 files changed

+81
-84
lines changed

src/servers/udp/handlers.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
3333
/// - Delegating the request to the correct handler depending on the request type.
3434
///
3535
/// It will return an `Error` response if the request is invalid.
36-
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
36+
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Tracker, local_addr: SocketAddr) -> Response {
3737
debug!("Handling Packets: {udp_request:?}");
3838

3939
let start_time = Instant::now();
@@ -47,7 +47,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker
4747
}
4848
}) {
4949
Ok(request) => {
50-
log_request(&request, &request_id, &addr);
50+
log_request(&request, &request_id, &local_addr);
5151

5252
let transaction_id = match &request {
5353
Request::Connect(connect_request) => connect_request.transaction_id,
@@ -62,7 +62,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker
6262

6363
let latency = start_time.elapsed();
6464

65-
log_response(&response, &transaction_id, &request_id, &addr, latency);
65+
log_response(&response, &transaction_id, &request_id, &local_addr, latency);
6666

6767
response
6868
}

src/servers/udp/server/bound_socket.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use std::fmt::Debug;
22
use std::net::SocketAddr;
33
use std::ops::Deref;
4-
use std::sync::Arc;
54

65
use url::Url;
76

87
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
98

109
/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket.
1110
pub struct BoundSocket {
12-
socket: Arc<tokio::net::UdpSocket>,
11+
socket: tokio::net::UdpSocket,
1312
}
1413

1514
impl BoundSocket {
@@ -30,9 +29,7 @@ impl BoundSocket {
3029
let local_addr = format!("udp://{addr}");
3130
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)");
3231

33-
Ok(Self {
34-
socket: Arc::new(socket),
35-
})
32+
Ok(Self { socket })
3633
}
3734

3835
/// # Panics

src/servers/udp/server/launcher.rs

+5-60
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
1-
use std::io::Cursor;
21
use std::net::SocketAddr;
32
use std::sync::Arc;
43
use std::time::Duration;
54

6-
use aquatic_udp_protocol::Response;
75
use derive_more::Constructor;
86
use futures_util::StreamExt;
97
use tokio::select;
108
use tokio::sync::oneshot;
119

1210
use super::request_buffer::ActiveRequests;
13-
use super::RawRequest;
1411
use crate::bootstrap::jobs::Started;
1512
use crate::core::Tracker;
1613
use crate::servers::logging::STARTED_ON;
1714
use crate::servers::registar::ServiceHealthCheckJob;
1815
use crate::servers::signals::{shutdown_signal_with_message, Halted};
1916
use crate::servers::udp::server::bound_socket::BoundSocket;
17+
use crate::servers::udp::server::processor::Processor;
2018
use crate::servers::udp::server::receiver::Receiver;
21-
use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET};
19+
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
2220
use crate::shared::bit_torrent::tracker::udp::client::check;
23-
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;
2421

2522
/// A UDP server instance launcher.
2623
#[derive(Constructor)]
@@ -109,6 +106,8 @@ impl Launcher {
109106
let local_addr = format!("udp://{addr}");
110107

111108
loop {
109+
let processor = Processor::new(receiver.socket.clone(), tracker.clone());
110+
112111
if let Some(req) = {
113112
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
114113
receiver.next().await
@@ -138,9 +137,7 @@ impl Launcher {
138137
// are only adding and removing tasks without given them the
139138
// chance to finish. However, the buffer is yielding before
140139
// aborting one tasks, giving it the chance to finish.
141-
let abort_handle: tokio::task::AbortHandle =
142-
tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
143-
.abort_handle();
140+
let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();
144141

145142
if abort_handle.is_finished() {
146143
continue;
@@ -156,56 +153,4 @@ impl Launcher {
156153
}
157154
}
158155
}
159-
160-
async fn process_request(request: RawRequest, tracker: Arc<Tracker>, socket: Arc<BoundSocket>) {
161-
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");
162-
Self::process_valid_request(tracker, socket, request).await;
163-
}
164-
165-
async fn process_valid_request(tracker: Arc<Tracker>, socket: Arc<BoundSocket>, udp_request: RawRequest) {
166-
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}");
167-
let from = udp_request.from;
168-
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await;
169-
Self::send_response(&socket.clone(), from, response).await;
170-
}
171-
172-
async fn send_response(bound_socket: &Arc<BoundSocket>, to: SocketAddr, response: Response) {
173-
let response_type = match &response {
174-
Response::Connect(_) => "Connect".to_string(),
175-
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
176-
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
177-
Response::Scrape(_) => "Scrape".to_string(),
178-
Response::Error(e) => format!("Error: {e:?}"),
179-
};
180-
181-
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");
182-
183-
let buffer = vec![0u8; MAX_PACKET_SIZE];
184-
let mut cursor = Cursor::new(buffer);
185-
186-
match response.write_bytes(&mut cursor) {
187-
Ok(()) => {
188-
#[allow(clippy::cast_possible_truncation)]
189-
let position = cursor.position() as usize;
190-
let inner = cursor.get_ref();
191-
192-
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" );
193-
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)");
194-
195-
Self::send_packet(bound_socket, &to, &inner[..position]).await;
196-
197-
tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)");
198-
}
199-
Err(e) => {
200-
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
201-
}
202-
}
203-
}
204-
205-
async fn send_packet(bound_socket: &Arc<BoundSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
206-
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");
207-
208-
// doesn't matter if it reaches or not
209-
drop(bound_socket.send_to(payload, remote_addr).await);
210-
}
211156
}

src/servers/udp/server/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use super::RawRequest;
55

66
pub mod bound_socket;
77
pub mod launcher;
8+
pub mod processor;
89
pub mod receiver;
910
pub mod request_buffer;
1011
pub mod spawner;

src/servers/udp/server/processor.rs

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::io::Cursor;
2+
use std::net::SocketAddr;
3+
use std::sync::Arc;
4+
5+
use aquatic_udp_protocol::Response;
6+
7+
use super::bound_socket::BoundSocket;
8+
use crate::core::Tracker;
9+
use crate::servers::udp::{handlers, RawRequest, UDP_TRACKER_LOG_TARGET};
10+
11+
pub struct Processor {
12+
socket: Arc<BoundSocket>,
13+
tracker: Arc<Tracker>,
14+
}
15+
16+
impl Processor {
17+
pub fn new(socket: Arc<BoundSocket>, tracker: Arc<Tracker>) -> Self {
18+
Self { socket, tracker }
19+
}
20+
21+
pub async fn process_request(self, request: RawRequest) {
22+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)");
23+
24+
let from = request.from;
25+
let response = handlers::handle_packet(request, &self.tracker, self.socket.address()).await;
26+
self.send_response(from, response).await;
27+
}
28+
29+
async fn send_response(self, to: SocketAddr, response: Response) {
30+
let response_type = match &response {
31+
Response::Connect(_) => "Connect".to_string(),
32+
Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(),
33+
Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(),
34+
Response::Scrape(_) => "Scrape".to_string(),
35+
Response::Error(e) => format!("Error: {e:?}"),
36+
};
37+
38+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)");
39+
40+
let mut writer = Cursor::new(Vec::with_capacity(200));
41+
42+
match response.write_bytes(&mut writer) {
43+
Ok(()) => {
44+
let bytes_count = writer.get_ref().len();
45+
let payload = writer.get_ref();
46+
47+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sending...)" );
48+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, ?payload, "Udp::send_response (sending...)");
49+
50+
self.send_packet(&to, payload).await;
51+
52+
tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sent)");
53+
}
54+
Err(e) => {
55+
tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)");
56+
}
57+
}
58+
}
59+
60+
async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) {
61+
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)");
62+
63+
// doesn't matter if it reaches or not
64+
drop(self.socket.send_to(payload, remote_addr).await);
65+
}
66+
}

src/servers/udp/server/receiver.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,21 @@ use super::RawRequest;
1111
use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE;
1212

1313
pub struct Receiver {
14-
pub bound_socket: Arc<BoundSocket>,
14+
pub socket: Arc<BoundSocket>,
1515
data: RefCell<[u8; MAX_PACKET_SIZE]>,
1616
}
1717

1818
impl Receiver {
1919
#[must_use]
2020
pub fn new(bound_socket: Arc<BoundSocket>) -> Self {
2121
Receiver {
22-
bound_socket,
22+
socket: bound_socket,
2323
data: RefCell::new([0; MAX_PACKET_SIZE]),
2424
}
2525
}
2626

2727
pub fn bound_socket_address(&self) -> SocketAddr {
28-
self.bound_socket.address()
28+
self.socket.address()
2929
}
3030
}
3131

@@ -36,7 +36,7 @@ impl Stream for Receiver {
3636
let mut buf = *self.data.borrow_mut();
3737
let mut buf = tokio::io::ReadBuf::new(&mut buf);
3838

39-
let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else {
39+
let Poll::Ready(ready) = self.socket.poll_recv_from(cx, &mut buf) else {
4040
return Poll::Pending;
4141
};
4242

src/shared/bit_torrent/common.rs

-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! `BitTorrent` protocol primitive types
22
//!
33
//! [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html)
4-
use serde::{Deserialize, Serialize};
54
65
/// The maximum number of torrents that can be returned in an `scrape` response.
76
///
@@ -21,14 +20,3 @@ pub const MAX_SCRAPE_TORRENTS: u8 = 74;
2120
/// See function to [`generate`](crate::core::auth::generate) the
2221
/// [`ExpiringKeys`](crate::core::auth::ExpiringKey) for more information.
2322
pub const AUTH_KEY_LENGTH: usize = 32;
24-
25-
#[repr(u32)]
26-
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
27-
enum Actions {
28-
// todo: it seems this enum is not used anywhere. Values match the ones in
29-
// aquatic_udp_protocol::request::Request::from_bytes.
30-
Connect = 0,
31-
Announce = 1,
32-
Scrape = 2,
33-
Error = 3,
34-
}

0 commit comments

Comments
 (0)