From 6b551f865e2e689331e65cea37b120c02aa2947c Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Mon, 24 Feb 2025 17:32:50 -0500 Subject: [PATCH 01/13] Draft relay URL provider Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay_url_provider.go | 97 ++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 api/clients/v2/relay_url_provider.go diff --git a/api/clients/v2/relay_url_provider.go b/api/clients/v2/relay_url_provider.go new file mode 100644 index 0000000000..a24b7d3410 --- /dev/null +++ b/api/clients/v2/relay_url_provider.go @@ -0,0 +1,97 @@ +package clients + +import ( + "context" + "fmt" + "sync" + + "github.com/Layr-Labs/eigenda/common" + relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + gethcommon "github.com/ethereum/go-ethereum/common" +) + +// RelayUrlProvider provides relay URL strings, based on relay key. +// +// Contains an internal cache, so that a given URL doesn't need to be fetched multiple times. +type RelayUrlProvider struct { + logger logging.Logger + relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller + relayUrlCache sync.Map +} + +// NewRelayUrlProvider constructs a RelayUrlProvider. +// +// This method initializes the provider's internal cache with the URLs of all relays that exist at the time of construction. +func NewRelayUrlProvider( + ctx context.Context, + logger logging.Logger, + ethClient common.EthClient, + relayRegistryAddress string, +) (*RelayUrlProvider, error) { + relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( + gethcommon.HexToAddress(relayRegistryAddress), + ethClient) + if err != nil { + return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) + } + + relayUrlProvider := &RelayUrlProvider{ + logger: logger, + relayRegistryCaller: relayRegistryContractCaller, + } + + err = relayUrlProvider.initializeCache(ctx) + if err != nil { + return nil, fmt.Errorf("initialize relay URL cache: %w", err) + } + + return relayUrlProvider, nil +} + +// GetRelayUrl gets the URL string for a given relayKey +// +// If the internal cache already knows the URL for the relayKey, the known value is returned immediately. +// If the internal cache doesn't already know the URL for the relayKey, it attempts to fetch the URL with a call to the +// EigenDARelayRegistry contract. It returns the fetched value after the contract call succeeds, or returns an error if +// the call fails. +func (rup *RelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey uint32) (string, error) { + // the current contract doesn't allow updating the URL for a given relayKey, so if the value exists in the cache, + // it's guaranteed to be correct. + existingRelayUrl, valueFound := rup.relayUrlCache.Load(relayKey) + if valueFound { + return existingRelayUrl.(string), nil + } + + fetchedUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) + if err != nil { + return "", fmt.Errorf("fetch relay key URL from EigenDARelayRegistry contract: %w", err) + } + + rup.relayUrlCache.Store(relayKey, fetchedUrl) + + return fetchedUrl, nil +} + +// initializeCache fetches the URL for all relays that exist at the time the RelayUrlProvider is created +// +// Returns an error if unable to fetch the number of relays that exist. If any given URL fetch fails during +// initialization, no error is returned: this method will do its best to initialize all relay URLs, even +// if some fetches fail. +func (rup *RelayUrlProvider) initializeCache(ctx context.Context) error { + relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) + if err != nil { + return fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) + } + + for relayKey := uint32(0); relayKey < relayCount; relayKey++ { + // getting the url causes it to be saved in the cache + _, err := rup.GetRelayUrl(ctx, relayKey) + if err != nil { + rup.logger.Errorf("failed to get URL for relay key %d: %v", relayKey, err) + } + } + + return nil +} From c44fce734d863585d6f10082115824cb5a621e8b Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 13:11:57 -0500 Subject: [PATCH 02/13] Draft usage of relay url provider Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- .../v2/relay/default_relay_url_provider.go | 56 ++++++ api/clients/v2/relay/relay_url_provider.go | 15 ++ .../v2/relay/test_relay_url_provider.go | 28 +++ api/clients/v2/relay_client.go | 165 ++++++++++-------- api/clients/v2/relay_payload_retriever.go | 5 +- api/clients/v2/relay_url_provider.go | 97 ---------- core/chainio.go | 11 +- core/eth/reader.go | 67 +------ core/mock/writer.go | 28 +-- core/thegraph/state_integration_test.go | 2 +- inabox/deploy/deploy.go | 9 +- inabox/tests/integration_suite_test.go | 5 +- inabox/tests/integration_v2_test.go | 27 ++- node/node.go | 55 +----- node/node_v2_test.go | 24 ++- test/v2/client/test_client.go | 15 +- 16 files changed, 260 insertions(+), 349 deletions(-) create mode 100644 api/clients/v2/relay/default_relay_url_provider.go create mode 100644 api/clients/v2/relay/relay_url_provider.go create mode 100644 api/clients/v2/relay/test_relay_url_provider.go delete mode 100644 api/clients/v2/relay_url_provider.go diff --git a/api/clients/v2/relay/default_relay_url_provider.go b/api/clients/v2/relay/default_relay_url_provider.go new file mode 100644 index 0000000000..ad5d8c596a --- /dev/null +++ b/api/clients/v2/relay/default_relay_url_provider.go @@ -0,0 +1,56 @@ +package relay + +import ( + "context" + "fmt" + + "github.com/Layr-Labs/eigenda/common" + relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + gethcommon "github.com/ethereum/go-ethereum/common" +) + +// DefaultRelayUrlProvider provides relay URL strings, based on relay key. +type DefaultRelayUrlProvider struct { + relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller +} + +var _ RelayUrlProvider = &DefaultRelayUrlProvider{} + +// NewDefaultRelayUrlProvider constructs a DefaultRelayUrlProvider +func NewDefaultRelayUrlProvider( + ethClient common.EthClient, + relayRegistryAddress gethcommon.Address, +) (*DefaultRelayUrlProvider, error) { + relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( + relayRegistryAddress, + ethClient) + if err != nil { + return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) + } + + return &DefaultRelayUrlProvider{ + relayRegistryCaller: relayRegistryContractCaller, + }, nil +} + +// GetRelayUrl gets the URL string for a given relayKey +func (rup *DefaultRelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) { + relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) + if err != nil { + return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err) + } + + return relayUrl, nil +} + +// GetRelayCount gets the number of relays that exist in the registry +func (rup *DefaultRelayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { + relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) + if err != nil { + return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) + } + + return relayCount, nil +} diff --git a/api/clients/v2/relay/relay_url_provider.go b/api/clients/v2/relay/relay_url_provider.go new file mode 100644 index 0000000000..0fe638eec0 --- /dev/null +++ b/api/clients/v2/relay/relay_url_provider.go @@ -0,0 +1,15 @@ +package relay + +import ( + "context" + + v2 "github.com/Layr-Labs/eigenda/core/v2" +) + +// RelayUrlProvider provides relay URL strings, based on relay key +type RelayUrlProvider interface { + // GetRelayUrl gets the URL string for a given relayKey + GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) + // GetRelayCount returns the number of relays in the registry + GetRelayCount(ctx context.Context) (uint32, error) +} diff --git a/api/clients/v2/relay/test_relay_url_provider.go b/api/clients/v2/relay/test_relay_url_provider.go new file mode 100644 index 0000000000..dab6a22e07 --- /dev/null +++ b/api/clients/v2/relay/test_relay_url_provider.go @@ -0,0 +1,28 @@ +package relay + +import ( + "context" + + v2 "github.com/Layr-Labs/eigenda/core/v2" +) + +// TestRelayUrlProvider implements RelayUrlProvider, for test cases +// +// NOT SAFE for concurrent use +type TestRelayUrlProvider struct { + urlMap map[v2.RelayKey]string +} + +var _ RelayUrlProvider = &TestRelayUrlProvider{} + +func (rup *TestRelayUrlProvider) GetRelayUrl(_ context.Context, relayKey v2.RelayKey) (string, error) { + return rup.urlMap[relayKey], nil +} + +func (rup *TestRelayUrlProvider) GetRelayCount(_ context.Context) (uint32, error) { + return uint32(len(rup.urlMap)), nil +} + +func (rup *TestRelayUrlProvider) StoreRelayUrl(relayKey v2.RelayKey, url string) { + rup.urlMap[relayKey] = url +} diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index da1b5a4f12..7810c4653d 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay" "github.com/Layr-Labs/eigenda/api/hashing" "github.com/Layr-Labs/eigenda/core" @@ -19,7 +20,6 @@ import ( type MessageSigner func(ctx context.Context, data [32]byte) (*core.Signature, error) type RelayClientConfig struct { - Sockets map[corev2.RelayKey]string UseSecureGrpcFlag bool MaxGRPCMessageSize uint OperatorID *core.OperatorID @@ -48,56 +48,60 @@ type RelayClient interface { // The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request. // Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray). GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error) - // GetSockets returns the relay sockets - GetSockets() map[corev2.RelayKey]string Close() error } +// relayClient is a client for the entire relay subsystem. +// +// It is a wrapper around a collection of grpc relay clients, which are used to interact with individual relays. type relayClient struct { - config *RelayClientConfig - - // initOnce is used to ensure that the connection to each relay is initialized only once. - // It maps relay key to a sync.Once instance: `map[corev2.RelayKey]*sync.Once` - initOnce *sync.Map - // conns maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn` - conns sync.Map logger logging.Logger - - // grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient` - grpcClients sync.Map + config *RelayClientConfig + // initOnce is used to ensure that the connection to each relay is initialized only once + initOnce map[corev2.RelayKey]*sync.Once + // initOnceMutex protects access to the initOnce map + initOnceMutex sync.Mutex + // clientConnections maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn` + // this map is maintained so that connections can be closed in Close + clientConnections sync.Map + // grpcRelayClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient` + // these grpc relay clients are used to communicate with individual relays + grpcRelayClients sync.Map + // relayUrlProvider knows how to retrieve the relay URLs, and maintains an internal URL cache + relayUrlProvider relay.RelayUrlProvider } var _ RelayClient = (*relayClient)(nil) // NewRelayClient creates a new RelayClient that connects to the relays specified in the config. // It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated. -func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (RelayClient, error) { +func NewRelayClient( + config *RelayClientConfig, + logger logging.Logger, + relayUrlProvider relay.RelayUrlProvider, +) (RelayClient, error) { + if config == nil { return nil, errors.New("nil config") - } else if len(config.Sockets) <= 0 { - return nil, errors.New("no relay sockets provided") - } else if config.MaxGRPCMessageSize == 0 { + } + + if config.MaxGRPCMessageSize == 0 { return nil, errors.New("max gRPC message size must be greater than 0") } - logger.Info("creating relay client", "urls", config.Sockets) + logger.Info("creating relay client") - initOnce := sync.Map{} - for key := range config.Sockets { - initOnce.Store(key, &sync.Once{}) - } return &relayClient{ - config: config, - - initOnce: &initOnce, - logger: logger.With("component", "RelayClient"), + config: config, + logger: logger.With("component", "RelayClient"), + relayUrlProvider: relayUrlProvider, }, nil } func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) { - client, err := c.getClient(relayKey) + client, err := c.getClient(ctx, relayKey) if err != nil { - return nil, err + return nil, fmt.Errorf("get grpc client for key %d: %w", relayKey, err) } res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{ @@ -140,9 +144,10 @@ func (c *relayClient) GetChunksByRange( if len(requests) == 0 { return nil, fmt.Errorf("no requests") } - client, err := c.getClient(relayKey) + + client, err := c.getClient(ctx, relayKey) if err != nil { - return nil, err + return nil, fmt.Errorf("get grpc relay client for key %d: %w", relayKey, err) } grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests)) @@ -184,9 +189,9 @@ func (c *relayClient) GetChunksByIndex( return nil, fmt.Errorf("no requests") } - client, err := c.getClient(relayKey) + client, err := c.getClient(ctx, relayKey) if err != nil { - return nil, err + return nil, fmt.Errorf("get grpc relay client for key %d: %w", relayKey, err) } grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests)) @@ -219,11 +224,12 @@ func (c *relayClient) GetChunksByIndex( return res.GetData(), nil } -func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, error) { - if err := c.initOnceGrpcConnection(key); err != nil { - return nil, err +// getClient gets the grpc relay client, which has a connection to a given relay +func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relaygrpc.RelayClient, error) { + if err := c.initOnceGrpcConnection(ctx, key); err != nil { + return nil, fmt.Errorf("init grpc connection for key %d: %w", key, err) } - maybeClient, ok := c.grpcClients.Load(key) + maybeClient, ok := c.grpcRelayClients.Load(key) if !ok { return nil, fmt.Errorf("no grpc client for relay key: %v", key) } @@ -234,53 +240,62 @@ func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, err return client, nil } -func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error { - var initErr error - once, ok := c.initOnce.Load(key) +// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only do perform +// the initialization once per relay. +func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error { + // we must use a mutex here instead of a sync.Map, because this method could be called concurrently, and if + // two concurrent calls tried to `LoadOrStore` from a sync.Map at the same time, it's possible they would + // each create a unique sync.Once object, and perform duplicate initialization + c.initOnceMutex.Lock() + once, ok := c.initOnce[key] if !ok { - return fmt.Errorf("unknown relay key: %v", key) + once = &sync.Once{} + c.initOnce[key] = once } - once.(*sync.Once).Do(func() { - socket, ok := c.config.Sockets[key] - if !ok { - initErr = fmt.Errorf("unknown relay key: %v", key) - return - } - dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize) - conn, err := grpc.NewClient(socket, dialOptions...) - if err != nil { - initErr = err - return - } - c.conns.Store(key, conn) - c.grpcClients.Store(key, relaygrpc.NewRelayClient(conn)) - }) - return initErr -} + c.initOnceMutex.Unlock() + + var initErr error + once.Do( + func() { + relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key) + if err != nil { + initErr = fmt.Errorf("get relay url for key %d: %w", key, err) + return + } + + dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize) + conn, err := grpc.NewClient(relayUrl, dialOptions...) + if err != nil { + initErr = fmt.Errorf("create grpc client for key %d: %w", key, err) + return + } + c.clientConnections.Store(key, conn) + c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn)) + }) -func (c *relayClient) GetSockets() map[corev2.RelayKey]string { - return c.config.Sockets + return initErr } func (c *relayClient) Close() error { var errList *multierror.Error - c.conns.Range(func(k, v interface{}) bool { - conn, ok := v.(*grpc.ClientConn) - if !ok { - errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k)) - return true - } + c.clientConnections.Range( + func(k, v interface{}) bool { + conn, ok := v.(*grpc.ClientConn) + if !ok { + errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k)) + return true + } - if conn != nil { - err := conn.Close() - c.conns.Delete(k) - c.grpcClients.Delete(k) - if err != nil { - c.logger.Error("failed to close connection", "err", err) - errList = multierror.Append(errList, err) + if conn != nil { + err := conn.Close() + c.clientConnections.Delete(k) + c.grpcRelayClients.Delete(k) + if err != nil { + c.logger.Error("failed to close connection", "err", err) + errList = multierror.Append(errList, err) + } } - } - return true - }) + return true + }) return errList.ErrorOrNil() } diff --git a/api/clients/v2/relay_payload_retriever.go b/api/clients/v2/relay_payload_retriever.go index ac25e9594f..2ec2b3c483 100644 --- a/api/clients/v2/relay_payload_retriever.go +++ b/api/clients/v2/relay_payload_retriever.go @@ -7,6 +7,7 @@ import ( "math/rand" "github.com/Layr-Labs/eigenda/api/clients/v2/coretypes" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" "github.com/Layr-Labs/eigenda/api/clients/v2/verification" core "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigensdk-go/logging" @@ -41,7 +42,9 @@ func BuildRelayPayloadRetriever( return nil, fmt.Errorf("check and set RelayPayloadRetrieverConfig config: %w", err) } - relayClient, err := NewRelayClient(relayClientConfig, log) + testRelayUrlProvider := relay.TestRelayUrlProvider{} + + relayClient, err := NewRelayClient(relayClientConfig, log, &testRelayUrlProvider) if err != nil { return nil, fmt.Errorf("new relay client: %w", err) } diff --git a/api/clients/v2/relay_url_provider.go b/api/clients/v2/relay_url_provider.go deleted file mode 100644 index a24b7d3410..0000000000 --- a/api/clients/v2/relay_url_provider.go +++ /dev/null @@ -1,97 +0,0 @@ -package clients - -import ( - "context" - "fmt" - "sync" - - "github.com/Layr-Labs/eigenda/common" - relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - gethcommon "github.com/ethereum/go-ethereum/common" -) - -// RelayUrlProvider provides relay URL strings, based on relay key. -// -// Contains an internal cache, so that a given URL doesn't need to be fetched multiple times. -type RelayUrlProvider struct { - logger logging.Logger - relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller - relayUrlCache sync.Map -} - -// NewRelayUrlProvider constructs a RelayUrlProvider. -// -// This method initializes the provider's internal cache with the URLs of all relays that exist at the time of construction. -func NewRelayUrlProvider( - ctx context.Context, - logger logging.Logger, - ethClient common.EthClient, - relayRegistryAddress string, -) (*RelayUrlProvider, error) { - relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( - gethcommon.HexToAddress(relayRegistryAddress), - ethClient) - if err != nil { - return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) - } - - relayUrlProvider := &RelayUrlProvider{ - logger: logger, - relayRegistryCaller: relayRegistryContractCaller, - } - - err = relayUrlProvider.initializeCache(ctx) - if err != nil { - return nil, fmt.Errorf("initialize relay URL cache: %w", err) - } - - return relayUrlProvider, nil -} - -// GetRelayUrl gets the URL string for a given relayKey -// -// If the internal cache already knows the URL for the relayKey, the known value is returned immediately. -// If the internal cache doesn't already know the URL for the relayKey, it attempts to fetch the URL with a call to the -// EigenDARelayRegistry contract. It returns the fetched value after the contract call succeeds, or returns an error if -// the call fails. -func (rup *RelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey uint32) (string, error) { - // the current contract doesn't allow updating the URL for a given relayKey, so if the value exists in the cache, - // it's guaranteed to be correct. - existingRelayUrl, valueFound := rup.relayUrlCache.Load(relayKey) - if valueFound { - return existingRelayUrl.(string), nil - } - - fetchedUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) - if err != nil { - return "", fmt.Errorf("fetch relay key URL from EigenDARelayRegistry contract: %w", err) - } - - rup.relayUrlCache.Store(relayKey, fetchedUrl) - - return fetchedUrl, nil -} - -// initializeCache fetches the URL for all relays that exist at the time the RelayUrlProvider is created -// -// Returns an error if unable to fetch the number of relays that exist. If any given URL fetch fails during -// initialization, no error is returned: this method will do its best to initialize all relay URLs, even -// if some fetches fail. -func (rup *RelayUrlProvider) initializeCache(ctx context.Context) error { - relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) - if err != nil { - return fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) - } - - for relayKey := uint32(0); relayKey < relayCount; relayKey++ { - // getting the url causes it to be saved in the cache - _, err := rup.GetRelayUrl(ctx, relayKey) - if err != nil { - rup.logger.Errorf("failed to get URL for relay key %d: %v", relayKey, err) - } - } - - return nil -} diff --git a/core/chainio.go b/core/chainio.go index 645d9c14af..04029bddc1 100644 --- a/core/chainio.go +++ b/core/chainio.go @@ -128,17 +128,10 @@ type Reader interface { // GetOnDemandPaymentByAccount returns on-demand payment of an account GetOnDemandPaymentByAccount(ctx context.Context, accountID gethcommon.Address) (*OnDemandPayment, error) - // GetNumRelays returns the number of registered relays. - GetNumRelays(ctx context.Context) (uint32, error) - - // GetRelayURL returns the relay URL address for the given key. - GetRelayURL(ctx context.Context, key uint32) (string, error) - - // GetRelayURLs returns the relay URL addresses for all relays. - GetRelayURLs(ctx context.Context) (map[uint32]string, error) - // GetDisperserAddress returns the disperser address with the given ID. GetDisperserAddress(ctx context.Context, disperserID uint32) (gethcommon.Address, error) + + GetRelayRegistryAddress() gethcommon.Address } type Writer interface { diff --git a/core/eth/reader.go b/core/eth/reader.go index 9971ff594c..bfd121583e 100644 --- a/core/eth/reader.go +++ b/core/eth/reader.go @@ -37,6 +37,7 @@ import ( type ContractBindings struct { RegCoordinatorAddr gethcommon.Address ServiceManagerAddr gethcommon.Address + RelayRegistryAddress gethcommon.Address DelegationManager *delegationmgr.ContractDelegationManager OpStateRetriever *opstateretriever.ContractOperatorStateRetriever BLSApkRegistry *blsapkreg.ContractBLSApkRegistry @@ -192,16 +193,10 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe } } - var contractRelayRegistry *relayreg.ContractEigenDARelayRegistry - relayRegistryAddr, err := contractEigenDAServiceManager.EigenDARelayRegistry(&bind.CallOpts{}) + relayRegistryAddress, err := contractEigenDAServiceManager.EigenDARelayRegistry(&bind.CallOpts{}) if err != nil { t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err) // TODO(ian-shim): return err when the contract is deployed - } else { - contractRelayRegistry, err = relayreg.NewContractEigenDARelayRegistry(relayRegistryAddr, t.ethClient) - if err != nil { - t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err) - } } var contractThresholdRegistry *thresholdreg.ContractEigenDAThresholdRegistry @@ -248,6 +243,7 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe t.bindings = &ContractBindings{ ServiceManagerAddr: eigenDAServiceManagerAddr, RegCoordinatorAddr: registryCoordinatorAddr, + RelayRegistryAddress: relayRegistryAddress, AVSDirectory: contractAVSDirectory, SocketRegistry: contractSocketRegistry, OpStateRetriever: contractBLSOpStateRetr, @@ -258,7 +254,6 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe StakeRegistry: contractStakeRegistry, EigenDAServiceManager: contractEigenDAServiceManager, DelegationManager: contractDelegationManager, - RelayRegistry: contractRelayRegistry, PaymentVault: contractPaymentVault, ThresholdRegistry: contractThresholdRegistry, DisperserRegistry: contractEigenDADisperserRegistry, @@ -933,58 +928,6 @@ func (t *Reader) GetOperatorSocket(ctx context.Context, operatorId core.Operator return socket, nil } -func (t *Reader) GetNumRelays(ctx context.Context) (uint32, error) { - if t.bindings.RelayRegistry == nil { - return 0, errors.New("relay registry not deployed") - } - - return t.bindings.RelayRegistry.NextRelayKey(&bind.CallOpts{ - Context: ctx, - }) -} - -func (t *Reader) GetRelayURL(ctx context.Context, key uint32) (string, error) { - if t.bindings.RelayRegistry == nil { - return "", errors.New("relay registry not deployed") - } - - return t.bindings.RelayRegistry.RelayKeyToUrl(&bind.CallOpts{ - Context: ctx, - }, uint32(key)) -} - -func (t *Reader) GetRelayURLs(ctx context.Context) (map[uint32]string, error) { - if t.bindings.RelayRegistry == nil { - return nil, errors.New("relay registry not deployed") - } - - numRelays, err := t.GetNumRelays(ctx) - if err != nil { - return nil, err - } - - res := make(map[uint32]string) - for relayKey := uint32(0); relayKey < numRelays; relayKey++ { - url, err := t.bindings.RelayRegistry.RelayKeyToUrl(&bind.CallOpts{ - Context: ctx, - }, relayKey) - - if err != nil && strings.Contains(err.Error(), "execution reverted") { - break - } else if err != nil { - return nil, err - } - - res[relayKey] = url - } - - if len(res) == 0 { - return nil, errors.New("no relay URLs found") - } - - return res, nil -} - func (t *Reader) GetDisperserAddress(ctx context.Context, disperserID uint32) (gethcommon.Address, error) { registry := t.bindings.DisperserRegistry if registry == nil { @@ -1007,3 +950,7 @@ func (t *Reader) GetDisperserAddress(ctx context.Context, disperserID uint32) (g return address, nil } + +func (t *Reader) GetRelayRegistryAddress() gethcommon.Address { + return t.bindings.RelayRegistryAddress +} diff --git a/core/mock/writer.go b/core/mock/writer.go index 4a173e4a2f..dd3c095077 100644 --- a/core/mock/writer.go +++ b/core/mock/writer.go @@ -282,28 +282,6 @@ func (t *MockWriter) GetNumRelays(ctx context.Context) (uint32, error) { return result.(uint32), args.Error(1) } -func (t *MockWriter) GetRelayURL(ctx context.Context, key uint32) (string, error) { - args := t.Called() - if args.Get(0) == nil { - return "", args.Error(1) - } - result := args.Get(0) - return result.(string), args.Error(1) -} - -func (t *MockWriter) GetRelayURLs(ctx context.Context) (map[uint32]string, error) { - args := t.Called() - if args.Get(0) == nil { - return nil, args.Error(1) - } - result := args.Get(0) - if result == nil { - return nil, args.Error(1) - } - - return result.(map[uint32]string), args.Error(1) -} - func (t *MockWriter) GetDisperserAddress(ctx context.Context, disperserID uint32) (gethcommon.Address, error) { args := t.Called(disperserID) result := args.Get(0) @@ -314,3 +292,9 @@ func (t *MockWriter) GetDisperserAddress(ctx context.Context, disperserID uint32 return result.(gethcommon.Address), args.Error(1) } + +func (t *MockWriter) GetRelayRegistryAddress() gethcommon.Address { + args := t.Called() + result := args.Get(0) + return result.(gethcommon.Address) +} diff --git a/core/thegraph/state_integration_test.go b/core/thegraph/state_integration_test.go index fa89ed209c..3734932bf3 100644 --- a/core/thegraph/state_integration_test.go +++ b/core/thegraph/state_integration_test.go @@ -72,7 +72,7 @@ func setup() { if err != nil { panic(err) } - _ = testConfig.RegisterBlobVersionAndRelays(ethClient) + testConfig.RegisterBlobVersionAndRelays(ethClient) fmt.Println("Starting binaries") testConfig.StartBinaries() diff --git a/inabox/deploy/deploy.go b/inabox/deploy/deploy.go index 3e0c2a0e48..29625678c9 100644 --- a/inabox/deploy/deploy.go +++ b/inabox/deploy/deploy.go @@ -265,7 +265,7 @@ func (env *Config) RegisterDisperserKeypair(ethClient common.EthClient) error { return fmt.Errorf("timed out waiting for disperser address to be set") } -func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[uint32]string { +func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) { dasmAddr := gcommon.HexToAddress(env.EigenDA.ServiceManager) contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(dasmAddr, ethClient) if err != nil { @@ -306,9 +306,9 @@ func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[ if err != nil { log.Panicf("Error: %s", err) } - relays := map[uint32]string{} + ethAddr := ethClient.GetAccountAddress() - for i, relayVars := range env.Relays { + for _, relayVars := range env.Relays { url := fmt.Sprintf("0.0.0.0:%s", relayVars.RELAY_GRPC_PORT) txn, err := contractRelayRegistry.AddRelayInfo(opts, relayreg.RelayInfo{ RelayAddress: ethAddr, @@ -321,10 +321,7 @@ func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[ if err != nil { log.Panicf("Error: %s", err) } - relays[uint32(i)] = url } - - return relays } // TODO: Supply the test path to the runner utility diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 65a232ca9d..1a24d89321 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -19,7 +19,6 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/thegraph" - corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" "github.com/Layr-Labs/eigenda/inabox/deploy" @@ -53,7 +52,7 @@ var ( retrievalClientV2 clientsv2.RetrievalClient numConfirmations int = 3 numRetries = 0 - relays = map[corev2.RelayKey]string{} + chainReader core.Reader cancel context.CancelFunc ) @@ -133,7 +132,7 @@ var _ = BeforeSuite(func() { Expect(err).To(BeNil()) fmt.Println("Registering blob versions and relays") - relays = testConfig.RegisterBlobVersionAndRelays(ethClient) + testConfig.RegisterBlobVersionAndRelays(ethClient) fmt.Println("Registering disperser keypair") err = testConfig.RegisterDisperserKeypair(ethClient) diff --git a/inabox/tests/integration_v2_test.go b/inabox/tests/integration_v2_test.go index 8dfa18abbd..cfe1ba01e8 100644 --- a/inabox/tests/integration_v2_test.go +++ b/inabox/tests/integration_v2_test.go @@ -8,6 +8,7 @@ import ( "math/big" "time" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" "github.com/docker/go-units" "github.com/Layr-Labs/eigenda/api/clients/v2" @@ -208,24 +209,32 @@ var _ = Describe("Inabox v2 Integration", func() { ) Expect(err).To(BeNil()) - // Test retrieval from relay - relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{ - Sockets: relays, + relayClientConfig := &clients.RelayClientConfig{ MaxGRPCMessageSize: units.GiB, - }, logger) + } + + relayUrlProvider, err := relay.NewDefaultRelayUrlProvider(ethClient, chainReader.GetRelayRegistryAddress()) + Expect(err).To(BeNil()) + + // Test retrieval from relay + relayClient, err := clients.NewRelayClient(relayClientConfig, logger, relayUrlProvider) Expect(err).To(BeNil()) blob1Relays := make(map[corev2.RelayKey]struct{}, 0) blob2Relays := make(map[corev2.RelayKey]struct{}, 0) for _, k := range blobCert1.RelayKeys { - blob1Relays[corev2.RelayKey(k)] = struct{}{} + blob1Relays[k] = struct{}{} } for _, k := range blobCert2.RelayKeys { - blob2Relays[corev2.RelayKey(k)] = struct{}{} + blob2Relays[k] = struct{}{} } - for relayKey := range relays { + + relayCount, err := relayUrlProvider.GetRelayCount(ctx) + Expect(err).To(BeNil()) + + for relayKey := uint32(0); relayKey < relayCount; relayKey++ { blob1, err := relayClient.GetBlob(ctx, relayKey, key1) - if _, ok := blob1Relays[corev2.RelayKey(relayKey)]; ok { + if _, ok := blob1Relays[relayKey]; ok { Expect(err).To(BeNil()) Expect(blob1).To(Equal(paddedData1)) } else { @@ -233,7 +242,7 @@ var _ = Describe("Inabox v2 Integration", func() { } blob2, err := relayClient.GetBlob(ctx, relayKey, key2) - if _, ok := blob2Relays[corev2.RelayKey(relayKey)]; ok { + if _, ok := blob2Relays[relayKey]; ok { Expect(err).To(BeNil()) Expect(blob2).To(Equal(paddedData2)) } else { diff --git a/node/node.go b/node/node.go index e35c00b11d..1285674ce3 100644 --- a/node/node.go +++ b/node/node.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "maps" "math" "math/big" "net/http" @@ -18,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" "github.com/Layr-Labs/eigenda/common/pprof" "github.com/Layr-Labs/eigenda/common/pubip" @@ -237,21 +237,19 @@ func NewNode( } blobVersionParams = corev2.NewBlobVersionParameterMap(blobParams) - var relayClient clients.RelayClient - relayURLs, err := tx.GetRelayURLs(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get relay URLs: %w", err) - } - - logger.Info("Creating relay client", "relayURLs", relayURLs) - relayClient, err = clients.NewRelayClient(&clients.RelayClientConfig{ - Sockets: relayURLs, + relayClientConfig := &clients.RelayClientConfig{ UseSecureGrpcFlag: config.UseSecureGrpc, OperatorID: &config.ID, MessageSigner: n.SignMessage, MaxGRPCMessageSize: n.Config.RelayMaxMessageSize, - }, logger) + } + + relayUrlProvider, err := relay.NewDefaultRelayUrlProvider(client, tx.GetRelayRegistryAddress()) + if err != nil { + return nil, fmt.Errorf("create relay url provider: %w", err) + } + relayClient, err := clients.NewRelayClient(relayClientConfig, logger, relayUrlProvider) if err != nil { return nil, fmt.Errorf("failed to create new relay client: %w", err) } @@ -408,41 +406,6 @@ func (n *Node) RefreshOnchainState(ctx context.Context) error { } else { n.Logger.Error("error fetching blob params", "err", err) } - - existingRelayClient, ok := n.RelayClient.Load().(clients.RelayClient) - if !ok { - n.Logger.Error("error fetching relay client") - continue - } - - existingURLs := map[corev2.RelayKey]string{} - if existingRelayClient != nil { - existingURLs = existingRelayClient.GetSockets() - } - relayURLs, err := n.Transactor.GetRelayURLs(ctx) - if err != nil { - n.Logger.Error("error fetching relay URLs", "err", err) - continue - } - - if maps.Equal(existingURLs, relayURLs) { - n.Logger.Info("No change in relay URLs") - continue - } - - relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{ - Sockets: relayURLs, - UseSecureGrpcFlag: n.Config.UseSecureGrpc, - OperatorID: &n.Config.ID, - MessageSigner: n.SignMessage, - MaxGRPCMessageSize: n.Config.RelayMaxMessageSize, - }, n.Logger) - if err != nil { - n.Logger.Error("error creating relay client", "err", err) - continue - } - - n.RelayClient.Store(clients.RelayClient(relayClient)) case <-ctx.Done(): return ctx.Err() } diff --git a/node/node_v2_test.go b/node/node_v2_test.go index 9508912a05..e29d198897 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -3,10 +3,12 @@ package node_test import ( "context" "fmt" - "github.com/docker/go-units" "testing" "time" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" + "github.com/docker/go-units" + "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/core" v2 "github.com/Layr-Labs/eigenda/core/v2" @@ -249,20 +251,21 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { c := newComponents(t, op0) c.node.Config.EnableV2 = true c.node.Config.OnchainStateRefreshInterval = time.Millisecond - relayURLs := map[v2.RelayKey]string{ - 0: "http://localhost:8080", - } + + relayUrlProvider := &relay.TestRelayUrlProvider{} + relayUrlProvider.StoreRelayUrl(0, "http://localhost:8080") messageSigner := func(ctx context.Context, data [32]byte) (*core.Signature, error) { return nil, nil } - relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{ - Sockets: relayURLs, + relayClientConfig := &clients.RelayClientConfig{ OperatorID: &c.node.Config.ID, MessageSigner: messageSigner, MaxGRPCMessageSize: units.GiB, - }, c.node.Logger) + } + + relayClient, err := clients.NewRelayClient(relayClientConfig, c.node.Logger, relayUrlProvider) require.NoError(t, err) // set up non-mock client c.node.RelayClient.Store(relayClient) @@ -286,11 +289,7 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { 0: blobParams, 1: blobParams2, }, nil) - newRelayURLs := map[v2.RelayKey]string{ - 1: "http://localhost:8081", - 2: "http://localhost:8082", - } - c.tx.On("GetRelayURLs", mock.Anything).Return(newRelayURLs, nil) + err = c.node.RefreshOnchainState(newCtx) require.ErrorIs(t, err, context.DeadlineExceeded) bp, ok = c.node.BlobVersionParams.Load().Get(0) @@ -301,7 +300,6 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { require.Equal(t, bp, blobParams2) newRelayClient := c.node.RelayClient.Load().(clients.RelayClient) require.NotSame(t, relayClient, newRelayClient) - require.Equal(t, newRelayURLs, newRelayClient.GetSockets()) } func bundleEqual(t *testing.T, expected, actual core.Bundle) { diff --git a/test/v2/client/test_client.go b/test/v2/client/test_client.go index 827402d513..8cb589cf7f 100644 --- a/test/v2/client/test_client.go +++ b/test/v2/client/test_client.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/api/clients/v2/coretypes" + relayv2 "github.com/Layr-Labs/eigenda/api/clients/v2/relay" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" "github.com/prometheus/client_golang/prometheus" @@ -185,11 +186,6 @@ func NewTestClient( return nil, fmt.Errorf("failed to create Ethereum reader: %w", err) } - relayURLS, err := ethReader.GetRelayURLs(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get relay URLs: %w", err) - } - // If the relay client attempts to call GetChunks(), it will use this bogus signer. // This is expected to be rejected by the relays, since this client is not authorized to call GetChunks(). rand := random.NewTestRandom() @@ -203,13 +199,18 @@ func NewTestClient( } relayConfig := &clients.RelayClientConfig{ - Sockets: relayURLS, UseSecureGrpcFlag: true, MaxGRPCMessageSize: units.GiB, OperatorID: &core.OperatorID{0}, MessageSigner: fakeSigner, } - relayClient, err := clients.NewRelayClient(relayConfig, logger) + + relayUrlProvider, err := relayv2.NewDefaultRelayUrlProvider(ethClient, ethReader.GetRelayRegistryAddress()) + if err != nil { + return nil, fmt.Errorf("create relay url provider: %w", err) + } + + relayClient, err := clients.NewRelayClient(relayConfig, logger, relayUrlProvider) if err != nil { return nil, fmt.Errorf("failed to create relay client: %w", err) } From 83c2623a711e472e6530a3fdbbdcafbc409f67e5 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 13:32:10 -0500 Subject: [PATCH 03/13] Make formatting fixes Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay/default_relay_url_provider.go | 5 +++-- api/clients/v2/relay_client.go | 4 +++- core/chainio.go | 1 + core/eth/reader.go | 4 ++-- inabox/tests/integration_suite_test.go | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/api/clients/v2/relay/default_relay_url_provider.go b/api/clients/v2/relay/default_relay_url_provider.go index ad5d8c596a..968d4d98ed 100644 --- a/api/clients/v2/relay/default_relay_url_provider.go +++ b/api/clients/v2/relay/default_relay_url_provider.go @@ -24,8 +24,7 @@ func NewDefaultRelayUrlProvider( relayRegistryAddress gethcommon.Address, ) (*DefaultRelayUrlProvider, error) { relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( - relayRegistryAddress, - ethClient) + relayRegistryAddress, ethClient) if err != nil { return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) } @@ -47,6 +46,8 @@ func (rup *DefaultRelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2 // GetRelayCount gets the number of relays that exist in the registry func (rup *DefaultRelayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { + // NextRelayKey initializes to 0, and is incremented each time a relay is added + // current logic doesn't support removing relays, so NextRelayKey therefore corresponds directly to relay count relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) if err != nil { return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index 7810c4653d..594e19a7ca 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -67,7 +67,7 @@ type relayClient struct { // grpcRelayClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient` // these grpc relay clients are used to communicate with individual relays grpcRelayClients sync.Map - // relayUrlProvider knows how to retrieve the relay URLs, and maintains an internal URL cache + // relayUrlProvider knows how to retrieve the relay URLs relayUrlProvider relay.RelayUrlProvider } @@ -254,6 +254,8 @@ func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.Rel } c.initOnceMutex.Unlock() + // TODO (litt3): should we implement a way to rebuild connections that break, or fail to initialize? as it currently + // stands, if this initialization fails, or a connection breaks, the relay client will never speak to that relay again var initErr error once.Do( func() { diff --git a/core/chainio.go b/core/chainio.go index 04029bddc1..ccb3a1a737 100644 --- a/core/chainio.go +++ b/core/chainio.go @@ -131,6 +131,7 @@ type Reader interface { // GetDisperserAddress returns the disperser address with the given ID. GetDisperserAddress(ctx context.Context, disperserID uint32) (gethcommon.Address, error) + // GetRelayRegistryAddress returns the Address of the EigenDARelayRegistry contract GetRelayRegistryAddress() gethcommon.Address } diff --git a/core/eth/reader.go b/core/eth/reader.go index bfd121583e..8241e60d35 100644 --- a/core/eth/reader.go +++ b/core/eth/reader.go @@ -37,7 +37,7 @@ import ( type ContractBindings struct { RegCoordinatorAddr gethcommon.Address ServiceManagerAddr gethcommon.Address - RelayRegistryAddress gethcommon.Address + RelayRegistryAddress gethcommon.Address DelegationManager *delegationmgr.ContractDelegationManager OpStateRetriever *opstateretriever.ContractOperatorStateRetriever BLSApkRegistry *blsapkreg.ContractBLSApkRegistry @@ -243,7 +243,7 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe t.bindings = &ContractBindings{ ServiceManagerAddr: eigenDAServiceManagerAddr, RegCoordinatorAddr: registryCoordinatorAddr, - RelayRegistryAddress: relayRegistryAddress, + RelayRegistryAddress: relayRegistryAddress, AVSDirectory: contractAVSDirectory, SocketRegistry: contractSocketRegistry, OpStateRetriever: contractBLSOpStateRetr, diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 1a24d89321..c71ffa2a9c 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -52,7 +52,7 @@ var ( retrievalClientV2 clientsv2.RetrievalClient numConfirmations int = 3 numRetries = 0 - chainReader core.Reader + chainReader core.Reader cancel context.CancelFunc ) From 4ed780c358a5d35d5c06a472787f2b62bb07aa1c Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 15:27:06 -0500 Subject: [PATCH 04/13] Iterate on concurrency logic Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay/key_lock.go | 34 ++++++++++++++ api/clients/v2/relay_client.go | 78 +++++++++++++++++--------------- 2 files changed, 76 insertions(+), 36 deletions(-) create mode 100644 api/clients/v2/relay/key_lock.go diff --git a/api/clients/v2/relay/key_lock.go b/api/clients/v2/relay/key_lock.go new file mode 100644 index 0000000000..86b0f4e43d --- /dev/null +++ b/api/clients/v2/relay/key_lock.go @@ -0,0 +1,34 @@ +package relay + +import ( + "sync" +) + +// KeyLock is a utility that provides a way to lock access to a given key of type T +// +// This utility is useful in situations where you want to synchronize operations for something that doesn't exist +// in a concrete form. For example, perhaps you only want to create connections with a given peer on a single +// thread of execution, but the new peer could appear simultaneously in concurrent operations. This utility allows +// the first thread which encounters the new peer to perform necessary initialization tasks, and store generated +// artifacts in a central location for subsequent callers to access. +type KeyLock[T comparable] struct { + // Map from key T to a mutex that corresponds to that key + keyMutexMap map[T]*sync.Mutex + // Used to lock access to the keyMutexMap, so that only a single mutex is created for each key + globalMutex sync.Mutex +} + +// AcquireKeyLock acquires an exclusive lock on a conceptual key, and returns a function to release the lock +func (kl *KeyLock[T]) AcquireKeyLock(key T) func() { + // we must globally synchronize access to the mutex map, so that only a single mutex will be created for a given key + kl.globalMutex.Lock() + keyMutex, valueAlreadyExists := kl.keyMutexMap[key] + if !valueAlreadyExists { + keyMutex = &sync.Mutex{} + kl.keyMutexMap[key] = keyMutex + } + kl.globalMutex.Unlock() + + keyMutex.Lock() + return keyMutex.Unlock +} diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index 594e19a7ca..8e979290ee 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -57,10 +57,11 @@ type RelayClient interface { type relayClient struct { logger logging.Logger config *RelayClientConfig - // initOnce is used to ensure that the connection to each relay is initialized only once - initOnce map[corev2.RelayKey]*sync.Once - // initOnceMutex protects access to the initOnce map - initOnceMutex sync.Mutex + // relayLockProvider provides locks that correspond to individual relay keys + relayLockProvider relay.KeyLock[corev2.RelayKey] + // relayInitializationStatus maps relay key to a bool `map[corev2.RelayKey]bool` + // the boolean value indicates whether the connection to that relay has been initialized + relayInitializationStatus sync.Map // clientConnections maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn` // this map is maintained so that connections can be closed in Close clientConnections sync.Map @@ -73,8 +74,8 @@ type relayClient struct { var _ RelayClient = (*relayClient)(nil) -// NewRelayClient creates a new RelayClient that connects to the relays specified in the config. -// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated. +// NewRelayClient creates a new RelayClient. It keeps a connection to each relay and reuses it for subsequent requests, +// and the connection is lazily instantiated. func NewRelayClient( config *RelayClientConfig, logger logging.Logger, @@ -243,39 +244,44 @@ func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relay // initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only do perform // the initialization once per relay. func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error { - // we must use a mutex here instead of a sync.Map, because this method could be called concurrently, and if - // two concurrent calls tried to `LoadOrStore` from a sync.Map at the same time, it's possible they would - // each create a unique sync.Once object, and perform duplicate initialization - c.initOnceMutex.Lock() - once, ok := c.initOnce[key] - if !ok { - once = &sync.Once{} - c.initOnce[key] = once + _, alreadyInitialized := c.relayInitializationStatus.Load(key) + if alreadyInitialized { + // this is the standard case, where the grpc connection has already been initialized + return nil } - c.initOnceMutex.Unlock() - - // TODO (litt3): should we implement a way to rebuild connections that break, or fail to initialize? as it currently - // stands, if this initialization fails, or a connection breaks, the relay client will never speak to that relay again - var initErr error - once.Do( - func() { - relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key) - if err != nil { - initErr = fmt.Errorf("get relay url for key %d: %w", key, err) - return - } - dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize) - conn, err := grpc.NewClient(relayUrl, dialOptions...) - if err != nil { - initErr = fmt.Errorf("create grpc client for key %d: %w", key, err) - return - } - c.clientConnections.Store(key, conn) - c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn)) - }) + // In cases were the value hasn't already been initialized, we must acquire a conceptual lock on the relay in + // question. This allows us to guarantee that a connection with a given relay is only initialized a single time + releaseMemberLock := c.relayLockProvider.AcquireKeyLock(key) + defer releaseMemberLock() - return initErr + _, alreadyInitialized = c.relayInitializationStatus.Load(key) + if alreadyInitialized { + // If we find that the connection was initialized in the time it took to acquire a conceptual lock on the relay, + // that means that a different caller did the necessary work already + return nil + } + + // TODO (litt3): storing `true` for key immediately mirrors the previous implementation, where a failed init + // is NOT retried, and the connection to that relay will just be broken forever. Consider implementing + // logic to retry initialization after a period of time in case of failure + c.relayInitializationStatus.Store(key, true) + + relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key) + if err != nil { + return fmt.Errorf("get relay url for key %d: %w", key, err) + } + + dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize) + conn, err := grpc.NewClient(relayUrl, dialOptions...) + if err != nil { + return fmt.Errorf("create grpc client for key %d: %w", key, err) + + } + c.clientConnections.Store(key, conn) + c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn)) + + return nil } func (c *relayClient) Close() error { From ef66f7574fe7f5a80fb218babf1b6df036964043 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 15:39:48 -0500 Subject: [PATCH 05/13] Fix tests Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay/test_relay_url_provider.go | 6 ++++++ api/clients/v2/relay_payload_retriever.go | 4 ++-- node/node_v2_test.go | 4 +--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/api/clients/v2/relay/test_relay_url_provider.go b/api/clients/v2/relay/test_relay_url_provider.go index dab6a22e07..51f0d88e8c 100644 --- a/api/clients/v2/relay/test_relay_url_provider.go +++ b/api/clients/v2/relay/test_relay_url_provider.go @@ -15,6 +15,12 @@ type TestRelayUrlProvider struct { var _ RelayUrlProvider = &TestRelayUrlProvider{} +func NewTestRelayUrlProvider() *TestRelayUrlProvider { + return &TestRelayUrlProvider{ + urlMap: make(map[v2.RelayKey]string), + } +} + func (rup *TestRelayUrlProvider) GetRelayUrl(_ context.Context, relayKey v2.RelayKey) (string, error) { return rup.urlMap[relayKey], nil } diff --git a/api/clients/v2/relay_payload_retriever.go b/api/clients/v2/relay_payload_retriever.go index 2ec2b3c483..3abfa8fb8c 100644 --- a/api/clients/v2/relay_payload_retriever.go +++ b/api/clients/v2/relay_payload_retriever.go @@ -42,9 +42,9 @@ func BuildRelayPayloadRetriever( return nil, fmt.Errorf("check and set RelayPayloadRetrieverConfig config: %w", err) } - testRelayUrlProvider := relay.TestRelayUrlProvider{} + testRelayUrlProvider := relay.NewTestRelayUrlProvider() - relayClient, err := NewRelayClient(relayClientConfig, log, &testRelayUrlProvider) + relayClient, err := NewRelayClient(relayClientConfig, log, testRelayUrlProvider) if err != nil { return nil, fmt.Errorf("new relay client: %w", err) } diff --git a/node/node_v2_test.go b/node/node_v2_test.go index e29d198897..821eac44d8 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -252,7 +252,7 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { c.node.Config.EnableV2 = true c.node.Config.OnchainStateRefreshInterval = time.Millisecond - relayUrlProvider := &relay.TestRelayUrlProvider{} + relayUrlProvider := relay.NewTestRelayUrlProvider() relayUrlProvider.StoreRelayUrl(0, "http://localhost:8080") messageSigner := func(ctx context.Context, data [32]byte) (*core.Signature, error) { @@ -298,8 +298,6 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { bp, ok = c.node.BlobVersionParams.Load().Get(1) require.True(t, ok) require.Equal(t, bp, blobParams2) - newRelayClient := c.node.RelayClient.Load().(clients.RelayClient) - require.NotSame(t, relayClient, newRelayClient) } func bundleEqual(t *testing.T, expected, actual core.Bundle) { From b5b13038441458b4f6b72045fd50f0217f879d20 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 16:38:24 -0500 Subject: [PATCH 06/13] Add test for KeyLock Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay/key_lock.go | 7 +++ api/clients/v2/relay/key_lock_test.go | 63 +++++++++++++++++++++++++++ api/clients/v2/relay_client.go | 3 +- 3 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 api/clients/v2/relay/key_lock_test.go diff --git a/api/clients/v2/relay/key_lock.go b/api/clients/v2/relay/key_lock.go index 86b0f4e43d..954abb4fb1 100644 --- a/api/clients/v2/relay/key_lock.go +++ b/api/clients/v2/relay/key_lock.go @@ -18,6 +18,13 @@ type KeyLock[T comparable] struct { globalMutex sync.Mutex } +// NewKeyLock constructs a KeyLock utility +func NewKeyLock[T comparable]() *KeyLock[T] { + return &KeyLock[T]{ + keyMutexMap: make(map[T]*sync.Mutex), + } +} + // AcquireKeyLock acquires an exclusive lock on a conceptual key, and returns a function to release the lock func (kl *KeyLock[T]) AcquireKeyLock(key T) func() { // we must globally synchronize access to the mutex map, so that only a single mutex will be created for a given key diff --git a/api/clients/v2/relay/key_lock_test.go b/api/clients/v2/relay/key_lock_test.go new file mode 100644 index 0000000000..51703dbc20 --- /dev/null +++ b/api/clients/v2/relay/key_lock_test.go @@ -0,0 +1,63 @@ +package relay + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" +) + +func TestKeyLock(t *testing.T) { + // test in a field of 100 unique keys + keyCount := 100 + + // keep an atomic count, and a non-atomic count for each key + // the atomic count can be used at the end of the test, to make sure that the non-atomic count was handled correctly + atomicKeyAccessCounts := make([]atomic.Uint32, keyCount) + nonAtomicKeyAccessCounts := make([]uint32, keyCount) + for i := 0; i < keyCount; i++ { + atomicKeyAccessCounts = append(atomicKeyAccessCounts, atomic.Uint32{}) + nonAtomicKeyAccessCounts = append(nonAtomicKeyAccessCounts, uint32(0)) + } + + keyLock := NewKeyLock[uint32]() + + var waitGroup sync.WaitGroup + + targetValue := uint32(1000) + worker := func() { + workerRandom := random.NewTestRandom() + + for { + // randomly select a key to access + keyToAccess := uint32(workerRandom.Intn(keyCount)) + newValue := atomicKeyAccessCounts[keyToAccess].Add(1) + + unlock := keyLock.AcquireKeyLock(keyToAccess) + // increment the non-atomic count after acquiring access + // if the access controls are working correctly, this is a safe operation + nonAtomicKeyAccessCounts[keyToAccess] = nonAtomicKeyAccessCounts[keyToAccess] + 1 + unlock() + + // each worker stops looping after it sees a counter that has increased to targetValue + if newValue >= targetValue { + break + } + } + + waitGroup.Done() + } + + // start up 100 concurrent workers + for i := 0; i < 100; i++ { + waitGroup.Add(1) + go worker() + } + waitGroup.Wait() + + for i := 0; i < keyCount; i++ { + require.Equal(t, atomicKeyAccessCounts[i].Load(), nonAtomicKeyAccessCounts[i]) + } +} diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index 8e979290ee..34e5ae8a5a 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -58,7 +58,7 @@ type relayClient struct { logger logging.Logger config *RelayClientConfig // relayLockProvider provides locks that correspond to individual relay keys - relayLockProvider relay.KeyLock[corev2.RelayKey] + relayLockProvider *relay.KeyLock[corev2.RelayKey] // relayInitializationStatus maps relay key to a bool `map[corev2.RelayKey]bool` // the boolean value indicates whether the connection to that relay has been initialized relayInitializationStatus sync.Map @@ -95,6 +95,7 @@ func NewRelayClient( return &relayClient{ config: config, logger: logger.With("component", "RelayClient"), + relayLockProvider: relay.NewKeyLock[corev2.RelayKey](), relayUrlProvider: relayUrlProvider, }, nil } From 4608ae5ea0f22a3b52d22af03efd2d309f6b7026 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Tue, 25 Feb 2025 16:51:39 -0500 Subject: [PATCH 07/13] Fix improper test struct usage Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay_client.go | 4 ++-- api/clients/v2/relay_payload_retriever.go | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index 34e5ae8a5a..aed0c8035b 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -242,7 +242,7 @@ func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relay return client, nil } -// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only do perform +// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only perform // the initialization once per relay. func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error { _, alreadyInitialized := c.relayInitializationStatus.Load(key) @@ -263,7 +263,7 @@ func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.Rel return nil } - // TODO (litt3): storing `true` for key immediately mirrors the previous implementation, where a failed init + // TODO (litt3): immediately storing `true` for key mirrors the previous implementation, where a failed init // is NOT retried, and the connection to that relay will just be broken forever. Consider implementing // logic to retry initialization after a period of time in case of failure c.relayInitializationStatus.Store(key, true) diff --git a/api/clients/v2/relay_payload_retriever.go b/api/clients/v2/relay_payload_retriever.go index 3abfa8fb8c..d86f301a27 100644 --- a/api/clients/v2/relay_payload_retriever.go +++ b/api/clients/v2/relay_payload_retriever.go @@ -35,6 +35,7 @@ func BuildRelayPayloadRetriever( log logging.Logger, relayPayloadRetrieverConfig RelayPayloadRetrieverConfig, relayClientConfig *RelayClientConfig, + relayUrlProvider relay.RelayUrlProvider, g1Srs []bn254.G1Affine) (*RelayPayloadRetriever, error) { err := relayPayloadRetrieverConfig.checkAndSetDefaults() @@ -42,9 +43,7 @@ func BuildRelayPayloadRetriever( return nil, fmt.Errorf("check and set RelayPayloadRetrieverConfig config: %w", err) } - testRelayUrlProvider := relay.NewTestRelayUrlProvider() - - relayClient, err := NewRelayClient(relayClientConfig, log, testRelayUrlProvider) + relayClient, err := NewRelayClient(relayClientConfig, log, relayUrlProvider) if err != nil { return nil, fmt.Errorf("new relay client: %w", err) } From e172aed8d0d7ac2d1d5fd450908d8a476c182dcf Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Wed, 26 Feb 2025 08:42:37 -0500 Subject: [PATCH 08/13] Tweak formatting Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index aed0c8035b..9f8cacc910 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -93,10 +93,10 @@ func NewRelayClient( logger.Info("creating relay client") return &relayClient{ - config: config, - logger: logger.With("component", "RelayClient"), + config: config, + logger: logger.With("component", "RelayClient"), relayLockProvider: relay.NewKeyLock[corev2.RelayKey](), - relayUrlProvider: relayUrlProvider, + relayUrlProvider: relayUrlProvider, }, nil } @@ -253,8 +253,8 @@ func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.Rel // In cases were the value hasn't already been initialized, we must acquire a conceptual lock on the relay in // question. This allows us to guarantee that a connection with a given relay is only initialized a single time - releaseMemberLock := c.relayLockProvider.AcquireKeyLock(key) - defer releaseMemberLock() + releaseKeyLock := c.relayLockProvider.AcquireKeyLock(key) + defer releaseKeyLock() _, alreadyInitialized = c.relayInitializationStatus.Load(key) if alreadyInitialized { From 0c3d309fea889a314e2736e34365ad46b64651dd Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Wed, 26 Feb 2025 12:11:03 -0500 Subject: [PATCH 09/13] Fix assignment bug Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- inabox/tests/integration_suite_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index c71ffa2a9c..bdc223d61d 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -219,7 +219,11 @@ func setupRetrievalClient(testConfig *deploy.Config) error { if err != nil { return err } - chainReader, err := eth.NewReader(logger, ethClient, testConfig.Retriever.RETRIEVER_BLS_OPERATOR_STATE_RETRIVER, testConfig.Retriever.RETRIEVER_EIGENDA_SERVICE_MANAGER) + chainReader, err = eth.NewReader( + logger, + ethClient, + testConfig.Retriever.RETRIEVER_BLS_OPERATOR_STATE_RETRIVER, + testConfig.Retriever.RETRIEVER_EIGENDA_SERVICE_MANAGER) if err != nil { return err } From 1872c24ec84654588667498ef66696f327aa4a3b Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Wed, 26 Feb 2025 15:15:35 -0500 Subject: [PATCH 10/13] Add additional method doc Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay/key_lock.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/clients/v2/relay/key_lock.go b/api/clients/v2/relay/key_lock.go index 954abb4fb1..1e8c19bbb2 100644 --- a/api/clients/v2/relay/key_lock.go +++ b/api/clients/v2/relay/key_lock.go @@ -26,6 +26,9 @@ func NewKeyLock[T comparable]() *KeyLock[T] { } // AcquireKeyLock acquires an exclusive lock on a conceptual key, and returns a function to release the lock +// +// The caller MUST eventually invoke the returned unlock function, or all future calls with the same key will block +// indefinitely func (kl *KeyLock[T]) AcquireKeyLock(key T) func() { // we must globally synchronize access to the mutex map, so that only a single mutex will be created for a given key kl.globalMutex.Lock() From 8415ecf3844422dfb24ea8c8bdb82efee6b5de68 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Thu, 27 Feb 2025 08:37:29 -0500 Subject: [PATCH 11/13] Tweak order of operations, to retry upon failure Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- api/clients/v2/relay_client.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/api/clients/v2/relay_client.go b/api/clients/v2/relay_client.go index 9f8cacc910..127748f7d3 100644 --- a/api/clients/v2/relay_client.go +++ b/api/clients/v2/relay_client.go @@ -242,8 +242,8 @@ func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relay return client, nil } -// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only perform -// the initialization once per relay. +// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only be completed +// once per relay. If initialization fails, it will be retried by the next caller. func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error { _, alreadyInitialized := c.relayInitializationStatus.Load(key) if alreadyInitialized { @@ -263,11 +263,6 @@ func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.Rel return nil } - // TODO (litt3): immediately storing `true` for key mirrors the previous implementation, where a failed init - // is NOT retried, and the connection to that relay will just be broken forever. Consider implementing - // logic to retry initialization after a period of time in case of failure - c.relayInitializationStatus.Store(key, true) - relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key) if err != nil { return fmt.Errorf("get relay url for key %d: %w", key, err) @@ -282,6 +277,9 @@ func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.Rel c.clientConnections.Store(key, conn) c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn)) + // only set the initialization status to true if everything was successful. + c.relayInitializationStatus.Store(key, true) + return nil } From ee9b2ba64b2ce9fd1fd02e4d7b7fba128802ffd9 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Thu, 27 Feb 2025 08:46:54 -0500 Subject: [PATCH 12/13] Change naming Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- .../v2/relay/default_relay_url_provider.go | 19 ++++++++++--------- .../test_relay_url_provider.go | 5 +++-- inabox/tests/integration_v2_test.go | 2 +- node/node.go | 2 +- node/node_v2_test.go | 4 ++-- test/v2/client/test_client.go | 2 +- 6 files changed, 18 insertions(+), 16 deletions(-) rename api/clients/v2/{relay => test}/test_relay_url_provider.go (86%) diff --git a/api/clients/v2/relay/default_relay_url_provider.go b/api/clients/v2/relay/default_relay_url_provider.go index 968d4d98ed..b158e9f33c 100644 --- a/api/clients/v2/relay/default_relay_url_provider.go +++ b/api/clients/v2/relay/default_relay_url_provider.go @@ -7,35 +7,36 @@ import ( "github.com/Layr-Labs/eigenda/common" relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/ethereum/go-ethereum/accounts/abi/bind" gethcommon "github.com/ethereum/go-ethereum/common" ) -// DefaultRelayUrlProvider provides relay URL strings, based on relay key. -type DefaultRelayUrlProvider struct { +// relayUrlProvider provides relay URL strings, based on relay key. +type relayUrlProvider struct { relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller } -var _ RelayUrlProvider = &DefaultRelayUrlProvider{} +var _ RelayUrlProvider = &relayUrlProvider{} -// NewDefaultRelayUrlProvider constructs a DefaultRelayUrlProvider -func NewDefaultRelayUrlProvider( +// NewRelayUrlProvider constructs a relayUrlProvider +func NewRelayUrlProvider( ethClient common.EthClient, relayRegistryAddress gethcommon.Address, -) (*DefaultRelayUrlProvider, error) { +) (RelayUrlProvider, error) { relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( relayRegistryAddress, ethClient) if err != nil { return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) } - return &DefaultRelayUrlProvider{ + return &relayUrlProvider{ relayRegistryCaller: relayRegistryContractCaller, }, nil } // GetRelayUrl gets the URL string for a given relayKey -func (rup *DefaultRelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) { +func (rup *relayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) { relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) if err != nil { return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err) @@ -45,7 +46,7 @@ func (rup *DefaultRelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2 } // GetRelayCount gets the number of relays that exist in the registry -func (rup *DefaultRelayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { +func (rup *relayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { // NextRelayKey initializes to 0, and is incremented each time a relay is added // current logic doesn't support removing relays, so NextRelayKey therefore corresponds directly to relay count relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) diff --git a/api/clients/v2/relay/test_relay_url_provider.go b/api/clients/v2/test/test_relay_url_provider.go similarity index 86% rename from api/clients/v2/relay/test_relay_url_provider.go rename to api/clients/v2/test/test_relay_url_provider.go index 51f0d88e8c..f7a714c850 100644 --- a/api/clients/v2/relay/test_relay_url_provider.go +++ b/api/clients/v2/test/test_relay_url_provider.go @@ -1,8 +1,9 @@ -package relay +package test import ( "context" + "github.com/Layr-Labs/eigenda/api/clients/v2/relay" v2 "github.com/Layr-Labs/eigenda/core/v2" ) @@ -13,7 +14,7 @@ type TestRelayUrlProvider struct { urlMap map[v2.RelayKey]string } -var _ RelayUrlProvider = &TestRelayUrlProvider{} +var _ relay.RelayUrlProvider = &TestRelayUrlProvider{} func NewTestRelayUrlProvider() *TestRelayUrlProvider { return &TestRelayUrlProvider{ diff --git a/inabox/tests/integration_v2_test.go b/inabox/tests/integration_v2_test.go index cfe1ba01e8..bb19a1628f 100644 --- a/inabox/tests/integration_v2_test.go +++ b/inabox/tests/integration_v2_test.go @@ -213,7 +213,7 @@ var _ = Describe("Inabox v2 Integration", func() { MaxGRPCMessageSize: units.GiB, } - relayUrlProvider, err := relay.NewDefaultRelayUrlProvider(ethClient, chainReader.GetRelayRegistryAddress()) + relayUrlProvider, err := relay.NewRelayUrlProvider(ethClient, chainReader.GetRelayRegistryAddress()) Expect(err).To(BeNil()) // Test retrieval from relay diff --git a/node/node.go b/node/node.go index 1285674ce3..c54835e7b9 100644 --- a/node/node.go +++ b/node/node.go @@ -244,7 +244,7 @@ func NewNode( MaxGRPCMessageSize: n.Config.RelayMaxMessageSize, } - relayUrlProvider, err := relay.NewDefaultRelayUrlProvider(client, tx.GetRelayRegistryAddress()) + relayUrlProvider, err := relay.NewRelayUrlProvider(client, tx.GetRelayRegistryAddress()) if err != nil { return nil, fmt.Errorf("create relay url provider: %w", err) } diff --git a/node/node_v2_test.go b/node/node_v2_test.go index 821eac44d8..47a0cec1f6 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/Layr-Labs/eigenda/api/clients/v2/relay" + "github.com/Layr-Labs/eigenda/api/clients/v2/test" "github.com/docker/go-units" "github.com/Layr-Labs/eigenda/api/clients/v2" @@ -252,7 +252,7 @@ func TestRefreshOnchainStateSuccess(t *testing.T) { c.node.Config.EnableV2 = true c.node.Config.OnchainStateRefreshInterval = time.Millisecond - relayUrlProvider := relay.NewTestRelayUrlProvider() + relayUrlProvider := test.NewTestRelayUrlProvider() relayUrlProvider.StoreRelayUrl(0, "http://localhost:8080") messageSigner := func(ctx context.Context, data [32]byte) (*core.Signature, error) { diff --git a/test/v2/client/test_client.go b/test/v2/client/test_client.go index 8cb589cf7f..01f57cd3b2 100644 --- a/test/v2/client/test_client.go +++ b/test/v2/client/test_client.go @@ -205,7 +205,7 @@ func NewTestClient( MessageSigner: fakeSigner, } - relayUrlProvider, err := relayv2.NewDefaultRelayUrlProvider(ethClient, ethReader.GetRelayRegistryAddress()) + relayUrlProvider, err := relayv2.NewRelayUrlProvider(ethClient, ethReader.GetRelayRegistryAddress()) if err != nil { return nil, fmt.Errorf("create relay url provider: %w", err) } From 31de2cdacabd859a27e98a358843a443e6ceae59 Mon Sep 17 00:00:00 2001 From: litt3 <102969658+litt3@users.noreply.github.com> Date: Thu, 27 Feb 2025 12:26:51 -0500 Subject: [PATCH 13/13] Combine interface and default impl into one file Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com> --- .../v2/relay/default_relay_url_provider.go | 58 ------------------- api/clients/v2/relay/relay_url_provider.go | 51 ++++++++++++++++ 2 files changed, 51 insertions(+), 58 deletions(-) delete mode 100644 api/clients/v2/relay/default_relay_url_provider.go diff --git a/api/clients/v2/relay/default_relay_url_provider.go b/api/clients/v2/relay/default_relay_url_provider.go deleted file mode 100644 index b158e9f33c..0000000000 --- a/api/clients/v2/relay/default_relay_url_provider.go +++ /dev/null @@ -1,58 +0,0 @@ -package relay - -import ( - "context" - "fmt" - - "github.com/Layr-Labs/eigenda/common" - relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" - v2 "github.com/Layr-Labs/eigenda/core/v2" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - gethcommon "github.com/ethereum/go-ethereum/common" -) - -// relayUrlProvider provides relay URL strings, based on relay key. -type relayUrlProvider struct { - relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller -} - -var _ RelayUrlProvider = &relayUrlProvider{} - -// NewRelayUrlProvider constructs a relayUrlProvider -func NewRelayUrlProvider( - ethClient common.EthClient, - relayRegistryAddress gethcommon.Address, -) (RelayUrlProvider, error) { - relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( - relayRegistryAddress, ethClient) - if err != nil { - return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) - } - - return &relayUrlProvider{ - relayRegistryCaller: relayRegistryContractCaller, - }, nil -} - -// GetRelayUrl gets the URL string for a given relayKey -func (rup *relayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) { - relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) - if err != nil { - return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err) - } - - return relayUrl, nil -} - -// GetRelayCount gets the number of relays that exist in the registry -func (rup *relayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { - // NextRelayKey initializes to 0, and is incremented each time a relay is added - // current logic doesn't support removing relays, so NextRelayKey therefore corresponds directly to relay count - relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) - if err != nil { - return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) - } - - return relayCount, nil -} diff --git a/api/clients/v2/relay/relay_url_provider.go b/api/clients/v2/relay/relay_url_provider.go index 0fe638eec0..b5d6220ef7 100644 --- a/api/clients/v2/relay/relay_url_provider.go +++ b/api/clients/v2/relay/relay_url_provider.go @@ -2,8 +2,14 @@ package relay import ( "context" + "fmt" + "github.com/Layr-Labs/eigenda/common" + relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" v2 "github.com/Layr-Labs/eigenda/core/v2" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + gethcommon "github.com/ethereum/go-ethereum/common" ) // RelayUrlProvider provides relay URL strings, based on relay key @@ -13,3 +19,48 @@ type RelayUrlProvider interface { // GetRelayCount returns the number of relays in the registry GetRelayCount(ctx context.Context) (uint32, error) } + +// relayUrlProvider provides relay URL strings, based on relay key. +type relayUrlProvider struct { + relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller +} + +var _ RelayUrlProvider = &relayUrlProvider{} + +// NewRelayUrlProvider constructs a relayUrlProvider +func NewRelayUrlProvider( + ethClient common.EthClient, + relayRegistryAddress gethcommon.Address, +) (RelayUrlProvider, error) { + relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller( + relayRegistryAddress, ethClient) + if err != nil { + return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err) + } + + return &relayUrlProvider{ + relayRegistryCaller: relayRegistryContractCaller, + }, nil +} + +// GetRelayUrl gets the URL string for a given relayKey +func (rup *relayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) { + relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey) + if err != nil { + return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err) + } + + return relayUrl, nil +} + +// GetRelayCount gets the number of relays that exist in the registry +func (rup *relayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) { + // NextRelayKey initializes to 0, and is incremented each time a relay is added + // current logic doesn't support removing relays, so NextRelayKey therefore corresponds directly to relay count + relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx}) + if err != nil { + return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err) + } + + return relayCount, nil +}