Skip to content

Commit 011fdb7

Browse files
committedFeb 1, 2024
refactor: [torrust#639] Tracker Checker: extract checker:Client to check UDP servers
It will be used in teh Tracker Checker too.
1 parent 3b735a7 commit 011fdb7

File tree

4 files changed

+241
-125
lines changed

4 files changed

+241
-125
lines changed
 

‎src/console/clients/udp/app.rs

+24-125
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,21 @@
5656
//! ```
5757
//!
5858
//! The protocol (`udp://`) in the URL is mandatory. The path (`\scrape`) is optional. It always uses `\scrape`.
59-
use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
59+
use std::net::{SocketAddr, ToSocketAddrs};
6060
use std::str::FromStr;
6161

6262
use anyhow::Context;
63-
use aquatic_udp_protocol::common::InfoHash;
6463
use aquatic_udp_protocol::Response::{AnnounceIpv4, AnnounceIpv6, Scrape};
65-
use aquatic_udp_protocol::{
66-
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
67-
ScrapeRequest, TransactionId,
68-
};
64+
use aquatic_udp_protocol::{Port, TransactionId};
6965
use clap::{Parser, Subcommand};
7066
use log::{debug, LevelFilter};
7167
use serde_json::json;
7268
use url::Url;
7369

70+
use crate::console::clients::udp::checker;
7471
use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
75-
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};
7672

77-
const ASSIGNED_BY_OS: i32 = 0;
73+
const ASSIGNED_BY_OS: u16 = 0;
7874
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;
7975

