Skip to content

Commit

Permalink
updated logging
Browse files Browse the repository at this point in the history
  • Loading branch information
imuni4fun committed Feb 6, 2024
1 parent cfc23bd commit c98e7d6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 25 deletions.
5 changes: 2 additions & 3 deletions cmd/plugin/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ type NodeServer struct {
}

func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) {
// valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"], "request-id", uuid.NewString())
// valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String())
klog.Infof("mount request: %s", req.String()) //TODO: review the commented logging above
valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"])
valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String())
if len(req.VolumeId) == 0 {
err = status.Error(codes.InvalidArgument, "VolumeId is missing")
return
Expand Down
20 changes: 11 additions & 9 deletions pkg/remoteimageasync/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"time"

"k8s.io/klog/v2"
)

func RunPullerLoop(
Expand All @@ -22,16 +24,16 @@ func RunPullerLoop(
return
}
go func() {
fmt.Printf("puller: asked to pull image %s with timeout %v\n",
ses.image, ses.timeout)
klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
prefix, ses.image, ses.timeout)
ctxCombined, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine timeout and shut down signal into one
defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
//NOTE: the logic for "mustPull" is not needed so long as we are not throttling.
// if we DO implement throttling, then additional logic might be required.
// mustPull := !cri.hasImage(ses.image)
pullStart := time.Now()
// if mustPull {
// fmt.Printf("puller: image not found, pulling %s\n", ses.image)
// klog.V(2).Infof("%s.RunPullerLoop(): image not found, pulling %s\n", prefix, ses.image)
// cri.pullImage(ses.image, ctx2)
// }
pullErr := ses.puller.Pull(ctxCombined) //NOTE: relying on existing tests or history to verify behavior, asyncPull just wraps it
Expand All @@ -40,21 +42,21 @@ func RunPullerLoop(
case <-ctx.Done(): // shutting down
ses.isComplete = false
ses.isTimedOut = false
ses.err = fmt.Errorf("puller: shutting down")
fmt.Println(ses.err.Error())
ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
klog.V(2).Infof(ses.err.Error())
case <-ctxCombined.Done():
ses.isComplete = false
ses.isTimedOut = true
ses.err = fmt.Errorf("puller: async pull exceeded timeout of %v for image %s", ses.timeout, ses.image)
fmt.Println(ses.err.Error())
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.image)
klog.V(2).Infof(ses.err.Error())
default:
ses.isComplete = true
ses.isTimedOut = false
ses.err = pullErr
// if mustPull {
fmt.Printf("puller: pull completed in %v for image %s\n", time.Since(pullStart), ses.image)
klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s\n", prefix, time.Since(pullStart), ses.image)
// } else {
// fmt.Printf("puller: image already present for %s\n", ses.image)
// klog.V(2).Infof("%s.RunPullerLoop(): image already present for %s\n", prefix, ses.image)
// }
}
close(ses.done) // signal done
Expand Down
34 changes: 21 additions & 13 deletions pkg/remoteimageasync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage"
"k8s.io/klog/v2"
)

// sessionChanDepth : 100 - must give lots of buffer to ensure no deadlock or dropped requests
Expand All @@ -31,10 +32,10 @@ func getSynchronizer(
completedChan chan string,
) synchronizer {
if cap(sessionChan) < 50 {
panic("session channel must have capacity to buffer events, minimum of 50 is required")
klog.Fatalf("%s.getSynchronizer(): session channel must have capacity to buffer events, minimum of 50 is required", prefix)
}
if cap(completedChan) < 5 {
panic("completion channel must have capacity to buffer events, minimum of 5 is required")
klog.Fatalf("%s.getSynchronizer(): completion channel must have capacity to buffer events, minimum of 5 is required", prefix)
}
return synchronizer{
sessionMap: make(map[string]PullSession),
Expand All @@ -46,7 +47,7 @@ func getSynchronizer(
}

func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPullTimeout time.Duration) (PullSession, error) {
fmt.Printf("start pull: asked to pull image %s\n", image)
klog.V(2).Infof("%s.StartPull(): start pull: asked to pull image %s", prefix, image)
s.mutex.Lock() // lock mutex
defer s.mutex.Unlock()
ses, ok := s.sessionMap[image] // try get session
Expand All @@ -62,39 +63,46 @@ func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPu
}
select {
case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate
fmt.Printf("start pull: new session created for %s with timeout %v\n", ses.image, ses.timeout)
klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.image, ses.timeout)
default: // catch deadlock or throttling (they will look the same)
ses.err = fmt.Errorf("start pull: cannot pull %s at this time, throttling or deadlock condition exists, retry if throttling", ses.image)
ses.err = fmt.Errorf("%s.StartPull(): cannot pull %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.image)
klog.V(2).Info(ses.err.Error())
ses.done <- true
return ses, ses.err
}
s.sessionMap[image] = ses // add session to map
} else {
fmt.Printf("start pull: found open session for %s\n", ses.image)
klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.image)
}
// return session and unlock
return ses, nil
}

// WaitForPull blocks until one of three things happens and reports the outcome:
//
//   - session.done is signalled: the pull finished (successfully, with an
//     error, or by hitting the session timeout) and session.err is returned;
//   - callerTimeout is done: only this wait is abandoned, an error describing
//     that is returned;
//   - the synchronizer's own context is done: an error is returned because the
//     wait did not end in success.
//
// Every outcome is logged once through klog at verbosity 2.
func (s synchronizer) WaitForPull(session PullSession, callerTimeout context.Context) error {
	klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.image)
	// The deferred call's arguments are evaluated here, at defer time; only the
	// logging itself is postponed until the function returns.
	defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.image)

	select {
	case <-session.done: // success or error (including session timeout)
		klog.V(2).Infof("%s.WaitForPull(): pull completed for %s, isError: %t, error: %v",
			prefix, session.image, session.err != nil, session.err)
		return session.err
	case <-callerTimeout.Done():
		err := fmt.Errorf("%s.WaitForPull(): this wait for image %s has timed out due to caller context cancellation, pull likely continues in the background",
			prefix, session.image)
		klog.V(2).Info(err.Error())
		return err
	case <-s.ctx.Done(): //TODO: might wait for puller to do this instead
		err := fmt.Errorf("%s.WaitForPull(): synchronizer is shutting down", prefix) // must return error since not success
		// Info, not Infof: the message is data, not a format string.
		klog.V(2).Info(err.Error())
		return err
	}
}

func (s synchronizer) RunCompletionsChecker() {
go func() {
klog.V(2).Infof("%s.RunCompletionsChecker(): starting", prefix)
shutdown := func() {
klog.V(2).Infof("%s.RunCompletionsChecker(): shutting down", prefix)
s.mutex.Lock()
for image := range s.sessionMap { // purge open sessions, continuation no longer allowed
delete(s.sessionMap, image) // no-op if already deleted
Expand Down
2 changes: 2 additions & 0 deletions pkg/remoteimageasync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage"
)

// prefix is the package name that this package's log and error messages
// start with (e.g. "remoteimageasync.StartPull(): ...").
const prefix = "remoteimageasync"

type PullSession struct {
image string
puller remoteimage.Puller
Expand Down

0 comments on commit c98e7d6

Please sign in to comment.