From bdf51805ef2b52bb6e702ccf797a4046b1912847 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 27 Aug 2021 17:12:39 -0500 Subject: [PATCH 01/12] API design --- execution_engine2.spec | 51 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index eb042674e..e4bbed2c0 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -16,6 +16,9 @@ /* A job id. */ typedef string job_id; + /* A job state. */ + typedef string job_state; + /* A structure representing the Execution Engine status git_commit - the Git hash of the version of the module. @@ -229,6 +232,20 @@ boolean as_admin; } BulkRetryParams; + + /* + batch job_id to retry + status_filter: job states in ['terminated', 'error'] (valid retry states) + as_admin: retry someone else's job in your namespace + #TODO: Possibly Add list job_requirements; + */ + typedef structure { + list job_ids; + list status_filter; + boolean as_admin; + } BatchRetryParams; + + /* #TODO write retry parent tests to ensure BOTH the parent_job_id is present, and retry_job_id is present #TODO Add retry child that checks the status of the child? to prevent multiple retries @@ -246,14 +263,17 @@ */ funcdef retry_jobs(BulkRetryParams params) returns (list retry_result) authentication required; - + /* + Retry a job based on a batch id with a job_state status list ['error', 'terminated'] + Requires the user to keep track of the job states of the Status enum in the ee2 models file + If no status_list is provided, an exception is thrown. + */ + funcdef retry_batch_jobs(BatchRetry params) returns (list retry_result) authentication required; funcdef abandon_children(AbandonChildren params) returns (BatchSubmission parent_and_child_ids) authentication required; - - /* EE2Constants Concierge Params are request_cpus: int request_memory: int in MB @@ -585,6 +605,31 @@ */ funcdef cancel_job(CancelJobParams params) returns () authentication required; + + + /* + job_id: batch job to retry + as_admin: retry someone else's job in your namespace + status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both + #TODO: Possibly Add list job_requirements; + */ + typedef structure { + job_id batch_job_id; + boolean as_admin; + str status_filter; + } CancelBatchJobParams; + + /* + Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0. + Valid statuses are ['created', 'estimating', 'queued', 'running'] + (Requires the user to keep track of the job states of the Status enum in the ee2 models file) + If no status_filter is provided, an exception is thrown. + */ + funcdef cancel_batch_job(CancelBatchJobParams params) returns () authentication required; + + + + /* job_id - id of job running method finished - indicates whether job is done (including error/cancel cases) or not From 7011bfe8532a04c4d0b44859d9e5cfa4a9bd36a1 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 27 Aug 2021 17:19:44 -0500 Subject: [PATCH 02/12] API design --- execution_engine2.spec | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index e4bbed2c0..b1b324990 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -240,7 +240,7 @@ #TODO: Possibly Add list job_requirements; */ typedef structure { - list job_ids; + job_id batch_job_id; list status_filter; boolean as_admin; } BatchRetryParams; @@ -606,18 +606,17 @@ funcdef cancel_job(CancelJobParams params) returns () authentication required; - /* job_id: batch job to retry - as_admin: retry someone else's job in your namespace status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both + as_admin: retry someone else's job in your namespace #TODO: Possibly Add list job_requirements; */ typedef structure { job_id batch_job_id; + list status_filter; boolean as_admin; - str status_filter; - } CancelBatchJobParams; + } BatchCancelParams; /* Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0. @@ -625,8 +624,7 @@ (Requires the user to keep track of the job states of the Status enum in the ee2 models file) If no status_filter is provided, an exception is thrown. */ - funcdef cancel_batch_job(CancelBatchJobParams params) returns () authentication required; - + funcdef cancel_batch_job(BatchCancelParams params) returns () authentication required; From 3133d58a5eaeab4c876f4282b65b847bcd996b5f Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 27 Aug 2021 17:21:07 -0500 Subject: [PATCH 03/12] API design --- execution_engine2.spec | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index b1b324990..ef33135ed 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -234,7 +234,7 @@ /* - batch job_id to retry + batch_job_id: BATCH_ID to retry status_filter: job states in ['terminated', 'error'] (valid retry states) as_admin: retry someone else's job in your namespace #TODO: Possibly Add list job_requirements; @@ -607,10 +607,9 @@ /* - job_id: batch job to retry + batch_job_id: BATCH_ID to cancel status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both as_admin: retry someone else's job in your namespace - #TODO: Possibly Add list job_requirements; */ typedef structure { job_id batch_job_id; From ac8da691d9012571b51995a68fb826c8ae32e146 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 27 Aug 2021 17:24:04 -0500 Subject: [PATCH 04/12] API design --- execution_engine2.spec | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index ef33135ed..37a62df09 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -16,8 +16,8 @@ /* A job id. */ typedef string job_id; - /* A job state. */ - typedef string job_state; + /* A job state's job status. */ + typedef string job_status; /* A structure representing the Execution Engine status @@ -241,7 +241,7 @@ */ typedef structure { job_id batch_job_id; - list status_filter; + list status_filter; boolean as_admin; } BatchRetryParams; @@ -613,7 +613,7 @@ */ typedef structure { job_id batch_job_id; - list status_filter; + list status_filter; boolean as_admin; } BatchCancelParams; From 9adfa50d1ea628eb3907038c9993c6a52fe34fc7 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 27 Aug 2021 17:29:31 -0500 Subject: [PATCH 05/12] Api --- .../execution_engine2Impl.py | 96 ++++++++++++++----- .../execution_engine2Server.py | 16 ++++ 2 files changed, 90 insertions(+), 22 deletions(-) diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 49e7b9b9a..2198d02ec 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -29,8 +29,8 @@ class execution_engine2: # the latter method is running. ######################################### noqa VERSION = "0.0.5" - GIT_URL = "https://github.com/mrcreosote/execution_engine2.git" - GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a" + GIT_URL = "git@github.com:kbase/execution_engine2.git" + GIT_COMMIT_HASH = "ac8da691d9012571b51995a68fb826c8ae32e146" #BEGIN_CLASS_HEADER MONGO_COLLECTION = "jobs" @@ -450,6 +450,38 @@ def retry_jobs(self, ctx, params): # return the results return [retry_result] + def retry_batch_jobs(self, ctx, params): + """ + Retry a job based on a batch id with a job_state status list ['error', 'terminated'] + Requires the user to keep track of the job states of the Status enum in the ee2 models file + If no status_list is provided, an exception is thrown. + :param params: instance of type "BatchRetryParams" (batch_job_id: + BATCH_ID to retry status_filter: job states in ['terminated', + 'error'] (valid retry states) as_admin: retry someone else's job + in your namespace #TODO: Possibly Add list + job_requirements;) -> structure: parameter "batch_job_id" of type + "job_id" (A job id.), parameter "status_filter" of list of type + "job_status" (A job state's job status.), parameter "as_admin" of + type "boolean" (@range [0,1]) + :returns: instance of list of type "RetryResult" (job_id of retried + job retry_id: job_id of the job that was launched str error: + reason as to why that particular retry failed (available for bulk + retry only)) -> structure: parameter "job_id" of type "job_id" (A + job id.), parameter "retry_id" of type "job_id" (A job id.), + parameter "error" of String + """ + # ctx is the context object + # return variables are: retry_result + #BEGIN retry_batch_jobs + #END retry_batch_jobs + + # At some point might do deeper type checking... + if not isinstance(retry_result, list): + raise ValueError('Method retry_batch_jobs return value ' + + 'retry_result is not type list as required.') + # return the results + return [retry_result] + def abandon_children(self, ctx, params): """ :param params: instance of type "AbandonChildren" -> structure: @@ -911,7 +943,7 @@ def check_job(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1033,19 +1065,19 @@ def check_job_batch(self, ctx, params): of list of String, parameter "as_admin" of type "boolean" (@range [0,1]) :returns: instance of type "CheckJobBatchResults" (batch_jobstate - - state of parent job of the batch child_jobstates - states of child - jobs IDEA: ADD aggregate_states - count of all available child job - states, even if they are zero) -> structure: parameter - "batch_jobstate" of type "JobState" (job_id - string - id of the - job user - string - user who started the job wsid - int - optional - id of the workspace where the job is bound authstrat - string - - what strategy used to authenticate the job job_input - object - - inputs to the job (from the run_job call) ## TODO - verify - job_output - object - outputs from the job (from the run_job call) - ## TODO - verify updated - int - timestamp since epoch in - milliseconds of the last time the status was updated running - int - - timestamp since epoch in milliseconds of when it entered the - running state created - int - timestamp since epoch in + state of the coordinating job for the batch child_jobstates - + states of child jobs IDEA: ADD aggregate_states - count of all + available child job states, even if they are zero) -> structure: + parameter "batch_jobstate" of type "JobState" (job_id - string - + id of the job user - string - user who started the job wsid - int + - optional id of the workspace where the job is bound authstrat - + string - what strategy used to authenticate the job job_input - + object - inputs to the job (from the run_job call) ## TODO - + verify job_output - object - outputs from the job (from the + run_job call) ## TODO - verify updated - int - timestamp since + epoch in milliseconds of the last time the status was updated + running - int - timestamp since epoch in milliseconds of when it + entered the running state created - int - timestamp since epoch in milliseconds when the job was created finished - int - timestamp since epoch in milliseconds when the job was finished status - string - status of the job. one of the following: created - job @@ -1068,7 +1100,7 @@ def check_job_batch(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1189,7 +1221,7 @@ def check_job_batch(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1343,7 +1375,7 @@ def check_jobs(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1500,7 +1532,7 @@ def check_workspace_jobs(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1641,6 +1673,26 @@ def cancel_job(self, ctx, params): #END cancel_job pass + def cancel_batch_job(self, ctx, params): + """ + Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0. + Valid statuses are ['created', 'estimating', 'queued', 'running'] + (Requires the user to keep track of the job states of the Status enum in the ee2 models file) + If no status_filter is provided, an exception is thrown. + :param params: instance of type "BatchCancelParams" (batch_job_id: + BATCH_ID to cancel status_filter: optional filter of either + 'terminated' or 'error'. Not setting this results in cancel of + both as_admin: retry someone else's job in your namespace) -> + structure: parameter "batch_job_id" of type "job_id" (A job id.), + parameter "status_filter" of list of type "job_status" (A job + state's job status.), parameter "as_admin" of type "boolean" + (@range [0,1]) + """ + # ctx is the context object + #BEGIN cancel_batch_job + #END cancel_batch_job + pass + def check_job_canceled(self, ctx, params): """ Check whether a job has been canceled. This method is lightweight compared to check_job. @@ -1801,7 +1853,7 @@ def check_jobs_date_range_for_user(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -2016,7 +2068,7 @@ def check_jobs_date_range_for_all(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe diff --git a/lib/execution_engine2/execution_engine2Server.py b/lib/execution_engine2/execution_engine2Server.py index b63fe2210..2bdbb8564 100644 --- a/lib/execution_engine2/execution_engine2Server.py +++ b/lib/execution_engine2/execution_engine2Server.py @@ -412,6 +412,14 @@ def __init__(self): types=[dict], ) self.method_authentication["execution_engine2.retry_jobs"] = "required" # noqa + self.rpc_service.add( + impl_execution_engine2.retry_batch_jobs, + name="execution_engine2.retry_batch_jobs", + types=[dict], + ) + self.method_authentication[ + "execution_engine2.retry_batch_jobs" + ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.abandon_children, name="execution_engine2.abandon_children", @@ -506,6 +514,14 @@ def __init__(self): types=[dict], ) self.method_authentication["execution_engine2.cancel_job"] = "required" # noqa + self.rpc_service.add( + impl_execution_engine2.cancel_batch_job, + name="execution_engine2.cancel_batch_job", + types=[dict], + ) + self.method_authentication[ + "execution_engine2.cancel_batch_job" + ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.check_job_canceled, name="execution_engine2.check_job_canceled", From 2d3f5eee6dca809d60c4e6f8a1218d0c4a056486 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 2 Sep 2021 11:51:36 -0500 Subject: [PATCH 06/12] Add endpoints --- execution_engine2.spec | 9 ++- lib/execution_engine2/db/MongoUtil.py | 14 ++++ lib/execution_engine2/exceptions.py | 8 ++ .../execution_engine2Impl.py | 46 +++++++++--- lib/execution_engine2/sdk/EE2Runjob.py | 35 ++++++++- lib/execution_engine2/sdk/EE2Status.py | 74 +++++++++++++++++-- lib/execution_engine2/sdk/SDKMethodRunner.py | 15 ++++ 7 files changed, 181 insertions(+), 20 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index 37a62df09..6d10d49cf 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -268,7 +268,7 @@ Requires the user to keep track of the job states of the Status enum in the ee2 models file If no status_list is provided, an exception is thrown. */ - funcdef retry_batch_jobs(BatchRetry params) returns (list retry_result) authentication required; + funcdef retry_batch_jobs(BatchRetryParams params) returns (list retry_result) authentication required; funcdef abandon_children(AbandonChildren params) @@ -609,11 +609,14 @@ /* batch_job_id: BATCH_ID to cancel status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both + terminated_code: optional terminated code, default to terminated by user as_admin: retry someone else's job in your namespace + @optional terminated_code */ typedef structure { job_id batch_job_id; list status_filter; + int terminated_code; boolean as_admin; } BatchCancelParams; @@ -623,9 +626,7 @@ (Requires the user to keep track of the job states of the Status enum in the ee2 models file) If no status_filter is provided, an exception is thrown. */ - funcdef cancel_batch_job(BatchCancelParams params) returns () authentication required; - - + funcdef cancel_batch_job(BatchCancelParams params) returns (list job_ids) authentication required; /* job_id - id of job running method diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 349b066bc..533441bce 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -223,6 +223,20 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job: return job + def get_jobs_with_status( + self, job_ids: List[str], status_list: List[str], only_job_ids: bool = False + ) -> List[Job]: + if not (job_ids and isinstance(job_ids, list)): + raise ValueError("Please provide a non empty list of job ids") + + if not (status_list and isinstance(job_ids, list)): + raise ValueError("Please provide a non empty list of job statuses") + + with self.mongo_engine_connection(): + if only_job_ids: + return Job.objects(id__in=job_ids, status__in=status_list).only("_id") + return Job.objects(id__in=job_ids, status__in=status_list) + def get_jobs( self, job_ids=None, exclude_fields=None, sort_id_ascending=None ) -> List[Job]: diff --git a/lib/execution_engine2/exceptions.py b/lib/execution_engine2/exceptions.py index 13961697e..b2b6c00d9 100644 --- a/lib/execution_engine2/exceptions.py +++ b/lib/execution_engine2/exceptions.py @@ -18,10 +18,18 @@ def __init__(self, msg=None, *args, **kwargs): super().__init__(msg or self.__doc__, *args, **kwargs) +class InvalidStatusListException(ExecutionEngineValueError): + """Invalid job state status provided""" + + class IncorrectParamsException(ExecutionEngineValueError): """Wrong parameters were provided""" +class NotBatchJobException(ExecutionEngineValueError): + """Requested job is not a batch job""" + + class InvalidParameterForBatch(ExecutionEngineValueError): """Workspace ids are not allowed in RunJobParams in Batch Mode""" diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 2198d02ec..d88d19712 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -30,7 +30,7 @@ class execution_engine2: ######################################### noqa VERSION = "0.0.5" GIT_URL = "git@github.com:kbase/execution_engine2.git" - GIT_COMMIT_HASH = "ac8da691d9012571b51995a68fb826c8ae32e146" + GIT_COMMIT_HASH = "9adfa50d1ea628eb3907038c9993c6a52fe34fc7" #BEGIN_CLASS_HEADER MONGO_COLLECTION = "jobs" @@ -360,7 +360,7 @@ def run_job_batch(self, ctx, params, batch_params): #BEGIN run_job_batch mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), - clients = self.clients, + clients=self.clients, job_permission_cache=self.job_permission_cache, admin_permissions_cache=self.admin_permissions_cache ) @@ -402,7 +402,7 @@ def retry_job(self, ctx, params): #BEGIN retry_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), - clients = self.clients, + clients=self.clients, job_permission_cache=self.job_permission_cache, admin_permissions_cache=self.admin_permissions_cache ) @@ -473,6 +473,15 @@ def retry_batch_jobs(self, ctx, params): # ctx is the context object # return variables are: retry_result #BEGIN retry_batch_jobs + mr = SDKMethodRunner( + user_clients=self.gen_cfg.get_user_clients(ctx), + clients=self.clients, + job_permission_cache=self.job_permission_cache, + admin_permissions_cache=self.admin_permissions_cache + ) + retry_result = mr.retry_batch(job_id=params.get('job_id'), + job_status=params.get('status_filter'), + as_admin=params.get('as_admin')) #END retry_batch_jobs # At some point might do deeper type checking... @@ -1682,16 +1691,35 @@ def cancel_batch_job(self, ctx, params): :param params: instance of type "BatchCancelParams" (batch_job_id: BATCH_ID to cancel status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of - both as_admin: retry someone else's job in your namespace) -> - structure: parameter "batch_job_id" of type "job_id" (A job id.), - parameter "status_filter" of list of type "job_status" (A job - state's job status.), parameter "as_admin" of type "boolean" - (@range [0,1]) + both terminated_code: optional terminated code, default to + terminated by user as_admin: retry someone else's job in your + namespace @optional terminated_code) -> structure: parameter + "batch_job_id" of type "job_id" (A job id.), parameter + "status_filter" of list of type "job_status" (A job state's job + status.), parameter "terminated_code" of Long, parameter + "as_admin" of type "boolean" (@range [0,1]) + :returns: instance of list of type "job_id" (A job id.) """ # ctx is the context object + # return variables are: job_ids #BEGIN cancel_batch_job + mr = SDKMethodRunner( + user_clients=self.gen_cfg.get_user_clients(ctx), + clients=self.clients, + job_permission_cache=self.job_permission_cache, + admin_permissions_cache=self.admin_permissions_cache + ) + returnVal = mr.cancel_batch_job(job_id=params.get('job_id'), + status_list=params.get('status_filter'), + as_admin=params.get('as_admin')) #END cancel_batch_job - pass + + # At some point might do deeper type checking... + if not isinstance(job_ids, list): + raise ValueError('Method cancel_batch_job return value ' + + 'job_ids is not type list as required.') + # return the results + return [job_ids] def check_job_canceled(self, ctx, params): """ diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index ec6d3952c..8a1e863dd 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -26,6 +26,8 @@ CannotRetryJob, RetryFailureException, InvalidParameterForBatch, + InvalidStatusListException, + NotBatchJobException, ) from execution_engine2.sdk.EE2Constants import CONCIERGE_CLIENTGROUP from execution_engine2.sdk.job_submission_parameters import ( @@ -744,7 +746,7 @@ def _retry(self, job_id: str, job: Job, batch_job: Job, as_admin: bool = False): # to make sure the retried job is correctly submitted? Or save that for a unit test? return {"job_id": job_id, "retry_id": retry_job_id} - def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]: + def retry(self, job_id: str, as_admin: bool = False) -> Dict[str, Optional[str]]: """ #TODO Add new job requirements/cgroups as an optional param :param job_id: The main job to retry @@ -758,6 +760,37 @@ def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]: job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin ) + def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): + """ + Retry jobs by status given a BATCH_ID + :param job_id: The batch job id to retry jobs for + :param status_list: The state of jobs in that batch to retry + :param as_admin: Retry jobs for others + :return: A list of job ids that are retried from the batch + """ + valid_statuses = [Status.terminated.value, Status.error.value] + # Validation + if not status_list: + raise InvalidStatusListException( + f"Provide a list of status codes from {valid_statuses}." + ) + for status in status_list: + if not self._retryable(status): + raise InvalidStatusListException( + f"Provided status {status} not retryable . Status not in {valid_statuses}" + ) + + batch_job = self.sdkmr.get_job_with_permission( + job_id, JobPermissions.WRITE, as_admin=as_admin + ) + if not batch_job.batch_job: + raise NotBatchJobException(f"{job_id} is not a batch job") + # Retry and Report + retryable_child_job_ids = self.sdkmr.get_mongo_util().get_jobs_with_status( + job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=True + ) + return self.retry_multiple(job_ids=retryable_child_job_ids) + def retry_multiple( self, job_ids, as_admin=False ) -> List[Dict[str, Union[str, Any]]]: diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 053cfb77d..24d3a261a 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -8,6 +8,8 @@ from execution_engine2.exceptions import ( InvalidStatusTransitionException, ChildrenNotFoundError, + InvalidStatusListException, + NotBatchJobException, ) from execution_engine2.sdk.EE2Constants import JobError from lib.execution_engine2.authorization.authstrategy import can_read_jobs @@ -85,19 +87,79 @@ def handle_held_job(self, cluster_id, as_admin): # There's probably a better way and a return type, but not really sure what I need yet return json.loads(json.dumps(j.to_mongo().to_dict(), default=str)) - def cancel_job(self, job_id, terminated_code=None, as_admin=False): + def cancel_batch_job( + self, job_id, status_list, terminated_code=None, as_admin=False + ): + """ + Terminate jobs by status given a BATCH_ID + :param job_id: A batch job id to terminate child jobs of + :param status_list: A list of statuses to terminate the child jobs in + :param terminated_code: The terminated code, default to TerminatedCode.terminated_by_user.value + :param as_admin: Terminate jobs for others + :return: A list of job ids that were successfully terminated + """ + # Validation + valid_statuses = [ + Status.created.value, + Status.queued.value, + Status.estimating.value, + Status.running.value, + ] + if not status_list: + raise InvalidStatusListException( + f"Provide a list of status codes from {valid_statuses}." + ) + for status in status_list: + if status not in valid_statuses: + raise InvalidStatusListException( + f"Provided status {status} not in {valid_statuses}." + ) + + batch_job = self.sdkmr.get_job_with_permission( + job_id, JobPermissions.WRITE, as_admin=as_admin + ) + if not batch_job.batch_job: + raise NotBatchJobException(f"{job_id} is not a batch job") + + # Termination + terminated_ids = [] + child_jobs = self.sdkmr.get_mongo_util().get_jobs_with_status( + job_ids=batch_job.child_jobs, status_list=status_list + ) + for job in child_jobs: + try: + self.cancel_job( + job=job, terminated_code=terminated_code, as_admin=as_admin + ) + terminated_ids.append(job.id) + except Exception: + # Nothing to report, a job might have finished by now + pass + + # Report + if len(terminated_ids) == 0: + raise Exception(f"{job_id} didn't have any valid child jobs to terminate") + return terminated_ids + + def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False): """ Authorization Required: Ability to Read and Write to the Workspace - Default for terminated code is Terminated By User + Terminates child jobs as well + :param job_id: Job ID To cancel - :param terminated_code: + :param terminated_code: Default Terminated By User :param as_admin: Cancel the job for a different user """ # Is it inefficient to get the job twice? Is it cached? + if (not job_id and not job) or (job_id and job): + raise Exception( + "Programming Error: Need to provide exactly one job id or a job object" + ) - job = self.sdkmr.get_job_with_permission( - job_id, JobPermissions.WRITE, as_admin=as_admin - ) + if job_id: + job = self.sdkmr.get_job_with_permission( + job_id, JobPermissions.WRITE, as_admin=as_admin + ) if terminated_code is None: terminated_code = TerminatedCode.terminated_by_user.value diff --git a/lib/execution_engine2/sdk/SDKMethodRunner.py b/lib/execution_engine2/sdk/SDKMethodRunner.py index 350599960..422694b00 100644 --- a/lib/execution_engine2/sdk/SDKMethodRunner.py +++ b/lib/execution_engine2/sdk/SDKMethodRunner.py @@ -335,6 +335,12 @@ def retry(self, job_id, as_admin=False): """Authorization Required Read/Write""" return self.get_runjob().retry(job_id=job_id, as_admin=as_admin) + def retry_batch(self, job_id, status_list, as_admin=False): + """Authorization Required Read/Write""" + return self.get_runjob().retry_batch( + job_id=job_id, as_admin=as_admin, status_list=status_list + ) + def run_job(self, params, as_admin=False): """Authorization Required Read/Write""" return self.get_runjob().run(params=params, as_admin=as_admin) @@ -394,6 +400,15 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False): job_id=job_id, terminated_code=terminated_code, as_admin=as_admin ) + def cancel_batch_job(self, job_id, status_list, terminated_code, as_admin=False): + """Authorization Required Read/Write""" + return self.get_jobs_status().cancel_batch_job( + job_id=job_id, + terminated_code=terminated_code, + status_list=status_list, + as_admin=as_admin, + ) + def handle_held_job(self, cluster_id): """Authorization Required Read/Write""" if self.check_as_admin(requested_perm=JobPermissions.WRITE): From 6e35b8d266df7de7694e4afa62e73ff8a1fdc1fa Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 21 Oct 2021 14:17:45 -0500 Subject: [PATCH 07/12] Fix bug and add test --- execution_engine2.spec | 10 +- lib/execution_engine2/db/MongoUtil.py | 10 +- .../execution_engine2Impl.py | 6 +- lib/execution_engine2/sdk/EE2Runjob.py | 6 + lib/execution_engine2/sdk/EE2Status.py | 5 +- ...ee2_SDKMethodRunner_test_EE2Runjob_test.py | 270 ++++++++++++------ 6 files changed, 199 insertions(+), 108 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index 6d10d49cf..38ed89fb9 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -235,13 +235,13 @@ /* batch_job_id: BATCH_ID to retry - status_filter: job states in ['terminated', 'error'] (valid retry states) + status_list: job states in ['terminated', 'error'] (valid retry states) as_admin: retry someone else's job in your namespace #TODO: Possibly Add list job_requirements; */ typedef structure { job_id batch_job_id; - list status_filter; + list status_list; boolean as_admin; } BatchRetryParams; @@ -608,14 +608,14 @@ /* batch_job_id: BATCH_ID to cancel - status_filter: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both + status_list: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both terminated_code: optional terminated code, default to terminated by user as_admin: retry someone else's job in your namespace @optional terminated_code */ typedef structure { job_id batch_job_id; - list status_filter; + list status_list; int terminated_code; boolean as_admin; } BatchCancelParams; @@ -624,7 +624,7 @@ Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0. Valid statuses are ['created', 'estimating', 'queued', 'running'] (Requires the user to keep track of the job states of the Status enum in the ee2 models file) - If no status_filter is provided, an exception is thrown. + If no status_list is provided, an exception is thrown. */ funcdef cancel_batch_job(BatchCancelParams params) returns (list job_ids) authentication required; diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 533441bce..90c0abdcd 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -4,7 +4,7 @@ import traceback from contextlib import contextmanager from datetime import datetime -from typing import Dict, List +from typing import Dict, List, Union from bson.objectid import ObjectId from mongoengine import connect, connection from pymongo import MongoClient, UpdateOne @@ -225,7 +225,7 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job: def get_jobs_with_status( self, job_ids: List[str], status_list: List[str], only_job_ids: bool = False - ) -> List[Job]: + ) -> Union[List[Job], List[str]]: if not (job_ids and isinstance(job_ids, list)): raise ValueError("Please provide a non empty list of job ids") @@ -233,9 +233,11 @@ def get_jobs_with_status( raise ValueError("Please provide a non empty list of job statuses") with self.mongo_engine_connection(): + # TODO: Only seems to be returning other fields as well. Is .only() broken? + jobs = Job.objects(id__in=job_ids, status__in=status_list) if only_job_ids: - return Job.objects(id__in=job_ids, status__in=status_list).only("_id") - return Job.objects(id__in=job_ids, status__in=status_list) + return [str(job.id) for job in jobs] + return jobs def get_jobs( self, job_ids=None, exclude_fields=None, sort_id_ascending=None diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index bf476e282..76b659ec1 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -480,7 +480,7 @@ def retry_batch_jobs(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache ) retry_result = mr.retry_batch(job_id=params.get('job_id'), - job_status=params.get('status_filter'), + status_list=params.get('status_list'), as_admin=params.get('as_admin')) #END retry_batch_jobs @@ -1709,8 +1709,8 @@ def cancel_batch_job(self, ctx, params): job_permission_cache=self.job_permission_cache, admin_permissions_cache=self.admin_permissions_cache ) - returnVal = mr.cancel_batch_job(job_id=params.get('job_id'), - status_list=params.get('status_filter'), + job_ids = mr.cancel_batch_job(job_id=params.get('job_id'), + status_list=params.get('status_list'), as_admin=params.get('as_admin')) #END cancel_batch_job diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index 8a1e863dd..e548ad034 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -789,6 +789,12 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): retryable_child_job_ids = self.sdkmr.get_mongo_util().get_jobs_with_status( job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=True ) + + if len(retryable_child_job_ids) == 0: + raise RetryFailureException( + f"No retryable jobs found with a state of {status_list}" + ) + return self.retry_multiple(job_ids=retryable_child_job_ids) def retry_multiple( diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 24d3a261a..31563385a 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -107,7 +107,7 @@ def cancel_batch_job( ] if not status_list: raise InvalidStatusListException( - f"Provide a list of status codes from {valid_statuses}." + f"Provide a list of valid job statuses from {valid_statuses}." ) for status in status_list: if status not in valid_statuses: @@ -146,6 +146,7 @@ def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False Authorization Required: Ability to Read and Write to the Workspace Terminates child jobs as well + :param job: Job Object to cancel :param job_id: Job ID To cancel :param terminated_code: Default Terminated By User :param as_admin: Cancel the job for a different user @@ -153,7 +154,7 @@ def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False # Is it inefficient to get the job twice? Is it cached? if (not job_id and not job) or (job_id and job): raise Exception( - "Programming Error: Need to provide exactly one job id or a job object" + "Programming Error: Need to provide exactly one job id or a job object" ) if job_id: diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index af441a81d..680e1eb1f 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -11,8 +11,12 @@ from execution_engine2.exceptions import ( CannotRetryJob, - RetryFailureException, InvalidParameterForBatch, + ExecutionEngineException, + InvalidStatusTransitionException, + InvalidStatusListException, + NotBatchJobException, + RetryFailureException, ) from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.clients import ( @@ -541,20 +545,7 @@ def test_retry_job_with_params_and_nci_and_src_ws_objs(self, rq_mock, condor_moc # TODO Retry a job without an app_id # TODO Check narrative_cell_info - @requests_mock.Mocker() - @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) - def test_run_job_batch(self, rq_mock, condor_mock): - """ - Test running batch jobs - """ - rq_mock.add_matcher( - run_job_adapter( - ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} - ) - ) - runner = self.getRunner() - runner.get_condor = MagicMock(return_value=condor_mock) - + def get_batch_jobs(self): job = get_example_job_as_dict( user=self.user_id, wsid=None, source_ws_objects=[] ) @@ -564,95 +555,186 @@ def test_run_job_batch(self, rq_mock, condor_mock): job3 = get_example_job_as_dict( user=self.user_id, wsid=None, source_ws_objects=[] ) - si = SubmissionInfo(clusterid="test", submit=job, error=None) - condor_mock.run_job = MagicMock(return_value=si) jobs = [job, job2, job3] - job_ids = runner.run_job_batch( - params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id} - ) - - for job in runner.check_jobs( - job_ids=job_ids["child_job_ids"] + [job_ids["batch_id"]] - )["job_states"]: - assert job.get("wsid") == self.ws_id - # Job input is forced to assume the batch wsid - if job["job_id"] != job_ids["batch_id"]: - assert job.get("job_input", {}).get("wsid") == self.ws_id - - assert "batch_id" in job_ids and isinstance(job_ids["batch_id"], str) - assert "child_job_ids" in job_ids and isinstance(job_ids["child_job_ids"], list) - assert len(job_ids["child_job_ids"]) == len(jobs) - - with self.assertRaises(InvalidParameterForBatch): - job_good = get_example_job_as_dict( - user=self.user_id, wsid=None, source_ws_objects=[] - ) - job_bad = ( - get_example_job(user=self.user_id, wsid=self.ws_id).to_mongo().to_dict() - ) - jobs = [job_good, job_bad] - runner.run_job_batch(params=jobs, batch_params={"wsid": self.ws_id}) - - # Test that you can't run a job in someone elses workspace - no_perms_ws = 111970 - with self.assertRaises(PermissionError): - job_good = get_example_job_as_dict( - user=self.user_id, wsid=None, source_ws_objects=[] - ) - job_bad = get_example_job(user=self.user_id, wsid=None).to_mongo().to_dict() - jobs = [job_good, job_bad] - runner.run_job_batch(params=jobs, batch_params={"wsid": no_perms_ws}) - - # Check wsids - batch_id = job_ids["batch_id"] - child_job_id = job_ids["child_job_ids"][0] - - # Squeeze in a retry test here - runner.update_job_status(job_id=child_job_id, status=Status.terminated.value) - batch_job = runner.check_job(job_id=batch_id) - assert len(batch_job["child_jobs"]) == 3 - - retry_result = runner.retry(job_id=child_job_id) - retry_id = retry_result["retry_id"] - self.check_retry_job_state(child_job_id, retry_id) - batch_job = runner.check_job(job_id=batch_id) - assert len(batch_job["child_jobs"]) == 4 - assert batch_job["child_jobs"][-1] == retry_id - - job = runner.check_job(job_id=child_job_id) - retry_count = job["retry_count"] - - # Test to see if one input fails, so keep going - results = runner.retry_multiple(job_ids=[child_job_id, "grail", "fail"]) - assert results[0]["job_id"] == child_job_id - assert "error" in results[1] - assert "error" in results[2] - - # Check to see child_job_id was retried - assert retry_count + 1 == runner.check_job(job_id=child_job_id)["retry_count"] - - # Test for duplicates - with self.assertRaises(expected_exception=ValueError) as e: - runner.retry_multiple(job_ids=[1, 2, 2]) - assert ( - e.exception.args[0] - == "Retry of the same id in the same request is not supported. Offending ids: [2] " - ) + return jobs @requests_mock.Mocker() @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) - def test_run_job_fail(self, rq_mock, condor_mock): + def test_retry_batch(self, rq_mock, condor_mock): rq_mock.add_matcher( run_job_adapter( ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} ) ) runner = self.getRunner() + runner.get_condor = MagicMock(return_value=condor_mock) + jobs = self.get_batch_jobs() + condor_mock.run_job = MagicMock( + return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None) + ) + job_ids = runner.run_job_batch( + params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id} + ) + batch_job_id = job_ids["batch_id"] + child_jobs = job_ids["child_job_ids"] + + # TEST OUT ERROR CONDITIONS + for status_list in [ + None, + [], + ]: + expected_exception = InvalidStatusListException + with self.assertRaises(expected_exception) as e: + runner.retry_batch(job_id=batch_job_id, status_list=status_list) + assert ( + e.exception.args[0] + == "Provide a list of status codes from ['terminated', 'error']." + ) - job = get_example_job_as_dict(user=self.user_id, wsid=self.ws_id) + for status_list in [["running"], ["apple"], [None]]: + status = status_list[0] + expected_exception = InvalidStatusListException + with self.assertRaises(expected_exception) as e: + runner.retry_batch(job_id=batch_job_id, status_list=status_list) + assert ( + e.exception.args[0] + == f"Provided status {status} not retryable . Status not in ['terminated', 'error']" + ) - si = SubmissionInfo(clusterid="test", submit=job, error=None) - condor_mock.run_job = MagicMock(return_value=si) + with self.assertRaises(NotBatchJobException) as e: + runner.retry_batch(job_id=child_jobs[0], status_list=["error"]) + assert e.exception.args[0] == f"{child_jobs[0]} is not a batch job" - with self.assertRaises(expected_exception=RuntimeError): - runner.run_job(params=job) + for status_list in [["terminated"], ["terminated", "error"], ["error"]]: + with self.assertRaises(RetryFailureException) as e: + runner.retry_batch(job_id=batch_job_id, status_list=status_list) + assert ( + e.exception.args[0] + == f"No retryable jobs found with a state of {status_list}" + ) + + for job_id in child_jobs: + runner.update_job_status(job_id=job_id, status="error") + + batch_retry_results = runner.retry_batch( + job_id=batch_job_id, status_list=["error"] + ) + assert len(job_ids["child_job_ids"]) == len(batch_retry_results) + + for brr in batch_retry_results: + parent_id = brr["job_id"] + child_job = brr["retry_id"] + batch_check = runner.check_job(parent_id) + cj_check = runner.check_job(child_job) + assert batch_check["retry_ids"] == [child_job] + assert cj_check["retry_parent"] == parent_id + + +@requests_mock.Mocker() +@patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) +def test_run_job_batch(self, rq_mock, condor_mock): + """ + Test running batch jobs + """ + rq_mock.add_matcher( + run_job_adapter( + ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + ) + ) + runner = self.getRunner() + runner.get_condor = MagicMock(return_value=condor_mock) + jobs = self.get_batch_jobs() + condor_mock.run_job = MagicMock( + return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None) + ) + + job_ids = runner.run_job_batch( + params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id} + ) + + for job in runner.check_jobs( + job_ids=job_ids["child_job_ids"] + [job_ids["batch_id"]] + )["job_states"]: + assert job.get("wsid") == self.ws_id + # Job input is forced to assume the batch wsid + if job["job_id"] != job_ids["batch_id"]: + assert job.get("job_input", {}).get("wsid") == self.ws_id + + assert "batch_id" in job_ids and isinstance(job_ids["batch_id"], str) + assert "child_job_ids" in job_ids and isinstance(job_ids["child_job_ids"], list) + assert len(job_ids["child_job_ids"]) == len(jobs) + + with self.assertRaises(InvalidParameterForBatch): + job_good = get_example_job_as_dict( + user=self.user_id, wsid=None, source_ws_objects=[] + ) + job_bad = ( + get_example_job(user=self.user_id, wsid=self.ws_id).to_mongo().to_dict() + ) + jobs = [job_good, job_bad] + runner.run_job_batch(params=jobs, batch_params={"wsid": self.ws_id}) + + # Test that you can't run a job in someone elses workspace + no_perms_ws = 111970 + with self.assertRaises(PermissionError): + job_good = get_example_job_as_dict( + user=self.user_id, wsid=None, source_ws_objects=[] + ) + job_bad = get_example_job(user=self.user_id, wsid=None).to_mongo().to_dict() + jobs = [job_good, job_bad] + runner.run_job_batch(params=jobs, batch_params={"wsid": no_perms_ws}) + + # Check wsids + batch_id = job_ids["batch_id"] + child_job_id = job_ids["child_job_ids"][0] + + # Squeeze in a retry test here + runner.update_job_status(job_id=child_job_id, status=Status.terminated.value) + batch_job = runner.check_job(job_id=batch_id) + assert len(batch_job["child_jobs"]) == 3 + + retry_result = runner.retry(job_id=child_job_id) + retry_id = retry_result["retry_id"] + self.check_retry_job_state(child_job_id, retry_id) + batch_job = runner.check_job(job_id=batch_id) + assert len(batch_job["child_jobs"]) == 4 + assert batch_job["child_jobs"][-1] == retry_id + + job = runner.check_job(job_id=child_job_id) + retry_count = job["retry_count"] + + # Test to see if one input fails, so keep going + results = runner.retry_multiple(job_ids=[child_job_id, "grail", "fail"]) + assert results[0]["job_id"] == child_job_id + assert "error" in results[1] + assert "error" in results[2] + + # Check to see child_job_id was retried + assert retry_count + 1 == runner.check_job(job_id=child_job_id)["retry_count"] + + # Test for duplicates + with self.assertRaises(expected_exception=ValueError) as e: + runner.retry_multiple(job_ids=[1, 2, 2]) + assert ( + e.exception.args[0] + == "Retry of the same id in the same request is not supported. Offending ids: [2] " + ) + + +@requests_mock.Mocker() +@patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) +def test_run_job_fail(self, rq_mock, condor_mock): + rq_mock.add_matcher( + run_job_adapter( + ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + ) + ) + runner = self.getRunner() + + job = get_example_job_as_dict(user=self.user_id, wsid=self.ws_id) + + si = SubmissionInfo(clusterid="test", submit=job, error=None) + condor_mock.run_job = MagicMock(return_value=si) + + with self.assertRaises(expected_exception=RuntimeError): + runner.run_job(params=job) From 586f6a43fe12064ccac057220a69b64d16890c5d Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 21 Oct 2021 18:49:39 -0500 Subject: [PATCH 08/12] Fix case for retries with retries --- lib/execution_engine2/db/MongoUtil.py | 34 ++++++- lib/execution_engine2/db/models/models.py | 4 + lib/execution_engine2/exceptions.py | 4 + lib/execution_engine2/sdk/EE2Runjob.py | 15 +++- lib/execution_engine2/sdk/EE2Status.py | 6 +- ...ee2_SDKMethodRunner_test_EE2Runjob_test.py | 90 +++++++++++++++++-- 6 files changed, 137 insertions(+), 16 deletions(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 90c0abdcd..afe54baad 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -5,6 +5,7 @@ from contextlib import contextmanager from datetime import datetime from typing import Dict, List, Union + from bson.objectid import ObjectId from mongoengine import connect, connection from pymongo import MongoClient, UpdateOne @@ -15,9 +16,8 @@ RecordNotFoundException, InvalidStatusTransitionException, ) - -from lib.execution_engine2.utils.arg_processing import parse_bool from execution_engine2.sdk.EE2Runjob import JobIdPair +from lib.execution_engine2.utils.arg_processing import parse_bool class MongoUtil: @@ -224,7 +224,11 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job: return job def get_jobs_with_status( - self, job_ids: List[str], status_list: List[str], only_job_ids: bool = False + self, + job_ids: List[str], + status_list: List[str], + only_job_ids: bool = False, + retried_jobs_allowed=True, ) -> Union[List[Job], List[str]]: if not (job_ids and isinstance(job_ids, list)): raise ValueError("Please provide a non empty list of job ids") @@ -234,11 +238,33 @@ def get_jobs_with_status( with self.mongo_engine_connection(): # TODO: Only seems to be returning other fields as well. Is .only() broken? - jobs = Job.objects(id__in=job_ids, status__in=status_list) + if retried_jobs_allowed: + jobs = Job.objects(id__in=job_ids, status__in=status_list) + else: + jobs = Job.objects( + id__in=job_ids, status__in=status_list, retry_parent__exists=False + ) + if only_job_ids: return [str(job.id) for job in jobs] return jobs + def eligible_for_retry(self, job: Job): + """ + Checks the job record to see if it has any retry_ids, + and if those retry_ids do not contain an ineligble job state + :param job: + :return: + """ + if job.retry_ids == []: + return True + eligble_states = [Status.completed.value, Status.error.value] + jobs = self.get_jobs(job_ids=job.retry_ids) + for job in jobs: + if job.status not in eligble_states: + return False + return True + def get_jobs( self, job_ids=None, exclude_fields=None, sort_id_ascending=None ) -> List[Job]: diff --git a/lib/execution_engine2/db/models/models.py b/lib/execution_engine2/db/models/models.py index 99e115412..a14d25460 100644 --- a/lib/execution_engine2/db/models/models.py +++ b/lib/execution_engine2/db/models/models.py @@ -227,6 +227,10 @@ class Status(Enum): A job begins at created, then can either be estimating """ + @classmethod + def get_status_names(cls): + return list(map(lambda x: x.value, cls._member_map_.values())) + created = "created" estimating = "estimating" queued = "queued" diff --git a/lib/execution_engine2/exceptions.py b/lib/execution_engine2/exceptions.py index b2b6c00d9..acab3e094 100644 --- a/lib/execution_engine2/exceptions.py +++ b/lib/execution_engine2/exceptions.py @@ -22,6 +22,10 @@ class InvalidStatusListException(ExecutionEngineValueError): """Invalid job state status provided""" +class BatchTerminationException(ExecutionEngineException): + """No jobs to terminate""" + + class IncorrectParamsException(ExecutionEngineValueError): """Wrong parameters were provided""" diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index e548ad034..90f73d08b 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -786,9 +786,20 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): if not batch_job.batch_job: raise NotBatchJobException(f"{job_id} is not a batch job") # Retry and Report - retryable_child_job_ids = self.sdkmr.get_mongo_util().get_jobs_with_status( - job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=True + # Get jobs that do NOT have a retry_parent, i.e. only jobs that haven't been retried and retry_parents only + potentially_retryable_child_jobs = ( + self.sdkmr.get_mongo_util().get_jobs_with_status( + job_ids=batch_job.child_jobs, + status_list=status_list, + retried_jobs_allowed=False, + ) ) + # So we don't want to retry jobs that have retry jobs in progress, + # or a retry job that has already been successful + retryable_child_job_ids = [] + for job in potentially_retryable_child_jobs: + if self.sdkmr.get_mongo_util().eligible_for_retry(job=job): + retryable_child_job_ids.append(str(job.id)) if len(retryable_child_job_ids) == 0: raise RetryFailureException( diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 31563385a..76435a998 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -10,6 +10,8 @@ ChildrenNotFoundError, InvalidStatusListException, NotBatchJobException, + CannotRetryJob, + BatchTerminationException, ) from execution_engine2.sdk.EE2Constants import JobError from lib.execution_engine2.authorization.authstrategy import can_read_jobs @@ -138,7 +140,9 @@ def cancel_batch_job( # Report if len(terminated_ids) == 0: - raise Exception(f"{job_id} didn't have any valid child jobs to terminate") + raise BatchTerminationException( + f"{job_id} didn't have any valid child jobs to terminate" + ) return terminated_ids def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False): diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index 680e1eb1f..d310272ec 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import copy +import itertools import logging import os import unittest @@ -12,11 +13,10 @@ from execution_engine2.exceptions import ( CannotRetryJob, InvalidParameterForBatch, - ExecutionEngineException, - InvalidStatusTransitionException, InvalidStatusListException, NotBatchJobException, RetryFailureException, + BatchTerminationException, ) from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.clients import ( @@ -560,7 +560,7 @@ def get_batch_jobs(self): @requests_mock.Mocker() @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) - def test_retry_batch(self, rq_mock, condor_mock): + def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): rq_mock.add_matcher( run_job_adapter( ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} @@ -622,12 +622,84 @@ def test_retry_batch(self, rq_mock, condor_mock): assert len(job_ids["child_job_ids"]) == len(batch_retry_results) for brr in batch_retry_results: - parent_id = brr["job_id"] - child_job = brr["retry_id"] - batch_check = runner.check_job(parent_id) - cj_check = runner.check_job(child_job) - assert batch_check["retry_ids"] == [child_job] - assert cj_check["retry_parent"] == parent_id + job_id = brr["job_id"] + retry_id = brr["retry_id"] + batch_check = runner.check_job(job_id) + cj_check = runner.check_job(retry_id) + assert batch_check["retry_ids"] == [retry_id] + assert cj_check["retry_parent"] == job_id + # For the next test + runner.update_job_status(retry_id, "error") + runner.update_job_status(job_id, "error") + + # Now let's generate more errors + batch_retry_results = runner.retry_batch( + job_id=batch_job_id, status_list=["error"] + ) + assert len(job_ids["child_job_ids"]) == len(batch_retry_results) + + # Now setup tests based on a failed retry and successful retry combo + runner.update_job_status(batch_retry_results[0]["job_id"], "error") + runner.update_job_status(batch_retry_results[0]["retry_id"], "error") + + runner.update_job_status(batch_retry_results[1]["job_id"], "error") + runner.update_job_status(batch_retry_results[1]["retry_id"], "created") + + runner.update_job_status(batch_retry_results[2]["job_id"], "error") + runner.update_job_status(batch_retry_results[2]["retry_id"], "running") + + batch_retry_results = runner.retry_batch( + job_id=batch_job_id, status_list=["error"] + ) + assert 1 == len(batch_retry_results) + + # Cancel Batch job tests + + features = Status.get_status_names() + combos_list = [] + for i in range(1, len(features)): + oc = itertools.combinations(features, i + 1) + for c in oc: + combos_list.append(list(c)) + + possible_errors = [] + for item in [ + Status.error.value, + Status.terminated.value, + Status.completed.value, + ]: + possible_errors.append( + f"Provided status {item} not in ['created', 'queued', 'estimating', 'running']" + ) + + for status_list in combos_list: + failable = False + for item in status_list: + if item in [ + Status.error.value, + Status.terminated.value, + Status.completed.value, + ]: + failable = True + + if status_list and failable: + with self.assertRaises(InvalidStatusListException) as e: + runner.cancel_batch_job( + job_id=batch_job_id, status_list=status_list, terminated_code=0 + ) + assert e.exception.args[0] in [possible_errors] + else: + with self.assertRaises(BatchTerminationException) as e: + runner.cancel_batch_job( + job_id=batch_job_id, status_list=status_list, terminated_code=0 + ) + assert ( + e.exception.args[0] + == f"{batch_job_id} didn't have any valid child jobs to terminate" + ) + + job_status = runner.check_job_batch(batch_id=batch_job_id) + print(job_status) @requests_mock.Mocker() From b4bab186311778ac30ba6484562b4008fa9d55a8 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 21 Oct 2021 18:56:48 -0500 Subject: [PATCH 09/12] Fix tests indent --- ...ee2_SDKMethodRunner_test_EE2Runjob_test.py | 198 +++++++++--------- 1 file changed, 98 insertions(+), 100 deletions(-) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index d310272ec..5c975d6c7 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -701,112 +701,110 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): job_status = runner.check_job_batch(batch_id=batch_job_id) print(job_status) - -@requests_mock.Mocker() -@patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) -def test_run_job_batch(self, rq_mock, condor_mock): - """ - Test running batch jobs - """ - rq_mock.add_matcher( - run_job_adapter( - ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + @requests_mock.Mocker() + @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) + def test_run_job_batch(self, rq_mock, condor_mock): + """ + Test running batch jobs + """ + rq_mock.add_matcher( + run_job_adapter( + ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + ) ) - ) - runner = self.getRunner() - runner.get_condor = MagicMock(return_value=condor_mock) - jobs = self.get_batch_jobs() - condor_mock.run_job = MagicMock( - return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None) - ) - - job_ids = runner.run_job_batch( - params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id} - ) - - for job in runner.check_jobs( - job_ids=job_ids["child_job_ids"] + [job_ids["batch_id"]] - )["job_states"]: - assert job.get("wsid") == self.ws_id - # Job input is forced to assume the batch wsid - if job["job_id"] != job_ids["batch_id"]: - assert job.get("job_input", {}).get("wsid") == self.ws_id - - assert "batch_id" in job_ids and isinstance(job_ids["batch_id"], str) - assert "child_job_ids" in job_ids and isinstance(job_ids["child_job_ids"], list) - assert len(job_ids["child_job_ids"]) == len(jobs) - - with self.assertRaises(InvalidParameterForBatch): - job_good = get_example_job_as_dict( - user=self.user_id, wsid=None, source_ws_objects=[] + runner = self.getRunner() + runner.get_condor = MagicMock(return_value=condor_mock) + jobs = self.get_batch_jobs() + condor_mock.run_job = MagicMock( + return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None) ) - job_bad = ( - get_example_job(user=self.user_id, wsid=self.ws_id).to_mongo().to_dict() + + job_ids = runner.run_job_batch( + params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id} ) - jobs = [job_good, job_bad] - runner.run_job_batch(params=jobs, batch_params={"wsid": self.ws_id}) - # Test that you can't run a job in someone elses workspace - no_perms_ws = 111970 - with self.assertRaises(PermissionError): - job_good = get_example_job_as_dict( - user=self.user_id, wsid=None, source_ws_objects=[] + for job in runner.check_jobs( + job_ids=job_ids["child_job_ids"] + [job_ids["batch_id"]] + )["job_states"]: + assert job.get("wsid") == self.ws_id + # Job input is forced to assume the batch wsid + if job["job_id"] != job_ids["batch_id"]: + assert job.get("job_input", {}).get("wsid") == self.ws_id + + assert "batch_id" in job_ids and isinstance(job_ids["batch_id"], str) + assert "child_job_ids" in job_ids and isinstance(job_ids["child_job_ids"], list) + assert len(job_ids["child_job_ids"]) == len(jobs) + + with self.assertRaises(InvalidParameterForBatch): + job_good = get_example_job_as_dict( + user=self.user_id, wsid=None, source_ws_objects=[] + ) + job_bad = ( + get_example_job(user=self.user_id, wsid=self.ws_id).to_mongo().to_dict() + ) + jobs = [job_good, job_bad] + runner.run_job_batch(params=jobs, batch_params={"wsid": self.ws_id}) + + # Test that you can't run a job in someone elses workspace + no_perms_ws = 111970 + with self.assertRaises(PermissionError): + job_good = get_example_job_as_dict( + user=self.user_id, wsid=None, source_ws_objects=[] + ) + job_bad = get_example_job(user=self.user_id, wsid=None).to_mongo().to_dict() + jobs = [job_good, job_bad] + runner.run_job_batch(params=jobs, batch_params={"wsid": no_perms_ws}) + + # Check wsids + batch_id = job_ids["batch_id"] + child_job_id = job_ids["child_job_ids"][0] + + # Squeeze in a retry test here + runner.update_job_status(job_id=child_job_id, status=Status.terminated.value) + batch_job = runner.check_job(job_id=batch_id) + assert len(batch_job["child_jobs"]) == 3 + + retry_result = runner.retry(job_id=child_job_id) + retry_id = retry_result["retry_id"] + self.check_retry_job_state(child_job_id, retry_id) + batch_job = runner.check_job(job_id=batch_id) + assert len(batch_job["child_jobs"]) == 4 + assert batch_job["child_jobs"][-1] == retry_id + + job = runner.check_job(job_id=child_job_id) + retry_count = job["retry_count"] + + # Test to see if one input fails, so keep going + results = runner.retry_multiple(job_ids=[child_job_id, "grail", "fail"]) + assert results[0]["job_id"] == child_job_id + assert "error" in results[1] + assert "error" in results[2] + + # Check to see child_job_id was retried + assert retry_count + 1 == runner.check_job(job_id=child_job_id)["retry_count"] + + # Test for duplicates + with self.assertRaises(expected_exception=ValueError) as e: + runner.retry_multiple(job_ids=[1, 2, 2]) + assert ( + e.exception.args[0] + == "Retry of the same id in the same request is not supported. Offending ids: [2] " ) - job_bad = get_example_job(user=self.user_id, wsid=None).to_mongo().to_dict() - jobs = [job_good, job_bad] - runner.run_job_batch(params=jobs, batch_params={"wsid": no_perms_ws}) - - # Check wsids - batch_id = job_ids["batch_id"] - child_job_id = job_ids["child_job_ids"][0] - - # Squeeze in a retry test here - runner.update_job_status(job_id=child_job_id, status=Status.terminated.value) - batch_job = runner.check_job(job_id=batch_id) - assert len(batch_job["child_jobs"]) == 3 - - retry_result = runner.retry(job_id=child_job_id) - retry_id = retry_result["retry_id"] - self.check_retry_job_state(child_job_id, retry_id) - batch_job = runner.check_job(job_id=batch_id) - assert len(batch_job["child_jobs"]) == 4 - assert batch_job["child_jobs"][-1] == retry_id - - job = runner.check_job(job_id=child_job_id) - retry_count = job["retry_count"] - - # Test to see if one input fails, so keep going - results = runner.retry_multiple(job_ids=[child_job_id, "grail", "fail"]) - assert results[0]["job_id"] == child_job_id - assert "error" in results[1] - assert "error" in results[2] - - # Check to see child_job_id was retried - assert retry_count + 1 == runner.check_job(job_id=child_job_id)["retry_count"] - - # Test for duplicates - with self.assertRaises(expected_exception=ValueError) as e: - runner.retry_multiple(job_ids=[1, 2, 2]) - assert ( - e.exception.args[0] - == "Retry of the same id in the same request is not supported. Offending ids: [2] " - ) - - -@requests_mock.Mocker() -@patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) -def test_run_job_fail(self, rq_mock, condor_mock): - rq_mock.add_matcher( - run_job_adapter( - ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + + @requests_mock.Mocker() + @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) + def test_run_job_fail(self, rq_mock, condor_mock): + rq_mock.add_matcher( + run_job_adapter( + ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}} + ) ) - ) - runner = self.getRunner() + runner = self.getRunner() - job = get_example_job_as_dict(user=self.user_id, wsid=self.ws_id) + job = get_example_job_as_dict(user=self.user_id, wsid=self.ws_id) - si = SubmissionInfo(clusterid="test", submit=job, error=None) - condor_mock.run_job = MagicMock(return_value=si) + si = SubmissionInfo(clusterid="test", submit=job, error=None) + condor_mock.run_job = MagicMock(return_value=si) - with self.assertRaises(expected_exception=RuntimeError): - runner.run_job(params=job) + with self.assertRaises(expected_exception=RuntimeError): + runner.run_job(params=job) From 6f9e6f9593f74a64cd8453c342546e97b269ba14 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 4 Nov 2021 12:55:05 -0500 Subject: [PATCH 10/12] Fix tests --- execution_engine2.spec | 6 +-- lib/execution_engine2/db/MongoUtil.py | 10 ++-- lib/execution_engine2/exceptions.py | 2 +- lib/execution_engine2/sdk/EE2Runjob.py | 30 +++++++----- lib/execution_engine2/sdk/EE2Status.py | 48 +++++++------------ lib/execution_engine2/sdk/SDKMethodRunner.py | 4 +- ...ee2_SDKMethodRunner_test_EE2Runjob_test.py | 43 ++++++++--------- 7 files changed, 67 insertions(+), 76 deletions(-) diff --git a/execution_engine2.spec b/execution_engine2.spec index 38ed89fb9..6b886b9ff 100644 --- a/execution_engine2.spec +++ b/execution_engine2.spec @@ -268,7 +268,7 @@ Requires the user to keep track of the job states of the Status enum in the ee2 models file If no status_list is provided, an exception is thrown. */ - funcdef retry_batch_jobs(BatchRetryParams params) returns (list retry_result) authentication required; + funcdef retry_batch_jobs_by_status(BatchRetryParams params) returns (list retry_result) authentication required; funcdef abandon_children(AbandonChildren params) @@ -608,7 +608,7 @@ /* batch_job_id: BATCH_ID to cancel - status_list: optional filter of either 'terminated' or 'error'. Not setting this results in cancel of both + status_list: required filter of one or more of [created, estimating, queued, or running] terminated_code: optional terminated code, default to terminated by user as_admin: retry someone else's job in your namespace @optional terminated_code @@ -626,7 +626,7 @@ (Requires the user to keep track of the job states of the Status enum in the ee2 models file) If no status_list is provided, an exception is thrown. */ - funcdef cancel_batch_job(BatchCancelParams params) returns (list job_ids) authentication required; + funcdef cancel_batch_jobs_by_status(BatchCancelParams params) returns (list job_ids) authentication required; /* job_id - id of job running method diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index afe54baad..e7f431c42 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -253,15 +253,15 @@ def eligible_for_retry(self, job: Job): """ Checks the job record to see if it has any retry_ids, and if those retry_ids do not contain an ineligble job state - :param job: - :return: + :param job: Should be a child job of a BATCH job """ - if job.retry_ids == []: + + if not job.retry_ids: return True - eligble_states = [Status.completed.value, Status.error.value] + valid_statuses = [Status.terminated.value, Status.error.value] jobs = self.get_jobs(job_ids=job.retry_ids) for job in jobs: - if job.status not in eligble_states: + if job.status not in valid_statuses: return False return True diff --git a/lib/execution_engine2/exceptions.py b/lib/execution_engine2/exceptions.py index acab3e094..7034455ec 100644 --- a/lib/execution_engine2/exceptions.py +++ b/lib/execution_engine2/exceptions.py @@ -19,7 +19,7 @@ def __init__(self, msg=None, *args, **kwargs): class InvalidStatusListException(ExecutionEngineValueError): - """Invalid job state status provided""" + """Invalid job status provided""" class BatchTerminationException(ExecutionEngineException): diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index 90f73d08b..e4ab0eee7 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -760,7 +760,9 @@ def retry(self, job_id: str, as_admin: bool = False) -> Dict[str, Optional[str]] job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin ) - def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): + def retry_jobs_in_batch_by_status( + self, job_id: str, status_list: list, as_admin: bool = False + ): """ Retry jobs by status given a BATCH_ID :param job_id: The batch job id to retry jobs for @@ -774,11 +776,15 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): raise InvalidStatusListException( f"Provide a list of status codes from {valid_statuses}." ) - for status in status_list: - if not self._retryable(status): - raise InvalidStatusListException( - f"Provided status {status} not retryable . Status not in {valid_statuses}" - ) + + invalid_statuses = [ + status for status in status_list if not self._retryable(status) + ] + if len(invalid_statuses): + raise InvalidStatusListException( + f"Provided status list contains {invalid_statuses}, which are not retryable. " + + f"Status not in {valid_statuses}" + ) batch_job = self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin @@ -786,7 +792,9 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): if not batch_job.batch_job: raise NotBatchJobException(f"{job_id} is not a batch job") # Retry and Report - # Get jobs that do NOT have a retry_parent, i.e. only jobs that haven't been retried and retry_parents only + # Get jobs that + # do NOT have a retry_parent, i.e. only jobs that haven't been retried + # and jobs that have not retried retry_parents only potentially_retryable_child_jobs = ( self.sdkmr.get_mongo_util().get_jobs_with_status( job_ids=batch_job.child_jobs, @@ -797,13 +805,13 @@ def retry_batch(self, job_id: str, status_list: list, as_admin: bool = False): # So we don't want to retry jobs that have retry jobs in progress, # or a retry job that has already been successful retryable_child_job_ids = [] - for job in potentially_retryable_child_jobs: - if self.sdkmr.get_mongo_util().eligible_for_retry(job=job): - retryable_child_job_ids.append(str(job.id)) + for child_job in potentially_retryable_child_jobs: + if self.sdkmr.get_mongo_util().eligible_for_retry(job=child_job): + retryable_child_job_ids.append(str(child_job.id)) if len(retryable_child_job_ids) == 0: raise RetryFailureException( - f"No retryable jobs found with a state of {status_list}" + f"No retryable jobs found with a status in {status_list}" ) return self.retry_multiple(job_ids=retryable_child_job_ids) diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 76435a998..475d25103 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -89,7 +89,7 @@ def handle_held_job(self, cluster_id, as_admin): # There's probably a better way and a return type, but not really sure what I need yet return json.loads(json.dumps(j.to_mongo().to_dict(), default=str)) - def cancel_batch_job( + def cancel_jobs_in_batch_by_status( self, job_id, status_list, terminated_code=None, as_admin=False ): """ @@ -111,11 +111,14 @@ def cancel_batch_job( raise InvalidStatusListException( f"Provide a list of valid job statuses from {valid_statuses}." ) - for status in status_list: - if status not in valid_statuses: - raise InvalidStatusListException( - f"Provided status {status} not in {valid_statuses}." - ) + + found_invalid_statuses = [ + status for status in status_list if status not in valid_statuses + ] + if len(found_invalid_statuses): + raise InvalidStatusListException( + f"Provided status list contains {found_invalid_statuses}, which are not cancelable. Status not in {valid_statuses}" + ) batch_job = self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin @@ -125,31 +128,20 @@ def cancel_batch_job( # Termination terminated_ids = [] - child_jobs = self.sdkmr.get_mongo_util().get_jobs_with_status( - job_ids=batch_job.child_jobs, status_list=status_list + child_job_ids = self.sdkmr.get_mongo_util().get_jobs_with_status( + job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=False ) - for job in child_jobs: - try: - self.cancel_job( - job=job, terminated_code=terminated_code, as_admin=as_admin - ) - terminated_ids.append(job.id) - except Exception: - # Nothing to report, a job might have finished by now - pass - - # Report - if len(terminated_ids) == 0: - raise BatchTerminationException( - f"{job_id} didn't have any valid child jobs to terminate" - ) + for job in child_job_ids: + self.cancel_job(job=job, terminated_code=terminated_code, as_admin=as_admin) + terminated_ids.append(str(job.id)) + return terminated_ids def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False): """ Authorization Required: Ability to Read and Write to the Workspace Terminates child jobs as well - + Need to provide exactly one job id or a Job :param job: Job Object to cancel :param job_id: Job ID To cancel :param terminated_code: Default Terminated By User @@ -170,7 +162,7 @@ def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False terminated_code = TerminatedCode.terminated_by_user.value self.sdkmr.get_mongo_util().cancel_job( - job_id=job_id, terminated_code=terminated_code + job_id=job.id, terminated_code=terminated_code ) for child_job_id in job.child_jobs: self.cancel_job( @@ -178,12 +170,6 @@ def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False terminated_code=TerminatedCode.terminated_by_batch_abort.value, ) - for child_job_id in job.child_jobs: - self.cancel_job( - job_id=child_job_id, - terminated_code=TerminatedCode.terminated_by_batch_abort.value, - ) - self.sdkmr.logger.debug( f"About to cancel job in CONDOR using jobid {job_id} {job.scheduler_id}" ) diff --git a/lib/execution_engine2/sdk/SDKMethodRunner.py b/lib/execution_engine2/sdk/SDKMethodRunner.py index 422694b00..5fbfc7e2e 100644 --- a/lib/execution_engine2/sdk/SDKMethodRunner.py +++ b/lib/execution_engine2/sdk/SDKMethodRunner.py @@ -337,7 +337,7 @@ def retry(self, job_id, as_admin=False): def retry_batch(self, job_id, status_list, as_admin=False): """Authorization Required Read/Write""" - return self.get_runjob().retry_batch( + return self.get_runjob().retry_jobs_in_batch_by_status( job_id=job_id, as_admin=as_admin, status_list=status_list ) @@ -402,7 +402,7 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False): def cancel_batch_job(self, job_id, status_list, terminated_code, as_admin=False): """Authorization Required Read/Write""" - return self.get_jobs_status().cancel_batch_job( + return self.get_jobs_status().cancel_jobs_in_batch_by_status( job_id=job_id, terminated_code=terminated_code, status_list=status_list, diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index 5c975d6c7..e6c99219c 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -591,14 +591,13 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): == "Provide a list of status codes from ['terminated', 'error']." ) - for status_list in [["running"], ["apple"], [None]]: - status = status_list[0] + for status_list in [["running"], ["apple", "kertuffl"], [None]]: expected_exception = InvalidStatusListException with self.assertRaises(expected_exception) as e: runner.retry_batch(job_id=batch_job_id, status_list=status_list) assert ( e.exception.args[0] - == f"Provided status {status} not retryable . Status not in ['terminated', 'error']" + == f"Provided status list contains {status_list}, which are not retryable. Status not in ['terminated', 'error']" ) with self.assertRaises(NotBatchJobException) as e: @@ -610,7 +609,7 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): runner.retry_batch(job_id=batch_job_id, status_list=status_list) assert ( e.exception.args[0] - == f"No retryable jobs found with a state of {status_list}" + == f"No retryable jobs found with a status in {status_list}" ) for job_id in child_jobs: @@ -662,15 +661,12 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): for c in oc: combos_list.append(list(c)) - possible_errors = [] - for item in [ - Status.error.value, - Status.terminated.value, - Status.completed.value, - ]: - possible_errors.append( - f"Provided status {item} not in ['created', 'queued', 'estimating', 'running']" - ) + valid_statuses = [ + Status.created.value, + Status.queued.value, + Status.estimating.value, + Status.running.value, + ] for status_list in combos_list: failable = False @@ -687,17 +683,18 @@ def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock): runner.cancel_batch_job( job_id=batch_job_id, status_list=status_list, terminated_code=0 ) - assert e.exception.args[0] in [possible_errors] - else: - with self.assertRaises(BatchTerminationException) as e: - runner.cancel_batch_job( - job_id=batch_job_id, status_list=status_list, terminated_code=0 - ) - assert ( - e.exception.args[0] - == f"{batch_job_id} didn't have any valid child jobs to terminate" - ) + for item in valid_statuses: + try: + del status_list[status_list.index(item)] + except Exception: + pass + + expected = f"Provided status list contains {status_list}, which are not cancelable. Status not in {valid_statuses}" + + assert e.exception.args[0] == expected + + # Assert cancelled jobs? job_status = runner.check_job_batch(batch_id=batch_job_id) print(job_status) From 79432c8b9649e29c2c8aa52e3c97c39a3f67c363 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 4 Nov 2021 13:34:16 -0500 Subject: [PATCH 11/12] Recompile fix --- lib/execution_engine2/execution_engine2Impl.py | 3 ++- lib/execution_engine2/execution_engine2Server.py | 12 ++++++------ lib/execution_engine2/sdk/EE2Status.py | 2 -- .../ee2_SDKMethodRunner_test_EE2Runjob_test.py | 1 - 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 76b659ec1..92f76df2e 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -1711,7 +1711,8 @@ def cancel_batch_job(self, ctx, params): ) job_ids = mr.cancel_batch_job(job_id=params.get('job_id'), status_list=params.get('status_list'), - as_admin=params.get('as_admin')) + as_admin=params.get('as_admin'), + terminated_code=params.get('terminated_code')) #END cancel_batch_job # At some point might do deeper type checking... diff --git a/lib/execution_engine2/execution_engine2Server.py b/lib/execution_engine2/execution_engine2Server.py index 2bdbb8564..3ca72d3cc 100644 --- a/lib/execution_engine2/execution_engine2Server.py +++ b/lib/execution_engine2/execution_engine2Server.py @@ -413,12 +413,12 @@ def __init__(self): ) self.method_authentication["execution_engine2.retry_jobs"] = "required" # noqa self.rpc_service.add( - impl_execution_engine2.retry_batch_jobs, - name="execution_engine2.retry_batch_jobs", + impl_execution_engine2.retry_batch_jobs_by_status, + name="execution_engine2.retry_batch_jobs_by_status", types=[dict], ) self.method_authentication[ - "execution_engine2.retry_batch_jobs" + "execution_engine2.retry_batch_jobs_by_status" ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.abandon_children, @@ -515,12 +515,12 @@ def __init__(self): ) self.method_authentication["execution_engine2.cancel_job"] = "required" # noqa self.rpc_service.add( - impl_execution_engine2.cancel_batch_job, - name="execution_engine2.cancel_batch_job", + impl_execution_engine2.cancel_batch_jobs_by_status, + name="execution_engine2.cancel_batch_jobs_by_status", types=[dict], ) self.method_authentication[ - "execution_engine2.cancel_batch_job" + "execution_engine2.cancel_batch_jobs_by_status" ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.check_job_canceled, diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 475d25103..9dcdf61a8 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -10,8 +10,6 @@ ChildrenNotFoundError, InvalidStatusListException, NotBatchJobException, - CannotRetryJob, - BatchTerminationException, ) from execution_engine2.sdk.EE2Constants import JobError from lib.execution_engine2.authorization.authstrategy import can_read_jobs diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index e6c99219c..fdfb65f61 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -16,7 +16,6 @@ InvalidStatusListException, NotBatchJobException, RetryFailureException, - BatchTerminationException, ) from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.clients import ( From 4f0c4ee1f4c11c43769763164e26b311ffa2dd68 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Thu, 4 Nov 2021 13:56:15 -0500 Subject: [PATCH 12/12] Fix impl --- .../execution_engine2Impl.py | 61 +++++++++---------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 92f76df2e..905dde121 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -30,7 +30,7 @@ class execution_engine2: ######################################### noqa VERSION = "0.0.5" GIT_URL = "git@github.com:kbase/execution_engine2.git" - GIT_COMMIT_HASH = "9adfa50d1ea628eb3907038c9993c6a52fe34fc7" + GIT_COMMIT_HASH = "6f9e6f9593f74a64cd8453c342546e97b269ba14" #BEGIN_CLASS_HEADER MONGO_COLLECTION = "jobs" @@ -450,17 +450,17 @@ def retry_jobs(self, ctx, params): # return the results return [retry_result] - def retry_batch_jobs(self, ctx, params): + def retry_batch_jobs_by_status(self, ctx, params): """ Retry a job based on a batch id with a job_state status list ['error', 'terminated'] Requires the user to keep track of the job states of the Status enum in the ee2 models file If no status_list is provided, an exception is thrown. :param params: instance of type "BatchRetryParams" (batch_job_id: - BATCH_ID to retry status_filter: job states in ['terminated', + BATCH_ID to retry status_list: job states in ['terminated', 'error'] (valid retry states) as_admin: retry someone else's job in your namespace #TODO: Possibly Add list job_requirements;) -> structure: parameter "batch_job_id" of type - "job_id" (A job id.), parameter "status_filter" of list of type + "job_id" (A job id.), parameter "status_list" of list of type "job_status" (A job state's job status.), parameter "as_admin" of type "boolean" (@range [0,1]) :returns: instance of list of type "RetryResult" (job_id of retried @@ -472,21 +472,18 @@ def retry_batch_jobs(self, ctx, params): """ # ctx is the context object # return variables are: retry_result - #BEGIN retry_batch_jobs - mr = SDKMethodRunner( - user_clients=self.gen_cfg.get_user_clients(ctx), - clients=self.clients, - job_permission_cache=self.job_permission_cache, - admin_permissions_cache=self.admin_permissions_cache - ) + #BEGIN retry_batch_jobs_by_status + mr = SDKMethodRunner(user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, + job_permission_cache=self.job_permission_cache, + admin_permissions_cache=self.admin_permissions_cache) retry_result = mr.retry_batch(job_id=params.get('job_id'), status_list=params.get('status_list'), as_admin=params.get('as_admin')) - #END retry_batch_jobs + #END retry_batch_jobs_by_status # At some point might do deeper type checking... if not isinstance(retry_result, list): - raise ValueError('Method retry_batch_jobs return value ' + + raise ValueError('Method retry_batch_jobs_by_status return value ' + 'retry_result is not type list as required.') # return the results return [retry_result] @@ -1682,42 +1679,40 @@ def cancel_job(self, ctx, params): #END cancel_job pass - def cancel_batch_job(self, ctx, params): + def cancel_batch_jobs_by_status(self, ctx, params): """ Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0. Valid statuses are ['created', 'estimating', 'queued', 'running'] (Requires the user to keep track of the job states of the Status enum in the ee2 models file) - If no status_filter is provided, an exception is thrown. + If no status_list is provided, an exception is thrown. :param params: instance of type "BatchCancelParams" (batch_job_id: - BATCH_ID to cancel status_filter: optional filter of either - 'terminated' or 'error'. Not setting this results in cancel of - both terminated_code: optional terminated code, default to - terminated by user as_admin: retry someone else's job in your - namespace @optional terminated_code) -> structure: parameter - "batch_job_id" of type "job_id" (A job id.), parameter - "status_filter" of list of type "job_status" (A job state's job - status.), parameter "terminated_code" of Long, parameter - "as_admin" of type "boolean" (@range [0,1]) + BATCH_ID to cancel status_list: required filter of one or more of + [created, estimating, queued, or running] terminated_code: + optional terminated code, default to terminated by user as_admin: + retry someone else's job in your namespace @optional + terminated_code) -> structure: parameter "batch_job_id" of type + "job_id" (A job id.), parameter "status_list" of list of type + "job_status" (A job state's job status.), parameter + "terminated_code" of Long, parameter "as_admin" of type "boolean" + (@range [0,1]) :returns: instance of list of type "job_id" (A job id.) """ # ctx is the context object # return variables are: job_ids - #BEGIN cancel_batch_job - mr = SDKMethodRunner( - user_clients=self.gen_cfg.get_user_clients(ctx), - clients=self.clients, - job_permission_cache=self.job_permission_cache, - admin_permissions_cache=self.admin_permissions_cache - ) + #BEGIN cancel_batch_jobs_by_status + mr = SDKMethodRunner(user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, + job_permission_cache=self.job_permission_cache, + admin_permissions_cache=self.admin_permissions_cache) + job_ids = mr.cancel_batch_job(job_id=params.get('job_id'), status_list=params.get('status_list'), as_admin=params.get('as_admin'), terminated_code=params.get('terminated_code')) - #END cancel_batch_job + #END cancel_batch_jobs_by_status # At some point might do deeper type checking... if not isinstance(job_ids, list): - raise ValueError('Method cancel_batch_job return value ' + + raise ValueError('Method cancel_batch_jobs_by_status return value ' + 'job_ids is not type list as required.') # return the results return [job_ids]