Skip to content

Commit

Permalink
Make built-in indexer work for retrieval client (#110)
Browse files Browse the repository at this point in the history
Co-authored-by: Wellington Barbosa <wellington@eigenlabs.org>
Co-authored-by: Jian Xiao <iamjianxiao@gmail.com>
  • Loading branch information
3 people authored Dec 12, 2023
1 parent 319d87c commit 03cd96a
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 104 deletions.
5 changes: 5 additions & 0 deletions clients/mock/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func NewRetrievalClient() *MockRetrievalClient {
return &MockRetrievalClient{}
}

func (c *MockRetrievalClient) StartIndexingChainState(ctx context.Context) error {
args := c.Called()
return args.Error(0)
}

func (c *MockRetrievalClient) RetrieveBlob(
ctx context.Context,
batchHeaderHash [32]byte,
Expand Down
19 changes: 15 additions & 4 deletions clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/gammazero/workerpool"
"github.com/wealdtech/go-merkletree"
"github.com/wealdtech/go-merkletree/keccak256"

coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/indexer"
)

type RetrievalClient interface {
Expand All @@ -35,20 +38,28 @@ var _ RetrievalClient = (*retrievalClient)(nil)

func NewRetrievalClient(
logger common.Logger,
indexedChainState core.IndexedChainState,
chainState core.ChainState,
indexer indexer.Indexer,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
encoder core.Encoder,
numConnections int,
) *retrievalClient {
) (*retrievalClient, error) {
indexedState, err := coreindexer.NewIndexedChainState(
chainState,
indexer,
)
if err != nil {
return nil, err
}
return &retrievalClient{
logger: logger,
indexedChainState: indexedChainState,
indexedChainState: indexedState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
encoder: encoder,
numConnections: numConnections,
}
}, nil
}

func (r *retrievalClient) RetrieveBlob(
Expand Down
71 changes: 68 additions & 3 deletions clients/tests/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"github.com/Layr-Labs/eigenda/common/logging"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/encoding"
coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
coremock "github.com/Layr-Labs/eigenda/core/mock"
indexermock "github.com/Layr-Labs/eigenda/indexer/mock"
"github.com/Layr-Labs/eigenda/pkg/encoding/kzgEncoder"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/wealdtech/go-merkletree"
Expand Down Expand Up @@ -42,6 +45,9 @@ func makeTestEncoder() (core.Encoder, error) {

var (
indexedChainState core.IndexedChainState
chainState core.ChainState
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
coordinator *core.StdAssignmentCoordinator
retrievalClient clients.RetrievalClient
Expand All @@ -55,11 +61,16 @@ var (
func setup(t *testing.T) {

var err error
indexedChainState, err = coremock.NewChainDataMock(core.OperatorIndex(numOperators))
chainState, err = coremock.NewChainDataMock(core.OperatorIndex(numOperators))
if err != nil {
t.Fatalf("failed to create new mocked chain data: %s", err)
}

indexedChainState, err = coremock.NewChainDataMock(core.OperatorIndex(numOperators))
if err != nil {
t.Fatalf("failed to create new mocked indexed chain data: %s", err)
}

nodeClient = clientsmock.NewNodeClient()
coordinator = &core.StdAssignmentCoordinator{}
encoder, err := makeTestEncoder()
Expand All @@ -70,7 +81,17 @@ func setup(t *testing.T) {
if err != nil {
panic("failed to create a new logger")
}
retrievalClient = clients.NewRetrievalClient(logger, indexedChainState, coordinator, nodeClient, encoder, 2)

indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, indexer, coordinator, nodeClient, encoder, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
err = indexer.Index(context.Background())
if err != nil {
panic("failed to start indexing")
}

var (
quorumID core.QuorumID = 0
Expand All @@ -90,7 +111,7 @@ func setup(t *testing.T) {
},
Data: gettysburgAddressBytes,
}
operatorState, err := indexedChainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID})
operatorState, err = indexedChainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID})
if err != nil {
t.Fatalf("failed to get operator state: %s", err)
}
Expand Down Expand Up @@ -167,6 +188,38 @@ func setup(t *testing.T) {

}

func mustMakeOpertatorPubKeysPair(t *testing.T) *coreindexer.OperatorPubKeys {
operators := make(map[[32]byte]coreindexer.OperatorPubKeysPair, len(operatorState.Operators))
for operatorId := range operatorState.Operators[0] {
keyPair, err := core.GenRandomBlsKeys()
if err != nil {
t.Fatalf("Generating random BLS keys Error: %s", err.Error())
}
operators[operatorId] = coreindexer.OperatorPubKeysPair{
PubKeyG1: keyPair.PubKey.G1Affine,
PubKeyG2: keyPair.GetPubKeyG2().G2Affine,
}
}
keyPair, err := core.GenRandomBlsKeys()
if err != nil {
t.Fatalf("Generating random BLS keys Error: %s", err.Error())
}
return &coreindexer.OperatorPubKeys{
Operators: operators,
QuorumTotals: map[core.QuorumID]*bn254.G1Affine{
0: keyPair.PubKey.G1Affine,
},
}
}

func musMakeOperatorSocket(t *testing.T) coreindexer.OperatorSockets {
operatorSocket := make(coreindexer.OperatorSockets, len(operatorState.Operators))
for operatorId := range operatorState.Operators[0] {
operatorSocket[operatorId] = "test"
}
return operatorSocket
}

func TestInvalidBlobHeader(t *testing.T) {

setup(t)
Expand All @@ -177,6 +230,12 @@ func TestInvalidBlobHeader(t *testing.T) {
On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(encodedBlob)

operatorPubKeys := mustMakeOpertatorPubKeysPair(t)
operatorSocket := musMakeOperatorSocket(t)

indexer.On("GetObject", mock.Anything, 0).Return(operatorPubKeys, nil).Once()
indexer.On("GetObject", mock.Anything, 1).Return(operatorSocket, nil).Once()

_, err := retrievalClient.RetrieveBlob(context.Background(), batchHeaderHash, 0, 0, batchRoot, 0)
assert.ErrorContains(t, err, "failed to get blob header from all operators")

Expand All @@ -192,6 +251,12 @@ func TestValidBlobHeader(t *testing.T) {
On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(encodedBlob)

operatorPubKeys := mustMakeOpertatorPubKeysPair(t)
operatorSocket := musMakeOperatorSocket(t)

indexer.On("GetObject", mock.Anything, 0).Return(operatorPubKeys, nil).Once()
indexer.On("GetObject", mock.Anything, 1).Return(operatorSocket, nil).Once()

data, err := retrievalClient.RetrieveBlob(context.Background(), batchHeaderHash, 0, 0, batchRoot, 0)
assert.NoError(t, err)
recovered := bytes.TrimRight(data, "\x00")
Expand Down
59 changes: 59 additions & 0 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package indexer

import (
"fmt"

dacommon "github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/indexer"
indexereth "github.com/Layr-Labs/eigenda/indexer/eth"
inmemstore "github.com/Layr-Labs/eigenda/indexer/inmem"
"github.com/ethereum/go-ethereum/common"
)

func CreateNewIndexer(
config *indexer.Config,
gethClient dacommon.EthClient,
rpcClient dacommon.RPCEthClient,
eigenDAServiceManagerAddr string,
logger dacommon.Logger,
) (indexer.Indexer, error) {

eigenDAServiceManager := common.HexToAddress(eigenDAServiceManagerAddr)

pubKeyFilterer, err := NewOperatorPubKeysFilterer(eigenDAServiceManager, gethClient)
if err != nil {
return nil, fmt.Errorf("failed to create new operator pubkeys filter: %w", err)
}

socketsFilterer, err := NewOperatorSocketsFilterer(eigenDAServiceManager, gethClient)
if err != nil {
return nil, fmt.Errorf("failed to create new operator sockets filter: %w", err)
}

handlers := []indexer.AccumulatorHandler{
{
Acc: NewOperatorPubKeysAccumulator(logger),
Filterer: pubKeyFilterer,
Status: indexer.Good,
},
{
Acc: NewOperatorSocketsAccumulator(logger),
Filterer: socketsFilterer,
Status: indexer.Good,
},
}

var (
upgrader = &Upgrader{}
headerStore = inmemstore.NewHeaderStore()
headerSrvc = indexereth.NewHeaderService(logger, rpcClient)
)
return indexer.New(
config,
handlers,
headerSrvc,
headerStore,
upgrader,
logger,
), nil
}
54 changes: 5 additions & 49 deletions core/indexer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,23 @@ import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/Layr-Labs/eigenda/indexer/eth"
gethcommon "github.com/ethereum/go-ethereum/common"
)

type IndexedChainState struct {
core.ChainState

Indexer *indexer.Indexer
Indexer indexer.Indexer
}

var _ core.IndexedChainState = (*IndexedChainState)(nil)

// TODO: Pass in dependencies instead of creating them here

func NewIndexedChainState(
config *indexer.Config,
eigenDAServiceManagerAddr gethcommon.Address,
chainState core.ChainState,
headerStore indexer.HeaderStore,
client common.EthClient,
rpcClient common.RPCEthClient,
logger common.Logger,
indexer indexer.Indexer,
) (*IndexedChainState, error) {

pubKeyFilterer, err := NewOperatorPubKeysFilterer(eigenDAServiceManagerAddr, client)
if err != nil {
return nil, err
}

socketsFilterer, err := NewOperatorSocketsFilterer(eigenDAServiceManagerAddr, client)
if err != nil {
return nil, err
}

handlers := []indexer.AccumulatorHandler{
{
Acc: NewOperatorPubKeysAccumulator(logger),
Filterer: pubKeyFilterer,
Status: indexer.Good,
},
{
Acc: NewOperatorSocketsAccumulator(logger),
Filterer: socketsFilterer,
Status: indexer.Good,
},
}

headerSrvc := eth.NewHeaderService(logger, rpcClient)
upgrader := &Upgrader{}
indexer := indexer.NewIndexer(
config,
handlers,
headerSrvc,
headerStore,
upgrader,
logger,
)

return &IndexedChainState{
ChainState: chainState,
Indexer: indexer,
Expand Down Expand Up @@ -121,7 +77,7 @@ func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, block
}

func (ics *IndexedChainState) GetCurrentBlockNumber() (uint, error) {
header, err := ics.Indexer.HeaderStore.GetLatestHeader(false)
header, err := ics.Indexer.GetLatestHeader(false)
if err != nil {
return 0, err
}
Expand All @@ -134,7 +90,7 @@ func (ics *IndexedChainState) getObjects(blockNumber uint) (*OperatorPubKeys, Op
Number: uint64(blockNumber),
}

obj, _, err := ics.Indexer.HeaderStore.GetObject(queryHeader, ics.Indexer.Handlers[0].Acc)
obj, err := ics.Indexer.GetObject(queryHeader, 0)
if err != nil {
return nil, nil, err
}
Expand All @@ -144,7 +100,7 @@ func (ics *IndexedChainState) getObjects(blockNumber uint) (*OperatorPubKeys, Op
return nil, nil, ErrWrongObjectFromIndexer
}

obj, _, err = ics.Indexer.HeaderStore.GetObject(queryHeader, ics.Indexer.Handlers[1].Acc)
obj, err = ics.Indexer.GetObject(queryHeader, 1)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 03cd96a

Please sign in to comment.