Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support backward iteration for blob feed #1314

Merged
merged 5 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (s *BlobMetadataStore) queryBucketBlobMetadata(
}

// GetBlobMetadataByRequestedAtForward returns blobs (as BlobMetadata) in cursor range
// (after, until] (after exclusive, until inclusive). Blobs are ordered by <RequestedAt, BlobKey>
// (after, until) (both exclusive). Blobs are ordered by <RequestedAt, BlobKey>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making both ends exclusive, because:

  • it's easier for the feed API implementation
  • this is at nanosecond level (the highest resolution), so inclusive use cases can simply pass (L-1, R+1) as the exclusive endpoints, which works correctly (at nanosecond resolution, there are no time points between L-1 and L)

// in ascending order.
//
// If limit > 0, returns at most that many blobs. If limit <= 0, returns all blobs in range.
Expand Down Expand Up @@ -388,8 +388,8 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAtForward(
if err != nil {
return nil, nil, fmt.Errorf("failed to get blob key: %w", err)
}
// Skip the after cursor's blob
if after.Equal(bm.RequestedAt, &blobKey) {
// Skip blobs at the endpoints
if after.Equal(bm.RequestedAt, &blobKey) || until.Equal(bm.RequestedAt, &blobKey) {
continue
}
result = append(result, bm)
Expand All @@ -406,7 +406,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAtForward(
}

// GetBlobMetadataByRequestedAtBackward returns blobs (as BlobMetadata) in cursor range
// [until, before) (until inclusive, before exclusive). Blobs are ordered by <RequestedAt, BlobKey>
// (until, before) (both exclusive). Blobs are ordered by <RequestedAt, BlobKey>
// in descending order.
//
// If limit > 0, returns at most that many blobs. If limit <= 0, returns all blobs in range.
Expand Down Expand Up @@ -441,8 +441,8 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAtBackward(
if err != nil {
return nil, nil, fmt.Errorf("failed to get blob key: %w", err)
}
// Skip the before cursor's blob
if before.Equal(bm.RequestedAt, &blobKey) {
// Skip blobs at the endpoints
if before.Equal(bm.RequestedAt, &blobKey) || until.Equal(bm.RequestedAt, &blobKey) {
continue
}
result = append(result, bm)
Expand Down Expand Up @@ -528,8 +528,8 @@ func (s *BlobMetadataStore) queryBucketAttestation(
return attestations, nil
}

// GetAttestationByAttestedAtForward returns attestations within time range (after, until]
// (after exclusive, until inclusive), ordered by AttestedAt timestamp in ascending order.
// GetAttestationByAttestedAtForward returns attestations within time range (after, until)
// (both exclusive), ordered by AttestedAt timestamp in ascending order.
//
// The function splits the time range into buckets and queries each bucket sequentially from earliest to latest.
// Results from all buckets are combined while maintaining the ordering.
Expand All @@ -542,8 +542,8 @@ func (s *BlobMetadataStore) GetAttestationByAttestedAtForward(
until uint64,
limit int,
) ([]*corev2.Attestation, error) {
if after >= until {
return nil, errors.New("after must be less than until")
if after+1 > until-1 {
return nil, fmt.Errorf("no time point in exclusive time range (%d, %d)", after, until)
}
startBucket, endBucket := GetAttestedAtBucketIDRange(after, until)
result := make([]*corev2.Attestation, 0)
Expand All @@ -558,7 +558,7 @@ func (s *BlobMetadataStore) GetAttestationByAttestedAtForward(
remaining = limit - len(result)
}
// Query bucket in ascending order
bucketAttestation, err := s.queryBucketAttestation(ctx, bucket, after+1, until, remaining, true)
bucketAttestation, err := s.queryBucketAttestation(ctx, bucket, after+1, until-1, remaining, true)
if err != nil {
return nil, err
}
Expand All @@ -572,8 +572,8 @@ func (s *BlobMetadataStore) GetAttestationByAttestedAtForward(
return result, nil
}

// GetAttestationByAttestedAtBackward returns attestations within time range [until, before)
// (until inclusive, before exclusive), ordered by AttestedAt timestamp in descending order.
// GetAttestationByAttestedAtBackward returns attestations within time range (until, before)
// (both exclusive), ordered by AttestedAt timestamp in descending order.
//
// The function splits the time range into buckets and queries each bucket sequentially from latest to earliest.
// Results from all buckets are combined while maintaining the ordering.
Expand All @@ -586,8 +586,8 @@ func (s *BlobMetadataStore) GetAttestationByAttestedAtBackward(
until uint64,
limit int,
) ([]*corev2.Attestation, error) {
if until >= before {
return nil, errors.New("until must be less than before")
if until+1 > before-1 {
return nil, fmt.Errorf("no time point in exclusive time range (%d, %d)", until, before)
}
// Note: we traverse buckets in reverse order for backward query
startBucket, endBucket := GetAttestedAtBucketIDRange(until, before)
Expand All @@ -603,7 +603,7 @@ func (s *BlobMetadataStore) GetAttestationByAttestedAtBackward(
remaining = limit - len(result)
}
// Query bucket in descending order
bucketAttestation, err := s.queryBucketAttestation(ctx, bucket, until, before-1, remaining, false)
bucketAttestation, err := s.queryBucketAttestation(ctx, bucket, until+1, before-1, remaining, false)
if err != nil {
return nil, err
}
Expand Down
77 changes: 43 additions & 34 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtForwardWithIdenticalTimest
BlobKey: &keys[1],
}
endCursor = blobstore.BlobFeedCursor{
RequestedAt: requestedAts[3],
RequestedAt: requestedAts[4],
BlobKey: nil,
}

Expand All @@ -351,8 +351,8 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtForwardWithIdenticalTimest
expectLast int
}{
{nil, 1, 2},
{&keys[3], 2, 3},
{&keys[4], 3, 4},
{&keys[3], 1, 2}, // keys[2] will be retrieved
{&keys[4], 2, 3}, // keys[2], keys[3] will be retrieved
}

for _, tc := range testCases {
Expand Down Expand Up @@ -537,11 +537,17 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtForward(t *testing.T) {
assert.Equal(t, 0, len(metadata))
assert.Nil(t, lastProcessedCursor)

// Test inclusive end
// Test exclusive end
endCursor.BlobKey = &keys[1]
metadata, lastProcessedCursor, err = blobMetadataStore.GetBlobMetadataByRequestedAtForward(ctx, startCursor, endCursor, 0)
require.NoError(t, err)
assert.Equal(t, 1, len(metadata))
require.Equal(t, 0, len(metadata))
assert.Equal(t, nil, lastProcessedCursor)

endCursor.RequestedAt = firstBlobTime + nanoSecsPerBlob + 1 // pass the time of second blob
metadata, lastProcessedCursor, err = blobMetadataStore.GetBlobMetadataByRequestedAtForward(ctx, startCursor, endCursor, 0)
require.NoError(t, err)
require.Equal(t, 1, len(metadata))
assert.Equal(t, firstBlobTime+nanoSecsPerBlob, metadata[0].RequestedAt)
checkBlobKeyEqual(t, keys[1], metadata[0].BlobHeader)
require.NotNil(t, lastProcessedCursor)
Expand Down Expand Up @@ -700,40 +706,35 @@ func TestBlobMetadataStoreGetBlobMetadataByRequestedAtBackward(t *testing.T) {
}
untilCursor := blobstore.BlobFeedCursor{
RequestedAt: firstBlobTime, // time of blob[0]
BlobKey: &keys[0], // inclusive
BlobKey: &keys[0], // exclusive
}

// Test exclusive before, inclusive until
// Test exclusive before, exclusive until
metadata, lastProcessedCursor, err := blobMetadataStore.GetBlobMetadataByRequestedAtBackward(
ctx,
beforeCursor, // blob[1] excluded
untilCursor, // blob[0] included
untilCursor, // blob[0] excluded
0,
)
require.NoError(t, err)
require.Equal(t, 1, len(metadata))
assert.Equal(t, firstBlobTime, metadata[0].RequestedAt) // blob[0]
checkBlobKeyEqual(t, keys[0], metadata[0].BlobHeader)
require.NotNil(t, lastProcessedCursor)
assert.Equal(t, keys[0], *lastProcessedCursor.BlobKey)
require.Equal(t, 0, len(metadata))
assert.Equal(t, nil, lastProcessedCursor)

// Test the effects of blob key in before cursor
beforeCursor.RequestedAt = firstBlobTime + nanoSecsPerBlob*2 // time of blob[2]
beforeCursor.BlobKey = &keys[2] // exclusive of blob[2]
metadata, lastProcessedCursor, err = blobMetadataStore.GetBlobMetadataByRequestedAtBackward(
ctx,
beforeCursor, // excludes blob[2]
untilCursor, // includes blob[0]
untilCursor, // excludes blob[0]
0,
)
require.NoError(t, err)
require.Equal(t, 2, len(metadata))
require.Equal(t, 1, len(metadata))
assert.Equal(t, firstBlobTime+nanoSecsPerBlob, metadata[0].RequestedAt) // blob[1]
assert.Equal(t, firstBlobTime, metadata[1].RequestedAt) // blob[0]
checkBlobKeyEqual(t, keys[1], metadata[0].BlobHeader)
checkBlobKeyEqual(t, keys[0], metadata[1].BlobHeader)
require.NotNil(t, lastProcessedCursor)
assert.Equal(t, keys[0], *lastProcessedCursor.BlobKey)
assert.Equal(t, keys[1], *lastProcessedCursor.BlobKey)

// Test when removing blob key from until cursor
untilCursor.BlobKey = nil // makes until cursor point to before blob[0]
Expand Down Expand Up @@ -859,7 +860,11 @@ func TestBlobMetadataStoreGetAttestationByAttestedAtForward(t *testing.T) {
// Test invalid time range
_, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, 1, 1, 0)
require.Error(t, err)
assert.Equal(t, "after must be less than until", err.Error())
assert.Equal(t, "no time point in exclusive time range (1, 1)", err.Error())

_, err = blobMetadataStore.GetAttestationByAttestedAtForward(ctx, 1, 2, 0)
require.Error(t, err)
assert.Equal(t, "no time point in exclusive time range (1, 2)", err.Error())

// Test empty range
attestations, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, now, now+uint64(240*time.Hour.Nanoseconds()), 0)
Expand Down Expand Up @@ -895,7 +900,7 @@ func TestBlobMetadataStoreGetAttestationByAttestedAtForward(t *testing.T) {
// Test range boundaries
t.Run("range boundaries", func(t *testing.T) {
// Test exclusive start
attestations, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, firstBatchTs, now, 0)
attestations, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, firstBatchTs, now+1, 0)
require.NoError(t, err)
require.Equal(t, numBatches-1, len(attestations))
checkAttestationsOrdered(t, attestations)
Expand All @@ -904,21 +909,21 @@ func TestBlobMetadataStoreGetAttestationByAttestedAtForward(t *testing.T) {
assert.Equal(t, attestedAt[numBatches-1], attestations[numBatches-2].AttestedAt)
assert.Equal(t, batchHeaders[numBatches-1].BatchRoot, attestations[numBatches-2].BatchRoot)

// Test inclusive end
// Test exclusive end
attestations, err = blobMetadataStore.GetAttestationByAttestedAtForward(ctx, firstBatchTs-1, attestedAt[4], 0)
require.NoError(t, err)
require.Equal(t, 5, len(attestations))
require.Equal(t, 4, len(attestations))
checkAttestationsOrdered(t, attestations)
assert.Equal(t, attestedAt[0], attestations[0].AttestedAt)
assert.Equal(t, batchHeaders[0].BatchRoot, attestations[0].BatchRoot)
assert.Equal(t, attestedAt[4], attestations[4].AttestedAt)
assert.Equal(t, batchHeaders[4].BatchRoot, attestations[4].BatchRoot)
assert.Equal(t, attestedAt[3], attestations[3].AttestedAt)
assert.Equal(t, batchHeaders[3].BatchRoot, attestations[3].BatchRoot)
})

// Test pagination
t.Run("pagination", func(t *testing.T) {
for i := 1; i < numBatches; i++ {
attestations, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, attestedAt[i-1], attestedAt[i], 1)
attestations, err := blobMetadataStore.GetAttestationByAttestedAtForward(ctx, attestedAt[i-1], attestedAt[i]+1, 1)
require.NoError(t, err)
require.Equal(t, 1, len(attestations))
assert.Equal(t, attestedAt[i], attestations[0].AttestedAt)
Expand Down Expand Up @@ -983,7 +988,11 @@ func TestBlobMetadataStoreGetAttestationByAttestedAtBackward(t *testing.T) {
// Test invalid time range
_, err := blobMetadataStore.GetAttestationByAttestedAtBackward(ctx, 1, 1, 0)
require.Error(t, err)
assert.Equal(t, "until must be less than before", err.Error())
assert.Equal(t, "no time point in exclusive time range (1, 1)", err.Error())

_, err = blobMetadataStore.GetAttestationByAttestedAtBackward(ctx, 2, 1, 0)
require.Error(t, err)
assert.Equal(t, "no time point in exclusive time range (1, 2)", err.Error())

// Test empty range
attestations, err := blobMetadataStore.GetAttestationByAttestedAtBackward(
Expand Down Expand Up @@ -1025,28 +1034,28 @@ func TestBlobMetadataStoreGetAttestationByAttestedAtBackward(t *testing.T) {
attestations, err := blobMetadataStore.GetAttestationByAttestedAtBackward(
ctx,
attestedAt[numBatches-1], // before (exclusive)
firstBatchTs, // until (inclusive)
firstBatchTs, // until (exclusive)
0,
)
require.NoError(t, err)
require.Equal(t, numBatches-1, len(attestations))
require.Equal(t, numBatches-2, len(attestations))
// The first one returned is not "before" (as "before" is exclusive)
assert.Equal(t, attestedAt[numBatches-2], attestations[0].AttestedAt)
// The last one returned is the first batch (as "until" is inclusive)
assert.Equal(t, attestedAt[0], attestations[numBatches-2].AttestedAt)
// The last one returned is the second batch (as "until" is exclusive)
assert.Equal(t, attestedAt[1], attestations[numBatches-3].AttestedAt)
checkAttestationsOrderedDesc(t, attestations)

// Test inclusive until - should include the oldest item
// Test exclusive until - should not include the oldest item
attestations, err = blobMetadataStore.GetAttestationByAttestedAtBackward(
ctx,
attestedAt[4]+1, // before: just after 4th item (so this batch should be included)
attestedAt[0], // until: oldest item (should be included)
attestedAt[0], // until: oldest item (should not be included)
0,
)
require.NoError(t, err)
require.Equal(t, 4, len(attestations))
assert.Equal(t, attestedAt[4], attestations[0].AttestedAt)
assert.Equal(t, attestedAt[0], attestations[4].AttestedAt)
require.Equal(t, 5, len(attestations))
assert.Equal(t, attestedAt[1], attestations[3].AttestedAt)
checkAttestationsOrderedDesc(t, attestations)
})

Expand Down
23 changes: 15 additions & 8 deletions disperser/dataapi/v2/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package v2
import (
"context"
"encoding/hex"
"errors"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -132,7 +131,7 @@ func (s *ServerV2) FetchBlobFeed(c *gin.Context) {

if direction == "forward" {
startCursor := afterCursor
// The presence of `cusor` param will override the `after` param
// The presence of `cursor` param will override the `after` param
if current.RequestedAt > 0 {
startCursor = current
}
Expand All @@ -143,9 +142,17 @@ func (s *ServerV2) FetchBlobFeed(c *gin.Context) {
limit,
)
} else {
// TODO(jianxiao): To be implemented
errorResponse(c, errors.New("Not Implemented"))
return
endCursor := beforeCursor
// The presence of `cursor` param will override the `before` param
if current.RequestedAt > 0 {
endCursor = current
}
blobs, nextCursor, err = s.blobMetadataStore.GetBlobMetadataByRequestedAtBackward(
c.Request.Context(),
endCursor,
afterCursor,
limit,
)
}

if err != nil {
Expand Down Expand Up @@ -393,7 +400,7 @@ func (s *ServerV2) sendBlobFeedResponse(
for i := 0; i < len(blobs); i++ {
bk, err := blobs[i].BlobHeader.BlobKey()
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeedForward")
s.metrics.IncrementFailedRequestNum("FetchBlobFeed")
errorResponse(c, fmt.Errorf("failed to serialize blob key: %w", err))
return
}
Expand All @@ -405,7 +412,7 @@ func (s *ServerV2) sendBlobFeedResponse(
Cursor: cursorStr,
}
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge))
s.metrics.IncrementSuccessfulRequestNum("FetchBlobFeedForward")
s.metrics.ObserveLatency("FetchBlobFeedForward", time.Since(handlerStart))
s.metrics.IncrementSuccessfulRequestNum("FetchBlobFeed")
s.metrics.ObserveLatency("FetchBlobFeed", time.Since(handlerStart))
c.JSON(http.StatusOK, response)
}
Loading
Loading