diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index dd14e914..b2cc4f41 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -12,11 +12,10 @@ use types::Hash256; // Re-Exports for Manager pub use config::{Config, ConfigBuilder}; pub use error::ConfigBuilderError; -pub use qbft_types::Message; pub use qbft_types::WrappedQbftMessage; pub use qbft_types::{ Completed, ConsensusData, DefaultLeaderFunction, InstanceHeight, InstanceState, LeaderFunction, - Round, + Round, UnsignedWrappedQbftMessage, }; mod config; @@ -71,7 +70,7 @@ pub struct Qbft where F: LeaderFunction + Clone, D: QbftData, - S: FnMut(Message), + S: FnMut(UnsignedWrappedQbftMessage), { /// The initial configuration used to establish this instance of QBFT. config: Config, @@ -121,7 +120,7 @@ impl Qbft where F: LeaderFunction + Clone, D: QbftData, - S: FnMut(Message), + S: FnMut(UnsignedWrappedQbftMessage), { // Construct a new QBFT Instance and start the first round pub fn new(config: Config, start_data: D, send_message: S) -> Self { @@ -908,7 +907,7 @@ where data_hash: D::Hash, round_change_justification: Vec, prepare_justification: Vec, - ) -> UnsignedSSVMessage { + ) -> UnsignedWrappedQbftMessage { let data = self.get_message_data(&msg_type, data_hash); // Create the QBFT message @@ -931,9 +930,12 @@ where .expect("SSVMessage should be valid."); //TODO revisit this // Wrap in unsigned SSV message - UnsignedSSVMessage { - ssv_message, - full_data: data.full_data, + UnsignedWrappedQbftMessage { + unsigned_message: UnsignedSSVMessage { + ssv_message, + full_data: data.full_data, + }, + qbft_message, } } @@ -1066,8 +1068,7 @@ where prepare_justifications, ); - let operator_id = self.config.operator_id(); - (self.send_message)(Message::Propose(operator_id, unsigned_msg.clone())); + (self.send_message)(unsigned_msg); } // Send a new qbft prepare message @@ -1082,8 +1083,7 @@ where let unsigned_msg = self.new_unsigned_message(QbftMessageType::Prepare, data_hash, vec![], vec![]); - let operator_id = self.config.operator_id(); - (self.send_message)(Message::Prepare(operator_id, unsigned_msg.clone())); + (self.send_message)(unsigned_msg); } // Send a new qbft commit message @@ -1092,8 +1092,7 @@ where let unsigned_msg = self.new_unsigned_message(QbftMessageType::Commit, data_hash, vec![], vec![]); - let operator_id = self.config.operator_id(); - (self.send_message)(Message::Commit(operator_id, unsigned_msg.clone())); + (self.send_message)(unsigned_msg); } // Send a new qbft round change message @@ -1114,8 +1113,7 @@ where // forget that we accpeted a proposal self.proposal_accepted_for_current_round = false; - let operator_id = self.config.operator_id(); - (self.send_message)(Message::RoundChange(operator_id, unsigned_msg.clone())); + (self.send_message)(unsigned_msg); } /// Extract the data that the instance has come to consensus on diff --git a/anchor/common/qbft/src/qbft_types.rs b/anchor/common/qbft/src/qbft_types.rs index f8af9857..89cd20f8 100644 --- a/anchor/common/qbft/src/qbft_types.rs +++ b/anchor/common/qbft/src/qbft_types.rs @@ -50,6 +50,14 @@ pub struct WrappedQbftMessage { pub qbft_message: QbftMessage, } +// Wrapped qbft message is a wrapper around both an unsigned ssv message, and the underlying qbft +// message. +#[derive(Debug, Clone)] +pub struct UnsignedWrappedQbftMessage { + pub unsigned_message: UnsignedSSVMessage, + pub qbft_message: QbftMessage, +} + /// This represents an individual round, these change on regular time intervals #[derive(Clone, Copy, Debug, Deref, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Round(NonZeroUsize); @@ -113,31 +121,6 @@ impl From for u8 { } } -/// Generic Data trait to allow for future implementations of the QBFT module -// Messages that can be received from the message_in channel -#[derive(Debug, Clone)] -pub enum Message { - /// A PROPOSE message to be sent on the network. - Propose(OperatorId, UnsignedSSVMessage), - /// A PREPARE message to be sent on the network. - Prepare(OperatorId, UnsignedSSVMessage), - /// A commit message to be sent on the network. - Commit(OperatorId, UnsignedSSVMessage), - /// Round change message to be sent on the network - RoundChange(OperatorId, UnsignedSSVMessage), -} - -impl Message { - pub fn desugar(&self) -> (OperatorId, UnsignedSSVMessage) { - match self { - Message::Propose(id, msg) - | Message::Prepare(id, msg) - | Message::Commit(id, msg) - | Message::RoundChange(id, msg) => (*id, msg.clone()), - } - } -} - /// Type definitions for the allowable messages /// This holds the consensus data for a given round. #[derive(Debug, Clone)] diff --git a/anchor/common/qbft/src/tests.rs b/anchor/common/qbft/src/tests.rs index 0243956d..3e1f734c 100644 --- a/anchor/common/qbft/src/tests.rs +++ b/anchor/common/qbft/src/tests.rs @@ -5,7 +5,6 @@ use super::*; use qbft_types::DefaultLeaderFunction; use sha2::{Digest, Sha256}; -use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE}; use ssv_types::OperatorId; use ssz_derive::{Decode, Encode}; @@ -40,26 +39,22 @@ impl QbftData for TestData { } } -fn convert_unsigned_to_wrapped( - msg: UnsignedSSVMessage, +fn convert_unsigned_to_signed( + msg: UnsignedWrappedQbftMessage, operator_id: OperatorId, ) -> WrappedQbftMessage { // Create a signed message containing just this operator let signed_message = SignedSSVMessage::new( vec![vec![0; RSA_SIGNATURE_SIZE]], vec![OperatorId(*operator_id)], - msg.ssv_message.clone(), - msg.full_data, + msg.unsigned_message.ssv_message, + msg.unsigned_message.full_data, ) .expect("Should create signed message"); - // Parse the QBFT message from the SSV message data - let qbft_message = - QbftMessage::from_ssz_bytes(msg.ssv_message.data()).expect("Should decode QBFT message"); - WrappedQbftMessage { signed_message, - qbft_message, + qbft_message: msg.qbft_message, } } @@ -85,7 +80,7 @@ impl Default for TestQBFTCommitteeBuilder { impl TestQBFTCommitteeBuilder { /// Consumes self and runs a test scenario. This returns a [`TestQBFTCommittee`] which /// represents a running quorum. - pub fn run(self, data: D) -> TestQBFTCommittee + pub fn run(self, data: D) -> TestQBFTCommittee where D: Default + QbftData, { @@ -102,8 +97,8 @@ impl TestQBFTCommitteeBuilder { /// A testing structure representing a committee of running instances #[allow(clippy::type_complexity)] -struct TestQBFTCommittee, S: FnMut(Message)> { - msg_queue: Rc>>, +struct TestQBFTCommittee, S: FnMut(UnsignedWrappedQbftMessage)> { + msg_queue: Rc>>, instances: HashMap>, // All of the instances that are currently active, allows us to stop/restart instances by // controlling the messages being sent and received @@ -117,7 +112,7 @@ struct TestQBFTCommittee, S: FnMut(Message)> { fn construct_and_run_committee>( mut config: ConfigBuilder, validated_data: D, -) -> TestQBFTCommittee { +) -> TestQBFTCommittee { // The ID of a committee is just an integer in [0,committee_size) let msg_queue = Rc::new(RefCell::new(VecDeque::new())); @@ -145,7 +140,7 @@ fn construct_and_run_committee>( } } -impl, S: FnMut(Message)> TestQBFTCommittee { +impl, S: FnMut(UnsignedWrappedQbftMessage)> TestQBFTCommittee { fn wait_until_end(mut self) -> i32 { loop { let msg = self.msg_queue.borrow_mut().pop_front(); @@ -167,15 +162,8 @@ impl, S: FnMut(Message)> TestQBFTCommittee { // We do not make sure that id != sender since we want to loop back and receive our // own messages let instance = self.instances.get_mut(id).expect("Instance exists"); - // get the unsigned message and the sender - let (_, unsigned) = match msg { - Message::Propose(o, ref u) - | Message::Prepare(o, ref u) - | Message::Commit(o, ref u) - | Message::RoundChange(o, ref u) => (o, u), - }; - - let wrapped = convert_unsigned_to_wrapped(unsigned.clone(), sender); + + let wrapped = convert_unsigned_to_signed(msg.clone(), sender); instance.receive(wrapped); } } diff --git a/anchor/message_sender/src/lib.rs b/anchor/message_sender/src/lib.rs index 503f3b7c..1d46664b 100644 --- a/anchor/message_sender/src/lib.rs +++ b/anchor/message_sender/src/lib.rs @@ -8,11 +8,14 @@ use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; use ssv_types::CommitteeId; +type MessageCallback = dyn FnOnce(&SignedSSVMessage) + Send + 'static; + pub trait MessageSender: Send + Sync { fn sign_and_send( &self, message: UnsignedSSVMessage, committee_id: CommitteeId, + additional_message_callback: Option>, ) -> Result<(), Error>; fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error>; } diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 84858380..2815f3af 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -1,4 +1,4 @@ -use crate::{Error, MessageSender}; +use crate::{Error, MessageCallback, MessageSender}; use openssl::error::ErrorStack; use openssl::hash::MessageDigest; use openssl::pkey::{PKey, Private}; @@ -30,6 +30,7 @@ impl MessageSender for Arc { &self, message: UnsignedSSVMessage, committee_id: CommitteeId, + additional_message_callback: Option>, ) -> Result<(), Error> { if self.network_tx.is_closed() { return Err(Error::NetworkQueueClosed); @@ -59,6 +60,9 @@ impl MessageSender for Arc { return; } }; + if let Some(callback) = additional_message_callback { + callback(&message); + } sender.do_send(message, committee_id); }, SIGNER_NAME, diff --git a/anchor/message_sender/src/testing.rs b/anchor/message_sender/src/testing.rs index da7ac031..3c545b55 100644 --- a/anchor/message_sender/src/testing.rs +++ b/anchor/message_sender/src/testing.rs @@ -1,4 +1,4 @@ -use crate::{Error, MessageSender}; +use crate::{Error, MessageCallback, MessageSender}; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE}; use ssv_types::{CommitteeId, OperatorId}; @@ -14,6 +14,7 @@ impl MessageSender for MockMessageSender { &self, message: UnsignedSSVMessage, committee_id: CommitteeId, + additional_message_callback: Option>, ) -> Result<(), Error> { let message = SignedSSVMessage::new( vec![vec![0u8; RSA_SIGNATURE_SIZE]], @@ -22,6 +23,9 @@ impl MessageSender for MockMessageSender { message.full_data, ) .unwrap(); + if let Some(callback) = additional_message_callback { + callback(&message); + } self.send(message, committee_id) } diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index 7cd5b22f..50cf1c40 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -2,8 +2,8 @@ use dashmap::DashMap; use message_sender::MessageSender; use processor::{DropOnFinish, Senders}; use qbft::{ - Completed, ConfigBuilder, ConfigBuilderError, DefaultLeaderFunction, InstanceHeight, Message, - WrappedQbftMessage, + Completed, ConfigBuilder, ConfigBuilderError, DefaultLeaderFunction, InstanceHeight, + UnsignedWrappedQbftMessage, WrappedQbftMessage, }; use slot_clock::SlotClock; @@ -257,7 +257,7 @@ impl QbftDecidable for BeaconVote { } // States that Qbft instance may be in -enum QbftInstance, S: FnMut(Message)> { +enum QbftInstance, S: FnMut(UnsignedWrappedQbftMessage)> { // The instance is uninitialized Uninitialized { // todo: proooobably limit this @@ -269,6 +269,7 @@ enum QbftInstance, S: FnMut(Message)> { Initialized { qbft: Box>, round_end: Interval, + sent_by_us: UnboundedReceiver, on_completed: Vec>>, }, // The instance has been decided @@ -292,11 +293,21 @@ async fn qbft_instance>( QbftInstance::Uninitialized { .. } | QbftInstance::Decided { .. } => rx.recv().await, QbftInstance::Initialized { qbft: instance, + sent_by_us, round_end, .. } => { select! { message = rx.recv() => message, + sent_by_us = sent_by_us.recv() => { + if let Some(sent_by_us) = sent_by_us { + instance.receive(sent_by_us); + } else { + // should not ever happen + error!("QBFT instance dropped message callback"); + } + continue; + }, _ = round_end.tick() => { warn!("Round timer elapsed"); instance.end_round(); @@ -320,6 +331,8 @@ async fn qbft_instance>( // The instance is uninitialized and we have received a manager message to // initialize it QbftInstance::Uninitialized { message_buffer } => { + let (sent_by_us_tx, sent_by_us_rx) = mpsc::unbounded_channel(); + let message_sender = message_sender.clone(); let committee_id = config .committee_members() @@ -329,10 +342,19 @@ async fn qbft_instance>( .into(); // Create a new instance and receive any buffered messages let mut instance = Box::new(Qbft::new(config, initial, move |message| { - let (_, unsigned) = message.desugar(); - if let Err(err) = - message_sender.clone().sign_and_send(unsigned, committee_id) - { + let sent_by_us_tx = sent_by_us_tx.clone(); + if let Err(err) = message_sender.clone().sign_and_send( + message.unsigned_message, + committee_id, + Some(Box::new(move |signed| { + // this might fail, but that's ok: it simply means that the + // instance has shut down (e.g. because it's done) + let _ = sent_by_us_tx.send(WrappedQbftMessage { + signed_message: signed.clone(), + qbft_message: message.qbft_message, + }); + })), + ) { error!(?err, "Unable to send qbft message!"); } })); @@ -347,12 +369,14 @@ async fn qbft_instance>( QbftInstance::Initialized { round_end: interval, qbft: instance, + sent_by_us: sent_by_us_rx, on_completed: vec![on_completed], } } QbftInstance::Initialized { qbft, round_end, + sent_by_us, on_completed: mut on_completed_vec, } => { if qbft.start_data_hash() != &initial.hash() { @@ -362,6 +386,7 @@ async fn qbft_instance>( QbftInstance::Initialized { qbft, round_end, + sent_by_us, on_completed: on_completed_vec, } } @@ -395,6 +420,7 @@ async fn qbft_instance>( if let QbftInstance::Initialized { qbft, round_end, + sent_by_us, on_completed, } = instance { @@ -428,6 +454,7 @@ async fn qbft_instance>( instance = QbftInstance::Initialized { qbft, round_end, + sent_by_us, on_completed, } } diff --git a/anchor/signature_collector/src/lib.rs b/anchor/signature_collector/src/lib.rs index 6bc95901..2bb2793d 100644 --- a/anchor/signature_collector/src/lib.rs +++ b/anchor/signature_collector/src/lib.rs @@ -146,6 +146,7 @@ impl SignatureCollectorManager { &DutyExecutor::Validator(pubkey), ), metadata.committee_id, + None, ) { error!(?err, "Error sending validator partial signature"); } @@ -191,6 +192,7 @@ impl SignatureCollectorManager { &DutyExecutor::Committee(metadata.committee_id), ), metadata.committee_id, + None, ) { error!(?err, "Error sending committee partial signatures"); }