Skip to content

Commit d23b7c5

Browse files
authored
Properly handle messages sent by ourselves (#166)
1 parent f030b75 commit d23b7c5

File tree

8 files changed

+83
-74
lines changed

8 files changed

+83
-74
lines changed

anchor/common/qbft/src/lib.rs

+14-16
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@ use types::Hash256;
1212
// Re-Exports for Manager
1313
pub use config::{Config, ConfigBuilder};
1414
pub use error::ConfigBuilderError;
15-
pub use qbft_types::Message;
1615
pub use qbft_types::WrappedQbftMessage;
1716
pub use qbft_types::{
1817
Completed, ConsensusData, DefaultLeaderFunction, InstanceHeight, InstanceState, LeaderFunction,
19-
Round,
18+
Round, UnsignedWrappedQbftMessage,
2019
};
2120

2221
mod config;
@@ -71,7 +70,7 @@ pub struct Qbft<F, D, S>
7170
where
7271
F: LeaderFunction + Clone,
7372
D: QbftData<Hash = Hash256>,
74-
S: FnMut(Message),
73+
S: FnMut(UnsignedWrappedQbftMessage),
7574
{
7675
/// The initial configuration used to establish this instance of QBFT.
7776
config: Config<F>,
@@ -121,7 +120,7 @@ impl<F, D, S> Qbft<F, D, S>
121120
where
122121
F: LeaderFunction + Clone,
123122
D: QbftData<Hash = Hash256>,
124-
S: FnMut(Message),
123+
S: FnMut(UnsignedWrappedQbftMessage),
125124
{
126125
// Construct a new QBFT Instance and start the first round
127126
pub fn new(config: Config<F>, start_data: D, send_message: S) -> Self {
@@ -908,7 +907,7 @@ where
908907
data_hash: D::Hash,
909908
round_change_justification: Vec<SignedSSVMessage>,
910909
prepare_justification: Vec<SignedSSVMessage>,
911-
) -> UnsignedSSVMessage {
910+
) -> UnsignedWrappedQbftMessage {
912911
let data = self.get_message_data(&msg_type, data_hash);
913912

914913
// Create the QBFT message
@@ -931,9 +930,12 @@ where
931930
.expect("SSVMessage should be valid."); //TODO revisit this
932931

933932
// Wrap in unsigned SSV message
934-
UnsignedSSVMessage {
935-
ssv_message,
936-
full_data: data.full_data,
933+
UnsignedWrappedQbftMessage {
934+
unsigned_message: UnsignedSSVMessage {
935+
ssv_message,
936+
full_data: data.full_data,
937+
},
938+
qbft_message,
937939
}
938940
}
939941

@@ -1066,8 +1068,7 @@ where
10661068
prepare_justifications,
10671069
);
10681070

1069-
let operator_id = self.config.operator_id();
1070-
(self.send_message)(Message::Propose(operator_id, unsigned_msg.clone()));
1071+
(self.send_message)(unsigned_msg);
10711072
}
10721073

10731074
// Send a new qbft prepare message
@@ -1082,8 +1083,7 @@ where
10821083
let unsigned_msg =
10831084
self.new_unsigned_message(QbftMessageType::Prepare, data_hash, vec![], vec![]);
10841085

1085-
let operator_id = self.config.operator_id();
1086-
(self.send_message)(Message::Prepare(operator_id, unsigned_msg.clone()));
1086+
(self.send_message)(unsigned_msg);
10871087
}
10881088

10891089
// Send a new qbft commit message
@@ -1092,8 +1092,7 @@ where
10921092
let unsigned_msg =
10931093
self.new_unsigned_message(QbftMessageType::Commit, data_hash, vec![], vec![]);
10941094

1095-
let operator_id = self.config.operator_id();
1096-
(self.send_message)(Message::Commit(operator_id, unsigned_msg.clone()));
1095+
(self.send_message)(unsigned_msg);
10971096
}
10981097

10991098
// Send a new qbft round change message
@@ -1114,8 +1113,7 @@ where
11141113
// forget that we accpeted a proposal
11151114
self.proposal_accepted_for_current_round = false;
11161115

1117-
let operator_id = self.config.operator_id();
1118-
(self.send_message)(Message::RoundChange(operator_id, unsigned_msg.clone()));
1116+
(self.send_message)(unsigned_msg);
11191117
}
11201118

11211119
/// Extract the data that the instance has come to consensus on

anchor/common/qbft/src/qbft_types.rs

+8-25
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ pub struct WrappedQbftMessage {
5050
pub qbft_message: QbftMessage,
5151
}
5252

