From d65aef74359a75137f78768ce100809705eb52fb Mon Sep 17 00:00:00 2001 From: Jason Keller Date: Tue, 6 Feb 2024 10:58:38 -0500 Subject: [PATCH] removed previous async files --- pkg/mountexecutor/mountexecutor.go | 143 ------------------------- pkg/pullexecutor/pullexecutor.go | 166 ----------------------------- 2 files changed, 309 deletions(-) delete mode 100644 pkg/mountexecutor/mountexecutor.go delete mode 100644 pkg/pullexecutor/pullexecutor.go diff --git a/pkg/mountexecutor/mountexecutor.go b/pkg/mountexecutor/mountexecutor.go deleted file mode 100644 index 540b8b6f..00000000 --- a/pkg/mountexecutor/mountexecutor.go +++ /dev/null @@ -1,143 +0,0 @@ -package mountexecutor - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/containerd/containerd/reference/docker" - "github.com/warm-metal/container-image-csi-driver/pkg/backend" - "github.com/warm-metal/container-image-csi-driver/pkg/metrics" - "github.com/warm-metal/container-image-csi-driver/pkg/mountstatus" - "github.com/warm-metal/container-image-csi-driver/pkg/pullstatus" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" -) - -const ( - mountPollTimeInterval = 100 * time.Millisecond - mountPollTimeout = 2 * time.Minute - mountCtxTimeout = 10 * time.Minute -) - -// MountExecutorOptions are options passed to mount executor -type MountExecutorOptions struct { - AsyncMount bool - Mounter *backend.SnapshotMounter -} - -// MountOptions are options for a single mount request -type MountOptions struct { - // Context here is only valid for synchronous mounts - Context context.Context - NamedRef docker.Named - VolumeId string - TargetPath string - VolumeCapability *csi.VolumeCapability - ReadOnly bool - Logger klog.Logger -} - -// MountExecutor executes mount -type MountExecutor struct { - asyncMount bool - mutex *sync.Mutex - mounter *backend.SnapshotMounter - asyncErrs map[docker.Named]error -} - -// NewMountExecutor initializes a new mount executor -func NewMountExecutor(o *MountExecutorOptions) *MountExecutor { - return &MountExecutor{ - asyncMount: o.AsyncMount, - mutex: &sync.Mutex{}, - mounter: o.Mounter, - } -} - -// StartMounting starts the mounting -func (m *MountExecutor) StartMounting(o *MountOptions) error { - o.Logger.Info("Mounting image", "image", o.NamedRef.Name()) - if pullstatus.Get(o.NamedRef) != pullstatus.Pulled || mountstatus.Get(o.TargetPath) == mountstatus.StillMounting { - o.Logger.Info("Could not mount image because image hasn't finshed pulling or volume is still mounting", - "image", o.NamedRef.Name(), - "pull-status", pullstatus.Get(o.NamedRef), - "mount-status", mountstatus.Get(o.TargetPath)) - return nil - } - - ro := o.ReadOnly || - o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY || - o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY - - if !m.asyncMount { - mountstatus.Update(o.TargetPath, mountstatus.StillMounting) - startTime := time.Now() - if err := m.mounter.Mount(o.Context, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil { - o.Logger.Error(err, "mount error") - metrics.OperationErrorsCount.WithLabelValues("StartMounting").Inc() - mountstatus.Update(o.TargetPath, mountstatus.Errored) - return err - } - elapsed := time.Since(startTime) - metrics.ImageMountTime.WithLabelValues(metrics.Sync).Observe(elapsed.Seconds()) - o.Logger.Info("Finished mounting", "image", o.NamedRef.Name(), "mount-duration", elapsed) - mountstatus.Update(o.TargetPath, mountstatus.Mounted) - return nil - } - - go func() { - m.mutex.Lock() - defer m.mutex.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), mountCtxTimeout) - defer cancel() - - mountstatus.Update(o.TargetPath, mountstatus.StillMounting) - startTime := time.Now() - if err := m.mounter.Mount(ctx, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil { - o.Logger.Error(err, "mount error") - metrics.OperationErrorsCount.WithLabelValues("StartMounting").Inc() - mountstatus.Update(o.TargetPath, mountstatus.Errored) - m.asyncErrs[o.NamedRef] = fmt.Errorf("err: %v: %v", err, m.asyncErrs[o.NamedRef]) - return - } - elapsed := time.Since(startTime) - metrics.ImageMountTime.WithLabelValues(metrics.Async).Observe(elapsed.Seconds()) - o.Logger.Info("Finished mounting", "image", o.NamedRef.Name(), "mount-duration", elapsed) - mountstatus.Update(o.TargetPath, mountstatus.Mounted) - }() - - return nil -} - -// WaitForMount waits for the volume to get mounted -func (m *MountExecutor) WaitForMount(o *MountOptions) error { - if pullstatus.Get(o.NamedRef) != pullstatus.Pulled { - return nil - } - - if !m.asyncMount { - return nil - } - - mountCondFn := func() (done bool, err error) { - if mountstatus.Get(o.TargetPath) == mountstatus.Mounted { - return true, nil - } - if m.asyncErrs[o.NamedRef] != nil { - return false, m.asyncErrs[o.NamedRef] - } - return false, nil - } - - if err := wait.PollImmediate( - mountPollTimeInterval, - mountPollTimeout, - mountCondFn); err != nil { - return fmt.Errorf("waited too long to mount the image: %v", err) - } - - return nil -} diff --git a/pkg/pullexecutor/pullexecutor.go b/pkg/pullexecutor/pullexecutor.go deleted file mode 100644 index 8ca0f73a..00000000 --- a/pkg/pullexecutor/pullexecutor.go +++ /dev/null @@ -1,166 +0,0 @@ -package pullexecutor - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/containerd/containerd/reference/docker" - "github.com/pkg/errors" - "github.com/warm-metal/container-image-csi-driver/pkg/backend" - "github.com/warm-metal/container-image-csi-driver/pkg/metrics" - "github.com/warm-metal/container-image-csi-driver/pkg/pullstatus" - "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" - "github.com/warm-metal/container-image-csi-driver/pkg/secret" - "k8s.io/apimachinery/pkg/util/wait" - cri "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" -) - -const ( - pullPollTimeInterval = 100 * time.Millisecond - pullPollTimeout = 2 * time.Minute - pullCtxTimeout = 10 * time.Minute -) - -// PullExecutorOptions are the options passed to the pull executor -type PullExecutorOptions struct { - AsyncPull bool - ImageServiceClient cri.ImageServiceClient - SecretStore secret.Store - Mounter *backend.SnapshotMounter -} - -// PullOptions are the options for a single pull request -type PullOptions struct { - // Context here is only valid for synchronous mounts - Context context.Context - NamedRef docker.Named - PullAlways bool - PullSecrets map[string]string - Image string - Logger klog.Logger -} - -// PullExecutor executes the pulls -type PullExecutor struct { - asyncPull bool - imageSvcClient cri.ImageServiceClient - mutex *sync.Mutex - asyncErrs map[docker.Named]error - secretStore secret.Store - mounter *backend.SnapshotMounter -} - -// NewPullExecutor initializes a new pull executor object -func NewPullExecutor(o *PullExecutorOptions) *PullExecutor { - return &PullExecutor{ - asyncPull: o.AsyncPull, - mutex: &sync.Mutex{}, - imageSvcClient: o.ImageServiceClient, - secretStore: o.SecretStore, - mounter: o.Mounter, - asyncErrs: make(map[docker.Named]error), - } -} - -// StartPulling starts pulling the image -func (m *PullExecutor) StartPulling(o *PullOptions) error { - keyring, err := m.secretStore.GetDockerKeyring(o.Context, o.PullSecrets) - if err != nil { - return errors.Errorf("unable to fetch keyring: %s", err) - } - - if !m.asyncPull { - puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring) - shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef) - if shouldPull { - o.Logger.Info("Pulling image", "image", o.Image) - pullstatus.Update(o.NamedRef, pullstatus.StillPulling) - startTime := time.Now() - if err = puller.Pull(o.Context); err != nil { - pullstatus.Update(o.NamedRef, pullstatus.Errored) - metrics.OperationErrorsCount.WithLabelValues("StartPulling").Inc() - o.Logger.Error(err, "Unable to pull image", "image", o.NamedRef) - return errors.Errorf("unable to pull image %q: %s", o.NamedRef, err) - } - elapsed := time.Since(startTime) - metrics.ImagePullTime.WithLabelValues(metrics.Sync).Observe(elapsed.Seconds()) - size := puller.ImageSize(o.Context) - o.Logger.Info("Finished pulling image", "image", o.Image, "pull-duration", elapsed, "image-size", fmt.Sprintf("%.2f MiB", float64(size)/(1024.0*1024.0))) - } - pullstatus.Update(o.NamedRef, pullstatus.Pulled) - return nil - } - - if pullstatus.Get(o.NamedRef) == pullstatus.Pulled || - pullstatus.Get(o.NamedRef) == pullstatus.StillPulling { - return nil - } - - go func() { - if pullstatus.Get(o.NamedRef) == pullstatus.StatusNotFound { - m.mutex.Lock() - defer m.mutex.Unlock() - c, cancel := context.WithTimeout(context.Background(), pullCtxTimeout) - defer cancel() - - if pullstatus.Get(o.NamedRef) == pullstatus.StillPulling || - pullstatus.Get(o.NamedRef) == pullstatus.Pulled { - return - } - - puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring) - shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef) - if shouldPull { - o.Logger.Info("Pulling image asynchronously", "image", o.Image) - pullstatus.Update(o.NamedRef, pullstatus.StillPulling) - startTime := time.Now() - - if err = puller.Pull(c); err != nil { - pullstatus.Update(o.NamedRef, pullstatus.Errored) - metrics.OperationErrorsCount.WithLabelValues("StartPulling").Inc() - o.Logger.Error(err, "Unable to pull image", "image", o.Image) - m.asyncErrs[o.NamedRef] = fmt.Errorf("unable to pull image %q: %s", o.Image, err) - return - } - elapsed := time.Since(startTime) - metrics.ImagePullTime.WithLabelValues(metrics.Async).Observe(elapsed.Seconds()) - size := puller.ImageSize(o.Context) - o.Logger.Info("Finished pulling image", "image", o.Image, "pull-duration", elapsed, "image-size", fmt.Sprintf("%.2f MiB", float64(size)/(1024.0*1024.0))) - - } - pullstatus.Update(o.NamedRef, pullstatus.Pulled) - } - }() - - return nil -} - -// WaitForPull waits until the image pull succeeds or errors or timeout is exceeded -func (m *PullExecutor) WaitForPull(o *PullOptions) error { - if !m.asyncPull { - return nil - } - - condFn := func() (done bool, err error) { - if pullstatus.Get(o.NamedRef) == pullstatus.Pulled { - return true, nil - } - - if m.asyncErrs[o.NamedRef] != nil { - return false, m.asyncErrs[o.NamedRef] - } - return false, nil - } - - if err := wait.PollImmediate( - pullPollTimeInterval, - pullPollTimeout, - condFn); err != nil { - return errors.Errorf("waited too long to download the image: %v", err) - } - - return nil -}