From b824954339d8ceb782c0bac73c387a59817d31e7 Mon Sep 17 00:00:00 2001
From: Jason Keller
Date: Fri, 23 Feb 2024 11:50:08 -0500
Subject: [PATCH] implemented test organization improvements from discussion

---
 pkg/remoteimageasync/patterns_test.go     |  76 +++++++++++++
 pkg/remoteimageasync/synchronizer_test.go | 130 ++++++----------
 2 files changed, 112 insertions(+), 94 deletions(-)
 create mode 100644 pkg/remoteimageasync/patterns_test.go

diff --git a/pkg/remoteimageasync/patterns_test.go b/pkg/remoteimageasync/patterns_test.go
new file mode 100644
index 0000000..2382dba
--- /dev/null
+++ b/pkg/remoteimageasync/patterns_test.go
@@ -0,0 +1,76 @@
+package remoteimageasync
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// demonstrates that passing the session struct through the channel by reference is appropriate
+func TestChannelStructContent(t *testing.T) {
+	input1 := make(chan PullSession, 1)
+	val1 := PullSession{
+		image: "test1",
+		err:   nil,
+	}
+	assert.Nil(t, val1.err)
+	input1 <- val1
+	tmp1 := <-input1
+	tmp1.err = fmt.Errorf("test1")
+	assert.NotNil(t, tmp1.err)
+	assert.Nil(t, val1.err, "pass by value does not update value")
+
+	input2 := make(chan *PullSession, 1)
+	val2 := PullSession{
+		image: "test2",
+		err:   nil,
+	}
+	assert.Nil(t, val2.err)
+	input2 <- &val2
+	tmp2 := <-input2
+	tmp2.err = fmt.Errorf("test2")
+	assert.NotNil(t, tmp2.err)
+	assert.NotNil(t, val2.err, "pass by reference does update value")
+}
+
+// demonstrates logic used in remoteimageasync.StartPull()
+func TestChannelClose(t *testing.T) {
+	input1 := make(chan interface{}, 5)
+	result := 0
+
+	select {
+	case input1 <- 0:
+		result = 1
+	default:
+		result = -1
+	}
+	assert.Equal(t, 1, result, "write should succeed")
+
+	assert.Panics(t, func() {
+		close(input1)
+		select {
+		case input1 <- 0:
+			result = 2
+		default:
+			result = -2
+		}
+	}, "write should panic")
+
+	var err error = nil
+	assert.NotPanics(t, func() {
+		defer func() {
+			if rec := recover(); rec != nil {
+				err = fmt.Errorf("recovered from %v", rec)
+			}
+		}()
+		select {
+		case input1 <- 0:
+			result = 3
+		default:
+			result = -3
+		}
+	}, "write should not panic")
+	assert.NotNil(t, err, "error should have been returned")
+	assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
+}
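For reference, the guarded-send pattern that TestChannelClose above exercises (a send on a closed channel panics even inside a select with a default case, and a deferred recover converts that panic into an ordinary error) condenses into a single helper. This is a minimal standalone sketch, not code from the package; the helper name trySend is invented for illustration:

package main

import "fmt"

// trySend attempts a non-blocking send and converts the runtime panic from a
// send on a closed channel into an ordinary error, mirroring the logic that
// TestChannelClose demonstrates for remoteimageasync.StartPull().
func trySend(ch chan interface{}, v interface{}) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("recovered from %v", rec) // "send on closed channel"
		}
	}()
	select {
	case ch <- v:
		return nil // buffered write succeeded
	default:
		return fmt.Errorf("channel full") // non-blocking fallback, no panic
	}
}

func main() {
	ch := make(chan interface{}, 2)
	fmt.Println(trySend(ch, 0)) // <nil>: write succeeds
	close(ch)
	fmt.Println(trySend(ch, 0)) // recovered from send on closed channel
}

The select matters here: the default case handles the buffer-full path without blocking, while the closed-channel path is reachable only as a panic, which is why the recover is required.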
diff --git a/pkg/remoteimageasync/synchronizer_test.go b/pkg/remoteimageasync/synchronizer_test.go
index d4925bf..fb1cb45 100644
--- a/pkg/remoteimageasync/synchronizer_test.go
+++ b/pkg/remoteimageasync/synchronizer_test.go
@@ -11,79 +11,7 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-type chanTestStruct struct {
-	str string
-	err error
-}
-
-// demonstrates session channel structure's pass-by-reference is appropriate
-func TestChannelStructContent(t *testing.T) {
-	input1 := make(chan chanTestStruct, 1)
-	val1 := chanTestStruct{
-		str: "test1",
-		err: nil,
-	}
-	assert.Nil(t, val1.err)
-	input1 <- val1
-	tmp1 := <-input1
-	tmp1.err = fmt.Errorf("test1")
-	assert.NotNil(t, tmp1.err)
-	assert.Nil(t, val1.err, "pass by value does not update value")
-
-	input2 := make(chan *chanTestStruct, 1)
-	val2 := chanTestStruct{
-		str: "test2",
-		err: nil,
-	}
-	assert.Nil(t, val2.err)
-	input2 <- &val2
-	tmp2 := <-input2
-	tmp2.err = fmt.Errorf("test2")
-	assert.NotNil(t, tmp2.err)
-	assert.NotNil(t, val2.err, "pass by reference does update value")
-}
-
-// demonstrates logic used in remoteimageasync.StartPull()
-func TestChannelClose(t *testing.T) {
-	input1 := make(chan interface{}, 5)
-	result := 0
-
-	select {
-	case input1 <- 0:
-		result = 1
-	default:
-		result = -1
-	}
-	assert.Equal(t, 1, result, "write should succeed")
-
-	assert.Panics(t, func() {
-		close(input1)
-		select {
-		case input1 <- 0:
-			result = 2
-		default:
-			result = -2
-		}
-	}, "write should panic")
-
-	var err error = nil
-	assert.NotPanics(t, func() {
-		defer func() {
-			if rec := recover(); rec != nil {
-				err = fmt.Errorf("recovered from %v", rec)
-			}
-		}()
-		select {
-		case input1 <- 0:
-			result = 3
-		default:
-			result = -3
-		}
-	}, "write should not panic")
-	assert.NotNil(t, err, "error should have been returned")
-	assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
-}
-
+// this demonstrates that session errors are consistent across goroutines
 func TestAsyncPullErrorReturn(t *testing.T) {
 	ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
 	defer dontCare()
@@ -98,27 +26,31 @@ func TestAsyncPullErrorReturn(t *testing.T) {
 	<-ctx.Done()
 }
 
+// demonstrates pullerMock is functioning properly
+// verifies parallelism
+// checks that the correct image pulls complete within the set time (5s)
 func TestPullDuration(t *testing.T) {
-	ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
+	ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second) // shut down execution
 	defer dontCare()
 	puller := StartAsyncPuller(ctx, 100)
 	var ops atomic.Int32
-	imgs := []int{1, 2, 3, 4, 6, 7, 8}
+	durations := []int{1, 2, 3, 4, 6, 7, 8}
 
-	for _, i := range imgs {
-		go func(i int) {
-			err := pullImage(puller, fmt.Sprintf("nginx:%v", i), i, 10, 10)
-			if err == nil {
+	for _, dur := range durations {
+		go func(dur int) {
+			err := pullImage(puller, fmt.Sprintf("nginx:%v", dur), dur, 10, 10)
+			if err == nil { // rejects pull results returned because the puller loop is shutting down, which would otherwise cause a race condition
 				ops.Add(1)
 			}
-		}(i)
+		}(dur)
 	}
 
-	<-ctx.Done()
-	assert.Equal(t, 4, int(ops.Load()), "only 4 of %v should complete", len(imgs))
+	<-ctx.Done() // stop waiting when context times out (shut down)
+	assert.Equal(t, 4, int(ops.Load()), "only 4 of %v should complete", len(durations))
 }
 
+// checks that pull calls are not serialized (all pulls can finish within the shared timeout only if they run in parallel)
 func TestParallelPull(t *testing.T) {
 	ctx, dontCare := context.WithTimeout(context.TODO(), 3*time.Second)
 	defer dontCare()
@@ -140,6 +72,7 @@ func TestParallelPull(t *testing.T) {
 	assert.Equal(t, len(imgs), int(ops.Load()), "all %v should complete", len(imgs))
 }
 
+// tests timeouts and eventual success of a long image pull
 func TestSerialResumedSessions(t *testing.T) {
 	ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second)
 	defer dontCare()
@@ -147,6 +80,7 @@ func TestSerialResumedSessions(t *testing.T) {
 
 	var success atomic.Int32
 	var notSuccess atomic.Int32
+	// 3 states exist for each pull: running, success, error
 	pull := func(image string, pullSec, asyncTimeoutSec, callerTimeoutSec int) {
 		err := pullImage(puller, image, pullSec, asyncTimeoutSec, callerTimeoutSec)
 		if err == nil {
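A note on the constant 4 asserted in TestPullDuration above: the pulls run concurrently, so each image needs only its own pull duration to finish, and exactly those durations shorter than the 5s context timeout complete. A throwaway sketch of that arithmetic, with variable names invented for illustration:

package main

import "fmt"

func main() {
	timeoutSec := 5                          // the test's context timeout
	durations := []int{1, 2, 3, 4, 6, 7, 8} // per-image pull times in seconds
	expected := 0
	for _, d := range durations {
		// pulls are concurrent, so each completes iff its own
		// duration fits inside the shared context timeout
		if d < timeoutSec {
			expected++
		}
	}
	fmt.Println(expected) // 4, the count TestPullDuration asserts
}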
@@ -156,20 +90,24 @@ func TestSerialResumedSessions(t *testing.T) {
 		}
 	}
 
-	pull("nginx:A", 5, 6, 1)
-	pull("nginx:A", 5, 6, 1)
-	pull("nginx:A", 5, 6, 1)
-	pull("nginx:A", 5, 6, 1)
+	// these calls are serial, not parallel. simulates kubelet retrying calls to NodePublishVolume().
+	pull("nginx:A", 5, 6, 1) // caller times out after 1s but pull continues asynchronously
+	pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
+	pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
+	pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
 	assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-	assert.Equal(t, 4, int(notSuccess.Load()), "all should have errored so far")
+	assert.Equal(t, 4, int(notSuccess.Load()), "all should have errored so far") // needed because 3 states exist
 
-	pull("nginx:A", 5, 6, 1)
+	pull("nginx:A", 5, 6, 2) // succeeds after ~1s more because the 5s pull time has elapsed since the session started
 	assert.Equal(t, 1, int(success.Load()), "1 should have finished")
 	assert.Equal(t, 4, int(notSuccess.Load()), "no new errors after previous")
 
 	<-ctx.Done()
 }
 
+// simulates multiple pods trying to mount the same image
+// this would result in parallel NodePublishVolume() calls to pull and mount the same image
+// demonstrates timeout and async pull continuation under that scenario
 func TestParallelResumedSessions(t *testing.T) {
 	ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second)
 	defer dontCare()
@@ -186,19 +124,19 @@ func TestParallelResumedSessions(t *testing.T) {
 	}
 
-	pull("nginx:A", 5, 6, 1)
+	pull("nginx:A", 5, 6, 1) // caller times out after 1s, pull continues async
 
 	assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-	assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far")
+	assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // caller timeout error returned
 
 	time.Sleep(1 * time.Second) // time is now 2 sec into 5 sec pull
 
 	// make parallel pull requests which would time out if not resuming session
-	go func() { pull("nginx:A", 5, 6, 4) }()
-	go func() { pull("nginx:A", 5, 6, 4) }()
-	go func() { pull("nginx:A", 5, 6, 4) }()
+	go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
+	go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
+	go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
 
 	assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-	assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far")
+	assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // 1 timed out, 3 in-flight blocked waiting
 
 	time.Sleep(3100 * time.Millisecond) // all should have succeeded 100ms ago
 
@@ -208,6 +146,9 @@ func TestParallelResumedSessions(t *testing.T) {
 	<-ctx.Done()
 }
 
+// pullDurationSec: typically 5-60 seconds, containerd behavior (time actually required to pull image)
+// asyncPullTimeoutSec: ~10m, the new logic allows async continuation of a pull (if enabled)
+// callerTimeoutSec: kubelet hard-coded to 2m
 func pullImage(puller AsyncPuller, image string, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec int) error {
 	return pullImageRand(puller, image, pullDurationSec, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec)
 }
@@ -278,4 +219,5 @@ func (p pullerMock) ImageSize(ctx context.Context) (int, error) {
 	return p.size, nil
 }
 
+// this image is known not to exist and is used by integration tests for that purpose
 const nonExistentImage = "docker.io/warmmetal/container-image-csi-driver-test:simple-fs-doesnt-exist"
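For readers unfamiliar with the session-resumption behavior that TestSerialResumedSessions and TestParallelResumedSessions rely on, the core idea can be pictured with the following minimal sketch. It is hypothetical and illustrative only: the names session, sessions, startOrJoin, and pull are invented here and are not the synchronizer's actual API. A pull is started at most once per image; later callers join the in-flight session, and each caller applies only its own timeout while the underlying pull keeps running:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// session is a stand-in for the driver's PullSession.
type session struct {
	done chan struct{} // closed when the pull finishes
	err  error         // pull outcome, valid once done is closed
}

var (
	mu       sync.Mutex
	sessions = map[string]*session{}
)

// startOrJoin starts a pull for image unless one is already in flight,
// in which case the existing session is returned (resumed).
func startOrJoin(image string, pullDur time.Duration) *session {
	mu.Lock()
	defer mu.Unlock()
	if s, ok := sessions[image]; ok {
		return s // resume the in-flight session
	}
	s := &session{done: make(chan struct{})}
	sessions[image] = s
	go func() {
		time.Sleep(pullDur) // stand-in for the real containerd pull
		close(s.done)       // broadcast completion to every waiter
	}()
	return s
}

// pull applies only the caller's own timeout; the underlying session keeps
// running after a caller gives up, which is what lets a later retry succeed.
func pull(ctx context.Context, image string, pullDur time.Duration) error {
	s := startOrJoin(image, pullDur)
	select {
	case <-s.done:
		return s.err
	case <-ctx.Done():
		return ctx.Err() // caller timed out; the session continues
	}
}

func main() {
	ctx1, cancel1 := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel1()
	fmt.Println(pull(ctx1, "nginx:A", 5*time.Second)) // context deadline exceeded

	ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel2()
	fmt.Println(pull(ctx2, "nginx:A", 5*time.Second)) // <nil>, after the remaining ~4s
}

Closing the done channel rather than sending on it is what makes the parallel case work: every goroutine blocked in pull() observes the close simultaneously, matching the behavior asserted in TestParallelResumedSessions.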