Skip to content

Commit

Permalink
Iterate on concurrency logic
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com>
  • Loading branch information
litt3 committed Feb 25, 2025
1 parent 83c2623 commit 4ed780c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 36 deletions.
34 changes: 34 additions & 0 deletions api/clients/v2/relay/key_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package relay

import (
"sync"
)

// KeyLock is a utility that provides a way to lock access to a given key of type T
//
// This utility is useful in situations where you want to synchronize operations for something that doesn't exist
// in a concrete form. For example, perhaps you only want to create connections with a given peer on a single
// thread of execution, but the new peer could appear simultaneously in concurrent operations. This utility allows
// the first thread which encounters the new peer to perform necessary initialization tasks, and store generated
// artifacts in a central location for subsequent callers to access.
type KeyLock[T comparable] struct {
// Map from key T to a mutex that corresponds to that key
keyMutexMap map[T]*sync.Mutex
// Used to lock access to the keyMutexMap, so that only a single mutex is created for each key
globalMutex sync.Mutex
}

// AcquireKeyLock acquires an exclusive lock on a conceptual key, and returns a function to release the lock
func (kl *KeyLock[T]) AcquireKeyLock(key T) func() {
// we must globally synchronize access to the mutex map, so that only a single mutex will be created for a given key
kl.globalMutex.Lock()
keyMutex, valueAlreadyExists := kl.keyMutexMap[key]
if !valueAlreadyExists {
keyMutex = &sync.Mutex{}
kl.keyMutexMap[key] = keyMutex
}
kl.globalMutex.Unlock()

keyMutex.Lock()
return keyMutex.Unlock
}
78 changes: 42 additions & 36 deletions api/clients/v2/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ type RelayClient interface {
type relayClient struct {
logger logging.Logger
config *RelayClientConfig
// initOnce is used to ensure that the connection to each relay is initialized only once
initOnce map[corev2.RelayKey]*sync.Once
// initOnceMutex protects access to the initOnce map
initOnceMutex sync.Mutex
// relayLockProvider provides locks that correspond to individual relay keys
relayLockProvider relay.KeyLock[corev2.RelayKey]
// relayInitializationStatus maps relay key to a bool `map[corev2.RelayKey]bool`
// the boolean value indicates whether the connection to that relay has been initialized
relayInitializationStatus sync.Map
// clientConnections maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn`
// this map is maintained so that connections can be closed in Close
clientConnections sync.Map
Expand All @@ -73,8 +74,8 @@ type relayClient struct {

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
// NewRelayClient creates a new RelayClient. It keeps a connection to each relay and reuses it for subsequent requests,
// and the connection is lazily instantiated.
func NewRelayClient(
config *RelayClientConfig,
logger logging.Logger,
Expand Down Expand Up @@ -243,39 +244,44 @@ func (c *relayClient) getClient(ctx context.Context, key corev2.RelayKey) (relay
// initOnceGrpcConnection initializes the GRPC connection for a given relay, and is guaranteed to only do perform
// the initialization once per relay.
func (c *relayClient) initOnceGrpcConnection(ctx context.Context, key corev2.RelayKey) error {
// we must use a mutex here instead of a sync.Map, because this method could be called concurrently, and if
// two concurrent calls tried to `LoadOrStore` from a sync.Map at the same time, it's possible they would
// each create a unique sync.Once object, and perform duplicate initialization
c.initOnceMutex.Lock()
once, ok := c.initOnce[key]
if !ok {
once = &sync.Once{}
c.initOnce[key] = once
_, alreadyInitialized := c.relayInitializationStatus.Load(key)
if alreadyInitialized {
// this is the standard case, where the grpc connection has already been initialized
return nil
}
c.initOnceMutex.Unlock()

// TODO (litt3): should we implement a way to rebuild connections that break, or fail to initialize? as it currently
// stands, if this initialization fails, or a connection breaks, the relay client will never speak to that relay again
var initErr error
once.Do(
func() {
relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key)
if err != nil {
initErr = fmt.Errorf("get relay url for key %d: %w", key, err)
return
}

dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize)
conn, err := grpc.NewClient(relayUrl, dialOptions...)
if err != nil {
initErr = fmt.Errorf("create grpc client for key %d: %w", key, err)
return
}
c.clientConnections.Store(key, conn)
c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn))
})
// In cases were the value hasn't already been initialized, we must acquire a conceptual lock on the relay in
// question. This allows us to guarantee that a connection with a given relay is only initialized a single time
releaseMemberLock := c.relayLockProvider.AcquireKeyLock(key)
defer releaseMemberLock()

return initErr
_, alreadyInitialized = c.relayInitializationStatus.Load(key)
if alreadyInitialized {
// If we find that the connection was initialized in the time it took to acquire a conceptual lock on the relay,
// that means that a different caller did the necessary work already
return nil
}

// TODO (litt3): storing `true` for key immediately mirrors the previous implementation, where a failed init
// is NOT retried, and the connection to that relay will just be broken forever. Consider implementing
// logic to retry initialization after a period of time in case of failure
c.relayInitializationStatus.Store(key, true)

relayUrl, err := c.relayUrlProvider.GetRelayUrl(ctx, key)
if err != nil {
return fmt.Errorf("get relay url for key %d: %w", key, err)
}

dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize)
conn, err := grpc.NewClient(relayUrl, dialOptions...)
if err != nil {
return fmt.Errorf("create grpc client for key %d: %w", key, err)

}
c.clientConnections.Store(key, conn)
c.grpcRelayClients.Store(key, relaygrpc.NewRelayClient(conn))

return nil
}

func (c *relayClient) Close() error {
Expand Down

0 comments on commit 4ed780c

Please sign in to comment.