diff --git a/common/kvstore/leveldb/metrics.go b/common/kvstore/leveldb/metrics.go index 47204a1a7e..d0c727bee5 100644 --- a/common/kvstore/leveldb/metrics.go +++ b/common/kvstore/leveldb/metrics.go @@ -1,12 +1,12 @@ package leveldb import ( + "fmt" "sync" "time" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/syndtr/goleveldb/leveldb" ) @@ -62,17 +62,15 @@ var ( levelSize *prometheus.GaugeVec levelReadBytes *prometheus.GaugeVec levelWriteBytes *prometheus.GaugeVec - - // Operation latency metrics - operationLatency *prometheus.HistogramVec - - // Error metrics - errorCount *prometheus.CounterVec ) -func newLevelDBMetrics(reg *prometheus.Registry) { +func newLevelDBMetrics(reg *prometheus.Registry) error { + if reg == nil { + return fmt.Errorf("prometheus registry cannot be nil") + } + // Compaction metrics - compactionLatency = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + compactionLatencyMetric := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "compaction_duration_seconds", Namespace: "eigenda", Subsystem: "leveldb", @@ -80,88 +78,208 @@ func newLevelDBMetrics(reg *prometheus.Registry) { Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), }, []string{"type", "name"}) - compactionThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(compactionLatencyMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + compactionLatency = are.ExistingCollector.(*prometheus.HistogramVec) + } else { + return fmt.Errorf("failed to register compaction latency metric: %w", err) + } + } else { + compactionLatency = compactionLatencyMetric + } + + compactionThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "compaction_throughput_bytes_per_second", Namespace: "eigenda", Subsystem: "leveldb", Help: "Rate of data processed during compaction operations (read/write)", }, []string{"operation", "name"}) + if err := reg.Register(compactionThroughputMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + compactionThroughput = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register compaction throughput metric: %w", err) + } + } else { + compactionThroughput = compactionThroughputMetric + } + // Resource utilization metrics - readThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + readThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "read_throughput_bytes_per_second", Namespace: "eigenda", Subsystem: "leveldb", Help: "Rate of bytes read per second", }, []string{"name"}) - writeThroughput = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(readThroughputMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + readThroughput = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register read throughput metric: %w", err) + } + } else { + readThroughput = readThroughputMetric + } + + writeThroughputMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "write_throughput_bytes_per_second", Namespace: "eigenda", Subsystem: "leveldb", Help: "Rate of bytes written per second", }, []string{"name"}) - openTableCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(writeThroughputMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + writeThroughput = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register write throughput metric: %w", err) + } + } else { + writeThroughput = writeThroughputMetric + } + + openTableCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "open_tables_total", Namespace: "eigenda", Subsystem: "leveldb", Help: "Number of currently open tables", }, []string{"name"}) - blockCacheSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(openTableCountMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + openTableCount = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register open table count metric: %w", err) + } + } else { + openTableCount = openTableCountMetric + } + + blockCacheSizeMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "block_cache_bytes", Namespace: "eigenda", Subsystem: "leveldb", Help: "Size of block cache in bytes", }, []string{"name"}) + if err := reg.Register(blockCacheSizeMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + blockCacheSize = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register block cache size metric: %w", err) + } + } else { + blockCacheSize = blockCacheSizeMetric + } + // Performance metrics - compactionCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + compactionCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "compactions_total", Namespace: "eigenda", Subsystem: "leveldb", Help: "Number of compactions by type (memory, level0, nonlevel0, seek)", }, []string{"type", "name"}) - writePaused = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(compactionCountMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + compactionCount = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register compaction count metric: %w", err) + } + } else { + compactionCount = compactionCountMetric + } + + writePausedMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "write_paused", Namespace: "eigenda", Subsystem: "leveldb", Help: "Whether writes are currently paused (1 for yes, 0 for no)", }, []string{"name"}) + if err := reg.Register(writePausedMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + writePaused = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register write paused metric: %w", err) + } + } else { + writePaused = writePausedMetric + } + // Level-specific metrics - levelTableCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + levelTableCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "level_tables_total", Namespace: "eigenda", Subsystem: "leveldb", Help: "Number of tables in each level", }, []string{"level", "name"}) - levelSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(levelTableCountMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + levelTableCount = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register level table count metric: %w", err) + } + } else { + levelTableCount = levelTableCountMetric + } + + levelSizeMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "level_size_bytes", Namespace: "eigenda", Subsystem: "leveldb", Help: "Size of each level in bytes", }, []string{"level", "name"}) - levelReadBytes = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(levelSizeMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + levelSize = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register level size metric: %w", err) + } + } else { + levelSize = levelSizeMetric + } + + levelReadBytesMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "level_read_bytes_total", Namespace: "eigenda", Subsystem: "leveldb", Help: "Total bytes read from each level", }, []string{"level", "name"}) - levelWriteBytes = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + if err := reg.Register(levelReadBytesMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + levelReadBytes = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register level read bytes metric: %w", err) + } + } else { + levelReadBytes = levelReadBytesMetric + } + + levelWriteBytesMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "level_write_bytes_total", Namespace: "eigenda", Subsystem: "leveldb", Help: "Total bytes written to each level", }, []string{"level", "name"}) + if err := reg.Register(levelWriteBytesMetric); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + levelWriteBytes = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return fmt.Errorf("failed to register level write bytes metric: %w", err) + } + } else { + levelWriteBytes = levelWriteBytesMetric + } + // Operation latency metrics - operationLatency = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + operationLatencyMetric := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "operation_duration_seconds", Namespace: "eigenda", Subsystem: "leveldb", @@ -169,13 +287,7 @@ func newLevelDBMetrics(reg *prometheus.Registry) { Buckets: prometheus.ExponentialBuckets(0.0001, 2, 15), }, []string{"operation", "name"}) - // Error metrics - errorCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "errors_total", - Namespace: "eigenda", - Subsystem: "leveldb", - Help: "Total number of errors by type", - }, []string{"type", "name"}) + return nil } // NewMetricsCollector creates a new metrics collector with the given configuration @@ -187,7 +299,10 @@ func NewMetricsCollector(db *leveldb.DB, logger logging.Logger, config MetricsCo config.DegradationThreshold = DefaultMetricsConfig.DegradationThreshold } - newLevelDBMetrics(reg) + if err := newLevelDBMetrics(reg); err != nil { + logger.Error("Failed to initialize LevelDB metrics", "error", err) + return nil + } mc := &MetricsCollector{ db: db,