Skip to content

Commit

Permalink
Extract a payload retriever interface (Layr-Labs#1187)
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <102969658+litt3@users.noreply.github.com>
  • Loading branch information
litt3 authored Jan 30, 2025
1 parent 0d293cc commit 7ce439f
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 359 deletions.
16 changes: 8 additions & 8 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type PayloadClientConfig struct {
BlobVersion v2.BlobVersion
}

// PayloadRetrieverConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a PayloadRetriever
type PayloadRetrieverConfig struct {
// RelayPayloadRetrieverConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a RelayPayloadRetriever
type RelayPayloadRetrieverConfig struct {
PayloadClientConfig

// The timeout duration for relay calls to retrieve blobs.
Expand Down Expand Up @@ -125,11 +125,11 @@ func (cc *PayloadClientConfig) checkAndSetDefaults() error {
return nil
}

// GetDefaultPayloadRetrieverConfig creates a PayloadRetrieverConfig with default values
// GetDefaultRelayPayloadRetrieverConfig creates a RelayPayloadRetrieverConfig with default values
//
// NOTE: EthRpcUrl and EigenDACertVerifierAddr do not have defined defaults. These must always be specifically configured.
func GetDefaultPayloadRetrieverConfig() *PayloadRetrieverConfig {
return &PayloadRetrieverConfig{
func GetDefaultRelayPayloadRetrieverConfig() *RelayPayloadRetrieverConfig {
return &RelayPayloadRetrieverConfig{
PayloadClientConfig: *getDefaultPayloadClientConfig(),
RelayTimeout: 5 * time.Second,
}
Expand All @@ -140,13 +140,13 @@ func GetDefaultPayloadRetrieverConfig() *PayloadRetrieverConfig {
// 1. If a config value is 0, and a 0 value makes sense, do nothing.
// 2. If a config value is 0, but a 0 value doesn't make sense and a default value is defined, then set it to the default.
// 3. If a config value is 0, but a 0 value doesn't make sense and a default value isn't defined, return an error.
func (rc *PayloadRetrieverConfig) checkAndSetDefaults() error {
func (rc *RelayPayloadRetrieverConfig) checkAndSetDefaults() error {
err := rc.PayloadClientConfig.checkAndSetDefaults()
if err != nil {
return err
}

defaultConfig := GetDefaultPayloadRetrieverConfig()
defaultConfig := GetDefaultRelayPayloadRetrieverConfig()
if rc.RelayTimeout == 0 {
rc.RelayTimeout = defaultConfig.RelayTimeout
}
Expand Down
280 changes: 6 additions & 274 deletions api/clients/v2/payload_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,283 +2,15 @@ package clients

import (
"context"
"errors"
"fmt"
"math/rand"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
"github.com/Layr-Labs/eigenda/common/geth"
verifiercontract "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDACertVerifier"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// PayloadRetriever provides the ability to get payloads from the relay subsystem.
// PayloadRetriever represents something that knows how to retrieve a payload from some backend using a verification.EigenDACert
//
// In the future, this struct will be expanded to support distributed retrieval directly from DA nodes, if unable
// to retrieve a payload from the relay subsystem.
//
// This struct is goroutine safe.
type PayloadRetriever struct {
log logging.Logger
// random doesn't need to be cryptographically secure, as it's only used to distribute load across relays.
// Not all methods on Rand are guaranteed goroutine safe: if additional usages of random are added, they
// must be evaluated for thread safety.
random *rand.Rand
config PayloadRetrieverConfig
codec codecs.BlobCodec
relayClient RelayClient
g1Srs []bn254.G1Affine
certVerifier verification.ICertVerifier
}

// BuildPayloadRetriever builds a PayloadRetriever from config structs.
func BuildPayloadRetriever(
log logging.Logger,
payloadRetrieverConfig PayloadRetrieverConfig,
ethConfig geth.EthClientConfig,
relayClientConfig *RelayClientConfig,
g1Srs []bn254.G1Affine) (*PayloadRetriever, error) {

relayClient, err := NewRelayClient(relayClientConfig, log)
if err != nil {
return nil, fmt.Errorf("new relay client: %w", err)
}

ethClient, err := geth.NewClient(ethConfig, gethcommon.Address{}, 0, log)
if err != nil {
return nil, fmt.Errorf("new eth client: %w", err)
}

certVerifier, err := verification.NewCertVerifier(*ethClient, payloadRetrieverConfig.EigenDACertVerifierAddr)
if err != nil {
return nil, fmt.Errorf("new cert verifier: %w", err)
}

codec, err := codecs.CreateCodec(payloadRetrieverConfig.PayloadPolynomialForm, payloadRetrieverConfig.BlobEncodingVersion)
if err != nil {
return nil, err
}

return NewPayloadRetriever(
log,
rand.New(rand.NewSource(rand.Int63())),
payloadRetrieverConfig,
relayClient,
certVerifier,
codec,
g1Srs)
}

// NewPayloadRetriever assembles a PayloadRetriever from subcomponents that have already been constructed and initialized.
func NewPayloadRetriever(
log logging.Logger,
random *rand.Rand,
payloadRetrieverConfig PayloadRetrieverConfig,
relayClient RelayClient,
certVerifier verification.ICertVerifier,
codec codecs.BlobCodec,
g1Srs []bn254.G1Affine) (*PayloadRetriever, error) {

err := payloadRetrieverConfig.checkAndSetDefaults()
if err != nil {
return nil, fmt.Errorf("check and set PayloadRetrieverConfig config: %w", err)
}

return &PayloadRetriever{
log: log,
random: random,
config: payloadRetrieverConfig,
codec: codec,
relayClient: relayClient,
certVerifier: certVerifier,
g1Srs: g1Srs,
}, nil
}

// GetPayload iteratively attempts to fetch a given blob with key blobKey from relays that have it, as claimed by the
// blob certificate. The relays are attempted in random order.
//
// If the blob is successfully retrieved, then the blob is verified. If the verification succeeds, the blob is decoded
// to yield the payload (the original user data), and the payload is returned.
func (pr *PayloadRetriever) GetPayload(
ctx context.Context,
blobKey core.BlobKey,
eigenDACert *verification.EigenDACert) ([]byte, error) {

err := pr.verifyCertWithTimeout(ctx, eigenDACert)
if err != nil {
return nil, fmt.Errorf("verify cert with timeout for blobKey %v: %w", blobKey.Hex(), err)
}

relayKeys := eigenDACert.BlobInclusionInfo.BlobCertificate.RelayKeys
relayKeyCount := len(relayKeys)
if relayKeyCount == 0 {
return nil, errors.New("relay key count is zero")
}

blobCommitments, err := blobCommitmentsBindingToInternal(
&eigenDACert.BlobInclusionInfo.BlobCertificate.BlobHeader.Commitment)

if err != nil {
return nil, fmt.Errorf("blob commitments binding to internal: %w", err)
}

// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
indices := pr.random.Perm(relayKeyCount)

// TODO (litt3): consider creating a utility which deprioritizes relays that fail to respond (or respond maliciously),
// and prioritizes relays with lower latencies.

// iterate over relays in random order, until we are able to get the blob from someone
for _, val := range indices {
relayKey := relayKeys[val]

blob, err := pr.getBlobWithTimeout(ctx, relayKey, blobKey)
// if GetBlob returned an error, try calling a different relay
if err != nil {
pr.log.Warn(
"blob couldn't be retrieved from relay",
"blobKey", blobKey.Hex(),
"relayKey", relayKey,
"error", err)
continue
}

err = pr.verifyBlobAgainstCert(blobKey, relayKey, blob, blobCommitments.Commitment, blobCommitments.Length)

// An honest relay should never send a blob which doesn't verify against the cert
if err != nil {
pr.log.Warn("verify blob from relay against cert: %w", err)
continue
}

payload, err := pr.codec.DecodeBlob(blob)
if err != nil {
pr.log.Error(
`Cert verification was successful, but decode blob failed!
This is likely a problem with the local blob codec configuration,
but could potentially indicate a maliciously generated blob certificate.
It should not be possible for an honestly generated certificate to verify
for an invalid blob!`,
"blobKey", blobKey.Hex(), "relayKey", relayKey, "eigenDACert", eigenDACert, "error", err)
return nil, fmt.Errorf("decode blob: %w", err)
}

return payload, nil
}

return nil, fmt.Errorf("unable to retrieve blob %v from any relay. relay count: %d", blobKey.Hex(), relayKeyCount)
}

// verifyBlobAgainstCert verifies the blob received from a relay against the certificate.
//
// The following verifications are performed in this method:
// 1. Verify that the blob isn't empty
// 2. Verify the blob against the cert's kzg commitment
// 3. Verify that the blob length is less than or equal to the cert's blob length
//
// If all verifications succeed, the method returns nil. Otherwise, it returns an error.
func (pr *PayloadRetriever) verifyBlobAgainstCert(
blobKey core.BlobKey,
relayKey core.RelayKey,
blob []byte,
kzgCommitment *encoding.G1Commitment,
blobLength uint) error {

// An honest relay should never send an empty blob
if len(blob) == 0 {
return fmt.Errorf("blob %v received from relay %v had length 0", blobKey.Hex(), relayKey)
}

// TODO: in the future, this will be optimized to use fiat shamir transformation for verification, rather than
// regenerating the commitment: https://github.com/Layr-Labs/eigenda/issues/1037
valid, err := verification.GenerateAndCompareBlobCommitment(pr.g1Srs, blob, kzgCommitment)
if err != nil {
return fmt.Errorf(
"generate and compare commitment for blob %v received from relay %v: %w",
blobKey.Hex(),
relayKey,
err)
}

if !valid {
return fmt.Errorf("commitment for blob %v is invalid for bytes received from relay %v", blobKey.Hex(), relayKey)
}

// Checking that the length returned by the relay is <= the length claimed in the BlobCommitments is sufficient
// here: it isn't necessary to verify the length proof itself, since this will have been done by DA nodes prior to
// signing for availability.
//
// Note that the length in the commitment is the length of the blob in symbols
if uint(len(blob)) > blobLength*encoding.BYTES_PER_SYMBOL {
return fmt.Errorf(
"length for blob %v (%d bytes) received from relay %v is greater than claimed blob length (%d bytes)",
blobKey.Hex(),
len(blob),
relayKey,
blobLength*encoding.BYTES_PER_SYMBOL)
}

return nil
}

// getBlobWithTimeout attempts to get a blob from a given relay, and times out based on config.RelayTimeout
func (pr *PayloadRetriever) getBlobWithTimeout(
ctx context.Context,
relayKey core.RelayKey,
blobKey core.BlobKey) ([]byte, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, pr.config.RelayTimeout)
defer cancel()

return pr.relayClient.GetBlob(timeoutCtx, relayKey, blobKey)
}

// verifyCertWithTimeout verifies an EigenDACert by making a call to VerifyCertV2.
//
// This method times out after the duration configured in payloadRetrieverConfig.ContractCallTimeout
func (pr *PayloadRetriever) verifyCertWithTimeout(
ctx context.Context,
eigenDACert *verification.EigenDACert,
) error {
timeoutCtx, cancel := context.WithTimeout(ctx, pr.config.ContractCallTimeout)
defer cancel()

return pr.certVerifier.VerifyCertV2(timeoutCtx, eigenDACert)
}

// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (pr *PayloadRetriever) Close() error {
err := pr.relayClient.Close()
if err != nil {
return fmt.Errorf("close relay client: %w", err)
}

return nil
}

// blobCommitmentsBindingToInternal converts a blob commitment from an eigenDA cert into the internal
// encoding.BlobCommitments type
func blobCommitmentsBindingToInternal(
blobCommitmentBinding *verifiercontract.BlobCommitment,
) (*encoding.BlobCommitments, error) {

blobCommitment, err := encoding.BlobCommitmentsFromProtobuf(
verification.BlobCommitmentBindingToProto(blobCommitmentBinding))

if err != nil {
return nil, fmt.Errorf("blob commitments from protobuf: %w", err)
}

return blobCommitment, nil
// This interface may be implemented to provide alternate retrieval methods, for example payload retrieval from an S3
// bucket instead of from EigenDA relays or nodes.
type PayloadRetriever interface {
// GetPayload retrieves a payload from some backend, using the provided certificate
GetPayload(ctx context.Context, eigenDACert *verification.EigenDACert) ([]byte, error)
}
Loading

0 comments on commit 7ce439f

Please sign in to comment.