Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle messages sent by ourselves #166

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ use types::Hash256;
// Re-Exports for Manager
pub use config::{Config, ConfigBuilder};
pub use error::ConfigBuilderError;
pub use qbft_types::Message;
pub use qbft_types::WrappedQbftMessage;
pub use qbft_types::{
Completed, ConsensusData, DefaultLeaderFunction, InstanceHeight, InstanceState, LeaderFunction,
Round,
Round, UnsignedWrappedQbftMessage,
};

mod config;
Expand Down Expand Up @@ -71,7 +70,7 @@ pub struct Qbft<F, D, S>
where
F: LeaderFunction + Clone,
D: QbftData<Hash = Hash256>,
S: FnMut(Message),
S: FnMut(UnsignedWrappedQbftMessage),
{
/// The initial configuration used to establish this instance of QBFT.
config: Config<F>,
Expand Down Expand Up @@ -121,7 +120,7 @@ impl<F, D, S> Qbft<F, D, S>
where
F: LeaderFunction + Clone,
D: QbftData<Hash = Hash256>,
S: FnMut(Message),
S: FnMut(UnsignedWrappedQbftMessage),
{
// Construct a new QBFT Instance and start the first round
pub fn new(config: Config<F>, start_data: D, send_message: S) -> Self {
Expand Down Expand Up @@ -908,7 +907,7 @@ where
data_hash: D::Hash,
round_change_justification: Vec<SignedSSVMessage>,
prepare_justification: Vec<SignedSSVMessage>,
) -> UnsignedSSVMessage {
) -> UnsignedWrappedQbftMessage {
let data = self.get_message_data(&msg_type, data_hash);

// Create the QBFT message
Expand All @@ -931,9 +930,12 @@ where
.expect("SSVMessage should be valid."); //TODO revisit this

// Wrap in unsigned SSV message
UnsignedSSVMessage {
ssv_message,
full_data: data.full_data,
UnsignedWrappedQbftMessage {
unsigned_message: UnsignedSSVMessage {
ssv_message,
full_data: data.full_data,
},
qbft_message,
}
}

Expand Down Expand Up @@ -1066,8 +1068,7 @@ where
prepare_justifications,
);

let operator_id = self.config.operator_id();
(self.send_message)(Message::Propose(operator_id, unsigned_msg.clone()));
(self.send_message)(unsigned_msg);
}

// Send a new qbft prepare message
Expand All @@ -1082,8 +1083,7 @@ where
let unsigned_msg =
self.new_unsigned_message(QbftMessageType::Prepare, data_hash, vec![], vec![]);

let operator_id = self.config.operator_id();
(self.send_message)(Message::Prepare(operator_id, unsigned_msg.clone()));
(self.send_message)(unsigned_msg);
}

// Send a new qbft commit message
Expand All @@ -1092,8 +1092,7 @@ where
let unsigned_msg =
self.new_unsigned_message(QbftMessageType::Commit, data_hash, vec![], vec![]);

let operator_id = self.config.operator_id();
(self.send_message)(Message::Commit(operator_id, unsigned_msg.clone()));
(self.send_message)(unsigned_msg);
}

// Send a new qbft round change message
Expand All @@ -1114,8 +1113,7 @@ where
// forget that we accpeted a proposal
self.proposal_accepted_for_current_round = false;

let operator_id = self.config.operator_id();
(self.send_message)(Message::RoundChange(operator_id, unsigned_msg.clone()));
(self.send_message)(unsigned_msg);
}

/// Extract the data that the instance has come to consensus on
Expand Down
33 changes: 8 additions & 25 deletions anchor/common/qbft/src/qbft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ pub struct WrappedQbftMessage {
pub qbft_message: QbftMessage,
}

// Wrapped qbft message is a wrapper around both an unsigned ssv message, and the underlying qbft
// message.
#[derive(Debug, Clone)]
pub struct UnsignedWrappedQbftMessage {
pub unsigned_message: UnsignedSSVMessage,
pub qbft_message: QbftMessage,
}

/// This represents an individual round, these change on regular time intervals
#[derive(Clone, Copy, Debug, Deref, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Round(NonZeroUsize);
Expand Down Expand Up @@ -113,31 +121,6 @@ impl From<InstanceState> for u8 {
}
}

/// Generic Data trait to allow for future implementations of the QBFT module
// Messages that can be received from the message_in channel
#[derive(Debug, Clone)]
pub enum Message {
/// A PROPOSE message to be sent on the network.
Propose(OperatorId, UnsignedSSVMessage),
/// A PREPARE message to be sent on the network.
Prepare(OperatorId, UnsignedSSVMessage),
/// A commit message to be sent on the network.
Commit(OperatorId, UnsignedSSVMessage),
/// Round change message to be sent on the network
RoundChange(OperatorId, UnsignedSSVMessage),
}

impl Message {
pub fn desugar(&self) -> (OperatorId, UnsignedSSVMessage) {
match self {
Message::Propose(id, msg)
| Message::Prepare(id, msg)
| Message::Commit(id, msg)
| Message::RoundChange(id, msg) => (*id, msg.clone()),
}
}
}

/// Type definitions for the allowable messages
/// This holds the consensus data for a given round.
#[derive(Debug, Clone)]
Expand Down
36 changes: 12 additions & 24 deletions anchor/common/qbft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use super::*;
use qbft_types::DefaultLeaderFunction;
use sha2::{Digest, Sha256};
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE};
use ssv_types::OperatorId;
use ssz_derive::{Decode, Encode};
Expand Down Expand Up @@ -40,26 +39,22 @@ impl QbftData for TestData {
}
}

