Skip to content

Commit

Permalink
Draft usage of relay url provider
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com>
  • Loading branch information
litt3 committed Feb 25, 2025
1 parent 6b551f8 commit c44fce7
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 349 deletions.
56 changes: 56 additions & 0 deletions api/clients/v2/relay/default_relay_url_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package relay

import (
"context"
"fmt"

"github.com/Layr-Labs/eigenda/common"
relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry"
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// DefaultRelayUrlProvider provides relay URL strings, based on relay key.
type DefaultRelayUrlProvider struct {
relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller
}

var _ RelayUrlProvider = &DefaultRelayUrlProvider{}

// NewDefaultRelayUrlProvider constructs a DefaultRelayUrlProvider
func NewDefaultRelayUrlProvider(
ethClient common.EthClient,
relayRegistryAddress gethcommon.Address,
) (*DefaultRelayUrlProvider, error) {
relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller(
relayRegistryAddress,
ethClient)
if err != nil {
return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err)
}

return &DefaultRelayUrlProvider{
relayRegistryCaller: relayRegistryContractCaller,
}, nil
}

// GetRelayUrl gets the URL string for a given relayKey
func (rup *DefaultRelayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) {
relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey)
if err != nil {
return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err)
}

return relayUrl, nil
}

// GetRelayCount gets the number of relays that exist in the registry
func (rup *DefaultRelayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) {
relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx})
if err != nil {
return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err)
}

return relayCount, nil
}
15 changes: 15 additions & 0 deletions api/clients/v2/relay/relay_url_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package relay

import (
"context"

v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// RelayUrlProvider provides relay URL strings, based on relay key
type RelayUrlProvider interface {
// GetRelayUrl gets the URL string for a given relayKey
GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error)
// GetRelayCount returns the number of relays in the registry
GetRelayCount(ctx context.Context) (uint32, error)
}
28 changes: 28 additions & 0 deletions api/clients/v2/relay/test_relay_url_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package relay

