Overhaul stats: implement a graceful shutdown for broadcast channels #1405

Open
josecelano opened this issue Mar 21, 2025 · 1 comment
Comments

@josecelano
Member

Relates to: #1385

In this issue I changed the event channels from mpsc to broadcast because I wanted to have many listeners. It's working fine. However, when you send the halt signal to the application (Ctrl+C), you get these errors in the logs:

2025-03-21T16:11:39.493220Z ERROR bittorrent_udp_tracker_core::statistics::event::listener: Error receiving udp tracker core event: Closed
2025-03-21T16:11:39.493352Z ERROR bittorrent_http_tracker_core::statistics::event::listener: Error receiving http tracker core event: Closed
2025-03-21T16:11:39.493361Z ERROR torrust_udp_tracker_server::statistics::event::listener: Error receiving udp tracker server event: Closed

The problem is that we drop the sender, and the listeners return an error because the channel is closed. We should implement a graceful shutdown that notifies listeners and gives them time to finish processing the current events.

Since I don't see an option in the broadcast channel to notify listeners, we need to research alternatives.

cc @da2ce7

@josecelano
Member Author

First ideas using AI.

cc @da2ce7


In Tokio, you can gracefully close a broadcast channel by using a combination of techniques. Here are a few approaches:

  1. Using a dedicated shutdown signal:
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

struct Sender<T> {
    tx: broadcast::Sender<T>,
    shutdown_tx: broadcast::Sender<()>,
}

struct Receiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_rx: broadcast::Receiver<()>,
}

impl<T> Sender<T> {
    fn shutdown(&self) {
        // Send shutdown signal before dropping the sender
        let _ = self.shutdown_tx.send(());
    }
}

async fn listener(mut receiver: Receiver<String>) {
    loop {
        tokio::select! {
            // Check for shutdown signal
            Ok(()) = receiver.shutdown_rx.recv() => {
                println!("Received shutdown signal, stopping listener...");
                break;
            }
            // Process regular messages
            result = receiver.rx.recv() => {
                match result {
                    Ok(msg) => println!("Received: {}", msg),
                    Err(RecvError::Closed) => {
                        println!("Channel closed, stopping listener...");
                        break;
                    }
                    Err(RecvError::Lagged(n)) => {
                        println!("Lagged by {} messages", n);
                    }
                }
            }
        }
    }
    
    println!("Listener gracefully stopped");
}
  2. Using a sentinel value:
use tokio::sync::broadcast;

// Broadcast values only need to be `Clone`; no serialization is involved.
#[derive(Clone)]
enum Message {
    Regular(String),
    Shutdown,
}

async fn listener(mut rx: broadcast::Receiver<Message>) {
    while let Ok(msg) = rx.recv().await {
        match msg {
            Message::Regular(content) => {
                println!("Received: {}", content);
            }
            Message::Shutdown => {
                println!("Received shutdown signal, stopping listener...");
                break;
            }
        }
    }
    
    println!("Listener gracefully stopped");
}

async fn sender(tx: broadcast::Sender<Message>) {
    // Send regular messages
    tx.send(Message::Regular("Hello".to_string())).unwrap();
    
    // Signal shutdown
    tx.send(Message::Shutdown).unwrap();
}
  3. Using Drop implementation:
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use std::sync::Arc;
use tokio::sync::Notify;

struct GracefulSender<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

impl<T> Drop for GracefulSender<T> {
    fn drop(&mut self) {
        self.shutdown_notify.notify_waiters();
    }
}

struct GracefulReceiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_notify: Arc<Notify>,
}

// `mut` is required because `broadcast::Receiver::recv` takes `&mut self`.
async fn listener(mut receiver: GracefulReceiver<String>) {
    loop {
        tokio::select! {
            // Check for shutdown notification
            _ = receiver.shutdown_notify.notified() => {
                println!("Received shutdown notification, stopping listener...");
                break;
            }
            // Process regular messages
            result = receiver.rx.recv() => {
                match result {
                    Ok(msg) => println!("Received: {}", msg),
                    Err(RecvError::Closed) => {
                        println!("Channel closed, stopping listener...");
                        break;
                    }
                    Err(RecvError::Lagged(n)) => {
                        println!("Lagged by {} messages", n);
                    }
                }
            }
        }
    }
    
    println!("Listener gracefully stopped");
}

// Creating sender/receiver pairs
// `broadcast::channel` requires `T: Clone`.
fn create_channel<T: Clone>() -> (GracefulSender<T>, GracefulReceiver<T>) {
    let (tx, rx) = broadcast::channel(100);
    let shutdown_notify = Arc::new(Notify::new());
    
    (
        GracefulSender {
            tx,
            shutdown_notify: Arc::clone(&shutdown_notify),
        },
        GracefulReceiver {
            rx,
            shutdown_notify,
        },
    )
}
  4. Complete example with graceful shutdown:
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use std::sync::Arc;
use tokio::sync::Notify;
use std::time::Duration;

#[derive(Clone)]
struct Channel<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

// `broadcast::channel` requires `T: Clone`.
impl<T: Clone> Channel<T> {
    fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self {
            tx,
            shutdown_notify: Arc::new(Notify::new()),
        }
    }

    fn sender(&self) -> Sender<T> {
        Sender {
            tx: self.tx.clone(),
            shutdown_notify: Arc::clone(&self.shutdown_notify),
        }
    }

    fn receiver(&self) -> Receiver<T> {
        Receiver {
            rx: self.tx.subscribe(),
            shutdown_notify: Arc::clone(&self.shutdown_notify),
        }
    }
}

struct Sender<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

impl<T> Sender<T> {
    async fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(value)
    }

    async fn shutdown(&self) {
        self.shutdown_notify.notify_waiters();
        // Give some time for receivers to process the shutdown signal
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

struct Receiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_notify: Arc<Notify>,
}

// `broadcast::Receiver::recv` is only available when `T: Clone`.
impl<T: Clone> Receiver<T> {
    async fn recv(&mut self) -> Result<T, RecvError> {
        tokio::select! {
            _ = self.shutdown_notify.notified() => {
                println!("Receiver got shutdown signal");
                Err(RecvError::Closed)
            }
            result = self.rx.recv() => {
                result
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let channel = Channel::<String>::new(100);
    
    // Spawn multiple listeners
    let mut handles = vec![];
    
    for i in 0..3 {
        let mut receiver = channel.receiver();
        let handle = tokio::spawn(async move {
            println!("Listener {} started", i);
            while let Ok(msg) = receiver.recv().await {
                println!("Listener {} received: {}", i, msg);
            }
            println!("Listener {} stopped", i);
        });
        handles.push(handle);
    }

    // Send some messages
    let sender = channel.sender();
    sender.send("Hello".to_string()).await.unwrap();
    sender.send("World".to_string()).await.unwrap();

    // Graceful shutdown
    println!("Initiating graceful shutdown...");
    sender.shutdown().await;

    // Wait for all listeners to complete
    for handle in handles {
        handle.await.unwrap();
    }
    
    println!("All listeners stopped gracefully");
}

This example provides:

  1. A clean channel abstraction with separate sender and receiver types
  2. Graceful shutdown mechanism using Notify
  3. Proper error handling
  4. Clean shutdown sequence
  5. Ability to wait for all listeners to complete

The shutdown process:

  1. When shutdown is initiated, all receivers are notified
  2. Receivers get the shutdown signal and stop processing
  3. The sender waits briefly to ensure receivers process the signal
  4. The main process can wait for all listeners to complete

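This approach aims for:

  • Clean shutdown logs, with no unexpected errors
  • Proper resource cleanup

Two of its goals are not guaranteed by the code as written:

  • Messages can be lost during shutdown: when both branches of select! are ready, the shutdown branch may win and buffered messages are never read
  • Listeners can miss the notification: notify_waiters() wakes only tasks already waiting in notified(), so a listener that is busy or not yet started keeps waiting on recv()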

You can extend this pattern by:

  • Adding shutdown timeouts
  • Including shutdown confirmation messages
  • Adding metrics for shutdown process
  • Implementing custom shutdown behaviors
  • Adding retry mechanisms for important messages during shutdown
