Skip to content

Commit

Permalink
fix disk metrics calculation and reorganize code (#6224)
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanzele authored Mar 6, 2025
1 parent 644e410 commit 201e2a7
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 181 deletions.
93 changes: 93 additions & 0 deletions pkg/utilization/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package utilization

import (
"fmt"
"math"

"github.com/kubeshop/testkube/pkg/utilization/core"
)

// buildMemoryFields renders the memory reading of a sample as key/value fields.
//
// It returns nil when the sample carries no memory information. Otherwise it
// returns a single field, "used", holding the RSS value of the sample formatted
// as a decimal integer.
func (r *MetricRecorder) buildMemoryFields(metrics *Metrics) []core.KeyValue {
	mem := metrics.Memory
	if mem == nil {
		return nil
	}

	used := fmt.Sprintf("%d", mem.RSS)
	return []core.KeyValue{core.NewKeyValue("used", used)}
}

// buildCPUFields renders the CPU reading of a sample as key/value fields.
//
// Two fields are produced from metrics.CPU: "percent", formatted with two
// decimals, and "millicores", the percentage multiplied by 10 and rounded to
// the nearest integer (so a reading of 100 becomes 1000).
func (r *MetricRecorder) buildCPUFields(metrics *Metrics) []core.KeyValue {
	cpu := metrics.CPU
	percent := fmt.Sprintf("%.2f", cpu)
	millicores := fmt.Sprintf("%d", int64(math.Round(cpu*10)))

	fields := make([]core.KeyValue, 0, 2)
	fields = append(fields, core.NewKeyValue("percent", percent))
	fields = append(fields, core.NewKeyValue("millicores", millicores))
	return fields
}

// buildNetworkFields converts the network counters of the current sample into
// key/value fields.
//
// It always emits the cumulative counters "bytes_sent_total" and
// "bytes_recv_total". When the previous sample also carries network counters,
// it additionally emits "bytes_sent_per_s" and "bytes_recv_per_s", computed as
// the difference between the two samples.
// NOTE(review): the difference is not divided by the sampling interval, so the
// "_per_s" values are true per-second rates only if samples are taken one
// second apart — confirm against the caller.
//
// It returns nil when current is nil or has no network counters. A nil previous
// is treated as "no earlier sample": only the totals are returned.
func (r *MetricRecorder) buildNetworkFields(current, previous *Metrics) []core.KeyValue {
	if current == nil || current.Network == nil {
		return nil
	}

	bytesSent := current.Network.BytesSent
	bytesRecv := current.Network.BytesRecv
	values := []core.KeyValue{
		core.NewKeyValue("bytes_sent_total", fmt.Sprintf("%d", bytesSent)),
		core.NewKeyValue("bytes_recv_total", fmt.Sprintf("%d", bytesRecv)),
	}

	// Without an earlier sample there is nothing to diff against.
	if previous == nil || previous.Network == nil {
		return values
	}

	// This safety guard is because if a network interface is removed,
	// the bytes sent and received will be removed from the calculation,
	// and we can end up with lower values than the previous ones.
	// The counters are unsigned, so an unguarded subtraction would wrap around
	// instead of going negative; in that case the rate is reported as 0.
	// Issue: https://github.com/shirou/gopsutil/issues/511
	var bytesSentRate, bytesRecvRate uint64
	if prev := previous.Network.BytesSent; bytesSent > prev {
		bytesSentRate = bytesSent - prev
	}
	if prev := previous.Network.BytesRecv; bytesRecv > prev {
		bytesRecvRate = bytesRecv - prev
	}

	return append(
		values,
		core.NewKeyValue("bytes_sent_per_s", fmt.Sprintf("%d", bytesSentRate)),
		core.NewKeyValue("bytes_recv_per_s", fmt.Sprintf("%d", bytesRecvRate)),
	)
}

// buildDiskFields converts the disk I/O counters of the current sample into
// key/value fields.
//
// It always emits the cumulative counters "read_bytes_total" and
// "write_bytes_total". When the previous sample also carries disk counters, it
// additionally emits "read_bytes_per_s" and "write_bytes_per_s", computed as
// the difference between the two samples.
// NOTE(review): the difference is not divided by the sampling interval, so the
// "_per_s" values are true per-second rates only if samples are taken one
// second apart — confirm against the caller.
//
// It returns nil when current is nil or has no disk counters. A nil previous is
// treated as "no earlier sample": only the totals are returned.
func (r *MetricRecorder) buildDiskFields(current, previous *Metrics) []core.KeyValue {
	if current == nil || current.Disk == nil {
		return nil
	}

	diskReadBytes := current.Disk.ReadBytes
	diskWriteBytes := current.Disk.WriteBytes
	values := []core.KeyValue{
		core.NewKeyValue("read_bytes_total", fmt.Sprintf("%d", diskReadBytes)),
		core.NewKeyValue("write_bytes_total", fmt.Sprintf("%d", diskWriteBytes)),
	}

	// Without an earlier sample there is nothing to diff against.
	if previous == nil || previous.Disk == nil {
		return values
	}

	// This safety guard is because if a disk is unmounted,
	// the bytes read and written will be removed from the calculation,
	// and we can end up with lower values than the previous ones.
	// The counters are unsigned, so an unguarded subtraction would wrap around
	// instead of going negative; in that case the rate is reported as 0.
	// Related issue (same effect for network interfaces):
	// https://github.com/shirou/gopsutil/issues/511
	var diskReadBytesRate, diskWriteBytesRate uint64
	if prev := previous.Disk.ReadBytes; diskReadBytes > prev {
		diskReadBytesRate = diskReadBytes - prev
	}
	if prev := previous.Disk.WriteBytes; diskWriteBytes > prev {
		diskWriteBytesRate = diskWriteBytes - prev
	}

	return append(
		values,
		core.NewKeyValue("read_bytes_per_s", fmt.Sprintf("%d", diskReadBytesRate)),
		core.NewKeyValue("write_bytes_per_s", fmt.Sprintf("%d", diskWriteBytesRate)),
	)
}
213 changes: 56 additions & 157 deletions pkg/utilization/instrument.go
Original file line number Diff line number Diff line change
@@ -1,175 +1,77 @@
package utilization

import (
"context"
errors2 "errors"
"fmt"
"math"
"sync"

"github.com/shirou/gopsutil/v4/disk"

"github.com/pkg/errors"
"github.com/shirou/gopsutil/v4/net"
gopsutil "github.com/shirou/gopsutil/v4/process"

"github.com/kubeshop/testkube/pkg/utilization/core"
"github.com/shirou/gopsutil/v4/process"
)

type Metrics struct {
Memory *gopsutil.MemoryInfoStat
Memory *process.MemoryInfoStat
CPU float64
Disk *gopsutil.IOCountersStat
Disk *disk.IOCountersStat
Network *net.IOCountersStat
}

// record records the metrics of the provided process.
func (r *MetricRecorder) record(process *gopsutil.Process) (*Metrics, error) {
// Instrument the current process
metrics, errs := instrument(process)
if len(errs) > 0 {
return nil, errors.Wrapf(errors2.Join(errs...), "failed to gather some metrics for process with pid %q", process.Pid)
}

return metrics, nil
}

func (r *MetricRecorder) write(ctx context.Context, metrics, previous *Metrics) error {
// Build each set of metrics
memoryMetrics := r.format.Format("memory", r.tags, r.buildMemoryFields(metrics))
cpuMetrics := r.format.Format("cpu", r.tags, r.buildCPUFields(metrics))
networkMetrics := r.format.Format("network", r.tags, r.buildNetworkFields(metrics, previous))
diskMetrics := r.format.Format("disk", r.tags, r.buildDiskFields(metrics, previous))

// Combine all metrics so we can write them all at once
data := fmt.Sprintf("%s\n%s\n%s\n%s", memoryMetrics, cpuMetrics, networkMetrics, diskMetrics)
if err := r.writer.Write(ctx, data); err != nil {
return errors.Wrap(err, "failed to write combined metrics")
}

return nil
// record captures a single metrics data point covering all processes.
//
// It lists the processes via process.Processes, instruments each one in its own
// goroutine and waits for all of them to finish. A process whose
// instrumentation returns an error is skipped: its slot in the per-process
// slice stays nil and the error is discarded. The per-process results are then
// combined with aggregate, after which recordSystemWideMetrics attaches the
// disk and network counters.
//
// An error is returned only when the process list itself cannot be obtained.
func (r *MetricRecorder) record() (*Metrics, error) {
	procs, err := process.Processes()
	if err != nil {
		return nil, err
	}

	// Every goroutine writes only to its own index, so the slice needs no lock.
	perProcess := make([]*Metrics, len(procs))
	var wg sync.WaitGroup
	for idx := range procs {
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			if sample, instrumentErr := instrument(procs[slot]); instrumentErr == nil {
				perProcess[slot] = sample
			}
		}(idx)
	}
	wg.Wait()

	// CPU and memory are gathered per process and summed here.
	total := aggregate(perProcess)
	// Disk and network counters are read system-wide and attached to the aggregate.
	r.recordSystemWideMetrics(total)

	return total, nil
}

func instrument(process *gopsutil.Process) (*Metrics, []error) {
// instrument captures the metrics of the provided process.
func instrument(process *process.Process) (*Metrics, error) {
var errs []error
mem, err := process.MemoryInfo()
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to get memory info"))
}
cpu, err := process.CPUPercent()
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to get cpu info"))
}
io, err := process.IOCounters()
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to get cpu info"))
}
net, err := net.IOCounters(false)
mem, err := process.MemoryInfo()
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to get network info"))
errs = append(errs, errors.Wrapf(err, "failed to get memory info"))
}
m := &Metrics{
Memory: mem,
CPU: cpu,
Disk: io,
}
if len(net) > 0 {
m.Network = &net[0]
}
return m, errs
}

