Skip to content

Commit

Permalink
Use existing collector if already registered
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc committed Feb 22, 2025
1 parent 459d8c6 commit 87df535
Showing 1 changed file with 144 additions and 29 deletions.
173 changes: 144 additions & 29 deletions common/kvstore/leveldb/metrics.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package leveldb

import (
"fmt"
"sync"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/syndtr/goleveldb/leveldb"
)

Expand Down Expand Up @@ -62,120 +62,232 @@ var (
levelSize *prometheus.GaugeVec
levelReadBytes *prometheus.GaugeVec
levelWriteBytes *prometheus.GaugeVec

// Operation latency metrics
operationLatency *prometheus.HistogramVec

// Error metrics
errorCount *prometheus.CounterVec
)

func newLevelDBMetrics(reg *prometheus.Registry) {
func newLevelDBMetrics(reg *prometheus.Registry) error {
if reg == nil {
return fmt.Errorf("prometheus registry cannot be nil")
}

// Compaction metrics
compactionLatency = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
compactionLatencyMetric := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "compaction_duration_seconds",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Duration of compaction operations by type (memory, level0, non-level0)",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"type", "name"})

compactionThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(compactionLatencyMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
compactionLatency = are.ExistingCollector.(*prometheus.HistogramVec)
} else {
return fmt.Errorf("failed to register compaction latency metric: %w", err)
}
} else {
compactionLatency = compactionLatencyMetric
}

compactionThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "compaction_throughput_bytes_per_second",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Rate of data processed during compaction operations (read/write)",
}, []string{"operation", "name"})

if err := reg.Register(compactionThroughputMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
compactionThroughput = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register compaction throughput metric: %w", err)
}
} else {
compactionThroughput = compactionThroughputMetric
}

// Resource utilization metrics
readThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
readThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "read_throughput_bytes_per_second",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Rate of bytes read per second",
}, []string{"name"})

writeThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(readThroughputMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
readThroughput = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register read throughput metric: %w", err)
}
} else {
readThroughput = readThroughputMetric
}

writeThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "write_throughput_bytes_per_second",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Rate of bytes written per second",
}, []string{"name"})

openTableCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(writeThroughputMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
writeThroughput = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register write throughput metric: %w", err)
}
} else {
writeThroughput = writeThroughputMetric
}

openTableCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "open_tables_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Number of currently open tables",
}, []string{"name"})

blockCacheSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(openTableCountMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
openTableCount = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register open table count metric: %w", err)
}
} else {
openTableCount = openTableCountMetric
}

blockCacheSizeMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "block_cache_bytes",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Size of block cache in bytes",
}, []string{"name"})

if err := reg.Register(blockCacheSizeMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
blockCacheSize = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register block cache size metric: %w", err)
}
} else {
blockCacheSize = blockCacheSizeMetric
}

// Performance metrics
compactionCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
compactionCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "compactions_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Number of compactions by type (memory, level0, nonlevel0, seek)",
}, []string{"type", "name"})

writePaused = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(compactionCountMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
compactionCount = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register compaction count metric: %w", err)
}
} else {
compactionCount = compactionCountMetric
}

writePausedMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "write_paused",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Whether writes are currently paused (1 for yes, 0 for no)",
}, []string{"name"})

if err := reg.Register(writePausedMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
writePaused = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register write paused metric: %w", err)
}
} else {
writePaused = writePausedMetric
}

// Level-specific metrics
levelTableCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
levelTableCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "level_tables_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Number of tables in each level",
}, []string{"level", "name"})

levelSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(levelTableCountMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
levelTableCount = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register level table count metric: %w", err)
}
} else {
levelTableCount = levelTableCountMetric
}

levelSizeMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "level_size_bytes",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Size of each level in bytes",
}, []string{"level", "name"})

levelReadBytes = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(levelSizeMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
levelSize = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register level size metric: %w", err)
}
} else {
levelSize = levelSizeMetric
}

levelReadBytesMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "level_read_bytes_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Total bytes read from each level",
}, []string{"level", "name"})

levelWriteBytes = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
if err := reg.Register(levelReadBytesMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
levelReadBytes = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register level read bytes metric: %w", err)
}
} else {
levelReadBytes = levelReadBytesMetric
}

levelWriteBytesMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "level_write_bytes_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Total bytes written to each level",
}, []string{"level", "name"})

if err := reg.Register(levelWriteBytesMetric); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
levelWriteBytes = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return fmt.Errorf("failed to register level write bytes metric: %w", err)
}
} else {
levelWriteBytes = levelWriteBytesMetric
}

// Operation latency metrics
operationLatency = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
operationLatencyMetric := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "operation_duration_seconds",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Duration of database operations (get, put, delete, batch)",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 15),
}, []string{"operation", "name"})

// Error metrics
errorCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "errors_total",
Namespace: "eigenda",
Subsystem: "leveldb",
Help: "Total number of errors by type",
}, []string{"type", "name"})
return nil
}

// NewMetricsCollector creates a new metrics collector with the given configuration
Expand All @@ -187,7 +299,10 @@ func NewMetricsCollector(db *leveldb.DB, logger logging.Logger, config MetricsCo
config.DegradationThreshold = DefaultMetricsConfig.DegradationThreshold
}

newLevelDBMetrics(reg)
if err := newLevelDBMetrics(reg); err != nil {
logger.Error("Failed to initialize LevelDB metrics", "error", err)
return nil
}

mc := &MetricsCollector{
db: db,
Expand Down

0 comments on commit 87df535

Please sign in to comment.