@@ -3,13 +3,16 @@ use std::net::SocketAddr;
3
3
use std:: sync:: Arc ;
4
4
use std:: time:: Duration ;
5
5
6
+ use crate :: shared:: bit_torrent:: tracker:: udp:: { source_address, MAX_PACKET_SIZE } ;
7
+ use anyhow:: anyhow;
8
+ use anyhow:: { Context , Result } ;
9
+ use core:: result:: Result :: { Ok , Err } ;
10
+ use anyhow:: Error as AError ;
6
11
use aquatic_udp_protocol:: { ConnectRequest , Request , Response , TransactionId } ;
7
12
use log:: debug;
8
13
use tokio:: net:: UdpSocket ;
9
14
use tokio:: time;
10
15
11
- use crate :: shared:: bit_torrent:: tracker:: udp:: { source_address, MAX_PACKET_SIZE } ;
12
-
13
16
/// Default timeout for sending and receiving packets. And waiting for sockets
14
17
/// to be readable and writable.
15
18
const DEFAULT_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
@@ -28,31 +31,24 @@ impl UdpClient {
28
31
/// # Panics
29
32
///
30
33
/// Will panic if the local address can't be bound.
31
- pub async fn bind ( local_address : & str ) -> Self {
32
- let valid_socket_addr = local_address
33
- . parse :: < SocketAddr > ( )
34
- . unwrap_or_else ( |_| panic ! ( "{local_address} is not a valid socket address" ) ) ;
35
-
36
- let socket = UdpSocket :: bind ( valid_socket_addr) . await . unwrap ( ) ;
37
-
38
- Self {
34
+ ///
35
+ pub async fn bind ( local_address : & str ) -> Result < Self > {
36
+ let socket_addr = local_address. parse :: < SocketAddr > ( ) . map_err ( |err| err) . context ( "{local_address} is not a valid socket address" ) ?;
37
+ let socket = UdpSocket :: bind ( socket_addr) . await ?;
38
+ let udp_client = Self {
39
39
socket : Arc :: new ( socket) ,
40
40
timeout : DEFAULT_TIMEOUT ,
41
- }
41
+ } ;
42
+ Ok ( udp_client)
42
43
}
43
44
44
45
/// # Panics
45
46
///
46
47
/// Will panic if can't connect to the socket.
47
- pub async fn connect ( & self , remote_address : & str ) {
48
- let valid_socket_addr = remote_address
49
- . parse :: < SocketAddr > ( )
50
- . unwrap_or_else ( |_| panic ! ( "{remote_address} is not a valid socket address" ) ) ;
51
-
52
- match self . socket . connect ( valid_socket_addr) . await {
53
- Ok ( ( ) ) => debug ! ( "Connected successfully" ) ,
54
- Err ( e) => panic ! ( "Failed to connect: {e:?}" ) ,
55
- }
48
+ pub async fn connect ( & self , remote_address : & str ) -> anyhow:: Result < ( ) > {
49
+ let socket_addr = remote_address. parse :: < SocketAddr > ( ) . map_err ( |err| err) . context ( format ! ( "{} is not a valid socket address" , remote_address) ) ?;
50
+ self . socket . connect ( socket_addr) . await . map_err ( |err| err) ?;
51
+ Ok ( ( ) )
56
52
}
57
53
58
54
/// # Panics
@@ -61,24 +57,31 @@ impl UdpClient {
61
57
///
62
58
/// - Can't write to the socket.
63
59
/// - Can't send data.
64
- pub async fn send ( & self , bytes : & [ u8 ] ) -> usize {
60
+ pub async fn send ( & self , bytes : & [ u8 ] ) -> Result < usize , anyhow :: Error > {
65
61
debug ! ( target: "UDP client" , "sending {bytes:?} ..." ) ;
66
62
67
- match time:: timeout ( self . timeout , self . socket . writable ( ) ) . await {
68
- Ok ( writable_result) => match writable_result {
69
- Ok ( ( ) ) => ( ) ,
70
- Err ( e) => panic ! ( "{}" , format!( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
71
- } ,
72
- Err ( e) => panic ! ( "{}" , format!( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
63
+ let _: Result < ( ) , anyhow:: Error > = match time:: timeout ( self . timeout , self . socket . writable ( ) ) . await {
64
+ Ok ( writable_result) => {
65
+ let writable_result_status : Result < ( ) , anyhow:: Error > = match writable_result {
66
+ Ok ( ( ) ) => Ok ( ( ) ) ,
67
+ Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) )
68
+ } ;
69
+ writable_result_status
70
+ }
71
+ Err ( e) => Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) )
73
72
} ;
74
73
75
- match time:: timeout ( self . timeout , self . socket . send ( bytes) ) . await {
76
- Ok ( send_result) => match send_result {
77
- Ok ( size) => size,
78
- Err ( e) => panic ! ( "{}" , format!( "IO error during send: {e:?}" ) ) ,
79
- } ,
80
- Err ( e) => panic ! ( "{}" , format!( "Send operation timed out: {e:?}" ) ) ,
81
- }
74
+ let send_status: Result < usize , anyhow:: Error > = match time:: timeout ( self . timeout , self . socket . send ( bytes) ) . await {
75
+ Ok ( send_result) => {
76
+ let send_result_status: Result < usize , anyhow:: Error > = match send_result {
77
+ Ok ( size) => Ok ( size) ,
78
+ Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {}" , e) )
79
+ } ;
80
+ send_result_status
81
+ }
82
+ Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {}" , e) )
83
+ } ;
84
+ send_status
82
85
}
83
86
84
87
/// # Panics
@@ -87,37 +90,45 @@ impl UdpClient {
87
90
///
88
91
/// - Can't read from the socket.
89
92
/// - Can't receive data.
90
- pub async fn receive ( & self , bytes : & mut [ u8 ] ) -> usize {
93
+ pub async fn receive ( & self , bytes : & mut [ u8 ] ) -> Result < usize > {
91
94
debug ! ( target: "UDP client" , "receiving ..." ) ;
92
95
93
- match time:: timeout ( self . timeout , self . socket . readable ( ) ) . await {
94
- Ok ( readable_result) => match readable_result {
95
- Ok ( ( ) ) => ( ) ,
96
- Err ( e) => panic ! ( "{}" , format!( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
96
+ let _ : Result < ( ) , anyhow:: Error > = match time:: timeout ( self . timeout , self . socket . readable ( ) ) . await {
97
+ Ok ( readable_result) => {
98
+ let readable_result_status = match readable_result {
99
+ Ok ( ( ) ) => Ok ( ( ) ) ,
100
+ Err ( e) => Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
101
+ } ;
102
+ readable_result_status
97
103
} ,
98
- Err ( e) => panic ! ( "{}" , format !( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
104
+ Err ( e) => Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
99
105
} ;
100
106
101
- let size = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
107
+ let size: Result < usize , anyhow :: Error > = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
102
108
Ok ( recv_result) => match recv_result {
103
- Ok ( size) => size,
104
- Err ( e) => panic ! ( "{}" , format !( "IO error during send: {e:?}" ) ) ,
109
+ Ok ( size) => Ok ( size) ,
110
+ Err ( e) => Err ( anyhow ! ( "IO error during send: {e:?}" ) ) ,
105
111
} ,
106
- Err ( e) => panic ! ( "{}" , format !( "Receive operation timed out: {e:?}" ) ) ,
112
+ Err ( e) => Err ( anyhow ! ( "Receive operation timed out: {e:?}" ) ) ,
107
113
} ;
108
114
109
115
debug ! ( target: "UDP client" , "{size} bytes received {bytes:?}" ) ;
110
116
111
117
size
112
118
}
113
- }
119
+
114
120
115
121
/// Creates a new `UdpClient` connected to a Udp server
116
- pub async fn new_udp_client_connected ( remote_address : & str ) -> UdpClient {
122
+ pub async fn new_udp_client_connected ( remote_address : & str ) -> Result < UdpClient > {
117
123
let port = 0 ; // Let OS choose an unused port.
118
- let client = UdpClient :: bind ( & source_address ( port) ) . await ;
119
- client. connect ( remote_address) . await ;
120
- client
124
+ match UdpClient :: bind ( & source_address ( port) ) . await {
125
+ Ok ( client) => {
126
+ client. connect ( remote_address) . await ;
127
+ Ok ( client)
128
+ }
129
+ Err ( err) => Err ( err) ,
130
+ }
131
+ }
121
132
}
122
133
123
134
#[ allow( clippy:: module_name_repetitions) ]
@@ -130,7 +141,7 @@ impl UdpTrackerClient {
130
141
/// # Panics
131
142
///
132
143
/// Will panic if can't write request to bytes.
133
- pub async fn send ( & self , request : Request ) -> usize {
144
+ pub async fn send ( & self , request : Request ) -> Result < usize > {
134
145
debug ! ( target: "UDP tracker client" , "send request {request:?}" ) ;
135
146
136
147
// Write request into a buffer
@@ -145,7 +156,7 @@ impl UdpTrackerClient {
145
156
// Return slice which contains written request data
146
157
& inner_request_buffer[ ..position]
147
158
}
148
- Err ( e) => panic ! ( "could not write request to bytes: {e}." ) ,
159
+ Err ( e) => Err ( anyhow ! ( "could not write request to bytes: {e}." ) ) ,
149
160
} ;
150
161
151
162
self . udp_client . send ( request_data) . await
@@ -167,8 +178,8 @@ impl UdpTrackerClient {
167
178
168
179
/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
169
180
pub async fn new_udp_tracker_client_connected ( remote_address : & str ) -> UdpTrackerClient {
170
- let udp_client = new_udp_client_connected ( remote_address) . await ;
171
- UdpTrackerClient { udp_client }
181
+ let udp_client = new_udp_client_connected ( remote_address) . await ? ;
182
+ UdpTrackerClient { udp_client. unwrap ( ) }
172
183
}
173
184
174
185
/// Helper Function to Check if a UDP Service is Connectable
@@ -193,7 +204,7 @@ pub async fn check(binding: &SocketAddr) -> Result<String, String> {
193
204
if matches ! ( response, Response :: Connect ( _connect_response) ) {
194
205
Ok ( "Connected" . to_string ( ) )
195
206
} else {
196
- Err ( "Did not Connect" . to_string ( ) )
207
+ Error ( "Did not Connect" . to_string ( ) )
197
208
}
198
209
} ;
199
210
@@ -202,7 +213,7 @@ pub async fn check(binding: &SocketAddr) -> Result<String, String> {
202
213
203
214
tokio:: select! {
204
215
( ) = & mut sleep => {
205
- Err ( "Timed Out" . to_string( ) )
216
+ Error ( "Timed Out" . to_string( ) )
206
217
}
207
218
response = client. receive( ) => {
208
219
process( response)
0 commit comments