Skip to content

Commit

Permalink
feat: add support for parallel steps when recording metrics and fix bug with multiple processes (#6210)
Browse files Browse the repository at this point in the history

* add support for parallel steps when recording metrics and fix bug with multiple processes

Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>

* remove unneeded logic for metrics scraper

Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>

---------

Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
  • Loading branch information
dejanzele authored Mar 4, 2025
1 parent 2320f92 commit e18eb52
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 61 deletions.
4 changes: 4 additions & 0 deletions cmd/testworkflow-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func newMetricsRecorderConfig(stepRef string, skip bool, containerResources test
Workflow: s.InternalConfig.Workflow.Name,
Step: stepRef,
Execution: s.InternalConfig.Execution.Id,
Resource: s.InternalConfig.Resource.Id,
},
Format: core.FormatInflux,
ContainerResources: core.ContainerResources{
Expand All @@ -483,6 +484,9 @@ func newMetricsRecorderConfig(stepRef string, skip bool, containerResources test
}

func appendSuffixIfNeeded(s, suffix string) string {
if s == "" {
return s
}
if !strings.HasSuffix(s, suffix) {
return s + suffix
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/utilization/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Metadata struct {
Workflow string `meta:"workflow"`
Step Step `meta:"step"`
Execution string `meta:"execution"`
Resource string `meta:"resource"`
Lines int `meta:"lines"`
Format MetricsFormat `meta:"format"`
ContainerResources ContainerResources `meta:"resources"`
Expand Down Expand Up @@ -184,12 +185,13 @@ func parseMetadataFromFilename(filename string) (*Metadata, error) {
if len(tokens) != 3 {
return nil, errors.Errorf("invalid filename format: expected <workflow>_<step>_<execution>.<format>, got: %q", base)
}
return &Metadata{
m := &Metadata{
Workflow: tokens[0],
Step: Step{Ref: tokens[1]},
Execution: tokens[2],
Format: format,
}, nil
}
return m, nil
}

func getFormatFromFileExtension(filename string) MetricsFormat {
Expand Down Expand Up @@ -282,6 +284,8 @@ func parseMetadata(line string) (*Metadata, error) {
m.Step.Name = stepName
case "execution":
m.Execution = value
case "resource":
m.Resource = value
case "resources.requests.cpu":
m.ContainerResources.Requests.CPU = value
case "resources.requests.memory":
Expand Down
14 changes: 13 additions & 1 deletion pkg/utilization/core/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ func TestParseMetadataFromFilename(t *testing.T) {
wantErr: false,
errMessage: "",
},
{
name: "valid INFLUX file with resource",
filename: "myWorkflow_step2_0002.influx",
wantMeta: &Metadata{
Workflow: "myWorkflow",
Step: Step{Ref: "step2"},
Execution: "0002",
Format: FormatInflux,
},
wantErr: false,
errMessage: "",
},
{
name: "invalid extension",
filename: "someWorkflow_someStep_someExecution.txt",
Expand All @@ -108,7 +120,7 @@ func TestParseMetadataFromFilename(t *testing.T) {
},
{
name: "invalid format - more underscore segments",
filename: "workflow_step_execution_extra.json",
filename: "workflow_step_execution_as.json",
wantMeta: nil,
wantErr: true,
errMessage: "invalid filename format: expected <workflow>_<step>_<execution>.<format>",
Expand Down
3 changes: 2 additions & 1 deletion pkg/utilization/core/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type FileWriter struct {

// NewFileWriter creates a new FileWriter that writes to a file in the specified directory with the given metadata.
func NewFileWriter(dir string, metadata *Metadata, increment int) (*FileWriter, error) {
filename := fmt.Sprintf("%s_%s_%s.%s", metadata.Workflow, metadata.Step.Ref, metadata.Execution, metadata.Format)
base := fmt.Sprintf("%s_%s_%s", metadata.Workflow, metadata.Step.Ref, metadata.Execution)
filename := fmt.Sprintf("%s.%s", base, metadata.Format)
f, err := initFile(dir, filename)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
14 changes: 7 additions & 7 deletions pkg/utilization/core/writers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ func TestNewBufferedFileWriter(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()
meta := &Metadata{
meta1 := &Metadata{
Workflow: "wf",
Step: Step{Ref: "step"},
Execution: "exec",
Format: "txt",
}

writer, err := NewFileWriter(tmpDir, meta, 1)
require.NoError(t, err, "expected no error creating FileWriter")
require.NotNil(t, writer, "expected a non-nil writer")
writer1, err := NewFileWriter(tmpDir, meta1, 1)
require.NoError(t, err)
require.NotNil(t, writer1)

// Ensure the correct file was created
expectedFilename := fmt.Sprintf("%s_%s_%s.%s", meta.Workflow, meta.Step.Ref, meta.Execution, meta.Format)
fullPath := filepath.Join(tmpDir, expectedFilename)
expectedFilename1 := fmt.Sprintf("%s_%s_%s.%s", meta1.Workflow, meta1.Step.Ref, meta1.Execution, meta1.Format)
fullPath := filepath.Join(tmpDir, expectedFilename1)
_, statErr := os.Stat(fullPath)
assert.NoError(t, statErr, "expected the file to exist at %s", fullPath)

// Cleanup
require.NoError(t, writer.Close(context.Background()))
require.NoError(t, writer1.Close(context.Background()))
}

func TestInitFile(t *testing.T) {
Expand Down
91 changes: 49 additions & 42 deletions pkg/utilization/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package utilization

import (
"context"
errors2 "errors"
"fmt"
"math"
"os"

"github.com/pkg/errors"
"github.com/shirou/gopsutil/v4/net"
gopsutil "github.com/shirou/gopsutil/v4/process"

"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/pkg/utilization/core"
)

Expand All @@ -21,16 +20,18 @@ type Metrics struct {
Network *net.IOCountersStat
}

func (r *MetricRecorder) iterate(ctx context.Context, process *gopsutil.Process, previous *Metrics) *Metrics {
stdout := output.Std
stdoutUnsafe := stdout.Direct()

// record records the metrics of the provided process.
func (r *MetricRecorder) record(process *gopsutil.Process) (*Metrics, error) {
// Instrument the current process
metrics, err := instrument(process)
if err != nil {
stdoutUnsafe.Errorf("failed to gather some metrics: %v\n", err)
metrics, errs := instrument(process)
if len(errs) > 0 {
return nil, errors.Wrapf(errors2.Join(errs...), "failed to gather some metrics for process with pid %q", process.Pid)
}

return metrics, nil
}

func (r *MetricRecorder) write(ctx context.Context, metrics, previous *Metrics) error {
// Build each set of metrics
memoryMetrics := r.format.Format("memory", r.tags, r.buildMemoryFields(metrics))
cpuMetrics := r.format.Format("cpu", r.tags, r.buildCPUFields(metrics))
Expand All @@ -40,10 +41,10 @@ func (r *MetricRecorder) iterate(ctx context.Context, process *gopsutil.Process,
// Combine all metrics so we can write them all at once
data := fmt.Sprintf("%s\n%s\n%s\n%s", memoryMetrics, cpuMetrics, networkMetrics, diskMetrics)
if err := r.writer.Write(ctx, data); err != nil {
stdoutUnsafe.Errorf("failed to write combined metrics: %v\n", err)
return errors.Wrap(err, "failed to write combined metrics")
}

return metrics
return nil
}

func instrument(process *gopsutil.Process) (*Metrics, []error) {
Expand Down Expand Up @@ -138,39 +139,45 @@ func (r *MetricRecorder) buildDiskFields(current, previous *Metrics) []core.KeyV
return values
}

// getChildProcess tries to find the child process of the current process.
// The child process is the process which is running the underlying test.
func getChildProcess() (*gopsutil.Process, error) {
var processes []*gopsutil.Process
var err error
// We need to retry a few times to get the process because a race condition might occur where the child process is not yet created.
for i := 0; i < 5; i++ {
processes, err = gopsutil.Processes()
if err != nil {
return nil, errors.Wrap(err, "failed to list running processes")
// aggregate aggregates the metrics from multiple processes.
// Some test tools might spawn multiple processes to run the tests.
// Example: when executing JMeter, the entry process is a shell script which spawns the actual JMeter Java process.
func aggregate(metrics []*Metrics) *Metrics {
aggregated := &Metrics{
Memory: &gopsutil.MemoryInfoStat{},
CPU: 0,
Disk: &gopsutil.IOCountersStat{},
Network: &net.IOCountersStat{},
}
for _, m := range metrics {
if m.Memory != nil {
aggregated.Memory.RSS += m.Memory.RSS
aggregated.Memory.VMS += m.Memory.VMS
aggregated.Memory.Swap += m.Memory.Swap
aggregated.Memory.Data += m.Memory.Data
aggregated.Memory.Stack += m.Memory.Stack
aggregated.Memory.Locked += m.Memory.Locked
aggregated.Memory.Stack += m.Memory.Stack
}
if len(processes) > 1 {
break
aggregated.CPU += m.CPU
if m.Disk != nil {
aggregated.Disk.ReadCount += m.Disk.ReadCount
aggregated.Disk.WriteCount += m.Disk.WriteCount
aggregated.Disk.ReadBytes += m.Disk.ReadBytes
aggregated.Disk.WriteBytes += m.Disk.WriteBytes
aggregated.Disk.DiskReadBytes += m.Disk.DiskReadBytes
aggregated.Disk.DiskWriteBytes += m.Disk.DiskWriteBytes
}
}

// Find the pid of the process which is running the underlying binary.
pid := int32(os.Getpid())
var process *gopsutil.Process
for _, p := range processes {
ppid, err := p.Ppid()
if err != nil {
return nil, errors.Wrapf(err, "failed to get parent process id for process pid: %d", p.Pid)
}
if p.Pid != pid && ppid == pid {
process = p
break
if m.Network != nil {
aggregated.Network.BytesSent += m.Network.BytesSent
aggregated.Network.BytesRecv += m.Network.BytesRecv
aggregated.Network.PacketsSent += m.Network.PacketsSent
aggregated.Network.PacketsRecv += m.Network.PacketsRecv
aggregated.Network.Errin += m.Network.Errin
aggregated.Network.Errout += m.Network.Errout
aggregated.Network.Dropin += m.Network.Dropin
aggregated.Network.Dropout += m.Network.Dropout
}
}
// If the process is not found, return an error.
if process == nil {
return nil, errors.New("failed to find process")
}

return process, nil
return aggregated
}
42 changes: 34 additions & 8 deletions pkg/utilization/utilisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package utilization
import (
"context"
"fmt"
"sync"
"time"

gopsutil "github.com/shirou/gopsutil/v4/process"

"github.com/kubeshop/testkube/pkg/utilization/core"

"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
Expand Down Expand Up @@ -76,12 +79,6 @@ func (r *MetricRecorder) Start(ctx context.Context) {
t := time.NewTicker(r.samplingInterval)
defer t.Stop()

process, err := getChildProcess()
if err != nil {
stdoutUnsafe.Errorf("failed to get process: %v\n", err)
return
}

previous := &Metrics{}
for {
select {
Expand All @@ -91,7 +88,32 @@ func (r *MetricRecorder) Start(ctx context.Context) {
}
return
case <-t.C:
previous = r.iterate(ctx, process, previous)
processes, err := gopsutil.Processes()
if err != nil {
stdoutUnsafe.Errorf("failed to get processes: %v\n", err)
continue
}

metrics := make([]*Metrics, len(processes))
wg := sync.WaitGroup{}
wg.Add(len(processes))
for i := range processes {
go func(i int) {
defer wg.Done()
m, err := r.record(processes[i])
if err != nil {
stdoutUnsafe.Errorf("failed to record metrics: %v\n", err)
return
}
metrics[i] = m
}(i)
}
wg.Wait()
aggregated := aggregate(metrics)
if err := r.write(ctx, aggregated, previous); err != nil {
stdoutUnsafe.Errorf("failed to write metrics: %v\n", err)
}
previous = aggregated
}
}
}
Expand All @@ -110,9 +132,12 @@ type Config struct {
}

// ExecutionConfig identifies the workflow execution whose metrics are being recorded.
type ExecutionConfig struct {
	// Workflow identifies the workflow being executed.
	Workflow string
	// Step is a reference to the step in the workflow.
	Step string
	// Execution identifies the workflow execution.
	Execution string
	// Resource is the unique identifier of a container step
	Resource string
}

// WithMetricsRecorder runs the provided function and records the metrics in the specified directory.
Expand All @@ -136,6 +161,7 @@ func WithMetricsRecorder(config Config, fn func(), postProcessFn func() error) {
Step: core.Step{Ref: config.ExecutionConfig.Step},
Execution: config.ExecutionConfig.Execution,
Format: config.Format,
Resource: config.ExecutionConfig.Resource,
ContainerResources: config.ContainerResources,
}
w, err := core.NewFileWriter(config.Dir, metadata, 4)
Expand Down

0 comments on commit e18eb52

Please sign in to comment.