diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index 6601e59dbf..a898f5c361 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -498,6 +498,12 @@ spec: - type type: object type: array + nodeClassObservedGeneration: + description: |- + NodeClassObservedGeneration represents the observed nodeClass generation for referenced nodeClass. If this does not match + the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + format: int64 + type: integer resources: additionalProperties: anyOf: diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index 157aaf13c4..725b953af7 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -496,6 +496,12 @@ spec: - type type: object type: array + nodeClassObservedGeneration: + description: |- + NodeClassObservedGeneration represents the observed nodeClass generation for referenced nodeClass. If this does not match + the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + format: int64 + type: integer resources: additionalProperties: anyOf: diff --git a/pkg/apis/v1/nodepool_status.go b/pkg/apis/v1/nodepool_status.go index 1b3f974694..8fe17c376b 100644 --- a/pkg/apis/v1/nodepool_status.go +++ b/pkg/apis/v1/nodepool_status.go @@ -27,6 +27,8 @@ const ( ConditionTypeValidationSucceeded = "ValidationSucceeded" // ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready ConditionTypeNodeClassReady = "NodeClassReady" + // ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates if a misconfiguration exists that is preventing successful node launch/registrations that requires manual investigation + ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" ) // NodePoolStatus defines the observed state of NodePool @@ -34,6 +36,10 @@ type NodePoolStatus struct { // Resources is the list of resources that have been provisioned. // +optional Resources v1.ResourceList `json:"resources,omitempty"` + // NodeClassObservedGeneration represents the observed nodeClass generation for referenced nodeClass. 
If this does not match + // the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + // +optional + NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"` // Conditions contains signals for health and readiness // +optional Conditions []status.Condition `json:"conditions,omitempty"` } diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 682bf172fd..544def960e 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -50,6 +50,7 @@ import ( nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter" nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash" nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness" + nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth" nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/state" @@ -88,6 +89,7 @@ func NewControllers( metricsnodepool.NewController(kubeClient, cloudProvider), metricsnode.NewController(cluster), nodepoolreadiness.NewController(kubeClient, cloudProvider), + nodepoolregistrationhealth.NewController(kubeClient, cloudProvider), nodepoolcounter.NewController(kubeClient, cloudProvider, cluster), nodepoolvalidation.NewController(kubeClient, cloudProvider), podevents.NewController(clock, kubeClient, cloudProvider), diff --git a/pkg/controllers/nodeclaim/lifecycle/liveness.go b/pkg/controllers/nodeclaim/lifecycle/liveness.go index fc1a272752..610c5b0ca9 100644 --- a/pkg/controllers/nodeclaim/lifecycle/liveness.go +++ b/pkg/controllers/nodeclaim/lifecycle/liveness.go @@ -20,9 +20,14 @@ import ( "context" "time" + "k8s.io/apimachinery/pkg/api/errors" + + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/controller-runtime/pkg/log" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 { return reconcile.Result{RequeueAfter: ttl}, nil } + if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } // Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } @@ -61,6 +72,35 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey], metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey], }) return reconcile.Result{}, nil } + +// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False +// on the NodePool if the nodeClaim fails to launch/register +func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error { + nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey] + if nodePoolName != "" { + nodePool := &v1.NodePool{} + if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil { + return client.IgnoreNotFound(err) + } + if 
nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() { + stored := nodePool.DeepCopy() + // If the nodeClaim failed to register during the TTL set NodeRegistrationHealthy status condition on + // NodePool to False. If the launch failed get the launch failure reason and message from nodeClaim. + if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node") + } else { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message) + } + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + return err + } + } + } + return nil +} diff --git a/pkg/controllers/nodeclaim/lifecycle/liveness_test.go b/pkg/controllers/nodeclaim/lifecycle/liveness_test.go index 8fe3421782..b2309ea91d 100644 --- a/pkg/controllers/nodeclaim/lifecycle/liveness_test.go +++ b/pkg/controllers/nodeclaim/lifecycle/liveness_test.go @@ -19,6 +19,9 @@ package lifecycle_test import ( "time" + "github.com/awslabs/operatorpkg/status" + + operatorpkg "github.com/awslabs/operatorpkg/test/expectations" . "github.com/onsi/ginkgo/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() { ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) if isManagedNodeClaim { ExpectNotFound(ctx, env.Client, nodeClaim) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionFalse, + Reason: "RegistrationFailed", + Message: "Failed to register node", + }) } else { ExpectExists(ctx, env.Client, nodeClaim) } @@ -138,6 +147,58 @@ var _ = Describe("Liveness", func() { _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + // If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim + fakeClock.Step(time.Minute * 20) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionFalse, + Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason, + Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message, + }) + ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) + ExpectNotFound(ctx, env.Client, nodeClaim) + }) + It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() { + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + nodeClaim := test.NodeClaim(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + }, + }, + Spec: v1.NodeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: 
resource.MustParse("50Mi"), + corev1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + + // If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim + fakeClock.Step(time.Minute * 20) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + + // NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue}) + ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) + ExpectNotFound(ctx, env.Client, nodeClaim) + }) + It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() { + nodeClaim := test.NodeClaim() + cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed + ExpectApplied(ctx, env.Client, nodeClaim) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + // If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim fakeClock.Step(time.Minute * 20) _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) diff --git a/pkg/controllers/nodeclaim/lifecycle/registration.go b/pkg/controllers/nodeclaim/lifecycle/registration.go index de92e108cd..063f1a440d 100644 --- a/pkg/controllers/nodeclaim/lifecycle/registration.go +++ b/pkg/controllers/nodeclaim/lifecycle/registration.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/types" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -83,9 +85,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) ( metrics.NodesCreatedTotal.Inc(map[string]string{ metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey], }) + if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } return reconcile.Result{}, nil } +// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True +// on the NodePool if the nodeClaim that registered is owned by a NodePool +func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error { + nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey] + if nodePoolName != "" { + nodePool := &v1.NodePool{} + if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil { + return client.IgnoreNotFound(err) + } + storedNodePool := nodePool.DeepCopy() + if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) { + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, 
client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + return err + } + } + } + return nil +} + func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error { stored := node.DeepCopy() controllerutil.AddFinalizer(node, v1.TerminationFinalizer) diff --git a/pkg/controllers/nodeclaim/lifecycle/registration_test.go b/pkg/controllers/nodeclaim/lifecycle/registration_test.go index 254a08cfa4..9506dbdb47 100644 --- a/pkg/controllers/nodeclaim/lifecycle/registration_test.go +++ b/pkg/controllers/nodeclaim/lifecycle/registration_test.go @@ -17,6 +17,10 @@ limitations under the License. package lifecycle_test import ( + "time" + + "github.com/awslabs/operatorpkg/status" + operatorpkg "github.com/awslabs/operatorpkg/test/expectations" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -55,6 +59,7 @@ var _ = Describe("Registration", func() { }) } nodeClaim := test.NodeClaim(nodeClaimOpts...) + nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) ExpectApplied(ctx, env.Client, nodePool, nodeClaim) ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) @@ -67,6 +72,10 @@ var _ = Describe("Registration", func() { if isManagedNodeClaim { Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue()) Expect(nodeClaim.Status.NodeName).To(Equal(node.Name)) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionTrue, + }) } else { Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue()) Expect(nodeClaim.Status.NodeName).To(Equal("")) @@ -380,4 +389,44 @@ var _ = Describe("Registration", func() { node = ExpectExists(ctx, env.Client, node) Expect(node.Spec.Taints).To(HaveLen(0)) }) + It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() { + nodeClaimOpts := []v1.NodeClaim{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + }, + }, + }} + nodeClaim := test.NodeClaim(nodeClaimOpts...) 
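Both liveness.go and registration.go follow the same write path for the NodePool condition: resolve the owning NodePool from the karpenter.sh/nodepool label, mutate its status conditions on top of a deep copy, patch status with an optimistic lock, and translate a write Conflict into a requeue rather than an error. A condensed sketch of that shared pattern, using the illustrative helper names patchNodePoolCondition and requeueOnConflict (neither exists in the patch):

```go
package lifecycle // illustrative sketch; not code from the patch

import (
	"context"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// patchNodePoolCondition resolves the NodePool that owns a NodeClaim, lets the
// caller mutate its status conditions, and patches status with an optimistic
// lock so that a stale condition list surfaces as a Conflict instead of being
// silently overwritten. A missing NodePool is deliberately ignored.
func patchNodePoolCondition(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim, mutate func(*v1.NodePool)) error {
	nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
	if nodePoolName == "" {
		return nil // NodeClaim is not owned by a NodePool; nothing to record
	}
	nodePool := &v1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		return client.IgnoreNotFound(err)
	}
	stored := nodePool.DeepCopy()
	mutate(nodePool)
	// MergeFromWithOptimisticLock adds the resourceVersion to the merge patch, so
	// concurrent writers to the condition list fail with a Conflict instead of racing.
	return client.IgnoreNotFound(kubeClient.Status().Patch(ctx, nodePool,
		client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})))
}

// requeueOnConflict mirrors how both call sites translate a write Conflict into a requeue.
func requeueOnConflict(err error) (reconcile.Result, error) {
	if errors.IsConflict(err) {
		return reconcile.Result{Requeue: true}, nil
	}
	return reconcile.Result{}, err
}
```

Because the condition helpers record the NodePool's current generation as the condition's observedGeneration when they write it, the registrationhealth controller introduced later in this patch can detect NodePool spec changes purely by comparing generations.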
+ nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + + node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}}) + ExpectApplied(ctx, env.Client, node) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue()) + Expect(nodeClaim.Status.NodeName).To(Equal(node.Name)) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionTrue, + }) + }) + It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() { + nodeClaim := test.NodeClaim() + ExpectApplied(ctx, env.Client, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + + node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}}) + ExpectApplied(ctx, env.Client, node) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue()) + Expect(nodeClaim.Status.NodeName).To(Equal(node.Name)) + }) }) diff --git a/pkg/controllers/nodepool/readiness/controller.go b/pkg/controllers/nodepool/readiness/controller.go index 7ec99f3d07..05edbd0855 100644 --- a/pkg/controllers/nodepool/readiness/controller.go +++ b/pkg/controllers/nodepool/readiness/controller.go @@ -19,9 +19,7 @@ package readiness import ( "context" - "github.com/awslabs/operatorpkg/object" "github.com/awslabs/operatorpkg/status" - "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" controllerruntime "sigs.k8s.io/controller-runtime" @@ -55,15 +53,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco ctx = injection.WithControllerName(ctx, "nodepool.readiness") stored := nodePool.DeepCopy() - nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool { - return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind() - }) - if !ok { - // Ignore NodePools which aren't using a supported NodeClass. + nodeClass, err := nodepoolutils.GetNodeClass(ctx, c.kubeClient, nodePool, c.cloudProvider) + if nodeClass == nil { return reconcile.Result{}, nil } - - err := c.kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass) if client.IgnoreNotFound(err) != nil { return reconcile.Result{}, err } diff --git a/pkg/controllers/nodepool/registrationhealth/controller.go b/pkg/controllers/nodepool/registrationhealth/controller.go new file mode 100644 index 0000000000..0a489803fd --- /dev/null +++ b/pkg/controllers/nodepool/registrationhealth/controller.go @@ -0,0 +1,95 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrationhealth + +import ( + "context" + + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + + "sigs.k8s.io/karpenter/pkg/operator/injection" + + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool" +) + +// Controller for the resource +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +// NewController will create a controller to reset NodePool's registration health when there is an update to NodePool/NodeClass spec +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "nodepool.registrationhealth") + + nodeClass, err := nodepoolutils.GetNodeClass(ctx, c.kubeClient, nodePool, c.cloudProvider) + if nodeClass == nil { + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + stored := nodePool.DeepCopy() + + // If NodeClass/NodePool have been updated then NodeRegistrationHealthy = Unknown + if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy) == nil || + nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration() || + nodePool.Generation != nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).ObservedGeneration { + nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) + } + nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration() + if !equality.Semantic.DeepEqual(stored, nodePool) { + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + } + return reconcile.Result{}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + b := controllerruntime.NewControllerManagedBy(m). + Named("nodepool.registrationhealth"). + For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsManagedPredicateFuncs(c.cloudProvider))). 
+ WithOptions(controller.Options{MaxConcurrentReconciles: 10}) + for _, nodeClass := range c.cloudProvider.GetSupportedNodeClasses() { + b.Watches(nodeClass, nodepoolutils.NodeClassEventHandler(c.kubeClient)) + } + return b.Complete(reconcile.AsReconciler(m.GetClient(), c)) +} diff --git a/pkg/controllers/nodepool/registrationhealth/suite_test.go b/pkg/controllers/nodepool/registrationhealth/suite_test.go new file mode 100644 index 0000000000..57eb5a8a64 --- /dev/null +++ b/pkg/controllers/nodepool/registrationhealth/suite_test.go @@ -0,0 +1,131 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrationhealth_test + +import ( + "context" + "testing" + + "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth" + + "github.com/awslabs/operatorpkg/object" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/karpenter/pkg/apis" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/test" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ( + controller *registrationhealth.Controller + ctx context.Context + env *test.Environment + cloudProvider *fake.CloudProvider + nodePool *v1.NodePool + nodeClass *v1alpha1.TestNodeClass +) + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "RegistrationHealth") +} + +var _ = BeforeSuite(func() { + cloudProvider = fake.NewCloudProvider() + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...)) + controller = registrationhealth.NewController(env.Client, cloudProvider) +}) +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = Describe("RegistrationHealth", func() { + BeforeEach(func() { + nodePool = test.NodePool() + nodeClass = test.NodeClass(v1alpha1.TestNodeClass{ + ObjectMeta: metav1.ObjectMeta{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, + }) + nodePool.Spec.Template.Spec.NodeClassRef.Group = object.GVK(nodeClass).Group + nodePool.Spec.Template.Spec.NodeClassRef.Kind = object.GVK(nodeClass).Kind + _ = nodePool.StatusConditions().Clear(v1.ConditionTypeNodeRegistrationHealthy) + }) + It("should ignore setting NodeRegistrationHealthy status condition on NodePools which aren't managed by this instance of Karpenter", func() { + nodePool.Spec.Template.Spec.NodeClassRef = &v1.NodeClassReference{ + Group: "karpenter.test.sh", + Kind: "UnmanagedNodeClass", + Name: "default", + } + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + 
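The reset logic in the new nodepool.registrationhealth controller reduces to three generation checks: the condition has never been written, the referenced NodeClass carries a generation the NodePool has not yet observed, or the NodePool spec changed since the condition was last recorded. A side-effect-free restatement of that rule as a predicate (the function name shouldResetRegistrationHealth is illustrative, not from the patch):

```go
package registrationhealth // illustrative sketch; not code from the patch

import v1 "sigs.k8s.io/karpenter/pkg/apis/v1"

// shouldResetRegistrationHealth restates the reset rule from Reconcile as a pure
// predicate: NodeRegistrationHealthy goes back to Unknown when the condition has
// never been set, when the referenced NodeClass has a generation the NodePool has
// not yet observed, or when the NodePool spec itself changed after the condition
// was last written.
func shouldResetRegistrationHealth(nodePool *v1.NodePool, nodeClassGeneration int64) bool {
	cond := nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)
	return cond == nil ||
		nodePool.Status.NodeClassObservedGeneration != nodeClassGeneration ||
		nodePool.Generation != cond.ObservedGeneration
}
```

Reconcile then stamps nodeClass.GetGeneration() into Status.NodeClassObservedGeneration on every pass, so a given NodeClass edit resets the condition exactly once.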
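Both the refactored readiness controller and the new registrationhealth controller resolve their NodeClass through nodepoolutils.GetNodeClass, whose return contract is easy to misread. A minimal caller sketch (the surrounding function and package name are illustrative only):

```go
package example // illustrative caller; not code from the patch

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

// resolveNodeClass spells out the GetNodeClass return contract both callers rely on:
//   - (nil, nil): the NodePool references a NodeClass GroupKind this Karpenter instance
//     does not support, so the reconciler should skip the NodePool entirely.
//   - (obj, NotFound error): the kind is supported but the object is missing; callers
//     squash this with client.IgnoreNotFound and decide how to proceed.
//   - (obj, nil): the live NodeClass, ready for generation comparisons.
func resolveNodeClass(ctx context.Context, kubeClient client.Client, cp cloudprovider.CloudProvider, nodePool *v1.NodePool) (reconcile.Result, error) {
	nodeClass, err := nodepoolutils.GetNodeClass(ctx, kubeClient, nodePool, cp)
	if nodeClass == nil {
		return reconcile.Result{}, nil // unsupported NodeClass kind; ignore this NodePool
	}
	if err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	_ = nodeClass.GetGeneration() // e.g. compared against Status.NodeClassObservedGeneration
	return reconcile.Result{}, nil
}
```

Returning (nil, nil) for unsupported GroupKinds is what lets both controllers quietly skip NodePools owned by a different Karpenter instance, mirroring the IsManagedPredicateFuncs filter used when they register their watches.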
Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)).To(BeNil()) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool when nodeClass does not exist", func() { + ExpectApplied(ctx, env.Client, nodePool) + ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)).To(BeNil()) + }) + It("should set NodeRegistrationHealthy status condition on nodePool as Unknown if the nodeClass observed generation doesn't match with that on nodePool", func() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + Expect(nodePool.Status.NodeClassObservedGeneration).To(Equal(int64(0))) + + nodePool.Spec.Limits = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("14")} + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeTrue()) + Expect(nodePool.Status.NodeClassObservedGeneration).To(Equal(int64(1))) + }) + It("should set NodeRegistrationHealthy status condition on nodePool as Unknown if the nodePool is updated", func() { + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeTrue()) + Expect(nodePool.Status.NodeClassObservedGeneration).To(Equal(int64(1))) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool as Unknown if it is already set to true", func() { + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + nodePool.Status.NodeClassObservedGeneration = int64(1) + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeFalse()) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool as Unknown if it is already set to false", func() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + nodePool.Status.NodeClassObservedGeneration = int64(1) + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeFalse()) + }) +}) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index b8b57ff3bc..1c5cce28ae 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -343,10 +343,25 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { } // Mark in memory when these pods were marked as schedulable or when we made a decision on the pods p.cluster.MarkPodSchedulingDecisions(results.PodErrors, pendingPods...) 
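The provisioner hunk that follows feeds scheduler results into this tracking: every pod placed onto a new NodeClaim or an existing node gets a timestamp when its NodePool currently reports NodeRegistrationHealthy=true, and loses any stale timestamp otherwise. Roughly, the two pieces added below (Provisioner.MarkPodSchedulingDecisionsNodeRegistrationHealthy and Cluster.UpdatePodHealthyNodePoolScheduledTime) condense to the following illustrative function, which is not code from the patch:

```go
package example // illustrative condensation; not code from the patch

import (
	"context"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// recordHealthyPlacement condenses the provisioner-side iteration and the
// cluster-state bookkeeping: for every pod placed against a NodePool, stamp "now"
// when that NodePool currently reports NodeRegistrationHealthy=true, and drop any
// previously recorded stamp when it does not.
func recordHealthyPlacement(ctx context.Context, kubeClient client.Client, store *sync.Map, nodePoolName string, now time.Time, pods ...*corev1.Pod) {
	nodePool := &v1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		return // unknown NodePool: leave any existing records untouched
	}
	healthy := nodePool.StatusConditions().IsTrue(v1.ConditionTypeNodeRegistrationHealthy)
	for _, p := range pods {
		key := client.ObjectKeyFromObject(p)
		if healthy {
			store.Store(key, now)
		} else {
			store.Delete(key)
		}
	}
}
```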
+ p.MarkPodSchedulingDecisionsNodeRegistrationHealthy(ctx, results) results.Record(ctx, p.recorder, p.cluster) return results, nil } +// MarkPodSchedulingDecisionsNodeRegistrationHealthy iterates through the nodeClaims +// and existing nodes and marks podHealthyNodePoolScheduledTime time for pods +func (p *Provisioner) MarkPodSchedulingDecisionsNodeRegistrationHealthy(ctx context.Context, results scheduler.Results) { + now := time.Now() + for _, n := range results.NewNodeClaims { + p.cluster.UpdatePodHealthyNodePoolScheduledTime(ctx, n.Labels[v1.NodePoolLabelKey], now, n.Pods...) + } + for _, n := range results.ExistingNodes { + if nodePoolName, ok := n.Labels()[v1.NodePoolLabelKey]; ok { + p.cluster.UpdatePodHealthyNodePoolScheduledTime(ctx, nodePoolName, now, n.Pods...) + } + } +} + func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...option.Function[LaunchOptions]) (string, error) { ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodePool", klog.KRef("", n.NodePoolName))) options := option.Resolve(opts...) diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 0b6625322f..8afa9fd840 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -245,6 +245,34 @@ var _ = Describe("Provisioning", func() { Expect(cluster.PodSchedulingDecisionTime(nn).IsZero()).To(BeFalse()) ExpectMetricHistogramSampleCountValue("karpenter_pods_scheduling_decision_duration_seconds", 1, nil) }) + It("should mark podHealthyNodePoolScheduledTime if it is scheduled against a nodePool with NodeRegistrationHealthy=true", func() { + nodePool := test.NodePool() + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + nodePool.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded) + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeClassReady) + ExpectApplied(ctx, env.Client, nodePool) + pod := test.UnschedulablePod() + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + nodes := &corev1.NodeList{} + Expect(env.Client.List(ctx, nodes)).To(Succeed()) + Expect(len(nodes.Items)).To(Equal(1)) + ExpectScheduled(ctx, env.Client, pod) + Expect(cluster.PodSchedulingSuccessTimeRegistrationHealthyCheck(client.ObjectKeyFromObject(pod)).IsZero()).To(BeFalse()) + }) + It("should not mark podHealthyNodePoolScheduledTime if it is scheduled against a nodePool with NodeRegistrationHealthy=False", func() { + nodePool := test.NodePool() + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + nodePool.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded) + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeClassReady) + ExpectApplied(ctx, env.Client, nodePool) + pod := test.UnschedulablePod() + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + nodes := &corev1.NodeList{} + Expect(env.Client.List(ctx, nodes)).To(Succeed()) + Expect(len(nodes.Items)).To(Equal(1)) + ExpectScheduled(ctx, env.Client, pod) + Expect(cluster.PodSchedulingSuccessTimeRegistrationHealthyCheck(client.ObjectKeyFromObject(pod)).IsZero()).To(BeTrue()) + }) It("should provision nodes for pods with supported node selectors", func() { nodePool := test.NodePool() schedulable := []*corev1.Pod{ diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index b1a7e8853f..72cd8c6c4b 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ 
-58,9 +58,10 @@ type Cluster struct { nodeClaimNameToProviderID map[string]string // node claim name -> provider id daemonSetPods sync.Map // daemonSet -> existing pod - podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending - podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod - podsSchedulableTimes sync.Map // pod namespaced name -> time when it was first marked as able to fit to a node + podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending + podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod + podsSchedulableTimes sync.Map // pod namespaced name -> time when it was first marked as able to fit to a node + podHealthyNodePoolScheduledTime sync.Map // pod namespaced name -> time when the pod was marked as able to fit to a node backed by a nodePool that has NodeRegistrationHealthy=true clusterStateMu sync.RWMutex // Separate mutex as this is called in some places that mu is held // A monotonically increasing timestamp representing the time state of the @@ -83,9 +84,11 @@ func NewCluster(clk clock.Clock, client client.Client, cloudProvider cloudprovid daemonSetPods: sync.Map{}, nodeNameToProviderID: map[string]string{}, nodeClaimNameToProviderID: map[string]string{}, - podAcks: sync.Map{}, - podsSchedulableTimes: sync.Map{}, - podsSchedulingAttempted: sync.Map{}, + + podAcks: sync.Map{}, + podsSchedulableTimes: sync.Map{}, + podsSchedulingAttempted: sync.Map{}, + podHealthyNodePoolScheduledTime: sync.Map{}, } } @@ -384,6 +387,26 @@ func (c *Cluster) MarkPodSchedulingDecisions(podErrors map[*corev1.Pod]error, po } } +// UpdatePodHealthyNodePoolScheduledTime updates podHealthyNodePoolScheduledTime +// for pods scheduled against a nodePool that has NodeRegistrationHealthy=true +func (c *Cluster) UpdatePodHealthyNodePoolScheduledTime(ctx context.Context, nodePoolName string, schedulableTime time.Time, pods ...*corev1.Pod) { + nodePool := &v1.NodePool{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err == nil { + for _, p := range pods { + nn := client.ObjectKeyFromObject(p) + // If the pod is scheduled to a nodePool that has NodeRegistrationHealthy=true, + // record the time at which we determined that it could schedule. + if nodePool.StatusConditions().IsTrue(v1.ConditionTypeNodeRegistrationHealthy) { + c.podHealthyNodePoolScheduledTime.Store(nn, schedulableTime) + } else { + // If the pod was scheduled to a healthy nodePool earlier but is now getting scheduled to an + // unhealthy one, we need to delete its entry from the map because it will not schedule successfully + c.podHealthyNodePoolScheduledTime.Delete(nn) + } + } + } +} + // PodSchedulingDecisionTime returns when Karpenter first decided if a pod could schedule a pod in scheduling simulations. // This returns 0, false if Karpenter never made a decision on the pod. func (c *Cluster) PodSchedulingDecisionTime(podKey types.NamespacedName) time.Time { @@ -402,6 +425,15 @@ func (c *Cluster) PodSchedulingSuccessTime(podKey types.NamespacedName) time.Tim return time.Time{} } +// PodSchedulingSuccessTimeRegistrationHealthyCheck returns when Karpenter first determined that a pod could schedule against a nodePool with NodeRegistrationHealthy=true in its scheduling simulation. +// This returns the zero time if the pod was never marked schedulable against such a nodePool.
+func (c *Cluster) PodSchedulingSuccessTimeRegistrationHealthyCheck(podKey types.NamespacedName) time.Time { + if val, found := c.podHealthyNodePoolScheduledTime.Load(podKey); found { + return val.(time.Time) + } + return time.Time{} +} + func (c *Cluster) DeletePod(podKey types.NamespacedName) { c.mu.Lock() defer c.mu.Unlock() @@ -416,6 +448,7 @@ func (c *Cluster) ClearPodSchedulingMappings(podKey types.NamespacedName) { c.podAcks.Delete(podKey) c.podsSchedulableTimes.Delete(podKey) c.podsSchedulingAttempted.Delete(podKey) + c.podHealthyNodePoolScheduledTime.Delete(podKey) } // MarkUnconsolidated marks the cluster state as being unconsolidated. This should be called in any situation where diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 6f2773221c..da017b9c59 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -99,6 +99,26 @@ var _ = AfterEach(func() { cluster.Reset() cloudProvider.Reset() }) +var _ = Describe("Pod Healthy NodePool", func() { + It("should not store pod schedulable time if the nodePool that pod is scheduled to does not have NodeRegistrationHealthy=true", func() { + pod := test.Pod() + ExpectApplied(ctx, env.Client, pod, nodePool) + + cluster.UpdatePodHealthyNodePoolScheduledTime(ctx, nodePool.Name, time.Now(), []*corev1.Pod{pod}...) + setTime := cluster.PodSchedulingSuccessTimeRegistrationHealthyCheck(client.ObjectKeyFromObject(pod)) + Expect(setTime.IsZero()).To(BeTrue()) + }) + It("should store pod schedulable time if the nodePool that pod is scheduled to has NodeRegistrationHealthy=true", func() { + pod := test.Pod() + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + ExpectApplied(ctx, env.Client, pod, nodePool) + + now := time.Now() + cluster.UpdatePodHealthyNodePoolScheduledTime(ctx, nodePool.Name, now, []*corev1.Pod{pod}...) + setTime := cluster.PodSchedulingSuccessTimeRegistrationHealthyCheck(client.ObjectKeyFromObject(pod)) + Expect(setTime).To(Equal(now)) + }) +}) var _ = Describe("Pod Ack", func() { It("should only mark pods as schedulable once", func() { diff --git a/pkg/utils/nodepool/nodepool.go b/pkg/utils/nodepool/nodepool.go index 298f133dbc..fa8f36180d 100644 --- a/pkg/utils/nodepool/nodepool.go +++ b/pkg/utils/nodepool/nodepool.go @@ -39,6 +39,18 @@ func IsManaged(nodePool *v1.NodePool, cp cloudprovider.CloudProvider) bool { }) } +func GetNodeClass(ctx context.Context, c client.Client, nodePool *v1.NodePool, cp cloudprovider.CloudProvider) (status.Object, error) { + if nodeClass, ok := lo.Find(cp.GetSupportedNodeClasses(), func(nodeClass status.Object) bool { + return object.GVK(nodeClass).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind() + }); ok { + if err := c.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass); err != nil { + return nodeClass, err + } + return nodeClass, nil + } + return nil, nil +} + // IsManagedPredicateFuncs is used to filter controller-runtime NodeClaim watches to NodeClaims managed by the given cloudprovider. func IsManagedPredicateFuncs(cp cloudprovider.CloudProvider) predicate.Funcs { return predicate.NewPredicateFuncs(func(o client.Object) bool {