feat: add NodeRegistrationHealthy status condition to nodepool
jigisha620 committed Feb 26, 2025
1 parent a863104 commit deab154
Showing 16 changed files with 543 additions and 17 deletions.
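Taken together, these changes drive a new NodeRegistrationHealthy status condition through a small lifecycle. The summary below is editorial, inferred from the diffs that follow rather than quoted from any single file in the commit:

// Lifecycle of NodeRegistrationHealthy, inferred from the diffs below:
//   Unknown: reset by the new nodepool/registrationhealth controller
//            (registered in controllers.go below), e.g. when the referenced
//            NodeClass generation no longer matches nodeClassObservedGeneration.
//   False:   set by nodeclaim/lifecycle/liveness.go, with a failure reason,
//            when a NodeClaim misses its registration TTL, and only while the
//            condition is still Unknown.
//   True:    set by nodeclaim/lifecycle/registration.go when a NodeClaim owned
//            by the NodePool registers successfully.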
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that prevents successful node launches or registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
// the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
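For orientation, here is a hypothetical snippet, not part of this commit, showing how a consumer could read the two new status fields with the typed client. The package name, function name, and the NodePool name "default" are placeholders:

package example

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// printRegistrationHealth reads the new NodePool status fields added above.
func printRegistrationHealth(ctx context.Context, kubeClient client.Client) error {
	nodePool := &v1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: "default"}, nodePool); err != nil {
		return err
	}
	// Get returns nil if the condition has not been set yet.
	if cond := nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy); cond != nil {
		// After a registration failure this prints, for example:
		//   observedGeneration=3 status=False reason=RegistrationFailed
		fmt.Printf("observedGeneration=%d status=%s reason=%s\n",
			nodePool.Status.NodeClassObservedGeneration, cond.Status, cond.Reason)
	}
	return nil
}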
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
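The registrationhealth controller wired in above is among the diffs not rendered on this page. As rough orientation only, here is a sketch of what it plausibly does, inferred from the NodeClassObservedGeneration field description: reset the condition and re-record the generation whenever the referenced NodeClass changes. The nodepoolutils.GetNodeClass call is the helper the readiness controller below switches to; the body, struct fields, and import path are assumptions, not the committed code.

package registrationhealth

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

type Controller struct {
	kubeClient    client.Client
	cloudProvider cloudprovider.CloudProvider
}

func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
	nodeClass, err := nodepoolutils.GetNodeClass(ctx, c.kubeClient, nodePool, c.cloudProvider)
	if nodeClass == nil || err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	// Nothing to do while the recorded generation still matches the NodeClass.
	if nodePool.Status.NodeClassObservedGeneration == nodeClass.GetGeneration() {
		return reconcile.Result{}, nil
	}
	stored := nodePool.DeepCopy()
	nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
	nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration()
	// Same optimistic-lock rationale as in liveness.go and registration.go below.
	if err := c.kubeClient.Status().Patch(ctx, nodePool,
		client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	return reconcile.Result{}, nil
}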
43 changes: 41 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,14 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets NodeRegistrationHealthy=False
// on the NodePool if the nodeClaim fails to launch or register
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return client.IgnoreNotFound(err)
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register within the TTL, set the NodeRegistrationHealthy status condition on the
// NodePool to False. If the launch failed, use the launch failure reason and message from the nodeClaim.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races, since it fully replaces the list on a change.
// Here, we are updating the status condition list.
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}
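The optimistic-lock choice above is the load-bearing detail: a plain client.MergeFrom patch serializes the whole conditions list, so two controllers updating different conditions can silently overwrite each other. A self-contained sketch of the difference, illustrative only and not code from this commit (package and function names are placeholders):

package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// markUnhealthy patches the condition the same way liveness.go does above.
func markUnhealthy(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool) error {
	stored := nodePool.DeepCopy()
	nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy,
		"RegistrationFailed", "Failed to register node")
	// client.MergeFrom(stored) would emit a JSON merge patch that replaces the
	// full conditions list: last writer wins and concurrent updates are lost.
	// The optimistic-lock variant embeds stored's resourceVersion in the patch,
	// so a concurrent write surfaces as a 409 Conflict, which the Reconcile
	// above converts into a requeue via errors.IsConflict.
	return kubeClient.Status().Patch(ctx, nodePool,
		client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
}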
61 changes: 61 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -19,6 +19,9 @@ package lifecycle_test
import (
"time"

"github.com/awslabs/operatorpkg/status"

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: "RegistrationFailed",
Message: "Failed to register node",
})
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -138,6 +147,58 @@ var _ = Describe("Liveness", func() {
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() {
nodeClaim := test.NodeClaim()
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
30 changes: 30 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -83,9 +85,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (
metrics.NodesCreatedTotal.Inc(map[string]string{
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets NodeRegistrationHealthy=True
// on the NodePool when a registered nodeClaim is owned by a NodePool
func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return client.IgnoreNotFound(err)
}
storedNodePool := nodePool.DeepCopy()
if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races, since it fully replaces the list on a change.
// Here, we are updating the status condition list.
if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}

func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1.TerminationFinalizer)
49 changes: 49 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration_test.go
@@ -17,6 +17,10 @@ limitations under the License.
package lifecycle_test

import (
"time"

"github.com/awslabs/operatorpkg/status"
operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -55,6 +59,7 @@ var _ = Describe("Registration", func() {
})
}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -67,6 +72,10 @@ var _ = Describe("Registration", func() {
if isManagedNodeClaim {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
} else {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(""))
@@ -380,4 +389,44 @@ var _ = Describe("Registration", func() {
node = ExpectExists(ctx, env.Client, node)
Expect(node.Spec.Taints).To(HaveLen(0))
})
It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() {
nodeClaimOpts := []v1.NodeClaim{{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
}}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy")
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
})
It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() {
nodeClaim := test.NodeClaim()
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
})
})
11 changes: 2 additions & 9 deletions pkg/controllers/nodepool/readiness/controller.go
@@ -19,9 +19,7 @@ package readiness
import (
"context"

"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -55,15 +53,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
ctx = injection.WithControllerName(ctx, "nodepool.readiness")
stored := nodePool.DeepCopy()

nodeClass, err := nodepoolutils.GetNodeClass(ctx, c.kubeClient, nodePool, c.cloudProvider)
if nodeClass == nil {
	// Ignore NodePools which aren't using a supported NodeClass.
	return reconcile.Result{}, nil
}
if client.IgnoreNotFound(err) != nil {
	return reconcile.Result{}, err
}
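The nodepoolutils.GetNodeClass helper the readiness controller now delegates to lives in one of the files truncated from this page. Below is a plausible reconstruction, assembled from the lookup-and-Get code it replaces above, under the assumption that it returns (nil, nil) for unsupported NodeClass kinds and passes Get errors through for the caller's IgnoreNotFound check; the package name and signature are inferred, not quoted:

package nodepool

import (
	"context"

	"github.com/awslabs/operatorpkg/object"
	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// GetNodeClass resolves a NodePool's NodeClassRef against the cloud provider's
// supported NodeClasses, then fetches the referenced object from the API server.
func GetNodeClass(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool, cp cloudprovider.CloudProvider) (status.Object, error) {
	nodeClass, ok := lo.Find(cp.GetSupportedNodeClasses(), func(nc status.Object) bool {
		return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
	})
	if !ok {
		return nil, nil // NodePool references an unsupported NodeClass kind
	}
	err := kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass)
	return nodeClass, err
}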