From 901c5d2eac2a6eab145f1bdb40978a7b0b794189 Mon Sep 17 00:00:00 2001
From: Jason Keller
Date: Thu, 15 Feb 2024 22:18:01 -0500
Subject: [PATCH] simplified async implementation further

---
 pkg/remoteimageasync/puller.go            | 91 +++++++++++------------
 pkg/remoteimageasync/synchronizer.go      | 71 ++++++++----------
 pkg/remoteimageasync/synchronizer_test.go | 42 +++++++++++
 3 files changed, 113 insertions(+), 91 deletions(-)

diff --git a/pkg/remoteimageasync/puller.go b/pkg/remoteimageasync/puller.go
index 5226519..6d0eafe 100644
--- a/pkg/remoteimageasync/puller.go
+++ b/pkg/remoteimageasync/puller.go
@@ -3,69 +3,62 @@ package remoteimageasync
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"k8s.io/klog/v2"
 )
 
+// sessionChan and completedChan both closed here
 func RunPullerLoop(
 	ctx context.Context,
 	sessionChan chan *PullSession,
 	completedChan chan string,
 ) {
 	go func() {
+		<-ctx.Done()
+		close(sessionChan) // only close this once
+	}()
+	go func() {
+		wg := sync.WaitGroup{}
 		for {
-			select {
-			case <-ctx.Done(): // shut down loop
-				// close(completedChan) // the writer is supposed to close channels, but this is messy due to async ops... leaving it open
+			ses, ok := <-sessionChan // ctx not observed here for shutdown; this blocking receive returns when sessionChan is closed
+			if !ok { // sessionChan closed, shut down loop
+				wg.Wait()            // wait for running goroutines (which observe cancellation)
+				close(completedChan) // only close this once, after completions submitted (wg.Wait() returns)
 				return
-			case ses, ok := <-sessionChan:
-				if !ok { // sessionChan closed, shut down loop
-					return
-				}
-				go func() {
-					klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
-						prefix, ses.image, ses.timeout)
-					ctxCombined, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine timeout and shut down signal into one
-					defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
-					//NOTE: the logic for "mustPull" is not needed so long as we are not throttling.
-					// if we DO implement throttling, then additional logic might be required.
-					// mustPull := !cri.hasImage(ses.image)
-					pullStart := time.Now()
-					// if mustPull {
-					// 	klog.V(2).Infof("%s.RunPullerLoop(): image not found, pulling %s\n", prefix, ses.image)
-					// 	cri.pullImage(ses.image, ctx2)
-					// }
-					pullErr := ses.puller.Pull(ctxCombined) //NOTE: relying existing tests or history to veirfy behavior, asyncPull just wraps it
-					// update fields
-					select {
-					case <-ctx.Done(): // shutting down
-						ses.isComplete = false
-						ses.isTimedOut = false
-						ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
-						klog.V(2).Infof(ses.err.Error())
-					case <-ctxCombined.Done():
-						ses.isComplete = false
-						ses.isTimedOut = true
-						ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.image)
-						klog.V(2).Infof(ses.err.Error())
-					default:
-						ses.isComplete = true
-						ses.isTimedOut = false
-						ses.err = pullErr
-						// if mustPull {
-						klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.image, ses.err)
-						// } else {
-						// 	klog.V(2).Infof("%s.RunPullerLoop(): image already present for %s\n", prefix, ses.image)
-						// }
-					}
-					close(ses.done) // signal done
-					//NOTE: writing to completedChan could error if already closed above... that's ok because everything would be shutting down.
-					//NOTE: also, it could block until the completion processor catches up, which is ok.
-					completedChan <- ses.image // this must be open when this completes or we'd have to recover from a panic
-				}()
 			}
+			wg.Add(1) // increment before goroutine starts execution to avoid race condition
+			go func() {
+				defer wg.Done() // announce done upon completion, various ways to wake this goroutine are in place
+				klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
+					prefix, ses.image, ses.timeout)
+				ctxCombined, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine timeout and shut down signal into one
+				defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
+				pullStart := time.Now()
+				pullErr := ses.puller.Pull(ctxCombined) //NOTE: relying on existing tests or history to verify behavior, asyncPull just wraps it
+				// update fields on session before declaring done
+				select {
+				case <-ctx.Done(): // shutting down
+					ses.isComplete = false
+					ses.isTimedOut = false
+					ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
+					klog.V(2).Infof(ses.err.Error())
+				case <-ctxCombined.Done(): // timeout or shutdown
+					ses.isComplete = false
+					ses.isTimedOut = true
+					ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.image)
+					klog.V(2).Infof(ses.err.Error())
+				default: // completion: success or error
+					ses.isComplete = true
+					ses.isTimedOut = false
+					ses.err = pullErr
+					klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.image, ses.err)
+				}
+				close(ses.done) // signal done, all waiters should wake
+				//NOTE: completedChan could block until the completion loop catches up, which is ok... its work is trivial and gated only by sessionMap mutex
+				completedChan <- ses.image // this will always be open on this send
+			}()
 		}
 	}()
 }
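
The rework above replaces the per-iteration select with a close-then-drain shutdown: a dedicated goroutine closes sessionChan when ctx is cancelled, the consumer loop exits once the channel is drained, and a WaitGroup guarantees completedChan is closed only after every in-flight pull has reported. Below is a minimal standalone sketch of that pattern; it is not part of the patch, and the names runWorkers, work, and results are illustrative only.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWorkers mirrors the shape of the new RunPullerLoop: one goroutine closes the
// work channel when ctx is cancelled, the consumer drains it, and a WaitGroup makes
// sure in-flight workers finish before the results channel is closed exactly once.
func runWorkers(ctx context.Context, work chan string, results chan string) {
	go func() {
		<-ctx.Done()
		close(work) // single owner closes the work channel
	}()
	go func() {
		var wg sync.WaitGroup
		for item := range work { // loop ends once work is closed and drained
			wg.Add(1) // add before the goroutine starts, as in the patch
			go func(item string) {
				defer wg.Done()
				results <- "pulled: " + item // results is guaranteed open here
			}(item)
		}
		wg.Wait()      // wait for in-flight workers
		close(results) // then close results exactly once
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	work := make(chan string, 4)
	results := make(chan string, 4)
	work <- "image-a" // enqueue before cancellation; a send after close would panic,
	work <- "image-b" // which is what StartPull in synchronizer.go guards with recover
	runWorkers(ctx, work, results)
	for r := range results { // ends when results is closed after shutdown
		fmt.Println(r)
	}
}
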
diff --git a/pkg/remoteimageasync/synchronizer.go b/pkg/remoteimageasync/synchronizer.go
index 8011bf6..6d3b60a 100644
--- a/pkg/remoteimageasync/synchronizer.go
+++ b/pkg/remoteimageasync/synchronizer.go
@@ -21,7 +21,7 @@ func StartAsyncPuller(ctx context.Context, sessionChanDepth, completedChanDepth
 		sessionChan,
 		completedChan,
 	)
-	async.RunCompletionsChecker()
+	async.RunCompletionsLoop()
 	RunPullerLoop(ctx, sessionChan, completedChan)
 	klog.Infof("%s.StartAsyncPuller(): async puller is operational", prefix)
 	return async
@@ -48,9 +48,9 @@ func getSynchronizer(
 	}
 }
 
-func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (*PullSession, error) {
+func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (ses *PullSession, err error) {
 	klog.V(2).Infof("%s.StartPull(): start pull: asked to pull image %s", prefix, image)
-	s.mutex.Lock() // lock mutex
+	s.mutex.Lock() // lock mutex, no blocking sends/receives inside mutex
 	defer s.mutex.Unlock()
 	ses, ok := s.sessionMap[image] // try get session
 	if !ok { // if no session, create session
@@ -63,70 +63,57 @@ func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPu
 			isTimedOut: false,
 			err:        nil,
 		}
+
+		defer func() {
+			if rec := recover(); rec != nil { // handle session write panic due to closed sessionChan
+				// override named return values
+				err = fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, reason: %v", prefix, ses.image, rec)
+				klog.V(2).Info(err.Error())
+				ses = nil
+			}
+		}()
 		select {
-		case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate
+		case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate, handle with defer/recover
 			klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.image, ses.timeout)
 			s.sessionMap[image] = ses // add session to map to allow continuation... only do this because was passed to puller via sessions channel
-		default: // catch deadlock or throttling (they will look the same)
-			ses.err = fmt.Errorf("%s.StartPull(): cannot pull %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.image)
-			klog.V(2).Info(ses.err.Error())
-			close(ses.done) // this can and must be closed here because it was never passed to the puller via sessions channel
-			return ses, ses.err
+			return ses, nil
+		default: // catch deadlock or throttling (they may look the same)
+			err := fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.image)
+			klog.V(2).Info(err.Error())
+			return nil, err
 		}
 	} else {
 		klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.image)
+		// return session and unlock
+		return ses, nil
 	}
-	// return session and unlock
-	return ses, nil
 }
 
 func (s synchronizer) WaitForPull(session *PullSession, callerTimeout context.Context) error {
 	klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.image)
 	defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.image)
 	select {
-	case <-session.done: // success or error (including session timeout)
+	case <-session.done: // success or error (including session timeout and shutting down)
 		klog.V(2).Infof("%s.WaitForPull(): session completed with success or error for image %s, error=%v", prefix, session.image, session.err)
 		return session.err
-	case <-callerTimeout.Done():
+	case <-callerTimeout.Done(): // caller timeout
 		err := fmt.Errorf("%s.WaitForPull(): this wait for image %s has timed out due to caller context cancellation, pull likely continues in the background", prefix, session.image)
 		klog.V(2).Info(err.Error())
 		return err
-	case <-s.ctx.Done(): //TODO: might wait for puller to do this instead
-		err := fmt.Errorf("%s.WaitForPull(): synchronizer is shutting down", prefix) // must return error since not success
-		klog.V(2).Infof(err.Error())
-		return err
 	}
 }
 
-func (s synchronizer) RunCompletionsChecker() {
+// NOTE: all sessions that are successfully submitted to sessionChan must be submitted to completedEvents
+func (s synchronizer) RunCompletionsLoop() {
 	go func() {
-		klog.V(2).Infof("%s.RunCompletionsChecker(): starting", prefix)
-		shutdown := func() {
-			klog.V(2).Infof("%s.RunCompletionsChecker(): shutting down", prefix)
+		klog.V(2).Infof("%s.RunCompletionsLoop(): starting", prefix)
+		for image := range s.completedEvents { // remove session (no longer active)
 			s.mutex.Lock()
-			for image := range s.sessionMap { // purge open sessions, continuation no longer allowed
-				delete(s.sessionMap, image) // no-op if already deleted
-			}
-			close(s.sessions) // the writer is supposed to close channels
-			// no need to process any future completed events
+			klog.V(2).Infof("%s.RunCompletionsLoop(): clearing session for %s", prefix, image)
+			delete(s.sessionMap, image) // no-op if already deleted
 			s.mutex.Unlock()
 		}
-		defer shutdown()
-
-		for {
-			select {
-			case <-s.ctx.Done(): // shut down loop
-				return // deferred shutdown will do the work
-			case image, ok := <-s.completedEvents: // remove session (no longer active)
-				if ok {
-					s.mutex.Lock()
-					delete(s.sessionMap, image) // no-op if already deleted
-					s.mutex.Unlock()
-				} else { // channel closed, no further sessions can be created
-					return // deferred shutdown will do the work
-				}
-			}
-		}
+		klog.V(2).Infof("%s.RunCompletionsLoop(): exiting loop", prefix)
 	}()
 }
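
StartPull above now leans on the fact that a send on a closed channel panics: the non-blocking send into s.sessions is wrapped in a defer/recover that rewrites the named return values during shutdown, which is also the behavior TestChannelClose below exercises. The following is a minimal standalone sketch of that guard; it is not part of the patch, and trySubmit and its arguments are illustrative names only.

package main

import (
	"errors"
	"fmt"
)

// trySubmit attempts a non-blocking send and converts the two failure modes into
// errors: a full channel (throttling/deadlock) and a closed channel (shutdown),
// the latter surfacing as a recovered panic that overwrites the named returns.
func trySubmit(work chan string, item string) (accepted bool, err error) {
	defer func() {
		if rec := recover(); rec != nil { // send on closed channel panics
			accepted = false
			err = fmt.Errorf("cannot submit %q: %v", item, rec)
		}
	}()
	select {
	case work <- item: // may panic if work was closed by the shutdown path
		return true, nil
	default: // channel full: report instead of blocking, caller may retry
		return false, errors.New("queue full, retry later")
	}
}

func main() {
	work := make(chan string, 1)
	fmt.Println(trySubmit(work, "a")) // true <nil>
	fmt.Println(trySubmit(work, "b")) // false queue full, retry later
	close(work)
	fmt.Println(trySubmit(work, "c")) // false cannot submit "c": send on closed channel
}
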
diff --git a/pkg/remoteimageasync/synchronizer_test.go b/pkg/remoteimageasync/synchronizer_test.go
index 9087cc2..b90abed 100644
--- a/pkg/remoteimageasync/synchronizer_test.go
+++ b/pkg/remoteimageasync/synchronizer_test.go
@@ -16,6 +16,7 @@ type chanTestStruct struct {
 	err error
 }
 
+// demonstrates session channel structure's pass-by-reference is appropriate
 func TestChannelStructContent(t *testing.T) {
 	input1 := make(chan chanTestStruct, 1)
 	val1 := chanTestStruct{
@@ -42,6 +43,47 @@ func TestChannelStructContent(t *testing.T) {
 	assert.NotNil(t, val2.err, "pass by reference does update value")
 }
 
+// demonstrates logic used in remoteimageasync.StartPull()
+func TestChannelClose(t *testing.T) {
+	input1 := make(chan interface{}, 5)
+	result := 0
+
+	select {
+	case input1 <- 0:
+		result = 1
+	default:
+		result = -1
+	}
+	assert.Equal(t, 1, result, "write should succeed")
+
+	assert.Panics(t, func() {
+		close(input1)
+		select {
+		case input1 <- 0:
+			result = 2
+		default:
+			result = -2
+		}
+	}, "write should panic")
+
+	var err error = nil
+	assert.NotPanics(t, func() {
+		defer func() {
+			if rec := recover(); rec != nil {
+				err = fmt.Errorf("recovered from %v", rec)
+			}
+		}()
+		select {
+		case input1 <- 0:
+			result = 3
+		default:
+			result = -3
+		}
+	}, "write should not panic")
+	assert.NotNil(t, err, "error should have been returned")
+	assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
+}
+
 func TestAsyncPullErrorReturn(t *testing.T) {
 	ctx, dontCare := context.WithTimeout(context.TODO(), 5*time.Second)
 	defer dontCare()