func (r *MetricRecorder) buildMemoryFields(metrics *Metrics) []core.KeyValue {
if metrics.Memory == nil {
return nil
}
return []core.KeyValue{
core.NewKeyValue("used", fmt.Sprintf("%d", metrics.Memory.RSS)),
}
}

func (r *MetricRecorder) buildCPUFields(metrics *Metrics) []core.KeyValue {
return []core.KeyValue{
core.NewKeyValue("percent", fmt.Sprintf("%.2f", metrics.CPU)),
core.NewKeyValue("millicores", fmt.Sprintf("%d", int64(math.Round(metrics.CPU*10)))),
}
}

func (r *MetricRecorder) buildNetworkFields(current, previous *Metrics) []core.KeyValue {
if current.Network == nil {
return nil
}
bytesSent := current.Network.BytesSent
bytesRecv := current.Network.BytesRecv
values := []core.KeyValue{
core.NewKeyValue("bytes_sent_total", fmt.Sprintf("%d", bytesSent)),
core.NewKeyValue("bytes_recv_total", fmt.Sprintf("%d", bytesRecv)),
}
if previous.Network != nil {
previousBytesSent := previous.Network.BytesSent
previousBytesRecv := previous.Network.BytesRecv
var bytesSentRate, bytesRecvRate uint64
// This safety guard is because if a network interface is removed,
// the bytes sent and received will be removed from the calculation,
// and we can end up with lower values than the previous ones.
// Issue: https://github.com/shirou/gopsutil/issues/511
if bytesSent > previousBytesSent {
bytesSentRate = bytesSent - previousBytesSent
}
if bytesRecv > previousBytesRecv {
bytesRecvRate = bytesRecv - previousBytesRecv
}
values = append(
values,
core.NewKeyValue("bytes_sent_per_s", fmt.Sprintf("%d", bytesSentRate)),
core.NewKeyValue("bytes_recv_per_s", fmt.Sprintf("%d", bytesRecvRate)),
)
}

return values
}

