Skip to content

Commit

Permalink
feat: reserved capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Feb 12, 2025
1 parent ff18b57 commit b67556e
Show file tree
Hide file tree
Showing 24 changed files with 737 additions and 183 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module sigs.k8s.io/karpenter

go 1.23.6
go 1.24.0

require (
github.com/Pallinder/go-randomdata v1.2.0
Expand Down
3 changes: 2 additions & 1 deletion kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"strings"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/samber/lo"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
}

// Return the hard-coded instance types.
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
return c.instanceTypes, nil
}

Expand Down
4 changes: 2 additions & 2 deletions kwok/cloudprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
ReservationManager: off.Offering.ReservationManager,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
Price: off.Offering.Price,
}
}),
Capacity: options.Resources,
Expand Down
2 changes: 1 addition & 1 deletion kwok/tools/gen_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
57 changes: 44 additions & 13 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
Expand All @@ -40,6 +41,10 @@ import (
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
Expand All @@ -61,14 +66,17 @@ type CloudProvider struct {
Drifted cloudprovider.DriftReason
NodeClassGroupVersionKind []schema.GroupVersionKind
RepairPolicy []cloudprovider.RepairPolicy

ReservationManagerProvider *ReservationManagerProvider
}

func NewCloudProvider() *CloudProvider {
return &CloudProvider{
AllowedCreateCalls: math.MaxInt,
CreatedNodeClaims: map[string]*v1.NodeClaim{},
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
ErrorsForNodePool: map[string]error{},
AllowedCreateCalls: math.MaxInt,
CreatedNodeClaims: map[string]*v1.NodeClaim{},
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
ErrorsForNodePool: map[string]error{},
ReservationManagerProvider: NewReservationManagerProvider(),
}
}

Expand Down Expand Up @@ -102,6 +110,7 @@ func (c *CloudProvider) Reset() {
TolerationDuration: 30 * time.Minute,
},
}
c.ReservationManagerProvider.Reset()
}

func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
Expand Down Expand Up @@ -139,14 +148,19 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
labels[key] = requirement.Values()[0]
}
}
// Find Offering
for _, o := range instanceType.Offerings.Available() {
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
break
// Find offering, prioritizing reserved instances
offering := func() cloudprovider.Offering {
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
if reservedOfferings := offerings.WithCapacityType(v1.CapacityTypeReserved); len(reservedOfferings) != 0 {
c.ReservationManagerProvider.create(reservedOfferings[0].Requirements.Get(v1alpha1.LabelReservationID).Any())
return reservedOfferings[0]
}
}
return offerings[0]
}()
labels[corev1.LabelTopologyZone] = offering.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = offering.Requirements.Get(v1.CapacityTypeLabelKey).Any()

created := &v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaim.Name,
Expand Down Expand Up @@ -189,7 +203,8 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
}), nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
// Note: this fake implementation does **not** support availability snapshots. The burden of testing snapshot support should be on the cloudprovider implementation.
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
if np != nil {
if err, ok := c.ErrorsForNodePool[np.Name]; ok {
return nil, err
Expand All @@ -200,7 +215,23 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]
}
}
if c.InstanceTypes != nil {
return c.InstanceTypes, nil
return lo.Map(c.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType {
for i := range it.Offerings {
if it.Offerings[i].Requirements.Get(v1.CapacityTypeLabelKey).Any() != v1.CapacityTypeReserved {
continue
}
lo.Must0(
it.Offerings[i].Requirements.Has(v1alpha1.LabelReservationID),
"reserved offering for instance type %s must define requirement for label %s",
it.Name,
v1alpha1.LabelReservationID,
)
reservationID := it.Offerings[i].Requirements.Get(v1alpha1.LabelReservationID).Any()
it.Offerings[i].ReservationManager = c.ReservationManagerProvider.ReservationManager(reservationID, opts...)
it.Offerings[i].Available = c.ReservationManagerProvider.Capacity(reservationID) > 0
}
return it
}), nil
}
return []*cloudprovider.InstanceType{
NewInstanceType(InstanceTypeOptions{
Expand Down
64 changes: 42 additions & 22 deletions pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand Down Expand Up @@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
105 changes: 105 additions & 0 deletions pkg/cloudprovider/fake/reservationmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package fake

import (
"maps"

"github.com/awslabs/operatorpkg/option"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

type ReservationManagerProvider struct {
snapshots map[types.UID]*snapshot
capacity map[string]int // map[offering name]total capacity
}

type snapshot struct {
reservations map[string]sets.Set[string] // map[reservation id]set[offering name]
capacity map[string]int
}

func NewReservationManagerProvider() *ReservationManagerProvider {
return &ReservationManagerProvider{
snapshots: map[types.UID]*snapshot{},
capacity: map[string]int{},
}
}

// SetCapacity sets the total number of instances available for a given reservationID. This value will be decremented
// internally each time an instance is launched for the given reservationID.
func (p *ReservationManagerProvider) SetCapacity(reservationID string, capacity int) {
p.capacity[reservationID] = capacity
}

// Capacity returns the total number of instances
func (p *ReservationManagerProvider) Capacity(reservationID string) int {
return p.capacity[reservationID]
}

// create decrements the availability for the given reservationID by one.
func (p *ReservationManagerProvider) create(reservationID string) {
lo.Must0(p.capacity[reservationID] > 0, "created an instance with an offering with no availability")
p.capacity[reservationID] -= 1
if p.capacity[reservationID] == 0 {

}
}

// getSnapshot returns an existing snapshot, if one exists for the given UUID, or creates a new one
func (p *ReservationManagerProvider) getSnapshot(uuid *types.UID) *snapshot {
if uuid != nil {
if snapshot, ok := p.snapshots[*uuid]; ok {
return snapshot
}
}
snapshot := &snapshot{
reservations: map[string]sets.Set[string]{},
capacity: map[string]int{},
}
maps.Copy(snapshot.capacity, p.capacity)
if uuid != nil {
p.snapshots[*uuid] = snapshot
}
return snapshot
}

func (p *ReservationManagerProvider) Reset() {
*p = *NewReservationManagerProvider()
}

func (p *ReservationManagerProvider) ReservationManager(reservationID string, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) cloudprovider.ReservationManager {
return snapshotAdapter{
snapshot: p.getSnapshot(option.Resolve(opts...).AvailabilitySnapshotUUID),
reservationID: reservationID,
}
}

type snapshotAdapter struct {
*snapshot
reservationID string
}

func (a snapshotAdapter) Reserve(reservationID string) bool {
if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
return true
}
if a.capacity[a.reservationID] > 0 {
reservations, ok := a.reservations[reservationID]
if !ok {
reservations = sets.New[string]()
a.reservations[reservationID] = reservations
}
reservations.Insert(a.reservationID)
a.capacity[a.reservationID] -= 1
return true
}
return false
}

func (a snapshotAdapter) Release(reservationID string) {
if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
reservations.Delete(a.reservationID)
}
}
3 changes: 2 additions & 1 deletion pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/option"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

Expand Down Expand Up @@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
Expand Down
Loading

0 comments on commit b67556e

Please sign in to comment.