inject reservation requirements
jmdeal committed Feb 14, 2025
1 parent a978d85 commit 95834ee
Showing 6 changed files with 34 additions and 18 deletions.
1 change: 0 additions & 1 deletion kwok/cloudprovider/helpers.go
@@ -186,7 +186,6 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
         Requirements: requirements,
         Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) *cloudprovider.Offering {
             return &cloudprovider.Offering{
-                ReservationID: off.ReservationID,
                 ReservationCapacity: off.ReservationCapacity,
                 Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
                     return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
1 change: 1 addition & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -42,6 +42,7 @@ import (
 
 func init() {
     v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
+    cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
 }
 
 var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
17 changes: 12 additions & 5 deletions pkg/cloudprovider/types.go
@@ -38,6 +38,11 @@ import (
 var (
     SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
     OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))
+
+    // ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a
+    // reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
+    // requirement is used to inform the scheduler that a reservation for one should affect the other.
+    ReservationIDLabel string
 )
 
 type DriftReason string
@@ -258,18 +263,20 @@ type ReservationManager interface {
 // these properties are captured with labels in Requirements.
 // Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone.
 type Offering struct {
-    ReservationID       string
-    Requirements        scheduling.Requirements
-    Price               float64
-    Available           bool
     ReservationCapacity int
+
+    Requirements scheduling.Requirements
+    Price        float64
+    Available    bool
 }
 
 func (o *Offering) CapacityType() string {
     return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
 }
 
+func (o *Offering) ReservationID() string {
+    return o.Requirements.Get(ReservationIDLabel).Any()
+}
+
 type Offerings []*Offering
 
 // Available filters the available offerings from the returned offerings
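The change above removes the dedicated ReservationID field from Offering and derives the ID from the offering's requirements instead, keyed by the provider-configured ReservationIDLabel. A minimal standalone sketch of that pattern is below; it uses simplified stand-in types rather than karpenter's real Requirements and Offering, and the label key and reservation ID are illustrative only.

package main

import "fmt"

// reservationIDLabel stands in for cloudprovider.ReservationIDLabel, which a cloud
// provider is expected to point at its own well-known label at init time.
var reservationIDLabel = "example.sh/reservation-id" // hypothetical label key

// offering mirrors the post-commit shape: no ReservationID field, just
// requirements plus reservation capacity.
type offering struct {
    requirements        map[string]string
    reservationCapacity int
}

// reservationID reads the ID back out of the requirements, analogous to the new
// (*Offering).ReservationID() accessor built on Requirements.Get(...).Any().
func (o *offering) reservationID() string {
    return o.requirements[reservationIDLabel]
}

func main() {
    o := &offering{
        requirements:        map[string]string{reservationIDLabel: "r-example-1a"},
        reservationCapacity: 4,
    }
    fmt.Println(o.reservationID()) // r-example-1a
}

Folding the ID into Requirements lets the scheduler treat it like any other label requirement when matching and constraining offerings, which is what the nodeclaim.go change below relies on.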
11 changes: 11 additions & 0 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -230,6 +230,17 @@ func (n *NodeClaim) FinalizeScheduling() {
     // We need nodes to have hostnames for topology purposes, but we don't want to pass that node name on to consumers
     // of the node as it will be displayed in error messages
     delete(n.Requirements, corev1.LabelHostname)
+    // If there are any reserved offerings tracked, inject those requirements onto the NodeClaim. This ensures that if
+    // there are multiple reserved offerings for an instance type, we don't attempt to overlaunch into a single offering.
+    if len(n.reservedOfferings) != 0 {
+        n.Requirements.Add(scheduling.NewRequirement(
+            cloudprovider.ReservationIDLabel,
+            corev1.NodeSelectorOpIn,
+            lo.FlatMap(lo.Values(n.reservedOfferings), func(ofs cloudprovider.Offerings, _ int) []string {
+                return lo.Map(ofs, func(o *cloudprovider.Offering, _ int) string { return o.ReservationID() })
+            })...,
+        ))
+    }
 }
 
 func (n *NodeClaim) RemoveInstanceTypeOptionsByPriceAndMinValues(reqs scheduling.Requirements, maxPrice float64) (*NodeClaim, error) {
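The block injected into FinalizeScheduling gathers the reservation IDs of every reserved offering still tracked for the NodeClaim and adds them as a single In requirement under cloudprovider.ReservationIDLabel, so the launch can only land on one of those reservations. A rough sketch of just the aggregation step follows, with mock types in place of the scheduler's NodeClaim and cloudprovider.Offerings; the label key and offering values are made up, and only the lo.FlatMap/lo.Map shape mirrors the commit.

package main

import (
    "fmt"

    "github.com/samber/lo"
)

const reservationIDLabel = "example.sh/reservation-id" // hypothetical label key

type offering struct{ reservationID string }

func main() {
    // Maps instance type name -> reserved offerings held for the in-flight
    // NodeClaim, mirroring the role of n.reservedOfferings.
    reservedOfferings := map[string][]*offering{
        "c-large":  {{reservationID: "r-c-large"}},
        "m-xlarge": {{reservationID: "r-m-xlarge-1"}, {reservationID: "r-m-xlarge-2"}},
    }

    // Flatten every tracked offering's reservation ID into one value list,
    // the same shape the commit builds with lo.FlatMap + lo.Map.
    values := lo.FlatMap(lo.Values(reservedOfferings), func(ofs []*offering, _ int) []string {
        return lo.Map(ofs, func(o *offering, _ int) string { return o.reservationID })
    })

    // The scheduler adds these as "<label> In [values...]" on the NodeClaim's
    // requirements before the claim is finalized.
    fmt.Printf("%s In %v\n", reservationIDLabel, values)
}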
20 changes: 10 additions & 10 deletions pkg/controllers/provisioning/scheduling/reservationmanager.go
@@ -41,8 +41,8 @@ func NewReservationManager(instanceTypes map[string][]*cloudprovider.InstanceTyp
             // If we have multiple offerings with the same reservation ID, track the one with the least capacity. This could be
             // the result of multiple nodepools referencing the same capacity reservation, and there being an update to the
             // capacity between calls to GetInstanceTypes.
-            if current, ok := capacity[o.ReservationID]; !ok || current > o.ReservationCapacity {
-                capacity[o.ReservationID] = o.ReservationCapacity
+            if current, ok := capacity[o.ReservationID()]; !ok || current > o.ReservationCapacity {
+                capacity[o.ReservationID()] = o.ReservationCapacity
             }
         }
     }
@@ -55,31 +55,31 @@ func NewReservationManager(instanceTypes map[string][]*cloudprovider.InstanceTyp
 
 func (rm *ReservationManager) Reserve(hostname string, offering *cloudprovider.Offering) bool {
     reservations, ok := rm.reservations[hostname]
-    if ok && reservations.Has(offering.ReservationID) {
+    if ok && reservations.Has(offering.ReservationID()) {
         return true
     }
     if !ok {
         reservations = sets.New[string]()
         rm.reservations[hostname] = reservations
     }
-    capacity, ok := rm.capacity[offering.ReservationID]
+    capacity, ok := rm.capacity[offering.ReservationID()]
     if !ok {
         // Note: this panic should never occur, and would indicate a serious bug in the scheduling code.
-        panic(fmt.Sprintf("attempted to reserve non-existent offering with reservation id %q", offering.ReservationID))
+        panic(fmt.Sprintf("attempted to reserve non-existent offering with reservation id %q", offering.ReservationID()))
     }
     if capacity == 0 {
         return false
     }
-    rm.capacity[offering.ReservationID] -= 1
-    reservations.Insert(offering.ReservationID)
+    rm.capacity[offering.ReservationID()] -= 1
+    reservations.Insert(offering.ReservationID())
     return true
 }
 
 func (rm *ReservationManager) Release(hostname string, offerings ...*cloudprovider.Offering) {
     for _, o := range offerings {
-        if reservations, ok := rm.reservations[hostname]; ok && reservations.Has(o.ReservationID) {
-            reservations.Delete(o.ReservationID)
-            rm.capacity[o.ReservationID] += 1
+        if reservations, ok := rm.reservations[hostname]; ok && reservations.Has(o.ReservationID()) {
+            reservations.Delete(o.ReservationID())
+            rm.capacity[o.ReservationID()] += 1
         }
     }
 }
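The ReservationManager edits above only swap the struct field access for the new ReservationID() call, but the bookkeeping they sit in is worth seeing on its own: remaining capacity per reservation ID, plus the set of IDs each hostname has already reserved, so repeat Reserve calls for the same pair stay idempotent. A simplified standalone sketch of that behavior follows; the type, hostnames, and IDs are illustrative, and a plain map stands in for the sets.Set used by the real code.

package main

import "fmt"

type reservationManager struct {
    capacity     map[string]int             // reservation ID -> remaining capacity
    reservations map[string]map[string]bool // hostname -> reserved IDs
}

// reserve succeeds immediately if the hostname already holds the reservation,
// otherwise it consumes one unit of capacity if any remains.
func (rm *reservationManager) reserve(hostname, id string) bool {
    if rm.reservations[hostname][id] {
        return true
    }
    if rm.capacity[id] == 0 {
        return false
    }
    if rm.reservations[hostname] == nil {
        rm.reservations[hostname] = map[string]bool{}
    }
    rm.capacity[id]--
    rm.reservations[hostname][id] = true
    return true
}

// release returns capacity only for IDs the hostname actually holds.
func (rm *reservationManager) release(hostname string, ids ...string) {
    for _, id := range ids {
        if rm.reservations[hostname][id] {
            delete(rm.reservations[hostname], id)
            rm.capacity[id]++
        }
    }
}

func main() {
    rm := &reservationManager{
        capacity:     map[string]int{"r-example": 1},
        reservations: map[string]map[string]bool{},
    }
    fmt.Println(rm.reserve("node-a", "r-example")) // true
    fmt.Println(rm.reserve("node-a", "r-example")) // true: already held, no extra capacity used
    fmt.Println(rm.reserve("node-b", "r-example")) // false: capacity exhausted
    rm.release("node-a", "r-example")
    fmt.Println(rm.reserve("node-b", "r-example")) // true: capacity returned by release
}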
2 changes: 0 additions & 2 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3806,9 +3806,7 @@ var _ = Context("Scheduling", func() {
             }
             reservedInstanceTypes := []*cloudprovider.InstanceType{cloudProvider.InstanceTypes[1], cloudProvider.InstanceTypes[2]}
             for _, it := range reservedInstanceTypes {
-                reservationID := fmt.Sprintf("r-%s", it.Name)
                 it.Offerings = append(it.Offerings, &cloudprovider.Offering{
-                    ReservationID: reservationID,
                     ReservationCapacity: 1,
                     Available: true,
                     Requirements: pscheduling.NewLabelRequirements(map[string]string{
