Skip to content

Commit

Permalink
add test and timeout error response
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasArrachea committed Aug 16, 2024
1 parent a14c172 commit b7858bf
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/bls_aggregation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ eigen-crypto-bn254.workspace = true
eigen-services-avsregistry.workspace = true
eigen-types.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
Expand Down
163 changes: 142 additions & 21 deletions crates/services/bls_aggregation/src/bls_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use eigen_types::{
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
Expand All @@ -33,6 +34,12 @@ pub struct BlsAggregationServiceResponse {
non_signer_stake_indices: Vec<Vec<u32>>,
}

#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum BlsAggregationServiceError {
#[error("timeout error")]
Timeout,
}

#[derive(Debug, Clone)]
pub struct AggregatedOperators {
signers_apk_g2: BlsG2Point,
Expand All @@ -46,8 +53,11 @@ pub struct BlsAggregatorService<A: AvsRegistryService>
where
A: Clone,
{
aggregated_response_sender: UnboundedSender<BlsAggregationServiceResponse>,
pub aggregated_response_receiver: Arc<Mutex<UnboundedReceiver<BlsAggregationServiceResponse>>>,
aggregated_response_sender:
UnboundedSender<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
pub aggregated_response_receiver: Arc<
Mutex<UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>>,
>,
signed_task_response:
Arc<RwLock<HashMap<TaskIndex, UnboundedSender<SignedTaskResponseDigest>>>>,

Expand Down Expand Up @@ -288,7 +298,7 @@ impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService

let _ = self
.aggregated_response_sender
.send(bls_aggregation_service_response);
.send(Ok(bls_aggregation_service_response));
}
Ok(None) => {
// channel closed
Expand All @@ -297,6 +307,9 @@ impl<A: AvsRegistryService + Send + Sync + Clone + 'static> BlsAggregatorService
Err(_) => {
// timeout
println!("expire");
let _ = self
.aggregated_response_sender
.send(Err(BlsAggregationServiceError::Timeout));
return;
}
}
Expand Down Expand Up @@ -369,7 +382,7 @@ mod tests {
use std::time::Duration;
use std::vec;

use super::{BlsAggregationServiceResponse, BlsAggregatorService};
use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService};

fn new_bls_key_pair_panics(hex_key: String) -> BlsKeyPair {
let keypair = BlsKeyPair::new(hex_key);
Expand Down Expand Up @@ -454,8 +467,11 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.clone().unwrap());
assert_eq!(task_index, response.unwrap().task_index);
assert_eq!(
expected_agg_service_response,
response.clone().unwrap().unwrap()
);
assert_eq!(task_index, response.unwrap().unwrap().task_index);
}

#[tokio::test]
Expand Down Expand Up @@ -585,8 +601,11 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.clone().unwrap());
assert_eq!(task_index, response.unwrap().task_index);
assert_eq!(
expected_agg_service_response,
response.clone().unwrap().unwrap()
);
assert_eq!(task_index, response.unwrap().unwrap().task_index);
}

#[tokio::test]
Expand Down Expand Up @@ -696,7 +715,7 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.unwrap());
assert_eq!(expected_agg_service_response, response.unwrap().unwrap());
}

#[tokio::test]
Expand Down Expand Up @@ -879,14 +898,15 @@ mod tests {
.await
.unwrap();

let (task_1_response, task_2_response) = if first_response.task_index == 1 {
let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1
{
(first_response, second_response)
} else {
(second_response, first_response)
};

assert_eq!(expected_response_task_1, task_1_response);
assert_eq!(expected_response_task_2, task_2_response);
assert_eq!(expected_response_task_1, task_1_response.unwrap());
assert_eq!(expected_response_task_2, task_2_response.unwrap());
}

// #[tokio::test]
Expand Down Expand Up @@ -933,8 +953,7 @@ mod tests {
}

#[tokio::test]
async fn test_1_quorum_2_operator_1_signatures_50_quorum() {
// 1 quorum 2 operator 1 correct signature quorumThreshold 50% - verified
async fn test_1_quorum_2_operator_1_signatures_50_threshold() {
let test_operator_1 = TestOperator {
operator_id: U256::from(1).into(),
stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
Expand Down Expand Up @@ -1012,8 +1031,11 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.clone().unwrap());
assert_eq!(task_index, response.unwrap().task_index);
assert_eq!(
expected_agg_service_response,
response.clone().unwrap().unwrap()
);
assert_eq!(task_index, response.unwrap().unwrap().task_index);
}

#[tokio::test]
Expand Down Expand Up @@ -1112,12 +1134,15 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.clone().unwrap());
assert_eq!(task_index, response.unwrap().task_index);
assert_eq!(
expected_agg_service_response,
response.clone().unwrap().unwrap()
);
assert_eq!(task_index, response.unwrap().unwrap().task_index);
}

