Skip to content

Commit aabfedb

Browse files
committed
refactor: [torrust#639] Tracker Checker: extract checker:Client to check UDP servers
It will be used in teh Tracker Checker too.
1 parent 8d07a34 commit aabfedb

File tree

4 files changed

+216
-113
lines changed

4 files changed

+216
-113
lines changed

src/console/clients/udp/app.rs

+28-113
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,22 @@
5656
//! ```
5757
//!
5858
//! The protocol (`udp://`) in the URL is mandatory. The path (`\scrape`) is optional. It always uses `\scrape`.
59-
use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
59+
use std::net::{SocketAddr, ToSocketAddrs};
6060
use std::str::FromStr;
6161

6262
use anyhow::Context;
63-
use aquatic_udp_protocol::common::InfoHash;
6463
use aquatic_udp_protocol::Response::{AnnounceIpv4, AnnounceIpv6, Scrape};
65-
use aquatic_udp_protocol::{
66-
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
67-
ScrapeRequest, TransactionId,
68-
};
64+
use aquatic_udp_protocol::{Port, TransactionId};
6965
use clap::{Parser, Subcommand};
70-
use log::{debug, LevelFilter};
66+
use log::{debug, info, LevelFilter};
7167
use serde_json::json;
7268
use url::Url;
7369

70+
use crate::console::clients::udp::checker;
7471
use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
75-
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};
72+
use crate::shared::bit_torrent::tracker::udp::client::UdpClient;
7673

77-
const ASSIGNED_BY_OS: i32 = 0;
74+
const ASSIGNED_BY_OS: u16 = 0;
7875
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;
7976

