diff --git a/karpenter/pkg/cloudproviders/nebius/cloudprovider.go b/karpenter/pkg/cloudproviders/nebius/cloudprovider.go index 9a0db20..0fdcfe8 100644 --- a/karpenter/pkg/cloudproviders/nebius/cloudprovider.go +++ b/karpenter/pkg/cloudproviders/nebius/cloudprovider.go @@ -178,12 +178,21 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v // of other resources (disk, nic, etc). So here we delete the created resource // in best effort when seeing quota error. // FIXME: use a better clean up helper to perform the clean up in background - // - // TODO: currently nebius doesn't provide a way for us to check if the capacity exists before real creation. - // This could result in deadlock case where the node claim is being repeatedly created and failed for the same - // instance type with quota issue. In such case, we are relying on the user to fix the quota or update the - // node pool to exclude the problematic instance type. In the future, we should consider implementing a proactive - // model to filter out these instance types internally and temporarily. + + // Record the failed instance type in the unavailable offerings cache + // so that ResolvePlatformPresetFromNodeClaim will skip this offering + // on subsequent attempts. The entry expires after a TTL, at which point + // the offering becomes eligible for selection again. + // NOTE: Nebius does not expose availability zones at this moment, + // so we use the region as the location dimension for the cache key. 
+ c.instanceTypeProvider.UnavailableOfferings.MarkUnavailable( + ctx, + "QuotaFailure", + launchSettings.InstanceTypeName(), + nodeClass.Spec.Region, + launchSettings.CapacityType, + ) + go func() { cleanUpCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() diff --git a/karpenter/pkg/cloudproviders/nebius/instancetype/offerings.go b/karpenter/pkg/cloudproviders/nebius/instancetype/offerings.go index 5a3ea50..21eaff2 100644 --- a/karpenter/pkg/cloudproviders/nebius/instancetype/offerings.go +++ b/karpenter/pkg/cloudproviders/nebius/instancetype/offerings.go @@ -39,9 +39,10 @@ type Prices struct { // If the platform allows preemptible instances, a second offering with capacity type "spot" // is added (Karpenter uses "spot" as the generic term for preemptible/interruptible). // -// When the pricing lookup failed (prices.Err != nil), offerings are still created with -// the fallback default price but marked as unavailable so Karpenter will not schedule -// pods onto them. +// An offering is marked unavailable when: +// - the pricing lookup failed (prices.Err != nil), or +// - the instance type + zone + capacity type combination is present in the +// unavailableOfferings cache (recent quota/capacity failure). // // The zone is set to the region because Nebius does not expose availability zones. 
func CreateOfferings( @@ -49,21 +50,26 @@ func CreateOfferings( p *PlatformPreset, region string, prices Prices, + unavailableOfferings *UnavailableOfferings, ) karpcloudprovider.Offerings { - available := true + pricingAvailable := true if prices.Err != nil { - available = false + pricingAvailable = false log.FromContext(ctx).Error(prices.Err, "failed to resolve price for instance type, marking as unavailable", "instanceType", p.InstanceTypeName(), ) } + instanceTypeName := p.InstanceTypeName() + + onDemandAvailable := pricingAvailable && !unavailableOfferings.IsUnavailable(instanceTypeName, region, v1.CapacityTypeOnDemand) offerings := karpcloudprovider.Offerings{ - newOffering(v1.CapacityTypeOnDemand, region, prices.OnDemand, available), + newOffering(v1.CapacityTypeOnDemand, region, prices.OnDemand, onDemandAvailable), } if p.AllowedForPreemptibles() { - offerings = append(offerings, newOffering(v1.CapacityTypeSpot, region, prices.Preemptible, available)) + spotAvailable := pricingAvailable && !unavailableOfferings.IsUnavailable(instanceTypeName, region, v1.CapacityTypeSpot) + offerings = append(offerings, newOffering(v1.CapacityTypeSpot, region, prices.Preemptible, spotAvailable)) } return offerings diff --git a/karpenter/pkg/cloudproviders/nebius/instancetype/provider.go b/karpenter/pkg/cloudproviders/nebius/instancetype/provider.go index 13f2516..5ab31bf 100644 --- a/karpenter/pkg/cloudproviders/nebius/instancetype/provider.go +++ b/karpenter/pkg/cloudproviders/nebius/instancetype/provider.go @@ -62,6 +62,11 @@ type Provider struct { cache *lru.Cache + // UnavailableOfferings tracks offerings that recently failed due to quota + // or insufficient-capacity errors. Entries expire after a TTL so that + // previously blocked instance types are retried automatically. + UnavailableOfferings *UnavailableOfferings + // activeKeys tracks cache keys that have been fetched at least once, // so the background refresh loop knows what to re-fetch. 
// (k8s.io/utils/lru does not expose a Keys() method.) @@ -85,20 +90,23 @@ func newProvider( pricingProvider *PricingProvider, ) *Provider { op := &Provider{ - platformService: platformService, - pricingProvider: pricingProvider, - cache: lru.New(cacheSize), - activeKeys: make(map[NodeClassKey]struct{}), - refreshInterval: defaultRefreshInterval, - stopCh: make(chan struct{}), + platformService: platformService, + pricingProvider: pricingProvider, + cache: lru.New(cacheSize), + UnavailableOfferings: NewUnavailableOfferings(), + activeKeys: make(map[NodeClassKey]struct{}), + refreshInterval: defaultRefreshInterval, + stopCh: make(chan struct{}), } go op.refreshLoop() return op } -// Stop terminates the background refresh goroutine. +// Stop terminates the background refresh goroutine and the unavailable +// offerings cleanup goroutine. func (op *Provider) Stop() { close(op.stopCh) + op.UnavailableOfferings.Stop() } func (op *Provider) getOrResolveCachedEntry( @@ -166,7 +174,7 @@ func (op *Provider) GetInstanceTypeByPlatformPreset( presetPrices = Prices{OnDemand: defaultPrice, Preemptible: defaultPrice} } - offerings := CreateOfferings(ctx, preset, key.Region, presetPrices) + offerings := CreateOfferings(ctx, preset, key.Region, presetPrices, op.UnavailableOfferings) return NewInstanceType(key, preset, offerings) } @@ -204,7 +212,7 @@ func (op *Provider) resolve( presetPrices = Prices{OnDemand: defaultPrice, Preemptible: defaultPrice} } - offerings := CreateOfferings(ctx, p, key.Region, presetPrices) + offerings := CreateOfferings(ctx, p, key.Region, presetPrices, op.UnavailableOfferings) it := NewInstanceType(key, p, offerings) instanceTypes = append(instanceTypes, it) } @@ -311,6 +319,8 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim( // Find the cheapest matching instance type whose offering is compatible // with the NodeClaim's requirements (instance type, zone, capacity type). 
+ // Offerings that are temporarily blocked due to recent quota failures are + // skipped via the UnavailableOfferings cache. var ( bestPreset *PlatformPreset bestOffering *karpcloudprovider.Offering @@ -328,6 +338,15 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim( if !requirements.IsCompatible(of.Requirements, scheduling.AllowUndefinedWellKnownLabels) { continue } + + // Check the unavailable offerings cache to skip offerings that + // recently failed due to quota/capacity errors. + ofRegion := requirementValue(of.Requirements, corev1.LabelTopologyZone, key.Region) + ofCapType := requirementValue(of.Requirements, karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand) + if op.UnavailableOfferings.IsUnavailable(it.Name, ofRegion, ofCapType) { + continue + } + if of.Price < bestPrice { bestPrice = of.Price bestPreset = presetByName[it.Name] @@ -342,18 +361,8 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim( } // Extract capacity type and zone from the winning offering's requirements. - capacityType := karpv1.CapacityTypeOnDemand - if capReq := bestOffering.Requirements.Get(karpv1.CapacityTypeLabelKey); capReq != nil { - if values := capReq.Values(); len(values) > 0 { - capacityType = values[0] - } - } - zone := key.Region // fallback - if zoneReq := bestOffering.Requirements.Get(corev1.LabelTopologyZone); zoneReq != nil { - if values := zoneReq.Values(); len(values) > 0 { - zone = values[0] - } - } + capacityType := requirementValue(bestOffering.Requirements, karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand) + zone := requirementValue(bestOffering.Requirements, corev1.LabelTopologyZone, key.Region) return &PlatformPresetLaunchSettings{ PlatformPreset: bestPreset, @@ -362,6 +371,17 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim( }, nil } +// requirementValue returns the first value for the given label key from the +// scheduling requirements, or fallback if the key is absent or has no values. 
+func requirementValue(reqs scheduling.Requirements, key string, fallback string) string { + if req := reqs.Get(key); req != nil { + if values := req.Values(); len(values) > 0 { + return values[0] + } + } + return fallback +} + // On cache hit it returns the presets stored alongside the assembled InstanceTypes. // On cache miss it falls back to listing from the Nebius Platform API. func (op *Provider) getCachedOrListPresets( diff --git a/karpenter/pkg/cloudproviders/nebius/instancetype/unavailableofferings.go b/karpenter/pkg/cloudproviders/nebius/instancetype/unavailableofferings.go new file mode 100644 index 0000000..78695ec --- /dev/null +++ b/karpenter/pkg/cloudproviders/nebius/instancetype/unavailableofferings.go @@ -0,0 +1,142 @@ +package instancetype + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + // UnavailableOfferingsTTL is the duration for which an offering that + // failed due to a quota/capacity error is considered unavailable. + // After this period, the entry expires and the offering becomes + // eligible for selection again. + UnavailableOfferingsTTL = 30 * time.Minute + + // unavailableOfferingsCleanupInterval is how often expired entries are + // purged from the cache. + unavailableOfferingsCleanupInterval = 5 * time.Minute +) + +// UnavailableOfferings tracks offerings (instance type + region + capacity type +// combinations) that recently failed due to quota or insufficient-capacity +// errors. Entries automatically expire after UnavailableOfferingsTTL. +// +// Region is used instead of zone because Nebius does not expose availability +// zones — the region is the finest-grained location dimension available. +// +// This cache is consulted by ResolvePlatformPresetFromNodeClaim to skip +// offerings that are known to be temporarily unavailable, preventing +// repeated launch-and-fail cycles for the same instance type. 
+type UnavailableOfferings struct { + mu sync.RWMutex + entries map[string]time.Time // key -> expiry time + + // SeqNum is atomically incremented on every mutation or eviction, + // so watchers can detect changes; read it with atomic.LoadUint64. + SeqNum uint64 + + stopCh chan struct{} +} + +// NewUnavailableOfferings creates a new UnavailableOfferings cache and starts +// a background goroutine to clean up expired entries. +func NewUnavailableOfferings() *UnavailableOfferings { + u := &UnavailableOfferings{ + entries: make(map[string]time.Time), + stopCh: make(chan struct{}), + } + go u.cleanupLoop() + return u +} + +// MarkUnavailable records that the given instance type, region, and capacity +// type combination is currently unavailable due to a quota or capacity error. +// The entry will expire after UnavailableOfferingsTTL. +func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, reason, instanceType, region, capacityType string) { + u.MarkUnavailableWithTTL(ctx, reason, instanceType, region, capacityType, UnavailableOfferingsTTL) +} + +// MarkUnavailableWithTTL records an unavailable offering with a custom TTL. +func (u *UnavailableOfferings) MarkUnavailableWithTTL(ctx context.Context, reason, instanceType, region, capacityType string, ttl time.Duration) { + key := unavailableOfferingKey(instanceType, region, capacityType) + expiry := time.Now().Add(ttl) + + log.FromContext(ctx).V(1).Info("marking offering as unavailable", + "reason", reason, + "instanceType", instanceType, + "region", region, + "capacityType", capacityType, + "ttl", ttl, + ) + + u.mu.Lock() + u.entries[key] = expiry + u.mu.Unlock() + + atomic.AddUint64(&u.SeqNum, 1) +} + +// IsUnavailable returns true if the given instance type, region, and capacity +// type combination is currently marked as unavailable. 
+func (u *UnavailableOfferings) IsUnavailable(instanceType, region, capacityType string) bool { + key := unavailableOfferingKey(instanceType, region, capacityType) + + u.mu.RLock() + expiry, found := u.entries[key] + u.mu.RUnlock() + + if !found { + return false + } + + // Treat expired entries as available; cleanup goroutine will remove them. + return time.Now().Before(expiry) +} + +// Stop terminates the background cleanup goroutine. +func (u *UnavailableOfferings) Stop() { + close(u.stopCh) +} + +// cleanupLoop periodically removes expired entries. +func (u *UnavailableOfferings) cleanupLoop() { + ticker := time.NewTicker(unavailableOfferingsCleanupInterval) + defer ticker.Stop() + + for { + select { + case <-u.stopCh: + return + case <-ticker.C: + u.cleanup() + } + } +} + +func (u *UnavailableOfferings) cleanup() { + now := time.Now() + evicted := false + + u.mu.Lock() + for key, expiry := range u.entries { + if now.After(expiry) { + delete(u.entries, key) + evicted = true + } + } + u.mu.Unlock() + + if evicted { + atomic.AddUint64(&u.SeqNum, 1) + } +} + +// unavailableOfferingKey constructs the cache key for a specific offering. +func unavailableOfferingKey(instanceType, region, capacityType string) string { + return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, region) +}