review, consolidation updates, and testing
jmdeal committed Feb 20, 2025
1 parent 94377e1 commit 290a7dc
Showing 10 changed files with 501 additions and 126 deletions.
41 changes: 26 additions & 15 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -43,6 +43,7 @@ import (
func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
cloudprovider.ReservedCapacityPriceFactor = 1.0 / 1_000_000.0
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
@@ -109,6 +110,7 @@ func (c *CloudProvider) Reset() {
}
}

//nolint:gocyclo
func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -126,9 +128,16 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
reqs := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
np := &v1.NodePool{ObjectMeta: metav1.ObjectMeta{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}}
instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, np)), func(i *cloudprovider.InstanceType, _ int) bool {
return reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) &&
i.Offerings.Available().HasCompatible(reqs) &&
resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable())
if !reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
return false
}
if !i.Offerings.Available().HasCompatible(reqs) {
return false
}
if !resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable()) {
return false
}
return true
})
// Order instance types so that we get the cheapest instance types of the available offerings
sort.Slice(instanceTypes, func(i, j int) bool {
@@ -145,20 +154,22 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
}
}
// Find offering, prioritizing reserved instances
offering := func() *cloudprovider.Offering {
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
for _, o := range offerings {
if o.CapacityType() == v1.CapacityTypeReserved {
o.ReservationCapacity -= 1
if o.ReservationCapacity == 0 {
o.Available = false
}
return o
var offering *cloudprovider.Offering
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
for _, o := range offerings {
if o.CapacityType() == v1.CapacityTypeReserved {
o.ReservationCapacity -= 1
if o.ReservationCapacity == 0 {
o.Available = false
}
offering = o
break
}
return offerings[0]
}()
}
if offering == nil {
offering = offerings[0]
}
// Propagate labels dictated by offering requirements - e.g. zone, capacity-type, and reservation-id
for _, req := range offering.Requirements {
labels[req.Key] = req.Any()
22 changes: 8 additions & 14 deletions pkg/cloudprovider/types.go
@@ -43,6 +43,10 @@ var (
// reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
// requirement is used to inform the scheduler that a reservation for one should affect the other.
ReservationIDLabel string

// ReservedCapacityPriceFactor is a constant factor applied when determining the cost of a reserved offering whose
// price is unavailable from `GetInstanceTypes`.
ReservedCapacityPriceFactor float64
)
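As a rough sketch of how a cloud provider might apply this factor when it has no reserved price to report from `GetInstanceTypes` (the helper name below and the module path `sigs.k8s.io/karpenter` are assumptions for illustration, not part of this change):

package example

import "sigs.k8s.io/karpenter/pkg/cloudprovider"

// estimatedReservedPrice derives a nominal price for a reserved offering from the
// corresponding on-demand price when no reserved price is reported. Illustrative only;
// providers that know their actual reserved pricing should report it directly.
func estimatedReservedPrice(onDemandPrice float64) float64 {
	return onDemandPrice * cloudprovider.ReservedCapacityPriceFactor
}

With the factor of 1.0 / 1_000_000.0 set in the fake cloud provider above, a $0.10/hr on-demand price compares as $0.0000001/hr, so reserved offerings are effectively always treated as the cheapest capacity type.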

type DriftReason string
@@ -244,20 +248,6 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
return resources.Merge(i.KubeReserved, i.SystemReserved, i.EvictionThreshold)
}

// ReservationManager is used to track the availability of a reserved offering over the course of a scheduling
// simulation. Reserved offerings may have a limited number of available instances associated with them.
// This is exposed as an interface for cloud providers to implement, giving them flexibility when dealing with
// separate offerings and their associated availability.
type ReservationManager interface {
// Reserve takes a unique identifier for a reservation, and returns a boolean indicating if the reservation was
// successful. Reserve should be idempotent, i.e. multiple calls with the same reservation ID should only count for a
// single reservation.
Reserve(string) bool
// Release takes a unique identifier for a reservation, and should discard any matching reservations. If no
// reservations exist for the given id, release should be a no-op.
Release(string)
}
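For reference, a minimal in-memory sketch of the contract documented here (idempotent Reserve, Release as a no-op for unknown IDs); the type and field names are illustrative and not taken from any real cloud provider, and note that the scheduler-side reservation manager used later in this diff releases by hostname and offering rather than by a bare ID:

// inMemoryReservationManager tracks a fixed amount of reserved capacity and the set of
// reservation IDs currently holding it.
type inMemoryReservationManager struct {
	capacity int
	held     map[string]struct{}
}

// Reserve is idempotent: a repeated call with an ID that already holds a reservation
// succeeds without consuming additional capacity.
func (m *inMemoryReservationManager) Reserve(id string) bool {
	if _, ok := m.held[id]; ok {
		return true
	}
	if len(m.held) >= m.capacity {
		return false
	}
	m.held[id] = struct{}{}
	return true
}

// Release discards any reservation held by the given ID; it is a no-op for unknown IDs.
func (m *inMemoryReservationManager) Release(id string) {
	delete(m.held, id)
}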

// An Offering describes where an InstanceType is available to be used, with the expectation that its properties
// may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
// these properties are captured with labels in Requirements.
@@ -273,6 +263,10 @@ func (o *Offering) CapacityType() string {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
}

func (o *Offering) Zone() string {
return o.Requirements.Get(corev1.LabelTopologyZone).Any()
}

func (o *Offering) ReservationID() string {
return o.Requirements.Get(ReservationIDLabel).Any()
}
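A hedged example of how these accessors fit together: an offering carries its capacity type, zone, and reservation ID as requirement labels, and the helpers simply read them back. The constructor below, its literal values, and the import paths (assuming the upstream module `sigs.k8s.io/karpenter`) are illustrative; it also assumes `ReservationIDLabel` has been set by the cloud provider's init, as the fake provider does above:

package example

import (
	corev1 "k8s.io/api/core/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	"sigs.k8s.io/karpenter/pkg/scheduling"
)

// newReservedOffering builds an offering whose properties are encoded as requirements,
// which is what CapacityType, Zone, and ReservationID read back.
func newReservedOffering() *cloudprovider.Offering {
	return &cloudprovider.Offering{
		Requirements: scheduling.NewRequirements(
			scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeReserved),
			scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, "us-west-2a"),
			scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, "example-reservation"),
		),
		Price:     0.10,
		Available: true,
	}
}

Given such an offering, CapacityType() returns "reserved", Zone() returns "us-west-2a", and ReservationID() returns "example-reservation".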
24 changes: 19 additions & 5 deletions pkg/controllers/disruption/consolidation.go
@@ -211,7 +211,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
// assumption that the spot variant will launch. We also need to add a requirement to the node to ensure that if
// spot capacity is insufficient we don't replace the node with a more expensive on-demand node. Instead the launch
// should fail and we'll just leave the node alone.
// should fail and we'll just leave the node alone. We don't need to do the same for reserved since the requirements
// are injected by the scheduler.
ctReq := results.NewNodeClaims[0].Requirements.Get(v1.CapacityTypeLabelKey)
if ctReq.Has(v1.CapacityTypeSpot) && ctReq.Has(v1.CapacityTypeOnDemand) {
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
@@ -307,11 +308,24 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
func getCandidatePrices(candidates []*Candidate) (float64, error) {
var price float64
for _, c := range candidates {
compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
if len(compatibleOfferings) == 0 {
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
var compatibleOfferings cloudprovider.Offerings
reservedFallback := false
reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
for {
compatibleOfferings = c.instanceType.Offerings.Compatible(reqs)
if len(compatibleOfferings) != 0 {
break
}
if c.capacityType != v1.CapacityTypeReserved {
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
// If there are no compatible offerings, but the capacity type for the candidate is reserved, we can fall back to
// the on-demand offering to derive pricing.
reqs[v1.CapacityTypeLabelKey] = scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand)
delete(reqs, cloudprovider.ReservationIDLabel)
reservedFallback = true
}
price += compatibleOfferings.Cheapest().Price
price += compatibleOfferings.Cheapest().Price * lo.Ternary(reservedFallback, cloudprovider.ReservedCapacityPriceFactor, 1.0)
}
return price, nil
}
13 changes: 8 additions & 5 deletions pkg/controllers/disruption/types.go
@@ -157,19 +157,22 @@ func (c Command) String() string {
}
odNodeClaims := 0
spotNodeClaims := 0
reservedNodeClaims := 0
for _, nodeClaim := range c.replacements {
ct := nodeClaim.Requirements.Get(v1.CapacityTypeLabelKey)
if ct.Has(v1.CapacityTypeOnDemand) {
switch {
case ct.Has(v1.CapacityTypeOnDemand):
odNodeClaims++
}
if ct.Has(v1.CapacityTypeSpot) {
case ct.Has(v1.CapacityTypeSpot):
spotNodeClaims++
case ct.Has(v1.CapacityTypeReserved):
reservedNodeClaims++
}
}
// Print the list of instance types for the first replacement.
if len(c.replacements) > 1 {
fmt.Fprintf(&buf, " and replacing with %d spot and %d on-demand, from types %s",
spotNodeClaims, odNodeClaims,
fmt.Fprintf(&buf, " and replacing with %d spot, %d on-demand, and %d reserved, from types %s",
spotNodeClaims, odNodeClaims, reservedNodeClaims,
scheduling.InstanceTypeList(c.replacements[0].InstanceTypeOptions))
return buf.String()
}
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/provisioner.go
@@ -304,7 +304,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, nil
}
log.FromContext(ctx).V(1).WithValues("pending-pods", len(pendingPods), "deleting-pods", len(deletingNodePods)).Info("computing scheduling decision for provisionable pod(s)")
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedFallback)
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedCapacityFallback)
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
@@ -318,7 +318,7 @@ }
}
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
if len(results.ReservedOfferingErrors) != 0 {
log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(lo.Keys(results.ReservedOfferingErrors), func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
}
scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})
if len(results.NewNodeClaims) > 0 {
59 changes: 31 additions & 28 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -136,7 +136,7 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
return err
}

reservedOfferings, offeringsToRelease, err := n.reserveOfferings(remaining, nodeClaimRequirements)
reservedOfferings, err := n.reserveOfferings(remaining, nodeClaimRequirements)
if err != nil {
return err
}
@@ -148,13 +148,29 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
n.Requirements = nodeClaimRequirements
n.topology.Record(pod, n.NodeClaim.Spec.Taints, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
n.hostPortUsage.Add(pod, hostPorts)
n.releaseReservedOfferings(n.reservedOfferings, reservedOfferings)
n.reservedOfferings = reservedOfferings
for _, o := range offeringsToRelease {
n.reservationManager.Release(n.hostname, o)
}
return nil
}

// releaseReservedOfferings releases all offerings which are present in the current reserved offerings, but are not
// present in the updated reserved offerings.
func (n *NodeClaim) releaseReservedOfferings(current, updated map[string]cloudprovider.Offerings) {
updatedIDs := sets.New[string]()
for _, ofs := range updated {
for _, o := range ofs {
updatedIDs.Insert(o.ReservationID())
}
}
for _, ofs := range current {
for _, o := range ofs {
if !updatedIDs.Has(o.ReservationID()) {
n.reservationManager.Release(n.hostname, o)
}
}
}
}

// reserveOfferings handles the reservation of `karpenter.sh/capacity-type: reserved` offerings, returning the reserved
// offerings in a map from instance type name to offerings. Releasing reservations for previously reserved offerings
// which are no longer compatible with the nodeclaim is handled separately by releaseReservedOfferings.
@@ -163,19 +179,17 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
func (n *NodeClaim) reserveOfferings(
instanceTypes []*cloudprovider.InstanceType,
nodeClaimRequirements scheduling.Requirements,
) (reservedOfferings map[string]cloudprovider.Offerings, offeringsToRelease []*cloudprovider.Offering, err error) {
compatibleReservedInstanceTypes := sets.New[string]()
reservedOfferings = map[string]cloudprovider.Offerings{}
) (map[string]cloudprovider.Offerings, error) {
hasCompatibleOffering := false
reservedOfferings := map[string]cloudprovider.Offerings{}
for _, it := range instanceTypes {
hasCompatibleOffering := false
for _, o := range it.Offerings {
if o.CapacityType() != v1.CapacityTypeReserved || !o.Available {
continue
}
// Track every incompatible reserved offering for release. Since releasing a reservation is a no-op when there is no
// reservation for the given host, there's no need to check that a reservation actually exists for the offering.
if !nodeClaimRequirements.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
offeringsToRelease = append(offeringsToRelease, o)
continue
}
hasCompatibleOffering = true
@@ -186,35 +200,24 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
reservedOfferings[it.Name] = append(reservedOfferings[it.Name], o)
}
}
if hasCompatibleOffering {
compatibleReservedInstanceTypes.Insert(it.Name)
}
}

if n.reservedOfferingMode == ReservedOfferingModeStrict {
// If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
// fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
// offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
// on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
// during this scheduling simulation, but with the possibility of success on subsequent simulations.
// Note: while this can occur both on initial creation and on
if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
// fail. This could occur when all of the capacity for compatible instances has been reserved by previously created
// nodeclaims. Since we reserve offerings pessimistically, i.e. we will reserve any offering that the instance could
// be launched with, we should fall back and attempt to schedule this pod in a subsequent scheduling simulation once
// reservation capacity is available again.
if hasCompatibleOffering && len(reservedOfferings) == 0 {
return nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
}
// If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
// we should fail to add the pod to this nodeclaim.
if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying updated nodeclaim constraints would remove all compatible reserved offering options"))
}
}
// Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements
for instanceName, offerings := range n.reservedOfferings {
if compatibleReservedInstanceTypes.Has(instanceName) {
continue
return nil, NewReservedOfferingError(fmt.Errorf("satisfying updated nodeclaim constraints would remove all compatible reserved offering options"))
}
offeringsToRelease = append(offeringsToRelease, offerings...)
}
return reservedOfferings, offeringsToRelease, nil
return reservedOfferings, nil
}

func (n *NodeClaim) Destroy() {