8077
#[derive(Parser, Debug)]
@@ -110,6 +107,8 @@ pub async fn run() -> anyhow::Result<()> {
110107

111108
let args = Args::parse();
112109

110+
let mut new_udp_client = checker::Client::default();
111+
113112
// Configuration
114113
let local_port = ASSIGNED_BY_OS;
115114
let local_bind_to = format!("0.0.0.0:{local_port}");
@@ -121,30 +120,38 @@ pub async fn run() -> anyhow::Result<()> {
121120
let bound_to = udp_client.socket.local_addr().context("binding local address")?;
122121
debug!("Bound to: {bound_to}");
123122

123+
let bound_to_2 = new_udp_client.bind(local_port).await?;
124+
info!("Bound to 2: {bound_to_2}");
125+
124126
let transaction_id = TransactionId(transaction_id);
125127

126128
let response = match args.command {
127129
Command::Announce {
128130
tracker_socket_addr,
129131
info_hash,
130132
} => {
131-
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
132-
133-
send_announce_request(
134-
connection_id,
135-
transaction_id,
136-
info_hash,
137-
Port(bound_to.port()),
138-
&udp_tracker_client,
139-
)
140-
.await
133+
let connection_id = new_udp_client
134+
.connect(&tracker_socket_addr, udp_client, transaction_id)
135+
.await?;
136+
137+
new_udp_client
138+
.send_announce_request(connection_id, transaction_id, info_hash, Port(bound_to.port()))
139+
.await?
141140
}
142141
Command::Scrape {
143142
tracker_socket_addr,
144143
info_hashes,
145144
} => {
146-
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
147-
send_scrape_request(connection_id, transaction_id, info_hashes, &udp_tracker_client).await
145+
//let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
146+
//send_scrape_request(connection_id, transaction_id, info_hashes, &udp_tracker_client).await
147+
148+
let connection_id = new_udp_client
149+
.connect(&tracker_socket_addr, udp_client, transaction_id)
150+
.await?;
151+
152+
new_udp_client
153+
.send_scrape_request(connection_id, transaction_id, info_hashes)
154+
.await?
148155
}
149156
};
150157

@@ -265,95 +272,3 @@ fn parse_info_hash(info_hash_str: &str) -> anyhow::Result<TorrustInfoHash> {
265272
TorrustInfoHash::from_str(info_hash_str)
266273
.map_err(|e| anyhow::Error::msg(format!("failed to parse info-hash `{info_hash_str}`: {e:?}")))
267274
}
268-
269-
async fn connect(
270-
tracker_socket_addr: &SocketAddr,
271-
udp_client: UdpClient,
272-
transaction_id: TransactionId,
273-
) -> (ConnectionId, UdpTrackerClient) {
274-
debug!("Connecting to tracker: udp://{tracker_socket_addr}");
275-
276-
udp_client.connect(&tracker_socket_addr.to_string()).await;
277-
278-
let udp_tracker_client = UdpTrackerClient { udp_client };
279-
280-
let connection_id = send_connection_request(transaction_id, &udp_tracker_client).await;
281-
282-
(connection_id, udp_tracker_client)
283-
}
284-
285-
async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId {
286-
debug!("Sending connection request with transaction id: {transaction_id:#?}");
287-
288-
let connect_request = ConnectRequest { transaction_id };
289-
290-
client.send(connect_request.into()).await;
291-
292-
let response = client.receive().await;
293-
294-
debug!("connection request response:\n{response:#?}");
295-
296-
match response {
297-
Response::Connect(connect_response) => connect_response.connection_id,
298-
_ => panic!("error connecting to udp server. Unexpected response"),
299-
}
300-
}
301-
302-
async fn send_announce_request(
303-
connection_id: ConnectionId,
304-
transaction_id: TransactionId,
305-
info_hash: TorrustInfoHash,
306-
port: Port,
307-
client: &UdpTrackerClient,
308-
) -> Response {
309-
debug!("Sending announce request with transaction id: {transaction_id:#?}");
310-
311-
let announce_request = AnnounceRequest {
312-
connection_id,
313-
transaction_id,
314-
info_hash: InfoHash(info_hash.bytes()),
315-
peer_id: PeerId(*b"-qB00000000000000001"),
316-
bytes_downloaded: NumberOfBytes(0i64),
317-
bytes_uploaded: NumberOfBytes(0i64),
318-
bytes_left: NumberOfBytes(0i64),
319-
event: AnnounceEvent::Started,
320-
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
321-
key: PeerKey(0u32),
322-
peers_wanted: NumberOfPeers(1i32),
323-
port,
324-
};
325-
326-
client.send(announce_request.into()).await;
327-
328-
let response = client.receive().await;
329-
330-
debug!("announce request response:\n{response:#?}");
331-
332-
response
333-
}
334-
335-
async fn send_scrape_request(
336-
connection_id: ConnectionId,
337-
transaction_id: TransactionId,
338-
info_hashes: Vec<TorrustInfoHash>,
339-
client: &UdpTrackerClient,
340-
) -> Response {
341-
debug!("Sending scrape request with transaction id: {transaction_id:#?}");
342-
343-
let scrape_request = ScrapeRequest {
344-
connection_id,
345-
transaction_id,
346-
info_hashes: info_hashes
347-
.iter()
348-
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
349-
.collect(),
350-
};
351-
352-
client.send(scrape_request.into()).await;
353-
354-
let response = client.receive().await;
355-
356-
debug!("scrape request response:\n{response:#?}");
357-
358-
response
359-
}

src/console/clients/udp/checker.rs

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use std::net::{Ipv4Addr, SocketAddr};
2+
3+
use anyhow::Context;
4+
use aquatic_udp_protocol::common::InfoHash;
5+
use aquatic_udp_protocol::{
6+
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
7+
ScrapeRequest, TransactionId,
8+
};
9+
use log::debug;
10+
use thiserror::Error;
11+
12+
use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
13+
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};
14+
15+
#[derive(Error, Debug)]
16+
pub enum ClientError {
17+
#[error("Not connected to remote tracker UDP socket. Try connecting before making requests.")]
18+
NotConnected,
19+
#[error("Unexpected response while connecting the the remote server.")]
20+
UnexpectedConnectionResponse,
21+
}
22+
23+
#[derive(Debug, Default)]
24+
pub struct Client {
25+
// Local socket
26+
local_binding_address: Option<SocketAddr>,
27+
local_bound_address: Option<SocketAddr>,
28+
29+
// Remote socket
30+
remote_socket: Option<SocketAddr>,
31+
32+
udp_tracker_client: Option<UdpTrackerClient>,
33+
}
34+
35+
impl Client {
36+
/// # Errors
37+
///
38+
/// Will return an error if it can't get the bound local address.
39+
pub async fn bind(&mut self, local_port: u16) -> anyhow::Result<SocketAddr> {
40+
let local_bind_to = format!("0.0.0.0:{local_port}");
41+
let binding_address = local_bind_to.parse().context("binding local address")?;
42+
43+
debug!("Binding to: {local_bind_to}");
44+
let udp_client = UdpClient::bind(&local_bind_to).await;
45+
46+
let bound_to = udp_client.socket.local_addr().context("bound local address")?;
47+
debug!("Bound to: {bound_to}");
48+
49+
self.local_binding_address = Some(binding_address);
50+
self.local_bound_address = Some(bound_to);
51+
52+
Ok(bound_to)
53+
}
54+
55+
/// # Errors
56+
///
57+
/// Will return and error if it can't make a connection request successfully
58+
/// to the remote UDP server.
59+
pub async fn connect(
60+
&mut self,
61+
tracker_socket_addr: &SocketAddr,
62+
udp_client: UdpClient,
63+
transaction_id: TransactionId,
64+
) -> anyhow::Result<ConnectionId> {
65+
debug!("Connecting to tracker: udp://{tracker_socket_addr}");
66+
67+
udp_client.connect(&tracker_socket_addr.to_string()).await;
68+
69+
self.remote_socket = Some(*tracker_socket_addr);
70+
71+
self.udp_tracker_client = Some(UdpTrackerClient { udp_client });
72+
73+
self.send_connection_request(transaction_id).await
74+
}
75+
76+
/// # Errors
77+
///
78+
/// Will return and error if
79+
///
80+
/// - It can't connect to the remote UDP socket.
81+
/// - It can't make a connection request successfully to the remote UDP
82+
/// server (after successfully connecting to the remote UDP socket).
83+
///
84+
/// # Panics
85+
///
86+
/// Will panic if it receives an unexpected response.
87+
pub async fn send_connection_request(&self, transaction_id: TransactionId) -> anyhow::Result<ConnectionId> {
88+
debug!("Sending connection request with transaction id: {transaction_id:#?}");
89+
90+
let connect_request = ConnectRequest { transaction_id };
91+
92+
match &self.udp_tracker_client {
93+
Some(client) => {
94+
client.send(connect_request.into()).await;
95+
96+
let response = client.receive().await;
97+
98+
debug!("connection request response:\n{response:#?}");
99+
100+
match response {
101+
Response::Connect(connect_response) => Ok(connect_response.connection_id),
102+
_ => Err(ClientError::UnexpectedConnectionResponse.into()),
103+
}
104+
}
105+
None => Err(ClientError::NotConnected.into()),
106+
}
107+
}
108+
109+
/// # Errors
110+
///
111+
/// Will return and error if the client is not connected. You have to connect
112+
/// before calling this function.
113+
pub async fn send_announce_request(
114+
&self,
115+
connection_id: ConnectionId,
116+
transaction_id: TransactionId,
117+
info_hash: TorrustInfoHash,
118+
port: Port,
119+
) -> anyhow::Result<Response> {
120+
debug!("Sending announce request with transaction id: {transaction_id:#?}");
121+
122+
let announce_request = AnnounceRequest {
123+
connection_id,
124+
transaction_id,
125+
info_hash: InfoHash(info_hash.bytes()),
126+
peer_id: PeerId(*b"-qB00000000000000001"),
127+
bytes_downloaded: NumberOfBytes(0i64),
128+
bytes_uploaded: NumberOfBytes(0i64),
129+
bytes_left: NumberOfBytes(0i64),
130+
event: AnnounceEvent::Started,
131+
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
132+
key: PeerKey(0u32),
133+
peers_wanted: NumberOfPeers(1i32),
134+
port,
135+
};
136+
137+
match &self.udp_tracker_client {
138+
Some(client) => {
139+
client.send(announce_request.into()).await;
140+
141+
let response = client.receive().await;
142+
143+
debug!("announce request response:\n{response:#?}");
144+
145+
Ok(response)
146+
}
147+
None => Err(ClientError::NotConnected.into()),
148+
}
149+
}
150+
151+
/// # Errors
152+
///
153+
/// Will return and error if the client is not connected. You have to connect
154+
/// before calling this function.
155+
pub async fn send_scrape_request(
156+
&self,
157+
connection_id: ConnectionId,
158+
transaction_id: TransactionId,
159+
info_hashes: Vec<TorrustInfoHash>,
160+
) -> anyhow::Result<Response> {
161+
debug!("Sending scrape request with transaction id: {transaction_id:#?}");
162+
163+
let scrape_request = ScrapeRequest {
164+
connection_id,
165+
transaction_id,
166+
info_hashes: info_hashes
167+
.iter()
168+
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
169+
.collect(),
170+
};
171+
172+
match &self.udp_tracker_client {
173+
Some(client) => {
174+
client.send(scrape_request.into()).await;
175+
176+
let response = client.receive().await;
177+
178+
debug!("scrape request response:\n{response:#?}");
179+
180+
Ok(response)
181+
}
182+
None => Err(ClientError::NotConnected.into()),
183+
}
184+
}
185+
}

src/console/clients/udp/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod app;
2+
pub mod checker;

src/shared/bit_torrent/tracker/udp/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tokio::time;
1111
use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE};
1212

1313
#[allow(clippy::module_name_repetitions)]
14+
#[derive(Debug)]
1415
pub struct UdpClient {
1516
pub socket: Arc<UdpSocket>,
1617
}
@@ -86,6 +87,7 @@ pub async fn new_udp_client_connected(remote_address: &str) -> UdpClient {
8687
}
8788

8889
#[allow(clippy::module_name_repetitions)]
90+
#[derive(Debug)]
8991
pub struct UdpTrackerClient {
9092
pub udp_client: UdpClient,
9193
}

0 commit comments

Comments
 (0)