diff --git a/cmd/install/main.go b/cmd/install/main.go index 1df71636..458cfd6b 100644 --- a/cmd/install/main.go +++ b/cmd/install/main.go @@ -108,12 +108,12 @@ func main() { manifest := &bytes.Buffer{} if err := t.Execute(manifest, conf); err != nil { - fmt.Fprintf(os.Stderr, err.Error()) + fmt.Fprintf(os.Stderr, "%s", err.Error()) } ds := appsv1.DaemonSet{} if err := yaml.Unmarshal(manifest.Bytes(), &ds); err != nil { - fmt.Fprintf(os.Stderr, manifest.String()) + fmt.Fprintf(os.Stderr, "%s", manifest.String()) panic(err) } diff --git a/cmd/plugin/controller_server.go b/cmd/plugin/controller_server.go index e946e002..765eae44 100644 --- a/cmd/plugin/controller_server.go +++ b/cmd/plugin/controller_server.go @@ -5,8 +5,8 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/pkg/errors" + csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common" "github.com/warm-metal/container-image-csi-driver/pkg/watcher" - csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) diff --git a/cmd/plugin/identity_server.go b/cmd/plugin/identity_server.go index e2ea17ab..23354b0e 100644 --- a/cmd/plugin/identity_server.go +++ b/cmd/plugin/identity_server.go @@ -6,17 +6,17 @@ import ( "golang.org/x/net/context" ) +type IdentityServer struct { + version string + csi.UnimplementedIdentityServer +} + func NewIdentityServer(version string) *IdentityServer { return &IdentityServer{ version: version, } } -type IdentityServer struct { - version string - csi.UnimplementedIdentityServer -} - func (ids *IdentityServer) GetPluginInfo(_ context.Context, _ *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { return &csi.GetPluginInfoResponse{ Name: driverName, diff --git a/cmd/plugin/main.go b/cmd/plugin/main.go index e10ee294..5b64a87a 100644 --- a/cmd/plugin/main.go +++ b/cmd/plugin/main.go @@ -13,10 +13,10 @@ import ( "github.com/warm-metal/container-image-csi-driver/pkg/backend/containerd" "github.com/warm-metal/container-image-csi-driver/pkg/backend/crio" "github.com/warm-metal/container-image-csi-driver/pkg/cri" + "github.com/warm-metal/container-image-csi-driver/pkg/csi-common" "github.com/warm-metal/container-image-csi-driver/pkg/metrics" "github.com/warm-metal/container-image-csi-driver/pkg/secret" "github.com/warm-metal/container-image-csi-driver/pkg/watcher" - csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "k8s.io/klog/v2" ) diff --git a/cmd/plugin/node_server.go b/cmd/plugin/node_server.go index cadb6333..9534a195 100644 --- a/cmd/plugin/node_server.go +++ b/cmd/plugin/node_server.go @@ -11,11 +11,11 @@ import ( "github.com/containerd/containerd/reference/docker" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "github.com/warm-metal/container-image-csi-driver/pkg/backend" + csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common" "github.com/warm-metal/container-image-csi-driver/pkg/metrics" "github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" "github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync" "github.com/warm-metal/container-image-csi-driver/pkg/secret" - csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" cri "k8s.io/cri-api/pkg/apis/runtime/v1" diff --git a/cmd/plugin/node_server_test.go b/cmd/plugin/node_server_test.go index e3e165a5..bebf787b 100644 --- a/cmd/plugin/node_server_test.go +++ b/cmd/plugin/node_server_test.go @@ -18,9 +18,9 @@ import ( "github.com/warm-metal/container-image-csi-driver/pkg/backend" "github.com/warm-metal/container-image-csi-driver/pkg/backend/containerd" "github.com/warm-metal/container-image-csi-driver/pkg/cri" + csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common" "github.com/warm-metal/container-image-csi-driver/pkg/metrics" "github.com/warm-metal/container-image-csi-driver/pkg/test/utils" - csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" diff --git a/go.mod b/go.mod index 9587e782..faecd825 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 - github.com/warm-metal/csi-drivers v1.0.2 golang.org/x/net v0.32.0 google.golang.org/grpc v1.69.0 k8s.io/api v0.32.0 @@ -62,7 +61,6 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.6.0 // indirect diff --git a/go.sum b/go.sum index 7e89d586..f35cb82f 100644 --- a/go.sum +++ b/go.sum @@ -82,8 +82,6 @@ github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= -github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -219,8 +217,6 @@ github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc= github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/vbatts/tar-split v0.11.7 h1:ixZ93pO/GmvaZw4Vq9OwmfZK/kc2zKdPfu0B+gYqs3U= github.com/vbatts/tar-split v0.11.7/go.mod h1:eF6B6i6ftWQcDqEn3/iGFRFRo8cBIMSJVOpnNdfTMFA= -github.com/warm-metal/csi-drivers v1.0.2 h1:aNfIbHr8rdDTbgmk4FcRoUhklReaWJ3DIvugU+E7rQw= -github.com/warm-metal/csi-drivers v1.0.2/go.mod h1:Z5byZ1dFlAmx9TI4ObZbtbXjgQXrF7NAHoH8QGgu9w8= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/backend/mounter.go b/pkg/backend/mounter.go index c66d222c..cec353e9 100644 --- a/pkg/backend/mounter.go +++ b/pkg/backend/mounter.go @@ -69,7 +69,7 @@ func (s *SnapshotMounter) buildSnapshotCacheOrDie() { // But the mountpoint checking become unavailable any more. if notMount, err := mounter.IsLikelyNotMountPoint(string(target)); err != nil || notMount { klog.Errorf("target %q is not a mountpoint yet. trying to release the ref of snapshot %q", - key) + target, key) delete(targets, target) continue } diff --git a/pkg/csi-common/utils.go b/pkg/csi-common/utils.go new file mode 100644 index 00000000..461dfb65 --- /dev/null +++ b/pkg/csi-common/utils.go @@ -0,0 +1,165 @@ +package csicommon + +import ( + "context" + "fmt" + "net" + "os" + "strings" + "sync" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "k8s.io/klog/v2" +) + +// Defines Non blocking GRPC server interfaces +type NonBlockingGRPCServer interface { + // Start services + Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) + // Waits for the service to stop + Wait() + // Stops the service gracefully + Stop() + // Stops the service forcefully + ForceStop() +} + +func NewNonBlockingGRPCServer() NonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + s.wg.Add(1) + go s.serve(endpoint, ids, cs, ns) +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() +} + +func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + proto, addr, err := parseEndpoint(endpoint) + if err != nil { + klog.Fatal(err.Error()) + } + + if proto == "unix" { + addr = "/" + addr + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error()) + } + } + + listener, err := net.Listen(proto, addr) + if err != nil { + klog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) + s.server = server + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if cs != nil { + csi.RegisterControllerServer(server, cs) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + klog.Infof("Listening for connections on address: %#v", listener.Addr()) + + err = server.Serve(listener) + if err != nil { + klog.Fatalf("Failed to serve: %v", err) + } +} + +// Helper functions +func parseEndpoint(ep string) (string, string, error) { + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "tcp://") { + s := strings.SplitN(ep, "://", 2) + if s[1] != "" { + return s[0], s[1], nil + } + } + return "", "", fmt.Errorf("invalid endpoint: %v", ep) +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + klog.V(3).Infof("GRPC call: %s", info.FullMethod) + return handler(ctx, req) +} + +// CSIDriver object +type CSIDriver struct { + name string + nodeID string + version string + volumeCapabilities []*csi.VolumeCapability_AccessMode + controllerCapabilities []*csi.ControllerServiceCapability +} + +func NewCSIDriver(name string, v string, nodeID string) *CSIDriver { + driver := CSIDriver{ + name: name, + version: v, + nodeID: nodeID, + } + return &driver +} + +func (d *CSIDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) { + var csc []*csi.ControllerServiceCapability + + for _, c := range cl { + csc = append(csc, NewControllerServiceCapability(c)) + } + + d.controllerCapabilities = csc +} + +func (d *CSIDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) { + var vca []*csi.VolumeCapability_AccessMode + for _, c := range vc { + vca = append(vca, NewVolumeCapabilityAccessMode(c)) + } + d.volumeCapabilities = vca +} + +func (d *CSIDriver) GetVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode { + return d.volumeCapabilities +} + +func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode { + return &csi.VolumeCapability_AccessMode{Mode: mode} +} + +func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability { + return &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: cap, + }, + }, + } +} diff --git a/pkg/remoteimage/pull.go b/pkg/remoteimage/pull.go index 282d57b5..ba2f3160 100644 --- a/pkg/remoteimage/pull.go +++ b/pkg/remoteimage/pull.go @@ -48,7 +48,7 @@ func (p puller) ImageWithoutTag() string { func (p puller) ImageSize(ctx context.Context) (size int, err error) { defer func() { if err != nil { - klog.Errorf(err.Error()) + klog.Errorf("%s", err.Error()) metrics.OperationErrorsCount.WithLabelValues("size-error").Inc() } }() diff --git a/pkg/remoteimageasync/puller.go b/pkg/remoteimageasync/puller.go index 5e23d619..a4e3bb09 100644 --- a/pkg/remoteimageasync/puller.go +++ b/pkg/remoteimageasync/puller.go @@ -37,12 +37,12 @@ func RunPullerLoop( case <-ctx.Done(): // application shutting down ses.isTimedOut = false ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix) - klog.V(2).Infof(ses.err.Error()) + klog.V(2).Infof("%s", ses.err.Error()) metrics.OperationErrorsCount.WithLabelValues("pull-async-shutdown").Inc() case <-ctxAsyncPullTimeoutOrShutdown.Done(): // async pull timeout or shutdown ses.isTimedOut = true ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.ImageWithTag()) - klog.V(2).Infof(ses.err.Error()) + klog.V(2).Infof("%s", ses.err.Error()) metrics.OperationErrorsCount.WithLabelValues("pull-async-timeout").Inc() default: // completion: success or error ses.isTimedOut = false