Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement relay url provider #1328

Merged
merged 15 commits into from
Feb 27, 2025
58 changes: 58 additions & 0 deletions api/clients/v2/relay/default_relay_url_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package relay
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: filename

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


import (
"context"
"fmt"

"github.com/Layr-Labs/eigenda/common"
relayRegistryBindings "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry"
v2 "github.com/Layr-Labs/eigenda/core/v2"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// relayUrlProvider provides relay URL strings, based on relay key.
type relayUrlProvider struct {
relayRegistryCaller *relayRegistryBindings.ContractEigenDARelayRegistryCaller
}

var _ RelayUrlProvider = &relayUrlProvider{}

// NewRelayUrlProvider constructs a relayUrlProvider
func NewRelayUrlProvider(
ethClient common.EthClient,
relayRegistryAddress gethcommon.Address,
) (RelayUrlProvider, error) {
relayRegistryContractCaller, err := relayRegistryBindings.NewContractEigenDARelayRegistryCaller(
relayRegistryAddress, ethClient)
if err != nil {
return nil, fmt.Errorf("NewContractEigenDARelayRegistryCaller: %w", err)
}

return &relayUrlProvider{
relayRegistryCaller: relayRegistryContractCaller,
}, nil
}

// GetRelayUrl gets the URL string for a given relayKey
func (rup *relayUrlProvider) GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error) {
relayUrl, err := rup.relayRegistryCaller.RelayKeyToUrl(&bind.CallOpts{Context: ctx}, relayKey)
if err != nil {
return "", fmt.Errorf("fetch relay key (%d) URL from EigenDARelayRegistry contract: %w", relayKey, err)
}

return relayUrl, nil
}

// GetRelayCount gets the number of relays that exist in the registry
func (rup *relayUrlProvider) GetRelayCount(ctx context.Context) (uint32, error) {
// NextRelayKey initializes to 0, and is incremented each time a relay is added
// current logic doesn't support removing relays, so NextRelayKey therefore corresponds directly to relay count
relayCount, err := rup.relayRegistryCaller.NextRelayKey(&bind.CallOpts{Context: ctx})
if err != nil {
return 0, fmt.Errorf("get next relay key from EigenDARelayRegistry contract: %w", err)
}

return relayCount, nil
}
44 changes: 44 additions & 0 deletions api/clients/v2/relay/key_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package relay

import (
"sync"
)

// KeyLock is a utility that provides a way to lock access to a given key of type T
//
// This utility is useful in situations where you want to synchronize operations for something that doesn't exist
// in a concrete form. For example, perhaps you only want to create connections with a given peer on a single
// thread of execution, but the new peer could appear simultaneously in concurrent operations. This utility allows
// the first thread which encounters the new peer to perform necessary initialization tasks, and store generated
// artifacts in a central location for subsequent callers to access.
type KeyLock[T comparable] struct {
// Map from key T to a mutex that corresponds to that key
keyMutexMap map[T]*sync.Mutex
// Used to lock access to the keyMutexMap, so that only a single mutex is created for each key
globalMutex sync.Mutex
}

// NewKeyLock constructs a KeyLock utility
func NewKeyLock[T comparable]() *KeyLock[T] {
return &KeyLock[T]{
keyMutexMap: make(map[T]*sync.Mutex),
}
}

// AcquireKeyLock acquires an exclusive lock on a conceptual key, and returns a function to release the lock
//
// The caller MUST eventually invoke the returned unlock function, or all future calls with the same key will block
// indefinitely
func (kl *KeyLock[T]) AcquireKeyLock(key T) func() {
// we must globally synchronize access to the mutex map, so that only a single mutex will be created for a given key
kl.globalMutex.Lock()
keyMutex, valueAlreadyExists := kl.keyMutexMap[key]
if !valueAlreadyExists {
keyMutex = &sync.Mutex{}
kl.keyMutexMap[key] = keyMutex
}
kl.globalMutex.Unlock()

keyMutex.Lock()
return keyMutex.Unlock
}
63 changes: 63 additions & 0 deletions api/clients/v2/relay/key_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package relay

import (
"sync"
"sync/atomic"
"testing"

"github.com/Layr-Labs/eigenda/common/testutils/random"
"github.com/stretchr/testify/require"
)

func TestKeyLock(t *testing.T) {
// test in a field of 100 unique keys
keyCount := 100

// keep an atomic count, and a non-atomic count for each key
// the atomic count can be used at the end of the test, to make sure that the non-atomic count was handled correctly
atomicKeyAccessCounts := make([]atomic.Uint32, keyCount)
nonAtomicKeyAccessCounts := make([]uint32, keyCount)
for i := 0; i < keyCount; i++ {
atomicKeyAccessCounts = append(atomicKeyAccessCounts, atomic.Uint32{})
nonAtomicKeyAccessCounts = append(nonAtomicKeyAccessCounts, uint32(0))
}

keyLock := NewKeyLock[uint32]()

var waitGroup sync.WaitGroup

targetValue := uint32(1000)
worker := func() {
workerRandom := random.NewTestRandom()

for {
// randomly select a key to access
keyToAccess := uint32(workerRandom.Intn(keyCount))
newValue := atomicKeyAccessCounts[keyToAccess].Add(1)

unlock := keyLock.AcquireKeyLock(keyToAccess)
// increment the non-atomic count after acquiring access
// if the access controls are working correctly, this is a safe operation
nonAtomicKeyAccessCounts[keyToAccess] = nonAtomicKeyAccessCounts[keyToAccess] + 1
unlock()

// each worker stops looping after it sees a counter that has increased to targetValue
if newValue >= targetValue {
break
}
}

waitGroup.Done()
}

// start up 100 concurrent workers
for i := 0; i < 100; i++ {
waitGroup.Add(1)
go worker()
}
waitGroup.Wait()

for i := 0; i < keyCount; i++ {
require.Equal(t, atomicKeyAccessCounts[i].Load(), nonAtomicKeyAccessCounts[i])
}
}
15 changes: 15 additions & 0 deletions api/clients/v2/relay/relay_url_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package relay

import (
"context"

v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// RelayUrlProvider provides relay URL strings, based on relay key
type RelayUrlProvider interface {
// GetRelayUrl gets the URL string for a given relayKey
GetRelayUrl(ctx context.Context, relayKey v2.RelayKey) (string, error)
// GetRelayCount returns the number of relays in the registry
GetRelayCount(ctx context.Context) (uint32, error)
}
Loading
Loading