Skip to content

Commit

Permalink
Add csi-common package
Browse files Browse the repository at this point in the history
  • Loading branch information
mugdha-adhav committed Mar 5, 2025
1 parent 54838f3 commit 4b21e12
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 21 deletions.
4 changes: 2 additions & 2 deletions cmd/install/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ func main() {

manifest := &bytes.Buffer{}
if err := t.Execute(manifest, conf); err != nil {
fmt.Fprintf(os.Stderr, err.Error())
fmt.Fprintf(os.Stderr, "%s", err.Error())
}

ds := appsv1.DaemonSet{}
if err := yaml.Unmarshal(manifest.Bytes(), &ds); err != nil {
fmt.Fprintf(os.Stderr, manifest.String())
fmt.Fprintf(os.Stderr, "%s", manifest.String())
panic(err)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin/controller_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pkg/errors"
csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common"
"github.com/warm-metal/container-image-csi-driver/pkg/watcher"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down
10 changes: 5 additions & 5 deletions cmd/plugin/identity_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import (
"golang.org/x/net/context"
)

type IdentityServer struct {
version string
csi.UnimplementedIdentityServer
}

func NewIdentityServer(version string) *IdentityServer {
return &IdentityServer{
version: version,
}
}

type IdentityServer struct {
version string
csi.UnimplementedIdentityServer
}

func (ids *IdentityServer) GetPluginInfo(_ context.Context, _ *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: driverName,
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/warm-metal/container-image-csi-driver/pkg/backend/containerd"
"github.com/warm-metal/container-image-csi-driver/pkg/backend/crio"
"github.com/warm-metal/container-image-csi-driver/pkg/cri"
"github.com/warm-metal/container-image-csi-driver/pkg/csi-common"
"github.com/warm-metal/container-image-csi-driver/pkg/metrics"
"github.com/warm-metal/container-image-csi-driver/pkg/secret"
"github.com/warm-metal/container-image-csi-driver/pkg/watcher"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"k8s.io/klog/v2"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/containerd/containerd/reference/docker"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/warm-metal/container-image-csi-driver/pkg/backend"
csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common"
"github.com/warm-metal/container-image-csi-driver/pkg/metrics"
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage"
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync"
"github.com/warm-metal/container-image-csi-driver/pkg/secret"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cri "k8s.io/cri-api/pkg/apis/runtime/v1"
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin/node_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/warm-metal/container-image-csi-driver/pkg/backend"
"github.com/warm-metal/container-image-csi-driver/pkg/backend/containerd"
"github.com/warm-metal/container-image-csi-driver/pkg/cri"
csicommon "github.com/warm-metal/container-image-csi-driver/pkg/csi-common"
"github.com/warm-metal/container-image-csi-driver/pkg/metrics"
"github.com/warm-metal/container-image-csi-driver/pkg/test/utils"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
github.com/warm-metal/csi-drivers v1.0.2
golang.org/x/net v0.32.0
google.golang.org/grpc v1.69.0
k8s.io/api v0.32.0
Expand Down Expand Up @@ -62,7 +61,6 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZ
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY=
github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -219,8 +217,6 @@ github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc=
github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/vbatts/tar-split v0.11.7 h1:ixZ93pO/GmvaZw4Vq9OwmfZK/kc2zKdPfu0B+gYqs3U=
github.com/vbatts/tar-split v0.11.7/go.mod h1:eF6B6i6ftWQcDqEn3/iGFRFRo8cBIMSJVOpnNdfTMFA=
github.com/warm-metal/csi-drivers v1.0.2 h1:aNfIbHr8rdDTbgmk4FcRoUhklReaWJ3DIvugU+E7rQw=
github.com/warm-metal/csi-drivers v1.0.2/go.mod h1:Z5byZ1dFlAmx9TI4ObZbtbXjgQXrF7NAHoH8QGgu9w8=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *SnapshotMounter) buildSnapshotCacheOrDie() {
// But the mountpoint checking become unavailable any more.
if notMount, err := mounter.IsLikelyNotMountPoint(string(target)); err != nil || notMount {
klog.Errorf("target %q is not a mountpoint yet. trying to release the ref of snapshot %q",
key)
target, key)
delete(targets, target)
continue
}
Expand Down
165 changes: 165 additions & 0 deletions pkg/csi-common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package csicommon

import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/klog/v2"
)

// Defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}

func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}

// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
}

func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
}

func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}

func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}

func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}

func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
klog.Fatal(err.Error())
}

if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}

listener, err := net.Listen(proto, addr)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
}

opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server

if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}

klog.Infof("Listening for connections on address: %#v", listener.Addr())

err = server.Serve(listener)
if err != nil {
klog.Fatalf("Failed to serve: %v", err)
}
}

// Helper functions
func parseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("invalid endpoint: %v", ep)
}

func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
klog.V(3).Infof("GRPC call: %s", info.FullMethod)
return handler(ctx, req)
}

// CSIDriver object
type CSIDriver struct {
name string
nodeID string
version string
volumeCapabilities []*csi.VolumeCapability_AccessMode
controllerCapabilities []*csi.ControllerServiceCapability
}

func NewCSIDriver(name string, v string, nodeID string) *CSIDriver {
driver := CSIDriver{
name: name,
version: v,
nodeID: nodeID,
}
return &driver
}

func (d *CSIDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
var csc []*csi.ControllerServiceCapability

for _, c := range cl {
csc = append(csc, NewControllerServiceCapability(c))
}

d.controllerCapabilities = csc
}

func (d *CSIDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) {
var vca []*csi.VolumeCapability_AccessMode
for _, c := range vc {
vca = append(vca, NewVolumeCapabilityAccessMode(c))
}
d.volumeCapabilities = vca
}

func (d *CSIDriver) GetVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode {
return d.volumeCapabilities
}

func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
return &csi.VolumeCapability_AccessMode{Mode: mode}
}

func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
2 changes: 1 addition & 1 deletion pkg/remoteimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p puller) ImageWithoutTag() string {
func (p puller) ImageSize(ctx context.Context) (size int, err error) {
defer func() {
if err != nil {
klog.Errorf(err.Error())
klog.Errorf("%s", err.Error())
metrics.OperationErrorsCount.WithLabelValues("size-error").Inc()
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/remoteimageasync/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func RunPullerLoop(
case <-ctx.Done(): // application shutting down
ses.isTimedOut = false
ses.err = fmt.Errorf("%s.RunPullerLoop(): shutting down", prefix)
klog.V(2).Infof(ses.err.Error())
klog.V(2).Infof("%s", ses.err.Error())
metrics.OperationErrorsCount.WithLabelValues("pull-async-shutdown").Inc()
case <-ctxAsyncPullTimeoutOrShutdown.Done(): // async pull timeout or shutdown
ses.isTimedOut = true
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.ImageWithTag())
klog.V(2).Infof(ses.err.Error())
klog.V(2).Infof("%s", ses.err.Error())
metrics.OperationErrorsCount.WithLabelValues("pull-async-timeout").Inc()
default: // completion: success or error
ses.isTimedOut = false
Expand Down

0 comments on commit 4b21e12

Please sign in to comment.