Skip to content

Commit

Permalink
refactor(consensus): return with an error if channels are closed
Browse files Browse the repository at this point in the history
StreamHandler expects the network and client to maintain their sides
of the channels forever
  • Loading branch information
matan-starkware committed Mar 4, 2025
1 parent 936bc96 commit a489bbc
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 58 deletions.
74 changes: 52 additions & 22 deletions crates/starknet_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Stream handler, see StreamManager struct.
//! Overlay streaming logic onto individual messages.
use std::cmp::Ordering;
use std::collections::hash_map::Entry::{Occupied, Vacant};
Expand All @@ -7,6 +7,7 @@ use std::fmt::{Debug, Display};
use std::hash::Hash;

use futures::channel::mpsc;
use futures::never::Never;
use futures::StreamExt;
use papyrus_network::network_manager::{BroadcastTopicClientTrait, ReceivedBroadcastedMessage};
use papyrus_network::utils::StreamHashMap;
Expand All @@ -17,6 +18,7 @@ use tracing::{instrument, warn};

#[cfg(test)]
#[path = "stream_handler_test.rs"]
#[allow(clippy::as_conversions)]
mod stream_handler_test;

type PeerId = OpaquePeerId;
Expand All @@ -26,6 +28,17 @@ type MessageId = u64;
// TODO(guy): make this configurable.
pub const CHANNEL_BUFFER_LENGTH: usize = 100;

/// Errors which cause the stream handler to stop functioning.
#[derive(thiserror::Error, PartialEq, Debug)]
pub enum StreamHandlerError {
/// Client has closed their sender, so no more outbound streams can be sent.
#[error("Client has closed their sender, so no more outbound streams can be sent.")]
OutboundChannelClosed,
/// Network has closed their sender, so no more inbound streams can be sent.
#[error("Network has closed their sender, so no more inbound streams can be sent.")]
InboundChannelClosed,
}

/// A combination of trait bounds needed for the content of the stream.
pub trait StreamContentTrait:
Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + Send
Expand Down Expand Up @@ -151,49 +164,63 @@ where
}

/// Run the stream handler indefinitely.
pub async fn run(mut self) {
pub async fn run(mut self) -> Result<Never, StreamHandlerError> {
loop {
self.handle_next_msg().await
self.handle_next_msg().await?
}
}

/// Listen for a single message coming from the network or from an application.
/// - Outbound messages are wrapped as StreamMessage and sent to the network directly.
/// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the
/// correct order to the application.
pub async fn handle_next_msg(&mut self) {
///
/// Expects to live forever, returning an Error if the client or network close their sender.
pub async fn handle_next_msg(&mut self) -> Result<(), StreamHandlerError> {
tokio::select!(
// New outbound stream.
Some((stream_id, receiver)) = self.outbound_channel_receiver.next() => {
self.outbound_stream_receivers.insert(stream_id, receiver);
outbound_stream = self.outbound_channel_receiver.next() => {
self.handle_new_stream(outbound_stream).await
}
// New message on an existing outbound stream.
output = self.outbound_stream_receivers.next() => {
self.handle_outbound_message(output).await
}
// New inbound message from the network.
Some(message) = self.inbound_receiver.next() => {
self.handle_inbound_message(message);
message = self.inbound_receiver.next() => {
self.handle_inbound_message(message)
}
);
)
}

async fn handle_new_stream(
&mut self,
outbound_stream: Option<(StreamId, mpsc::Receiver<StreamContent>)>,
) -> Result<(), StreamHandlerError> {
match outbound_stream {
None => Err(StreamHandlerError::OutboundChannelClosed),
Some((stream_id, receiver)) => {
self.outbound_stream_receivers.insert(stream_id, receiver);
Ok(())
}
}
}

async fn handle_outbound_message(
&mut self,
message: Option<(StreamId, Option<StreamContent>)>,
) {
) -> Result<(), StreamHandlerError> {
match message {
Some((key, Some(msg))) => {
self.broadcast(key, msg).await;
Ok(())
}
Some((key, None)) => {
self.broadcast_fin(key).await;
Ok(())
}
None => {
warn!(
"StreamHashMap should not be closed! Usually only the individual channels are \
closed. "
)
panic!("StreamHashMap should never be closed");
}
}
}
Expand Down Expand Up @@ -274,19 +301,21 @@ where

// Handle a message that was received from the network.
#[instrument(skip_all, level = "warn")]
#[allow(clippy::type_complexity)]
fn handle_inbound_message(
&mut self,
message: (
message: Option<(
Result<StreamMessage<StreamContent, StreamId>, ProtobufConversionError>,
BroadcastedMessageMetadata,
),
) {
let (message, metadata) = message;
let message = match message {
Ok(message) => message,
Err(e) => {
)>,
) -> Result<(), StreamHandlerError> {
let (message, metadata) = match message {
None => return Err(StreamHandlerError::InboundChannelClosed),
Some((Ok(message), metadata)) => (message, metadata),
Some((Err(e), _)) => {
// TODO(guy): switch to debug when network is opened to "all".
warn!("Error converting message: {:?}", e);
return;
return Ok(());
}
};

Expand All @@ -306,6 +335,7 @@ where
if let Some(data) = self.handle_message_inner(message, metadata, data) {
self.inbound_stream_data.insert(key, data);
}
Ok(())
}

/// Returns the StreamData struct if it should be put back into the hash map. None if the data
Expand Down
Loading

0 comments on commit a489bbc

Please sign in to comment.