import (
"context"

v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// TestRelayUrlProvider implements RelayUrlProvider, for test cases
//
// NOT SAFE for concurrent use
type TestRelayUrlProvider struct {
urlMap map[v2.RelayKey]string
}

var _ RelayUrlProvider = &TestRelayUrlProvider{}

func (rup *TestRelayUrlProvider) GetRelayUrl(_ context.Context, relayKey v2.RelayKey) (string, error) {
return rup.urlMap[relayKey], nil
}

func (rup *TestRelayUrlProvider) GetRelayCount(_ context.Context) (uint32, error) {
return uint32(len(rup.urlMap)), nil
}

func (rup *TestRelayUrlProvider) StoreRelayUrl(relayKey v2.RelayKey, url string) {
rup.urlMap[relayKey] = url
}
165 changes: 90 additions & 75 deletions api/clients/v2/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

"github.com/Layr-Labs/eigenda/api/clients/v2/relay"
relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/api/hashing"
"github.com/Layr-Labs/eigenda/core"
Expand All @@ -19,7 +20,6 @@ import (
type MessageSigner func(ctx context.Context, data [32]byte) (*core.Signature, error)

type RelayClientConfig struct {
Sockets map[corev2.RelayKey]string
UseSecureGrpcFlag bool
MaxGRPCMessageSize uint
OperatorID *core.OperatorID
Expand Down Expand Up @@ -48,56 +48,60 @@ type RelayClient interface {
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
// Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray).
GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error)
// GetSockets returns the relay sockets
GetSockets() map[corev2.RelayKey]string
Close() error
}

// relayClient is a client for the entire relay subsystem.
//
// It is a wrapper around a collection of grpc relay clients, which are used to interact with individual relays.
type relayClient struct {
config *RelayClientConfig

// initOnce is used to ensure that the connection to each relay is initialized only once.
// It maps relay key to a sync.Once instance: `map[corev2.RelayKey]*sync.Once`
initOnce *sync.Map
// conns maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn`
conns sync.Map
logger logging.Logger

// grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient`
grpcClients sync.Map
config *RelayClientConfig
// initOnce is used to ensure that the connection to each relay is initialized only once
initOnce map[corev2.RelayKey]*sync.Once
// initOnceMutex protects access to the initOnce map
initOnceMutex sync.Mutex
// clientConnections maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn`
// this map is maintained so that connections can be closed in Close
clientConnections sync.Map
// grpcRelayClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient`
// these grpc relay clients are used to communicate with individual relays
grpcRelayClients sync.Map
// relayUrlProvider knows how to retrieve the relay URLs, and maintains an internal URL cache
relayUrlProvider relay.RelayUrlProvider
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (RelayClient, error) {
func NewRelayClient(
config *RelayClientConfig,
logger logging.Logger,
relayUrlProvider relay.RelayUrlProvider,
) (RelayClient, error) {

if config == nil {
return nil, errors.New("nil config")
} else if len(config.Sockets) <= 0 {
return nil, errors.New("no relay sockets provided")
} else if config.MaxGRPCMessageSize == 0 {
}

if config.MaxGRPCMessageSize == 0 {
return nil, errors.New("max gRPC message size must be greater than 0")
}

logger.Info("creating relay client", "urls", config.Sockets)
logger.Info("creating relay client")

initOnce := sync.Map{}
for key := range config.Sockets {
initOnce.Store(key, &sync.Once{})
}
return &relayClient{
config: config,

initOnce: &initOnce,
logger: logger.With("component", "RelayClient"),
config: config,
logger: logger.With("component", "RelayClient"),
relayUrlProvider: relayUrlProvider,
}, nil
}

func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
client, err := c.getClient(relayKey)
client, err := c.getClient(ctx, relayKey)
if err != nil {
return nil, err
return nil, fmt.Errorf("get grpc client for key %d: %w", relayKey, err)
}

res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
Expand Down Expand Up @@ -140,9 +144,10 @@ func (c *relayClient) GetChunksByRange(
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
client, err := c.getClient(relayKey)

client, err := c.getClient(ctx, relayKey)
if err != nil {
return nil, err
return nil, fmt.Errorf("get grpc relay client for key %d: %w", relayKey, err)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
Expand Down Expand Up @@ -184,9 +189,9 @@ func (c *relayClient) GetChunksByIndex(
return nil, fmt.Errorf("no requests")
}

client, err := c.getClient(relayKey)
client, err := c.getClient(ctx, relayKey)
if err != nil {
return nil, err
return nil, fmt.Errorf("get grpc relay client for key %d: %w", relayKey, err)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
Expand Down Expand Up @@ -219,11 +224,12 @@ func (c *relayClient) GetChunksByIndex(
return res.GetData(), nil
}

func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, error) {
if err := c.initOnceGrpcConnection(key); err != nil {
return nil, err
// getClient gets the grpc relay client, which has a connection to a given relay
func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relaygrpc.RelayClient, error) {
if err := c.initOnceGrpcConnection(ctx, key); err != nil {
return nil, fmt.Errorf("init grpc connection for key %d: %w", key, err)
}
maybeClient, ok := c.grpcClients.Load(key)
maybeClient, ok := c.grpcRelayClients.Load(key)
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", key)
}
Expand All @@ -234,53 +240,62 @@ func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, err
return client, nil
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
var initErr error
once, ok := c.initOnce.Load(key)
// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only do perform
// the initialization once per relay.
func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error {
// we must use a mutex here instead of a sync.Map, because this method could be called concurrently, and if
// two concurrent calls tried to `LoadOrStore` from a sync.Map at the same time, it's possible they would
// each create a unique sync.Once object, and perform duplicate initialization
c.initOnceMutex.Lock()
once, ok := c.initOnce[key]
if !ok {
return fmt.Errorf("unknown relay key: %v", key)
once = &sync.Once{}
c.initOnce[key] = once
}
once.(*sync.Once).Do(func() {
socket, ok := c.config.Sockets[key]
if !ok {
initErr = fmt.Errorf("unknown relay key: %v", key)
return
}
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize)
conn, err := grpc.NewClient(socket, dialOptions...)
if err != nil {
initErr = err
return
}
c.conns.Store(key, conn)
c.grpcClients.Store(key, relaygrpc.NewRelayClient(conn))
})
return initErr
}
c.initOnceMutex.Unlock()

var initErr error
once.Do(
func() {
relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key)
if err != nil {
initErr = fmt.Errorf("get relay url for key %d: %w", key, err)
return
}

dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize)
conn, err := grpc.NewClient(relayUrl, dialOptions...)
if err != nil {
initErr = fmt.Errorf("create grpc client for key %d: %w", key, err)
return
}
c.clientConnections.Store(key, conn)
c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn))
})

func (c *relayClient) GetSockets() map[corev2.RelayKey]string {
return c.config.Sockets
return initErr
}

func (c *relayClient) Close() error {
var errList *multierror.Error
c.conns.Range(func(k, v interface{}) bool {
conn, ok := v.(*grpc.ClientConn)
if !ok {
errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k))
return true
}
c.clientConnections.Range(
func(k, v interface{}) bool {
conn, ok := v.(*grpc.ClientConn)
if !ok {
errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k))
return true
}

if conn != nil {
err := conn.Close()
c.conns.Delete(k)
c.grpcClients.Delete(k)
if err != nil {
c.logger.Error("failed to close connection", "err", err)
errList = multierror.Append(errList, err)
if conn != nil {
err := conn.Close()
c.clientConnections.Delete(k)
c.grpcRelayClients.Delete(k)
if err != nil {
c.logger.Error("failed to close connection", "err", err)
errList = multierror.Append(errList, err)
}
}
}
return true
})
return true
})
return errList.ErrorOrNil()
}
5 changes: 4 additions & 1 deletion api/clients/v2/relay_payload_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"

"github.com/Layr-Labs/eigenda/api/clients/v2/coretypes"
"github.com/Layr-Labs/eigenda/api/clients/v2/relay"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -41,7 +42,9 @@ func BuildRelayPayloadRetriever(
return nil, fmt.Errorf("check and set RelayPayloadRetrieverConfig config: %w", err)
}

relayClient, err := NewRelayClient(relayClientConfig, log)
testRelayUrlProvider := relay.TestRelayUrlProvider{}

relayClient, err := NewRelayClient(relayClientConfig, log, &testRelayUrlProvider)
if err != nil {
return nil, fmt.Errorf("new relay client: %w", err)
}
Expand Down
Loading

0 comments on commit c44fce7

Please sign in to comment.