Skip to content

Commit

Permalink
feat: reserved capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 12, 2025
1 parent 45f73ec commit 0547af3
Show file tree
Hide file tree
Showing 24 changed files with 753 additions and 185 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module sigs.k8s.io/karpenter

go 1.23.6
go 1.24.0

require (
github.com/Pallinder/go-randomdata v1.2.0
Expand Down
3 changes: 2 additions & 1 deletion kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"strings"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/samber/lo"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
}

// Return the hard-coded instance types.
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
return c.instanceTypes, nil
}

Expand Down
4 changes: 2 additions & 2 deletions kwok/cloudprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
ReservationManager: off.Offering.ReservationManager,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
Price: off.Offering.Price,
}
}),
Capacity: options.Resources,
Expand Down
2 changes: 1 addition & 1 deletion kwok/tools/gen_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
57 changes: 44 additions & 13 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
Expand All @@ -40,6 +41,10 @@ import (
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
Expand All @@ -61,14 +66,17 @@ type CloudProvider struct {
Drifted cloudprovider.DriftReason
NodeClassGroupVersionKind []schema.GroupVersionKind
RepairPolicy []cloudprovider.RepairPolicy

ReservationManagerProvider *ReservationManagerProvider
}

func NewCloudProvider() *CloudProvider {
return &CloudProvider{
AllowedCreateCalls: math.MaxInt,
CreatedNodeClaims: map[string]*v1.NodeClaim{},
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
ErrorsForNodePool: map[string]error{},
AllowedCreateCalls: math.MaxInt,
CreatedNodeClaims: map[string]*v1.NodeClaim{},
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
ErrorsForNodePool: map[string]error{},
ReservationManagerProvider: NewReservationManagerProvider(),
}
}

Expand Down Expand Up @@ -102,6 +110,7 @@ func (c *CloudProvider) Reset() {
TolerationDuration: 30 * time.Minute,
},
}
c.ReservationManagerProvider.Reset()
}

func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
Expand Down Expand Up @@ -139,14 +148,19 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
labels[key] = requirement.Values()[0]
}
}
// Find Offering
for _, o := range instanceType.Offerings.Available() {
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
break
// Find offering, prioritizing reserved instances
offering := func() cloudprovider.Offering {
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
if reservedOfferings := offerings.WithCapacityType(v1.CapacityTypeReserved); len(reservedOfferings) != 0 {
c.ReservationManagerProvider.create(reservedOfferings[0].Requirements.Get(v1alpha1.LabelReservationID).Any())
return reservedOfferings[0]
}
}
return offerings[0]
}()
labels[corev1.LabelTopologyZone] = offering.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = offering.Requirements.Get(v1.CapacityTypeLabelKey).Any()

