Skip to content

Commit

Permalink
Apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc committed Feb 6, 2025
1 parent ef3d38c commit 23554fa
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMetrics(serverVersion uint, blobMetadataStore interface{}, httpPort stri
}
} else if serverVersion == 2 {
if store, ok := blobMetadataStore.(*blobstorev2.BlobMetadataStore); ok {
reg.MustRegister(NewDynamoDBCollectorV2(store, logger))
reg.MustRegister(NewBlobMetadataStoreV2Collector(store, logger))
}
}
metrics := &Metrics{
Expand Down Expand Up @@ -252,18 +252,18 @@ type BlobStatusMetrics struct {
currentValue float64
}

// DynamoDBCollectorV2 collects metrics about blob metadata from DynamoDB.
type DynamoDBCollectorV2 struct {
// BlobMetadataStoreV2Collector collects metrics from the blob metadata store.
// It keeps one gauge per blob status in statusMetrics; its Describe and Collect
// methods expose those gauges so the collector can be handed to a registry's
// MustRegister.
type BlobMetadataStoreV2Collector struct {
// blobMetadataStore is the v2 blob metadata store supplied to the constructor.
blobMetadataStore *blobstorev2.BlobMetadataStore
// statusMetrics maps each tracked blob status to its gauge and currentValue.
statusMetrics map[commonv2.BlobStatus]*BlobStatusMetrics
// logger receives the collector's debug and error messages.
logger logging.Logger
// ctx and cancel are created together by context.WithCancel(context.Background())
// in the constructor.
// NOTE(review): where ctx is consumed and when cancel is invoked is not visible
// in this view — confirm before relying on them for shutdown.
ctx context.Context
cancel context.CancelFunc
}

func NewDynamoDBCollectorV2(blobMetadataStore *blobstorev2.BlobMetadataStore, logger logging.Logger) *DynamoDBCollectorV2 {
func NewBlobMetadataStoreV2Collector(blobMetadataStore *blobstorev2.BlobMetadataStore, logger logging.Logger) *BlobMetadataStoreV2Collector {
ctx, cancel := context.WithCancel(context.Background())
collector := &DynamoDBCollectorV2{
collector := &BlobMetadataStoreV2Collector{
blobMetadataStore: blobMetadataStore,
statusMetrics: make(map[commonv2.BlobStatus]*BlobStatusMetrics),
logger: logger,
Expand All @@ -279,8 +279,8 @@ func NewDynamoDBCollectorV2(blobMetadataStore *blobstorev2.BlobMetadataStore, lo
} {
gauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "dynamodb_blob_metadata_v2_status_count",
Help: "Current number of blobs in this status. In case of timeouts or failures when querying DynamoDB (e.g. when there are too many blobs), the last known value will be returned as stale data.",
Name: "eigenda_blob_metadata_v2_status_count",
Help: "Current number of blobs in this status. In case of timeouts or failures when querying the blob metadata store (e.g. when there are too many blobs), the last known value will be returned as stale data.",
ConstLabels: prometheus.Labels{
"status": status.String(),
},
Expand All @@ -300,7 +300,7 @@ func NewDynamoDBCollectorV2(blobMetadataStore *blobstorev2.BlobMetadataStore, lo
}

// countBlobsWithStatus counts blobs for a specific status with pagination and timeout handling
func (collector *DynamoDBCollectorV2) countBlobsWithStatus(ctx context.Context, status commonv2.BlobStatus) (int32, error) {
func (collector *BlobMetadataStoreV2Collector) countBlobsWithStatus(ctx context.Context, status commonv2.BlobStatus) (int32, error) {
var totalCount int32
var cursor *blobstorev2.StatusIndexCursor

Expand Down Expand Up @@ -332,7 +332,7 @@ func (collector *DynamoDBCollectorV2) countBlobsWithStatus(ctx context.Context,
}
}

func (collector *DynamoDBCollectorV2) updateCounts(ctx context.Context) {
func (collector *BlobMetadataStoreV2Collector) updateCounts(ctx context.Context) {
collector.logger.Debug("Starting blob status count update")
startTime := time.Now()

Expand All @@ -343,20 +343,12 @@ func (collector *DynamoDBCollectorV2) updateCounts(ctx context.Context) {
defer cancel()

if err != nil {
if err == context.DeadlineExceeded {
collector.logger.Warn("Timeout while counting blobs",
"status", status.String(),
"current_count", metrics.currentValue,
"using_stale_data", true,
)
continue // Keep using the last known value
}
collector.logger.Error("Failed to get count of blob metadata by status",
collector.logger.Error("Failed to get count of blob metadata by status - using stale data",
"status", status,
"err", err,
"current_value", metrics.currentValue,
"current_count", metrics.currentValue,
)
continue
continue // Keep using the last known value
}

metrics.gauge.Set(float64(totalCount))
Expand All @@ -373,17 +365,18 @@ func (collector *DynamoDBCollectorV2) updateCounts(ctx context.Context) {
)
}

func (collector *DynamoDBCollectorV2) Describe(ch chan<- *prometheus.Desc) {
// Describe sends the descriptor of every per-status gauge held in
// statusMetrics on ch. Together with Collect it supplies the methods needed
// to register this collector with a Prometheus registry.
func (collector *BlobMetadataStoreV2Collector) Describe(ch chan<- *prometheus.Desc) {
	for status := range collector.statusMetrics {
		entry := collector.statusMetrics[status]
		ch <- entry.gauge.Desc()
	}
}

func (collector *DynamoDBCollectorV2) Collect(ch chan<- prometheus.Metric) {
func (collector *BlobMetadataStoreV2Collector) Collect(ch chan<- prometheus.Metric) {
collector.logger.Debug("Prometheus scrape triggered, updating counts")
startTime := time.Now()

// Create a context with timeout for the entire collection
// Create a context with timeout for the entire collection.
// The default Prometheus scrape timeout is 10 seconds, so collection is capped at 8 seconds to leave headroom for the scrape to finish before it times out.
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()

Expand All @@ -395,7 +388,7 @@ func (collector *DynamoDBCollectorV2) Collect(ch chan<- prometheus.Metric) {
ch <- metrics.gauge
}

collector.logger.Debug("Completed DynamoDB collector scrape",
collector.logger.Debug("Completed blob metadata store v2 collector scrape",
"duration_ms", time.Since(startTime).Milliseconds(),
)
}

0 comments on commit 23554fa

Please sign in to comment.