Merged
87 changes: 41 additions & 46 deletions cellpack/autopack/DBRecipeHandler.py
@@ -529,7 +529,7 @@ def upload_config(self, config_data, source_path):
self.db.update_doc("configs", id, config_data)
return id

def upload_result_metadata(self, file_name, url, job_id=None):
def upload_result_metadata(self, file_name, url, dedup_hash=None):
"""
Upload the metadata of the result file to the database.
"""
@@ -543,28 +543,40 @@ def upload_result_metadata(self, file_name, url, job_id=None):
"user": username,
"timestamp": timestamp,
"url": url,
"batch_job_id": job_id,
"dedup_hash": dedup_hash,
},
)
if job_id:
self.upload_job_status(job_id, "DONE", result_path=url)
if dedup_hash:
self.upload_job_status(dedup_hash, "DONE", result_path=url)

def upload_job_status(self, job_id, status, result_path=None, error_message=None):
def upload_job_status(
self,
dedup_hash,
status,
result_path=None,
error_message=None,
outputs_directory=None,
):
"""
Update status for a given job ID
Update status for a given dedup_hash
"""
if self.db:
timestamp = self.db.create_timestamp()
self.db.update_or_create(
"job_status",
job_id,
{
"timestamp": timestamp,
"status": str(status),
"result_path": result_path,
"error_message": error_message,
},
)
db_handler = self.db
# If db is AWSHandler, switch to firebase handler for job status updates
if hasattr(self.db, "s3_client"):
handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
db_handler = handler(default_db="staging")
timestamp = db_handler.create_timestamp()
data = {
"timestamp": timestamp,
"status": str(status),
"error_message": error_message,
}
if result_path:
data["result_path"] = result_path
if outputs_directory:
data["outputs_directory"] = outputs_directory
db_handler.update_or_create("job_status", dedup_hash, data)

def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data):
output_path = Path(output_folder)
@@ -583,15 +595,15 @@ def upload_packing_results_workflow(
self,
source_folder,
recipe_name,
job_id,
dedup_hash,
config_data,
recipe_data,
):
"""
Complete packing results upload workflow including folder preparation and s3 upload
"""
try:
if job_id:
if dedup_hash:

source_path = Path(source_folder)
if not source_path.exists():
@@ -601,7 +613,7 @@ def upload_packing_results_workflow(

# prepare unique S3 upload folder
parent_folder = source_path.parent
unique_folder_name = f"{source_path.name}_run_{job_id}"
unique_folder_name = f"{source_path.name}_run_{dedup_hash}"
s3_upload_folder = parent_folder / unique_folder_name

logging.debug(f"outputs will be copied to: {s3_upload_folder}")
@@ -618,7 +630,7 @@ def upload_packing_results_workflow(
upload_result = self.upload_outputs_to_s3(
output_folder=s3_upload_folder,
recipe_name=recipe_name,
job_id=job_id,
dedup_hash=dedup_hash,
)

# clean up temporary folder after upload
@@ -628,9 +640,11 @@ def upload_packing_results_workflow(
f"Cleaned up temporary upload folder: {s3_upload_folder}"
)

# update outputs directory in firebase
self.update_outputs_directory(
job_id, upload_result.get("outputs_directory")
# update outputs directory in job status
self.upload_job_status(
dedup_hash,
"DONE",
outputs_directory=upload_result.get("outputs_directory"),
)
@ascibisz (Collaborator) commented on Jan 27, 2026:

When I tried running this locally, I got the error ERROR | DBRecipeHandler:647 | upload_packing_results_workflow() | 'AWSHandler' object has no attribute 'create_timestamp'. I think this traces back to the fact that we're now calling upload_job_status here instead of update_outputs_directory, so we need to add a check to upload_job_status similar to the one we had in update_outputs_directory, since self.db is currently an AWSHandler instance.

So I think we want to add something like

if not self.db or self.db.s3_client:
    # switch to firebase handler to update job status
    handler = DATABASE_IDS.handlers().get("firebase")
    initialized_db = handler(default_db="staging")

to upload_job_status
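
For reference, a minimal sketch of that check folded into upload_job_status, matching the hunk earlier in this diff (the hasattr test on s3_client is how an AWSHandler is detected, and default_db="staging" mirrors the existing fallback; nothing below is new API):

    def upload_job_status(
        self, dedup_hash, status, result_path=None, error_message=None, outputs_directory=None
    ):
        db_handler = self.db
        # AWSHandler exposes s3_client; job status records are kept in Firebase
        if hasattr(self.db, "s3_client"):
            handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
            db_handler = handler(default_db="staging")
        timestamp = db_handler.create_timestamp()
        data = {
            "timestamp": timestamp,
            "status": str(status),
            "error_message": error_message,
        }
        if result_path:
            data["result_path"] = result_path
        if outputs_directory:
            data["outputs_directory"] = outputs_directory
        db_handler.update_or_create("job_status", dedup_hash, data)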


return upload_result
@@ -639,15 +653,15 @@ def upload_packing_results_workflow(
logging.error(e)
return {"success": False, "error": e}

def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash):
"""
Upload packing outputs to S3 bucket
"""

bucket_name = self.db.bucket_name
region_name = self.db.region_name
sub_folder_name = self.db.sub_folder_name
s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}"
s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}"

try:
upload_result = self.db.upload_directory(
@@ -671,7 +685,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):

return {
"success": True,
"run_id": job_id,
"dedup_hash": dedup_hash,
"s3_bucket": bucket_name,
"s3_prefix": s3_prefix,
"public_url_base": f"{base_url}/{s3_prefix}/",
@@ -685,25 +699,6 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
logging.error(e)
return {"success": False, "error": e}

def update_outputs_directory(self, job_id, outputs_directory):
if not self.db or self.db.s3_client:
# switch to firebase handler to update job status
handler = DATABASE_IDS.handlers().get("firebase")
initialized_db = handler(default_db="staging")
if job_id:
timestamp = initialized_db.create_timestamp()
initialized_db.update_or_create(
"job_status",
job_id,
{
"timestamp": timestamp,
"outputs_directory": outputs_directory,
},
)
logging.debug(
f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}"
)


class DBRecipeLoader(object):
"""
12 changes: 5 additions & 7 deletions cellpack/autopack/upy/simularium/simularium_helper.py
@@ -1385,16 +1385,14 @@ def raycast(self, **kw):
def raycast_test(self, obj, start, end, length, **kw):
return

def post_and_open_file(self, file_name, open_results_in_browser):
def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None):
simularium_file = Path(f"{file_name}.simularium")
url = None
job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
file_name, url = simulariumHelper.store_result_file(
simularium_file, storage="aws", batch_job_id=job_id
simularium_file, storage="aws", batch_job_id=dedup_hash
)
if file_name and url:
simulariumHelper.store_metadata(
file_name, url, db="firebase", job_id=job_id
file_name, url, db="firebase", dedup_hash=dedup_hash
)
if open_results_in_browser:
simulariumHelper.open_in_simularium(url)
@@ -1428,15 +1426,15 @@ def store_result_file(
return file_name, url

@staticmethod
def store_metadata(file_name, url, db=None, job_id=None):
def store_metadata(file_name, url, db=None, dedup_hash=None):
if db == "firebase":
handler = DATABASE_IDS.handlers().get(db)
initialized_db = handler(
default_db="staging"
) # default to staging for metadata uploads
if initialized_db._initialized:
db_uploader = DBUploader(initialized_db)
db_uploader.upload_result_metadata(file_name, url, job_id)
db_uploader.upload_result_metadata(file_name, url, dedup_hash)
else:
db_maintainer = DBMaintenance(initialized_db)
logging.warning(
5 changes: 4 additions & 1 deletion cellpack/autopack/writers/__init__.py
@@ -197,8 +197,11 @@ def save_as_simularium(self, env, seed_to_results_map):
number_of_packings = env.config_data.get("number_of_packings", 1)
open_results_in_browser = env.config_data.get("open_results_in_browser", False)
upload_results = env.config_data.get("upload_results", False)
dedup_hash = getattr(env, "dedup_hash", None)
if (number_of_packings == 1 or is_aggregate) and upload_results:
autopack.helper.post_and_open_file(file_name, open_results_in_browser)
autopack.helper.post_and_open_file(
file_name, open_results_in_browser, dedup_hash
)

def save_Mixed_asJson(
self,
44 changes: 23 additions & 21 deletions cellpack/bin/pack.py
@@ -1,6 +1,5 @@
import logging
import logging.config
import os
import time
from pathlib import Path

@@ -25,15 +24,19 @@


def pack(
recipe, config_path=None, analysis_config_path=None, docker=False, validate=True
recipe,
config_path=None,
analysis_config_path=None,
docker=False,
hash=None,
):
"""
Initializes an autopack packing from the command line
:param recipe: string argument, path to recipe file, or a dictionary representing a recipe
:param config_path: string argument, path to packing config file
:param analysis_config_path: string argument, path to analysis config file
:param docker: boolean argument, are we using docker
:param validate: boolean argument, validate recipe before packing
:param hash: string argument, dedup hash identifier for tracking/caching results

:return: void
"""
@@ -57,6 +60,7 @@ def pack(
autopack.helper = helper
env = Environment(config=packing_config_data, recipe=recipe_data)
env.helper = helper
env.dedup_hash = hash

log.info("Packing recipe: %s", recipe_data["name"])
log.info("Outputs will be saved to %s", env.out_folder)
@@ -83,24 +87,22 @@
env.buildGrid(rebuild=True)
env.pack_grid(verbose=0, usePP=False)

if docker:
job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
if job_id:
handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
# temporarily using demo bucket before permissions are granted
initialized_handler = handler(
bucket_name="cellpack-demo",
sub_folder_name="runs",
region_name="us-west-2",
)
uploader = DBUploader(db_handler=initialized_handler)
uploader.upload_packing_results_workflow(
source_folder=env.out_folder,
recipe_name=recipe_data["name"],
job_id=job_id,
config_data=packing_config_data,
recipe_data=recipe_loader.serializable_recipe_data,
)
if docker and hash:
handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS)
# temporarily using demo bucket before permissions are granted
initialized_handler = handler(
bucket_name="cellpack-demo",
sub_folder_name="runs",
region_name="us-west-2",
)
uploader = DBUploader(db_handler=initialized_handler)
uploader.upload_packing_results_workflow(
source_folder=env.out_folder,
recipe_name=recipe_data["name"],
dedup_hash=hash,
config_data=packing_config_data,
recipe_data=recipe_loader.serializable_recipe_data,
)


def main():
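
For illustration, a hypothetical direct call of the updated pack() signature (the recipe/config paths and the hash value below are placeholders, not files from this repo):

    pack(
        recipe="path/to/recipe.json",               # placeholder recipe path
        config_path="path/to/packing_config.json",  # placeholder config path
        docker=True,    # enables the S3 upload branch shown above
        hash="abc123",  # dedup hash; also used as the job_status doc ID and the S3 run folder suffix
    )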
53 changes: 53 additions & 0 deletions cellpack/tests/test_db_uploader.py
@@ -175,3 +175,56 @@ def test_upload_recipe():
"A": "firebase:composition/test_id",
}
assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"}


def test_upload_job_status_with_firebase_handler():
mock_firebase_db = MagicMock()
mock_firebase_db.create_timestamp.return_value = "test_timestamp"
# firebaseHandler does not have s3_client attribute
del mock_firebase_db.s3_client

uploader = DBUploader(mock_firebase_db)
uploader.upload_job_status("test_hash", "RUNNING")

mock_firebase_db.create_timestamp.assert_called_once()
mock_firebase_db.update_or_create.assert_called_once_with(
"job_status",
"test_hash",
{
"timestamp": "test_timestamp",
"status": "RUNNING",
"error_message": None,
},
)


def test_upload_job_status_with_aws_handler():
mock_aws_db = MagicMock()
mock_aws_db.s3_client = MagicMock() # AWSHandler has s3_client

mock_firebase_handler = MagicMock()
mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp"

with patch(
"cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers"
) as mock_handlers:
mock_handlers.return_value.get.return_value = (
lambda default_db: mock_firebase_handler
)

uploader = DBUploader(mock_aws_db)
uploader.upload_job_status("test_hash", "DONE", result_path="test_path")

mock_firebase_handler.create_timestamp.assert_called_once()
mock_firebase_handler.update_or_create.assert_called_once_with(
"job_status",
"test_hash",
{
"timestamp": "firebase_timestamp",
"status": "DONE",
"error_message": None,
"result_path": "test_path",
},
)
# AWS handler should not be called for timestamp
mock_aws_db.create_timestamp.assert_not_called()