From ceb3f9501a60d027ee6c291899292c3e33c3fe4e Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 10:12:33 -0800 Subject: [PATCH 01/15] remove os fetch for job_id --- cellpack/bin/pack.py | 36 +++++++++++++++++------------------- docker/server.py | 4 +--- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index 99186c9d..0608c594 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -1,6 +1,5 @@ import logging import logging.config -import os import time from pathlib import Path @@ -31,6 +30,7 @@ def pack( docker=False, validate=True, json_recipe=None, + job_id=None, ): """ Initializes an autopack packing from the command line @@ -83,24 +83,22 @@ def pack( env.buildGrid(rebuild=True) env.pack_grid(verbose=0, usePP=False) - if docker: - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) - if job_id: - handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) - # temporarily using demo bucket before permissions are granted - initialized_handler = handler( - bucket_name="cellpack-demo", - sub_folder_name="runs", - region_name="us-west-2", - ) - uploader = DBUploader(db_handler=initialized_handler) - uploader.upload_packing_results_workflow( - source_folder=env.out_folder, - recipe_name=recipe_data["name"], - job_id=job_id, - config_data=packing_config_data, - recipe_data=recipe_loader.serializable_recipe_data, - ) + if docker and job_id: + handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) + # temporarily using demo bucket before permissions are granted + initialized_handler = handler( + bucket_name="cellpack-demo", + sub_folder_name="runs", + region_name="us-west-2", + ) + uploader = DBUploader(db_handler=initialized_handler) + uploader.upload_packing_results_workflow( + source_folder=env.out_folder, + recipe_name=recipe_data["name"], + job_id=job_id, + config_data=packing_config_data, + recipe_data=recipe_loader.serializable_recipe_data, + ) def main(): diff --git a/docker/server.py b/docker/server.py index ce6da7ee..477cf011 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,6 +1,5 @@ import asyncio from aiohttp import web -import os import uuid from cellpack.autopack.DBRecipeHandler import DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS @@ -13,10 +12,9 @@ def __init__(self): self.packing_tasks = set() async def run_packing(self, recipe, config, job_id, body=None): - os.environ["AWS_BATCH_JOB_ID"] = job_id self.update_job_status(job_id, "RUNNING") try: - pack(recipe=recipe, config_path=config, docker=True, json_recipe=body) + pack(recipe=recipe, config_path=config, docker=True, json_recipe=body, job_id=job_id) except Exception as e: self.update_job_status(job_id, "FAILED", error_message=str(e)) From 81fc3e7c18149e6918315adbf62768b7f2cf97fa Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 10:20:00 -0800 Subject: [PATCH 02/15] use dedup_hash instead of job id --- docker/server.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/docker/server.py b/docker/server.py index 477cf011..0a0e75b1 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,7 +1,7 @@ import asyncio from aiohttp import web import uuid -from cellpack.autopack.DBRecipeHandler import DBUploader +from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.bin.pack import pack @@ -45,7 +45,12 @@ async def pack_handler(self, request: web.Request) -> web.Response: "Pack 
requests must include recipe as a query param" ) config = request.rel_url.query.get("config") - job_id = str(uuid.uuid4()) + + if body: + job_id = DataDoc.generate_hash(body) + # TODO: keep uuid at all? + else: + job_id = str(uuid.uuid4()) # Initiate packing task to run in background packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id, body)) From a2dbe7539036df2ac07f842e338a765f2bb7859b Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 11:51:41 -0800 Subject: [PATCH 03/15] proposal: get hash from recipe loader --- cellpack/autopack/loaders/recipe_loader.py | 19 +++++++- docker/server.py | 53 ++++++++++++++++------ 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/cellpack/autopack/loaders/recipe_loader.py b/cellpack/autopack/loaders/recipe_loader.py index cf87a10c..2d3a4c61 100644 --- a/cellpack/autopack/loaders/recipe_loader.py +++ b/cellpack/autopack/loaders/recipe_loader.py @@ -7,7 +7,7 @@ import cellpack.autopack as autopack -from cellpack.autopack.DBRecipeHandler import DBRecipeLoader +from cellpack.autopack.DBRecipeHandler import DataDoc, DBRecipeLoader from cellpack.autopack.interface_objects import ( Representations, default_recipe_values, @@ -55,6 +55,23 @@ def __init__( autopack.CURRENT_RECIPE_PATH = os.path.dirname(self.file_path) self.recipe_data = self._read(use_docker=use_docker) + # calculate dedup_hash from the normalized recipe data + self.dedup_hash = DataDoc.generate_hash(self.serializable_recipe_data) + + @staticmethod + def get_dedup_hash(recipe_path, json_recipe=None, use_docker=False): + """ + Load recipe and return its dedup_hash. + This method loads and normalizes the recipe to ensure consistent hashing + regardless of source (local file, firebase, or JSON body). + """ + loader = RecipeLoader( + recipe_path, + save_converted_recipe=False, + use_docker=use_docker, + json_recipe=json_recipe, + ) + return loader.dedup_hash @staticmethod def _resolve_object(key, objects): diff --git a/docker/server.py b/docker/server.py index 0a0e75b1..9bd3aad7 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,8 +1,8 @@ import asyncio from aiohttp import web -import uuid -from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader +from cellpack.autopack.DBRecipeHandler import DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS +from cellpack.autopack.loaders.recipe_loader import RecipeLoader from cellpack.bin.pack import pack SERVER_PORT = 80 @@ -11,6 +11,28 @@ class CellpackServer: def __init__(self): self.packing_tasks = set() + def _get_firebase_handler(self, database_name="firebase"): + handler = DATABASE_IDS.handlers().get(database_name) + initialized_db = handler(default_db="staging") + if initialized_db._initialized: + return initialized_db + return None + + def get_cached_result(self, job_id): + """ + Check if a completed result already exists for this job_id (dedup_hash). + Returns the cached result data if found with status DONE, otherwise None. 
+ """ + db = self._get_firebase_handler() + if not db: + return None + + job_status, _ = db.get_doc_by_id("job_status", job_id) + if job_status and job_status.get("status") == "DONE": + # TODO: if the same recipe is submitted again quickly, the status may not be updated in time ("RUNNING"), discuss if we need to handle this case + return job_status + return None + async def run_packing(self, recipe, config, job_id, body=None): self.update_job_status(job_id, "RUNNING") try: @@ -19,12 +41,9 @@ async def run_packing(self, recipe, config, job_id, body=None): self.update_job_status(job_id, "FAILED", error_message=str(e)) def update_job_status(self, job_id, status, result_path=None, error_message=None): - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler( - default_db="staging" - ) - if initialized_db._initialized: - db_uploader = DBUploader(initialized_db) + db = self._get_firebase_handler() + if db: + db_uploader = DBUploader(db) db_uploader.upload_job_status(job_id, status, result_path, error_message) async def hello_world(self, request: web.Request) -> web.Response: @@ -46,11 +65,19 @@ async def pack_handler(self, request: web.Request) -> web.Response: ) config = request.rel_url.query.get("config") - if body: - job_id = DataDoc.generate_hash(body) - # TODO: keep uuid at all? - else: - job_id = str(uuid.uuid4()) + # calculate dedup_hash from normalized recipe content + # TODO: discuss when to hash firebase recipes(has references) vs raw json, this currently loads and processes the recipe twice (one here and once in pack()) + job_id = RecipeLoader.get_dedup_hash(recipe, json_recipe=body, use_docker=True) + + cached_result = self.get_cached_result(job_id) + if cached_result: + return web.json_response({ + "jobId": job_id, + "status": "DONE", + "cached": True, + "outputs_directory": cached_result.get("outputs_directory"), + "result_path": cached_result.get("result_path"), + }) # Initiate packing task to run in background packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id, body)) From 7f3d2edb25d7a9d61bdbc6e2f3efab697e746bb1 Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 12:50:57 -0800 Subject: [PATCH 04/15] renaming and add TODOs --- cellpack/autopack/DBRecipeHandler.py | 38 +++++++++---------- .../upy/simularium/simularium_helper.py | 11 +++--- cellpack/bin/pack.py | 8 ++-- docker/server.py | 31 ++++++++------- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py index 8b4e8578..5c48270d 100644 --- a/cellpack/autopack/DBRecipeHandler.py +++ b/cellpack/autopack/DBRecipeHandler.py @@ -529,7 +529,7 @@ def upload_config(self, config_data, source_path): self.db.update_doc("configs", id, config_data) return id - def upload_result_metadata(self, file_name, url, job_id=None): + def upload_result_metadata(self, file_name, url, dedup_hash=None): """ Upload the metadata of the result file to the database. 
""" @@ -543,21 +543,21 @@ def upload_result_metadata(self, file_name, url, job_id=None): "user": username, "timestamp": timestamp, "url": url, - "batch_job_id": job_id, + "dedup_hash": dedup_hash, }, ) - if job_id: - self.upload_job_status(job_id, "DONE", result_path=url) + if dedup_hash: + self.upload_job_status(dedup_hash, "DONE", result_path=url) - def upload_job_status(self, job_id, status, result_path=None, error_message=None): + def upload_job_status(self, dedup_hash, status, result_path=None, error_message=None): """ - Update status for a given job ID + Update status for a given dedup_hash """ if self.db: timestamp = self.db.create_timestamp() self.db.update_or_create( "job_status", - job_id, + dedup_hash, { "timestamp": timestamp, "status": str(status), @@ -583,7 +583,7 @@ def upload_packing_results_workflow( self, source_folder, recipe_name, - job_id, + dedup_hash, config_data, recipe_data, ): @@ -591,7 +591,7 @@ def upload_packing_results_workflow( Complete packing results upload workflow including folder preparation and s3 upload """ try: - if job_id: + if dedup_hash: source_path = Path(source_folder) if not source_path.exists(): @@ -601,7 +601,7 @@ def upload_packing_results_workflow( # prepare unique S3 upload folder parent_folder = source_path.parent - unique_folder_name = f"{source_path.name}_run_{job_id}" + unique_folder_name = f"{source_path.name}_run_{dedup_hash}" s3_upload_folder = parent_folder / unique_folder_name logging.debug(f"outputs will be copied to: {s3_upload_folder}") @@ -618,7 +618,7 @@ def upload_packing_results_workflow( upload_result = self.upload_outputs_to_s3( output_folder=s3_upload_folder, recipe_name=recipe_name, - job_id=job_id, + dedup_hash=dedup_hash, ) # clean up temporary folder after upload @@ -630,7 +630,7 @@ def upload_packing_results_workflow( # update outputs directory in firebase self.update_outputs_directory( - job_id, upload_result.get("outputs_directory") + dedup_hash, upload_result.get("outputs_directory") ) return upload_result @@ -639,7 +639,7 @@ def upload_packing_results_workflow( logging.error(e) return {"success": False, "error": e} - def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): + def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash): """ Upload packing outputs to S3 bucket """ @@ -647,7 +647,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): bucket_name = self.db.bucket_name region_name = self.db.region_name sub_folder_name = self.db.sub_folder_name - s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}" + s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}" try: upload_result = self.db.upload_directory( @@ -671,7 +671,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): return { "success": True, - "run_id": job_id, + "dedup_hash": dedup_hash, "s3_bucket": bucket_name, "s3_prefix": s3_prefix, "public_url_base": f"{base_url}/{s3_prefix}/", @@ -685,23 +685,23 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): logging.error(e) return {"success": False, "error": e} - def update_outputs_directory(self, job_id, outputs_directory): + def update_outputs_directory(self, dedup_hash, outputs_directory): if not self.db or self.db.s3_client: # switch to firebase handler to update job status handler = DATABASE_IDS.handlers().get("firebase") initialized_db = handler(default_db="staging") - if job_id: + if dedup_hash: timestamp = initialized_db.create_timestamp() initialized_db.update_or_create( "job_status", - job_id, + dedup_hash, { 
"timestamp": timestamp, "outputs_directory": outputs_directory, }, ) logging.debug( - f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}" + f"Updated outputs s3 location {outputs_directory} for dedup_hash: {dedup_hash}" ) diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py index 86af0874..61a848d3 100644 --- a/cellpack/autopack/upy/simularium/simularium_helper.py +++ b/cellpack/autopack/upy/simularium/simularium_helper.py @@ -1388,13 +1388,14 @@ def raycast_test(self, obj, start, end, length, **kw): def post_and_open_file(self, file_name, open_results_in_browser): simularium_file = Path(f"{file_name}.simularium") url = None - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) + # TODO: refactor to receive dedup_hash as parameter instead of reading from environment, and address the todo in L1410 + dedup_hash = os.environ.get("AWS_BATCH_JOB_ID", None) file_name, url = simulariumHelper.store_result_file( - simularium_file, storage="aws", batch_job_id=job_id + simularium_file, storage="aws", batch_job_id=dedup_hash ) if file_name and url: simulariumHelper.store_metadata( - file_name, url, db="firebase", job_id=job_id + file_name, url, db="firebase", dedup_hash=dedup_hash ) if open_results_in_browser: simulariumHelper.open_in_simularium(url) @@ -1428,7 +1429,7 @@ def store_result_file( return file_name, url @staticmethod - def store_metadata(file_name, url, db=None, job_id=None): + def store_metadata(file_name, url, db=None, dedup_hash=None): if db == "firebase": handler = DATABASE_IDS.handlers().get(db) initialized_db = handler( @@ -1436,7 +1437,7 @@ def store_metadata(file_name, url, db=None, job_id=None): ) # default to staging for metadata uploads if initialized_db._initialized: db_uploader = DBUploader(initialized_db) - db_uploader.upload_result_metadata(file_name, url, job_id) + db_uploader.upload_result_metadata(file_name, url, dedup_hash) else: db_maintainer = DBMaintenance(initialized_db) logging.warning( diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index 0608c594..c6f8f869 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -30,7 +30,7 @@ def pack( docker=False, validate=True, json_recipe=None, - job_id=None, + dedup_hash=None, ): """ Initializes an autopack packing from the command line @@ -39,6 +39,8 @@ def pack( :param analysis_config_path: string argument, path to analysis config file :param docker: boolean argument, are we using docker :param validate: boolean argument, validate recipe before packing + :param json_recipe: dict argument, recipe data passed directly + :param dedup_hash: string argument, hash identifier for tracking/caching results :return: void """ @@ -83,7 +85,7 @@ def pack( env.buildGrid(rebuild=True) env.pack_grid(verbose=0, usePP=False) - if docker and job_id: + if docker and dedup_hash: handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) # temporarily using demo bucket before permissions are granted initialized_handler = handler( @@ -95,7 +97,7 @@ def pack( uploader.upload_packing_results_workflow( source_folder=env.out_folder, recipe_name=recipe_data["name"], - job_id=job_id, + dedup_hash=dedup_hash, config_data=packing_config_data, recipe_data=recipe_loader.serializable_recipe_data, ) diff --git a/docker/server.py b/docker/server.py index 9bd3aad7..780dfa19 100644 --- a/docker/server.py +++ b/docker/server.py @@ -18,33 +18,33 @@ def _get_firebase_handler(self, database_name="firebase"): return initialized_db return None - def 
get_cached_result(self, job_id): + def get_cached_result(self, dedup_hash): """ - Check if a completed result already exists for this job_id (dedup_hash). + Check if a completed result already exists for this dedup_hash. Returns the cached result data if found with status DONE, otherwise None. """ db = self._get_firebase_handler() if not db: return None - job_status, _ = db.get_doc_by_id("job_status", job_id) + job_status, _ = db.get_doc_by_id("job_status", dedup_hash) if job_status and job_status.get("status") == "DONE": # TODO: if the same recipe is submitted again quickly, the status may not be updated in time ("RUNNING"), discuss if we need to handle this case return job_status return None - async def run_packing(self, recipe, config, job_id, body=None): - self.update_job_status(job_id, "RUNNING") + async def run_packing(self, recipe, config, dedup_hash, body=None): + self.update_job_status(dedup_hash, "RUNNING") try: - pack(recipe=recipe, config_path=config, docker=True, json_recipe=body, job_id=job_id) + pack(recipe=recipe, config_path=config, docker=True, json_recipe=body, dedup_hash=dedup_hash) except Exception as e: - self.update_job_status(job_id, "FAILED", error_message=str(e)) + self.update_job_status(dedup_hash, "FAILED", error_message=str(e)) - def update_job_status(self, job_id, status, result_path=None, error_message=None): + def update_job_status(self, dedup_hash, status, result_path=None, error_message=None): db = self._get_firebase_handler() if db: db_uploader = DBUploader(db) - db_uploader.upload_job_status(job_id, status, result_path, error_message) + db_uploader.upload_job_status(dedup_hash, status, result_path, error_message) async def hello_world(self, request: web.Request) -> web.Response: return web.Response(text="Hello from the cellPACK server") @@ -67,12 +67,12 @@ async def pack_handler(self, request: web.Request) -> web.Response: # calculate dedup_hash from normalized recipe content # TODO: discuss when to hash firebase recipes(has references) vs raw json, this currently loads and processes the recipe twice (one here and once in pack()) - job_id = RecipeLoader.get_dedup_hash(recipe, json_recipe=body, use_docker=True) + dedup_hash = RecipeLoader.get_dedup_hash(recipe, json_recipe=body, use_docker=True) - cached_result = self.get_cached_result(job_id) + cached_result = self.get_cached_result(dedup_hash) if cached_result: return web.json_response({ - "jobId": job_id, + "jobId": dedup_hash, # keep "jobId" for backwards compatibility "status": "DONE", "cached": True, "outputs_directory": cached_result.get("outputs_directory"), @@ -80,16 +80,15 @@ async def pack_handler(self, request: web.Request) -> web.Response: }) # Initiate packing task to run in background - packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id, body)) + packing_task = asyncio.create_task(self.run_packing(recipe, config, dedup_hash, body)) # Keep track of task references to prevent them from being garbage # collected, then discard after task completion self.packing_tasks.add(packing_task) packing_task.add_done_callback(self.packing_tasks.discard) - # return job id immediately, rather than wait for task to complete, - # to avoid timeout issues with API gateway - return web.json_response({"jobId": job_id}) + # return dedup_hash as "jobId" for backwards compatibility + return web.json_response({"jobId": dedup_hash}) async def init_app() -> web.Application: From faf2c106934614810444b42dbfd5a6569f026b33 Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 14:18:01 -0800 
Subject: [PATCH 05/15] format --- cellpack/autopack/DBRecipeHandler.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py index 5c48270d..2655758b 100644 --- a/cellpack/autopack/DBRecipeHandler.py +++ b/cellpack/autopack/DBRecipeHandler.py @@ -549,7 +549,9 @@ def upload_result_metadata(self, file_name, url, dedup_hash=None): if dedup_hash: self.upload_job_status(dedup_hash, "DONE", result_path=url) - def upload_job_status(self, dedup_hash, status, result_path=None, error_message=None): + def upload_job_status( + self, dedup_hash, status, result_path=None, error_message=None + ): """ Update status for a given dedup_hash """ @@ -630,7 +632,8 @@ def upload_packing_results_workflow( # update outputs directory in firebase self.update_outputs_directory( - dedup_hash, upload_result.get("outputs_directory") + dedup_hash, + upload_result.get("outputs_directory"), ) return upload_result @@ -697,11 +700,12 @@ def update_outputs_directory(self, dedup_hash, outputs_directory): dedup_hash, { "timestamp": timestamp, + "status": "DONE", "outputs_directory": outputs_directory, }, ) logging.debug( - f"Updated outputs s3 location {outputs_directory} for dedup_hash: {dedup_hash}" + f"Updated status to DONE, outputs_directory={outputs_directory} for dedup_hash: {dedup_hash}" ) From 50304d8c0b95b566272e9e55407818e0c2b2e4c6 Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 20 Jan 2026 16:23:14 -0800 Subject: [PATCH 06/15] rename param to hash --- cellpack/bin/pack.py | 8 ++++---- docker/server.py | 6 ++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index c6f8f869..9fc9bf8b 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -30,7 +30,7 @@ def pack( docker=False, validate=True, json_recipe=None, - dedup_hash=None, + hash=None, ): """ Initializes an autopack packing from the command line @@ -40,7 +40,7 @@ def pack( :param docker: boolean argument, are we using docker :param validate: boolean argument, validate recipe before packing :param json_recipe: dict argument, recipe data passed directly - :param dedup_hash: string argument, hash identifier for tracking/caching results + :param hash: string argument, dedup hash identifier for tracking/caching results :return: void """ @@ -85,7 +85,7 @@ def pack( env.buildGrid(rebuild=True) env.pack_grid(verbose=0, usePP=False) - if docker and dedup_hash: + if docker and hash: handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) # temporarily using demo bucket before permissions are granted initialized_handler = handler( @@ -97,7 +97,7 @@ def pack( uploader.upload_packing_results_workflow( source_folder=env.out_folder, recipe_name=recipe_data["name"], - dedup_hash=dedup_hash, + dedup_hash=hash, config_data=packing_config_data, recipe_data=recipe_loader.serializable_recipe_data, ) diff --git a/docker/server.py b/docker/server.py index 780dfa19..76d3e6b2 100644 --- a/docker/server.py +++ b/docker/server.py @@ -36,7 +36,7 @@ def get_cached_result(self, dedup_hash): async def run_packing(self, recipe, config, dedup_hash, body=None): self.update_job_status(dedup_hash, "RUNNING") try: - pack(recipe=recipe, config_path=config, docker=True, json_recipe=body, dedup_hash=dedup_hash) + pack(recipe=recipe, config_path=config, docker=True, json_recipe=body, hash=dedup_hash) except Exception as e: self.update_job_status(dedup_hash, "FAILED", error_message=str(e)) @@ -72,9 +72,8 @@ async def pack_handler(self, 
request: web.Request) -> web.Response: cached_result = self.get_cached_result(dedup_hash) if cached_result: return web.json_response({ - "jobId": dedup_hash, # keep "jobId" for backwards compatibility + "jobId": dedup_hash, "status": "DONE", - "cached": True, "outputs_directory": cached_result.get("outputs_directory"), "result_path": cached_result.get("result_path"), }) @@ -87,7 +86,6 @@ async def pack_handler(self, request: web.Request) -> web.Response: self.packing_tasks.add(packing_task) packing_task.add_done_callback(self.packing_tasks.discard) - # return dedup_hash as "jobId" for backwards compatibility return web.json_response({"jobId": dedup_hash}) From f127956d8beb6ff16ca3738fa9244b6d10b40547 Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Fri, 23 Jan 2026 12:16:42 -0800 Subject: [PATCH 07/15] remove unused validate param and doc strings in pack --- cellpack/bin/pack.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index 4775946c..d5879358 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -28,7 +28,6 @@ def pack( config_path=None, analysis_config_path=None, docker=False, - validate=True, hash=None, ): """ @@ -37,8 +36,6 @@ def pack( :param config_path: string argument, path to packing config file :param analysis_config_path: string argument, path to analysis config file :param docker: boolean argument, are we using docker - :param validate: boolean argument, validate recipe before packing - :param json_recipe: dict argument, recipe data passed directly :param hash: string argument, dedup hash identifier for tracking/caching results :return: void From 60a43870f176c20be1cf0477ccb225390567e91d Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Fri, 23 Jan 2026 14:39:54 -0800 Subject: [PATCH 08/15] simplify get_ dedup_hash --- cellpack/autopack/loaders/recipe_loader.py | 19 +----------- docker/server.py | 34 +++++++--------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/cellpack/autopack/loaders/recipe_loader.py b/cellpack/autopack/loaders/recipe_loader.py index e3fe54a3..bbdb662a 100644 --- a/cellpack/autopack/loaders/recipe_loader.py +++ b/cellpack/autopack/loaders/recipe_loader.py @@ -7,7 +7,7 @@ import cellpack.autopack as autopack -from cellpack.autopack.DBRecipeHandler import DataDoc, DBRecipeLoader +from cellpack.autopack.DBRecipeHandler import DBRecipeLoader from cellpack.autopack.interface_objects import ( Representations, default_recipe_values, @@ -55,23 +55,6 @@ def __init__( autopack.CURRENT_RECIPE_PATH = os.path.dirname(self.file_path) self.recipe_data = self._read(use_docker=use_docker) - # calculate dedup_hash from the normalized recipe data - self.dedup_hash = DataDoc.generate_hash(self.serializable_recipe_data) - - @staticmethod - def get_dedup_hash(recipe_path, json_recipe=None, use_docker=False): - """ - Load recipe and return its dedup_hash. - This method loads and normalizes the recipe to ensure consistent hashing - regardless of source (local file, firebase, or JSON body). 
- """ - loader = RecipeLoader( - recipe_path, - save_converted_recipe=False, - use_docker=use_docker, - json_recipe=json_recipe, - ) - return loader.dedup_hash @classmethod def from_json(cls, json_recipe, save_converted_recipe=False, use_docker=False): diff --git a/docker/server.py b/docker/server.py index ac3e6212..addfaad9 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,8 +1,7 @@ import asyncio from aiohttp import web -from cellpack.autopack.DBRecipeHandler import DBUploader +from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS -from cellpack.autopack.loaders.recipe_loader import RecipeLoader from cellpack.bin.pack import pack SERVER_PORT = 80 @@ -18,22 +17,19 @@ def _get_firebase_handler(self, database_name="firebase"): return initialized_db return None - def get_cached_result(self, dedup_hash): + def job_exists(self, dedup_hash): """ - Check if a completed result already exists for this dedup_hash. - Returns the cached result data if found with status DONE, otherwise None. + Check if a job already exists for this dedup_hash. + Returns True if a document exists, False otherwise. """ db = self._get_firebase_handler() if not db: - return None + return False job_status, _ = db.get_doc_by_id("job_status", dedup_hash) - if job_status and job_status.get("status") == "DONE": - # TODO: if the same recipe is submitted again quickly, the status may not be updated in time ("RUNNING"), discuss if we need to handle this case - return job_status - return None + return job_status is not None - async def run_packing(self, recipe=None, config=None, dedup_hash, body=None): + async def run_packing(self, dedup_hash, recipe=None, config=None, body=None): self.update_job_status(dedup_hash, "RUNNING") try: # Pack JSON recipe in body if provided, otherwise use recipe path @@ -66,21 +62,13 @@ async def pack_handler(self, request: web.Request) -> web.Response: ) config = request.rel_url.query.get("config") - # calculate dedup_hash from normalized recipe content - # TODO: discuss when to hash firebase recipes(has references) vs raw json, this currently loads and processes the recipe twice (one here and once in pack()) - dedup_hash = RecipeLoader.get_dedup_hash(recipe, json_recipe=body, use_docker=True) + dedup_hash = DataDoc.generate_hash(body) - cached_result = self.get_cached_result(dedup_hash) - if cached_result: - return web.json_response({ - "jobId": dedup_hash, - "status": "DONE", - "outputs_directory": cached_result.get("outputs_directory"), - "result_path": cached_result.get("result_path"), - }) + if self.job_exists(dedup_hash): + return web.json_response({"jobId": dedup_hash}) # Initiate packing task to run in background - packing_task = asyncio.create_task(self.run_packing(recipe, config, dedup_hash, body)) + packing_task = asyncio.create_task(self.run_packing(dedup_hash, recipe, config, body)) # Keep track of task references to prevent them from being garbage # collected, then discard after task completion From 61d2ec85d66d2927c42b4026438be9d1fc92634f Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Fri, 23 Jan 2026 15:01:08 -0800 Subject: [PATCH 09/15] refactor job_status update --- cellpack/autopack/DBRecipeHandler.py | 53 ++++++++++------------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py index 2655758b..023fc2de 100644 --- a/cellpack/autopack/DBRecipeHandler.py +++ b/cellpack/autopack/DBRecipeHandler.py @@ 
-550,23 +550,27 @@ def upload_result_metadata(self, file_name, url, dedup_hash=None): self.upload_job_status(dedup_hash, "DONE", result_path=url) def upload_job_status( - self, dedup_hash, status, result_path=None, error_message=None + self, + dedup_hash, + status, + result_path=None, + error_message=None, + outputs_directory=None, ): """ Update status for a given dedup_hash """ if self.db: timestamp = self.db.create_timestamp() - self.db.update_or_create( - "job_status", - dedup_hash, - { - "timestamp": timestamp, - "status": str(status), - "result_path": result_path, - "error_message": error_message, - }, - ) + data = { + "timestamp": timestamp, + "status": str(status), + "result_path": result_path, + "error_message": error_message, + } + if outputs_directory: + data["outputs_directory"] = outputs_directory + self.db.update_or_create("job_status", dedup_hash, data) def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data): output_path = Path(output_folder) @@ -630,10 +634,11 @@ def upload_packing_results_workflow( f"Cleaned up temporary upload folder: {s3_upload_folder}" ) - # update outputs directory in firebase - self.update_outputs_directory( + # update outputs directory in job status + self.upload_job_status( dedup_hash, - upload_result.get("outputs_directory"), + "DONE", + outputs_directory=upload_result.get("outputs_directory"), ) return upload_result @@ -688,26 +693,6 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash): logging.error(e) return {"success": False, "error": e} - def update_outputs_directory(self, dedup_hash, outputs_directory): - if not self.db or self.db.s3_client: - # switch to firebase handler to update job status - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler(default_db="staging") - if dedup_hash: - timestamp = initialized_db.create_timestamp() - initialized_db.update_or_create( - "job_status", - dedup_hash, - { - "timestamp": timestamp, - "status": "DONE", - "outputs_directory": outputs_directory, - }, - ) - logging.debug( - f"Updated status to DONE, outputs_directory={outputs_directory} for dedup_hash: {dedup_hash}" - ) - class DBRecipeLoader(object): """ From a320a5a0e426c4a18417df9b81e636037a2fe3ef Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Fri, 23 Jan 2026 15:16:42 -0800 Subject: [PATCH 10/15] cleanup --- docker/server.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docker/server.py b/docker/server.py index addfaad9..74bb20f3 100644 --- a/docker/server.py +++ b/docker/server.py @@ -18,10 +18,6 @@ def _get_firebase_handler(self, database_name="firebase"): return None def job_exists(self, dedup_hash): - """ - Check if a job already exists for this dedup_hash. - Returns True if a document exists, False otherwise. 
-        """
         db = self._get_firebase_handler()
         if not db:
             return False
@@ -47,7 +43,7 @@ async def hello_world(self, request: web.Request) -> web.Response:
         return web.Response(text="Hello from the cellPACK server")

     async def health_check(self, request: web.Request) -> web.Response:
-        # healthcheck endpoint needed for AWS load balancer
+        # health check endpoint needed for AWS load balancer
         return web.Response()

     async def pack_handler(self, request: web.Request) -> web.Response:
@@ -75,6 +71,8 @@ async def pack_handler(self, request: web.Request) -> web.Response:
         self.packing_tasks.add(packing_task)
         packing_task.add_done_callback(self.packing_tasks.discard)

+        # return job id immediately, rather than wait for task to complete,
+        # to avoid timeout issues with API gateway
         return web.json_response({"jobId": dedup_hash})


@@ -90,4 +88,4 @@ async def init_app() -> web.Application:
     )
     return app

-web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)
\ No newline at end of file
+web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT)

From 3b1547fa199e980ada8c713fdfaa3834a85ca7c8 Mon Sep 17 00:00:00 2001
From: Ruge Li
Date: Tue, 27 Jan 2026 14:28:24 -0800
Subject: [PATCH 11/15] fix upload_job_status to handle awshandler

---
 cellpack/autopack/DBRecipeHandler.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py
index 023fc2de..00fc8bb7 100644
--- a/cellpack/autopack/DBRecipeHandler.py
+++ b/cellpack/autopack/DBRecipeHandler.py
@@ -561,16 +561,22 @@ def upload_job_status(
         Update status for a given dedup_hash
         """
         if self.db:
-            timestamp = self.db.create_timestamp()
+            db_handler = self.db
+            # If db is AWSHandler, switch to firebase handler for job status updates
+            if hasattr(self.db, "s3_client"):
+                handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
+                db_handler = handler(default_db="staging")
+            timestamp = db_handler.create_timestamp()
             data = {
                 "timestamp": timestamp,
                 "status": str(status),
-                "result_path": result_path,
                 "error_message": error_message,
             }
+            if result_path:
+                data["result_path"] = result_path
             if outputs_directory:
                 data["outputs_directory"] = outputs_directory
-            self.db.update_or_create("job_status", dedup_hash, data)
+            db_handler.update_or_create("job_status", dedup_hash, data)

     def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data):
         output_path = Path(output_folder)

From 54f7a324a8c3fa36232ed8b59a700c9c443161e9 Mon Sep 17 00:00:00 2001
From: Ruge Li
Date: Tue, 27 Jan 2026 14:50:08 -0800
Subject: [PATCH 12/15] pass dedup_hash to env for fetching across files

---
 cellpack/autopack/upy/simularium/simularium_helper.py | 5 +----
 cellpack/autopack/writers/__init__.py                 | 3 ++-
 cellpack/bin/pack.py                                  | 1 +
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py
index 61a848d3..4f934e0e 100644
--- a/cellpack/autopack/upy/simularium/simularium_helper.py
+++ b/cellpack/autopack/upy/simularium/simularium_helper.py
@@ -1385,11 +1385,8 @@ def raycast(self, **kw):
     def raycast_test(self, obj, start, end, length, **kw):
         return

-    def post_and_open_file(self, file_name, open_results_in_browser):
+    def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None):
         simularium_file = Path(f"{file_name}.simularium")
-        url = None
-        # TODO: refactor to receive dedup_hash as parameter instead of reading from environment, and address the todo in L1410
- dedup_hash = os.environ.get("AWS_BATCH_JOB_ID", None) file_name, url = simulariumHelper.store_result_file( simularium_file, storage="aws", batch_job_id=dedup_hash ) diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py index 6ca931af..add4de18 100644 --- a/cellpack/autopack/writers/__init__.py +++ b/cellpack/autopack/writers/__init__.py @@ -197,8 +197,9 @@ def save_as_simularium(self, env, seed_to_results_map): number_of_packings = env.config_data.get("number_of_packings", 1) open_results_in_browser = env.config_data.get("open_results_in_browser", False) upload_results = env.config_data.get("upload_results", False) + dedup_hash = getattr(env, "dedup_hash", None) if (number_of_packings == 1 or is_aggregate) and upload_results: - autopack.helper.post_and_open_file(file_name, open_results_in_browser) + autopack.helper.post_and_open_file(file_name, open_results_in_browser, dedup_hash) def save_Mixed_asJson( self, diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index d5879358..27c4d018 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -60,6 +60,7 @@ def pack( autopack.helper = helper env = Environment(config=packing_config_data, recipe=recipe_data) env.helper = helper + env.dedup_hash = hash log.info("Packing recipe: %s", recipe_data["name"]) log.info("Outputs will be saved to %s", env.out_folder) From 1c7d93bd48abc4cdbbd280ca53a224b308127c84 Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 27 Jan 2026 14:57:13 -0800 Subject: [PATCH 13/15] add tests --- cellpack/tests/test_db_uploader.py | 51 ++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py index 0c91cbd5..8738937c 100644 --- a/cellpack/tests/test_db_uploader.py +++ b/cellpack/tests/test_db_uploader.py @@ -175,3 +175,54 @@ def test_upload_recipe(): "A": "firebase:composition/test_id", } assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"} + + +def test_upload_job_status_with_firebase_handler(): + mock_firebase_db = MagicMock() + mock_firebase_db.create_timestamp.return_value = "test_timestamp" + # firebaseHandler does not have s3_client attribute + del mock_firebase_db.s3_client + + uploader = DBUploader(mock_firebase_db) + uploader.upload_job_status("test_hash", "RUNNING") + + mock_firebase_db.create_timestamp.assert_called_once() + mock_firebase_db.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "test_timestamp", + "status": "RUNNING", + "error_message": None, + }, + ) + + +def test_upload_job_status_with_aws_handler(): + mock_aws_db = MagicMock() + mock_aws_db.s3_client = MagicMock() # AWSHandler has s3_client + + mock_firebase_handler = MagicMock() + mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp" + + with patch( + "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers" + ) as mock_handlers: + mock_handlers.return_value.get.return_value = lambda default_db: mock_firebase_handler + + uploader = DBUploader(mock_aws_db) + uploader.upload_job_status("test_hash", "DONE", result_path="test_path") + + mock_firebase_handler.create_timestamp.assert_called_once() + mock_firebase_handler.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "firebase_timestamp", + "status": "DONE", + "error_message": None, + "result_path": "test_path", + }, + ) + # AWS handler should not be called for timestamp + mock_aws_db.create_timestamp.assert_not_called() From 
2189bb18516853066271c71cd7cc820492193b3b Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 27 Jan 2026 14:57:51 -0800 Subject: [PATCH 14/15] format1 --- cellpack/autopack/writers/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py index add4de18..0b09e03a 100644 --- a/cellpack/autopack/writers/__init__.py +++ b/cellpack/autopack/writers/__init__.py @@ -199,7 +199,9 @@ def save_as_simularium(self, env, seed_to_results_map): upload_results = env.config_data.get("upload_results", False) dedup_hash = getattr(env, "dedup_hash", None) if (number_of_packings == 1 or is_aggregate) and upload_results: - autopack.helper.post_and_open_file(file_name, open_results_in_browser, dedup_hash) + autopack.helper.post_and_open_file( + file_name, open_results_in_browser, dedup_hash + ) def save_Mixed_asJson( self, From feeb20383d4c83b554c45dff040c9a0c10eb17ea Mon Sep 17 00:00:00 2001 From: Ruge Li Date: Tue, 27 Jan 2026 14:58:14 -0800 Subject: [PATCH 15/15] format test --- cellpack/tests/test_db_uploader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py index 8738937c..414f6b9c 100644 --- a/cellpack/tests/test_db_uploader.py +++ b/cellpack/tests/test_db_uploader.py @@ -208,7 +208,9 @@ def test_upload_job_status_with_aws_handler(): with patch( "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers" ) as mock_handlers: - mock_handlers.return_value.get.return_value = lambda default_db: mock_firebase_handler + mock_handlers.return_value.get.return_value = ( + lambda default_db: mock_firebase_handler + ) uploader = DBUploader(mock_aws_db) uploader.upload_job_status("test_hash", "DONE", result_path="test_path")