diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py
index 1499cf267..0845cd84b 100644
--- a/lib/execution_engine2/db/MongoUtil.py
+++ b/lib/execution_engine2/db/MongoUtil.py
@@ -3,18 +3,20 @@
 import time
 import traceback
 from contextlib import contextmanager
-from typing import Dict
+from datetime import datetime
+from typing import Dict, List
 
 from bson.objectid import ObjectId
 from mongoengine import connect, connection
-from pymongo import MongoClient
+from pymongo import MongoClient, UpdateOne
 from pymongo.errors import ServerSelectionTimeoutError
 
-from lib.execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode
-from lib.execution_engine2.exceptions import (
+from execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode
+from execution_engine2.exceptions import (
     RecordNotFoundException,
     InvalidStatusTransitionException,
 )
+from execution_engine2.sdk.EE2Runjob import JobIdPair
 
 
 class MongoUtil:
@@ -216,7 +218,9 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job:
 
         return job
 
-    def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None):
+    def get_jobs(
+        self, job_ids=None, exclude_fields=None, sort_id_ascending=None
+    ) -> List[Job]:
         if not (job_ids and isinstance(job_ids, list)):
             raise ValueError("Please provide a non empty list of job ids")
 
@@ -263,6 +267,68 @@ def check_if_already_finished(job_status):
             return True
         return False
 
+    def update_jobs_to_queued(
+        self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor"
+    ) -> None:
+        f"""
+        * Adds the scheduler id to each job in the list
+        * Updates a list of {Status.created.value} jobs to {Status.queued.value}. Jobs that have already gone through any
+        other status transition are left alone: their scheduler id is still recorded, but their status is not changed
+        :param job_id_pairs: A list of pairs of Job Ids and Scheduler Ids
+        :param scheduler_type: The scheduler these jobs were queued in; defaults to condor
+        """
+
+        bulk_update_scheduler_jobs = []
+        bulk_update_created_to_queued = []
+        queue_time_now = datetime.utcnow().timestamp()
+        for job_id_pair in job_id_pairs:
+            if job_id_pair.job_id is None:
+                raise ValueError(
+                    f"Provided a bad job_id_pair, missing job_id for {job_id_pair.scheduler_id}"
+                )
+            elif job_id_pair.scheduler_id is None:
+                raise ValueError(
+                    f"Provided a bad job_id_pair, missing scheduler_id for {job_id_pair.job_id}"
+                )
+
+            bulk_update_scheduler_jobs.append(
+                UpdateOne(
+                    {
+                        "_id": ObjectId(job_id_pair.job_id),
+                    },
+                    {
+                        "$set": {
+                            "scheduler_id": job_id_pair.scheduler_id,
+                            "scheduler_type": scheduler_type,
+                        }
+                    },
+                )
+            )
+            bulk_update_created_to_queued.append(
+                UpdateOne(
+                    {
+                        "_id": ObjectId(job_id_pair.job_id),
+                        "status": Status.created.value,
+                    },
+                    {
+                        "$set": {
+                            "status": Status.queued.value,
+                            "queued": queue_time_now,
+                        }
+                    },
+                )
+            )
+        # First record the scheduler id on every provided job, then move only the jobs still in the created state to queued.
+        mongo_collection = self.config["mongo-jobs-collection"]
+
+        if bulk_update_scheduler_jobs:
+            with self.pymongo_client(mongo_collection) as pymongo_client:
+                ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection]
+                # Bulk update to add scheduler ids
+                ee2_jobs_col.bulk_write(bulk_update_scheduler_jobs, ordered=False)
+                # Bulk update to move created jobs to the queued status
+                ee2_jobs_col.bulk_write(bulk_update_created_to_queued, ordered=False)
+
     def cancel_job(self, job_id=None, terminated_code=None):
         """
         #TODO Should we check for a valid state transition here also?
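# --- Illustrative sketch (not part of the patch) -----------------------------
# How the two bulk writes in MongoUtil.update_jobs_to_queued() compose: every
# job gets its scheduler id recorded, but only jobs whose status is still
# "created" are flipped to "queued".  "jobs_col", "pairs", and the literal
# status strings below are hypothetical stand-ins for the ee2 collection,
# JobIdPair list, and Status enum values.
from datetime import datetime
from typing import Iterable, Tuple

from bson.objectid import ObjectId
from pymongo import UpdateOne
from pymongo.collection import Collection


def queue_created_jobs(jobs_col: Collection, pairs: Iterable[Tuple[str, str]]) -> None:
    now = datetime.utcnow().timestamp()
    add_scheduler_ids = []
    created_to_queued = []
    for job_id, scheduler_id in pairs:
        # Unconditional: record which scheduler run this job maps to.
        add_scheduler_ids.append(
            UpdateOne(
                {"_id": ObjectId(job_id)},
                {"$set": {"scheduler_id": scheduler_id, "scheduler_type": "condor"}},
            )
        )
        # Conditional: only a job still in "created" moves to "queued", so a job
        # terminated during submission keeps its terminal status.
        created_to_queued.append(
            UpdateOne(
                {"_id": ObjectId(job_id), "status": "created"},
                {"$set": {"status": "queued", "queued": now}},
            )
        )
    if add_scheduler_ids:
        # ordered=False lets the remaining updates proceed even if one fails.
        jobs_col.bulk_write(add_scheduler_ids, ordered=False)
        jobs_col.bulk_write(created_to_queued, ordered=False)
# -----------------------------------------------------------------------------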
@@ -420,6 +486,18 @@ def update_job_status(self, job_id, status, msg=None, error_message=None): def mongo_engine_connection(self): yield self.me_connection + def insert_jobs(self, jobs_to_insert: List[Job]) -> List[ObjectId]: + """ + Insert multiple job records using MongoEngine + :param jobs_to_insert: Multiple jobs to insert at once + :return: List of job ids from the insertion + """ + # TODO Look at pymongo write_concerns that may be useful + # TODO see if pymongo is faster + # TODO: Think about error handling + inserted = Job.objects.insert(doc_or_docs=jobs_to_insert, load_bulk=False) + return inserted + def insert_one(self, doc): """ insert a doc into collection diff --git a/lib/execution_engine2/sdk/EE2Logs.py b/lib/execution_engine2/sdk/EE2Logs.py index d96acb0b2..be04acd78 100644 --- a/lib/execution_engine2/sdk/EE2Logs.py +++ b/lib/execution_engine2/sdk/EE2Logs.py @@ -1,8 +1,8 @@ from enum import Enum from typing import Dict, NamedTuple -from lib.execution_engine2.db.models.models import JobLog as JLModel, LogLines -from lib.execution_engine2.exceptions import RecordNotFoundException +from execution_engine2.db.models.models import JobLog as JLModel, LogLines +from execution_engine2.exceptions import RecordNotFoundException # if TYPE_CHECKING: @@ -104,7 +104,6 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin ) - self.sdkmr.logger.debug(f"About to add logs for {job_id}") try: try: job_log = self.sdkmr.mongo_util.get_job_log_pymongo(job_id) diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index 0e57e3995..caaa7cdbd 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -74,6 +74,11 @@ class PreparedJobParams(NamedTuple): job_id: str +class JobIdPair(NamedTuple): + job_id: str + scheduler_id: str + + from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -87,10 +92,8 @@ def __init__(self, sdkmr): self.logger = self.sdkmr.get_logger() def _init_job_rec( - self, - user_id: str, - params: Dict, - ) -> str: + self, user_id: str, params: Dict, save: bool = True + ) -> Union[str, Job]: f""" Save an initial job record to the db and send a message to kafka @@ -163,11 +166,13 @@ def _init_job_rec( job.retry_parent = str(parent_retry_job_id) job.batch_id = str(params.get(_BATCH_ID)) if params.get(_BATCH_ID) else None - job_id = self.sdkmr.save_job(job) - self.sdkmr.get_kafka_client().send_kafka_message( - message=KafkaCreateJob(job_id=job_id, user=user_id) - ) - return job_id + if save: + job_id = self.sdkmr.save_job(job) + self.sdkmr.get_kafka_client().send_kafka_message( + message=KafkaCreateJob(job_id=job_id, user=user_id) + ) + return job_id + return job def _check_ws_objects(self, source_objects) -> None: """ @@ -234,17 +239,7 @@ def _finish_created_job( error=f"{exception}", ) - def _prepare_to_run(self, params, concierge_params=None) -> JobSubmissionParameters: - """ - Creates a job record and creates the job submission params - """ - - job_id = self._init_job_rec(self.sdkmr.get_user_id(), params) - - self.logger.debug( - f"User {self.sdkmr.get_user_id()} attempting to run job {params[_METHOD]} {params}" - ) - + def _generate_job_submission_params(self, job_id, params): return JobSubmissionParameters( job_id, AppInfo(params[_METHOD], params.get(_APP_ID)), @@ -258,6 +253,130 @@ def _prepare_to_run(self, params, concierge_params=None) -> JobSubmissionParamet 
source_ws_objects=params.get(_SOURCE_WS_OBJECTS), ) + def _prepare_to_run(self, params, concierge_params=None) -> JobSubmissionParameters: + """ + Creates a job record and creates the job submission params + """ + + job_id = self._init_job_rec(self.sdkmr.get_user_id(), params) + self.logger.debug( + f"User {self.sdkmr.get_user_id()} attempting to run job {params[_METHOD]} {params}" + ) + return self._generate_job_submission_params(job_id, params) + + def _run_multiple(self, runjob_params): + """ + Get the job records, bulk save them, then submit to condor. + If any condor submission fails, abort all of the jobs + :return: + """ + # Save records to db + job_records = [] + for runjob_param in runjob_params: + job_records.append( + self._init_job_rec(self.sdkmr.get_user_id(), runjob_param, save=False) + ) + job_ids = self.sdkmr.save_jobs(job_records) + + # Generate job submission params + job_submission_params = [] + for i, job_id in enumerate(job_ids): + job_submission_params.append( + self._generate_job_submission_params(job_id, runjob_params[i]) + ) + assert job_id == job_submission_params[i].job_id + + # Takes 2.5200018882751465 for 100 records, can shave off 2.5 secs by making this async + for job_id in job_ids: + self.sdkmr.get_kafka_client().send_kafka_message( + message=KafkaCreateJob( + job_id=str(job_id), user=self.sdkmr.get_user_id() + ) + ) + # Submit to Condor + try: + submission_ids = self._submit_multiple(job_submission_params) + return submission_ids + except Exception as e: + self._abort_multiple_jobs(job_ids) + raise e + + def _update_to_queued_multiple(self, job_ids, scheduler_ids): + """ + This is called during job submission. If a job is terminated during job submission, + we have the chance to re-issue a termination and remove the job from the Job Queue + """ + if len(job_ids) != len(scheduler_ids): + raise Exception( + "Need to provide the same amount of job ids and scheduler_ids" + ) + jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids)) + self.sdkmr.get_mongo_util().update_jobs_to_queued(jobs_to_update) + jobs = self.sdkmr.get_mongo_util().get_jobs(job_ids) + + for job in jobs: + job_id = str(job.id) + if job.status == Status.queued.value: + self.sdkmr.get_kafka_client().send_kafka_message( + message=KafkaQueueChange( + job_id=job_id, + new_status=Status.queued.value, + previous_status=Status.created.value, # TODO maybe change this to allow for estimating jobs + scheduler_id=job.scheduler_id, + ) + ) + elif job.status == Status.terminated.value: + # Remove from the queue, now that the scheduler_id is available + # The job record doesn't actually get updated in the db a 2nd time, and this TerminatedCode is only + # used by the initial transition to Terminated + self._safe_cancel(job_id, TerminatedCode.terminated_by_user) + + def _submit_multiple(self, job_submission_params): + """ + Submit multiple jobs. 
If any submission fails, raise an exception in order
+        to fail all of the submitted jobs, rather than allowing the remaining submissions to continue
+        """
+        begin = time.time()
+        job_ids = []
+        condor_job_ids = []
+        for job_submit_param in job_submission_params:
+            job_id = job_submit_param.job_id
+            job_ids.append(job_id)
+            try:
+                submission_info = self.sdkmr.get_condor().run_job(
+                    params=job_submit_param
+                )
+                condor_job_id = submission_info.clusterid
+            except Exception as e:
+                self.logger.error(e)
+                self._finish_created_job(job_id=job_id, exception=e)
+                raise e
+
+            if submission_info.error is not None and isinstance(
+                submission_info.error, Exception
+            ):
+                self._finish_created_job(exception=submission_info.error, job_id=job_id)
+                raise submission_info.error
+            if condor_job_id is None:
+                error_msg = (
+                    "Condor job was not run and no error was returned; something went wrong"
+                )
+                self._finish_created_job(
+                    job_id=job_id, exception=RuntimeError(error_msg)
+                )
+                raise RuntimeError(error_msg)
+            condor_job_ids.append(condor_job_id)
+
+        self.logger.error(f"It took {time.time() - begin} seconds to submit jobs to condor")
+        # It took 4.836009502410889 seconds to submit jobs to condor
+
+        update_time = time.time()
+        self._update_to_queued_multiple(job_ids=job_ids, scheduler_ids=condor_job_ids)
+        # It took 1.9239885807037354 seconds to update jobs
+        self.logger.error(f"It took {time.time() - update_time} seconds to update jobs")
+
+        return job_ids
+
     def _run(self, params):
         job_params = self._prepare_to_run(params=params)
         job_id = job_params.job_id
@@ -265,7 +384,6 @@ def _run(self, params):
         try:
             submission_info = self.sdkmr.get_condor().run_job(params=job_params)
             condor_job_id = submission_info.clusterid
-            self.logger.debug(f"Submitted job id and got '{condor_job_id}'")
         except Exception as e:
             self.logger.error(e)
             self._finish_created_job(job_id=job_id, exception=e)
@@ -285,14 +403,14 @@ def _run(self, params):
 
         return job_id
 
-    def _abort_child_jobs(self, child_job_ids):
+    def _abort_multiple_jobs(self, job_ids):
         """
         Cancel a list of child jobs, and their child jobs
         """
-        for child_job_id in child_job_ids:
+        for job_id in job_ids:
            try:
                 self.sdkmr.cancel_job(
-                    job_id=child_job_id,
+                    job_id=job_id,
                     terminated_code=TerminatedCode.terminated_by_batch_abort.value,
                 )
             except Exception as e:
@@ -326,28 +444,26 @@ def _create_batch_job(self, wsid, meta):
         )
         j = self.sdkmr.save_and_return_job(j)
 
-        # TODO Do we need a new kafka call?
+        # TODO Do we need a new kafka call for batch?
         self.sdkmr.get_kafka_client().send_kafka_message(
             message=KafkaCreateJob(job_id=str(j.id), user=j.user)
         )
         return j
 
     def _run_batch(self, batch_job: Job, params):
-        child_jobs = []
+        """Add the batch id, save the jobs to the db, run the jobs"""
         for job_param in params:
             job_param[_BATCH_ID] = str(batch_job.id)
-            try:
-                child_jobs.append(str(self._run(params=job_param)))
-            except Exception as e:
-                self.logger.debug(
-                    msg=f"Failed to submit child job.
Aborting entire batch job {e}" - ) - self._abort_child_jobs(child_jobs) - raise e - batch_job.child_jobs = child_jobs - self.sdkmr.save_job(batch_job) + child_jobs = self._run_multiple(params) + + # Cancel child jobs if we can't notify the batch job of the child jobs + try: + self.sdkmr.add_child_jobs(batch_job=batch_job, child_jobs=child_jobs) + except Exception as e: + self._abort_multiple_jobs(child_jobs) + raise e return child_jobs @@ -377,10 +493,8 @@ def run_batch( new_batch_job=True, as_admin=as_admin, ) - self._add_job_requirements(params, bool(as_admin)) # as_admin checked above self._check_job_arguments(params, batch_job=True) - batch_job = self._create_batch_job(wsid=wsid, meta=meta) children_jobs = self._run_batch(batch_job=batch_job, params=params) @@ -639,6 +753,7 @@ def retry_multiple( """ #TODO Add new job requirements/cgroups as an optional param #TODO Notify the parent container that it has multiple new children, instead of multiple transactions? + #TODO Prevent retry when multiple batch job containers? :param job_ids: The list of jobs to retry :param as_admin: Run with admin permission diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index f26f9ce56..053cfb77d 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -94,7 +94,6 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False): :param as_admin: Cancel the job for a different user """ # Is it inefficient to get the job twice? Is it cached? - # Maybe if the call fails, we don't actually cancel the job? job = self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin @@ -123,6 +122,7 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False): ) # TODO Issue #190 IF success['TotalSuccess = 0'] == FALSE, don't send a kafka message? + self.sdkmr.get_condor().cancel_job(job_id=f"{job.scheduler_id}.0") self.sdkmr.kafka_client.send_kafka_message( message=KafkaCancelJob( @@ -373,9 +373,9 @@ def finish_job( def _update_finished_job_with_usage(self, job_id, as_admin=None) -> Dict: """ # TODO Does this need a kafka message? - :param job_id: - :param as_admin: - :return: + # TODO EE2 issue #251 : The saved job stats are inaccurate: + # The usage is not recorded until the job is completely finished. + :return: Resources at the time the job almost finished. """ # note this method is replaced by a magic mock in some tests job = self.sdkmr.get_job_with_permission( diff --git a/lib/execution_engine2/sdk/SDKMethodRunner.py b/lib/execution_engine2/sdk/SDKMethodRunner.py index 5091b2fee..350599960 100644 --- a/lib/execution_engine2/sdk/SDKMethodRunner.py +++ b/lib/execution_engine2/sdk/SDKMethodRunner.py @@ -12,13 +12,14 @@ from datetime import datetime from enum import Enum from logging import Logger +from typing import List import dateutil -from lib.execution_engine2.db.MongoUtil import MongoUtil +from execution_engine2.db.MongoUtil import MongoUtil from execution_engine2.db.models.models import Job -from lib.execution_engine2.exceptions import AuthError -from lib.execution_engine2.sdk import ( +from execution_engine2.exceptions import AuthError +from execution_engine2.sdk import ( EE2Runjob, EE2StatusRange, EE2Authentication, @@ -251,6 +252,13 @@ def check_as_concierge(self): # at this point since MongoEngine creates a global connection to MongoDB # and makes it available to all the model objects. 
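# --- Illustrative sketch (not part of the patch) -----------------------------
# Why the new insert_jobs()/save_jobs() path can call Job.objects.insert()
# without being handed a client: mongoengine.connect() registers a process-wide
# default connection that every Document subclass uses.  "ExampleJob" and the
# connection settings below are hypothetical stand-ins for the ee2 Job model
# and its configuration.
from mongoengine import Document, StringField, connect


class ExampleJob(Document):
    user = StringField(required=True)
    status = StringField(default="created")


def bulk_insert_example() -> list:
    # One global connection serves all model classes in this process.
    connect(db="ee2_example", host="mongodb://localhost:27017")
    jobs = [ExampleJob(user="someuser") for _ in range(3)]
    # load_bulk=False returns the new ObjectIds instead of re-fetching the
    # documents, which is what lets save_jobs() return the ids as strings.
    object_ids = ExampleJob.objects.insert(jobs, load_bulk=False)
    return [str(oid) for oid in object_ids]
# -----------------------------------------------------------------------------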
+ def save_jobs(self, jobs: List[Job]) -> List[str]: + """ + Save multiple jobs to the Mongo DB at once, and return all of the job ids + """ + job_ids = self.get_mongo_util().insert_jobs(jobs_to_insert=jobs) + return [str(job_id) for job_id in job_ids] + def save_job(self, job: Job) -> str: """ Save a job record to the Mongo database and return the job's ID as a string. @@ -258,6 +266,14 @@ def save_job(self, job: Job) -> str: job.save() return str(job.id) + def add_child_jobs(self, batch_job: Job, child_jobs: List[str]): + """ + Add child jobs to a batch job record in the Mongo Database and return the updated job. + :return: + """ + batch_job.modify(add_to_set__child_jobs=child_jobs) + return batch_job + def save_and_return_job(self, job: Job) -> Job: """ Save a job record to the Mongo database and return the updated job. diff --git a/lib/execution_engine2/utils/KafkaUtils.py b/lib/execution_engine2/utils/KafkaUtils.py index ec2b07f7c..afb1a9473 100644 --- a/lib/execution_engine2/utils/KafkaUtils.py +++ b/lib/execution_engine2/utils/KafkaUtils.py @@ -212,11 +212,8 @@ def send_kafka_message(self, message, topic: str = DEFAULT_TOPIC): ) # TODO Remove POLL? producer.poll(2) - logger.debug( - f"Successfully sent message to kafka at topic={topic} message={json.dumps(message.__dict__)} server_address={self.server_address}" - ) except Exception as e: - logger.debug( + logger.error( f"Failed to send message to kafka at topic={topic} message={json.dumps(message.__dict__)} server_address={self.server_address}" ) raise Exception(e) diff --git a/test/tests_for_auth/ee2_admin_mode_test.py b/test/tests_for_auth/ee2_admin_mode_test.py index bbc16c465..15ee0682a 100644 --- a/test/tests_for_auth/ee2_admin_mode_test.py +++ b/test/tests_for_auth/ee2_admin_mode_test.py @@ -162,7 +162,7 @@ def test_regular_user(self): ws_auth.can_write.assert_called_once_with(self.ws_id) # RUNJOB BUT ATTEMPT TO BE AN ADMIN - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=PermissionError, expected_regex=lowly_user ): runner.run_job(params=job_params_1, as_admin=True) @@ -172,7 +172,7 @@ def test_regular_user(self): self.assertEqual(params["method"], job_params_1["method"]) # get_job_params BUT ATTEMPT TO BE AN ADMIN - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=PermissionError, expected_regex=lowly_user ): runner.get_job_params(job_id=job_id, as_admin=True) @@ -188,12 +188,12 @@ def test_regular_user(self): runner.view_job_logs(job_id=job_id) # add_job_logs and view them, BUT ATTEMPT TO BE AN ADMIN - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=PermissionError, expected_regex=lowly_user ): runner.add_job_logs(job_id=job_id, log_lines=lines, as_admin=True) - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=PermissionError, expected_regex=lowly_user ): runner.view_job_logs(job_id=job_id, as_admin=True) @@ -288,7 +288,7 @@ def test_no_user(self): method_1 = "module_name.function_name" job_params_1 = get_sample_job_params(method=method_1, wsid=self.ws_id) - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=RuntimeError, expected_regex=r"ServerError\('Token validation failed: Login failed! 
Server responded with code 401 Unauthorized'\)", ): @@ -317,7 +317,7 @@ def test_admin_reader(self): self.assertEqual(admin_type, {"permission": "r"}) # RUNJOB - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=PermissionError, expected_regex=lowly_admin ): runner.run_job(params=job_params_1, as_admin=True) diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py index fc0e276bf..9591f16a1 100644 --- a/test/tests_for_db/ee2_MongoUtil_test.py +++ b/test/tests_for_db/ee2_MongoUtil_test.py @@ -2,12 +2,13 @@ import logging import os import unittest -from configparser import ConfigParser +from datetime import datetime from bson.objectid import ObjectId from execution_engine2.db.MongoUtil import MongoUtil -from execution_engine2.db.models.models import Job, JobLog +from execution_engine2.db.models.models import Job, JobLog, Status +from execution_engine2.sdk.EE2Runjob import JobIdPair from test.utils_shared.test_utils import ( bootstrap, get_example_job, @@ -57,6 +58,87 @@ def test_init_ok(self): mongo_util = self.getMongoUtil() self.assertTrue(set(class_attri) <= set(mongo_util.__dict__.keys())) + def test_insert_jobs(self): + """Check to see that jobs are inserted into mongo""" + job = get_example_job(status=Status.created.value) + job2 = get_example_job(status=Status.created.value) + jobs_to_insert = [job, job2] + job_ids = self.getMongoUtil().insert_jobs(jobs_to_insert) + assert len(job_ids) == len(jobs_to_insert) + retrieved_jobs = self.getMongoUtil().get_jobs(job_ids=job_ids) + + for i, retrieved_job in enumerate(retrieved_jobs): + assert jobs_to_insert[i].to_json() == retrieved_job.to_json() + + def test_update_jobs_enmasse(self): + """Check to see that created jobs get updated to queued""" + for state in Status: + job = get_example_job(status=Status.created.value, scheduler_id=None) + job2 = get_example_job(status=state.value, scheduler_id=None) + job3 = get_example_job(status=state.value, scheduler_id=None) + jobs = [job, job2, job3] + + for j in jobs: + j.scheduler_id = None + j.save() + assert j.scheduler_id is None + + job_ids = [job.id, job2.id, job3.id] + scheduler_ids = ["humpty", "dumpty", "alice"] + jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids)) + + now_ms = datetime.utcnow().timestamp() + + self.getMongoUtil().update_jobs_to_queued(jobs_to_update) + job.reload() + job2.reload() + job3.reload() + + # Check that sched ids are set + for i, val in enumerate(scheduler_ids): + assert jobs[i].scheduler_id == val + assert jobs[i].scheduler_type == "condor" + + # Checks that a timestamp in seconds since the epoch is within a second of the current time. 
+ for j in jobs: + assert now_ms + 1 > j.updated + assert now_ms - 1 < j.updated + + # First job always should transition to queued + assert job.status == Status.queued.value + + # Created jobs should transition + if state.value == Status.created.value: + assert all(j.status == Status.queued.value for j in [job, job2, job3]) + + else: + # Don't change their state + assert all(j.status == state.value for j in [job2, job3]) + + def test_update_jobs_enmasse_bad_job_pairs(self): + job = get_example_job(status=Status.created.value).save() + job2 = get_example_job(status=Status.created.value).save() + job3 = get_example_job(status=Status.created.value).save() + job_ids = [job.id, job2.id, job3.id] + scheduler_ids = [job.scheduler_id, job2.scheduler_id, None] + job_id_pairs = list(map(JobIdPair, job_ids, scheduler_ids)) + + with self.assertRaisesRegex( + expected_exception=ValueError, + expected_regex=f"Provided a bad job_id_pair, missing scheduler_id for {job3.id}", + ): + self.getMongoUtil().update_jobs_to_queued(job_id_pairs) + + job_ids = [job.id, job2.id, None] + scheduler_ids = [job.scheduler_id, job2.scheduler_id, job3.scheduler_id] + job_id_pairs = list(map(JobIdPair, job_ids, scheduler_ids)) + + with self.assertRaisesRegex( + expected_exception=ValueError, + expected_regex=f"Provided a bad job_id_pair, missing job_id for {job3.scheduler_id}", + ): + self.getMongoUtil().update_jobs_to_queued(job_id_pairs) + def test_get_by_cluster(self): """Get a job by its condor scheduler_id""" mongo_util = self.getMongoUtil() @@ -67,7 +149,6 @@ def test_get_by_cluster(self): self.assertEqual(str(job_id), batch) def test_get_job_ok(self): - mongo_util = self.getMongoUtil() with mongo_util.mongo_engine_connection(): @@ -142,7 +223,6 @@ def test_get_job_ok(self): self.assertEqual(ori_job_count, Job.objects.count()) def test_get_jobs_ok(self): - mongo_util = self.getMongoUtil() with mongo_util.mongo_engine_connection(): @@ -199,7 +279,6 @@ def test_get_jobs_ok(self): self.assertEqual(ori_job_count, Job.objects.count()) def test_connection_ok(self): - mongo_util = self.getMongoUtil() with mongo_util.mongo_engine_connection(): @@ -341,7 +420,6 @@ def test_delete_one_ok(self): self.assertEqual(col.count_documents({}), doc_count) def test_get_job_log_pymongo_ok(self): - mongo_util = self.getMongoUtil() primary_key = ObjectId() diff --git a/test/tests_for_sdkmr/EE2Runjob_test.py b/test/tests_for_sdkmr/EE2Runjob_test.py index 3698c9124..3794104c9 100644 --- a/test/tests_for_sdkmr/EE2Runjob_test.py +++ b/test/tests_for_sdkmr/EE2Runjob_test.py @@ -14,13 +14,19 @@ from execution_engine2.authorization.workspaceauth import WorkspaceAuth from execution_engine2.db.MongoUtil import MongoUtil -from execution_engine2.db.models.models import Job, JobInput, JobRequirements, Meta +from execution_engine2.db.models.models import ( + Job, + JobInput, + JobRequirements, + Meta, + Status, +) from execution_engine2.exceptions import ( IncorrectParamsException, AuthError, InvalidParameterForBatch, ) -from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions +from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions, JobIdPair from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.job_submission_parameters import ( JobSubmissionParameters, @@ -72,7 +78,6 @@ _CLUSTER_1 = "cluster1" _CLUSTER_2 = "cluster2" - _EMPTY_JOB_REQUIREMENTS = { "cpus": None, "memory_MB": None, @@ -529,7 +534,6 @@ def test_run_job_as_concierge_sched_reqs_empty_list_as_admin(): def 
_run_as_concierge_empty_as_admin(concierge_params, app): - # Set up data variables client_group = "concierge" # hardcoded default for run_as_concierge cpus = 1 @@ -763,7 +767,7 @@ def _run_and_run_batch_fail( _run_batch_fail(rj, [params], {}, as_admin, expected) -def _set_up_common_return_values_batch(mocks): +def _set_up_common_return_values_batch(mocks, returned_job_state=_QUEUED_STATE): """ Set up return values on mocks that are the same for several tests. """ @@ -773,13 +777,13 @@ def _set_up_common_return_values_batch(mocks): returned_parent_job = Job() returned_parent_job.id = ObjectId(_JOB_ID) returned_parent_job.user = _USER + + mocks[SDKMethodRunner].save_and_return_job.return_value = returned_parent_job mocks[CatalogCache].lookup_git_commit_version.side_effect = [ _GIT_COMMIT_1, _GIT_COMMIT_2, ] - mocks[SDKMethodRunner].save_and_return_job.return_value = returned_parent_job - # create job1, update job1, create job2, update job2, update parent job mocks[SDKMethodRunner].save_job.side_effect = [ _JOB_ID_1, @@ -788,6 +792,11 @@ def _set_up_common_return_values_batch(mocks): None, None, ] + + mocks[SDKMethodRunner].save_jobs.side_effect = [ + [_JOB_ID_1, _JOB_ID_2], + ] + mocks[Condor].run_job.side_effect = [ SubmissionInfo(_CLUSTER_1, {}, None), SubmissionInfo(_CLUSTER_2, {}, None), @@ -798,10 +807,25 @@ def _set_up_common_return_values_batch(mocks): retjob_2 = Job() retjob_2.id = ObjectId(_JOB_ID_2) retjob_2.status = _CREATED_STATE + + retjob_1_after_submit = Job() + retjob_1_after_submit.id = ObjectId(_JOB_ID_1) + retjob_1_after_submit.status = returned_job_state + retjob_1_after_submit.scheduler_id = _CLUSTER_1 + retjob_2_after_submit = Job() + retjob_2_after_submit.id = ObjectId(_JOB_ID_2) + retjob_2_after_submit.status = returned_job_state + retjob_2_after_submit.scheduler_id = _CLUSTER_2 + mocks[MongoUtil].get_job.side_effect = [retjob_1, retjob_2] + mocks[MongoUtil].get_jobs.side_effect = [ + [retjob_1_after_submit, retjob_2_after_submit] + ] -def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): +def _check_common_mock_calls_batch( + mocks, reqs1, reqs2, parent_wsid, terminated_during_submit=False +): """ Check that mocks are called as expected when those calls are similar or the same for several tests. 
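# --- Illustrative sketch (not part of the patch) -----------------------------
# The mock setup above relies on unittest.mock's side_effect sequencing: when
# side_effect is a list, each successive call returns the next element.  That
# is how a single test run can script save_jobs(), run_job(), and get_jobs()
# to return different canned values per call.  The values here are hypothetical.
from unittest.mock import MagicMock

condor = MagicMock()
condor.run_job.side_effect = ["cluster1", "cluster2"]

mongo_util = MagicMock()
mongo_util.get_jobs.side_effect = [["job-a", "job-b"]]

assert condor.run_job() == "cluster1"   # first scripted submission
assert condor.run_job() == "cluster2"   # second scripted submission
assert mongo_util.get_jobs() == ["job-a", "job-b"]
# -----------------------------------------------------------------------------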
@@ -834,7 +858,7 @@ def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): ] ) - assert len(sdkmr.save_job.call_args_list) == 5 + assert len(sdkmr.save_jobs.call_args_list) == 1 # initial child jobs data save expected_job_1 = _create_job( @@ -846,7 +870,7 @@ def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): wsid=parent_wsid, batch_id=_JOB_ID, ) - got_job_1 = sdkmr.save_job.call_args_list[0][0][0] + got_job_1 = sdkmr.save_jobs.call_args_list[0][0][0][0] assert_jobs_equal(got_job_1, expected_job_1) expected_job_2 = _create_job( @@ -857,8 +881,8 @@ def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): wsid=parent_wsid, batch_id=_JOB_ID, ) - # index 2 because job 1 is updated with save_job before this job is created - got_job_2 = sdkmr.save_job.call_args_list[2][0][0] + # index 1 because save_jobs returns a list of two jobs + got_job_2 = sdkmr.save_jobs.call_args_list[0][0][0][1] assert_jobs_equal(got_job_2, expected_job_2) jsp_expected_1 = JobSubmissionParameters( @@ -882,38 +906,53 @@ def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): [call(params=jsp_expected_1), call(params=jsp_expected_2)] ) - # updated job data save - mocks[MongoUtil].get_job.assert_has_calls([call(_JOB_ID_1), call(_JOB_ID_2)]) - # update to queued state - got_queued_job_1 = sdkmr.save_job.call_args_list[1][0][0] - got_queued_job_2 = sdkmr.save_job.call_args_list[3][0][0] - _check_queued_job_save(got_queued_job_1, _JOB_ID_1, _CLUSTER_1) - _check_queued_job_save(got_queued_job_2, _JOB_ID_2, _CLUSTER_2) - - mocks[KafkaClient].send_kafka_message.assert_has_calls( - [ - call(KafkaCreateJob(job_id=_JOB_ID, user=_USER)), # parent job - call(KafkaCreateJob(job_id=_JOB_ID_1, user=_USER)), - call( - KafkaQueueChange( - job_id=_JOB_ID_1, - new_status=_QUEUED_STATE, - previous_status=_CREATED_STATE, - scheduler_id=_CLUSTER_1, - ) - ), - call(KafkaCreateJob(job_id=_JOB_ID_2, user=_USER)), - call( - KafkaQueueChange( - job_id=_JOB_ID_2, - new_status=_QUEUED_STATE, - previous_status=_CREATED_STATE, - scheduler_id=_CLUSTER_2, - ) - ), - ] - ) + child_job_pairs = [ + JobIdPair(_JOB_ID_1, _CLUSTER_1), + JobIdPair(_JOB_ID_2, _CLUSTER_2), + ] + mocks[MongoUtil].update_jobs_to_queued.assert_has_calls([call(child_job_pairs)]) + job_ids = [child_job_pair.job_id for child_job_pair in child_job_pairs] + mocks[MongoUtil].get_jobs.assert_has_calls([call(job_ids)]) + + if not terminated_during_submit: + mocks[KafkaClient].send_kafka_message.assert_has_calls( + [ + call(KafkaCreateJob(job_id=_JOB_ID, user=_USER)), # parent job + call(KafkaCreateJob(job_id=_JOB_ID_1, user=_USER)), + call(KafkaCreateJob(job_id=_JOB_ID_2, user=_USER)), + call( + KafkaQueueChange( + job_id=_JOB_ID_1, + new_status=_QUEUED_STATE, + previous_status=_CREATED_STATE, + scheduler_id=_CLUSTER_1, + ) + ), + call( + KafkaQueueChange( + job_id=_JOB_ID_2, + new_status=_QUEUED_STATE, + previous_status=_CREATED_STATE, + scheduler_id=_CLUSTER_2, + ) + ), + ] + ) + else: + mocks[KafkaClient].send_kafka_message.assert_has_calls( + [ + call(KafkaCreateJob(job_id=_JOB_ID, user=_USER)), # parent job + call(KafkaCreateJob(job_id=_JOB_ID_1, user=_USER)), + call(KafkaCreateJob(job_id=_JOB_ID_2, user=_USER)), + ] + ) + mocks[SDKMethodRunner].cancel_job.assert_has_calls( + [ + call(job_id=_JOB_ID_1, terminated_code=0), + call(job_id=_JOB_ID_2, terminated_code=0), + ] + ) # Removed for now, but might be added back in if run_job_message is re-added # mocks[SlackClient].run_job_message.assert_has_calls( @@ -923,12 +962,114 @@ 
def _check_common_mock_calls_batch(mocks, reqs1, reqs2, parent_wsid): # ] # ) - final_expected_parent_job = Job() - final_expected_parent_job.id = ObjectId(_JOB_ID) - final_expected_parent_job.user = _USER - final_expected_parent_job.child_jobs = [_JOB_ID_1, _JOB_ID_2] - final_got_parent_job = sdkmr.save_job.call_args_list[4][0][0] - assert_jobs_equal(final_got_parent_job, final_expected_parent_job) + # Test to see if add_child jobs is called with correct batch_container and children + expected_batch_container = Job() + expected_batch_container.id = ObjectId(_JOB_ID) + expected_batch_container.user = _USER + + batch_job = sdkmr.add_child_jobs.call_args_list[0][1]["batch_job"] + sdkmr.add_child_jobs.assert_called_once_with( + batch_job=expected_batch_container, child_jobs=[_JOB_ID_1, _JOB_ID_2] + ) + """ + So this test doesn't actually check that the call is correct, but the assert_jobs_equal line below does + the assert below is necessary because of how equality works for Job objects + ( because they have the same object ID, which is what Job equality is based on. ) + and that the assert_called_once_with doesn't correctly check the job object + """ + assert_jobs_equal(batch_job, expected_batch_container) + + +def test_run_job_batch_with_cancellation_during_submit(): + """ + A basic unit test of the run_batch() method, providing a workspace ID for the parent job. This one also checks for + cancellation during submit causing a job cancellation request to be processed . + + This test is a fairly minimal test of the run_batch() method. It does not exercise all the + potential code paths or provide all the possible run inputs, such as job parameters, cell + metadata, etc. + """ + # When an assertion is failed, this test doesn't show you where failed in PyCharm, so use + # Additional arguments `--no-cov -s` or run from cmd line + # PYTHONPATH=.:lib:test pytest test/tests_for_sdkmr/EE2Runjob_test.py::test_run_job_batch_with_parent_job_wsid --no-cov + + # set up variables + parent_wsid = 89 + wsid = 32 + + # set up mocks + mocks = _set_up_mocks(_USER, _TOKEN) + sdkmr = mocks[SDKMethodRunner] + jrr = mocks[JobRequirementsResolver] + # We intentionally do not check the logger methods as there are a lot of them and this is + # already a very large test. This may be something to be added later when needed. + + # Set up call returns. These calls are in the order they occur in the code + mocks[WorkspaceAuth].can_write.return_value = True + mocks[WorkspaceAuth].can_write_list.return_value = {wsid: True} + + jrr.normalize_job_reqs.side_effect = [{}, {}] + jrr.get_requirements_type.side_effect = [ + RequirementsType.STANDARD, + RequirementsType.STANDARD, + ] + reqs1 = ResolvedRequirements( + cpus=1, + memory_MB=2, + disk_GB=3, + client_group="cg1", + ) + reqs2 = ResolvedRequirements( + cpus=10, + memory_MB=20, + disk_GB=30, + client_group="cg2", + ) + jrr.resolve_requirements.side_effect = [reqs1, reqs2] + + _set_up_common_return_values_batch( + mocks, returned_job_state=Status.terminated.value + ) + + # set up the class to be tested and run the method + rj = EE2RunJob(sdkmr) + params = [ + { + "method": _METHOD_1, + "app_id": _APP_1, + "source_ws_objects": [_WS_REF_1, _WS_REF_2], + }, + { + "method": _METHOD_2, + "app_id": _APP_2, + "wsid": wsid, + }, + ] + params[1]["wsid"] = None + assert rj.run_batch(params, {"wsid": parent_wsid}) == { + "batch_id": _JOB_ID, + "child_job_ids": [_JOB_ID_1, _JOB_ID_2], + } + + # check mocks called as expected. The order here is the order that they're called in the code. 
+ mocks[WorkspaceAuth].can_write.assert_called_once_with(parent_wsid) + + jrr = mocks[JobRequirementsResolver] + jrr.normalize_job_reqs.assert_has_calls( + [call({}, "input job"), call({}, "input job")] + ) + jrr.get_requirements_type.assert_has_calls( + [call(**_EMPTY_JOB_REQUIREMENTS), call(**_EMPTY_JOB_REQUIREMENTS)] + ) + jrr.resolve_requirements.assert_has_calls( + [ + call(_METHOD_1, mocks[CatalogCache], **_EMPTY_JOB_REQUIREMENTS), + call(_METHOD_2, mocks[CatalogCache], **_EMPTY_JOB_REQUIREMENTS), + ] + ) + _check_common_mock_calls_batch( + mocks, reqs1, reqs2, parent_wsid, terminated_during_submit=True + ) def test_run_job_batch_with_parent_job_wsid(): diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py index a90044703..ce98c0fb9 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py @@ -19,9 +19,13 @@ from execution_engine2.authorization.workspaceauth import WorkspaceAuth from execution_engine2.db.MongoUtil import MongoUtil +from execution_engine2.db.models.models import Job, Status, TerminatedCode from execution_engine2.exceptions import AuthError +from execution_engine2.exceptions import InvalidStatusTransitionException +from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.Condor import Condor +from execution_engine2.utils.CondorTuples import SubmissionInfo from execution_engine2.utils.KafkaUtils import KafkaClient from execution_engine2.utils.SlackUtils import SlackClient from execution_engine2.utils.clients import UserClientSet, ClientSet @@ -30,10 +34,6 @@ JobRequirementsResolver, RequirementsType, ) -from lib.execution_engine2.db.models.models import Job, Status, TerminatedCode -from lib.execution_engine2.exceptions import InvalidStatusTransitionException -from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner -from lib.execution_engine2.utils.CondorTuples import SubmissionInfo from test.tests_for_sdkmr.ee2_SDKMethodRunner_test_utils import ee2_sdkmr_test_helper from test.utils_shared.mock_utils import get_client_mocks, ALL_CLIENTS from test.utils_shared.test_utils import ( @@ -48,7 +48,7 @@ logging.basicConfig(level=logging.INFO) bootstrap() -from lib.execution_engine2.sdk.EE2Runjob import EE2RunJob +from execution_engine2.sdk.EE2Runjob import EE2RunJob from installed_clients.CatalogClient import Catalog from installed_clients.WorkspaceClient import Workspace @@ -174,7 +174,7 @@ def test_getters(self): is clients_and_mocks[JobRequirementsResolver] ) - def test_save_job(self): + def test_save_job_and_save_jobs(self): ws = Workspace("https://fake.com") wsa = WorkspaceAuth("user", ws) cliset = UserClientSet("user", "token", ws, wsa) @@ -190,9 +190,31 @@ def test_save_job(self): j = create_autospec(Job, spec_set=False, instance=True) j.id = bson.objectid.ObjectId("603051cfaf2e3401b0500982") assert sdkmr.save_job(j) == "603051cfaf2e3401b0500982" - j.save.assert_called_once_with() + # Test Save Jobs + job1 = Job() + job1.id = bson.objectid.ObjectId("603051cfaf2e3401b0500980") + job2 = Job() + job2.id = bson.objectid.ObjectId("603051cfaf2e3401b0500981") + sdkmr.get_mongo_util().insert_jobs.return_value = [job1.id, job2.id] + jobs = sdkmr.save_jobs([job1, job2]) + sdkmr.get_mongo_util().insert_jobs.assert_called_with( + jobs_to_insert=[job1, job2] + ) + assert jobs == [str(job1.id), str(job2.id)] + + def test_add_child_jobs(self): 
+ ws = Workspace("https://fake.com") + wsa = WorkspaceAuth("user", ws) + cliset = UserClientSet("user", "token", ws, wsa) + clients_and_mocks = get_client_mocks(self.cfg, self.config_file, *ALL_CLIENTS) + sdkmr = SDKMethodRunner(cliset, clients_and_mocks[ClientSet]) + j = create_autospec(Job, spec_set=False, instance=True) + returned_job = sdkmr.add_child_jobs(batch_job=j, child_jobs=["a", "b", "c"]) + j.modify.assert_called_once_with(add_to_set__child_jobs=["a", "b", "c"]) + assert returned_job == j + def test_save_and_return_job(self): ws = Workspace("https://fake.com") wsa = WorkspaceAuth("user", ws) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index 96ecd7349..f30c153f7 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -334,7 +334,7 @@ def test_retry_job_multiple(self, rq_mock, condor_mock): "'123' is not a valid ObjectId, it must be a 12-byte input or a 24-character " "hex string" ) - with self.assertRaisesRegexp(RetryFailureException, errmsg): + with self.assertRaisesRegex(RetryFailureException, errmsg): runner.retry_multiple(job_ids=[parent_job_id1, 123]) # 3. Retry the jobs with duplicate job ids @@ -382,7 +382,7 @@ def test_retry_job_multiple(self, rq_mock, condor_mock): self.check_retry_job_state(parent_job_id4, job4["job_id"]) # Test no job ids - with self.assertRaisesRegexp(ValueError, "No job_ids provided to retry"): + with self.assertRaisesRegex(ValueError, "No job_ids provided to retry"): runner.retry_multiple(job_ids=None) # Test error during retry, but passing validate diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py index 98e73cf70..f356d8ce0 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py @@ -131,7 +131,7 @@ def test_check_job(self, rq_mock, condor_mock): @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) def test_run_job_and_handle_held(self, rq_mock, condor_mock): """ - Run a job, then call it held as an admin, and then check to see if the record contains condor info about the job + Run a job, then call it held as an admin, and then check to see if the record is set to error or terminated :param rq_mock: :param condor_mock: :return: @@ -157,8 +157,15 @@ def test_run_job_and_handle_held(self, rq_mock, condor_mock): print( f"Job id is {job_id}. Status is {check_job.get('status')} Cluster is {check_job.get('scheduler_id')} " ) + self.assertEqual(check_job.get("status"), Status.queued.value) job_record = runner.handle_held_job(cluster_id=check_job.get("scheduler_id")) - self.assertEqual(self.fake_used_resources, job_record.get("condor_job_ads")) + # This flaky test changes depending on your test environment + self.assertIn( + job_record.get("status"), [Status.terminated.value, Status.error.value] + ) + # Condor ads are actually wrong and should only be updated after the job is completed, + # so we don't need to check them in this test right now. 
+ # See EE2 issue #251 def test_update_job_status(self): runner = self.getRunner() diff --git a/test/tests_for_sdkmr/ee2_kafka_test.py b/test/tests_for_sdkmr/ee2_kafka_test.py index ebd73a845..60856d18c 100644 --- a/test/tests_for_sdkmr/ee2_kafka_test.py +++ b/test/tests_for_sdkmr/ee2_kafka_test.py @@ -29,7 +29,7 @@ def setUpClass(cls): def test_status_change(self): - with self.assertRaisesRegexp( + with self.assertRaisesRegex( expected_exception=TypeError, expected_regex=r"__init__\(\) missing 1 required positional argument: 'scheduler_id'", ): diff --git a/test/utils_shared/producer.py b/test/utils_shared/producer.py index b90926da4..0ff2f5ace 100644 --- a/test/utils_shared/producer.py +++ b/test/utils_shared/producer.py @@ -22,11 +22,8 @@ def send_kafka_message(self, message, topic=DEFAULT_TOPIC): producer = Producer({"bootstrap.servers": self.server_address}) producer.produce(topic, str(message), callback=_delivery_report) producer.poll(2) - logging.info( - f"Successfully sent message to kafka at topic={topic} message={json.dumps(message)} server_address={self.server_address}" - ) except Exception as e: - logging.info( + logging.error( f"Failed to send message to kafka at topic={topic} message={json.dumps(message)} server_address={self.server_address}" ) raise Exception(e)
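# --- Illustrative sketch (not part of the patch) -----------------------------
# The KafkaUtils/producer changes drop the per-message "success" log and record
# failures at error level instead.  With confluent_kafka, per-message outcomes
# are normally observed in the delivery callback passed to produce(); the topic
# name and bootstrap address below are hypothetical.
import json
import logging

from confluent_kafka import Producer

logger = logging.getLogger(__name__)


def _delivery_report(err, msg):
    # Invoked once per message from poll()/flush(); err is None on success.
    if err is not None:
        logger.error("Kafka delivery failed: %s", err)


def send_event(payload: dict, topic: str = "ee2", servers: str = "localhost:9092"):
    producer = Producer({"bootstrap.servers": servers})
    producer.produce(topic, json.dumps(payload), callback=_delivery_report)
    # flush() blocks until outstanding deliveries resolve, so the callback
    # fires before this function returns.
    producer.flush(2)
# -----------------------------------------------------------------------------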