diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py
index 8b4e8578..00fc8bb7 100644
--- a/cellpack/autopack/DBRecipeHandler.py
+++ b/cellpack/autopack/DBRecipeHandler.py
@@ -529,7 +529,7 @@ def upload_config(self, config_data, source_path):
             self.db.update_doc("configs", id, config_data)
         return id
 
-    def upload_result_metadata(self, file_name, url, job_id=None):
+    def upload_result_metadata(self, file_name, url, dedup_hash=None):
         """
         Upload the metadata of the result file to the database.
         """
@@ -543,28 +543,40 @@ def upload_result_metadata(self, file_name, url, job_id=None):
                     "user": username,
                     "timestamp": timestamp,
                     "url": url,
-                    "batch_job_id": job_id,
+                    "dedup_hash": dedup_hash,
                 },
             )
-            if job_id:
-                self.upload_job_status(job_id, "DONE", result_path=url)
+            if dedup_hash:
+                self.upload_job_status(dedup_hash, "DONE", result_path=url)
 
-    def upload_job_status(self, job_id, status, result_path=None, error_message=None):
+    def upload_job_status(
+        self,
+        dedup_hash,
+        status,
+        result_path=None,
+        error_message=None,
+        outputs_directory=None,
+    ):
         """
-        Update status for a given job ID
+        Update status for a given dedup_hash
         """
         if self.db:
-            timestamp = self.db.create_timestamp()
-            self.db.update_or_create(
-                "job_status",
-                job_id,
-                {
-                    "timestamp": timestamp,
-                    "status": str(status),
-                    "result_path": result_path,
-                    "error_message": error_message,
-                },
-            )
+            db_handler = self.db
+            # If db is AWSHandler, switch to firebase handler for job status updates
+            if hasattr(self.db, "s3_client"):
+                handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
+                db_handler = handler(default_db="staging")
+            timestamp = db_handler.create_timestamp()
+            data = {
+                "timestamp": timestamp,
+                "status": str(status),
+                "error_message": error_message,
+            }
+            if result_path:
+                data["result_path"] = result_path
+            if outputs_directory:
+                data["outputs_directory"] = outputs_directory
+            db_handler.update_or_create("job_status", dedup_hash, data)
 
     def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data):
         output_path = Path(output_folder)
@@ -583,7 +595,7 @@ def upload_packing_results_workflow(
         self,
         source_folder,
         recipe_name,
-        job_id,
+        dedup_hash,
         config_data,
         recipe_data,
     ):
@@ -591,7 +603,7 @@ def upload_packing_results_workflow(
         Complete packing results upload workflow including folder preparation and s3 upload
         """
         try:
-            if job_id:
+            if dedup_hash:
                 source_path = Path(source_folder)
 
                 if not source_path.exists():
@@ -601,7 +613,7 @@ def upload_packing_results_workflow(
 
                 # prepare unique S3 upload folder
                 parent_folder = source_path.parent
-                unique_folder_name = f"{source_path.name}_run_{job_id}"
+                unique_folder_name = f"{source_path.name}_run_{dedup_hash}"
                 s3_upload_folder = parent_folder / unique_folder_name
                 logging.debug(f"outputs will be copied to: {s3_upload_folder}")
 
@@ -618,7 +630,7 @@ def upload_packing_results_workflow(
                 upload_result = self.upload_outputs_to_s3(
                     output_folder=s3_upload_folder,
                     recipe_name=recipe_name,
-                    job_id=job_id,
+                    dedup_hash=dedup_hash,
                 )
 
                 # clean up temporary folder after upload
@@ -628,9 +640,11 @@ def upload_packing_results_workflow(
                         f"Cleaned up temporary upload folder: {s3_upload_folder}"
                     )
 
-                # update outputs directory in firebase
-                self.update_outputs_directory(
-                    job_id, upload_result.get("outputs_directory")
+                # update outputs directory in job status
+                self.upload_job_status(
+                    dedup_hash,
+                    "DONE",
+                    outputs_directory=upload_result.get("outputs_directory"),
                 )
 
                 return upload_result
@@ -639,7 +653,7 @@ def upload_packing_results_workflow(
             logging.error(e)
             return {"success": False, "error": e}
 
-    def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
+    def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash):
         """
         Upload packing outputs to S3 bucket
         """
@@ -647,7 +661,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
         bucket_name = self.db.bucket_name
         region_name = self.db.region_name
         sub_folder_name = self.db.sub_folder_name
-        s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}"
+        s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}"
 
         try:
             upload_result = self.db.upload_directory(
@@ -671,7 +685,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
 
             return {
                 "success": True,
-                "run_id": job_id,
+                "dedup_hash": dedup_hash,
                 "s3_bucket": bucket_name,
                 "s3_prefix": s3_prefix,
                 "public_url_base": f"{base_url}/{s3_prefix}/",
@@ -685,25 +699,6 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
             logging.error(e)
             return {"success": False, "error": e}
 
-    def update_outputs_directory(self, job_id, outputs_directory):
-        if not self.db or self.db.s3_client:
-            # switch to firebase handler to update job status
-            handler = DATABASE_IDS.handlers().get("firebase")
-            initialized_db = handler(default_db="staging")
-        if job_id:
-            timestamp = initialized_db.create_timestamp()
-            initialized_db.update_or_create(
-                "job_status",
-                job_id,
-                {
-                    "timestamp": timestamp,
-                    "outputs_directory": outputs_directory,
-                },
-            )
-            logging.debug(
-                f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}"
-            )
-
 
 class DBRecipeLoader(object):
     """
diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py
index 86af0874..4f934e0e 100644
--- a/cellpack/autopack/upy/simularium/simularium_helper.py
+++ b/cellpack/autopack/upy/simularium/simularium_helper.py
@@ -1385,16 +1385,14 @@ def raycast(self, **kw):
     def raycast_test(self, obj, start, end, length, **kw):
         return
 
-    def post_and_open_file(self, file_name, open_results_in_browser):
+    def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None):
         simularium_file = Path(f"{file_name}.simularium")
-        url = None
-        job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
         file_name, url = simulariumHelper.store_result_file(
-            simularium_file, storage="aws", batch_job_id=job_id
+            simularium_file, storage="aws", batch_job_id=dedup_hash
         )
         if file_name and url:
             simulariumHelper.store_metadata(
-                file_name, url, db="firebase", job_id=job_id
+                file_name, url, db="firebase", dedup_hash=dedup_hash
             )
         if open_results_in_browser:
             simulariumHelper.open_in_simularium(url)
@@ -1428,7 +1426,7 @@ def store_result_file(
         return file_name, url
 
     @staticmethod
-    def store_metadata(file_name, url, db=None, job_id=None):
+    def store_metadata(file_name, url, db=None, dedup_hash=None):
         if db == "firebase":
             handler = DATABASE_IDS.handlers().get(db)
             initialized_db = handler(
@@ -1436,7 +1434,7 @@ def store_metadata(file_name, url, db=None, job_id=None):
             )  # default to staging for metadata uploads
             if initialized_db._initialized:
                 db_uploader = DBUploader(initialized_db)
-                db_uploader.upload_result_metadata(file_name, url, job_id)
+                db_uploader.upload_result_metadata(file_name, url, dedup_hash)
             else:
                 db_maintainer = DBMaintenance(initialized_db)
                 logging.warning(
diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py
index 6ca931af..0b09e03a 100644
--- a/cellpack/autopack/writers/__init__.py
+++ b/cellpack/autopack/writers/__init__.py
@@ -197,8 +197,11 @@ def save_as_simularium(self, env, seed_to_results_map):
         number_of_packings = env.config_data.get("number_of_packings", 1)
         open_results_in_browser = env.config_data.get("open_results_in_browser", False)
         upload_results = env.config_data.get("upload_results", False)
+        dedup_hash = getattr(env, "dedup_hash", None)
         if (number_of_packings == 1 or is_aggregate) and upload_results:
-            autopack.helper.post_and_open_file(file_name, open_results_in_browser)
+            autopack.helper.post_and_open_file(
+                file_name, open_results_in_browser, dedup_hash
+            )
 
     def save_Mixed_asJson(
         self,
diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py
index 83b22264..27c4d018 100644
--- a/cellpack/bin/pack.py
+++ b/cellpack/bin/pack.py
@@ -1,6 +1,5 @@
 import logging
 import logging.config
-import os
 import time
 
 from pathlib import Path
@@ -25,7 +24,11 @@ def pack(
-    recipe, config_path=None, analysis_config_path=None, docker=False, validate=True
+    recipe,
+    config_path=None,
+    analysis_config_path=None,
+    docker=False,
+    hash=None,
 ):
     """
     Initializes an autopack packing from the command line
     :param config_path: string argument, path to packing config file
     :param analysis_config_path: string argument, path to analysis config file
     :param docker: boolean argument, are we using docker
-    :param validate: boolean argument, validate recipe before packing
+    :param hash: string argument, dedup hash identifier for tracking/caching results
     :return: void
     """
@@ -57,6 +60,7 @@
     autopack.helper = helper
     env = Environment(config=packing_config_data, recipe=recipe_data)
     env.helper = helper
+    env.dedup_hash = hash
 
     log.info("Packing recipe: %s", recipe_data["name"])
     log.info("Outputs will be saved to %s", env.out_folder)
@@ -83,24 +87,22 @@
     env.buildGrid(rebuild=True)
     env.pack_grid(verbose=0, usePP=False)
 
-    if docker:
-        job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
-        if job_id:
-            handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
-            # temporarily using demo bucket before permissions are granted
-            initialized_handler = handler(
-                bucket_name="cellpack-demo",
-                sub_folder_name="runs",
-                region_name="us-west-2",
-            )
-            uploader = DBUploader(db_handler=initialized_handler)
-            uploader.upload_packing_results_workflow(
-                source_folder=env.out_folder,
-                recipe_name=recipe_data["name"],
-                job_id=job_id,
-                config_data=packing_config_data,
-                recipe_data=recipe_loader.serializable_recipe_data,
-            )
+    if docker and hash:
+        handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
+        # temporarily using demo bucket before permissions are granted
+        initialized_handler = handler(
+            bucket_name="cellpack-demo",
+            sub_folder_name="runs",
+            region_name="us-west-2",
+        )
+        uploader = DBUploader(db_handler=initialized_handler)
+        uploader.upload_packing_results_workflow(
+            source_folder=env.out_folder,
+            recipe_name=recipe_data["name"],
+            dedup_hash=hash,
+            config_data=packing_config_data,
+            recipe_data=recipe_loader.serializable_recipe_data,
+        )
 
 
 def main():
diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py
index 0c91cbd5..414f6b9c 100644
--- a/cellpack/tests/test_db_uploader.py
+++ b/cellpack/tests/test_db_uploader.py
@@ -175,3 +175,56 @@ def test_upload_recipe():
         "A": "firebase:composition/test_id",
     }
     assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"}
+
+
+def test_upload_job_status_with_firebase_handler():
+    mock_firebase_db = MagicMock()
+    mock_firebase_db.create_timestamp.return_value = "test_timestamp"
+    # firebaseHandler does not have s3_client attribute
+    del mock_firebase_db.s3_client
+
+    uploader = DBUploader(mock_firebase_db)
+    uploader.upload_job_status("test_hash", "RUNNING")
+
+    mock_firebase_db.create_timestamp.assert_called_once()
+    mock_firebase_db.update_or_create.assert_called_once_with(
+        "job_status",
+        "test_hash",
+        {
+            "timestamp": "test_timestamp",
+            "status": "RUNNING",
+            "error_message": None,
+        },
+    )
+
+
+def test_upload_job_status_with_aws_handler():
+    mock_aws_db = MagicMock()
+    mock_aws_db.s3_client = MagicMock()  # AWSHandler has s3_client
+
+    mock_firebase_handler = MagicMock()
+    mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp"
+
+    with patch(
+        "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers"
+    ) as mock_handlers:
+        mock_handlers.return_value.get.return_value = (
+            lambda default_db: mock_firebase_handler
+        )
+
+        uploader = DBUploader(mock_aws_db)
+        uploader.upload_job_status("test_hash", "DONE", result_path="test_path")
+
+        mock_firebase_handler.create_timestamp.assert_called_once()
+        mock_firebase_handler.update_or_create.assert_called_once_with(
+            "job_status",
+            "test_hash",
+            {
+                "timestamp": "firebase_timestamp",
+                "status": "DONE",
+                "error_message": None,
+                "result_path": "test_path",
+            },
+        )
+        # AWS handler should not be called for timestamp
+        mock_aws_db.create_timestamp.assert_not_called()
diff --git a/docker/server.py b/docker/server.py
index 9b1ce105..74bb20f3 100644
--- a/docker/server.py
+++ b/docker/server.py
@@ -1,8 +1,6 @@
 import asyncio
 from aiohttp import web
-import os
-import uuid
-from cellpack.autopack.DBRecipeHandler import DBUploader
+from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader
 from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS
 from cellpack.bin.pack import pack
 
@@ -12,29 +10,40 @@ class CellpackServer:
     def __init__(self):
         self.packing_tasks = set()
 
-    async def run_packing(self, job_id, recipe=None, config=None, body=None):
-        os.environ["AWS_BATCH_JOB_ID"] = job_id
-        self.update_job_status(job_id, "RUNNING")
+    def _get_firebase_handler(self, database_name="firebase"):
+        handler = DATABASE_IDS.handlers().get(database_name)
+        initialized_db = handler(default_db="staging")
+        if initialized_db._initialized:
+            return initialized_db
+        return None
+
+    def job_exists(self, dedup_hash):
+        db = self._get_firebase_handler()
+        if not db:
+            return False
+
+        job_status, _ = db.get_doc_by_id("job_status", dedup_hash)
+        return job_status is not None
+
+    async def run_packing(self, dedup_hash, recipe=None, config=None, body=None):
+        self.update_job_status(dedup_hash, "RUNNING")
         try:
             # Pack JSON recipe in body if provided, otherwise use recipe path
-            pack(recipe=(body if body else recipe), config_path=config, docker=True)
+            pack(recipe=(body if body else recipe), config_path=config, docker=True, hash=dedup_hash)
         except Exception as e:
-            self.update_job_status(job_id, "FAILED", error_message=str(e))
+            self.update_job_status(dedup_hash, "FAILED", error_message=str(e))
 
-    def update_job_status(self, job_id, status, result_path=None, error_message=None):
-        handler = DATABASE_IDS.handlers().get("firebase")
-        initialized_db = handler(
-            default_db="staging"
-        )
-        if initialized_db._initialized:
-            db_uploader = DBUploader(initialized_db)
-            db_uploader.upload_job_status(job_id, status, result_path, error_message)
+    def update_job_status(self, dedup_hash, status, result_path=None, error_message=None):
+        db = self._get_firebase_handler()
+        if db:
+            db_uploader = DBUploader(db)
+            db_uploader.upload_job_status(dedup_hash, status, result_path, error_message)
 
     async def hello_world(self, request: web.Request) -> web.Response:
         return web.Response(text="Hello from the cellPACK server")
 
     async def health_check(self, request: web.Request) -> web.Response:
-        # healthcheck endpoint needed for AWS load balancer
+        # health check endpoint needed for AWS load balancer
         return web.Response()
 
@@ -48,10 +57,14 @@ async def pack_handler(self, request: web.Request) -> web.Response:
                 "Pack requests must include recipe as a query param"
             )
         config = request.rel_url.query.get("config")
-        job_id = str(uuid.uuid4())
+
+        dedup_hash = DataDoc.generate_hash(body)
+
+        if self.job_exists(dedup_hash):
+            return web.json_response({"jobId": dedup_hash})
 
         # Initiate packing task to run in background
-        packing_task = asyncio.create_task(self.run_packing(job_id, recipe, config, body))
+        packing_task = asyncio.create_task(self.run_packing(dedup_hash, recipe, config, body))
 
         # Keep track of task references to prevent them from being garbage
         # collected, then discard after task completion
@@ -60,7 +73,7 @@ async def pack_handler(self, request: web.Request) -> web.Response:
 
         # return job id immediately, rather than wait for task to complete,
        # to avoid timeout issues with API gateway
-        return web.json_response({"jobId": job_id})
+        return web.json_response({"jobId": dedup_hash})
 
 
 async def init_app() -> web.Application:
@@ -75,4 +88,4 @@ async def init_app() -> web.Application:
     )
     return app
 
-web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)
\ No newline at end of file
+web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)
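
Note for reviewers: the deduplication key comes from `DataDoc.generate_hash(body)`, whose implementation is not part of this diff. Content-based hashes of this kind are usually computed over a canonicalized JSON payload so that key order does not matter; the sketch below is a minimal illustration under that assumption. The name `compute_dedup_hash`, the SHA-256 choice, and the truncation length are hypothetical, not the actual `DataDoc.generate_hash`.

```python
import hashlib
import json


def compute_dedup_hash(payload: dict, length: int = 32) -> str:
    """Illustrative only: derive a stable hash from a JSON-serializable payload.

    Assumes deduplication is content-based: identical recipe/config bodies must
    map to the same hash regardless of key order. NOT the cellpack implementation.
    """
    # Canonicalize: sort keys and strip insignificant whitespace so that
    # semantically identical payloads serialize to identical byte strings.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    # A truncated digest keeps S3 prefixes and job_status document ids short.
    return digest[:length]


if __name__ == "__main__":
    a = {"name": "one_sphere", "objects": {"sphere_25": {"radius": 25}}}
    b = {"objects": {"sphere_25": {"radius": 25}}, "name": "one_sphere"}
    # Key order differs, content is identical, so the hashes match.
    assert compute_dedup_hash(a) == compute_dedup_hash(b)
    print(compute_dedup_hash(a))
```

With a hash like this as the `jobId`, re-submitting the same recipe body naturally short-circuits in `pack_handler`: `job_exists` finds the existing `job_status` document and the server returns the prior `jobId` instead of launching a duplicate packing.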
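
A second pattern worth noting in `pack_handler`/`run_packing`: the server schedules the packing coroutine with `asyncio.create_task`, returns the `jobId` (the dedup hash) immediately, and keeps a strong reference to the pending task so it is not garbage-collected before it finishes. The exact add/discard bookkeeping lines fall outside the hunk context shown above, so the standalone sketch below only illustrates the general keep-a-reference pattern; `JobRunner` and `run_job` are illustrative names, not code from this PR.

```python
import asyncio


class JobRunner:
    """Illustrative sketch of the keep-a-reference pattern used for background jobs."""

    def __init__(self):
        # Strong references keep pending tasks alive; asyncio itself only holds
        # weak references to tasks it creates.
        self._tasks = set()

    def submit(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task completes ("discard after task completion").
        task.add_done_callback(self._tasks.discard)
        return task


async def run_job(job_hash: str) -> None:
    # Stand-in for the long-running packing work.
    await asyncio.sleep(0.1)
    print(f"job {job_hash} finished")


async def main() -> None:
    runner = JobRunner()
    runner.submit(run_job("abc123"))
    # The caller can respond to the client immediately; here we simply wait
    # long enough for the background task to finish before exiting.
    await asyncio.sleep(0.2)


if __name__ == "__main__":
    asyncio.run(main())
```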