feat: reserved capacity support #1911

Merged: 10 commits, Feb 26, 2025
6 changes: 3 additions & 3 deletions designs/capacity-reservations.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ Karpenter doesn't currently support reasoning about this capacity type. Karpente
3. Karpenter should add logic to its scheduler to reason about this availability as an `int` -- ensuring that the scheduler never schedules more offerings of an instance type for a capacity type than are available
4. Karpenter should extend its CloudProvider [InstanceType](https://github.com/kubernetes-sigs/karpenter/blob/35d6197e38e64cd6abfef71a082aee80e38d09fd/pkg/cloudprovider/types.go#L75) struct to allow offerings to represent availability of an offering as an `int` rather than a `bool` -- allowing Cloud Providers to represent the constrained capacity of `reserved`
5. Karpenter should consolidate between `on-demand` and/or `spot` instance types to `reserved` when the capacity type is available
6. Karpenter should introduce a feature flag `FEATURE_FLAG=CapacityReservations` to gate this new feature in `ALPHA` when it's introduced
6. Karpenter should introduce a feature flag `FEATURE_FLAG=ReservedCapacity` to gate this new feature in `ALPHA` when it's introduced

### `karpenter.sh/capacity-type` API

_Note: Some excerpts taken from [`aws/karpenter-provider-aws` RFC](https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md#nodepool-api)._

This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back between reserved capacity to on-demand (or even spot) capacity respectively.
This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back between reserved capacity to on-demand (or even spot) capacity respectively.

_Note: This option requires any applications (pods) that are using node selection on `karpenter.sh/capacity-type: "on-demand"` to expand their selection to include `reserved` or to update it to perform a `NotIn` node affinity on `karpenter.sh/capacity-type: spot`_
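The selector change described in the note can be illustrated with a minimal, self-contained sketch. The `Requirement` type and its `Matches` method below are hypothetical stand-ins written for this example, not Karpenter or Kubernetes types; they only model the `In` and `NotIn` operators on a single label. The sketch assumes that a `NotIn` requirement also matches nodes that lack the label.

```go
package main

import "fmt"

// Operator models the two node-selector operators discussed in the note.
type Operator string

const (
	In    Operator = "In"
	NotIn Operator = "NotIn"
)

// capacityTypeKey is the label the RFC extends with the "reserved" value.
const capacityTypeKey = "karpenter.sh/capacity-type"

// Requirement is a simplified, hypothetical stand-in for a node-selector
// requirement on one label key.
type Requirement struct {
	Key      string
	Operator Operator
	Values   []string
}

// Matches reports whether a node carrying the given labels satisfies r.
func (r Requirement) Matches(labels map[string]string) bool {
	value, present := labels[r.Key]
	listed := false
	if present {
		for _, v := range r.Values {
			if v == value {
				listed = true
				break
			}
		}
	}
	switch r.Operator {
	case In:
		return listed
	case NotIn:
		// A node without the label is treated as matching NotIn.
		return !listed
	default:
		return false
	}
}

func main() {
	inOnDemand := Requirement{Key: capacityTypeKey, Operator: In, Values: []string{"on-demand"}}
	notInSpot := Requirement{Key: capacityTypeKey, Operator: NotIn, Values: []string{"spot"}}

	for _, ct := range []string{"spot", "on-demand", "reserved"} {
		labels := map[string]string{capacityTypeKey: ct}
		fmt.Printf("%-9s  In[on-demand]=%t  NotIn[spot]=%t\n",
			ct, inOnDemand.Matches(labels), notInSpot.Matches(labels))
	}
}
```

A pod that selects `In [on-demand]` does not match a `reserved` node, whereas `NotIn [spot]` admits both `on-demand` and `reserved`, which is why the note suggests either widening the `In` set or switching to `NotIn`.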

Expand Down Expand Up @@ -140,4 +140,4 @@ In practice, this means that if a user has two capacity reservation offerings av

## Appendix

1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
4 changes: 2 additions & 2 deletions kwok/cloudprovider/cloudprovider.go
Expand Up @@ -164,10 +164,10 @@ func (c CloudProvider) toNode(nodeClaim *v1.NodeClaim) (*corev1.Node, error) {

availableOfferings := it.Offerings.Available().Compatible(requirements)

offeringsByPrice := lo.GroupBy(availableOfferings, func(of cloudprovider.Offering) float64 { return of.Price })
offeringsByPrice := lo.GroupBy(availableOfferings, func(of *cloudprovider.Offering) float64 { return of.Price })
minOfferingPrice := lo.Min(lo.Keys(offeringsByPrice))
if cheapestOffering == nil || minOfferingPrice < cheapestOffering.Price {
cheapestOffering = lo.ToPtr(lo.Sample(offeringsByPrice[minOfferingPrice]))
cheapestOffering = lo.Sample(offeringsByPrice[minOfferingPrice])
instanceType = it
}
}
Expand Down
7 changes: 4 additions & 3 deletions kwok/cloudprovider/helpers.go
Expand Up @@ -184,13 +184,14 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
return &cloudprovider.InstanceType{
Name: options.Name,
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) *cloudprovider.Offering {
return &cloudprovider.Offering{
ReservationCapacity: off.ReservationCapacity,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
Available: off.Available,
}
}),
Capacity: options.Resources,
Expand Down
2 changes: 1 addition & 1 deletion kwok/tools/gen_instance_types.go
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
43 changes: 35 additions & 8 deletions pkg/cloudprovider/fake/cloudprovider.go
Expand Up @@ -40,6 +40,11 @@ import (
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
Expand Down Expand Up @@ -104,6 +109,7 @@ func (c *CloudProvider) Reset() {
}
}

//nolint:gocyclo
func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -121,9 +127,16 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
reqs := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
np := &v1.NodePool{ObjectMeta: metav1.ObjectMeta{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}}
instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, np)), func(i *cloudprovider.InstanceType, _ int) bool {
return reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) &&
i.Offerings.Available().HasCompatible(reqs) &&
resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable())
if !reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
return false
}
if !i.Offerings.Available().HasCompatible(reqs) {
return false
}
if !resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable()) {
return false
}
return true
})
// Order instance types so that we get the cheapest instance types of the available offerings
sort.Slice(instanceTypes, func(i, j int) bool {
Expand All @@ -139,14 +152,28 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
labels[key] = requirement.Values()[0]
}
}
// Find Offering
for _, o := range instanceType.Offerings.Available() {
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
// Find offering, prioritizing reserved instances
var offering *cloudprovider.Offering
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
for _, o := range offerings {
if o.CapacityType() == v1.CapacityTypeReserved {
o.ReservationCapacity -= 1
if o.ReservationCapacity == 0 {
o.Available = false
}
offering = o
break
}
}
if offering == nil {
offering = offerings[0]
}
// Propagate labels dictated by offering requirements - e.g. zone, capacity-type, and reservation-id
for _, req := range offering.Requirements {
labels[req.Key] = req.Any()
}

created := &v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaim.Name,
Expand Down
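The reserved-first selection in the fake provider's `Create` can be sketched in isolation. This is a simplified illustration, not the code from the PR: the `Offering` struct below is a hypothetical, flattened version of `cloudprovider.Offering` (a plain `CapacityType` string instead of `Requirements`), and `pickOffering` is a name invented for the example. It also shows why a slice of pointers matters here: decrementing `ReservationCapacity` on a filtered view is visible through the original slice.

```go
package main

import (
	"errors"
	"fmt"
)

// Offering is a hypothetical, flattened stand-in for cloudprovider.Offering.
type Offering struct {
	CapacityType        string
	Price               float64
	Available           bool
	ReservationCapacity int
}

// pickOffering prefers an available reserved offering and consumes one unit of
// its capacity; otherwise it falls back to the first available offering.
func pickOffering(offerings []*Offering) (*Offering, error) {
	var available []*Offering
	for _, o := range offerings {
		if o.Available {
			available = append(available, o)
		}
	}
	if len(available) == 0 {
		return nil, errors.New("no available offerings")
	}
	for _, o := range available {
		if o.CapacityType == "reserved" {
			// Because the slice holds pointers, this mutation is seen by every
			// other holder of the same offering.
			o.ReservationCapacity--
			if o.ReservationCapacity <= 0 {
				o.Available = false
			}
			return o, nil
		}
	}
	return available[0], nil
}

func main() {
	pool := []*Offering{
		{CapacityType: "on-demand", Price: 1.0, Available: true},
		{CapacityType: "reserved", Price: 0.0, Available: true, ReservationCapacity: 2},
	}
	for i := 0; i < 3; i++ {
		o, err := pickOffering(pool)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("launch %d uses %s (reserved capacity left: %d)\n",
			i+1, o.CapacityType, pool[1].ReservationCapacity)
	}
}
```

With a value slice (`[]Offering`), the decrement would apply to a copy and the reservation would never be exhausted, which is presumably one motivation for the move to `[]*Offering` in this PR.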
72 changes: 46 additions & 26 deletions pkg/cloudprovider/fake/instancetype.go
Expand Up @@ -64,27 +64,47 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
options.Resources[corev1.ResourcePods] = resource.MustParse("5")
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
options.Offerings = []*cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand All @@ -97,10 +117,10 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
scheduling.NewRequirement(corev1.LabelInstanceTypeStable, corev1.NodeSelectorOpIn, options.Name),
scheduling.NewRequirement(corev1.LabelArchStable, corev1.NodeSelectorOpIn, options.Architecture),
scheduling.NewRequirement(corev1.LabelOSStable, corev1.NodeSelectorOpIn, sets.List(options.OperatingSystems)...),
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o cloudprovider.Offering, _ int) string {
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o *cloudprovider.Offering, _ int) string {
return o.Requirements.Get(corev1.LabelTopologyZone).Any()
})...),
scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o cloudprovider.Offering, _ int) string {
scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o *cloudprovider.Offering, _ int) string {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
})...),
scheduling.NewRequirement(LabelInstanceSize, corev1.NodeSelectorOpDoesNotExist),
Expand Down Expand Up @@ -151,14 +171,14 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
},
}
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
opts.Offerings = []*cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
67 changes: 40 additions & 27 deletions pkg/cloudprovider/types.go
Expand Up @@ -38,6 +38,12 @@ import (
var (
SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))
ReservedRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeReserved))

// ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a
// reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
// requirement is used to inform the scheduler that a reservation for one should affect the other.
ReservationIDLabel string
)

type DriftReason string
Expand Down Expand Up @@ -242,27 +248,38 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
// An Offering describes where an InstanceType is available to be used, with the expectation that its properties
// may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
// these properties are captured with labels in Requirements.
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone.
type Offering struct {
Requirements scheduling.Requirements
Price float64
// Available is added so that Offerings can return all offerings that have ever existed for an instance type,
// so we can get historical pricing data for calculating savings in consolidation
Available bool
Requirements scheduling.Requirements
Price float64
Available bool
ReservationCapacity int
}

func (o *Offering) CapacityType() string {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
}

func (o *Offering) Zone() string {
return o.Requirements.Get(corev1.LabelTopologyZone).Any()
}

func (o *Offering) ReservationID() string {
return o.Requirements.Get(ReservationIDLabel).Any()
}

type Offerings []Offering
type Offerings []*Offering

// Available filters the available offerings from the returned offerings
func (ofs Offerings) Available() Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return lo.Filter(ofs, func(o *Offering, _ int) bool {
return o.Available
})
}

// Compatible returns the offerings based on the passed requirements
func (ofs Offerings) Compatible(reqs scheduling.Requirements) Offerings {
return lo.Filter(ofs, func(offering Offering, _ int) bool {
return lo.Filter(ofs, func(offering *Offering, _ int) bool {
return reqs.IsCompatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels)
})
}
Expand All @@ -278,34 +295,30 @@ func (ofs Offerings) HasCompatible(reqs scheduling.Requirements) bool {
}

// Cheapest returns the cheapest offering from the returned offerings
func (ofs Offerings) Cheapest() Offering {
return lo.MinBy(ofs, func(a, b Offering) bool {
func (ofs Offerings) Cheapest() *Offering {
return lo.MinBy(ofs, func(a, b *Offering) bool {
return a.Price < b.Price
})
}

// MostExpensive returns the most expensive offering from the return offerings
func (ofs Offerings) MostExpensive() Offering {
return lo.MaxBy(ofs, func(a, b Offering) bool {
func (ofs Offerings) MostExpensive() *Offering {
return lo.MaxBy(ofs, func(a, b *Offering) bool {
return a.Price > b.Price
})
}

// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered
// on an instance type. If the instance type has a spot offering available, then it uses the spot offering
// to get the launch price; else, it uses the on-demand launch price
// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered on an instance type. Only
// offerings for the capacity type we will launch with are considered. The following precedence order is used to
// determine which capacity type is used: reserved, spot, on-demand.
func (ofs Offerings) WorstLaunchPrice(reqs scheduling.Requirements) float64 {
// We prefer to launch spot offerings, so we will get the worst price based on the node requirements
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeSpot) {
spotOfferings := ofs.Compatible(reqs).Compatible(SpotRequirement)
if len(spotOfferings) > 0 {
return spotOfferings.MostExpensive().Price
}
}
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeOnDemand) {
onDemandOfferings := ofs.Compatible(reqs).Compatible(OnDemandRequirement)
if len(onDemandOfferings) > 0 {
return onDemandOfferings.MostExpensive().Price
for _, ctReqs := range []scheduling.Requirements{
ReservedRequirement,
SpotRequirement,
OnDemandRequirement,
} {
if compatOfs := ofs.Compatible(reqs).Compatible(ctReqs); len(compatOfs) != 0 {
return compatOfs.MostExpensive().Price
}
}
return math.MaxFloat64
Expand Down
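The precedence order used by the new `WorstLaunchPrice` (reserved, then spot, then on-demand) can be sketched without the `scheduling.Requirements` machinery. In this illustration, `Offering`, `worstLaunchPrice`, and the `allowed` map are hypothetical simplifications: the map stands in for the capacity types permitted by the node's requirements.

```go
package main

import (
	"fmt"
	"math"
)

// Offering is a hypothetical, flattened stand-in for cloudprovider.Offering.
type Offering struct {
	CapacityType string
	Price        float64
}

// capacityTypePrecedence mirrors the order described in the updated comment.
var capacityTypePrecedence = []string{"reserved", "spot", "on-demand"}

// worstLaunchPrice returns the highest price among offerings of the first
// capacity type, in precedence order, that is both allowed and present.
func worstLaunchPrice(offerings []Offering, allowed map[string]bool) float64 {
	for _, ct := range capacityTypePrecedence {
		if !allowed[ct] {
			continue
		}
		worst, found := 0.0, false
		for _, o := range offerings {
			if o.CapacityType != ct {
				continue
			}
			if !found || o.Price > worst {
				worst, found = o.Price, true
			}
		}
		if found {
			return worst
		}
	}
	// No compatible offering: treat the launch price as unbounded.
	return math.MaxFloat64
}

func main() {
	offerings := []Offering{
		{CapacityType: "reserved", Price: 0.0},
		{CapacityType: "spot", Price: 0.3},
		{CapacityType: "spot", Price: 0.4},
		{CapacityType: "on-demand", Price: 1.0},
	}
	all := map[string]bool{"reserved": true, "spot": true, "on-demand": true}
	noReserved := map[string]bool{"spot": true, "on-demand": true}
	fmt.Println(worstLaunchPrice(offerings, all))
	fmt.Println(worstLaunchPrice(offerings, noReserved))
}
```

Only the first matching capacity type contributes to the result, so an expensive on-demand offering does not inflate the worst-case price when a spot or reserved launch is expected.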
13 changes: 11 additions & 2 deletions pkg/controllers/disruption/consolidation.go
Expand Up @@ -211,7 +211,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
// assumption, that the spot variant will launch. We also need to add a requirement to the node to ensure that if
// spot capacity is insufficient we don't replace the node with a more expensive on-demand node. Instead the launch
// should fail and we'll just leave the node alone.
// should fail and we'll just leave the node alone. We don't need to do the same for reserved since the requirements
// are injected by the scheduler.
ctReq := results.NewNodeClaims[0].Requirements.Get(v1.CapacityTypeLabelKey)
if ctReq.Has(v1.CapacityTypeSpot) && ctReq.Has(v1.CapacityTypeOnDemand) {
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
Expand Down Expand Up @@ -307,8 +308,16 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
func getCandidatePrices(candidates []*Candidate) (float64, error) {
var price float64
for _, c := range candidates {
compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
compatibleOfferings := c.instanceType.Offerings.Compatible(reqs)
if len(compatibleOfferings) == 0 {
// It's expected that offerings may no longer exist for capacity reservations once a NodeClass stops selecting on
// them (or they are no longer considered for some other reason by the cloudprovider). By definition though,
// reserved capacity is free. By modeling it as free, consolidation won't be able to succeed, but the node should be
// disrupted via drift regardless.
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeReserved) {
return 0.0, nil
}
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
price += compatibleOfferings.Cheapest().Price
Expand Down
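The new fallback in `getCandidatePrices` can be sketched as follows. This is an illustration under simplifying assumptions: `Candidate` and `Offering` are hypothetical flattened types, and compatibility is reduced to matching capacity type and zone. As in the hunk above, a reserved candidate with no remaining compatible offering makes the function return a total of zero rather than an error.

```go
package main

import "fmt"

// Offering is a hypothetical, flattened stand-in for cloudprovider.Offering.
type Offering struct {
	CapacityType string
	Zone         string
	Price        float64
}

// Candidate is a hypothetical stand-in for a disruption candidate.
type Candidate struct {
	InstanceType string
	CapacityType string
	Zone         string
	Offerings    []Offering
}

// candidatePrices sums the cheapest compatible offering price per candidate.
func candidatePrices(candidates []Candidate) (float64, error) {
	var total float64
	for _, c := range candidates {
		cheapest, found := 0.0, false
		for _, o := range c.Offerings {
			if o.CapacityType != c.CapacityType || o.Zone != c.Zone {
				continue
			}
			if !found || o.Price < cheapest {
				cheapest, found = o.Price, true
			}
		}
		if !found {
			// Mirrors the diff: a reserved node whose offering disappeared is
			// modeled as free, and the whole computation returns zero.
			if c.CapacityType == "reserved" {
				return 0.0, nil
			}
			return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s",
				c.InstanceType, c.CapacityType, c.Zone)
		}
		total += cheapest
	}
	return total, nil
}

func main() {
	orphanedReserved := Candidate{InstanceType: "example-large", CapacityType: "reserved", Zone: "test-zone-1"}
	price, err := candidatePrices([]Candidate{orphanedReserved})
	fmt.Println(price, err)
}
```

Since a replacement must be cheaper than the candidates' current price, a price of zero means consolidation cannot find a cheaper option, which matches the comment's expectation that such nodes are handled by drift instead.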