55import jsonpatch
66
77from framework .utils import (
8+ create_custom_object ,
9+ delete_custom_object ,
10+ get_head_pod ,
811 logger ,
12+ pod_exec_command ,
913 shell_subprocess_run ,
1014 shell_subprocess_check_output ,
1115 CONST ,
@@ -47,9 +51,9 @@ def get_expected_head_pods(custom_resource):
4751 """Get the number of head pods in custom_resource"""
4852 resource_kind = custom_resource ["kind" ]
4953 head_replica_paths = {
50- "RayCluster" : "spec.headGroupSpec.replicas" ,
51- "RayService" : "spec.rayClusterConfig.headGroupSpec.replicas" ,
52- "RayJob" : "spec.rayClusterSpec.headGroupSpec.replicas"
54+ CONST . RAY_CLUSTER_CRD : "spec.headGroupSpec.replicas" ,
55+ CONST . RAY_SERVICE_CRD : "spec.rayClusterConfig.headGroupSpec.replicas" ,
56+ CONST . RAY_JOB_CRD : "spec.rayClusterSpec.headGroupSpec.replicas"
5357 }
5458 if resource_kind in head_replica_paths :
5559 path = head_replica_paths [resource_kind ]
@@ -60,9 +64,9 @@ def get_expected_worker_pods(custom_resource):
    """Get the number of worker pods in custom_resource"""
6165 resource_kind = custom_resource ["kind" ]
6266 worker_specs_paths = {
63- "RayCluster" : "spec.workerGroupSpecs" ,
64- "RayService" : "spec.rayClusterConfig.workerGroupSpecs" ,
65- "RayJob" : "spec.rayClusterSpec.workerGroupSpecs"
67+ CONST . RAY_CLUSTER_CRD : "spec.workerGroupSpecs" ,
68+ CONST . RAY_SERVICE_CRD : "spec.rayClusterConfig.workerGroupSpecs" ,
69+ CONST . RAY_JOB_CRD : "spec.rayClusterSpec.workerGroupSpecs"
6670 }
6771 if resource_kind in worker_specs_paths :
6872 path = worker_specs_paths [resource_kind ]
@@ -95,6 +99,7 @@ class Mutator:
    def __init__(self, base_custom_resource, json_patch_list: List[jsonpatch.JsonPatch]):
        """Store the base CR and the ordered JSON patches `mutate` will apply.

        Args:
            base_custom_resource: the custom resource dict used as the starting point.
            json_patch_list: patches applied in order by `mutate` to derive new CRs.
        """
        self.base_cr = base_custom_resource
        self.patch_list = json_patch_list
102+
98103 def mutate (self ):
99104 """ Generate a new cr by applying the json patch to `cr`. """
100105 for patch in self .patch_list :
@@ -108,12 +113,14 @@ class Rule:
108113 """
    def __init__(self):
        # Base Rule carries no state; subclasses override the hook methods below.
        pass
116+
    def trigger_condition(self, custom_resource=None) -> bool:
        """
        The rule will only be checked when `trigger_condition` is true. For example, we will only
        check "HeadPodNameRule" when "spec.headGroupSpec" is defined in CR YAML file.

        Args:
            custom_resource: the CR dict the rule would be checked against.

        Returns:
            True by default; subclasses override this to gate `assert_rule`.
        """
        return True
123+
    def assert_rule(self, custom_resource=None, cr_namespace='default'):
        """Check whether the actual cluster state fulfills the rule or not.

        Args:
            custom_resource: the CR dict describing the expected state.
            cr_namespace: Kubernetes namespace the CR lives in.

        Raises:
            NotImplementedError: subclasses must implement the actual check.
        """
        raise NotImplementedError
@@ -122,6 +129,7 @@ class RuleSet:
122129 """A set of Rule"""
    def __init__(self, rules: List[Rule]):
        # Rules are evaluated in order by `check_rule_set`.
        self.rules = rules
132+
125133 def check_rule_set (self , custom_resource , namespace ):
126134 """Check all rules that the trigger conditions are fulfilled."""
127135 for rule in self .rules :
@@ -152,19 +160,26 @@ def trigger(self):
152160 self .exec ()
153161 self .wait ()
154162 self .check_rule_sets ()
163+
155164 def exec (self ):
156165 """
157166 Execute a command to trigger the CREvent. For example, create a CR by a
158167 `kubectl apply` command.
159168 """
160- raise NotImplementedError
169+ if not self .filepath :
170+ create_custom_object (self .namespace , self .custom_resource_object )
171+ else :
172+ shell_subprocess_run (f"kubectl apply -n { self .namespace } -f { self .filepath } " )
173+
161174 def wait (self ):
162175 """Wait for the system to converge."""
163176 time .sleep (self .timeout )
177+
164178 def check_rule_sets (self ):
165179 """When the system converges, check all registered RuleSets."""
166180 for ruleset in self .rulesets :
167181 ruleset .check_rule_set (self .custom_resource_object , self .namespace )
182+
    def clean_up(self):
        """Cleanup the CR.

        Raises:
            NotImplementedError: subclasses delete whatever `exec` created.
        """
        raise NotImplementedError
@@ -179,9 +194,8 @@ def trigger_condition(self, custom_resource=None) -> bool:
179194 def assert_rule (self , custom_resource = None , cr_namespace = 'default' ):
180195 expected_val = search_path (custom_resource ,
181196 "spec.headGroupSpec.template.spec.containers.0.name" .split ('.' ))
182- headpods = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_V1_CLIENT_KEY ].list_namespaced_pod (
183- namespace = cr_namespace , label_selector = 'ray.io/node-type=head' )
184- assert headpods .items [0 ].spec .containers [0 ].name == expected_val
197+ headpod = get_head_pod (cr_namespace )
198+ assert headpod .spec .containers [0 ].name == expected_val
185199
186200class HeadSvcRule (Rule ):
187201 """The labels of the head pod and the selectors of the head service must match."""
@@ -199,12 +213,10 @@ def assert_rule(self, custom_resource=None, cr_namespace='default'):
199213class EasyJobRule (Rule ):
200214 """Submit a very simple Ray job to test the basic functionality of the Ray cluster."""
201215 def assert_rule (self , custom_resource = None , cr_namespace = 'default' ):
202- k8s_v1_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_V1_CLIENT_KEY ]
203- headpods = k8s_v1_api .list_namespaced_pod (
204- namespace = cr_namespace , label_selector = 'ray.io/node-type=head' )
205- headpod_name = headpods .items [0 ].metadata .name
206- shell_subprocess_run (f"kubectl exec { headpod_name } -n { cr_namespace } --" +
207- " python -c \" import ray; ray.init(); print(ray.cluster_resources())\" " )
216+ headpod = get_head_pod (cr_namespace )
217+ headpod_name = headpod .metadata .name
218+ pod_exec_command (headpod_name , cr_namespace ,
219+ "python -c \" import ray; ray.init(); print(ray.cluster_resources())\" " )
208220
209221class CurlServiceRule (Rule ):
210222 """"Using curl to access the deployed application on Ray service"""
@@ -231,15 +243,6 @@ def assert_rule(self, custom_resource=None, cr_namespace='default'):
231243
232244class RayClusterAddCREvent (CREvent ):
233245 """CREvent for RayCluster addition"""
234- def exec (self ):
235- if not self .filepath :
236- k8s_cr_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_CR_CLIENT_KEY ]
237- k8s_cr_api .create_namespaced_custom_object (
238- group = 'ray.io' ,version = 'v1alpha1' , namespace = self .namespace ,
239- plural = 'rayclusters' , body = self .custom_resource_object )
240- else :
241- shell_subprocess_run (f"kubectl apply -n { self .namespace } -f { self .filepath } " )
242-
243246 def wait (self ):
244247 start_time = time .time ()
245248 expected_head_pods = get_expected_head_pods (self .custom_resource_object )
@@ -272,10 +275,11 @@ def wait(self):
272275
273276 def clean_up (self ):
274277 """Delete added RayCluster"""
275- k8s_cr_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_CR_CLIENT_KEY ]
276- k8s_cr_api .delete_namespaced_custom_object (
277- group = 'ray.io' , version = 'v1alpha1' , namespace = self .namespace ,
278- plural = 'rayclusters' , name = self .custom_resource_object ['metadata' ]['name' ])
278+ if not self .filepath :
279+ delete_custom_object (CONST .RAY_CLUSTER_CRD ,
280+ self .namespace , self .custom_resource_object ['metadata' ]['name' ])
281+ else :
282+ shell_subprocess_run (f"kubectl delete -n { self .namespace } -f { self .filepath } " )
279283 # Wait pods to be deleted
280284 converge = False
281285 k8s_v1_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_V1_CLIENT_KEY ]
@@ -300,16 +304,6 @@ def clean_up(self):
300304
301305class RayServiceAddCREvent (CREvent ):
302306 """CREvent for RayService addition"""
303- def exec (self ):
304- """Wait for RayService to converge""" ""
305- if not self .filepath :
306- k8s_cr_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_CR_CLIENT_KEY ]
307- k8s_cr_api .create_namespaced_custom_object (
308- group = 'ray.io' ,version = 'v1alpha1' , namespace = self .namespace ,
309- plural = 'rayservices' , body = self .custom_resource_object )
310- else :
311- shell_subprocess_run (f"kubectl apply -n { self .namespace } -f { self .filepath } " )
312-
313307 def wait (self ):
314308 """Wait for RayService to converge""" ""
315309 start_time = time .time ()
@@ -347,10 +341,11 @@ def wait(self):
347341
348342 def clean_up (self ):
349343 """Delete added RayService"""
350- k8s_cr_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_CR_CLIENT_KEY ]
351- k8s_cr_api .delete_namespaced_custom_object (
352- group = 'ray.io' , version = 'v1alpha1' , namespace = self .namespace ,
353- plural = 'rayservices' , name = self .custom_resource_object ['metadata' ]['name' ])
344+ if not self .filepath :
345+ delete_custom_object (CONST .RAY_SERVICE_CRD ,
346+ self .namespace , self .custom_resource_object ['metadata' ]['name' ])
347+ else :
348+ shell_subprocess_run (f"kubectl delete -n { self .namespace } -f { self .filepath } " )
354349 # Wait pods to be deleted
355350 converge = False
356351 k8s_v1_api = K8S_CLUSTER_MANAGER .k8s_client_dict [CONST .K8S_V1_CLIENT_KEY ]
0 commit comments