diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index e8d190b7eb..0666ef7542 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -42,6 +42,7 @@ import ( func init() { v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID) + cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID } var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index fda2f46e71..de38374b21 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -38,6 +38,11 @@ import ( var ( SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot)) OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand)) + + // ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a + // reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this + // requirement is used to inform the scheduler that a reservation for one should affect the other. + ReservationIDLabel string ) type DriftReason string @@ -258,18 +263,20 @@ type ReservationManager interface { // these properties are captured with labels in Requirements. // Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone. type Offering struct { - ReservationID string + Requirements scheduling.Requirements + Price float64 + Available bool ReservationCapacity int - - Requirements scheduling.Requirements - Price float64 - Available bool } func (o *Offering) CapacityType() string { return o.Requirements.Get(v1.CapacityTypeLabelKey).Any() } +func (o *Offering) ReservationID() string { + return o.Requirements.Get(ReservationIDLabel).Any() +} + type Offerings []*Offering // Available filters the available offerings from the returned offerings diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go index a77dfd95e1..4ddcf6e724 100644 --- a/pkg/controllers/provisioning/scheduling/nodeclaim.go +++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go @@ -230,6 +230,17 @@ func (n *NodeClaim) FinalizeScheduling() { // We need nodes to have hostnames for topology purposes, but we don't want to pass that node name on to consumers // of the node as it will be displayed in error messages delete(n.Requirements, corev1.LabelHostname) + // If there are any reserved offerings tracked, inject those requirements onto the NodeClaim. This ensures that if + // there are multiple reserved offerings for an instance type, we don't attempt to overlaunch into a single offering. + if len(n.reservedOfferings) != 0 { + n.Requirements.Add(scheduling.NewRequirement( + cloudprovider.ReservationIDLabel, + corev1.NodeSelectorOpIn, + lo.FlatMap(lo.Values(n.reservedOfferings), func(ofs cloudprovider.Offerings, _ int) []string { + return lo.Map(ofs, func(o *cloudprovider.Offering, _ int) string { return o.ReservationID() }) + })..., + )) + } } func (n *NodeClaim) RemoveInstanceTypeOptionsByPriceAndMinValues(reqs scheduling.Requirements, maxPrice float64) (*NodeClaim, error) { diff --git a/pkg/controllers/provisioning/scheduling/reservationmanager.go b/pkg/controllers/provisioning/scheduling/reservationmanager.go index 59fd758d09..357154b9ee 100644 --- a/pkg/controllers/provisioning/scheduling/reservationmanager.go +++ b/pkg/controllers/provisioning/scheduling/reservationmanager.go @@ -41,8 +41,8 @@ func NewReservationManager(instanceTypes map[string][]*cloudprovider.InstanceTyp // If we have multiple offerings with the same reservation ID, track the one with the least capacity. This could be // the result of multiple nodepools referencing the same capacity reservation, and there being an update to the // capacity between calls to GetInstanceTypes. - if current, ok := capacity[o.ReservationID]; !ok || current > o.ReservationCapacity { - capacity[o.ReservationID] = o.ReservationCapacity + if current, ok := capacity[o.ReservationID()]; !ok || current > o.ReservationCapacity { + capacity[o.ReservationID()] = o.ReservationCapacity } } } @@ -55,31 +55,31 @@ func NewReservationManager(instanceTypes map[string][]*cloudprovider.InstanceTyp func (rm *ReservationManager) Reserve(hostname string, offering *cloudprovider.Offering) bool { reservations, ok := rm.reservations[hostname] - if ok && reservations.Has(offering.ReservationID) { + if ok && reservations.Has(offering.ReservationID()) { return true } if !ok { reservations = sets.New[string]() rm.reservations[hostname] = reservations } - capacity, ok := rm.capacity[offering.ReservationID] + capacity, ok := rm.capacity[offering.ReservationID()] if !ok { // Note: this panic should never occur, and would indicate a serious bug in the scheduling code. - panic(fmt.Sprintf("attempted to reserve non-existent offering with reservation id %q", offering.ReservationID)) + panic(fmt.Sprintf("attempted to reserve non-existent offering with reservation id %q", offering.ReservationID())) } if capacity == 0 { return false } - rm.capacity[offering.ReservationID] -= 1 - reservations.Insert(offering.ReservationID) + rm.capacity[offering.ReservationID()] -= 1 + reservations.Insert(offering.ReservationID()) return true } func (rm *ReservationManager) Release(hostname string, offerings ...*cloudprovider.Offering) { for _, o := range offerings { - if reservations, ok := rm.reservations[hostname]; ok && reservations.Has(o.ReservationID) { - reservations.Delete(o.ReservationID) - rm.capacity[o.ReservationID] += 1 + if reservations, ok := rm.reservations[hostname]; ok && reservations.Has(o.ReservationID()) { + reservations.Delete(o.ReservationID()) + rm.capacity[o.ReservationID()] += 1 } } } diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index cc22ef0097..014b4ebc4c 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -3806,9 +3806,7 @@ var _ = Context("Scheduling", func() { } reservedInstanceTypes := []*cloudprovider.InstanceType{cloudProvider.InstanceTypes[1], cloudProvider.InstanceTypes[2]} for _, it := range reservedInstanceTypes { - reservationID := fmt.Sprintf("r-%s", it.Name) it.Offerings = append(it.Offerings, &cloudprovider.Offering{ - ReservationID: reservationID, ReservationCapacity: 1, Available: true, Requirements: pscheduling.NewLabelRequirements(map[string]string{