func (r *MetricRecorder) buildDiskFields(current, previous *Metrics) []core.KeyValue {
if current.Disk == nil {
return nil
}

diskReadBytes := current.Disk.DiskReadBytes
diskWriteBytes := current.Disk.DiskWriteBytes
values := []core.KeyValue{
core.NewKeyValue("read_bytes_total", fmt.Sprintf("%d", diskReadBytes)),
core.NewKeyValue("write_bytes_total", fmt.Sprintf("%d", diskWriteBytes)),
}
if previous.Disk != nil {
previousDiskReadBytes := previous.Disk.DiskReadBytes
previousDiskWriteBytes := previous.Disk.DiskWriteBytes
var diskReadBytesRate, diskWriteBytesRate uint64
// This safety guard is because if a disk is unmounted,
// the bytes sent and received will be removed from the calculation,
// and we can end up with lower values than the previous ones.
// Issue: https://github.com/shirou/gopsutil/issues/511
if diskReadBytes > previousDiskReadBytes {
diskReadBytesRate = diskReadBytes - previousDiskReadBytes
}
if diskWriteBytes > previousDiskWriteBytes {
diskWriteBytesRate = diskWriteBytes - previousDiskWriteBytes
}
values = append(
values,
core.NewKeyValue("read_bytes_per_s", fmt.Sprintf("%d", diskReadBytesRate)),
core.NewKeyValue("write_bytes_per_s", fmt.Sprintf("%d", diskWriteBytesRate)),
)
Memory: mem,
}

return values
return m, errors2.Join(errs...)
}