created := &v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaim.Name,
Expand Down Expand Up @@ -189,7 +203,8 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
}), nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
// Note: this fake implementation does **not** support availability snapshots. The burden of testing snapshot support should be on the cloudprovider implementation.
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
if np != nil {
if err, ok := c.ErrorsForNodePool[np.Name]; ok {
return nil, err
Expand All @@ -200,7 +215,23 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]
}
}
if c.InstanceTypes != nil {
return c.InstanceTypes, nil
return lo.Map(c.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType {
for i := range it.Offerings {
if it.Offerings[i].Requirements.Get(v1.CapacityTypeLabelKey).Any() != v1.CapacityTypeReserved {
continue
}
lo.Must0(
it.Offerings[i].Requirements.Has(v1alpha1.LabelReservationID),
"reserved offering for instance type %s must define requirement for label %s",
it.Name,
v1alpha1.LabelReservationID,
)
reservationID := it.Offerings[i].Requirements.Get(v1alpha1.LabelReservationID).Any()
it.Offerings[i].ReservationManager = c.ReservationManagerProvider.ReservationManager(reservationID, opts...)
it.Offerings[i].Available = c.ReservationManagerProvider.Capacity(reservationID) > 0
}
return it
}), nil
}
return []*cloudprovider.InstanceType{
NewInstanceType(InstanceTypeOptions{
Expand Down
64 changes: 42 additions & 22 deletions pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand Down Expand Up @@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
119 changes: 119 additions & 0 deletions pkg/cloudprovider/fake/reservationmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fake

import (
"maps"

"github.com/awslabs/operatorpkg/option"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// ReservationManagerProvider is the fake cloud provider's bookkeeping for reserved capacity. It records how many
// instances remain for each reservation ID and hands out snapshots of that capacity through getSnapshot.
type ReservationManagerProvider struct {
// snapshots holds every snapshot that getSnapshot created for a non-nil UID, keyed by that UID.
snapshots map[types.UID]*snapshot
// capacity is keyed by reservation ID: SetCapacity writes it and create decrements it.
capacity map[string]int // map[reservation id]remaining capacity
}

// snapshot is a copy of the provider's capacity taken when the snapshot was created, together with the reservations
// that have been made against that copy.
type snapshot struct {
// reservations is keyed by the identifier passed to Reserve / Release; each value is the set of reservation IDs
// currently held under that identifier.
reservations map[string]sets.Set[string] // map[Reserve/Release key]set[reservation id]
// capacity is keyed by reservation ID and is decremented by Reserve.
capacity map[string]int // map[reservation id]remaining capacity
}

// NewReservationManagerProvider returns a provider with no stored snapshots and no configured capacity. Both maps are
// allocated, so the result can be written to immediately.
func NewReservationManagerProvider() *ReservationManagerProvider {
	provider := new(ReservationManagerProvider)
	provider.snapshots = make(map[types.UID]*snapshot)
	provider.capacity = make(map[string]int)
	return provider
}

// SetCapacity records how many instances can be launched for reservationID, overwriting any earlier value. The stored
// count goes down by one every time create is called for the same reservationID.
func (p *ReservationManagerProvider) SetCapacity(reservationID string, capacity int) {
	totals := p.capacity
	totals[reservationID] = capacity
}

// Capacity reports the count currently stored for reservationID: the value given to SetCapacity, less one for each
// create call made since. A reservationID that was never configured reports zero.
func (p *ReservationManagerProvider) Capacity(reservationID string) int {
	if remaining, known := p.capacity[reservationID]; known {
		return remaining
	}
	return 0
}

// create consumes one instance from the capacity stored for reservationID. It panics (via lo.Must0) when nothing is
// left, because that means an instance was launched against an offering with no availability.
func (p *ReservationManagerProvider) create(reservationID string) {
	remaining := p.capacity[reservationID]
	lo.Must0(remaining > 0, "created an instance with an offering with no availability")
	p.capacity[reservationID] = remaining - 1
}

// getSnapshot looks up the snapshot stored under uuid and returns it when present. Otherwise it builds a new snapshot
// whose capacity is a copy of the provider's current capacity and which holds no reservations yet. The new snapshot is
// stored for later lookups only when uuid is non-nil; a nil uuid yields a fresh, unstored snapshot on every call.
func (p *ReservationManagerProvider) getSnapshot(uuid *types.UID) *snapshot {
	keyed := uuid != nil
	if keyed {
		if existing, found := p.snapshots[*uuid]; found {
			return existing
		}
	}

	fresh := &snapshot{
		reservations: make(map[string]sets.Set[string]),
		capacity:     make(map[string]int),
	}
	// Copy rather than alias, so reservations made against the snapshot never touch the provider's own counts.
	maps.Copy(fresh.capacity, p.capacity)

	if keyed {
		p.snapshots[*uuid] = fresh
	}
	return fresh
}

// Reset discards all stored snapshots and configured capacity by overwriting the receiver, in place, with a newly
// constructed provider. Existing pointers to the provider stay valid.
func (p *ReservationManagerProvider) Reset() {
	fresh := NewReservationManagerProvider()
	*p = *fresh
}

// ReservationManager returns a cloudprovider.ReservationManager bound to reservationID. The manager operates on the
// snapshot selected by the AvailabilitySnapshotUUID resolved from opts (see getSnapshot for how a nil UUID is handled).
func (p *ReservationManagerProvider) ReservationManager(reservationID string, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) cloudprovider.ReservationManager {
	resolved := option.Resolve(opts...)
	adapter := snapshotAdapter{reservationID: reservationID}
	adapter.snapshot = p.getSnapshot(resolved.AvailabilitySnapshotUUID)
	return adapter
}

// snapshotAdapter binds a snapshot to a single reservation ID. ReservationManager returns it as the
// cloudprovider.ReservationManager for an offering.
type snapshotAdapter struct {
*snapshot
// reservationID is the reservation whose capacity Reserve and Release operate on.
reservationID string
}

// Reserve tries to hold one unit of this adapter's reservation on behalf of the caller identified by the
// reservationID argument. Note that the argument is the caller's key into the snapshot's reservations map; the
// reservation being consumed is the adapter's own a.reservationID.
//
// It returns true when the caller already holds this reservation (nothing is consumed a second time) or when capacity
// was available and has now been taken. It returns false when the snapshot has no capacity left.
func (a snapshotAdapter) Reserve(reservationID string) bool {
	held, tracked := a.reservations[reservationID]
	if tracked && held.Has(a.reservationID) {
		return true
	}
	if a.capacity[a.reservationID] <= 0 {
		return false
	}
	if !tracked {
		held = sets.New[string]()
		a.reservations[reservationID] = held
	}
	held.Insert(a.reservationID)
	a.capacity[a.reservationID]--
	return true
}

// Release undoes a successful Reserve made by the caller identified by the reservationID argument. As in Reserve, the
// argument is the caller's key into the snapshot's reservations map; the reservation being released is the adapter's
// own a.reservationID.
//
// When the caller holds this reservation, it is removed from the caller's set and the unit of capacity that Reserve
// consumed is returned to the snapshot. When the caller does not hold it, Release is a no-op, so repeated calls never
// inflate capacity.
func (a snapshotAdapter) Release(reservationID string) {
	held, tracked := a.reservations[reservationID]
	if !tracked || !held.Has(a.reservationID) {
		return
	}
	held.Delete(a.reservationID)
	// Reserve decremented the snapshot's capacity when it recorded this reservation; give that unit back so it can
	// be reserved again within the same snapshot.
	a.capacity[a.reservationID]++
}
3 changes: 2 additions & 1 deletion pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/option"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

Expand Down Expand Up @@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
Expand Down
Loading

0 comments on commit 0547af3

Please sign in to comment.