|
| 1 | +use core::result::Result::{Err, Ok}; |
1 | 2 | use std::io::Cursor;
|
2 | 3 | use std::net::SocketAddr;
|
3 | 4 | use std::sync::Arc;
|
4 | 5 | use std::time::Duration;
|
5 | 6 |
|
6 |
| -use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; |
7 |
| -use anyhow::anyhow; |
8 |
| -use anyhow::{Context, Result}; |
9 |
| -use core::result::Result::{Ok, Err}; |
10 |
| -use anyhow::Error as AError; |
| 7 | +use anyhow::{anyhow, Context, Result}; |
11 | 8 | use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId};
|
12 | 9 | use log::debug;
|
13 | 10 | use tokio::net::UdpSocket;
|
14 | 11 | use tokio::time;
|
15 | 12 |
|
| 13 | +use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; |
| 14 | + |
16 | 15 | /// Default timeout for sending and receiving packets. And waiting for sockets
|
17 | 16 | /// to be readable and writable.
|
18 | 17 | const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
|
|
28 | 27 | }
|
29 | 28 |
|
30 | 29 | impl UdpClient {
|
31 |
| - /// # Panics |
| 30 | + /// # Errors |
32 | 31 | ///
|
33 |
| - /// Will panic if the local address can't be bound. |
| 32 | + /// Will return error if the local address can't be bound. |
34 | 33 | ///
|
35 | 34 | pub async fn bind(local_address: &str) -> Result<Self> {
|
36 |
| - let socket_addr = local_address.parse::<SocketAddr>().map_err(|err| err).context("{local_address} is not a valid socket address")?; |
| 35 | + let socket_addr = local_address |
| 36 | + .parse::<SocketAddr>() |
| 37 | + .context(format!("{local_address} is not a valid socket address"))?; |
| 38 | + |
37 | 39 | let socket = UdpSocket::bind(socket_addr).await?;
|
| 40 | + |
38 | 41 | let udp_client = Self {
|
39 | 42 | socket: Arc::new(socket),
|
40 | 43 | timeout: DEFAULT_TIMEOUT,
|
41 | 44 | };
|
42 | 45 | Ok(udp_client)
|
43 | 46 | }
|
44 | 47 |
|
45 |
| - /// # Panics |
| 48 | + /// # Errors |
46 | 49 | ///
|
47 |
| - /// Will panic if can't connect to the socket. |
48 |
| - pub async fn connect(&self, remote_address: &str) -> anyhow::Result<()> { |
49 |
| - let socket_addr = remote_address.parse::<SocketAddr>().map_err(|err| err).context(format!("{} is not a valid socket address", remote_address))?; |
50 |
| - self.socket.connect(socket_addr).await.map_err(|err| err)?; |
51 |
| - Ok(()) |
| 50 | + /// Will return error if can't connect to the socket. |
| 51 | + pub async fn connect(&self, remote_address: &str) -> Result<()> { |
| 52 | + let socket_addr = remote_address |
| 53 | + .parse::<SocketAddr>() |
| 54 | + .context(format!("{remote_address} is not a valid socket address"))?; |
| 55 | + |
| 56 | + match self.socket.connect(socket_addr).await { |
| 57 | + Ok(()) => { |
| 58 | + debug!("Connected successfully"); |
| 59 | + Ok(()) |
| 60 | + } |
| 61 | + Err(e) => Err(anyhow!("Failed to connect: {e:?}")), |
| 62 | + } |
52 | 63 | }
|
53 | 64 |
|
54 |
| - /// # Panics |
| 65 | + /// # Errors |
55 | 66 | ///
|
56 |
| - /// Will panic if: |
| 67 | + /// Will return error if: |
57 | 68 | ///
|
58 | 69 | /// - Can't write to the socket.
|
59 | 70 | /// - Can't send data.
|
60 |
| - pub async fn send(&self, bytes: &[u8]) -> Result<usize, anyhow::Error> { |
| 71 | + pub async fn send(&self, bytes: &[u8]) -> Result<usize> { |
61 | 72 | debug!(target: "UDP client", "sending {bytes:?} ...");
|
62 | 73 |
|
63 |
| - let _:Result<(), anyhow::Error> = match time::timeout(self.timeout, self.socket.writable()).await { |
| 74 | + match time::timeout(self.timeout, self.socket.writable()).await { |
64 | 75 | Ok(writable_result) => {
|
65 |
| - let writable_result_status : Result<(), anyhow::Error> = match writable_result { |
66 |
| - Ok(()) => Ok(()), |
67 |
| - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")) |
| 76 | + match writable_result { |
| 77 | + Ok(()) => (), |
| 78 | + Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), |
68 | 79 | };
|
69 |
| - writable_result_status |
70 | 80 | }
|
71 |
| - Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")) |
| 81 | + Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), |
72 | 82 | };
|
73 | 83 |
|
74 |
| - let send_status:Result<usize, anyhow::Error> = match time::timeout(self.timeout, self.socket.send(bytes)).await { |
75 |
| - Ok(send_result) => { |
76 |
| - let send_result_status: Result<usize, anyhow::Error> = match send_result { |
77 |
| - Ok(size) => Ok(size), |
78 |
| - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e)) |
79 |
| - }; |
80 |
| - send_result_status |
81 |
| - } |
82 |
| - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e)) |
83 |
| - }; |
84 |
| - send_status |
| 84 | + match time::timeout(self.timeout, self.socket.send(bytes)).await { |
| 85 | + Ok(send_result) => match send_result { |
| 86 | + Ok(size) => Ok(size), |
| 87 | + Err(e) => Err(anyhow!("IO error during send: {e:?}")), |
| 88 | + }, |
| 89 | + Err(e) => Err(anyhow!("Send operation timed out: {e:?}")), |
| 90 | + } |
85 | 91 | }
|
86 | 92 |
|
87 |
| - /// # Panics |
| 93 | + /// # Errors |
88 | 94 | ///
|
89 |
| - /// Will panic if: |
| 95 | + /// Will return error if: |
90 | 96 | ///
|
91 | 97 | /// - Can't read from the socket.
|
92 | 98 | /// - Can't receive data.
|
| 99 | + /// |
| 100 | + /// # Panics |
| 101 | + /// |
93 | 102 | pub async fn receive(&self, bytes: &mut [u8]) -> Result<usize> {
|
94 | 103 | debug!(target: "UDP client", "receiving ...");
|
95 | 104 |
|
96 |
| - let _ :Result<(), anyhow::Error>= match time::timeout(self.timeout, self.socket.readable()).await { |
| 105 | + match time::timeout(self.timeout, self.socket.readable()).await { |
97 | 106 | Ok(readable_result) => {
|
98 |
| - let readable_result_status = match readable_result { |
99 |
| - Ok(()) => Ok(()), |
100 |
| - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), |
| 107 | + match readable_result { |
| 108 | + Ok(()) => (), |
| 109 | + Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), |
101 | 110 | };
|
102 |
| - readable_result_status |
103 |
| - }, |
104 |
| - Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), |
| 111 | + } |
| 112 | + Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), |
105 | 113 | };
|
106 | 114 |
|
107 |
| - let size: Result<usize, anyhow::Error> = match time::timeout(self.timeout, self.socket.recv(bytes)).await { |
| 115 | + let size_result = match time::timeout(self.timeout, self.socket.recv(bytes)).await { |
108 | 116 | Ok(recv_result) => match recv_result {
|
109 | 117 | Ok(size) => Ok(size),
|
110 | 118 | Err(e) => Err(anyhow!("IO error during send: {e:?}")),
|
111 | 119 | },
|
112 | 120 | Err(e) => Err(anyhow!("Receive operation timed out: {e:?}")),
|
113 | 121 | };
|
114 | 122 |
|
115 |
| - debug!(target: "UDP client", "{size} bytes received {bytes:?}"); |
116 |
| - |
117 |
| - size |
| 123 | + if size_result.is_ok() { |
| 124 | + let size = size_result.as_ref().unwrap(); |
| 125 | + debug!(target: "UDP client", "{size} bytes received {bytes:?}"); |
| 126 | + size_result |
| 127 | + } else { |
| 128 | + size_result |
| 129 | + } |
118 | 130 | }
|
119 |
| - |
| 131 | +} |
120 | 132 |
|
121 | 133 | /// Creates a new `UdpClient` connected to a Udp server
|
| 134 | +/// |
| 135 | +/// # Errors |
| 136 | +/// |
| 137 | +/// Will return any errors present in the call stack |
| 138 | +/// |
122 | 139 | pub async fn new_udp_client_connected(remote_address: &str) -> Result<UdpClient> {
|
123 | 140 | let port = 0; // Let OS choose an unused port.
|
124 |
| - match UdpClient::bind(&source_address(port)).await { |
125 |
| - Ok(client) => { |
126 |
| - client.connect(remote_address).await; |
127 |
| - Ok(client) |
128 |
| - } |
129 |
| - Err(err) => Err(err), |
130 |
| - } |
131 |
| -} |
| 141 | + let client = UdpClient::bind(&source_address(port)).await?; |
| 142 | + client.connect(remote_address).await?; |
| 143 | + Ok(client) |
132 | 144 | }
|
133 | 145 |
|
134 | 146 | #[allow(clippy::module_name_repetitions)]
|
|
138 | 150 | }
|
139 | 151 |
|
140 | 152 | impl UdpTrackerClient {
|
141 |
| - /// # Panics |
| 153 | + /// # Errors |
142 | 154 | ///
|
143 |
| - /// Will panic if can't write request to bytes. |
| 155 | + /// Will return error if can't write request to bytes. |
144 | 156 | pub async fn send(&self, request: Request) -> Result<usize> {
|
145 | 157 | debug!(target: "UDP tracker client", "send request {request:?}");
|
146 | 158 |
|
147 | 159 | // Write request into a buffer
|
148 | 160 | let request_buffer = vec![0u8; MAX_PACKET_SIZE];
|
149 | 161 | let mut cursor = Cursor::new(request_buffer);
|
150 | 162 |
|
151 |
| - let request_data = match request.write(&mut cursor) { |
| 163 | + let request_data_result = match request.write(&mut cursor) { |
152 | 164 | Ok(()) => {
|
153 | 165 | #[allow(clippy::cast_possible_truncation)]
|
154 | 166 | let position = cursor.position() as usize;
|
155 | 167 | let inner_request_buffer = cursor.get_ref();
|
156 | 168 | // Return slice which contains written request data
|
157 |
| - &inner_request_buffer[..position] |
| 169 | + Ok(&inner_request_buffer[..position]) |
158 | 170 | }
|
159 | 171 | Err(e) => Err(anyhow!("could not write request to bytes: {e}.")),
|
160 | 172 | };
|
161 | 173 |
|
| 174 | + let request_data = request_data_result?; |
| 175 | + |
162 | 176 | self.udp_client.send(request_data).await
|
163 | 177 | }
|
164 | 178 |
|
165 |
| - /// # Panics |
| 179 | + /// # Errors |
166 | 180 | ///
|
167 |
| - /// Will panic if can't create response from the received payload (bytes buffer). |
168 |
| - pub async fn receive(&self) -> Response { |
| 181 | + /// Will return error if can't create response from the received payload (bytes buffer). |
| 182 | + pub async fn receive(&self) -> Result<Response> { |
169 | 183 | let mut response_buffer = [0u8; MAX_PACKET_SIZE];
|
170 | 184 |
|
171 |
| - let payload_size = self.udp_client.receive(&mut response_buffer).await; |
| 185 | + let payload_size = self.udp_client.receive(&mut response_buffer).await?; |
172 | 186 |
|
173 | 187 | debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}");
|
174 | 188 |
|
175 |
| - Response::from_bytes(&response_buffer[..payload_size], true).unwrap() |
| 189 | + let response = Response::from_bytes(&response_buffer[..payload_size], true)?; |
| 190 | + |
| 191 | + Ok(response) |
176 | 192 | }
|
177 | 193 | }
|
178 | 194 |
|
179 | 195 | /// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
|
180 |
| -pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTrackerClient { |
| 196 | +/// |
| 197 | +/// # Errors |
| 198 | +/// |
| 199 | +/// Will return any errors present in the call stack |
| 200 | +/// |
| 201 | +pub async fn new_udp_tracker_client_connected(remote_address: &str) -> Result<UdpTrackerClient> { |
181 | 202 | let udp_client = new_udp_client_connected(remote_address).await?;
|
182 |
| - UdpTrackerClient { udp_client.unwrap() } |
| 203 | + let udp_tracker_client = UdpTrackerClient { udp_client }; |
| 204 | + Ok(udp_tracker_client) |
183 | 205 | }
|
184 | 206 |
|
185 | 207 | /// Helper Function to Check if a UDP Service is Connectable
|
186 | 208 | ///
|
187 |
| -/// # Errors |
| 209 | +/// # Panics |
188 | 210 | ///
|
189 | 211 | /// It will return an error if unable to connect to the UDP service.
|
190 | 212 | ///
|
191 |
| -/// # Panics |
| 213 | +/// # Errors |
| 214 | +/// |
192 | 215 | pub async fn check(binding: &SocketAddr) -> Result<String, String> {
|
193 | 216 | debug!("Checking Service (detail): {binding:?}.");
|
194 | 217 |
|
195 |
| - let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await; |
196 |
| - |
197 |
| - let connect_request = ConnectRequest { |
198 |
| - transaction_id: TransactionId(123), |
199 |
| - }; |
200 |
| - |
201 |
| - client.send(connect_request.into()).await; |
202 |
| - |
203 |
| - let process = move |response| { |
204 |
| - if matches!(response, Response::Connect(_connect_response)) { |
205 |
| - Ok("Connected".to_string()) |
206 |
| - } else { |
207 |
| - Error("Did not Connect".to_string()) |
208 |
| - } |
209 |
| - }; |
210 |
| - |
211 |
| - let sleep = time::sleep(Duration::from_millis(2000)); |
212 |
| - tokio::pin!(sleep); |
213 |
| - |
214 |
| - tokio::select! { |
215 |
| - () = &mut sleep => { |
216 |
| - Error("Timed Out".to_string()) |
217 |
| - } |
218 |
| - response = client.receive() => { |
219 |
| - process(response) |
| 218 | + match new_udp_tracker_client_connected(binding.to_string().as_str()).await { |
| 219 | + Ok(client) => { |
| 220 | + let connect_request = ConnectRequest { |
| 221 | + transaction_id: TransactionId(123), |
| 222 | + }; |
| 223 | + |
| 224 | + // client.send() return usize, but doesn't use here |
| 225 | + match client.send(connect_request.into()).await { |
| 226 | + Ok(_) => (), |
| 227 | + Err(e) => debug!("Error: {e:?}."), |
| 228 | + }; |
| 229 | + |
| 230 | + let process = move |response| { |
| 231 | + if matches!(response, Response::Connect(_connect_response)) { |
| 232 | + Ok("Connected".to_string()) |
| 233 | + } else { |
| 234 | + Err("Did not Connect".to_string()) |
| 235 | + } |
| 236 | + }; |
| 237 | + |
| 238 | + let sleep = time::sleep(Duration::from_millis(2000)); |
| 239 | + tokio::pin!(sleep); |
| 240 | + |
| 241 | + tokio::select! { |
| 242 | + () = &mut sleep => { |
| 243 | + Err("Timed Out".to_string()) |
| 244 | + } |
| 245 | + response = client.receive() => { |
| 246 | + process(response.unwrap()) |
| 247 | + } |
| 248 | + } |
220 | 249 | }
|
| 250 | + Err(e) => Err(format!("{e:?}")), |
221 | 251 | }
|
222 | 252 | }
|
0 commit comments