Commit
remaining updates
jmdeal committed Feb 20, 2025
1 parent 290a7dc commit c07b539
Showing 12 changed files with 339 additions and 106 deletions.
1 change: 0 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -43,7 +43,6 @@ import (
func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
cloudprovider.ReservedCapacityPriceFactor = 1.0 / 1_000_000.0
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
29 changes: 11 additions & 18 deletions pkg/cloudprovider/types.go
@@ -38,15 +38,12 @@ import (
var (
SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))
ReservedRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeReserved))

// ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a
// reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
// requirement is used to inform the scheduler that a reservation for one should affect the other.
ReservationIDLabel string

// ReservedCapacityPriceFactor is a constant which should be applied when determining the cost of a reserved offering,
// if unavailable from `GetInstanceTypes`.
ReservedCapacityPriceFactor float64
)

type DriftReason string
@@ -311,21 +308,17 @@ func (ofs Offerings) MostExpensive() *Offering {
})
}

// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered
// on an instance type. If the instance type has a spot offering available, then it uses the spot offering
// to get the launch price; else, it uses the on-demand launch price
// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered on an instance type. Only
// offerings for the capacity type we will launch with are considered. The following precedence order is used to
// determine which capacity type is used: reserved, spot, on-demand.
func (ofs Offerings) WorstLaunchPrice(reqs scheduling.Requirements) float64 {
// We prefer to launch spot offerings, so we will get the worst price based on the node requirements
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeSpot) {
spotOfferings := ofs.Compatible(reqs).Compatible(SpotRequirement)
if len(spotOfferings) > 0 {
return spotOfferings.MostExpensive().Price
}
}
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeOnDemand) {
onDemandOfferings := ofs.Compatible(reqs).Compatible(OnDemandRequirement)
if len(onDemandOfferings) > 0 {
return onDemandOfferings.MostExpensive().Price
for _, ctReqs := range []scheduling.Requirements{
ReservedRequirement,
SpotRequirement,
OnDemandRequirement,
} {
if compatOfs := ofs.Compatible(reqs).Compatible(ctReqs); len(compatOfs) != 0 {
return compatOfs.MostExpensive().Price
}
}
return math.MaxFloat64
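
The hunk above replaces the spot/on-demand special-casing with a single precedence loop. Below is a minimal, self-contained Go sketch of that logic — the offering struct and capacity-type strings are illustrative stand-ins, not the real cloudprovider.Offering API — showing how the worst-case (most expensive) price is taken from the highest-priority capacity type that still has a compatible offering, falling back to math.MaxFloat64 when none match.

package main

import (
	"fmt"
	"math"
)

// offering is an illustrative stand-in for cloudprovider.Offering: a capacity type and a price.
type offering struct {
	capacityType string
	price        float64
}

// worstLaunchPrice mirrors the new precedence loop: walk the capacity types in priority
// order (reserved, spot, on-demand) and, within the first one that has a compatible
// offering, return the most expensive price.
func worstLaunchPrice(offerings []offering, allowedCapacityTypes map[string]bool) float64 {
	for _, ct := range []string{"reserved", "spot", "on-demand"} {
		if !allowedCapacityTypes[ct] {
			continue
		}
		worst, found := 0.0, false
		for _, o := range offerings {
			if o.capacityType == ct && o.price > worst {
				worst, found = o.price, true
			}
		}
		if found {
			return worst
		}
	}
	return math.MaxFloat64
}

func main() {
	offerings := []offering{
		{capacityType: "on-demand", price: 1.00},
		{capacityType: "spot", price: 0.35},
		{capacityType: "reserved", price: 0.000001}, // reserved offerings now carry their own price
	}
	allAllowed := map[string]bool{"reserved": true, "spot": true, "on-demand": true}
	fmt.Println(worstLaunchPrice(offerings, allAllowed))                                       // 1e-06: reserved wins the precedence
	fmt.Println(worstLaunchPrice(offerings, map[string]bool{"spot": true, "on-demand": true})) // 0.35: spot is preferred over on-demand
	fmt.Println(worstLaunchPrice(nil, allAllowed))                                             // 1.7976931348623157e+308 (math.MaxFloat64)
}

With only spot and on-demand allowed this matches the previous behavior; the change is that a reserved offering, when permitted by the node requirements, now takes precedence and supplies its own (typically much lower) price.
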
21 changes: 4 additions & 17 deletions pkg/controllers/disruption/consolidation.go
@@ -308,24 +308,11 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
func getCandidatePrices(candidates []*Candidate) (float64, error) {
var price float64
for _, c := range candidates {
var compatibleOfferings cloudprovider.Offerings
reservedFallback := false
reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
for {
compatibleOfferings = c.instanceType.Offerings.Compatible(reqs)
if len(compatibleOfferings) != 0 {
break
}
if c.capacityType != v1.CapacityTypeReserved {
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
// If there are no compatible offerings, but the capacity type for the candidate is reserved, we can fall-back to
// the on-demand offering to derive pricing.
reqs[v1.CapacityTypeLabelKey] = scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand)
delete(reqs, cloudprovider.ReservationIDLabel)
reservedFallback = true
compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
if len(compatibleOfferings) == 0 {
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
price += compatibleOfferings.Cheapest().Price * lo.Ternary(reservedFallback, cloudprovider.ReservedCapacityPriceFactor, 1.0)
price += compatibleOfferings.Cheapest().Price
}
return price, nil
}
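
The simplification above works because reserved offerings now publish their own price, so the reserved fallback path (re-pricing against the on-demand offering and applying the removed ReservedCapacityPriceFactor) is no longer needed. A toy arithmetic sketch of that design shift follows — the prices are made up, and the 1/1,000,000 factor simply mirrors the constant removed in this commit and the pricing used in the test file that follows.

package main

import "fmt"

func main() {
	const onDemandPrice = 1.00 // hypothetical on-demand price for the candidate's instance type

	// Before: a reserved candidate had no directly priced offering, so consolidation fell back
	// to the on-demand offering and scaled it by ReservedCapacityPriceFactor (1/1,000,000).
	oldReservedPrice := onDemandPrice * (1.0 / 1_000_000.0)

	// After: the reserved offering carries its own price, and getCandidatePrices reads it from
	// Offerings.Compatible(...).Cheapest() like any other capacity type.
	newReservedPrice := onDemandPrice / 1_000_000.0

	fmt.Println(oldReservedPrice, newReservedPrice) // 1e-06 1e-06: the discount moved into the offering itself
}
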
139 changes: 139 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -47,6 +47,7 @@ import (
"sigs.k8s.io/karpenter/pkg/scheduling"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
)

var _ = Describe("Consolidation", func() {
@@ -4379,4 +4380,142 @@ var _ = Describe("Consolidation", func() {
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
})
})
Context("Reserved Capacity", func() {
var reservedNodeClaim *v1.NodeClaim
var reservedNode *corev1.Node
var mostExpensiveReservationID string

BeforeEach(func() {
mostExpensiveReservationID = fmt.Sprintf("r-%s", mostExpensiveInstance.Name)
mostExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
cloudprovider.ReservationIDLabel,
corev1.NodeSelectorOpIn,
mostExpensiveReservationID,
))
mostExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
mostExpensiveInstance.Offerings = append(mostExpensiveInstance.Offerings, &cloudprovider.Offering{
Price: mostExpensiveOffering.Price / 1_000_000.0,
Available: true,
ReservationCapacity: 10,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: mostExpensiveOffering.Zone(),
v1alpha1.LabelReservationID: mostExpensiveReservationID,
}),
})
reservedNodeClaim, reservedNode = test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
cloudprovider.ReservationIDLabel: mostExpensiveReservationID,
},
},
})
reservedNodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
})
DescribeTable(
"can replace node",
func(initialCapacityType string) {
nodeClaim = lo.Switch[string, *v1.NodeClaim](initialCapacityType).
Case(v1.CapacityTypeOnDemand, nodeClaim).
Case(v1.CapacityTypeSpot, spotNodeClaim).
Default(reservedNodeClaim)
node = lo.Switch[string, *corev1.Node](initialCapacityType).
Case(v1.CapacityTypeOnDemand, node).
Case(v1.CapacityTypeSpot, spotNode).
Default(reservedNode)

// If the capacity type is reserved, we will need a cheaper reserved instance to consolidate into
var leastExpensiveReservationID string
if initialCapacityType == v1.CapacityTypeReserved {
leastExpensiveReservationID = fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
cloudprovider.ReservationIDLabel,
corev1.NodeSelectorOpIn,
leastExpensiveReservationID,
))
leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
Price: leastExpensiveOffering.Price / 1_000_000.0,
Available: true,
ReservationCapacity: 10,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: leastExpensiveOffering.Zone(),
v1alpha1.LabelReservationID: leastExpensiveReservationID,
}),
})
}

// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
},
}})
ExpectApplied(ctx, env.Client, rs, pod, node, nodeClaim, nodePool)

// bind pods to node
ExpectManualBinding(ctx, env.Client, pod, node)

// inform cluster state about nodes and nodeClaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

fakeClock.Step(10 * time.Minute)

// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
var wg sync.WaitGroup
ExpectToWait(fakeClock, &wg)
ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
ExpectSingletonReconciled(ctx, disruptionController)
wg.Wait()

// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)

// should create a new nodeclaim as there is a cheaper one that can hold the pod
nodeClaims := ExpectNodeClaims(ctx, env.Client)
nodes := ExpectNodes(ctx, env.Client)
Expect(nodeClaims).To(HaveLen(1))
Expect(nodes).To(HaveLen(1))

Expect(nodeClaims[0].Name).ToNot(Equal(nodeClaim.Name))
// If the original capacity type was on-demand or spot, we should be able to consolidate into the reserved
// offering on the same instance type.
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(corev1.LabelInstanceTypeStable)).To(BeTrue())
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(corev1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(Equal(initialCapacityType != v1.CapacityTypeReserved))
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(cloudprovider.ReservationIDLabel)).To(BeTrue())
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(cloudprovider.ReservationIDLabel).Any()).To(Equal(lo.Ternary(
initialCapacityType == v1.CapacityTypeReserved,
leastExpensiveReservationID,
mostExpensiveReservationID,
)))

// and delete the old one
ExpectNotFound(ctx, env.Client, nodeClaim, node)
},
Entry("on-demand", v1.CapacityTypeOnDemand),
Entry("spot", v1.CapacityTypeSpot),
Entry("reserved", v1.CapacityTypeReserved),
)
})
})
78 changes: 78 additions & 0 deletions pkg/controllers/nodeclaim/disruption/drift_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package disruption_test

import (
"fmt"
"time"

"github.com/imdario/mergo"
@@ -26,10 +27,13 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/scheduling"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -521,4 +525,78 @@ var _ = Describe("Drift", func() {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted)).To(BeNil())
})
})
Context("Reserved Capacity", func() {
var reservedOffering *cloudprovider.Offering
BeforeEach(func() {
reservedOffering = &cloudprovider.Offering{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: "test-zone-1a",
cloudprovider.ReservationIDLabel: fmt.Sprintf("r-%s", it.Name),
}),
Price: it.Offerings[0].Price / 1_000_000.0,
ReservationCapacity: 10,
}
it.Offerings = append(it.Offerings, reservedOffering)
it.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)

nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirementWithMinValues{
NodeSelectorRequirement: corev1.NodeSelectorRequirement{
Key: v1.CapacityTypeLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{v1.CapacityTypeReserved},
},
})

for _, o := range []client.Object{nodeClaim, node} {
o.SetLabels(lo.Assign(o.GetLabels(), map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
cloudprovider.ReservationIDLabel: reservedOffering.ReservationID(),
}))
}

ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
})
// This is required to support cloudproviders dynamically updating the capacity type based on reservation expirations
It("should drift reserved nodeclaim if the capacity type label has been updated", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())

nodeClaim.Labels[v1.CapacityTypeLabelKey] = v1.CapacityTypeOnDemand
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
})
It("should drift reserved nodeclaims if there are no reserved offerings available for the nodepool", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())

it.Offerings = lo.Reject(it.Offerings, func(o *cloudprovider.Offering, _ int) bool {
return o.CapacityType() == v1.CapacityTypeReserved
})
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
})
It("should drift reserved nodeclaims if an offering with the reservation ID is no longer available for the nodepool", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())

reservedOffering.Requirements[cloudprovider.ReservationIDLabel] = scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, "test")
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
})
})
})
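
The three drift cases above boil down to one invariant: a reserved NodeClaim stays non-drifted only while some offering on its instance type still matches both its capacity type and its reservation ID. The following is a conceptual Go sketch of that invariant, not the drift controller's actual implementation — the types and ID strings are illustrative only.

package main

import "fmt"

// reservedOffering is an illustrative stand-in for a cloudprovider.Offering, reduced to the
// two fields these drift tests exercise: capacity type and reservation ID.
type reservedOffering struct {
	capacityType  string
	reservationID string
}

// isDriftedReserved sketches the invariant asserted above: a NodeClaim labeled as reserved
// should be marked drifted once no offering on its instance type still matches both its
// capacity type and its reservation ID.
func isDriftedReserved(nodeCapacityType, nodeReservationID string, offerings []reservedOffering) bool {
	if nodeCapacityType != "reserved" {
		return false // non-reserved claims are covered by the pre-existing drift checks
	}
	for _, o := range offerings {
		if o.capacityType == "reserved" && o.reservationID == nodeReservationID {
			return false // a matching reserved offering still exists
		}
	}
	return true
}

func main() {
	offerings := []reservedOffering{{capacityType: "reserved", reservationID: "r-example"}}
	fmt.Println(isDriftedReserved("reserved", "r-example", offerings)) // false: offering still matches
	fmt.Println(isDriftedReserved("reserved", "r-expired", offerings)) // true: reservation ID no longer offered
	fmt.Println(isDriftedReserved("reserved", "r-example", nil))       // true: no reserved offerings remain
}
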
27 changes: 22 additions & 5 deletions pkg/controllers/provisioning/provisioner.go
@@ -316,13 +316,30 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
}
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}

results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
if len(results.ReservedOfferingErrors) != 0 {
log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(lo.Keys(results.ReservedOfferingErrors), func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
}
scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})
reservedOfferingErrors := results.ReservedOfferingErrors()
if len(reservedOfferingErrors) != 0 {
log.FromContext(ctx).V(1).WithValues(
"Pods", pretty.Slice(lo.Map(lo.Keys(reservedOfferingErrors), func(p *corev1.Pod, _ int) string {
return klog.KRef(p.Namespace, p.Name).String()
}), 5),
).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
}
scheduler.UnschedulablePodsCount.Set(
// A reserved offering error doesn't indicate a pod is unschedulable, just that the scheduling decision was deferred.
float64(len(results.PodErrors)-len(reservedOfferingErrors)),
map[string]string{
scheduler.ControllerLabel: injection.GetControllerName(ctx),
},
)
if len(results.NewNodeClaims) > 0 {
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
log.FromContext(ctx).WithValues(
"Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string {
return klog.KRef(p.Namespace, p.Name).String()
}), 5),
"duration", time.Since(start),
).Info("found provisionable pod(s)")
}
// Mark in memory when these pods were marked as schedulable or when we made a decision on the pods
p.cluster.MarkPodSchedulingDecisions(results.PodErrors, pendingPods...)
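
The metric adjustment above distinguishes deferred decisions from true scheduling failures. A tiny worked example with made-up numbers:

package main

import "fmt"

func main() {
	// Hypothetical scheduling round; the counts are made up for illustration.
	podErrors := 5              // all pods that carried an error in this simulation
	reservedOfferingErrors := 3 // of those, decisions deferred due to limited reserved capacity

	// Only the remainder is reported on the unschedulable-pods gauge.
	fmt.Println(float64(podErrors - reservedOfferingErrors)) // 2
}
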