// aggregate aggregates the metrics from multiple processes.
// Some test tools might spawn multiple processes to run the tests.
// Example: when executing JMeter, the entry process is a shell script which spawns the actual JMeter Java process.
func aggregate(metrics []*Metrics) *Metrics {
aggregated := &Metrics{
Memory: &gopsutil.MemoryInfoStat{},
CPU: 0,
Disk: &gopsutil.IOCountersStat{},
Network: &net.IOCountersStat{},
Memory: &process.MemoryInfoStat{},
CPU: 0,
}
for _, m := range metrics {
if m == nil {
Expand All @@ -185,24 +87,21 @@ func aggregate(metrics []*Metrics) *Metrics {
aggregated.Memory.Stack += m.Memory.Stack
}
aggregated.CPU += m.CPU
if m.Disk != nil {
aggregated.Disk.ReadCount += m.Disk.ReadCount
aggregated.Disk.WriteCount += m.Disk.WriteCount
aggregated.Disk.ReadBytes += m.Disk.ReadBytes
aggregated.Disk.WriteBytes += m.Disk.WriteBytes
aggregated.Disk.DiskReadBytes += m.Disk.DiskReadBytes
aggregated.Disk.DiskWriteBytes += m.Disk.DiskWriteBytes
}
if m.Network != nil {
aggregated.Network.BytesSent = m.Network.BytesSent
aggregated.Network.BytesRecv = m.Network.BytesRecv
aggregated.Network.PacketsSent = m.Network.PacketsSent
aggregated.Network.PacketsRecv = m.Network.PacketsRecv
aggregated.Network.Errin = m.Network.Errin
aggregated.Network.Errout = m.Network.Errout
aggregated.Network.Dropin = m.Network.Dropin
aggregated.Network.Dropout = m.Network.Dropout
}
}
return aggregated
}

// recordSystemWideMetrics fills in the disk and network counters of aggregated
// using the system-wide gopsutil packages (disk, net) rather than the
// per-process one.
//
// Disk counters are requested for the hard-coded device "vda" only. Network
// counters are requested with per-NIC reporting disabled, and the first entry
// of the result is used.
//
// Errors from both calls are discarded. When a call yields no counters, the
// corresponding field of aggregated is left untouched.
func (r *MetricRecorder) recordSystemWideMetrics(aggregated *Metrics) {
	const device = "vda"

	if counters, _ := disk.IOCounters(device); len(counters) > 0 {
		// Copy out of the map so the stored pointer refers to a stable value.
		stat := counters[device]
		aggregated.Disk = &stat
	}

	if totals, _ := net.IOCounters(false); len(totals) > 0 {
		first := totals[0]
		aggregated.Network = &first
	}
}
Loading

0 comments on commit 201e2a7

Please sign in to comment.