diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py
index 8b4e8578a..3eb691d59 100644
--- a/cellpack/autopack/DBRecipeHandler.py
+++ b/cellpack/autopack/DBRecipeHandler.py
@@ -529,7 +529,7 @@ def upload_config(self, config_data, source_path):
         self.db.update_doc("configs", id, config_data)
         return id

-    def upload_result_metadata(self, file_name, url, job_id=None):
+    def upload_result_metadata(self, file_name, url):
         """
         Upload the metadata of the result file to the database.
         """
@@ -543,28 +543,37 @@ def upload_result_metadata(self, file_name, url, job_id=None):
                     "user": username,
                     "timestamp": timestamp,
                     "url": url,
-                    "batch_job_id": job_id,
                 },
             )
-        if job_id:
-            self.upload_job_status(job_id, "DONE", result_path=url)

-    def upload_job_status(self, job_id, status, result_path=None, error_message=None):
+    def upload_job_status(
+        self,
+        dedup_hash,
+        status,
+        result_path=None,
+        error_message=None,
+        outputs_directory=None,
+    ):
         """
-        Update status for a given job ID
+        Update status for a given dedup_hash
         """
         if self.db:
-            timestamp = self.db.create_timestamp()
-            self.db.update_or_create(
-                "job_status",
-                job_id,
-                {
-                    "timestamp": timestamp,
-                    "status": str(status),
-                    "result_path": result_path,
-                    "error_message": error_message,
-                },
-            )
+            db_handler = self.db
+            # If db is AWSHandler, switch to firebase handler for job status updates
+            if hasattr(self.db, "s3_client"):
+                handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
+                db_handler = handler(default_db="staging")
+            timestamp = db_handler.create_timestamp()
+            data = {
+                "timestamp": timestamp,
+                "status": str(status),
+                "error_message": error_message,
+            }
+            if result_path:
+                data["result_path"] = result_path
+            if outputs_directory:
+                data["outputs_directory"] = outputs_directory
+            db_handler.update_or_create("job_status", dedup_hash, data)

     def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data):
         output_path = Path(output_folder)
@@ -583,7 +592,7 @@ def upload_packing_results_workflow(
         self,
         source_folder,
         recipe_name,
-        job_id,
+        dedup_hash,
         config_data,
         recipe_data,
     ):