53+
// Wrapped qbft message is a wrapper around both an unsigned ssv message, and the underlying qbft
54+
// message.
55+
#[derive(Debug, Clone)]
56+
pub struct UnsignedWrappedQbftMessage {
57+
pub unsigned_message: UnsignedSSVMessage,
58+
pub qbft_message: QbftMessage,
59+
}
60+
5361
/// This represents an individual round, these change on regular time intervals
5462
#[derive(Clone, Copy, Debug, Deref, PartialEq, Eq, Hash, PartialOrd, Ord)]
5563
pub struct Round(NonZeroUsize);
@@ -113,31 +121,6 @@ impl From<InstanceState> for u8 {
113121
}
114122
}
115123

116-
/// Generic Data trait to allow for future implementations of the QBFT module
117-
// Messages that can be received from the message_in channel
118-
#[derive(Debug, Clone)]
119-
pub enum Message {
120-
/// A PROPOSE message to be sent on the network.
121-
Propose(OperatorId, UnsignedSSVMessage),
122-
/// A PREPARE message to be sent on the network.
123-
Prepare(OperatorId, UnsignedSSVMessage),
124-
/// A commit message to be sent on the network.
125-
Commit(OperatorId, UnsignedSSVMessage),
126-
/// Round change message to be sent on the network
127-
RoundChange(OperatorId, UnsignedSSVMessage),
128-
}
129-
130-
impl Message {
131-
pub fn desugar(&self) -> (OperatorId, UnsignedSSVMessage) {
132-
match self {
133-
Message::Propose(id, msg)
134-
| Message::Prepare(id, msg)
135-
| Message::Commit(id, msg)
136-
| Message::RoundChange(id, msg) => (*id, msg.clone()),
137-
}
138-
}
139-
}
140-
141124
/// Type definitions for the allowable messages
142125
/// This holds the consensus data for a given round.
143126
#[derive(Debug, Clone)]

anchor/common/qbft/src/tests.rs

+12-24
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use super::*;
66
use qbft_types::DefaultLeaderFunction;
77
use sha2::{Digest, Sha256};
8-
use ssv_types::consensus::UnsignedSSVMessage;
98
use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE};
109
use ssv_types::OperatorId;
1110
use ssz_derive::{Decode, Encode};
@@ -40,26 +39,22 @@ impl QbftData for TestData {
4039
}
4140
}
4241

