Overhaul stats: implement a graceful shutdown for broadcast channels #1405
Comments
First ideas using AI.
cc @da2ce7
In Tokio, you can gracefully close a broadcast channel by combining a few techniques. Here are some approaches:
// Approach: a dedicated shutdown channel alongside the data channel.
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

struct Sender<T> {
    tx: broadcast::Sender<T>,
    shutdown_tx: broadcast::Sender<()>,
}

struct Receiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_rx: broadcast::Receiver<()>,
}

impl<T> Sender<T> {
    fn shutdown(&self) {
        // Send shutdown signal before dropping the sender
        let _ = self.shutdown_tx.send(());
    }
}

async fn listener(mut receiver: Receiver<String>) {
    loop {
        tokio::select! {
            // Check for shutdown signal
            Ok(()) = receiver.shutdown_rx.recv() => {
                println!("Received shutdown signal, stopping listener...");
                break;
            }
            // Process regular messages
            result = receiver.rx.recv() => {
                match result {
                    Ok(msg) => println!("Received: {}", msg),
                    Err(RecvError::Closed) => {
                        println!("Channel closed, stopping listener...");
                        break;
                    }
                    Err(RecvError::Lagged(n)) => {
                        println!("Lagged by {} messages", n);
                    }
                }
            }
        }
    }
    println!("Listener gracefully stopped");
}

// Approach: an explicit shutdown variant in the message type.
use serde::Serialize;
use tokio::sync::broadcast;

#[derive(Clone, Serialize)]
enum Message {
    Regular(String),
    Shutdown,
}

async fn listener(mut rx: broadcast::Receiver<Message>) {
    while let Ok(msg) = rx.recv().await {
        match msg {
            Message::Regular(content) => {
                println!("Received: {}", content);
            }
            Message::Shutdown => {
                println!("Received shutdown signal, stopping listener...");
                break;
            }
        }
    }
    println!("Listener gracefully stopped");
}

async fn sender(tx: broadcast::Sender<Message>) {
    // Send regular messages
    tx.send(Message::Regular("Hello".to_string())).unwrap();
    // Signal shutdown
    tx.send(Message::Shutdown).unwrap();
}

// Approach: notify receivers from the sender's Drop implementation.
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::Notify;

struct GracefulSender<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

impl<T> Drop for GracefulSender<T> {
    fn drop(&mut self) {
        // Wakes only the receivers that are already waiting on `notified()`
        self.shutdown_notify.notify_waiters();
    }
}

struct GracefulReceiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_notify: Arc<Notify>,
}

// `mut` is required because `recv()` takes `&mut self`
async fn listener(mut receiver: GracefulReceiver<String>) {
    loop {
        tokio::select! {
            // Check for shutdown notification
            _ = receiver.shutdown_notify.notified() => {
                println!("Received shutdown notification, stopping listener...");
                break;
            }
            // Process regular messages
            result = receiver.rx.recv() => {
                match result {
                    Ok(msg) => println!("Received: {}", msg),
                    Err(RecvError::Closed) => {
                        println!("Channel closed, stopping listener...");
                        break;
                    }
                    Err(RecvError::Lagged(n)) => {
                        println!("Lagged by {} messages", n);
                    }
                }
            }
        }
    }
    println!("Listener gracefully stopped");
}

// Creating sender/receiver pairs (`broadcast::channel` requires `T: Clone`)
fn create_channel<T: Clone>() -> (GracefulSender<T>, GracefulReceiver<T>) {
    let (tx, rx) = broadcast::channel(100);
    let shutdown_notify = Arc::new(Notify::new());
    (
        GracefulSender {
            tx,
            shutdown_notify: Arc::clone(&shutdown_notify),
        },
        GracefulReceiver {
            rx,
            shutdown_notify,
        },
    )
}

// Complete example: a shared channel handle with a Notify-based shutdown.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::Notify;

#[derive(Clone)]
struct Channel<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

// `broadcast::channel` requires `T: Clone`
impl<T: Clone> Channel<T> {
    fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self {
            tx,
            shutdown_notify: Arc::new(Notify::new()),
        }
    }

    fn sender(&self) -> Sender<T> {
        Sender {
            tx: self.tx.clone(),
            shutdown_notify: Arc::clone(&self.shutdown_notify),
        }
    }

    fn receiver(&self) -> Receiver<T> {
        Receiver {
            rx: self.tx.subscribe(),
            shutdown_notify: Arc::clone(&self.shutdown_notify),
        }
    }
}

struct Sender<T> {
    tx: broadcast::Sender<T>,
    shutdown_notify: Arc<Notify>,
}

impl<T> Sender<T> {
    async fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(value)
    }

    async fn shutdown(&self) {
        // Wakes only the receivers that are already waiting on `notified()`
        self.shutdown_notify.notify_waiters();
        // Give some time for receivers to process the shutdown signal
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

struct Receiver<T> {
    rx: broadcast::Receiver<T>,
    shutdown_notify: Arc<Notify>,
}

// `broadcast::Receiver::recv` requires `T: Clone`
impl<T: Clone> Receiver<T> {
    async fn recv(&mut self) -> Result<T, RecvError> {
        tokio::select! {
            _ = self.shutdown_notify.notified() => {
                println!("Receiver got shutdown signal");
                Err(RecvError::Closed)
            }
            result = self.rx.recv() => {
                result
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let channel = Channel::<String>::new(100);

    // Spawn multiple listeners
    let mut handles = vec![];
    for i in 0..3 {
        let mut receiver = channel.receiver();
        let handle = tokio::spawn(async move {
            println!("Listener {} started", i);
            while let Ok(msg) = receiver.recv().await {
                println!("Listener {} received: {}", i, msg);
            }
            println!("Listener {} stopped", i);
        });
        handles.push(handle);
    }

    // Send some messages
    let sender = channel.sender();
    sender.send("Hello".to_string()).await.unwrap();
    sender.send("World".to_string()).await.unwrap();

    // Graceful shutdown
    println!("Initiating graceful shutdown...");
    sender.shutdown().await;

    // `notify_waiters` stores no permit, so a listener that was not yet
    // waiting misses the signal. Dropping every sender handle lets such a
    // listener still end with `RecvError::Closed` instead of hanging.
    drop(sender);
    drop(channel);

    // Wait for all listeners to complete
    for handle in handles {
        handle.await.unwrap();
    }
    println!("All listeners stopped gracefully");
}

This example provides:
The shutdown process:
This approach ensures:
You can extend this pattern by:
Relates to: #1385
In that issue I changed the event channels from `mpsc` to `broadcast` because I wanted to have many listeners. It's working fine. However, when you send the halt signal to the application (Ctrl+C), you get these errors in the logs:
The problem is that we drop the sender, and the listeners return an error because the channel is closed. We should implement a graceful shutdown that notifies the listeners and gives them time to finish processing the current events.
Since I don't see an option in the `broadcast` channel to notify listeners, we need to research other alternatives.
cc @da2ce7