#[tokio::test]
async fn test_2_quorums_3_operators_which_just_stake_1_quorum_2_correct_signatures() {
async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() {
let test_operator_1 = TestOperator {
operator_id: U256::from(1).into(),
// Note the quorums is [0, 1], but operator id 1 just stake 0.
Expand Down Expand Up @@ -1235,7 +1260,103 @@ mod tests {
.recv()
.await;

assert_eq!(expected_agg_service_response, response.clone().unwrap());
assert_eq!(task_index, response.unwrap().task_index);
assert_eq!(
expected_agg_service_response,
response.clone().unwrap().unwrap()
);
assert_eq!(task_index, response.unwrap().unwrap().task_index);
}

#[tokio::test]
async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() {
// results in `task expired`
let test_operator_1 = TestOperator {
operator_id: U256::from(1).into(),
// Note the quorums is [0, 1], but operator id 1 just stake 0.
stake_per_quorum: HashMap::from([(0u8, U256::from(100))]),
bls_keypair: new_bls_key_pair_panics(
"13710126902690889134622698668747132666439281256983827313388062967626731803599"
.into(),
),
};
let test_operator_2 = TestOperator {
operator_id: U256::from(2).into(),
// Note the quorums is [0, 1], but operator id 2 just stake 1.
stake_per_quorum: HashMap::from([(1u8, U256::from(200))]),
bls_keypair: new_bls_key_pair_panics(
"14610126902690889134622698668747132666439281256983827313388062967626731803500"
.into(),
),
};

let test_operator_3 = TestOperator {
operator_id: U256::from(3).into(),
// Note the quorums is [0, 1], but operator id 3 just stake 0.
stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]),
bls_keypair: new_bls_key_pair_panics(
"15710126902690889134622698668747132666439281256983827313388062967626731803599"
.into(),
),
};

let test_operators = vec![
test_operator_1.clone(),
test_operator_2.clone(),
test_operator_3.clone(),
];
let block_number = 1;
let task_index = 0;
let quorum_numbers: Vec<QuorumNum> = vec![0, 1];
let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8];
let time_to_expiry = Duration::from_secs(1);
let task_response = 123; // Initialize with appropriate data
let task_response_digest = hash(task_response);

let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators);
let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service);

bls_agg_service
.initialize_new_task::<FakeAvsRegistryService>(
task_index,
block_number as u32,
quorum_numbers,
quorum_threshold_percentages,
time_to_expiry,
)
.await;

let bls_sig_op_1 = test_operator_1
.bls_keypair
.sign_message(task_response_digest.as_ref());
bls_agg_service
.process_new_signature(
task_index,
task_response_digest,
bls_sig_op_1.clone(),
test_operator_1.operator_id,
)
.await;

let bls_sig_op_2 = test_operator_2
.bls_keypair
.sign_message(task_response_digest.as_ref());
bls_agg_service
.process_new_signature(
task_index,
task_response_digest,
bls_sig_op_2.clone(),
test_operator_2.operator_id,
)
.await;

dbg!("waiting response");
let response = bls_agg_service
.aggregated_response_receiver
.lock()
.await
.recv()
.await;

assert_eq!(Err(BlsAggregationServiceError::Timeout), response.unwrap());
}
}

0 comments on commit b7858bf

Please sign in to comment.