@@ -591,7 +600,7 @@ def upload_packing_results_workflow(
         Complete packing results upload workflow including folder preparation and s3 upload
         """
         try:
-            if job_id:
+            if dedup_hash:
                 source_path = Path(source_folder)

                 if not source_path.exists():
@@ -601,7 +610,7 @@ def upload_packing_results_workflow(

                 # prepare unique S3 upload folder
                 parent_folder = source_path.parent
-                unique_folder_name = f"{source_path.name}_run_{job_id}"
+                unique_folder_name = f"{source_path.name}_run_{dedup_hash}"
                 s3_upload_folder = parent_folder / unique_folder_name

                 logging.debug(f"outputs will be copied to: {s3_upload_folder}")
@@ -618,7 +627,7 @@ def upload_packing_results_workflow(
                 upload_result = self.upload_outputs_to_s3(
                     output_folder=s3_upload_folder,
                     recipe_name=recipe_name,
-                    job_id=job_id,
+                    dedup_hash=dedup_hash,
                 )

                 # clean up temporary folder after upload
@@ -628,9 +637,12 @@ def upload_packing_results_workflow(
                         f"Cleaned up temporary upload folder: {s3_upload_folder}"
                     )

-                # update outputs directory in firebase
-                self.update_outputs_directory(
-                    job_id, upload_result.get("outputs_directory")
+                # update outputs directory in job status
+                self.upload_job_status(
+                    dedup_hash,
+                    "DONE",
+                    result_path=upload_result.get("simularium_url"),
+                    outputs_directory=upload_result.get("outputs_directory"),
                 )

                 return upload_result
@@ -639,7 +651,7 @@ def upload_packing_results_workflow(
             logging.error(e)
            return {"success": False, "error": e}

-    def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
+    def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash):
         """
         Upload packing outputs to S3 bucket
         """
@@ -647,7 +659,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
         bucket_name = self.db.bucket_name
         region_name = self.db.region_name
         sub_folder_name = self.db.sub_folder_name
-        s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}"
+        s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}"

         try:
             upload_result = self.db.upload_directory(
@@ -661,8 +673,11 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
                 f"{base_url}/{file_info['s3_key']}"
                 for file_info in upload_result["uploaded_files"]
             ]
+            simularium_url = None
+            for url in public_urls:
+                if url.endswith(".simularium"):
+                    simularium_url = url
             outputs_directory = f"https://us-west-2.console.aws.amazon.com/s3/buckets/{bucket_name}/{s3_prefix}/"
-
             logging.info(
                 f"Successfully uploaded {upload_result['total_files']} files to {outputs_directory}"
             )
@@ -671,7 +686,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):

             return {
                 "success": True,
-                "run_id": job_id,
+                "dedup_hash": dedup_hash,
                 "s3_bucket": bucket_name,
                 "s3_prefix": s3_prefix,
                 "public_url_base": f"{base_url}/{s3_prefix}/",
@@ -680,30 +695,12 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
                 "total_size": upload_result["total_size"],
                 "urls": public_urls,
                 "outputs_directory": outputs_directory,
+                "simularium_url": simularium_url,
             }
         except Exception as e:
             logging.error(e)
             return {"success": False, "error": e}

-    def update_outputs_directory(self, job_id, outputs_directory):
-        if not self.db or self.db.s3_client:
-            # switch to firebase handler to update job status
-            handler = DATABASE_IDS.handlers().get("firebase")
-            initialized_db = handler(default_db="staging")
-        if job_id:
-            timestamp = initialized_db.create_timestamp()
-            initialized_db.update_or_create(
-                "job_status",
-                job_id,
-                {
-                    "timestamp": timestamp,
-                    "outputs_directory": outputs_directory,
-                },
-            )
-            logging.debug(
-                f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}"
-            )
-

 class DBRecipeLoader(object):
     """
diff --git a/cellpack/autopack/loaders/recipe_loader.py b/cellpack/autopack/loaders/recipe_loader.py
index 84cd78ac0..bbdb662ad 100644
--- a/cellpack/autopack/loaders/recipe_loader.py
+++ b/cellpack/autopack/loaders/recipe_loader.py
@@ -30,7 +30,13 @@ class RecipeLoader(object):
     # TODO: add all default values here
     default_values = default_recipe_values.copy()

-    def __init__(self, input_file_path, save_converted_recipe=False, use_docker=False):
+    def __init__(
+        self,
+        input_file_path,
+        save_converted_recipe=False,
+        use_docker=False,
+        json_recipe=None,
+    ):
         _, file_extension = os.path.splitext(input_file_path)
         self.current_version = CURRENT_VERSION
         self.file_path = input_file_path
@@ -38,6 +44,7 @@ def __init__(self, input_file_path, save_converted_recipe=False, use_docker=Fals
         self.ingredient_list = []
         self.compartment_list = []
         self.save_converted_recipe = save_converted_recipe
+        self.json_recipe = json_recipe

         # set CURRENT_RECIPE_PATH appropriately for remote(firebase) vs local recipes
         if autopack.is_remote_path(self.file_path):
@@ -49,6 +56,15 @@ def __init__(self, input_file_path, save_converted_recipe=False, use_docker=Fals

         self.recipe_data = self._read(use_docker=use_docker)

+    @classmethod
+    def from_json(cls, json_recipe, save_converted_recipe=False, use_docker=False):
+        return cls(
+            input_file_path="",
+            save_converted_recipe=save_converted_recipe,
+            use_docker=use_docker,
+            json_recipe=json_recipe,
+        )
+
     @staticmethod
     def _resolve_object(key, objects):
         current_object = objects[key]
@@ -168,9 +184,15 @@ def _migrate_version(self, old_recipe):
         )

     def _read(self, resolve_inheritance=True, use_docker=False):
-        new_values, database_name, is_unnested_firebase = autopack.load_file(
-            self.file_path, cache="recipes", use_docker=use_docker
-        )
+        database_name = None
+        is_unnested_firebase = False
+        new_values = self.json_recipe
+        if new_values is None:
+            # Read recipe from filepath
+            new_values, database_name, is_unnested_firebase = autopack.load_file(
+                self.file_path, cache="recipes", use_docker=use_docker
+            )
+
         if database_name == "firebase":
             if is_unnested_firebase:
                 objects = new_values.get("objects", {})
diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py
index 86af08746..08179d854 100644
--- a/cellpack/autopack/upy/simularium/simularium_helper.py
+++ b/cellpack/autopack/upy/simularium/simularium_helper.py
@@ -1385,40 +1385,30 @@ def raycast(self, **kw):
     def raycast_test(self, obj, start, end, length, **kw):
         return

-    def post_and_open_file(self, file_name, open_results_in_browser):
+    def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None):
         simularium_file = Path(f"{file_name}.simularium")
-        url = None
-        job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
-        file_name, url = simulariumHelper.store_result_file(
-            simularium_file, storage="aws", batch_job_id=job_id
-        )
-        if file_name and url:
-            simulariumHelper.store_metadata(
-                file_name, url, db="firebase", job_id=job_id
+        if dedup_hash is None:
+            file_name, url = simulariumHelper.store_result_file(
+                simularium_file, storage="aws"
             )
-            if open_results_in_browser:
-                simulariumHelper.open_in_simularium(url)
+            if file_name and url:
+                simulariumHelper.store_metadata(
+                    file_name, url, db="firebase"
+                )
+                if open_results_in_browser:
+                    simulariumHelper.open_in_simularium(url)

     @staticmethod
     def store_result_file(
-        file_path, storage=None, batch_job_id=None, sub_folder="simularium"
+        file_path, storage=None, sub_folder="simularium"
     ):
         if storage == "aws":
             handler = DATABASE_IDS.handlers().get(storage)
-            # if batch_job_id is not None, then we are in a batch job and should use the temp bucket
-            # TODO: use cellpack-results bucket for batch jobs once we have the correct permissions
-            if batch_job_id:
-                initialized_handler = handler(
-                    bucket_name="cellpack-demo",
-                    sub_folder_name=sub_folder,
-                    region_name="us-west-2",
-                )
-            else:
-                initialized_handler = handler(
-                    bucket_name="cellpack-results",
-                    sub_folder_name=sub_folder,
-                    region_name="us-west-2",
-                )
+            initialized_handler = handler(
+                bucket_name="cellpack-results",
+                sub_folder_name=sub_folder,
+                region_name="us-west-2",
+            )
             file_name, url = initialized_handler.save_file_and_get_url(file_path)
             if not file_name or not url:
                 db_maintainer = DBMaintenance(initialized_handler)
@@ -1428,7 +1418,7 @@ def store_result_file(
         return file_name, url

     @staticmethod
-    def store_metadata(file_name, url, db=None, job_id=None):
+    def store_metadata(file_name, url, db=None):
         if db == "firebase":
             handler = DATABASE_IDS.handlers().get(db)
             initialized_db = handler(
@@ -1436,7 +1426,7 @@ def store_metadata(file_name, url, db=None, job_id=None):
             )  # default to staging for metadata uploads
             if initialized_db._initialized:
                 db_uploader = DBUploader(initialized_db)
-                db_uploader.upload_result_metadata(file_name, url, job_id)
+                db_uploader.upload_result_metadata(file_name, url)
             else:
                 db_maintainer = DBMaintenance(initialized_db)
                 logging.warning(
diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py
index 6ca931af2..0b09e03a6 100644
--- a/cellpack/autopack/writers/__init__.py
+++ b/cellpack/autopack/writers/__init__.py
@@ -197,8 +197,11 @@ def save_as_simularium(self, env, seed_to_results_map):
         number_of_packings = env.config_data.get("number_of_packings", 1)
         open_results_in_browser = env.config_data.get("open_results_in_browser", False)
         upload_results = env.config_data.get("upload_results", False)
+        dedup_hash = getattr(env, "dedup_hash", None)
         if (number_of_packings == 1 or is_aggregate) and upload_results:
-            autopack.helper.post_and_open_file(file_name, open_results_in_browser)
+            autopack.helper.post_and_open_file(
+                file_name, open_results_in_browser, dedup_hash
+            )

     def save_Mixed_asJson(
         self,
diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py
index 9db539372..27c4d0185 100644
--- a/cellpack/bin/pack.py
+++ b/cellpack/bin/pack.py
@@ -1,6 +1,5 @@
 import logging
 import logging.config
-import os
 import time
 from pathlib import Path

@@ -25,23 +24,32 @@


 def pack(
-    recipe, config_path=None, analysis_config_path=None, docker=False, validate=True
+    recipe,
+    config_path=None,
+    analysis_config_path=None,
+    docker=False,
+    hash=None,
 ):
     """
     Initializes an autopack packing from the command line
-    :param recipe: string argument, path to recipe
+    :param recipe: string argument, path to recipe file, or a dictionary representing a recipe
     :param config_path: string argument, path to packing config file
     :param analysis_config_path: string argument, path to analysis config file
     :param docker: boolean argument, are we using docker
-    :param validate: boolean argument, validate recipe before packing
+    :param hash: string argument, dedup hash identifier for tracking/caching results
     :return: void
     """
     packing_config_data = ConfigLoader(config_path, docker).config
-    recipe_loader = RecipeLoader(
-        recipe, packing_config_data["save_converted_recipe"], docker
-    )
+    if isinstance(recipe, dict):
+        # Load recipe from JSON dictionary
+        recipe_loader = RecipeLoader.from_json(recipe, use_docker=docker)
+    else:
+        # Load recipe from file path
+        recipe_loader = RecipeLoader(
+            recipe, packing_config_data["save_converted_recipe"], docker
+        )
     recipe_data = recipe_loader.recipe_data
     analysis_config_data = {}
     if analysis_config_path is not None:
@@ -52,6 +60,7 @@ def pack(
     autopack.helper = helper
     env = Environment(config=packing_config_data, recipe=recipe_data)
     env.helper = helper
+    env.dedup_hash = hash

     log.info("Packing recipe: %s", recipe_data["name"])
     log.info("Outputs will be saved to %s", env.out_folder)
@@ -78,24 +87,22 @@ def pack(
         env.buildGrid(rebuild=True)
         env.pack_grid(verbose=0, usePP=False)

-    if docker:
-        job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
-        if job_id:
-            handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
-            # temporarily using demo bucket before permissions are granted
-            initialized_handler = handler(
-                bucket_name="cellpack-demo",
-                sub_folder_name="runs",
-                region_name="us-west-2",
-            )
-            uploader = DBUploader(db_handler=initialized_handler)
-            uploader.upload_packing_results_workflow(
-                source_folder=env.out_folder,
-                recipe_name=recipe_data["name"],
-                job_id=job_id,
-                config_data=packing_config_data,
-                recipe_data=recipe_loader.serializable_recipe_data,
-            )
+    if docker and hash:
+        handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
+        # temporarily using demo bucket before permissions are granted
+        initialized_handler = handler(
+            bucket_name="cellpack-demo",
+            sub_folder_name="runs",
+            region_name="us-west-2",
+        )
+        uploader = DBUploader(db_handler=initialized_handler)
+        uploader.upload_packing_results_workflow(
+            source_folder=env.out_folder,
+            recipe_name=recipe_data["name"],
+            dedup_hash=hash,
+            config_data=packing_config_data,
+            recipe_data=recipe_loader.serializable_recipe_data,
+        )


 def main():
diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py
index 0c91cbd52..414f6b9c5 100644
--- a/cellpack/tests/test_db_uploader.py
+++ b/cellpack/tests/test_db_uploader.py
@@ -175,3 +175,56 @@ def test_upload_recipe():
         "A": "firebase:composition/test_id",
     }
     assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"}
+
+
+def test_upload_job_status_with_firebase_handler():
+    mock_firebase_db = MagicMock()
+    mock_firebase_db.create_timestamp.return_value = "test_timestamp"
+    # firebaseHandler does not have s3_client attribute
+    del mock_firebase_db.s3_client
+
+    uploader = DBUploader(mock_firebase_db)
+    uploader.upload_job_status("test_hash", "RUNNING")
+
+    mock_firebase_db.create_timestamp.assert_called_once()
+    mock_firebase_db.update_or_create.assert_called_once_with(
+        "job_status",
+        "test_hash",
+        {
+            "timestamp": "test_timestamp",
+            "status": "RUNNING",
+            "error_message": None,
+        },
+    )
+
+
+def test_upload_job_status_with_aws_handler():
+    mock_aws_db = MagicMock()
+    mock_aws_db.s3_client = MagicMock()  # AWSHandler has s3_client
+
+    mock_firebase_handler = MagicMock()
+    mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp"
+
+    with patch(
+        "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers"
+    ) as mock_handlers:
+        mock_handlers.return_value.get.return_value = (
+            lambda default_db: mock_firebase_handler
+        )
+
+        uploader = DBUploader(mock_aws_db)
+        uploader.upload_job_status("test_hash", "DONE", result_path="test_path")
+
+        mock_firebase_handler.create_timestamp.assert_called_once()
+        mock_firebase_handler.update_or_create.assert_called_once_with(
+            "job_status",
+            "test_hash",
+            {
+                "timestamp": "firebase_timestamp",
+                "status": "DONE",
+                "error_message": None,
+                "result_path": "test_path",
+            },
+        )
+        # AWS handler should not be called for timestamp
+        mock_aws_db.create_timestamp.assert_not_called()
diff --git a/docker/Dockerfile.ecs b/docker/Dockerfile.ecs
index 7a303c023..5ed89b7bd 100644
--- a/docker/Dockerfile.ecs
+++ b/docker/Dockerfile.ecs
@@ -4,7 +4,7 @@
 WORKDIR /cellpack
 COPY . /cellpack

 RUN python -m pip install --upgrade pip --root-user-action=ignore
-RUN python -m pip install . -r requirements/linux/requirements.txt --root-user-action=ignore
+RUN python -m pip install . --root-user-action=ignore

 EXPOSE 80
diff --git a/docker/server.py b/docker/server.py
index 581e0151d..74bb20f30 100644
--- a/docker/server.py
+++ b/docker/server.py
@@ -1,8 +1,6 @@
 import asyncio
 from aiohttp import web
-import os
-import uuid
-from cellpack.autopack.DBRecipeHandler import DBUploader
+from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader
 from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS
 from cellpack.bin.pack import pack

@@ -12,41 +10,61 @@ class CellpackServer:
     def __init__(self):
         self.packing_tasks = set()

-    async def run_packing(self, recipe, config, job_id):
-        os.environ["AWS_BATCH_JOB_ID"] = job_id
-        self.update_job_status(job_id, "RUNNING")
+    def _get_firebase_handler(self, database_name="firebase"):
+        handler = DATABASE_IDS.handlers().get(database_name)
+        initialized_db = handler(default_db="staging")
+        if initialized_db._initialized:
+            return initialized_db
+        return None
+
+    def job_exists(self, dedup_hash):
+        db = self._get_firebase_handler()
+        if not db:
+            return False
+
+        job_status, _ = db.get_doc_by_id("job_status", dedup_hash)
+        return job_status is not None
+
+    async def run_packing(self, dedup_hash, recipe=None, config=None, body=None):
+        self.update_job_status(dedup_hash, "RUNNING")
         try:
-            pack(recipe=recipe, config_path=config, docker=True)
+            # Pack JSON recipe in body if provided, otherwise use recipe path
+            pack(recipe=(body if body else recipe), config_path=config, docker=True, hash=dedup_hash)
         except Exception as e:
-            self.update_job_status(job_id, "FAILED", error_message=str(e))
+            self.update_job_status(dedup_hash, "FAILED", error_message=str(e))

-    def update_job_status(self, job_id, status, result_path=None, error_message=None):
-        handler = DATABASE_IDS.handlers().get("firebase")
-        initialized_db = handler(
-            default_db="staging"
-        )
-        if initialized_db._initialized:
-            db_uploader = DBUploader(initialized_db)
-            db_uploader.upload_job_status(job_id, status, result_path, error_message)
+    def update_job_status(self, dedup_hash, status, result_path=None, error_message=None):
+        db = self._get_firebase_handler()
+        if db:
+            db_uploader = DBUploader(db)
+            db_uploader.upload_job_status(dedup_hash, status, result_path, error_message)

     async def hello_world(self, request: web.Request) -> web.Response:
         return web.Response(text="Hello from the cellPACK server")

     async def health_check(self, request: web.Request) -> web.Response:
-        # healthcheck endpoint needed for AWS load balancer
+        # health check endpoint needed for AWS load balancer
         return web.Response()

     async def pack_handler(self, request: web.Request) -> web.Response:
-        recipe = request.rel_url.query.get("recipe")
-        if recipe is None:
+        recipe = request.rel_url.query.get("recipe") or ""
+        if request.can_read_body:
+            body = await request.json()
+        else:
+            body = None
+        if not recipe and not body:
             raise web.HTTPBadRequest(
                 "Pack requests must include recipe as a query param"
             )
         config = request.rel_url.query.get("config")
-        job_id = str(uuid.uuid4())
+
+        dedup_hash = DataDoc.generate_hash(body)
+
+        if self.job_exists(dedup_hash):
+            return web.json_response({"jobId": dedup_hash})

         # Initiate packing task to run in background
-        packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id))
+        packing_task = asyncio.create_task(self.run_packing(dedup_hash, recipe, config, body))

         # Keep track of task references to prevent them from being garbage
         # collected, then discard after task completion
@@ -55,7 +73,7 @@ async def pack_handler(self, request: web.Request) -> web.Response:

         # return job id immediately, rather than wait for task to complete,
         # to avoid timeout issues with API gateway
-        return web.json_response({"jobId": job_id})
+        return web.json_response({"jobId": dedup_hash})


 async def init_app() -> web.Application:
@@ -70,4 +88,4 @@ async def init_app() -> web.Application:
     )
     return app

-web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)
\ No newline at end of file
+web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)
diff --git a/docs/DOCKER.md b/docs/DOCKER.md
index a1214a335..48e4b6ddd 100644
--- a/docs/DOCKER.md
+++ b/docs/DOCKER.md
@@ -13,6 +13,6 @@
 ## AWS ECS Docker Image
 1. Build image, running `docker build -f docker/Dockerfile.ecs -t [CONTAINER-NAME] .`
 2. Run packings in the container, running: `docker run -v ~/.aws:/root/.aws -p 80:80 [CONTAINER-NAME]`
-3. Try hitting the test endpoint on the server, by navigating to `http://0.0.0.0:8443/hello` in your browser.
-4. Try running a packing on the server, by hitting the `http://0.0.0.0:80/pack?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser.
+3. Try hitting the test endpoint on the server, by navigating to `http://0.0.0.0:80/hello` in your browser.
+4. Try running a packing on the server, by hitting the `http://0.0.0.0:80/start-packing?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser.
 5. Verify that the packing result path was uploaded to the firebase results table, with the job id specified in the response from the request in step 4.The result simularium file can be found at the s3 path specified there.
\ No newline at end of file