implemented test organization improvements from discussion
imuni4fun committed Feb 23, 2024
1 parent b5d6c03 commit b824954
Showing 2 changed files with 112 additions and 94 deletions.
76 changes: 76 additions & 0 deletions pkg/remoteimageasync/patterns_test.go
@@ -0,0 +1,76 @@
package remoteimageasync

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

// demonstrates that passing the session struct by reference through the channel is appropriate
func TestChannelStructContent(t *testing.T) {
input1 := make(chan PullSession, 1)
val1 := PullSession{
image: "test1",
err: nil,
}
assert.Nil(t, val1.err)
input1 <- val1
tmp1 := <-input1
tmp1.err = fmt.Errorf("test1")
assert.NotNil(t, tmp1.err)
assert.Nil(t, val1.err, "pass by value does not update value")

input2 := make(chan *PullSession, 1)
val2 := PullSession{
image: "test2",
err: nil,
}
assert.Nil(t, val2.err)
input2 <- &val2
tmp2 := <-input2
tmp2.err = fmt.Errorf("test2")
assert.NotNil(t, tmp2.err)
assert.NotNil(t, val2.err, "pass by reference does update value")
}

// demonstrates logic used in remoteimageasync.StartPull()
func TestChannelClose(t *testing.T) {
input1 := make(chan interface{}, 5)
result := 0

select {
case input1 <- 0:
result = 1
default:
result = -1
}
assert.Equal(t, 1, result, "write should succeed")

assert.Panics(t, func() {
close(input1)
select {
case input1 <- 0:
result = 2
default:
result = -2
}
}, "write should panic")

var err error = nil
assert.NotPanics(t, func() {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("recovered from %v", rec)
}
}()
select {
case input1 <- 0:
result = 3
default:
result = -3
}
}, "write should not panic")
assert.NotNil(t, err, "error should have been returned")
assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
}
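
For reference, the guarded-send pattern these tests exercise can be factored into a small helper. The following is a minimal sketch assuming Go 1.18+ generics; trySend is an illustrative name, not a function in this package:

package remoteimageasync

import "fmt"

// trySend performs a non-blocking send and converts the
// send-on-closed-channel panic into an ordinary error,
// mirroring the recover logic verified in TestChannelClose.
func trySend[T any](ch chan T, val T) (err error) {
    defer func() {
        if rec := recover(); rec != nil {
            err = fmt.Errorf("recovered from %v", rec) // rec mentions "closed"
        }
    }()
    select {
    case ch <- val:
        return nil // value enqueued
    default:
        return fmt.Errorf("channel full")
    }
}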
130 changes: 36 additions & 94 deletions pkg/remoteimageasync/synchronizer_test.go
@@ -11,79 +11,7 @@ import (
"github.com/stretchr/testify/assert"
)

-type chanTestStruct struct {
-    str string
-    err error
-}
-
-// demonstrates session channel structure's pass-by-reference is appropriate
-func TestChannelStructContent(t *testing.T) {
-    input1 := make(chan chanTestStruct, 1)
-    val1 := chanTestStruct{
-        str: "test1",
-        err: nil,
-    }
-    assert.Nil(t, val1.err)
-    input1 <- val1
-    tmp1 := <-input1
-    tmp1.err = fmt.Errorf("test1")
-    assert.NotNil(t, tmp1.err)
-    assert.Nil(t, val1.err, "pass by value does not update value")
-
-    input2 := make(chan *chanTestStruct, 1)
-    val2 := chanTestStruct{
-        str: "test2",
-        err: nil,
-    }
-    assert.Nil(t, val2.err)
-    input2 <- &val2
-    tmp2 := <-input2
-    tmp2.err = fmt.Errorf("test2")
-    assert.NotNil(t, tmp2.err)
-    assert.NotNil(t, val2.err, "pass by reference does update value")
-}
-
-// demonstrates logic used in remoteimageasync.StartPull()
-func TestChannelClose(t *testing.T) {
-    input1 := make(chan interface{}, 5)
-    result := 0
-
-    select {
-    case input1 <- 0:
-        result = 1
-    default:
-        result = -1
-    }
-    assert.Equal(t, 1, result, "write should succeed")
-
-    assert.Panics(t, func() {
-        close(input1)
-        select {
-        case input1 <- 0:
-            result = 2
-        default:
-            result = -2
-        }
-    }, "write should panic")
-
-    var err error = nil
-    assert.NotPanics(t, func() {
-        defer func() {
-            if rec := recover(); rec != nil {
-                err = fmt.Errorf("recovered from %v", rec)
-            }
-        }()
-        select {
-        case input1 <- 0:
-            result = 3
-        default:
-            result = -3
-        }
-    }, "write should not panic")
-    assert.NotNil(t, err, "error should have been returned")
-    assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
-}

// this demonstrates that session errors are consistent across goroutines
func TestAsyncPullErrorReturn(t *testing.T) {
ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
defer dontCare()
@@ -98,27 +26,31 @@ func TestAsyncPullErrorReturn(t *testing.T) {
<-ctx.Done()
}

+// demonstrates that pullerMock is functioning properly
+// verifies parallelism
+// checks that the correct image pulls complete within the set time (5s)
func TestPullDuration(t *testing.T) {
-    ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
+    ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second) // shut down execution
defer dontCare()
puller := StartAsyncPuller(ctx, 100)
var ops atomic.Int32

-    imgs := []int{1, 2, 3, 4, 6, 7, 8}
+    durations := []int{1, 2, 3, 4, 6, 7, 8}

-    for _, i := range imgs {
-        go func(i int) {
-            err := pullImage(puller, fmt.Sprintf("nginx:%v", i), i, 10, 10)
-            if err == nil {
+    for _, dur := range durations {
+        go func(dur int) {
+            err := pullImage(puller, fmt.Sprintf("nginx:%v", dur), dur, 10, 10)
+            if err == nil { // rejects pull results returned because the puller loop is shutting down, which would otherwise be a race condition
ops.Add(1)
}
-        }(i)
+        }(dur)
}

-    <-ctx.Done()
-    assert.Equal(t, 4, int(ops.Load()), "only 4 of %v should complete", len(imgs))
+    <-ctx.Done() // stop waiting when context times out (shut down)
+    assert.Equal(t, 4, int(ops.Load()), "only 4 of %v should complete", len(durations))
}

+// checks for call serialization
func TestParallelPull(t *testing.T) {
ctx, dontCare := context.WithTimeout(context.TODO(), 3*time.Second)
defer dontCare()
@@ -140,13 +72,15 @@ func TestParallelPull(t *testing.T) {
assert.Equal(t, len(imgs), int(ops.Load()), "all %v should complete", len(imgs))
}

+// tests timeouts and eventual success of a long image pull
func TestSerialResumedSessions(t *testing.T) {
ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second)
defer dontCare()
puller := StartAsyncPuller(ctx, 100)
var success atomic.Int32
var notSuccess atomic.Int32

+    // 3 states exist for each pull: running, success, error
pull := func(image string, pullSec, asyncTimeoutSec, callerTimeoutSec int) {
err := pullImage(puller, image, pullSec, asyncTimeoutSec, callerTimeoutSec)
if err == nil {
@@ -156,20 +90,24 @@ func TestSerialResumedSessions(t *testing.T) {
}
}

pull("nginx:A", 5, 6, 1)
pull("nginx:A", 5, 6, 1)
pull("nginx:A", 5, 6, 1)
pull("nginx:A", 5, 6, 1)
// these are serial, not parallel. simulates kubelet retrying call to NodePublishVolume().
pull("nginx:A", 5, 6, 1) // caller times out after 1s but pull continues asynchronously
pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
pull("nginx:A", 5, 6, 1) // continues session but caller times out after 1s
assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-    assert.Equal(t, 4, int(notSuccess.Load()), "all should have errored so far")
+    assert.Equal(t, 4, int(notSuccess.Load()), "all should have errored so far") // needed because 3 states exist (running, success, error)

pull("nginx:A", 5, 6, 1)
pull("nginx:A", 5, 6, 2) // succeed after 1s because 5s (pull time) has elapsed since session started
assert.Equal(t, 1, int(success.Load()), "1 should have finished")
assert.Equal(t, 4, int(notSuccess.Load()), "no new errors after previous")

<-ctx.Done()
}
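
The scenario above relies on a pull session that outlives any single caller: each caller waits with its own timeout while the underlying pull keeps running, and later callers attach to the same in-flight session. Below is a minimal sketch of that idea, assuming the usual fmt, sync, and time imports; the names (pullSessionSketch, startOrJoin, awaitSession, doPull) are hypothetical, and this is not the package's actual implementation:

type pullSessionSketch struct {
    done chan struct{} // closed when the pull finishes
    err  error         // safe to read only after done is closed
}

var (
    sessionsMu sync.Mutex
    sessions   = map[string]*pullSessionSketch{}
)

// startOrJoin returns the in-flight session for image, creating one if needed.
func startOrJoin(image string) *pullSessionSketch {
    sessionsMu.Lock()
    defer sessionsMu.Unlock()
    if s, ok := sessions[image]; ok {
        return s // later callers attach to the existing pull
    }
    s := &pullSessionSketch{done: make(chan struct{})}
    sessions[image] = s
    go func() {
        s.err = doPull(image) // hypothetical: the actual image pull
        close(s.done)
    }()
    return s
}

// awaitSession waits for the pull to finish or for this caller's own timeout;
// a timeout returns an error to this caller but does not cancel the pull.
func awaitSession(s *pullSessionSketch, callerTimeout time.Duration) error {
    select {
    case <-s.done:
        return s.err
    case <-time.After(callerTimeout):
        return fmt.Errorf("caller timed out, pull continues asynchronously")
    }
}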

+// simulates multiple pods trying to mount the same image
+// this would result in parallel NodePublishVolume() calls to pull and mount the same image
+// demonstrates timeout and async pull continuation under that scenario
func TestParallelResumedSessions(t *testing.T) {
ctx, dontCare := context.WithTimeout(context.TODO(), 6*time.Second)
defer dontCare()
@@ -186,19 +124,19 @@ func TestParallelResumedSessions(t *testing.T) {
}
}

pull("nginx:A", 5, 6, 1)
pull("nginx:A", 5, 6, 1) // caller times out after 1s, pull continues async
assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-    assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far")
+    assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // caller timeout error returned

time.Sleep(1 * time.Second)
// time is now 2 sec into 5 sec pull

// make parallel pull requests which would time out if not resuming session
-    go func() { pull("nginx:A", 5, 6, 4) }()
-    go func() { pull("nginx:A", 5, 6, 4) }()
-    go func() { pull("nginx:A", 5, 6, 4) }()
+    go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
+    go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
+    go func() { pull("nginx:A", 5, 6, 4) }() // caller allows 4s but completes in 3s
assert.Equal(t, 0, int(success.Load()), "none should have finished yet")
-    assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far")
+    assert.Equal(t, 1, int(notSuccess.Load()), "all should have errored so far") // 1 timed out, 3 in-flight blocked waiting

time.Sleep(3100 * time.Millisecond) // all should have succeeded 100ms ago

@@ -208,6 +146,9 @@ func TestParallelResumedSessions(t *testing.T) {
<-ctx.Done()
}

+// pullDurationSec: typically 5-60 seconds; containerd behavior (the time actually required to pull the image)
+// asyncPullTimeoutSec: ~10m; the new logic allows async continuation of a pull (if enabled)
+// callerTimeoutSec: the kubelet hard-codes this to 2m
func pullImage(puller AsyncPuller, image string, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec int) error {
return pullImageRand(puller, image, pullDurationSec, pullDurationSec, asyncPullTimeoutSec, callerTimeoutSec)
}
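
To make those scales concrete, a call to this helper with production-like values might look as follows; the tag and numbers are illustrative, not taken from the tests:

// illustrative values: a mocked 30s pull, a 10m async allowance,
// and the kubelet's 2m caller timeout
err := pullImage(puller, "nginx:1.25", 30, 600, 120)
// err is nil here: the 30s pull completes well within the 2m caller timeout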
@@ -278,4 +219,5 @@ func (p pullerMock) ImageSize(ctx context.Context) (int, error) {
return p.size, nil
}

+// this image is known not to exist and is used by integration tests for that purpose
const nonExistentImage = "docker.io/warmmetal/container-image-csi-driver-test:simple-fs-doesnt-exist"
