diff --git a/services/bls_aggregation/blsagg.go b/services/bls_aggregation/blsagg.go index 840b8499d..90bf5ff6f 100644 --- a/services/bls_aggregation/blsagg.go +++ b/services/bls_aggregation/blsagg.go @@ -81,12 +81,13 @@ type aggregatedOperators struct { // BlsAggregationService is the interface provided to avs aggregator code for doing bls aggregation // Currently its only implementation is the BlsAggregatorService, so see the comment there for more details type BlsAggregationService interface { - // InitializeNewTask should be called whenever a new task is created. ProcessNewSignature will return an error - // if the task it is trying to process has not been initialized yet. - // quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which + // InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task + // (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to + // it. The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered + // complete, which // happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers - // whose stake - // in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum + // whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in + // that quorum InitializeNewTask( taskIndex types.TaskIndex, taskCreatedBlock uint32, @@ -95,6 +96,27 @@ type BlsAggregationService interface { timeToExpiry time.Duration, ) error + // InitializeNewTaskWithWindow creates a new task goroutine meant to process new signed task responses for that task + // (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to + // it. The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered + // complete, which + // happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers + // whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in + // that quorum. + // Once the quorum is reached, the task is still open for a window of `windowDuration` time to receive more + // signatures, + // before sending the aggregation response through the aggregatedResponsesC channel. + // If the task expiration is reached before the window finishes, the task response will still be sent to the + // aggregatedResponsesC channel. + InitializeNewTaskWithWindow( + taskIndex types.TaskIndex, + taskCreatedBlock uint32, + quorumNumbers types.QuorumNums, + quorumThresholdPercentages types.QuorumThresholdPercentages, + timeToExpiry time.Duration, + windowDuration time.Duration, + ) error + // ProcessNewSignature processes a new signature over a taskResponseDigest for a particular taskIndex by a // particular operator It verifies that the signature is correct and returns an error if it is not, and then // aggregates the signature and stake of @@ -185,17 +207,43 @@ func (a *BlsAggregatorService) GetResponseChannel() <-chan BlsAggregationService } // InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task -// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it -// quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which -// happens -// when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers whose stake -// in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in that quorum +// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it. +// The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which +// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers +// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in +// that quorum func (a *BlsAggregatorService) InitializeNewTask( taskIndex types.TaskIndex, taskCreatedBlock uint32, quorumNumbers types.QuorumNums, quorumThresholdPercentages types.QuorumThresholdPercentages, timeToExpiry time.Duration, +) error { + return a.InitializeNewTaskWithWindow( + taskIndex, + taskCreatedBlock, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + 0, + ) +} + +// InitializeNewTaskWithWindow creates a new task goroutine meant to process new signed task responses for that task +// (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it. +// The quorumNumbers and quorumThresholdPercentages set the requirements for this task to be considered complete, which +// happens when a particular TaskResponseDigest (received via the a.taskChans[taskIndex]) has been signed by signers +// whose stake in each of the listed quorums adds up to at least quorumThresholdPercentages[i] of the total stake in +// that quorum. +// Once the quorum is reached, the task is still open for a window of `windowDuration` time to receive more signatures, +// before sending the aggregation response through the aggregatedResponsesC channel. +func (a *BlsAggregatorService) InitializeNewTaskWithWindow( + taskIndex types.TaskIndex, + taskCreatedBlock uint32, + quorumNumbers types.QuorumNums, + quorumThresholdPercentages types.QuorumThresholdPercentages, + timeToExpiry time.Duration, + windowDuration time.Duration, ) error { a.logger.Debug( "AggregatorService initializing new task", @@ -225,6 +273,7 @@ func (a *BlsAggregatorService) InitializeNewTask( quorumNumbers, quorumThresholdPercentages, timeToExpiry, + windowDuration, signedTaskRespsC, ) return nil @@ -271,6 +320,7 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( quorumNumbers types.QuorumNums, quorumThresholdPercentages []types.QuorumThresholdPercentage, timeToExpiry time.Duration, + windowDuration time.Duration, signedTaskRespsC <-chan types.SignedTaskResponseDigest, ) { a.logger.Debug("AggregatorService goroutine processing new task", @@ -342,6 +392,13 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( taskExpiredTimer := time.NewTimer(timeToExpiry) aggregatedOperatorsDict := map[types.TaskResponseDigest]aggregatedOperators{} + // The windowTimer is initialized to be longer than the taskExpiredTimer as it will + // be overwritten once the stake threshold is met + windowTimer := time.NewTimer(timeToExpiry + 1*time.Second) + openWindow := false + var lastSignedTaskResponseDigest types.SignedTaskResponseDigest + var lastDigestAggregatedOperators aggregatedOperators + var lastTaskResponseDigest types.TaskResponseDigest for { select { case signedTaskResponseDigest := <-signedTaskRespsC: @@ -415,11 +472,17 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( digestAggregatedOperators.signersTotalStakePerQuorum[quorumNum].Add(digestAggregatedOperators.signersTotalStakePerQuorum[quorumNum], stake) } } + + // update the buffer variables to be used when the window timer fires + lastDigestAggregatedOperators = digestAggregatedOperators + lastTaskResponseDigest = taskResponseDigest + lastSignedTaskResponseDigest = signedTaskResponseDigest + // update the aggregatedOperatorsDict. Note that we need to assign the whole struct value at once, // because of https://github.com/golang/go/issues/3117 aggregatedOperatorsDict[taskResponseDigest] = digestAggregatedOperators - if checkIfStakeThresholdsMet( + if !openWindow && checkIfStakeThresholdsMet( a.logger, digestAggregatedOperators.signersTotalStakePerQuorum, totalStakePerQuorum, @@ -429,67 +492,105 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc( "taskIndex", taskIndex, "taskResponseDigest", taskResponseDigest) - nonSignersOperatorIds := []types.OperatorId{} - for operatorId := range operatorsAvsStateDict { - if _, operatorSigned := digestAggregatedOperators.signersOperatorIdsSet[operatorId]; !operatorSigned { - nonSignersOperatorIds = append(nonSignersOperatorIds, operatorId) - } - } - - // the contract requires a sorted nonSignersOperatorIds - sort.SliceStable(nonSignersOperatorIds, func(i, j int) bool { - iOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[i][:]) - jOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[j][:]) - return iOprInt.Cmp(jOprInt) == -1 - }) - - nonSignersG1Pubkeys := []*bls.G1Point{} - for _, operatorId := range nonSignersOperatorIds { - operator := operatorsAvsStateDict[operatorId] - nonSignersG1Pubkeys = append(nonSignersG1Pubkeys, operator.OperatorInfo.Pubkeys.G1Pubkey) - } - - indices, err := a.avsRegistryService.GetCheckSignaturesIndices( - &bind.CallOpts{}, + openWindow = true + windowTimer = time.NewTimer(windowDuration) + a.logger.Debug("Window timer started") + } + case <-taskExpiredTimer.C: + if openWindow { + a.sendAggregatedResponse( + operatorsAvsStateDict, + taskIndex, taskCreatedBlock, + lastSignedTaskResponseDigest, + lastDigestAggregatedOperators, quorumNumbers, - nonSignersOperatorIds, + lastTaskResponseDigest, + quorumApksG1, ) - if err != nil { - a.aggregatedResponsesC <- BlsAggregationServiceResponse{ - Err: utils.WrapError(errors.New("Failed to get check signatures indices"), err), - TaskIndex: taskIndex, - } - return - } - - blsAggregationServiceResponse := BlsAggregationServiceResponse{ - Err: nil, - TaskIndex: taskIndex, - TaskResponse: signedTaskResponseDigest.TaskResponse, - TaskResponseDigest: taskResponseDigest, - NonSignersPubkeysG1: nonSignersG1Pubkeys, - QuorumApksG1: quorumApksG1, - SignersApkG2: digestAggregatedOperators.signersApkG2, - SignersAggSigG1: digestAggregatedOperators.signersAggSigG1, - NonSignerQuorumBitmapIndices: indices.NonSignerQuorumBitmapIndices, - QuorumApkIndices: indices.QuorumApkIndices, - TotalStakeIndices: indices.TotalStakeIndices, - NonSignerStakeIndices: indices.NonSignerStakeIndices, - } - a.aggregatedResponsesC <- blsAggregationServiceResponse - taskExpiredTimer.Stop() - return } - case <-taskExpiredTimer.C: + a.aggregatedResponsesC <- BlsAggregationServiceResponse{ Err: TaskExpiredErrorFn(taskIndex), TaskIndex: taskIndex, } return + case <-windowTimer.C: + a.logger.Debug("Window timer expired") + a.sendAggregatedResponse( + operatorsAvsStateDict, + taskIndex, + taskCreatedBlock, + lastSignedTaskResponseDigest, + lastDigestAggregatedOperators, + quorumNumbers, + lastTaskResponseDigest, + quorumApksG1, + ) + return } } +} +func (a *BlsAggregatorService) sendAggregatedResponse( + operatorsAvsStateDict map[types.OperatorId]types.OperatorAvsState, + taskIndex types.TaskIndex, + taskCreatedBlock uint32, + signedTaskResponseDigest types.SignedTaskResponseDigest, + digestAggregatedOperators aggregatedOperators, + quorumNumbers types.QuorumNums, + taskResponseDigest types.TaskResponseDigest, + quorumApksG1 []*bls.G1Point, +) { + nonSignersOperatorIds := []types.OperatorId{} + for operatorId := range operatorsAvsStateDict { + if _, operatorSigned := digestAggregatedOperators.signersOperatorIdsSet[operatorId]; !operatorSigned { + nonSignersOperatorIds = append(nonSignersOperatorIds, operatorId) + } + } + + // the contract requires a sorted nonSignersOperatorIds + sort.SliceStable(nonSignersOperatorIds, func(i, j int) bool { + iOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[i][:]) + jOprInt := new(big.Int).SetBytes(nonSignersOperatorIds[j][:]) + return iOprInt.Cmp(jOprInt) == -1 + }) + + nonSignersG1Pubkeys := []*bls.G1Point{} + for _, operatorId := range nonSignersOperatorIds { + operator := operatorsAvsStateDict[operatorId] + nonSignersG1Pubkeys = append(nonSignersG1Pubkeys, operator.OperatorInfo.Pubkeys.G1Pubkey) + } + + indices, err := a.avsRegistryService.GetCheckSignaturesIndices( + &bind.CallOpts{}, + taskCreatedBlock, + quorumNumbers, + nonSignersOperatorIds, + ) + if err != nil { + a.aggregatedResponsesC <- BlsAggregationServiceResponse{ + Err: utils.WrapError(errors.New("Failed to get check signatures indices"), err), + TaskIndex: taskIndex, + } + return + } + + blsAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: signedTaskResponseDigest.TaskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: nonSignersG1Pubkeys, + QuorumApksG1: quorumApksG1, + SignersApkG2: digestAggregatedOperators.signersApkG2, + SignersAggSigG1: digestAggregatedOperators.signersAggSigG1, + NonSignerQuorumBitmapIndices: indices.NonSignerQuorumBitmapIndices, + QuorumApkIndices: indices.QuorumApkIndices, + TotalStakeIndices: indices.TotalStakeIndices, + NonSignerStakeIndices: indices.NonSignerStakeIndices, + } + a.aggregatedResponsesC <- blsAggregationServiceResponse } // closeTaskGoroutine is run when the goroutine processing taskIndex's task responses ends (for whatever reason) diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index fd10338c1..72d5b0f24 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -1188,6 +1188,423 @@ func TestBlsAgg(t *testing.T) { require.EqualError(t, err, "Signature verification failed. Incorrect Signature.") }, ) + + t.Run("signatures are processed during window after quorum", func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{67} + taskResponse := mockTaskResponse{123} + + timeToExpiry := 5 * time.Second + windowDuration := 1 * time.Second + + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + start := time.Now() + err = blsAggServ.InitializeNewTaskWithWindow( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + windowDuration, + ) + require.Nil(t, err) + + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + // quorum has already been reached, but window should still be open + require.Nil(t, err) + blsSigOp3 := testOperator3.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp3, + testOperator3.OperatorId, + ) + require.Nil(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(). + Add(testOperator2.BlsKeypair.GetPubKeyG2()). + Add(testOperator3.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)). + Add(testOperator3.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + elapsed := time.Since(start) + t.Log("elapsed: ", elapsed.Seconds()) + require.True( + t, + elapsed.Seconds() >= windowDuration.Seconds(), + "The aggregation response should be sent after the window finishes", + ) + require.True(t, elapsed.Seconds() < timeToExpiry.Seconds()) + }) + + t.Run("if quorum has been reached and the task expires during window, the response is sent", func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{50} + taskResponse := mockTaskResponse{123} + + timeToExpiry := 5 * time.Second + windowDuration := 6 * time.Second + + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + start := time.Now() + err = blsAggServ.InitializeNewTaskWithWindow( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + windowDuration, + ) + require.Nil(t, err) + + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.Nil(t, err) + + // quorum has already been reached, window will be open and receiving signature until the task expires + + blsSigOp3 := testOperator3.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp3, + testOperator3.OperatorId, + ) + require.Nil(t, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(). + Add(testOperator2.BlsKeypair.GetPubKeyG2()). + Add(testOperator3.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)). + Add(testOperator3.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + elapsed := time.Since(start) + t.Log("elapsed: ", elapsed.Seconds()) + require.True(t, elapsed.Seconds() >= timeToExpiry.Seconds()) + require.True(t, elapsed.Seconds() < windowDuration.Seconds()) + }) + + t.Run("if window duration is zero, no signatures are aggregated after reaching quorum", func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{50} + taskResponse := mockTaskResponse{123} + + timeToExpiry := 5 * time.Second + windowDuration := 0 * time.Second + + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTaskWithWindow( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + windowDuration, + ) + require.Nil(t, err) + + start := time.Now() + + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.Nil(t, err) + + time.Sleep(1 * time.Millisecond) + // quorum has already been reached, next signatures should not be aggregated + // this should timeout as the task goroutine is blocked on the response channel + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + blsSigOp3 := testOperator3.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + ctx, + taskIndex, + taskResponse, + blsSigOp3, + testOperator3.OperatorId, + ) + require.Equal(t, context.DeadlineExceeded, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{testOperator3.BlsKeypair.GetPubKeyG1()}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(). + Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + elapsed := time.Since(start) + t.Log("elapsed: ", elapsed.Seconds()) + require.True(t, elapsed.Seconds() < timeToExpiry.Seconds()) + }) + + t.Run("no signatures are aggregated after window", func(t *testing.T) { + testOperator1 := types.TestOperator{ + OperatorId: types.OperatorId{1}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x1"), + } + testOperator2 := types.TestOperator{ + OperatorId: types.OperatorId{2}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(200)}, + BlsKeypair: newBlsKeyPairPanics("0x2"), + } + testOperator3 := types.TestOperator{ + OperatorId: types.OperatorId{3}, + StakePerQuorum: map[types.QuorumNum]types.StakeAmount{0: big.NewInt(100), 1: big.NewInt(100)}, + BlsKeypair: newBlsKeyPairPanics("0x3"), + } + blockNum := uint32(1) + taskIndex := types.TaskIndex(0) + quorumNumbers := types.QuorumNums{0} + quorumThresholdPercentages := []types.QuorumThresholdPercentage{50} + taskResponse := mockTaskResponse{123} + + timeToExpiry := 10 * time.Second + windowDuration := 1 * time.Second + + taskResponseDigest, err := hashFunction(taskResponse) + require.Nil(t, err) + + fakeAvsRegistryService := avsregistry.NewFakeAvsRegistryService( + blockNum, + []types.TestOperator{testOperator1, testOperator2, testOperator3}, + ) + logger := testutils.GetTestLogger() + blsAggServ := NewBlsAggregatorService(fakeAvsRegistryService, hashFunction, logger) + + err = blsAggServ.InitializeNewTaskWithWindow( + taskIndex, + blockNum, + quorumNumbers, + quorumThresholdPercentages, + timeToExpiry, + windowDuration, + ) + require.Nil(t, err) + + start := time.Now() + + blsSigOp1 := testOperator1.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp1, + testOperator1.OperatorId, + ) + require.Nil(t, err) + blsSigOp2 := testOperator2.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + context.Background(), + taskIndex, + taskResponse, + blsSigOp2, + testOperator2.OperatorId, + ) + require.Nil(t, err) + + time.Sleep(2 * time.Second) + + // quorum has already been reached, next signatures should not be aggregated + // this should timeout as the task goroutine is blocked on the response channel + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + blsSigOp3 := testOperator3.BlsKeypair.SignMessage(taskResponseDigest) + err = blsAggServ.ProcessNewSignature( + ctx, + taskIndex, + taskResponse, + blsSigOp3, + testOperator3.OperatorId, + ) + require.Equal(t, context.DeadlineExceeded, err) + + wantAggregationServiceResponse := BlsAggregationServiceResponse{ + Err: nil, + TaskIndex: taskIndex, + TaskResponse: taskResponse, + TaskResponseDigest: taskResponseDigest, + NonSignersPubkeysG1: []*bls.G1Point{testOperator3.BlsKeypair.GetPubKeyG1()}, + QuorumApksG1: []*bls.G1Point{testOperator1.BlsKeypair.GetPubKeyG1(). + Add(testOperator2.BlsKeypair.GetPubKeyG1()). + Add(testOperator3.BlsKeypair.GetPubKeyG1()), + }, + SignersApkG2: testOperator1.BlsKeypair.GetPubKeyG2(). + Add(testOperator2.BlsKeypair.GetPubKeyG2()), + SignersAggSigG1: testOperator1.BlsKeypair.SignMessage(taskResponseDigest). + Add(testOperator2.BlsKeypair.SignMessage(taskResponseDigest)), + } + gotAggregationServiceResponse := <-blsAggServ.aggregatedResponsesC + require.Equal(t, wantAggregationServiceResponse, gotAggregationServiceResponse) + require.EqualValues(t, taskIndex, gotAggregationServiceResponse.TaskIndex) + elapsed := time.Since(start) + t.Log("elapsed: ", elapsed.Seconds()) + require.True(t, elapsed.Seconds() < timeToExpiry.Seconds()) + }) } func TestIntegrationBlsAgg(t *testing.T) {