Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions karpenter/pkg/cloudproviders/nebius/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,21 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
// of other resources (disk, nic, etc). So here we delete the created resource
// in best effort when seeing quota error.
// FIXME: use a better clean up helper to perform the clean up in background
//
// TODO: currently nebius doesn't provide a way for us to check if the capacity exists before real creation.
// This could result in deadlock case where the node claim is being repeatedly created and failed for the same
// instance type with quota issue. In such case, we are relying on the user to fix the quota or update the
// node pool to exclude the problematic instance type. In the future, we should consider implementing a proactive
// model to filter out these instance types internally and temporarily.

// Record the failed instance type in the unavailable offerings cache
// so that ResolvePlatformPresetFromNodeClaim will skip this offering
// on subsequent attempts. The entry expires after a TTL, at which point
// the offering becomes eligible for selection again.
// NOTE: Nebius does not expose availability zones at this moment,
// so we use the region as the location dimension for the cache key.
c.instanceTypeProvider.UnavailableOfferings.MarkUnavailable(
ctx,
"QuotaFailure",
launchSettings.InstanceTypeName(),
nodeClass.Spec.Region,
launchSettings.CapacityType,
)

go func() {
cleanUpCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
Expand Down
20 changes: 13 additions & 7 deletions karpenter/pkg/cloudproviders/nebius/instancetype/offerings.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,37 @@ type Prices struct {
// If the platform allows preemptible instances, a second offering with capacity type "spot"
// is added (Karpenter uses "spot" as the generic term for preemptible/interruptible).
//
// When the pricing lookup failed (prices.Err != nil), offerings are still created with
// the fallback default price but marked as unavailable so Karpenter will not schedule
// pods onto them.
// An offering is marked unavailable when:
// - the pricing lookup failed (prices.Err != nil), or
// - the instance type + zone + capacity type combination is present in the
// unavailableOfferings cache (recent quota/capacity failure).
//
// The zone is set to the region because Nebius does not expose availability zones.
func CreateOfferings(
ctx context.Context,
p *PlatformPreset,
region string,
prices Prices,
unavailableOfferings *UnavailableOfferings,
) karpcloudprovider.Offerings {
available := true
pricingAvailable := true
if prices.Err != nil {
available = false
pricingAvailable = false
log.FromContext(ctx).Error(prices.Err, "failed to resolve price for instance type, marking as unavailable",
"instanceType", p.InstanceTypeName(),
)
}

instanceTypeName := p.InstanceTypeName()

onDemandAvailable := pricingAvailable && !unavailableOfferings.IsUnavailable(instanceTypeName, region, v1.CapacityTypeOnDemand)
offerings := karpcloudprovider.Offerings{
newOffering(v1.CapacityTypeOnDemand, region, prices.OnDemand, available),
newOffering(v1.CapacityTypeOnDemand, region, prices.OnDemand, onDemandAvailable),
}

if p.AllowedForPreemptibles() {
offerings = append(offerings, newOffering(v1.CapacityTypeSpot, region, prices.Preemptible, available))
spotAvailable := pricingAvailable && !unavailableOfferings.IsUnavailable(instanceTypeName, region, v1.CapacityTypeSpot)
offerings = append(offerings, newOffering(v1.CapacityTypeSpot, region, prices.Preemptible, spotAvailable))
}

return offerings
Expand Down
62 changes: 41 additions & 21 deletions karpenter/pkg/cloudproviders/nebius/instancetype/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type Provider struct {

cache *lru.Cache

// UnavailableOfferings tracks offerings that recently failed due to quota
// or insufficient-capacity errors. Entries expire after a TTL so that
// previously blocked instance types are retried automatically.
UnavailableOfferings *UnavailableOfferings

// activeKeys tracks cache keys that have been fetched at least once,
// so the background refresh loop knows what to re-fetch.
// (k8s.io/utils/lru does not expose a Keys() method.)
Expand All @@ -85,20 +90,23 @@ func newProvider(
pricingProvider *PricingProvider,
) *Provider {
op := &Provider{
platformService: platformService,
pricingProvider: pricingProvider,
cache: lru.New(cacheSize),
activeKeys: make(map[NodeClassKey]struct{}),
refreshInterval: defaultRefreshInterval,
stopCh: make(chan struct{}),
platformService: platformService,
pricingProvider: pricingProvider,
cache: lru.New(cacheSize),
UnavailableOfferings: NewUnavailableOfferings(),
activeKeys: make(map[NodeClassKey]struct{}),
refreshInterval: defaultRefreshInterval,
stopCh: make(chan struct{}),
}
go op.refreshLoop()
return op
}

// Stop terminates the background refresh goroutine.
// Stop terminates the background refresh goroutine and the unavailable
// offerings cleanup goroutine.
func (op *Provider) Stop() {
close(op.stopCh)
op.UnavailableOfferings.Stop()
}

func (op *Provider) getOrResolveCachedEntry(
Expand Down Expand Up @@ -166,7 +174,7 @@ func (op *Provider) GetInstanceTypeByPlatformPreset(
presetPrices = Prices{OnDemand: defaultPrice, Preemptible: defaultPrice}
}

offerings := CreateOfferings(ctx, preset, key.Region, presetPrices)
offerings := CreateOfferings(ctx, preset, key.Region, presetPrices, op.UnavailableOfferings)
return NewInstanceType(key, preset, offerings)
}

Expand Down Expand Up @@ -204,7 +212,7 @@ func (op *Provider) resolve(
presetPrices = Prices{OnDemand: defaultPrice, Preemptible: defaultPrice}
}

offerings := CreateOfferings(ctx, p, key.Region, presetPrices)
offerings := CreateOfferings(ctx, p, key.Region, presetPrices, op.UnavailableOfferings)
it := NewInstanceType(key, p, offerings)
instanceTypes = append(instanceTypes, it)
}
Expand Down Expand Up @@ -311,6 +319,8 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim(

// Find the cheapest matching instance type whose offering is compatible
// with the NodeClaim's requirements (instance type, zone, capacity type).
// Offerings that are temporarily blocked due to recent quota failures are
// skipped via the UnavailableOfferings cache.
var (
bestPreset *PlatformPreset
bestOffering *karpcloudprovider.Offering
Expand All @@ -328,6 +338,15 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim(
if !requirements.IsCompatible(of.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
continue
}

// Check the unavailable offerings cache to skip offerings that
// recently failed due to quota/capacity errors.
ofRegion := requirementValue(of.Requirements, corev1.LabelTopologyZone, key.Region)
ofCapType := requirementValue(of.Requirements, karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand)
if op.UnavailableOfferings.IsUnavailable(it.Name, ofRegion, ofCapType) {
continue
}

if of.Price < bestPrice {
bestPrice = of.Price
bestPreset = presetByName[it.Name]
Expand All @@ -342,18 +361,8 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim(
}

// Extract capacity type and zone from the winning offering's requirements.
capacityType := karpv1.CapacityTypeOnDemand
if capReq := bestOffering.Requirements.Get(karpv1.CapacityTypeLabelKey); capReq != nil {
if values := capReq.Values(); len(values) > 0 {
capacityType = values[0]
}
}
zone := key.Region // fallback
if zoneReq := bestOffering.Requirements.Get(corev1.LabelTopologyZone); zoneReq != nil {
if values := zoneReq.Values(); len(values) > 0 {
zone = values[0]
}
}
capacityType := requirementValue(bestOffering.Requirements, karpv1.CapacityTypeLabelKey, karpv1.CapacityTypeOnDemand)
zone := requirementValue(bestOffering.Requirements, corev1.LabelTopologyZone, key.Region)

return &PlatformPresetLaunchSettings{
PlatformPreset: bestPreset,
Expand All @@ -362,6 +371,17 @@ func (op *Provider) ResolvePlatformPresetFromNodeClaim(
}, nil
}

// requirementValue extracts the first value recorded for key in reqs.
// When the key is missing, or present with an empty value list, the
// supplied fallback is returned instead.
func requirementValue(reqs scheduling.Requirements, key string, fallback string) string {
	req := reqs.Get(key)
	if req == nil {
		return fallback
	}
	values := req.Values()
	if len(values) == 0 {
		return fallback
	}
	return values[0]
}

// On cache hit it returns the presets stored alongside the assembled InstanceTypes.
// On cache miss it falls back to listing from the Nebius Platform API.
func (op *Provider) getCachedOrListPresets(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package instancetype

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
	// UnavailableOfferingsTTL is the duration for which an offering that
	// failed due to a quota/capacity error is considered unavailable.
	// After this period, the entry expires and the offering becomes
	// eligible for selection again.
	UnavailableOfferingsTTL = 30 * time.Minute

	// unavailableOfferingsCleanupInterval is how often expired entries are
	// purged from the cache by the background cleanup goroutine. Note that
	// expired-but-unpurged entries are already treated as available by
	// IsUnavailable, so this interval only bounds memory growth, not
	// correctness.
	unavailableOfferingsCleanupInterval = 5 * time.Minute
)

// UnavailableOfferings tracks offerings (instance type + region + capacity type
// combinations) that recently failed due to quota or insufficient-capacity
// errors. Entries automatically expire after UnavailableOfferingsTTL.
//
// Region is used instead of zone because Nebius does not expose availability
// zones — the region is the finest-grained location dimension available.
//
// This cache is consulted by ResolvePlatformPresetFromNodeClaim to skip
// offerings that are known to be temporarily unavailable, preventing
// repeated launch-and-fail cycles for the same instance type.
type UnavailableOfferings struct {
	// mu guards entries; reads take RLock, mutations take Lock.
	mu      sync.RWMutex
	entries map[string]time.Time // key -> expiry time

	// SeqNum is atomically incremented on every mutation or eviction,
	// so watchers can detect when the cache has changed.
	// NOTE(review): accessed via sync/atomic — per the sync/atomic docs this
	// field must stay 64-bit aligned for 32-bit platforms; keeping it a
	// struct field at a fixed offset after fixed-size fields is assumed
	// sufficient here — confirm if 32-bit targets are supported.
	SeqNum uint64

	// stopCh, when closed by Stop, terminates the cleanup goroutine
	// started in NewUnavailableOfferings.
	stopCh chan struct{}
}

// NewUnavailableOfferings builds an empty UnavailableOfferings cache and
// launches its background cleanup goroutine. Callers must invoke Stop to
// terminate that goroutine when the cache is no longer needed.
func NewUnavailableOfferings() *UnavailableOfferings {
	cache := &UnavailableOfferings{
		entries: map[string]time.Time{},
		stopCh:  make(chan struct{}),
	}
	go cache.cleanupLoop()
	return cache
}

// MarkUnavailable flags the (instanceType, region, capacityType) offering
// as unavailable for the default UnavailableOfferingsTTL, after which the
// entry expires and the offering becomes selectable again. The reason is
// only used for logging.
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, reason, instanceType, region, capacityType string) {
	u.MarkUnavailableWithTTL(ctx, reason, instanceType, region, capacityType, UnavailableOfferingsTTL)
}

// MarkUnavailableWithTTL records the offering as unavailable until now+ttl,
// logging the reason, and bumps SeqNum so watchers notice the change.
// A repeated call for the same offering simply pushes the expiry forward.
func (u *UnavailableOfferings) MarkUnavailableWithTTL(ctx context.Context, reason, instanceType, region, capacityType string, ttl time.Duration) {
	log.FromContext(ctx).V(1).Info("marking offering as unavailable",
		"reason", reason,
		"instanceType", instanceType,
		"region", region,
		"capacityType", capacityType,
		"ttl", ttl,
	)

	u.mu.Lock()
	u.entries[unavailableOfferingKey(instanceType, region, capacityType)] = time.Now().Add(ttl)
	u.mu.Unlock()

	atomic.AddUint64(&u.SeqNum, 1)
}

// IsUnavailable reports whether the (instanceType, region, capacityType)
// offering currently has an unexpired entry in the cache. An entry whose
// TTL has lapsed counts as available even before the cleanup goroutine has
// physically removed it.
func (u *UnavailableOfferings) IsUnavailable(instanceType, region, capacityType string) bool {
	u.mu.RLock()
	expiry, ok := u.entries[unavailableOfferingKey(instanceType, region, capacityType)]
	u.mu.RUnlock()

	return ok && time.Now().Before(expiry)
}

// Stop terminates the background cleanup goroutine.
// It must be called at most once: a second call would panic on closing an
// already-closed channel.
func (u *UnavailableOfferings) Stop() {
	close(u.stopCh)
}

// cleanupLoop purges expired entries every unavailableOfferingsCleanupInterval
// until Stop closes stopCh. It runs on the goroutine started by
// NewUnavailableOfferings.
func (u *UnavailableOfferings) cleanupLoop() {
	tick := time.NewTicker(unavailableOfferingsCleanupInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			u.cleanup()
		case <-u.stopCh:
			return
		}
	}
}

// cleanup deletes every entry whose expiry is in the past, and increments
// SeqNum once if anything was removed so watchers see the eviction.
func (u *UnavailableOfferings) cleanup() {
	now := time.Now()
	removed := 0

	u.mu.Lock()
	for key, expiry := range u.entries {
		if now.After(expiry) {
			delete(u.entries, key)
			removed++
		}
	}
	u.mu.Unlock()

	if removed > 0 {
		atomic.AddUint64(&u.SeqNum, 1)
	}
}

// unavailableOfferingKey constructs the cache key for a specific offering,
// in the form "<capacityType>:<instanceType>:<region>".
//
// Plain concatenation is used instead of fmt.Sprintf: this helper runs on
// every IsUnavailable lookup during scheduling, and Sprintf boxes each
// argument into an interface and goes through reflection, while the
// three-part concatenation compiles to a single allocation with identical
// output.
func unavailableOfferingKey(instanceType, region, capacityType string) string {
	return capacityType + ":" + instanceType + ":" + region
}