8076
#[derive(Parser, Debug)]
@@ -110,41 +106,36 @@ pub async fn run() -> anyhow::Result<()> {
110106

111107
let args = Args::parse();
112108

113-
// Configuration
114-
let local_port = ASSIGNED_BY_OS;
115-
let local_bind_to = format!("0.0.0.0:{local_port}");
116-
let transaction_id = RANDOM_TRANSACTION_ID;
117-
118-
// Bind to local port
119-
debug!("Binding to: {local_bind_to}");
120-
let udp_client = UdpClient::bind(&local_bind_to).await;
121-
let bound_to = udp_client.socket.local_addr().context("binding local address")?;
122-
debug!("Bound to: {bound_to}");
123-
124-
let transaction_id = TransactionId(transaction_id);
125-
126109
let response = match args.command {
127110
Command::Announce {
128111
tracker_socket_addr,
129112
info_hash,
130113
} => {
131-
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
132-
133-
send_announce_request(
134-
connection_id,
135-
transaction_id,
136-
info_hash,
137-
Port(bound_to.port()),
138-
&udp_tracker_client,
139-
)
140-
.await
114+
let transaction_id = TransactionId(RANDOM_TRANSACTION_ID);
115+
116+
let mut client = checker::Client::default();
117+
118+
let bound_to = client.bind_and_connect(ASSIGNED_BY_OS, &tracker_socket_addr).await?;
119+
120+
let connection_id = client.send_connection_request(transaction_id).await?;
121+
122+
client
123+
.send_announce_request(connection_id, transaction_id, info_hash, Port(bound_to.port()))
124+
.await?
141125
}
142126
Command::Scrape {
143127
tracker_socket_addr,
144128
info_hashes,
145129
} => {
146-
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
147-
send_scrape_request(connection_id, transaction_id, info_hashes, &udp_tracker_client).await
130+
let transaction_id = TransactionId(RANDOM_TRANSACTION_ID);
131+
132+
let mut client = checker::Client::default();
133+
134+
let _bound_to = client.bind_and_connect(ASSIGNED_BY_OS, &tracker_socket_addr).await?;
135+
136+
let connection_id = client.send_connection_request(transaction_id).await?;
137+
138+
client.send_scrape_request(connection_id, transaction_id, info_hashes).await?
148139
}
149140
};
150141

@@ -265,95 +256,3 @@ fn parse_info_hash(info_hash_str: &str) -> anyhow::Result<TorrustInfoHash> {
265256
TorrustInfoHash::from_str(info_hash_str)
266257
.map_err(|e| anyhow::Error::msg(format!("failed to parse info-hash `{info_hash_str}`: {e:?}")))
267258
}
268-
269-
async fn connect(
270-
tracker_socket_addr: &SocketAddr,
271-
udp_client: UdpClient,
272-
transaction_id: TransactionId,
273-
) -> (ConnectionId, UdpTrackerClient) {
274-
debug!("Connecting to tracker: udp://{tracker_socket_addr}");
275-
276-
udp_client.connect(&tracker_socket_addr.to_string()).await;
277-
278-
let udp_tracker_client = UdpTrackerClient { udp_client };
279-
280-
let connection_id = send_connection_request(transaction_id, &udp_tracker_client).await;
281-
282-
(connection_id, udp_tracker_client)
283-
}
284-
285-
async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId {
286-
debug!("Sending connection request with transaction id: {transaction_id:#?}");
287-
288-
let connect_request = ConnectRequest { transaction_id };
289-
290-
client.send(connect_request.into()).await;
291-
292-
let response = client.receive().await;
293-
294-
debug!("connection request response:\n{response:#?}");
295-
296-
match response {
297-
Response::Connect(connect_response) => connect_response.connection_id,
298-
_ => panic!("error connecting to udp server. Unexpected response"),
299-
}
300-
}
301-
302-
async fn send_announce_request(
303-
connection_id: ConnectionId,
304-
transaction_id: TransactionId,
305-
info_hash: TorrustInfoHash,
306-
port: Port,
307-
client: &UdpTrackerClient,
308-
) -> Response {
309-
debug!("Sending announce request with transaction id: {transaction_id:#?}");
310-
311-
let announce_request = AnnounceRequest {
312-
connection_id,
313-
transaction_id,
314-
info_hash: InfoHash(info_hash.bytes()),
315-
peer_id: PeerId(*b"-qB00000000000000001"),
316-
bytes_downloaded: NumberOfBytes(0i64),
317-
bytes_uploaded: NumberOfBytes(0i64),
318-
bytes_left: NumberOfBytes(0i64),
319-
event: AnnounceEvent::Started,
320-
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
321-
key: PeerKey(0u32),
322-
peers_wanted: NumberOfPeers(1i32),
323-
port,
324-
};
325-
326-
client.send(announce_request.into()).await;
327-
328-
let response = client.receive().await;
329-
330-
debug!("announce request response:\n{response:#?}");
331-
332-
response
333-
}
334-
335-
async fn send_scrape_request(
336-
connection_id: ConnectionId,
337-
transaction_id: TransactionId,
338-
info_hashes: Vec<TorrustInfoHash>,
339-
client: &UdpTrackerClient,
340-
) -> Response {
341-
debug!("Sending scrape request with transaction id: {transaction_id:#?}");
342-
343-
let scrape_request = ScrapeRequest {
344-
connection_id,
345-
transaction_id,
346-
info_hashes: info_hashes
347-
.iter()
348-
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
349-
.collect(),
350-
};
351-
352-
client.send(scrape_request.into()).await;
353-
354-
let response = client.receive().await;
355-
356-
debug!("scrape request response:\n{response:#?}");
357-
358-
response
359-
}

‎src/console/clients/udp/checker.rs

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
use std::net::{Ipv4Addr, SocketAddr};
2+
3+
use anyhow::Context;
4+
use aquatic_udp_protocol::common::InfoHash;
5+
use aquatic_udp_protocol::{
6+
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
7+
ScrapeRequest, TransactionId,
8+
};
9+
use log::debug;
10+
use thiserror::Error;
11+
12+
use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
13+
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};
14+
15+
#[derive(Error, Debug)]
16+
pub enum ClientError {
17+
#[error("Local socket address is not bound yet. Try binding before connecting.")]
18+
NotBound,
19+
#[error("Not connected to remote tracker UDP socket. Try connecting before making requests.")]
20+
NotConnected,
21+
#[error("Unexpected response while connecting the the remote server.")]
22+
UnexpectedConnectionResponse,
23+
}
24+
25+
/// A UDP Tracker client to make test requests (checks).
26+
#[derive(Debug, Default)]
27+
pub struct Client {
28+
/// Local UDP socket. It could be 0 to assign a free port.
29+
local_binding_address: Option<SocketAddr>,
30+
31+
/// Local UDP socket after binding. It's equals to binding address if a
32+
/// non- zero port was used.
33+
local_bound_address: Option<SocketAddr>,
34+
35+
/// Remote UDP tracker socket
36+
remote_socket: Option<SocketAddr>,
37+
38+
/// The client used to make UDP requests to the tracker.
39+
udp_tracker_client: Option<UdpTrackerClient>,
40+
}
41+
42+
impl Client {
43+
/// Binds to the local socket and connects to the remote one.
44+
///
45+
/// # Errors
46+
///
47+
/// Will return an error if
48+
///
49+
/// - It can't bound to the local socket address.
50+
/// - It can't make a connection request successfully to the remote UDP server.
51+
pub async fn bind_and_connect(&mut self, local_port: u16, remote_socket_addr: &SocketAddr) -> anyhow::Result<SocketAddr> {
52+
let bound_to = self.bind(local_port).await?;
53+
self.connect(remote_socket_addr).await?;
54+
Ok(bound_to)
55+
}
56+
57+
/// Binds local client socket.
58+
///
59+
/// # Errors
60+
///
61+
/// Will return an error if it can't bound to the local address.
62+
async fn bind(&mut self, local_port: u16) -> anyhow::Result<SocketAddr> {
63+
let local_bind_to = format!("0.0.0.0:{local_port}");
64+
let binding_address = local_bind_to.parse().context("binding local address")?;
65+
66+
debug!("Binding to: {local_bind_to}");
67+
let udp_client = UdpClient::bind(&local_bind_to).await;
68+
69+
let bound_to = udp_client.socket.local_addr().context("bound local address")?;
70+
debug!("Bound to: {bound_to}");
71+
72+
self.local_binding_address = Some(binding_address);
73+
self.local_bound_address = Some(bound_to);
74+
75+
self.udp_tracker_client = Some(UdpTrackerClient { udp_client });
76+
77+
Ok(bound_to)
78+
}
79+
80+
/// Connects to the remote server socket.
81+
///
82+
/// # Errors
83+
///
84+
/// Will return and error if it can't make a connection request successfully
85+
/// to the remote UDP server.
86+
async fn connect(&mut self, tracker_socket_addr: &SocketAddr) -> anyhow::Result<()> {
87+
debug!("Connecting to tracker: udp://{tracker_socket_addr}");
88+
89+
match &self.udp_tracker_client {
90+
Some(client) => {
91+
client.udp_client.connect(&tracker_socket_addr.to_string()).await;
92+
self.remote_socket = Some(*tracker_socket_addr);
93+
Ok(())
94+
}
95+
None => Err(ClientError::NotBound.into()),
96+
}
97+
}
98+
99+
/// Sends a connection request to the UDP Tracker server.
100+
///
101+
/// # Errors
102+
///
103+
/// Will return and error if
104+
///
105+
/// - It can't connect to the remote UDP socket.
106+
/// - It can't make a connection request successfully to the remote UDP
107+
/// server (after successfully connecting to the remote UDP socket).
108+
///
109+
/// # Panics
110+
///
111+
/// Will panic if it receives an unexpected response.
112+
pub async fn send_connection_request(&self, transaction_id: TransactionId) -> anyhow::Result<ConnectionId> {
113+
debug!("Sending connection request with transaction id: {transaction_id:#?}");
114+
115+
let connect_request = ConnectRequest { transaction_id };
116+
117+
match &self.udp_tracker_client {
118+
Some(client) => {
119+
client.send(connect_request.into()).await;
120+
121+
let response = client.receive().await;
122+
123+
debug!("connection request response:\n{response:#?}");
124+
125+
match response {
126+
Response::Connect(connect_response) => Ok(connect_response.connection_id),
127+
_ => Err(ClientError::UnexpectedConnectionResponse.into()),
128+
}
129+
}
130+
None => Err(ClientError::NotConnected.into()),
131+
}
132+
}
133+
134+
/// Sends an announce request to the UDP Tracker server.
135+
///
136+
/// # Errors
137+
///
138+
/// Will return and error if the client is not connected. You have to connect
139+
/// before calling this function.
140+
pub async fn send_announce_request(
141+
&self,
142+
connection_id: ConnectionId,
143+
transaction_id: TransactionId,
144+
info_hash: TorrustInfoHash,
145+
client_port: Port,
146+
) -> anyhow::Result<Response> {
147+
debug!("Sending announce request with transaction id: {transaction_id:#?}");
148+
149+
let announce_request = AnnounceRequest {
150+
connection_id,
151+
transaction_id,
152+
info_hash: InfoHash(info_hash.bytes()),
153+
peer_id: PeerId(*b"-qB00000000000000001"),
154+
bytes_downloaded: NumberOfBytes(0i64),
155+
bytes_uploaded: NumberOfBytes(0i64),
156+
bytes_left: NumberOfBytes(0i64),
157+
event: AnnounceEvent::Started,
158+
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
159+
key: PeerKey(0u32),
160+
peers_wanted: NumberOfPeers(1i32),
161+
port: client_port,
162+
};
163+
164+
match &self.udp_tracker_client {
165+
Some(client) => {
166+
client.send(announce_request.into()).await;
167+
168+
let response = client.receive().await;
169+
170+
debug!("announce request response:\n{response:#?}");
171+
172+
Ok(response)
173+
}
174+
None => Err(ClientError::NotConnected.into()),
175+
}
176+
}
177+
178+
/// Sends a scrape request to the UDP Tracker server.
179+
///
180+
/// # Errors
181+
///
182+
/// Will return and error if the client is not connected. You have to connect
183+
/// before calling this function.
184+
pub async fn send_scrape_request(
185+
&self,
186+
connection_id: ConnectionId,
187+
transaction_id: TransactionId,
188+
info_hashes: Vec<TorrustInfoHash>,
189+
) -> anyhow::Result<Response> {
190+
debug!("Sending scrape request with transaction id: {transaction_id:#?}");
191+
192+
let scrape_request = ScrapeRequest {
193+
connection_id,
194+
transaction_id,
195+
info_hashes: info_hashes
196+
.iter()
197+
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
198+
.collect(),
199+
};
200+
201+
match &self.udp_tracker_client {
202+
Some(client) => {
203+
client.send(scrape_request.into()).await;
204+
205+
let response = client.receive().await;
206+
207+
debug!("scrape request response:\n{response:#?}");
208+
209+
Ok(response)
210+
}
211+
None => Err(ClientError::NotConnected.into()),
212+
}
213+
}
214+
}

‎src/console/clients/udp/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod app;
2+
pub mod checker;

‎src/shared/bit_torrent/tracker/udp/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tokio::time;
1111
use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE};
1212

1313
#[allow(clippy::module_name_repetitions)]
14+
#[derive(Debug)]
1415
pub struct UdpClient {
1516
pub socket: Arc<UdpSocket>,
1617
}
@@ -86,6 +87,7 @@ pub async fn new_udp_client_connected(remote_address: &str) -> UdpClient {
8687
}
8788

8889
#[allow(clippy::module_name_repetitions)]
90+
#[derive(Debug)]
8991
pub struct UdpTrackerClient {
9092
pub udp_client: UdpClient,
9193
}

0 commit comments

Comments
 (0)
Please sign in to comment.