2323from typing import Dict , Any , Optional , Tuple , Union
2424
2525from ray .runtime_env import RuntimeEnv
26- from codeflare_sdk .common .kueue .kueue import get_default_kueue_name
26+ from codeflare_sdk .common .kueue .kueue import (
27+ get_default_kueue_name ,
28+ priority_class_exists ,
29+ )
2730from codeflare_sdk .common .utils .constants import MOUNT_PATH
2831
2932from codeflare_sdk .common .utils .utils import get_ray_image_for_python_version
@@ -69,6 +72,7 @@ def __init__(
6972 ttl_seconds_after_finished : int = 0 ,
7073 active_deadline_seconds : Optional [int ] = None ,
7174 local_queue : Optional [str ] = None ,
75+ priority_class : Optional [str ] = None ,
7276 ):
7377 """
7478 Initialize a RayJob instance.
@@ -86,11 +90,13 @@ def __init__(
8690 ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
8791 active_deadline_seconds: Maximum time the job can run before being terminated (optional)
8892 local_queue: The Kueue LocalQueue to submit the job to (optional)
93+ priority_class: The Kueue WorkloadPriorityClass name for preemption control (optional).
8994
9095 Note:
9196 - True if cluster_config is provided (new cluster will be cleaned up)
9297 - False if cluster_name is provided (existing cluster will not be shut down)
9398 - User can explicitly set this value to override auto-detection
99+ - Kueue labels (local_queue and priority_class) are applied only when a new cluster is created via cluster_config; for an existing cluster they are ignored and a warning is logged
94100 """
95101 if cluster_name is None and cluster_config is None :
96102 raise ValueError (
@@ -124,6 +130,7 @@ def __init__(
124130 self .ttl_seconds_after_finished = ttl_seconds_after_finished
125131 self .active_deadline_seconds = active_deadline_seconds
126132 self .local_queue = local_queue
133+ self .priority_class = priority_class
127134
128135 if namespace is None :
129136 detected_namespace = get_current_namespace ()
@@ -165,6 +172,7 @@ def submit(self) -> str:
165172 # Validate configuration before submitting
166173 self ._validate_ray_version_compatibility ()
167174 self ._validate_working_dir_entrypoint ()
175+ self ._validate_priority_class ()
168176
169177 # Extract files from entrypoint and runtime_env working_dir
170178 files = extract_all_local_files (self )
@@ -243,12 +251,14 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
243251 # Extract files once and use for both runtime_env and submitter pod
244252 files = extract_all_local_files (self )
245253
254+ # Build Kueue labels - only for new clusters (lifecycled)
246255 labels = {}
247- # If cluster_config is provided, use the local_queue from the cluster_config
256+
248257 if self ._cluster_config is not None :
249258 if self .local_queue :
250259 labels ["kueue.x-k8s.io/queue-name" ] = self .local_queue
251260 else :
261+ # Auto-detect default queue for new clusters
252262 default_queue = get_default_kueue_name (self .namespace )
253263 if default_queue :
254264 labels ["kueue.x-k8s.io/queue-name" ] = default_queue
@@ -262,12 +272,23 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
262272 f"To fix this, please explicitly specify the 'local_queue' parameter."
263273 )
264274
265- rayjob_cr ["metadata" ]["labels" ] = labels
275+ if self .priority_class :
276+ labels ["kueue.x-k8s.io/priority-class" ] = self .priority_class
266277
267- # When using Kueue (queue label present), start with suspend=true
268- # Kueue will unsuspend the job once the workload is admitted
269- if labels .get ("kueue.x-k8s.io/queue-name" ):
270- rayjob_cr ["spec" ]["suspend" ] = True
278+ # Apply labels to metadata
279+ if labels :
280+ rayjob_cr ["metadata" ]["labels" ] = labels
281+
282+ # When using Kueue with lifecycled clusters, start with suspend=true
283+ # Kueue will unsuspend the job once the workload is admitted
284+ if labels .get ("kueue.x-k8s.io/queue-name" ):
285+ rayjob_cr ["spec" ]["suspend" ] = True
286+ else :
287+ if self .local_queue or self .priority_class :
288+ logger .warning (
289+ f"Kueue labels (local_queue, priority_class) are ignored for RayJobs "
290+ f"targeting existing clusters. Kueue only manages RayJobs that create new clusters."
291+ )
271292
272293 # Add active deadline if specified
273294 if self .active_deadline_seconds :
@@ -450,6 +471,32 @@ def _validate_cluster_config_image(self):
450471 elif is_warning :
451472 warnings .warn (f"Cluster config image: { message } " )
452473
def _validate_priority_class(self):
    """
    Best-effort check that the requested priority class is known.

    Does nothing when no priority class was requested. Otherwise the
    three-state answer of ``priority_class_exists`` decides the outcome:
    ``False`` aborts with an error, ``None`` (existence could not be
    determined) only logs a warning so submission can continue, and any
    other answer is logged as verified.

    Raises:
        ValueError: If the lookup reports the priority class as missing.
    """
    requested = self.priority_class
    if not requested:
        # Nothing was asked for, so there is nothing to verify.
        return

    logger.debug(f"Validating priority class '{requested}'...")
    lookup = priority_class_exists(requested)

    if lookup is False:
        # Definitive negative answer: refuse to continue.
        raise ValueError(f"Priority class '{requested}' does not exist")

    if lookup is None:
        # Inconclusive answer: warn, but do not block the submission.
        logger.warning(
            f"Could not verify if priority class '{requested}' exists. "
            f"Proceeding with submission - Kueue will validate on admission."
        )
        return

    # Any remaining answer counts as a successful verification.
    logger.debug(f"Priority class '{requested}' verified.")
499+
453500 def _validate_working_dir_entrypoint (self ):
454501 """
455502 Validate entrypoint file configuration.
0 commit comments