You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
That would allow concurrent writes/reads but to take advantage of that change I think we need to change the UDP server.
One implementation proposal could be using workers. I think @da2ce7 was working on a refactor where he was using a pool of active requests handled by spawned tasks (when he was working on the graceful shutdown).
This is the draft ChatGTP implementation:
// ... [existing imports] ...implUdp{// ... [other methods] .../// It starts the UDP server instance with graceful shutdown.asyncfnstart_with_graceful_shutdown(tracker:Arc<Tracker>,bind_to:SocketAddr,tx_start:Sender<Started>,rx_halt:Receiver<Halted>,) -> JoinHandle<()>{let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to UDP socket."));let address = socket.local_addr().expect("Could not get local_addr from binding.");info!(target:"UDP Tracker","Starting on: udp://{}", address);let(tx, rx) = mpsc::channel::<(Vec<u8>,SocketAddr)>(100);let rx = Arc::new(Mutex::new(rx));let n_workers = 10;// Set the number of workers as neededfor _ in0..n_workers {let worker_rx = rx.clone();let worker_socket = socket.clone();let worker_tracker = tracker.clone();
tokio::spawn(asyncmove{worker_process(worker_rx, worker_socket, worker_tracker).await;});}let running = tokio::task::spawn(asyncmove{letmut halt = tokio::task::spawn(asyncmove{shutdown_signal_with_message(
rx_halt,format!("Shutting down UDP server on socket address: udp://{address}"),).await;});loop{
tokio::select! {Ok(_) = &mut halt => {
debug!(target:"UDP Tracker","Halt signal received, stopping listening for new requests.");break;}
result = Udp::receive_packet(&socket) => {ifletOk((payload, remote_addr)) = result {if tx.send((payload, remote_addr)).await.is_err(){
error!("Failed to send request to worker");}} else ifletErr(e) = result {
error!("Error reading UDP datagram from socket. Error: {:?}", e);}}}}debug!(target:"UDP Tracker","Main listener stopped, waiting for worker tasks to complete.");});info!(target:"UDP Tracker","Started on: udp://{}", address);
running
}asyncfnreceive_packet(socket:&Arc<UdpSocket>) -> std::io::Result<(Vec<u8>,SocketAddr)>{letmut data = [0;MAX_PACKET_SIZE];let(valid_bytes, remote_addr) = socket.recv_from(&mut data).await?;let payload = data[..valid_bytes].to_vec();Ok((payload, remote_addr))}// ... [existing Udp methods] ...}asyncfnworker_process(rx:Arc<Mutex<mpsc::Receiver<(Vec<u8>,SocketAddr)>>>,socket:Arc<UdpSocket>,tracker:Arc<Tracker>,){whileletSome((payload, remote_addr)) = rx.lock().await.recv().await{let response_fut = handle_packet(remote_addr, payload,&tracker);// Implementing a 5-second timeout for each requestmatch tokio::time::timeout(Duration::from_secs(5), response_fut).await{Ok(response) => {Udp::send_response(socket.clone(), remote_addr, response).await;}Err(_) => {error!("Timeout occurred while processing the UDP request from {}", remote_addr);// Optionally, send a timeout response or error message back to the client// Udp::send_timeout_response(socket.clone(), remote_addr).await;}}}}// ... [existing tests and other code] ...
The text was updated successfully, but these errors were encountered:
Relates to: #565, #596
Depends on: #565
The current version of the UDP server processes one request at a time.
For the time being, that's not a bottleneck because we also write sequentially into the Tracker repository. @WarmBeer is working on the issue:
That would allow concurrent writes/reads but to take advantage of that change I think we need to change the UDP server.
One implementation proposal could be using workers. I think @da2ce7 was working on a refactor where he was using a pool of active requests handled by spawned tasks (when he was working on the graceful shutdown).
This is the draft ChatGTP implementation:
The text was updated successfully, but these errors were encountered: