Skip to content

Commit

Permalink
simplified async implementation further
Browse files Browse the repository at this point in the history
  • Loading branch information
imuni4fun committed Feb 16, 2024
1 parent 613debd commit 901c5d2
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 91 deletions.
91 changes: 42 additions & 49 deletions pkg/remoteimageasync/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,62 @@ package remoteimageasync
import (
"context"
"fmt"
"sync"
"time"

"k8s.io/klog/v2"
)

// sessionChan and completedChan both closed here
func RunPullerLoop(
ctx context.Context,
sessionChan chan *PullSession,
completedChan chan string,
) {
go func() {
<-ctx.Done()
close(sessionChan) // only close this once
}()
go func() {
wg := sync.WaitGroup{}
for {
select {
case <-ctx.Done(): // shut down loop
// close(completedChan) // the writer is supposed to close channels, but this is messy due to async ops... leaving it open
ses, ok := <-sessionChan // ctx not observed for shut down, this sleep breaks when sessionChan is closed
if !ok { // sessionChan closed, shut down loop
wg.Wait() // wait for running goroutines (which observe cancellation)
close(completedChan) // only close this once, after completions submitted (wg.Wait() returns)
return
case ses, ok := <-sessionChan:
if !ok { // sessionChan closed, shut down loop
return
}
go func() {
klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
prefix, ses.image, ses.timeout)
ctxCombined, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine timeout and shut down signal into one
defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
//NOTE: the logic for "mustPull" is not needed so long as we are not throttling.
// if we DO implement throttling, then additional logic might be required.
// mustPull := !cri.hasImage(ses.image)
pullStart := time.Now()
// if mustPull {
// klog.V(2).Infof("%s.RunPullerLoop(): image not found, pulling %s\n", prefix, ses.image)
// cri.pullImage(ses.image, ctx2)
// }
pullErr := ses.puller.Pull(ctxCombined) //NOTE: relying on existing tests or history to verify behavior, asyncPull just wraps it
// update fields
select {
case <-ctx.Done(): // shutting down
ses.isComplete = false
ses.isTimedOut = false
ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
klog.V(2).Infof(ses.err.Error())
case <-ctxCombined.Done():
ses.isComplete = false
ses.isTimedOut = true
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.image)
klog.V(2).Infof(ses.err.Error())
default:
ses.isComplete = true
ses.isTimedOut = false
ses.err = pullErr
// if mustPull {
klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.image, ses.err)
// } else {
// klog.V(2).Infof("%s.RunPullerLoop(): image already present for %s\n", prefix, ses.image)
// }
}
close(ses.done) // signal done
//NOTE: writing to completedChan could error if already closed above... that's ok because everything would be shutting down.
//NOTE: also, it could block until the completion processor catches up, which is ok.
completedChan <- ses.image // this must be open when this completes or we'd have to recover from a panic
}()
}
wg.Add(1) // increment before goroutine starts execution to avoid race condition
go func() {
defer wg.Done() // announce done upon completion, various ways to wake this goroutine are in place
klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
prefix, ses.image, ses.timeout)
ctxCombined, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine timeout and shut down signal into one
defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
pullStart := time.Now()
pullErr := ses.puller.Pull(ctxCombined) //NOTE: relying existing tests or history to verify behavior, asyncPull just wraps it
// update fields on session before declaring done
select {
case <-ctx.Done(): // shutting down
ses.isComplete = false
ses.isTimedOut = false
ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
klog.V(2).Infof(ses.err.Error())
case <-ctxCombined.Done(): // timeout or shutdown
ses.isComplete = false
ses.isTimedOut = true
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.image)
klog.V(2).Infof(ses.err.Error())
default: // completion: success or error
ses.isComplete = true
ses.isTimedOut = false
ses.err = pullErr
klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.image, ses.err)
}
close(ses.done) // signal done, all waiters should wake
//NOTE: completedChan could block until the completion loop catches up, which is ok... its work is trivial and gated only by the sessionMap mutex
completedChan <- ses.image // this will always be open on this send
}()
}

}()
}
71 changes: 29 additions & 42 deletions pkg/remoteimageasync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func StartAsyncPuller(ctx context.Context, sessionChanDepth, completedChanDepth
sessionChan,
completedChan,
)
async.RunCompletionsChecker()
async.RunCompletionsLoop()
RunPullerLoop(ctx, sessionChan, completedChan)
klog.Infof("%s.StartAsyncPuller(): async puller is operational", prefix)
return async
Expand All @@ -48,9 +48,9 @@ func getSynchronizer(
}
}

func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (*PullSession, error) {
func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (ses *PullSession, err error) {
klog.V(2).Infof("%s.StartPull(): start pull: asked to pull image %s", prefix, image)
s.mutex.Lock() // lock mutex
s.mutex.Lock() // lock mutex, no blocking sends/receives inside mutex
defer s.mutex.Unlock()
ses, ok := s.sessionMap[image] // try get session
if !ok { // if no session, create session
Expand All @@ -63,70 +63,57 @@ func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPu
isTimedOut: false,
err: nil,
}

defer func() {
if rec := recover(); rec != nil { // handle session write panic due to closed sessionChan
// override named return values
ses = nil
err = fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, reason: %v", prefix, ses.image, rec)
klog.V(2).Info(err.Error())
}
}()
select {
case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate
case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate, handle with defer/recover
klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.image, ses.timeout)
s.sessionMap[image] = ses // add session to map to allow continuation... only do this because was passed to puller via sessions channel
default: // catch deadlock or throttling (they will look the same)
ses.err = fmt.Errorf("%s.StartPull(): cannot pull %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.image)
klog.V(2).Info(ses.err.Error())
close(ses.done) // this can and must be closed here because it was never passed to the puller via sessions channel
return ses, ses.err
return ses, nil
default: // catch deadlock or throttling (they may look the same)
err := fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.image)
klog.V(2).Info(err.Error())
return nil, err
}
} else {
klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.image)
// return session and unlock
return ses, nil
}
// return session and unlock
return ses, nil
}

func (s synchronizer) WaitForPull(session *PullSession, callerTimeout context.Context) error {
klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.image)
defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.image)
select {
case <-session.done: // success or error (including session timeout)
case <-session.done: // success or error (including session timeout and shutting down)
klog.V(2).Infof("%s.WaitForPull(): session completed with success or error for image %s, error=%v", prefix, session.image, session.err)
return session.err
case <-callerTimeout.Done():
case <-callerTimeout.Done(): // caller timeout
err := fmt.Errorf("%s.WaitForPull(): this wait for image %s has timed out due to caller context cancellation, pull likely continues in the background",
prefix, session.image)
klog.V(2).Info(err.Error())
return err
case <-s.ctx.Done(): //TODO: might wait for puller to do this instead
err := fmt.Errorf("%s.WaitForPull(): synchronizer is shutting down", prefix) // must return error since not success
klog.V(2).Infof(err.Error())
return err
}
}

func (s synchronizer) RunCompletionsChecker() {
// NOTE: all sessions that are successfully submitted to sessionsChan must be submitted to completedEvents
func (s synchronizer) RunCompletionsLoop() {
go func() {
klog.V(2).Infof("%s.RunCompletionsChecker(): starting", prefix)
shutdown := func() {
klog.V(2).Infof("%s.RunCompletionsChecker(): shutting down", prefix)
klog.V(2).Infof("%s.RunCompletionsLoop(): starting", prefix)
for image := range s.completedEvents { // remove session (no longer active)
s.mutex.Lock()
for image := range s.sessionMap { // purge open sessions, continuation no longer allowed
delete(s.sessionMap, image) // no-op if already deleted
}
close(s.sessions) // the writer is supposed to close channels
// no need to process any future completed events
klog.V(2).Infof("%s.RunCompletionsLoop(): clearing session for %s", prefix, image)
delete(s.sessionMap, image) // no-op if already deleted
s.mutex.Unlock()
}
defer shutdown()

for {
select {
case <-s.ctx.Done(): // shut down loop
return // deferred shutdown will do the work
case image, ok := <-s.completedEvents: // remove session (no longer active)
if ok {
s.mutex.Lock()
delete(s.sessionMap, image) // no-op if already deleted
s.mutex.Unlock()
} else { // channel closed, no further sessions can be created
return // deferred shutdown will do the work
}
}
}
klog.V(2).Infof("%s.RunCompletionsLoop(): exiting loop", prefix)
}()
}
42 changes: 42 additions & 0 deletions pkg/remoteimageasync/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type chanTestStruct struct {
err error
}

// demonstrates session channel structure's pass-by-reference is appropriate
func TestChannelStructContent(t *testing.T) {
input1 := make(chan chanTestStruct, 1)
val1 := chanTestStruct{
Expand All @@ -42,6 +43,47 @@ func TestChannelStructContent(t *testing.T) {
assert.NotNil(t, val2.err, "pass by reference does update value")
}

// TestChannelClose demonstrates the logic used in remoteimageasync.StartPull():
// a non-blocking send on an open buffered channel succeeds, a send on a closed
// channel panics even inside a select with a default case, and that panic can
// be converted into an error via defer/recover.
func TestChannelClose(t *testing.T) {
	input1 := make(chan interface{}, 5)
	result := 0

	// non-blocking send on an open, buffered channel succeeds
	select {
	case input1 <- 0:
		result = 1
	default:
		result = -1
	}
	assert.Equal(t, 1, result, "write should succeed")

	// a send on a closed channel panics; the default case does not save us
	assert.Panics(t, func() {
		close(input1)
		select {
		case input1 <- 0:
			result = 2
		default:
			result = -2
		}
	}, "write should panic")

	// idiomatic zero value: `var err error` is nil; explicit `= nil` is redundant
	var err error
	// the panic can be recovered and surfaced as an error instead of crashing
	assert.NotPanics(t, func() {
		defer func() {
			if rec := recover(); rec != nil {
				err = fmt.Errorf("recovered from %v", rec)
			}
		}()
		select {
		case input1 <- 0:
			result = 3
		default:
			result = -3
		}
	}, "write should not panic")
	assert.NotNil(t, err, "error should have been returned")
	// runtime panic message is "send on closed channel"
	assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
}

func TestAsyncPullErrorReturn(t *testing.T) {
ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
defer dontCare()
Expand Down

0 comments on commit 901c5d2

Please sign in to comment.