241 changes: 240 additions & 1 deletion src/jobflow_remote/cli/flow.py
@@ -1,10 +1,17 @@
import contextlib
import fnmatch
import logging
from collections import defaultdict
from datetime import datetime
from typing import Annotated
from pathlib import Path
from typing import Annotated, Any

import typer
from dateutil.tz import tzlocal
from jobflow.utils.graph import draw_graph
from monty.json import jsanitize
from monty.os import makedirs_p
from monty.serialization import dumpfn
from rich.prompt import Confirm
from rich.text import Text

@@ -18,6 +25,7 @@
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
OptionalStr,
break_lock_opt,
cli_output_keys_opt,
count_opt,
@@ -60,9 +68,15 @@
loading_spinner,
out_console,
)
from jobflow_remote.jobs.data import projection_job_info
from jobflow_remote.jobs.graph import get_graph, plot_dash
from jobflow_remote.jobs.report import FlowsReport
from jobflow_remote.jobs.state import JobState
from jobflow_remote.remote.data import get_remote_store_filenames
from jobflow_remote.utils.remote import SharedHosts

logger = logging.getLogger(__name__)


app_flow = JFRTyper(
name="flow", help="Commands for managing the flows", no_args_is_help=True
@@ -631,6 +645,95 @@ def clean(
out_console.print(f"Deleted execution folders of {len(deleted)} Jobs")


@app_flow.command()
def dump(
flow_db_id: flow_db_id_arg,
job_id_flag: job_flow_id_flag_opt = False,
include_jobs_info: Annotated[
bool,
typer.Option(
"--jobs-info",
"-ji",
help="Include the list of jobs info data",
),
] = False,
output: Annotated[
bool,
typer.Option(
"--output",
"-o",
help="Create a file with all the jobs outputs",
),
] = False,
path: Annotated[
OptionalStr,
typer.Option(
"--path",
"-p",
help="If defined, the location to where the dumped files will be save. Otherwise in the local folder.",
),
] = None,
):
"""
Dump the data of a Flow to a <flow_id>_data.json file and, if --output is
selected, the Jobs' outputs to a <flow_id>_output.json file.
"""
db_id, jf_id = get_job_db_ids(flow_db_id, None)
db_ids = job_ids = flow_ids = None
if db_id is not None:
db_ids = [db_id]
elif job_id_flag:
job_ids = [jf_id]
else:
flow_ids = [jf_id]

if not path:
path = "."
save_path = Path(path)

if not save_path.is_dir():
raise typer.BadParameter("The path is not an existing directory")

with loading_spinner(processing=False) as progress:
progress.add_task(description="Retrieving info...", total=None)
jc = get_job_controller()

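# fetch the jobs info only when needed: the full projection for --jobs-info,
# the default one (True) when --output requires each Job's uuid and index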
with_jobs_info = projection_job_info if include_jobs_info else output
flows_info = jc.get_flows_info(
job_ids=job_ids,
db_ids=db_ids,
flow_ids=flow_ids,
limit=1,
with_jobs_info=with_jobs_info,
)

if not flows_info:
exit_with_error_msg("No data matching the request")
flow_info = flows_info[0]

progress.add_task(description="Dumping data...", total=None)
dumpfn(
jsanitize(flow_info.model_dump(), allow_bson=False, enum_values=True),
save_path / f"{flow_info.flow_id}_data.json",
)

if output:
progress.add_task(description="Retrieving output...", total=None)
output_data: dict[str, dict[int, Any]] = defaultdict(dict)
js = jc.project.get_jobstore(name=flow_info.jobstore)
js.connect()
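# collect the output of each Job keyed by uuid and index; failures are
# logged and skipped rather than aborting the dump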
for ji in flow_info.jobs_info:
try:
out = js.get_output(uuid=ji.uuid, which=ji.index, load=True)
output_data[ji.uuid][ji.index] = out
except Exception:
logger.warning(
f"Error while retrieving output for job {ji.uuid} index {ji.index}",
exc_info=True,
)
progress.add_task(description="Dumping output...", total=None)
dumpfn(output_data, save_path / f"{flow_info.flow_id}_output.json")


app_flow_set = JFRTyper(
name="set", help="Commands for setting properties for flows", no_args_is_help=True
)
@@ -663,3 +766,139 @@ def store(

jc.set_flow_store(store=store, flow_id=flow_id, db_id=db_id, job_id=job_id)
out_console.print("Flow has been updated")


app_flow_files = JFRTyper(
name="files",
help="Commands for managing the files associated to a flow",
no_args_is_help=True,
)
app_flow.add_typer(app_flow_files)


@app_flow_files.command(name="get")
def files_get(
flow_db_id: flow_db_id_arg,
job_id_flag: job_flow_id_flag_opt = False,
filenames: Annotated[
list[str] | None,
typer.Argument(
help="A list of file names to be retrieved from the jobs run dir. ",
),
] = None,
path: Annotated[
OptionalStr,
typer.Option(
"--path",
"-p",
help="If defined, the files will be copied to this path. Otherwise in the local folder.",
),
] = None,
exclude_jfr: Annotated[
bool,
typer.Option(
"--exclude-jfr",
"-xj",
help="Exclude the standard files generated by jobflow that may be present in the "
"folder (e.g. jfremote_in.json)",
),
] = False,
) -> None:
"""
Retrieve files from the Jobs' execution folders.
"""
db_id, jf_id = get_job_db_ids(flow_db_id, None)
db_ids = job_ids = flow_ids = None
if db_id is not None:
db_ids = [db_id]
elif job_id_flag:
job_ids = [jf_id]
else:
flow_ids = [jf_id]

if not path:
path = "."
save_path = Path(path)
if not save_path.is_dir():
raise typer.BadParameter("The path is not an existing directory")

with loading_spinner(processing=False) as progress:
progress.add_task(description="Retrieving info...", total=None)
jc = get_job_controller()

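# only the run_dir of each Job is needed to locate its files on the worker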
flows_info = jc.get_flows_info(
job_ids=job_ids,
db_ids=db_ids,
flow_ids=flow_ids,
limit=1,
with_jobs_info=["run_dir"],
)

if not flows_info:
exit_with_error_msg("No data matching the request")
flow_info = flows_info[0]


store_files = get_remote_store_filenames(
jc.project.get_jobstore(name=flow_info.jobstore),
config_dict=jc.project.remote_jobstore,
)
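# glob patterns for the standard files generated by jobflow-remote (remote
# store files, in/out JSON dumps, submit script), matched when --exclude-jfr is set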
jfr_files = [f"{sf}*" for sf in store_files] + [
"jfremote_in.json*",
"jfremote_out.json*",
"submit.sh*",
]

with loading_spinner(processing=False) as progress:
progress.add_task(description="Retrieving files...", total=None)

with SharedHosts(jc.project) as shared_hosts:
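# reuse the same connection for all the Jobs running on the same worker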
for ji in flow_info.jobs_info:
try:
if not ji.run_dir:
logger.info(f"Job with db_id {ji.db_id} has no run_dir defined")
continue
host = shared_hosts.get_host(ji.worker)
run_dir = Path(ji.run_dir)
if not host.exists(run_dir):
logger.warning(
f"The run_dir for db_id {ji.db_id} does not exists on worker {ji.worker}: {run_dir}"
)
continue

remote_files = host.listdir(run_dir)

if filenames:
files_to_transfer = [
rfn
for rfn in remote_files
if any(fnmatch.fnmatch(rfn, fn) for fn in filenames)
]
else:
files_to_transfer = remote_files

if exclude_jfr:
files_to_transfer = [
rfn
for rfn in files_to_transfer
if not any(fnmatch.fnmatch(rfn, fn) for fn in jfr_files)
]

if not files_to_transfer:
logger.info(
f"Job with db_id {ji.db_id} does not contain files to be archived"
)
continue

# create a folder to contain the downloaded files
job_dir = save_path / run_dir.name
makedirs_p(job_dir)

for file_to_transfer in files_to_transfer:
host.get(str(run_dir / file_to_transfer), str(job_dir))

except Exception as exc:
raise RuntimeError(
f"Error while fetching db_id {ji.db_id} on worker {ji.worker}. "
) from exc
4 changes: 4 additions & 0 deletions src/jobflow_remote/jobs/data.py
@@ -523,6 +523,7 @@ class FlowInfo(BaseModel):
hosts: list[list[str]]
flow_metadata: dict
jobs_info: list[JobInfo] | None = None
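# name of the project jobstore associated with the Flow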
jobstore: str | None = None

@classmethod
def from_query_dict(cls, d) -> "FlowInfo":
Expand Down Expand Up @@ -579,6 +580,9 @@ def from_query_dict(cls, d) -> "FlowInfo":
hosts=job_hosts,
flow_metadata=d["metadata"],
jobs_info=jobs_info or None,
# jobstore should always be present in new flows, but it did not always
# exist. Use get() for backward compatibility
jobstore=d.get("jobstore"),
)

@cached_property