Process UDP requests concurrently #611

Closed
1 task done
josecelano opened this issue Jan 16, 2024 · 3 comments · Fixed by #644
josecelano commented Jan 16, 2024

Relates to: #565, #596
Depends on: #565

The current version of the UDP server processes one request at a time.

let listen = async move {
    debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");

    loop {
        let mut data = [0; MAX_PACKET_SIZE];
        let socket_clone = socket.clone();

        match socket_clone.recv_from(&mut data).await {
            Ok((valid_bytes, remote_addr)) => {
                let payload = data[..valid_bytes].to_vec();

                debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
                debug!(target: "UDP Tracker", "From: {}", &remote_addr);
                debug!(target: "UDP Tracker", "Payload: {:?}", payload);

                let response_fut = handle_packet(remote_addr, payload, &tracker);

                match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
                    Ok(response) => {
                        Udp::send_response(socket_clone, remote_addr, response).await;
                    }
                    Err(_) => {
                        error!("Timeout occurred while processing the UDP request.");
                    }
                }
            }
            Err(err) => {
                error!("Error reading UDP datagram from socket. Error: {:?}", err);
            }
        }
    }
};

For the time being, that's not a bottleneck because we also write sequentially into the Tracker repository. @WarmBeer is working on the issue:

That would allow concurrent reads and writes, but to take advantage of that change I think we also need to change the UDP server so it stops handling requests one by one.

One implementation proposal could be using workers. I think @da2ce7 was working on a refactor where he was using a pool of active requests handled by spawned tasks (when he was working on the graceful shutdown).
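
As a rough illustration of the spawned-task idea, here is a minimal sketch that hands each datagram to its own handler so the receive loop can return to the socket immediately. It is a stand-in under several assumptions, not the tracker's code: it uses blocking `std::net::UdpSocket` and OS threads instead of tokio tasks to stay dependency-free, `handle` is a hypothetical placeholder for `handle_packet`, the `MAX_PACKET_SIZE` value is assumed, and `serve_concurrently` stops after a fixed number of requests only so that the example terminates.

```rust
use std::io;
use std::net::UdpSocket;
use std::sync::Arc;
use std::thread;

// Assumed buffer size for this sketch only.
const MAX_PACKET_SIZE: usize = 1496;

/// Hypothetical request handler standing in for `handle_packet`.
fn handle(payload: &[u8]) -> Vec<u8> {
    payload.to_ascii_uppercase()
}

/// Receives `n_requests` datagrams and answers each one from its own thread.
fn serve_concurrently(socket: Arc<UdpSocket>, n_requests: usize) -> io::Result<()> {
    let mut handlers = Vec::with_capacity(n_requests);

    for _ in 0..n_requests {
        let mut data = [0u8; MAX_PACKET_SIZE];
        let (valid_bytes, remote_addr) = socket.recv_from(&mut data)?;
        let payload = data[..valid_bytes].to_vec();
        let socket = Arc::clone(&socket);

        // The loop goes straight back to `recv_from`; the handler runs on its own thread.
        handlers.push(thread::spawn(move || {
            let response = handle(&payload);
            socket.send_to(&response, remote_addr).map(|_| ())
        }));
    }

    // Wait for the in-flight handlers before returning.
    for handler in handlers {
        handler.join().expect("handler thread panicked")?;
    }
    Ok(())
}
```

In a real server the number of in-flight handlers would probably need a bound, which is what the pool of active requests mentioned above would provide.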

This is the draft ChatGPT implementation:

// ... [existing imports] ...

impl Udp {
    // ... [other methods] ...

    /// It starts the UDP server instance with graceful shutdown.
    async fn start_with_graceful_shutdown(
        tracker: Arc<Tracker>,
        bind_to: SocketAddr,
        tx_start: Sender<Started>,
        rx_halt: Receiver<Halted>,
    ) -> JoinHandle<()> {
        let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to UDP socket."));
        let address = socket.local_addr().expect("Could not get local_addr from binding.");

        info!(target: "UDP Tracker", "Starting on: udp://{}", address);

        let (tx, rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(100);
        let rx = Arc::new(Mutex::new(rx));

        let n_workers = 10; // Set the number of workers as needed
        for _ in 0..n_workers {
            let worker_rx = rx.clone();
            let worker_socket = socket.clone();
            let worker_tracker = tracker.clone();
            tokio::spawn(async move {
                worker_process(worker_rx, worker_socket, worker_tracker).await;
            });
        }

        let running = tokio::task::spawn(async move {
            let mut halt = tokio::task::spawn(async move {
                shutdown_signal_with_message(
                    rx_halt,
                    format!("Shutting down UDP server on socket address: udp://{address}"),
                )
                .await;
            });

            loop {
                tokio::select! {
                    Ok(_) = &mut halt => {
                        debug!(target: "UDP Tracker", "Halt signal received, stopping listening for new requests.");
                        break;
                    }
                    result = Udp::receive_packet(&socket) => {
                        if let Ok((payload, remote_addr)) = result {
                            if tx.send((payload, remote_addr)).await.is_err() {
                                error!("Failed to send request to worker");
                            }
                        } else if let Err(e) = result {
                            error!("Error reading UDP datagram from socket. Error: {:?}", e);
                        }
                    }
                }
            }

            debug!(target: "UDP Tracker", "Main listener stopped, waiting for worker tasks to complete.");
        });

        info!(target: "UDP Tracker", "Started on: udp://{}", address);

        running
    }

    async fn receive_packet(socket: &Arc<UdpSocket>) -> std::io::Result<(Vec<u8>, SocketAddr)> {
        let mut data = [0; MAX_PACKET_SIZE];
        let (valid_bytes, remote_addr) = socket.recv_from(&mut data).await?;
        let payload = data[..valid_bytes].to_vec();

        Ok((payload, remote_addr))
    }

    // ... [existing Udp methods] ...
}

async fn worker_process(
    rx: Arc<Mutex<mpsc::Receiver<(Vec<u8>, SocketAddr)>>>,
    socket: Arc<UdpSocket>,
    tracker: Arc<Tracker>,
) {
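    loop {
        // Fetch the next request in its own statement so the mutex guard is dropped
        // before handling it. In `while let Some(..) = rx.lock().await.recv().await`
        // the guard would live for the whole loop body and serialize the workers.
        let next = rx.lock().await.recv().await;
        let Some((payload, remote_addr)) = next else { break };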
        let response_fut = handle_packet(remote_addr, payload, &tracker);

        // Implementing a 5-second timeout for each request
        match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
            Ok(response) => {
                Udp::send_response(socket.clone(), remote_addr, response).await;
            }
            Err(_) => {
                error!("Timeout occurred while processing the UDP request from {}", remote_addr);
                // Optionally, send a timeout response or error message back to the client
                // Udp::send_timeout_response(socket.clone(), remote_addr).await;
            }
        }
    }
}

// ... [existing tests and other code] ...
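
A detail worth checking in any worker pool that shares one receiver behind a mutex: the lock should be held only while fetching the next request, not while handling it, otherwise the workers effectively run one at a time. The following is a minimal sketch of that pattern, assuming blocking `std::sync::mpsc` channels and OS threads in place of tokio's `mpsc` and tasks. `handle` and `process_with_workers` are hypothetical names, and `handle` just reverses the payload as a placeholder for `handle_packet`.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the tracker's `handle_packet`: it only reverses the payload.
fn handle(payload: Vec<u8>) -> Vec<u8> {
    payload.into_iter().rev().collect()
}

/// Runs `n_workers` threads that share one job queue and returns every response.
fn process_with_workers(jobs: Vec<Vec<u8>>, n_workers: usize) -> Vec<Vec<u8>> {
    let (job_tx, job_rx) = mpsc::channel::<Vec<u8>>();
    let (res_tx, res_rx) = mpsc::channel::<Vec<u8>>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            thread::spawn(move || loop {
                // The guard is a temporary of this `let` statement, so the lock
                // is released before the job is handled.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(payload) => res_tx.send(handle(payload)).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();

    drop(res_tx); // keep only the workers' clones alive
    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the queue lets the workers exit

    for worker in workers {
        worker.join().unwrap();
    }
    res_rx.iter().collect()
}
```

Calling `process_with_workers(vec![vec![1, 2, 3], vec![4, 5]], 4)` returns the reversed payloads `[3, 2, 1]` and `[5, 4]`, in whatever order the workers finish.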

cgbosse added this to the v3.1.0 milestone Jan 16, 2024
cgbosse commented Jan 16, 2024

@WarmBeer what do you think?

mickvandijke changed the title from "Process UDP requests concurrenlty" to "Process UDP requests concurrently" Jan 16, 2024
@mickvandijke

We should definitely try to process UDP requests in parallel if possible!

da2ce7 commented Jan 17, 2024

@josecelano @WarmBeer
The reference implementation in #596 also handles the requests asynchronously.


4 participants