91 changes: 44 additions & 47 deletions cellpack/autopack/DBRecipeHandler.py
@@ -529,7 +529,7 @@ def upload_config(self, config_data, source_path):
self.db.update_doc("configs", id, config_data)
return id

def upload_result_metadata(self, file_name, url, job_id=None):
def upload_result_metadata(self, file_name, url):
"""
Upload the metadata of the result file to the database.
"""
@@ -543,28 +543,37 @@ def upload_result_metadata(self, file_name, url, job_id=None):
"user": username,
"timestamp": timestamp,
"url": url,
"batch_job_id": job_id,
},
)
if job_id:
self.upload_job_status(job_id, "DONE", result_path=url)

def upload_job_status(self, job_id, status, result_path=None, error_message=None):
def upload_job_status(
self,
dedup_hash,
status,
result_path=None,
error_message=None,
outputs_directory=None,
):
"""
Update status for a given job ID
Update status for a given dedup_hash
"""
if self.db:
timestamp = self.db.create_timestamp()
self.db.update_or_create(
"job_status",
job_id,
{
"timestamp": timestamp,
"status": str(status),
"result_path": result_path,
"error_message": error_message,
},
)
db_handler = self.db
# If db is AWSHandler, switch to firebase handler for job status updates
if hasattr(self.db, "s3_client"):
handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE)
db_handler = handler(default_db="staging")
timestamp = db_handler.create_timestamp()
data = {
"timestamp": timestamp,
"status": str(status),
"error_message": error_message,
}
if result_path:
data["result_path"] = result_path
if outputs_directory:
data["outputs_directory"] = outputs_directory
db_handler.update_or_create("job_status", dedup_hash, data)

def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data):
output_path = Path(output_folder)
@@ -583,15 +592,15 @@ def upload_packing_results_workflow(
self,
source_folder,
recipe_name,
job_id,
dedup_hash,
config_data,
recipe_data,
):
"""
Complete packing results upload workflow including folder preparation and s3 upload
"""
try:
if job_id:
if dedup_hash:

source_path = Path(source_folder)
if not source_path.exists():
@@ -601,7 +610,7 @@ def upload_packing_results_workflow(

# prepare unique S3 upload folder
parent_folder = source_path.parent
unique_folder_name = f"{source_path.name}_run_{job_id}"
unique_folder_name = f"{source_path.name}_run_{dedup_hash}"
s3_upload_folder = parent_folder / unique_folder_name

logging.debug(f"outputs will be copied to: {s3_upload_folder}")
@@ -618,7 +627,7 @@ def upload_packing_results_workflow(
upload_result = self.upload_outputs_to_s3(
output_folder=s3_upload_folder,
recipe_name=recipe_name,
job_id=job_id,
dedup_hash=dedup_hash,
)

# clean up temporary folder after upload
@@ -628,9 +637,12 @@ def upload_packing_results_workflow(
f"Cleaned up temporary upload folder: {s3_upload_folder}"
)

# update outputs directory in firebase
self.update_outputs_directory(
job_id, upload_result.get("outputs_directory")
# update outputs directory in job status
self.upload_job_status(
dedup_hash,
"DONE",
result_path=upload_result.get("simularium_url"),
outputs_directory=upload_result.get("outputs_directory"),
)

return upload_result
@@ -639,15 +651,15 @@ def upload_packing_results_workflow(
logging.error(e)
return {"success": False, "error": e}

def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash):
"""
Upload packing outputs to S3 bucket
"""

bucket_name = self.db.bucket_name
region_name = self.db.region_name
sub_folder_name = self.db.sub_folder_name
s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}"
s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}"

try:
upload_result = self.db.upload_directory(
@@ -661,8 +673,11 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
f"{base_url}/{file_info['s3_key']}"
for file_info in upload_result["uploaded_files"]
]
simularium_url = None
for url in public_urls:
if url.endswith(".simularium"):
simularium_url = url
outputs_directory = f"https://us-west-2.console.aws.amazon.com/s3/buckets/{bucket_name}/{s3_prefix}/"

logging.info(
f"Successfully uploaded {upload_result['total_files']} files to {outputs_directory}"
)
@@ -671,7 +686,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):

return {
"success": True,
"run_id": job_id,
"dedup_hash": dedup_hash,
"s3_bucket": bucket_name,
"s3_prefix": s3_prefix,
"public_url_base": f"{base_url}/{s3_prefix}/",
@@ -680,30 +695,12 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id):
"total_size": upload_result["total_size"],
"urls": public_urls,
"outputs_directory": outputs_directory,
"simularium_url": simularium_url,
}
except Exception as e:
logging.error(e)
return {"success": False, "error": e}

def update_outputs_directory(self, job_id, outputs_directory):
if not self.db or self.db.s3_client:
# switch to firebase handler to update job status
handler = DATABASE_IDS.handlers().get("firebase")
initialized_db = handler(default_db="staging")
if job_id:
timestamp = initialized_db.create_timestamp()
initialized_db.update_or_create(
"job_status",
job_id,
{
"timestamp": timestamp,
"outputs_directory": outputs_directory,
},
)
logging.debug(
f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}"
)


class DBRecipeLoader(object):
"""
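A minimal sketch of how the reworked uploader methods fit together, assuming an already-initialized AWS handler; `initialized_aws_handler`, the hash value, and the data dicts below are placeholders, not part of this change:

```python
# Sketch only: initialized_aws_handler is assumed to be an AWSHandler-style object
# exposing s3_client, bucket_name, region_name, and sub_folder_name.
from cellpack.autopack.DBRecipeHandler import DBUploader

uploader = DBUploader(initialized_aws_handler)
config_data = {}  # placeholder: parsed packing config
recipe_data = {}  # placeholder: parsed recipe

# Because the handler has an s3_client, upload_job_status switches to the staging
# Firebase handler and keys the status document on dedup_hash instead of a Batch job ID.
uploader.upload_job_status("a1b2c3", "RUNNING")

# Copies outputs into <source>_run_<dedup_hash>, uploads them under
# <sub_folder>/<recipe_name>/<dedup_hash>/, then marks the job DONE with the
# .simularium URL as result_path and the S3 console link as outputs_directory.
uploader.upload_packing_results_workflow(
    source_folder="out/one_sphere",
    recipe_name="one_sphere",
    dedup_hash="a1b2c3",
    config_data=config_data,
    recipe_data=recipe_data,
)
```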
30 changes: 26 additions & 4 deletions cellpack/autopack/loaders/recipe_loader.py
@@ -30,14 +30,21 @@ class RecipeLoader(object):
# TODO: add all default values here
default_values = default_recipe_values.copy()

def __init__(self, input_file_path, save_converted_recipe=False, use_docker=False):
def __init__(
self,
input_file_path,
save_converted_recipe=False,
use_docker=False,
json_recipe=None,
):
_, file_extension = os.path.splitext(input_file_path)
self.current_version = CURRENT_VERSION
self.file_path = input_file_path
self.file_extension = file_extension
self.ingredient_list = []
self.compartment_list = []
self.save_converted_recipe = save_converted_recipe
self.json_recipe = json_recipe

# set CURRENT_RECIPE_PATH appropriately for remote(firebase) vs local recipes
if autopack.is_remote_path(self.file_path):
@@ -49,6 +56,15 @@ def __init__(self, input_file_path, save_converted_recipe=False, use_docker=False):

self.recipe_data = self._read(use_docker=use_docker)

@classmethod
def from_json(cls, json_recipe, save_converted_recipe=False, use_docker=False):
return cls(
input_file_path="",
save_converted_recipe=save_converted_recipe,
use_docker=use_docker,
json_recipe=json_recipe,
)

@staticmethod
def _resolve_object(key, objects):
current_object = objects[key]
@@ -168,9 +184,15 @@ def _migrate_version(self, old_recipe):
)

def _read(self, resolve_inheritance=True, use_docker=False):
new_values, database_name, is_unnested_firebase = autopack.load_file(
self.file_path, cache="recipes", use_docker=use_docker
)
database_name = None
is_unnested_firebase = False
new_values = self.json_recipe
if new_values is None:
# Read recipe from filepath
new_values, database_name, is_unnested_firebase = autopack.load_file(
self.file_path, cache="recipes", use_docker=use_docker
)

if database_name == "firebase":
if is_unnested_firebase:
objects = new_values.get("objects", {})
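A rough usage sketch of the new in-memory entry point; the recipe dict is a stand-in (real recipes carry full object and composition definitions) and every field value is illustrative:

```python
from cellpack.autopack.loaders.recipe_loader import RecipeLoader

# Placeholder recipe assumed to already be in the current format, so no migration runs.
recipe_json = {
    "version": "1.0.0",
    "format_version": "2.1",  # assumed to match CURRENT_VERSION
    "name": "one_sphere",
    "bounding_box": [[0, 0, 0], [100, 100, 100]],
    "objects": {"sphere_25": {"type": "single_sphere", "radius": 25}},
    "composition": {"space": {"regions": {"interior": ["sphere_25"]}}},
}

# from_json() passes an empty file path and sets json_recipe, so _read() skips
# autopack.load_file() and parses the dict exactly as if it had come from disk.
loader = RecipeLoader.from_json(recipe_json)
recipe_data = loader.recipe_data
```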
46 changes: 18 additions & 28 deletions cellpack/autopack/upy/simularium/simularium_helper.py
@@ -1385,40 +1385,30 @@ def raycast(self, **kw):
def raycast_test(self, obj, start, end, length, **kw):
return

def post_and_open_file(self, file_name, open_results_in_browser):
def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None):
simularium_file = Path(f"{file_name}.simularium")
url = None
job_id = os.environ.get("AWS_BATCH_JOB_ID", None)
file_name, url = simulariumHelper.store_result_file(
simularium_file, storage="aws", batch_job_id=job_id
)
if file_name and url:
simulariumHelper.store_metadata(
file_name, url, db="firebase", job_id=job_id
if dedup_hash is None:
file_name, url = simulariumHelper.store_result_file(
simularium_file, storage="aws"
)
if open_results_in_browser:
simulariumHelper.open_in_simularium(url)
if file_name and url:
simulariumHelper.store_metadata(
file_name, url, db="firebase"
)
if open_results_in_browser:
simulariumHelper.open_in_simularium(url)

@staticmethod
def store_result_file(
file_path, storage=None, batch_job_id=None, sub_folder="simularium"
file_path, storage=None, sub_folder="simularium"
):
if storage == "aws":
handler = DATABASE_IDS.handlers().get(storage)
# if batch_job_id is not None, then we are in a batch job and should use the temp bucket
# TODO: use cellpack-results bucket for batch jobs once we have the correct permissions
if batch_job_id:
initialized_handler = handler(
bucket_name="cellpack-demo",
sub_folder_name=sub_folder,
region_name="us-west-2",
)
else:
initialized_handler = handler(
bucket_name="cellpack-results",
sub_folder_name=sub_folder,
region_name="us-west-2",
)
initialized_handler = handler(
bucket_name="cellpack-results",
sub_folder_name=sub_folder,
region_name="us-west-2",
)
file_name, url = initialized_handler.save_file_and_get_url(file_path)
if not file_name or not url:
db_maintainer = DBMaintenance(initialized_handler)
@@ -1428,15 +1418,15 @@ def store_result_file(
return file_name, url

@staticmethod
def store_metadata(file_name, url, db=None, job_id=None):
def store_metadata(file_name, url, db=None):
if db == "firebase":
handler = DATABASE_IDS.handlers().get(db)
initialized_db = handler(
default_db="staging"
) # default to staging for metadata uploads
if initialized_db._initialized:
db_uploader = DBUploader(initialized_db)
db_uploader.upload_result_metadata(file_name, url, job_id)
db_uploader.upload_result_metadata(file_name, url)
else:
db_maintainer = DBMaintenance(initialized_db)
logging.warning(
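For reference, a hedged sketch of the two call paths through the updated helper, assuming `autopack.helper` has already been set up by a packing run; the file names and hash are placeholders:

```python
from cellpack import autopack  # helper is assumed to be initialized by a prior packing run

# Local/interactive run: no dedup_hash, so the .simularium file is uploaded to the
# cellpack-results bucket and its metadata is recorded in the staging Firebase DB.
autopack.helper.post_and_open_file("out/one_sphere_seed_0", open_results_in_browser=True)

# Batch/cloud run: passing a dedup_hash skips the per-file upload here, because
# upload_packing_results_workflow() owns the S3 upload and the job_status update.
autopack.helper.post_and_open_file("out/one_sphere_seed_0", False, dedup_hash="a1b2c3")
```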
5 changes: 4 additions & 1 deletion cellpack/autopack/writers/__init__.py
@@ -197,8 +197,11 @@ def save_as_simularium(self, env, seed_to_results_map):
number_of_packings = env.config_data.get("number_of_packings", 1)
open_results_in_browser = env.config_data.get("open_results_in_browser", False)
upload_results = env.config_data.get("upload_results", False)
dedup_hash = getattr(env, "dedup_hash", None)
if (number_of_packings == 1 or is_aggregate) and upload_results:
autopack.helper.post_and_open_file(file_name, open_results_in_browser)
autopack.helper.post_and_open_file(
file_name, open_results_in_browser, dedup_hash
)

def save_Mixed_asJson(
self,
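Finally, an illustrative fragment of how the hash reaches the writer: a caller (assumed to be the cloud/batch entry point) tags the packing environment, and `save_as_simularium` picks the attribute up defensively with `getattr`:

```python
# Illustrative only: _Env stands in for the packing Environment handed to the writer;
# the attribute name dedup_hash is what save_as_simularium() looks up.
class _Env:
    pass

env = _Env()
env.dedup_hash = "a1b2c3"  # assumed to be set by the cloud/batch entry point

# Environments without the attribute fall back to None and keep the local-upload path.
dedup_hash = getattr(env, "dedup_hash", None)
```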