fn convert_unsigned_to_wrapped(
msg: UnsignedSSVMessage,
fn convert_unsigned_to_signed(
msg: UnsignedWrappedQbftMessage,
operator_id: OperatorId,
) -> WrappedQbftMessage {
// Create a signed message containing just this operator
let signed_message = SignedSSVMessage::new(
vec![vec![0; RSA_SIGNATURE_SIZE]],
vec![OperatorId(*operator_id)],
msg.ssv_message.clone(),
msg.full_data,
msg.unsigned_message.ssv_message,
msg.unsigned_message.full_data,
)
.expect("Should create signed message");

// Parse the QBFT message from the SSV message data
let qbft_message =
QbftMessage::from_ssz_bytes(msg.ssv_message.data()).expect("Should decode QBFT message");

WrappedQbftMessage {
signed_message,
qbft_message,
qbft_message: msg.qbft_message,
}
}

Expand All @@ -85,7 +80,7 @@ impl Default for TestQBFTCommitteeBuilder {
impl TestQBFTCommitteeBuilder {
/// Consumes self and runs a test scenario. This returns a [`TestQBFTCommittee`] which
/// represents a running quorum.
pub fn run<D>(self, data: D) -> TestQBFTCommittee<D, impl FnMut(Message)>
pub fn run<D>(self, data: D) -> TestQBFTCommittee<D, impl FnMut(UnsignedWrappedQbftMessage)>
where
D: Default + QbftData<Hash = Hash256>,
{
Expand All @@ -102,8 +97,8 @@ impl TestQBFTCommitteeBuilder {

/// A testing structure representing a committee of running instances
#[allow(clippy::type_complexity)]
struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {
msg_queue: Rc<RefCell<VecDeque<(OperatorId, Message)>>>,
struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(UnsignedWrappedQbftMessage)> {
msg_queue: Rc<RefCell<VecDeque<(OperatorId, UnsignedWrappedQbftMessage)>>>,
instances: HashMap<OperatorId, Qbft<DefaultLeaderFunction, D, S>>,
// All of the instances that are currently active, allows us to stop/restart instances by
// controlling the messages being sent and received
Expand All @@ -117,7 +112,7 @@ struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {
fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
mut config: ConfigBuilder,
validated_data: D,
) -> TestQBFTCommittee<D, impl FnMut(Message)> {
) -> TestQBFTCommittee<D, impl FnMut(UnsignedWrappedQbftMessage)> {
// The ID of a committee is just an integer in [0,committee_size)

let msg_queue = Rc::new(RefCell::new(VecDeque::new()));
Expand Down Expand Up @@ -145,7 +140,7 @@ fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
}
}

impl<D: QbftData<Hash = Hash256>, S: FnMut(Message)> TestQBFTCommittee<D, S> {
impl<D: QbftData<Hash = Hash256>, S: FnMut(UnsignedWrappedQbftMessage)> TestQBFTCommittee<D, S> {
fn wait_until_end(mut self) -> i32 {
loop {
let msg = self.msg_queue.borrow_mut().pop_front();
Expand All @@ -167,15 +162,8 @@ impl<D: QbftData<Hash = Hash256>, S: FnMut(Message)> TestQBFTCommittee<D, S> {
// We do not make sure that id != sender since we want to loop back and receive our
// own messages
let instance = self.instances.get_mut(id).expect("Instance exists");
// get the unsigned message and the sender
let (_, unsigned) = match msg {
Message::Propose(o, ref u)
| Message::Prepare(o, ref u)
| Message::Commit(o, ref u)
| Message::RoundChange(o, ref u) => (o, u),
};

let wrapped = convert_unsigned_to_wrapped(unsigned.clone(), sender);

let wrapped = convert_unsigned_to_signed(msg.clone(), sender);
instance.receive(wrapped);
}
}
Expand Down
3 changes: 3 additions & 0 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::CommitteeId;

type MessageCallback = dyn FnOnce(&SignedSSVMessage) + Send + 'static;

pub trait MessageSender: Send + Sync {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
additional_message_callback: Option<Box<MessageCallback>>,
) -> Result<(), Error>;
fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error>;
}
Expand Down
6 changes: 5 additions & 1 deletion anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Error, MessageSender};
use crate::{Error, MessageCallback, MessageSender};
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
Expand Down Expand Up @@ -30,6 +30,7 @@ impl MessageSender for Arc<NetworkMessageSender> {
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
additional_message_callback: Option<Box<MessageCallback>>,
) -> Result<(), Error> {
if self.network_tx.is_closed() {
return Err(Error::NetworkQueueClosed);
Expand Down Expand Up @@ -59,6 +60,9 @@ impl MessageSender for Arc<NetworkMessageSender> {
return;
}
};
if let Some(callback) = additional_message_callback {
callback(&message);
}
sender.do_send(message, committee_id);
},
SIGNER_NAME,
Expand Down
6 changes: 5 additions & 1 deletion anchor/message_sender/src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Error, MessageSender};
use crate::{Error, MessageCallback, MessageSender};
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE};
use ssv_types::{CommitteeId, OperatorId};
Expand All @@ -14,6 +14,7 @@ impl MessageSender for MockMessageSender {
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
additional_message_callback: Option<Box<MessageCallback>>,
) -> Result<(), Error> {
let message = SignedSSVMessage::new(
vec![vec![0u8; RSA_SIGNATURE_SIZE]],
Expand All @@ -22,6 +23,9 @@ impl MessageSender for MockMessageSender {
message.full_data,
)
.unwrap();
if let Some(callback) = additional_message_callback {
callback(&message);
}
self.send(message, committee_id)
}

Expand Down
Loading