1
+ use core:: result:: Result :: { Err , Ok } ;
1
2
use std:: io:: Cursor ;
2
3
use std:: net:: SocketAddr ;
3
4
use std:: sync:: Arc ;
4
5
use std:: time:: Duration ;
5
6
6
- use crate :: shared:: bit_torrent:: tracker:: udp:: { source_address, MAX_PACKET_SIZE } ;
7
- use anyhow:: anyhow;
8
- use anyhow:: { Context , Result } ;
9
- use core:: result:: Result :: { Ok , Err } ;
10
- use anyhow:: Error as AError ;
7
+ use anyhow:: { anyhow, Context , Result } ;
11
8
use aquatic_udp_protocol:: { ConnectRequest , Request , Response , TransactionId } ;
12
9
use log:: debug;
13
10
use tokio:: net:: UdpSocket ;
14
11
use tokio:: time;
15
12
13
+ use crate :: shared:: bit_torrent:: tracker:: udp:: { source_address, MAX_PACKET_SIZE } ;
14
+
16
15
/// Default timeout for sending and receiving packets. And waiting for sockets
17
16
/// to be readable and writable.
18
17
const DEFAULT_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
@@ -28,107 +27,120 @@ pub struct UdpClient {
28
27
}
29
28
30
29
impl UdpClient {
31
- /// # Panics
30
+ /// # Errors
32
31
///
33
- /// Will panic if the local address can't be bound.
32
+ /// Will return error if the local address can't be bound.
34
33
///
35
34
pub async fn bind ( local_address : & str ) -> Result < Self > {
36
- let socket_addr = local_address. parse :: < SocketAddr > ( ) . map_err ( |err| err) . context ( "{local_address} is not a valid socket address" ) ?;
35
+ let socket_addr = local_address
36
+ . parse :: < SocketAddr > ( )
37
+ . context ( format ! ( "{local_address} is not a valid socket address" ) ) ?;
38
+
37
39
let socket = UdpSocket :: bind ( socket_addr) . await ?;
40
+
38
41
let udp_client = Self {
39
42
socket : Arc :: new ( socket) ,
40
43
timeout : DEFAULT_TIMEOUT ,
41
44
} ;
42
45
Ok ( udp_client)
43
46
}
44
47
45
- /// # Panics
48
+ /// # Errors
46
49
///
47
- /// Will panic if can't connect to the socket.
50
+ /// Will return error if can't connect to the socket.
48
51
pub async fn connect ( & self , remote_address : & str ) -> anyhow:: Result < ( ) > {
49
- let socket_addr = remote_address. parse :: < SocketAddr > ( ) . map_err ( |err| err) . context ( format ! ( "{} is not a valid socket address" , remote_address) ) ?;
50
- self . socket . connect ( socket_addr) . await . map_err ( |err| err) ?;
51
- Ok ( ( ) )
52
+ let socket_addr = remote_address
53
+ . parse :: < SocketAddr > ( )
54
+ . context ( format ! ( "{remote_address} is not a valid socket address" ) ) ?;
55
+
56
+ match self . socket . connect ( socket_addr) . await {
57
+ Ok ( ( ) ) => {
58
+ debug ! ( "Connected successfully" ) ;
59
+ Ok ( ( ) )
60
+ }
61
+ Err ( e) => Err ( anyhow ! ( "Failed to connect: {e:?}" ) ) ,
62
+ }
52
63
}
53
64
54
- /// # Panics
65
+ /// # Errors
55
66
///
56
- /// Will panic if:
67
+ /// Will return error if:
57
68
///
58
69
/// - Can't write to the socket.
59
70
/// - Can't send data.
60
71
pub async fn send ( & self , bytes : & [ u8 ] ) -> Result < usize , anyhow:: Error > {
61
72
debug ! ( target: "UDP client" , "sending {bytes:?} ..." ) ;
62
73
63
- let _ : Result < ( ) , anyhow :: Error > = match time:: timeout ( self . timeout , self . socket . writable ( ) ) . await {
74
+ match time:: timeout ( self . timeout , self . socket . writable ( ) ) . await {
64
75
Ok ( writable_result) => {
65
- let writable_result_status : Result < ( ) , anyhow :: Error > = match writable_result {
66
- Ok ( ( ) ) => Ok ( ( ) ) ,
67
- Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) )
76
+ match writable_result {
77
+ Ok ( ( ) ) => ( ) ,
78
+ Err ( e) => return Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
68
79
} ;
69
- writable_result_status
70
80
}
71
- Err ( e) => Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) )
81
+ Err ( e) => return Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
72
82
} ;
73
83
74
- let send_status: Result < usize , anyhow:: Error > = match time:: timeout ( self . timeout , self . socket . send ( bytes) ) . await {
75
- Ok ( send_result) => {
76
- let send_result_status: Result < usize , anyhow:: Error > = match send_result {
77
- Ok ( size) => Ok ( size) ,
78
- Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {}" , e) )
79
- } ;
80
- send_result_status
81
- }
82
- Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {}" , e) )
83
- } ;
84
- send_status
84
+ match time:: timeout ( self . timeout , self . socket . send ( bytes) ) . await {
85
+ Ok ( send_result) => match send_result {
86
+ Ok ( size) => Ok ( size) ,
87
+ Err ( e) => Err ( anyhow ! ( "IO error during send: {e:?}" ) ) ,
88
+ } ,
89
+ Err ( e) => Err ( anyhow ! ( "Send operation timed out: {e:?}" ) ) ,
90
+ }
85
91
}
86
92
87
- /// # Panics
93
+ /// # Errors
88
94
///
89
- /// Will panic if:
95
+ /// Will return error if:
90
96
///
91
97
/// - Can't read from the socket.
92
98
/// - Can't receive data.
99
+ ///
100
+ /// # Panics
101
+ ///
93
102
pub async fn receive ( & self , bytes : & mut [ u8 ] ) -> Result < usize > {
94
103
debug ! ( target: "UDP client" , "receiving ..." ) ;
95
104
96
- let _ : Result < ( ) , anyhow :: Error > = match time:: timeout ( self . timeout , self . socket . readable ( ) ) . await {
105
+ match time:: timeout ( self . timeout , self . socket . readable ( ) ) . await {
97
106
Ok ( readable_result) => {
98
- let readable_result_status = match readable_result {
99
- Ok ( ( ) ) => Ok ( ( ) ) ,
100
- Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
107
+ match readable_result {
108
+ Ok ( ( ) ) => ( ) ,
109
+ Err ( e) => return Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
101
110
} ;
102
- readable_result_status
103
- } ,
104
- Err ( e) => Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
111
+ }
112
+ Err ( e) => return Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
105
113
} ;
106
114
107
- let size : Result < usize , anyhow :: Error > = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
115
+ let size_result = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
108
116
Ok ( recv_result) => match recv_result {
109
117
Ok ( size) => Ok ( size) ,
110
118
Err ( e) => Err ( anyhow ! ( "IO error during send: {e:?}" ) ) ,
111
119
} ,
112
120
Err ( e) => Err ( anyhow ! ( "Receive operation timed out: {e:?}" ) ) ,
113
121
} ;
114
122
115
- debug ! ( target: "UDP client" , "{size} bytes received {bytes:?}" ) ;
116
-
117
- size
123
+ if size_result. is_ok ( ) {
124
+ let size = size_result. as_ref ( ) . unwrap ( ) ;
125
+ debug ! ( target: "UDP client" , "{size} bytes received {bytes:?}" ) ;
126
+ size_result
127
+ } else {
128
+ size_result
129
+ }
118
130
}
119
-
131
+ }
120
132
121
133
/// Creates a new `UdpClient` connected to a Udp server
134
+ ///
135
+ /// # Errors
136
+ ///
137
+ /// Will return any errors present in the call stack
138
+ ///
122
139
pub async fn new_udp_client_connected ( remote_address : & str ) -> Result < UdpClient > {
123
140
let port = 0 ; // Let OS choose an unused port.
124
- match UdpClient :: bind ( & source_address ( port) ) . await {
125
- Ok ( client) => {
126
- client. connect ( remote_address) . await ;
127
- Ok ( client)
128
- }
129
- Err ( err) => Err ( err) ,
130
- }
131
- }
141
+ let client = UdpClient :: bind ( & source_address ( port) ) . await ?;
142
+ client. connect ( remote_address) . await ?;
143
+ Ok ( client)
132
144
}
133
145
134
146
#[ allow( clippy:: module_name_repetitions) ]
@@ -138,85 +150,103 @@ pub struct UdpTrackerClient {
138
150
}
139
151
140
152
impl UdpTrackerClient {
141
- /// # Panics
153
+ /// # Errors
142
154
///
143
- /// Will panic if can't write request to bytes.
155
+ /// Will return error if can't write request to bytes.
144
156
pub async fn send ( & self , request : Request ) -> Result < usize > {
145
157
debug ! ( target: "UDP tracker client" , "send request {request:?}" ) ;
146
158
147
159
// Write request into a buffer
148
160
let request_buffer = vec ! [ 0u8 ; MAX_PACKET_SIZE ] ;
149
161
let mut cursor = Cursor :: new ( request_buffer) ;
150
162
151
- let request_data = match request. write ( & mut cursor) {
163
+ let request_data_result = match request. write ( & mut cursor) {
152
164
Ok ( ( ) ) => {
153
165
#[ allow( clippy:: cast_possible_truncation) ]
154
166
let position = cursor. position ( ) as usize ;
155
167
let inner_request_buffer = cursor. get_ref ( ) ;
156
168
// Return slice which contains written request data
157
- & inner_request_buffer[ ..position]
169
+ Ok ( & inner_request_buffer[ ..position] )
158
170
}
159
171
Err ( e) => Err ( anyhow ! ( "could not write request to bytes: {e}." ) ) ,
160
172
} ;
161
173
174
+ let request_data = request_data_result?;
175
+
162
176
self . udp_client . send ( request_data) . await
163
177
}
164
178
165
- /// # Panics
179
+ /// # Errors
166
180
///
167
- /// Will panic if can't create response from the received payload (bytes buffer).
168
- pub async fn receive ( & self ) -> Response {
181
+ /// Will return error if can't create response from the received payload (bytes buffer).
182
+ pub async fn receive ( & self ) -> Result < Response > {
169
183
let mut response_buffer = [ 0u8 ; MAX_PACKET_SIZE ] ;
170
184
171
- let payload_size = self . udp_client . receive ( & mut response_buffer) . await ;
185
+ let payload_size = self . udp_client . receive ( & mut response_buffer) . await ? ;
172
186
173
187
debug ! ( target: "UDP tracker client" , "received {payload_size} bytes. Response {response_buffer:?}" ) ;
174
188
175
- Response :: from_bytes ( & response_buffer[ ..payload_size] , true ) . unwrap ( )
189
+ let response = Response :: from_bytes ( & response_buffer[ ..payload_size] , true ) ?;
190
+
191
+ Ok ( response)
176
192
}
177
193
}
178
194
179
195
/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
180
- pub async fn new_udp_tracker_client_connected ( remote_address : & str ) -> UdpTrackerClient {
196
+ ///
197
+ /// # Errors
198
+ ///
199
+ /// Will return any errors present in the call stack
200
+ ///
201
+ pub async fn new_udp_tracker_client_connected ( remote_address : & str ) -> Result < UdpTrackerClient > {
181
202
let udp_client = new_udp_client_connected ( remote_address) . await ?;
182
- UdpTrackerClient { udp_client. unwrap ( ) }
203
+ let udp_tracker_client = UdpTrackerClient { udp_client } ;
204
+ Ok ( udp_tracker_client)
183
205
}
184
206
185
207
/// Helper Function to Check if a UDP Service is Connectable
186
208
///
187
- /// # Errors
209
+ /// # Panics
188
210
///
189
211
/// It will return an error if unable to connect to the UDP service.
190
212
///
191
- /// # Panics
213
+ /// # Errors
214
+ ///
192
215
pub async fn check ( binding : & SocketAddr ) -> Result < String , String > {
193
216
debug ! ( "Checking Service (detail): {binding:?}." ) ;
194
217
195
- let client = new_udp_tracker_client_connected ( binding. to_string ( ) . as_str ( ) ) . await ;
196
-
197
- let connect_request = ConnectRequest {
198
- transaction_id : TransactionId ( 123 ) ,
199
- } ;
200
-
201
- client. send ( connect_request. into ( ) ) . await ;
202
-
203
- let process = move |response| {
204
- if matches ! ( response, Response :: Connect ( _connect_response) ) {
205
- Ok ( "Connected" . to_string ( ) )
206
- } else {
207
- Error ( "Did not Connect" . to_string ( ) )
208
- }
209
- } ;
210
-
211
- let sleep = time:: sleep ( Duration :: from_millis ( 2000 ) ) ;
212
- tokio:: pin!( sleep) ;
213
-
214
- tokio:: select! {
215
- ( ) = & mut sleep => {
216
- Error ( "Timed Out" . to_string( ) )
217
- }
218
- response = client. receive( ) => {
219
- process( response)
218
+ match new_udp_tracker_client_connected ( binding. to_string ( ) . as_str ( ) ) . await {
219
+ Ok ( client) => {
220
+ let connect_request = ConnectRequest {
221
+ transaction_id : TransactionId ( 123 ) ,
222
+ } ;
223
+
224
+ // client.send() return usize, but doesn't use here
225
+ match client. send ( connect_request. into ( ) ) . await {
226
+ Ok ( _) => ( ) ,
227
+ Err ( e) => debug ! ( "Error: {e:?}." ) ,
228
+ } ;
229
+
230
+ let process = move |response| {
231
+ if matches ! ( response, Response :: Connect ( _connect_response) ) {
232
+ Ok ( "Connected" . to_string ( ) )
233
+ } else {
234
+ Err ( "Did not Connect" . to_string ( ) )
235
+ }
236
+ } ;
237
+
238
+ let sleep = time:: sleep ( Duration :: from_millis ( 2000 ) ) ;
239
+ tokio:: pin!( sleep) ;
240
+
241
+ tokio:: select! {
242
+ ( ) = & mut sleep => {
243
+ Err ( "Timed Out" . to_string( ) )
244
+ }
245
+ response = client. receive( ) => {
246
+ process( response. unwrap( ) )
247
+ }
248
+ }
220
249
}
250
+ Err ( e) => Err ( format ! ( "{e:?}" ) ) ,
221
251
}
222
252
}
0 commit comments