43-
fn convert_unsigned_to_wrapped(
44-
msg: UnsignedSSVMessage,
42+
fn convert_unsigned_to_signed(
43+
msg: UnsignedWrappedQbftMessage,
4544
operator_id: OperatorId,
4645
) -> WrappedQbftMessage {
4746
// Create a signed message containing just this operator
4847
let signed_message = SignedSSVMessage::new(
4948
vec![vec![0; RSA_SIGNATURE_SIZE]],
5049
vec![OperatorId(*operator_id)],
51-
msg.ssv_message.clone(),
52-
msg.full_data,
50+
msg.unsigned_message.ssv_message,
51+
msg.unsigned_message.full_data,
5352
)
5453
.expect("Should create signed message");
5554

56-
// Parse the QBFT message from the SSV message data
57-
let qbft_message =
58-
QbftMessage::from_ssz_bytes(msg.ssv_message.data()).expect("Should decode QBFT message");
59-
6055
WrappedQbftMessage {
6156
signed_message,
62-
qbft_message,
57+
qbft_message: msg.qbft_message,
6358
}
6459
}
6560

@@ -85,7 +80,7 @@ impl Default for TestQBFTCommitteeBuilder {
8580
impl TestQBFTCommitteeBuilder {
8681
/// Consumes self and runs a test scenario. This returns a [`TestQBFTCommittee`] which
8782
/// represents a running quorum.
88-
pub fn run<D>(self, data: D) -> TestQBFTCommittee<D, impl FnMut(Message)>
83+
pub fn run<D>(self, data: D) -> TestQBFTCommittee<D, impl FnMut(UnsignedWrappedQbftMessage)>
8984
where
9085
D: Default + QbftData<Hash = Hash256>,
9186
{
@@ -102,8 +97,8 @@ impl TestQBFTCommitteeBuilder {
10297

10398
/// A testing structure representing a committee of running instances
10499
#[allow(clippy::type_complexity)]
105-
struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {
106-
msg_queue: Rc<RefCell<VecDeque<(OperatorId, Message)>>>,
100+
struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(UnsignedWrappedQbftMessage)> {
101+
msg_queue: Rc<RefCell<VecDeque<(OperatorId, UnsignedWrappedQbftMessage)>>>,
107102
instances: HashMap<OperatorId, Qbft<DefaultLeaderFunction, D, S>>,
108103
// All of the instances that are currently active, allows us to stop/restart instances by
109104
// controlling the messages being sent and received
@@ -117,7 +112,7 @@ struct TestQBFTCommittee<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {
117112
fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
118113
mut config: ConfigBuilder,
119114
validated_data: D,
120-
) -> TestQBFTCommittee<D, impl FnMut(Message)> {
115+
) -> TestQBFTCommittee<D, impl FnMut(UnsignedWrappedQbftMessage)> {
121116
// The ID of a committee is just an integer in [0,committee_size)
122117

123118
let msg_queue = Rc::new(RefCell::new(VecDeque::new()));
@@ -145,7 +140,7 @@ fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
145140
}
146141
}
147142

148-
impl<D: QbftData<Hash = Hash256>, S: FnMut(Message)> TestQBFTCommittee<D, S> {
143+
impl<D: QbftData<Hash = Hash256>, S: FnMut(UnsignedWrappedQbftMessage)> TestQBFTCommittee<D, S> {
149144
fn wait_until_end(mut self) -> i32 {
150145
loop {
151146
let msg = self.msg_queue.borrow_mut().pop_front();
@@ -167,15 +162,8 @@ impl<D: QbftData<Hash = Hash256>, S: FnMut(Message)> TestQBFTCommittee<D, S> {
167162
// We do not make sure that id != sender since we want to loop back and receive our
168163
// own messages
169164
let instance = self.instances.get_mut(id).expect("Instance exists");
170-
// get the unsigned message and the sender
171-
let (_, unsigned) = match msg {
172-
Message::Propose(o, ref u)
173-
| Message::Prepare(o, ref u)
174-
| Message::Commit(o, ref u)
175-
| Message::RoundChange(o, ref u) => (o, u),
176-
};
177-
178-
let wrapped = convert_unsigned_to_wrapped(unsigned.clone(), sender);
165+
166+
let wrapped = convert_unsigned_to_signed(msg.clone(), sender);
179167
instance.receive(wrapped);
180168
}
181169
}

anchor/message_sender/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ use ssv_types::consensus::UnsignedSSVMessage;
88
use ssv_types::message::SignedSSVMessage;
99
use ssv_types::CommitteeId;
1010

11+
type MessageCallback = dyn FnOnce(&SignedSSVMessage) + Send + 'static;
12+
1113
pub trait MessageSender: Send + Sync {
1214
fn sign_and_send(
1315
&self,
1416
message: UnsignedSSVMessage,
1517
committee_id: CommitteeId,
18+
additional_message_callback: Option<Box<MessageCallback>>,
1619
) -> Result<(), Error>;
1720
fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error>;
1821
}

anchor/message_sender/src/network.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Error, MessageSender};
1+
use crate::{Error, MessageCallback, MessageSender};
22
use openssl::error::ErrorStack;
33
use openssl::hash::MessageDigest;
44
use openssl::pkey::{PKey, Private};
@@ -30,6 +30,7 @@ impl MessageSender for Arc<NetworkMessageSender> {
3030
&self,
3131
message: UnsignedSSVMessage,
3232
committee_id: CommitteeId,
33+
additional_message_callback: Option<Box<MessageCallback>>,
3334
) -> Result<(), Error> {
3435
if self.network_tx.is_closed() {
3536
return Err(Error::NetworkQueueClosed);
@@ -59,6 +60,9 @@ impl MessageSender for Arc<NetworkMessageSender> {
5960
return;
6061
}
6162
};
63+
if let Some(callback) = additional_message_callback {
64+
callback(&message);
65+
}
6266
sender.do_send(message, committee_id);
6367
},
6468
SIGNER_NAME,

anchor/message_sender/src/testing.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Error, MessageSender};
1+
use crate::{Error, MessageCallback, MessageSender};
22
use ssv_types::consensus::UnsignedSSVMessage;
33
use ssv_types::message::{SignedSSVMessage, RSA_SIGNATURE_SIZE};
44
use ssv_types::{CommitteeId, OperatorId};
@@ -14,6 +14,7 @@ impl MessageSender for MockMessageSender {
1414
&self,
1515
message: UnsignedSSVMessage,
1616
committee_id: CommitteeId,
17+
additional_message_callback: Option<Box<MessageCallback>>,
1718
) -> Result<(), Error> {
1819
let message = SignedSSVMessage::new(
1920
vec![vec![0u8; RSA_SIGNATURE_SIZE]],
@@ -22,6 +23,9 @@ impl MessageSender for MockMessageSender {
2223
message.full_data,
2324
)
2425
.unwrap();
26+
if let Some(callback) = additional_message_callback {
27+
callback(&message);
28+
}
2529
self.send(message, committee_id)
2630
}
2731

0 commit comments

Comments
 (0)