Skip to content

Commit

Permalink
Wrap errors in relay (Layr-Labs#1130)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Jan 21, 2025
1 parent 8913f95 commit a6dd724
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/relay/metrics"
"net"
"strings"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/relay/auth"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
"github.com/Layr-Labs/eigenda/relay/limiter"
"github.com/Layr-Labs/eigenda/relay/metrics"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
Expand Down Expand Up @@ -222,37 +224,37 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G

err := s.blobRateLimiter.BeginGetBlobOperation(time.Now())
if err != nil {
return nil, err
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("rate limit exceeded: %v", err))
}
defer s.blobRateLimiter.FinishGetBlobOperation()

key, err := v2.BytesToBlobKey(request.BlobKey)
if err != nil {
return nil, fmt.Errorf("invalid blob key: %w", err)
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid blob key: %v", err))
}

keys := []v2.BlobKey{key}
mMap, err := s.metadataProvider.GetMetadataForBlobs(ctx, keys)
if err != nil {
return nil, fmt.Errorf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
return nil, api.NewErrorInternal(fmt.Sprintf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %v", err))
}
metadata := mMap[v2.BlobKey(request.BlobKey)]
if metadata == nil {
return nil, fmt.Errorf("blob not found")
return nil, api.NewErrorNotFound("blob not found")
}

finishedFetchingMetadata := time.Now()
s.metrics.ReportBlobMetadataLatency(finishedFetchingMetadata.Sub(start))

err = s.blobRateLimiter.RequestGetBlobBandwidth(time.Now(), metadata.blobSizeBytes)
if err != nil {
return nil, err
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("bandwidth limit exceeded: %v", err))
}

data, err := s.blobProvider.GetBlob(ctx, key)
if err != nil {
return nil, fmt.Errorf("error fetching blob %s: %w", key.Hex(), err)
return nil, api.NewErrorInternal(fmt.Sprintf("error fetching blob %s: %v", key.Hex(), err))
}

s.metrics.ReportBlobDataSize(len(data))
Expand All @@ -276,25 +278,25 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
}

if len(request.ChunkRequests) <= 0 {
return nil, fmt.Errorf("no chunk requests provided")
return nil, api.NewErrorInvalidArg("no chunk requests provided")
}
if len(request.ChunkRequests) > s.config.MaxKeysPerGetChunksRequest {
return nil, fmt.Errorf(
"too many chunk requests provided, max is %d", s.config.MaxKeysPerGetChunksRequest)
return nil, api.NewErrorInvalidArg(fmt.Sprintf(
"too many chunk requests provided, max is %d", s.config.MaxKeysPerGetChunksRequest))
}
s.metrics.ReportChunkKeyCount(len(request.ChunkRequests))

if s.authenticator != nil {
client, ok := peer.FromContext(ctx)
if !ok {
return nil, errors.New("could not get peer information")
return nil, api.NewErrorInternal("could not get peer information")
}
clientAddress := client.Addr.String()

err := s.authenticator.AuthenticateGetChunksRequest(ctx, clientAddress, request, time.Now())
if err != nil {
s.metrics.ReportChunkAuthFailure()
return nil, fmt.Errorf("auth failed: %w", err)
return nil, api.NewErrorInvalidArg(fmt.Sprintf("auth failed: %v", err))
}
}

Expand All @@ -306,43 +308,46 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
clientID := string(request.OperatorId)
err := s.chunkRateLimiter.BeginGetChunkOperation(time.Now(), clientID)
if err != nil {
return nil, err
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("rate limit exceeded: %v", err))
}
defer s.chunkRateLimiter.FinishGetChunkOperation(clientID)

// keys might contain duplicate keys
keys, err := getKeysFromChunkRequest(request)
if err != nil {
return nil, err
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid request: %v", err))
}

mMap, err := s.metadataProvider.GetMetadataForBlobs(ctx, keys)
if err != nil {
return nil, fmt.Errorf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
return nil, api.NewErrorInternal(fmt.Sprintf(
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %v", err))
}

finishedFetchingMetadata := time.Now()
s.metrics.ReportChunkMetadataLatency(finishedFetchingMetadata.Sub(finishedAuthenticating))

requiredBandwidth, err := computeChunkRequestRequiredBandwidth(request, mMap)
if err != nil {
return nil, fmt.Errorf("error computing required bandwidth: %w", err)
return nil, api.NewErrorInternal(fmt.Sprintf("error computing required bandwidth: %v", err))
}
err = s.chunkRateLimiter.RequestGetChunkBandwidth(time.Now(), clientID, requiredBandwidth)
if err != nil {
return nil, err
if strings.Contains(err.Error(), "internal error") {
return nil, api.NewErrorInternal(err.Error())
}
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("bandwidth limit exceeded: %v", err))
}
s.metrics.ReportChunkDataSize(requiredBandwidth)

frames, err := s.chunkProvider.GetFrames(ctx, mMap)
if err != nil {
return nil, fmt.Errorf("error fetching frames: %w", err)
return nil, api.NewErrorInternal(fmt.Sprintf("error fetching frames: %v", err))
}

bytesToSend, err := gatherChunkDataToSend(frames, request)
if err != nil {
return nil, fmt.Errorf("error gathering chunk data: %w", err)
return nil, api.NewErrorInternal(fmt.Sprintf("error gathering chunk data: %v", err))
}

s.metrics.ReportChunkDataLatency(time.Since(finishedFetchingMetadata))
Expand Down Expand Up @@ -462,7 +467,6 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met
}

return requiredBandwidth, nil

}

// Start starts the server listening for requests. This method will block until the server is stopped.
Expand Down

0 comments on commit a6dd724

Please sign in to comment.