diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py
index d4069732..da443330 100644
--- a/src/jobflow_remote/cli/flow.py
+++ b/src/jobflow_remote/cli/flow.py
@@ -1,10 +1,17 @@
 import contextlib
+import fnmatch
+import logging
+from collections import defaultdict
 from datetime import datetime
-from typing import Annotated
+from pathlib import Path
+from typing import Annotated, Any
 
 import typer
 from dateutil.tz import tzlocal
 from jobflow.utils.graph import draw_graph
+from monty.json import jsanitize
+from monty.os import makedirs_p
+from monty.serialization import dumpfn
 from rich.prompt import Confirm
 from rich.text import Text
 
@@ -18,6 +25,7 @@
 from jobflow_remote.cli.jf import app
 from jobflow_remote.cli.jfr_typer import JFRTyper
 from jobflow_remote.cli.types import (
+    OptionalStr,
     break_lock_opt,
     cli_output_keys_opt,
     count_opt,
@@ -60,9 +68,15 @@
     loading_spinner,
     out_console,
 )
+from jobflow_remote.jobs.data import projection_job_info
 from jobflow_remote.jobs.graph import get_graph, plot_dash
 from jobflow_remote.jobs.report import FlowsReport
 from jobflow_remote.jobs.state import JobState
+from jobflow_remote.remote.data import get_remote_store_filenames
+from jobflow_remote.utils.remote import SharedHosts
+
+logger = logging.getLogger(__name__)
+
 app_flow = JFRTyper(
     name="flow", help="Commands for managing the flows", no_args_is_help=True
 )
@@ -631,6 +645,96 @@ def clean(
     out_console.print(f"Deleted execution folders of {len(deleted)} Jobs")
 
 
+@app_flow.command()
+def dump(
+    flow_db_id: flow_db_id_arg,
+    job_id_flag: job_flow_id_flag_opt = False,
+    include_jobs_info: Annotated[
+        bool,
+        typer.Option(
+            "--jobs-info",
+            "-ji",
+            help="Include the list of jobs info in the dumped data",
+        ),
+    ] = False,
+    output: Annotated[
+        bool,
+        typer.Option(
+            "--output",
+            "-o",
+            help="Create an additional file with all the jobs outputs",
+        ),
+    ] = False,
+    path: Annotated[
+        OptionalStr,
+        typer.Option(
+            "--path",
+            "-p",
+            help="If defined, the location where the dumped files will be saved. Otherwise the current folder.",
+        ),
+    ] = None,
+):
+    """
+    Dump the data of a Flow to JSON files.
+    """
+    db_id, jf_id = get_job_db_ids(flow_db_id, None)
+    db_ids = job_ids = flow_ids = None
+    if db_id is not None:
+        db_ids = [db_id]
+    elif job_id_flag:
+        job_ids = [jf_id]
+    else:
+        flow_ids = [jf_id]
+
+    if not path:
+        path = "."
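+    # the destination directory must already exist (checked just below)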
+    save_path = Path(path)
+
+    if not save_path.is_dir():
+        raise typer.BadParameter("The path is not an existing directory")
+
+    with loading_spinner(processing=False) as progress:
+        progress.add_task(description="Retrieving info...", total=None)
+        jc = get_job_controller()
+
+        with_jobs_info = projection_job_info if include_jobs_info else output
+        flows_info = jc.get_flows_info(
+            job_ids=job_ids,
+            db_ids=db_ids,
+            flow_ids=flow_ids,
+            limit=1,
+            with_jobs_info=with_jobs_info,
+        )
+
+        if not flows_info:
+            exit_with_error_msg("No data matching the request")
+        flow_info = flows_info[0]
+
+        progress.add_task(description="Dumping data...", total=None)
+        dumpfn(
+            jsanitize(flow_info.model_dump(), allow_bson=False, enum_values=True),
+            save_path / f"{flow_info.flow_id}_data.json",
+        )
+
+        if output:
+            progress.add_task(description="Retrieving output...", total=None)
+            output_data: dict[str, dict[int, Any]] = defaultdict(dict)
+            js = jc.project.get_jobstore(name=flow_info.jobstore)
+            js.connect()
+            for ji in flow_info.jobs_info:
+                try:
+                    out = js.get_output(uuid=ji.uuid, which=ji.index, load=True)
+                    output_data[ji.uuid][ji.index] = out
+                except Exception:
+                    logger.warning(
+                        f"Error while retrieving output for job {ji.uuid} index {ji.index}",
+                        exc_info=True,
+                    )
+            progress.add_task(description="Dumping output...", total=None)
+            dumpfn(output_data, save_path / f"{flow_info.flow_id}_output.json")
+
+
 app_flow_set = JFRTyper(
     name="set", help="Commands for setting properties for flows", no_args_is_help=True
 )
@@ -663,3 +767,140 @@ def store(
     jc.set_flow_store(store=store, flow_id=flow_id, db_id=db_id, job_id=job_id)
 
     out_console.print("Flow has been updated")
+
+
+app_flow_files = JFRTyper(
+    name="files",
+    help="Commands for managing the files associated with a flow",
+    no_args_is_help=True,
+)
+app_flow.add_typer(app_flow_files)
+
+
+@app_flow_files.command(name="get")
+def files_get(
+    flow_db_id: flow_db_id_arg,
+    job_id_flag: job_flow_id_flag_opt = False,
+    filenames: Annotated[
+        list[str] | None,
+        typer.Argument(
+            help="A list of file names to retrieve from the jobs' run dir. Glob patterns are supported.",
+        ),
+    ] = None,
+    path: Annotated[
+        OptionalStr,
+        typer.Option(
+            "--path",
+            "-p",
+            help="If defined, the files will be copied to this path. Otherwise to the current folder.",
+        ),
+    ] = None,
+    exclude_jfr: Annotated[
+        bool,
+        typer.Option(
+            "--exclude-jfr",
+            "-xj",
+            help="Exclude the standard files generated by jobflow that may be present in the "
+            "folder (e.g. jfremote_in.json)",
+        ),
+    ] = False,
+) -> None:
+    """
+    Retrieve files from the Jobs' execution folders.
+    """
+    db_id, jf_id = get_job_db_ids(flow_db_id, None)
+    db_ids = job_ids = flow_ids = None
+    if db_id is not None:
+        db_ids = [db_id]
+    elif job_id_flag:
+        job_ids = [jf_id]
+    else:
+        flow_ids = [jf_id]
+
+    if not path:
+        path = "."
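+    # the files are copied into save_path, one subfolder per job (named after its run_dir)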
+    save_path = Path(path)
+    if not save_path.is_dir():
+        raise typer.BadParameter("The path is not an existing directory")
+
+    with loading_spinner(processing=False) as progress:
+        progress.add_task(description="Retrieving info...", total=None)
+        jc = get_job_controller()
+
+        flows_info = jc.get_flows_info(
+            job_ids=job_ids,
+            db_ids=db_ids,
+            flow_ids=flow_ids,
+            limit=1,
+            with_jobs_info=["run_dir"],
+        )
+
+        if not flows_info:
+            exit_with_error_msg("No data matching the request")
+        flow_info = flows_info[0]
+
+    jobstore = jc.project.get_jobstore(name=flow_info.jobstore)
+
+    store_files = get_remote_store_filenames(
+        jobstore,
+        config_dict=jc.project.remote_jobstore,
+    )
+    jfr_files = [f"{sf}*" for sf in store_files] + [
+        "jfremote_in.json*",
+        "jfremote_out.json*",
+        "submit.sh*",
+    ]
+
+    with loading_spinner(processing=False) as progress:
+        progress.add_task(description="Retrieving files...", total=None)
+
+        with SharedHosts(jc.project) as shared_hosts:
+            for ji in flow_info.jobs_info:
+                try:
+                    if not ji.run_dir:
+                        logger.info(f"Job with db_id {ji.db_id} has no run_dir defined")
+                        continue
+                    host = shared_hosts.get_host(ji.worker)
+                    run_dir = Path(ji.run_dir)
+                    if not host.exists(run_dir):
+                        logger.warning(
+                            f"The run_dir for db_id {ji.db_id} does not exist on worker {ji.worker}: {run_dir}"
+                        )
+                        continue
+
+                    remote_files = host.listdir(run_dir)
+
+                    if filenames:
+                        files_to_transfer = [
+                            rfn
+                            for rfn in remote_files
+                            if any(fnmatch.fnmatch(rfn, fn) for fn in filenames)
+                        ]
+                    else:
+                        files_to_transfer = remote_files
+
+                    if exclude_jfr:
+                        files_to_transfer = [
+                            rfn
+                            for rfn in files_to_transfer
+                            if not any(fnmatch.fnmatch(rfn, fn) for fn in jfr_files)
+                        ]
+
+                    if not files_to_transfer:
+                        logger.info(
+                            f"Job with db_id {ji.db_id} does not contain files to be transferred"
+                        )
+                        continue
+
+                    # create a folder to contain the downloaded files
+                    job_dir = save_path / run_dir.name
+                    makedirs_p(job_dir)
+
+                    for file_to_transfer in files_to_transfer:
+                        host.get(str(run_dir / file_to_transfer), str(job_dir))
+
+                except Exception as exc:
+                    raise RuntimeError(
+                        f"Error while fetching files for db_id {ji.db_id} on worker {ji.worker}"
+                    ) from exc
diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py
index a905081f..9d9bd10a 100644
--- a/src/jobflow_remote/jobs/data.py
+++ b/src/jobflow_remote/jobs/data.py
@@ -523,6 +523,7 @@ class FlowInfo(BaseModel):
     hosts: list[list[str]]
     flow_metadata: dict
     jobs_info: list[JobInfo] | None = None
+    jobstore: str | None = None
 
     @classmethod
     def from_query_dict(cls, d) -> "FlowInfo":
@@ -579,6 +580,9 @@ def from_query_dict(cls, d) -> "FlowInfo":
             hosts=job_hosts,
             flow_metadata=d["metadata"],
             jobs_info=jobs_info or None,
+            # jobstore should always be present in new flows, but it did not
+            # always exist. Use get() for backward compatibility.
+            jobstore=d.get("jobstore"),
         )
 
     @cached_property
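
A minimal usage sketch, not part of the patch itself (the flow uuid is a hypothetical
placeholder): the two JSON files written by "jf flow dump" can be read back with
monty's loadfn, the counterpart of the dumpfn calls above.

    from monty.serialization import loadfn

    flow_id = "b9e5d2e4"  # hypothetical flow uuid
    data = loadfn(f"{flow_id}_data.json")  # serialized FlowInfo: state, jobs info, metadata
    outputs = loadfn(f"{flow_id}_output.json")  # {job_uuid: {index: output}}
    for uuid, by_index in outputs.items():
        # note: the integer index keys become strings after the JSON round-trip
        print(uuid, sorted(by_index))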