diff --git a/Cargo.lock b/Cargo.lock index 6f8227b5c..0af2940a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,7 @@ dependencies = [ "eigen-types", "parking_lot", "sha2", + "thiserror", "tokio", ] diff --git a/crates/services/bls_aggregation/Cargo.toml b/crates/services/bls_aggregation/Cargo.toml index 5683fb211..934a023a4 100644 --- a/crates/services/bls_aggregation/Cargo.toml +++ b/crates/services/bls_aggregation/Cargo.toml @@ -17,6 +17,7 @@ eigen-crypto-bn254.workspace = true eigen-services-avsregistry.workspace = true eigen-types.workspace = true parking_lot.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = ["full"] } [dev-dependencies] diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 7e77bc18b..883b9b973 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -12,6 +12,7 @@ use eigen_types::{ use parking_lot::RwLock; use std::collections::HashMap; use std::sync::Arc; +use thiserror::Error; use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, Mutex, @@ -33,6 +34,12 @@ pub struct BlsAggregationServiceResponse { non_signer_stake_indices: Vec>, } +#[derive(Error, Debug, Clone, PartialEq, Eq)] +pub enum BlsAggregationServiceError { + #[error("timeout error")] + Timeout, +} + #[derive(Debug, Clone)] pub struct AggregatedOperators { signers_apk_g2: BlsG2Point, @@ -46,8 +53,11 @@ pub struct BlsAggregatorService where A: Clone, { - aggregated_response_sender: UnboundedSender, - pub aggregated_response_receiver: Arc>>, + aggregated_response_sender: + UnboundedSender>, + pub aggregated_response_receiver: Arc< + Mutex>>, + >, signed_task_response: Arc>>>, @@ -288,7 +298,7 @@ impl BlsAggregatorService let _ = self .aggregated_response_sender - .send(bls_aggregation_service_response); + .send(Ok(bls_aggregation_service_response)); } Ok(None) => { // channel closed @@ -297,6 +307,9 @@ impl BlsAggregatorService Err(_) => { // timeout println!("expire"); + let _ = self + .aggregated_response_sender + .send(Err(BlsAggregationServiceError::Timeout)); return; } } @@ -369,7 +382,7 @@ mod tests { use std::time::Duration; use std::vec; - use super::{BlsAggregationServiceResponse, BlsAggregatorService}; + use super::{BlsAggregationServiceError, BlsAggregationServiceResponse, BlsAggregatorService}; fn new_bls_key_pair_panics(hex_key: String) -> BlsKeyPair { let keypair = BlsKeyPair::new(hex_key); @@ -454,8 +467,11 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.clone().unwrap()); - assert_eq!(task_index, response.unwrap().task_index); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); } #[tokio::test] @@ -585,8 +601,11 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.clone().unwrap()); - assert_eq!(task_index, response.unwrap().task_index); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); } #[tokio::test] @@ -696,7 +715,7 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.unwrap()); + assert_eq!(expected_agg_service_response, response.unwrap().unwrap()); } #[tokio::test] @@ -879,14 +898,15 @@ mod tests { .await .unwrap(); - let (task_1_response, task_2_response) = if first_response.task_index == 1 { + let (task_1_response, task_2_response) = if first_response.clone().unwrap().task_index == 1 + { (first_response, second_response) } else { (second_response, first_response) }; - assert_eq!(expected_response_task_1, task_1_response); - assert_eq!(expected_response_task_2, task_2_response); + assert_eq!(expected_response_task_1, task_1_response.unwrap()); + assert_eq!(expected_response_task_2, task_2_response.unwrap()); } // #[tokio::test] @@ -933,8 +953,7 @@ mod tests { } #[tokio::test] - async fn test_1_quorum_2_operator_1_signatures_50_quorum() { - // 1 quorum 2 operator 1 correct signature quorumThreshold 50% - verified + async fn test_1_quorum_2_operator_1_signatures_50_threshold() { let test_operator_1 = TestOperator { operator_id: U256::from(1).into(), stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]), @@ -1012,8 +1031,11 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.clone().unwrap()); - assert_eq!(task_index, response.unwrap().task_index); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); } #[tokio::test] @@ -1112,12 +1134,15 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.clone().unwrap()); - assert_eq!(task_index, response.unwrap().task_index); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); } #[tokio::test] - async fn test_2_quorums_3_operators_which_just_stake_1_quorum_2_correct_signatures() { + async fn test_2_quorums_3_operators_which_just_stake_1_quorum_50_threshold() { let test_operator_1 = TestOperator { operator_id: U256::from(1).into(), // Note the quorums is [0, 1], but operator id 1 just stake 0. @@ -1235,7 +1260,103 @@ mod tests { .recv() .await; - assert_eq!(expected_agg_service_response, response.clone().unwrap()); - assert_eq!(task_index, response.unwrap().task_index); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); + } + + #[tokio::test] + async fn test_2_quorums_3_operators_which_just_stake_1_quorum_60_threshold() { + // results in `task expired` + let test_operator_1 = TestOperator { + operator_id: U256::from(1).into(), + // Note the quorums is [0, 1], but operator id 1 just stake 0. + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: new_bls_key_pair_panics( + "13710126902690889134622698668747132666439281256983827313388062967626731803599" + .into(), + ), + }; + let test_operator_2 = TestOperator { + operator_id: U256::from(2).into(), + // Note the quorums is [0, 1], but operator id 2 just stake 1. + stake_per_quorum: HashMap::from([(1u8, U256::from(200))]), + bls_keypair: new_bls_key_pair_panics( + "14610126902690889134622698668747132666439281256983827313388062967626731803500" + .into(), + ), + }; + + let test_operator_3 = TestOperator { + operator_id: U256::from(3).into(), + // Note the quorums is [0, 1], but operator id 3 just stake 0. + stake_per_quorum: HashMap::from([(0u8, U256::from(100)), (1u8, U256::from(200))]), + bls_keypair: new_bls_key_pair_panics( + "15710126902690889134622698668747132666439281256983827313388062967626731803599" + .into(), + ), + }; + + let test_operators = vec![ + test_operator_1.clone(), + test_operator_2.clone(), + test_operator_3.clone(), + ]; + let block_number = 1; + let task_index = 0; + let quorum_numbers: Vec = vec![0, 1]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![60u8, 60u8]; + let time_to_expiry = Duration::from_secs(1); + let task_response = 123; // Initialize with appropriate data + let task_response_digest = hash(task_response); + + let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); + let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); + + bls_agg_service + .initialize_new_task::( + task_index, + block_number as u32, + quorum_numbers, + quorum_threshold_percentages, + time_to_expiry, + ) + .await; + + let bls_sig_op_1 = test_operator_1 + .bls_keypair + .sign_message(task_response_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_digest, + bls_sig_op_1.clone(), + test_operator_1.operator_id, + ) + .await; + + let bls_sig_op_2 = test_operator_2 + .bls_keypair + .sign_message(task_response_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_digest, + bls_sig_op_2.clone(), + test_operator_2.operator_id, + ) + .await; + + dbg!("waiting response"); + let response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await; + + assert_eq!(Err(BlsAggregationServiceError::Timeout), response.unwrap()); } }