diff --git a/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_client.conf b/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_client.conf index 27d1b9fc..2c3be089 100644 --- a/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_client.conf +++ b/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_client.conf @@ -42,10 +42,10 @@ learn_task_timeout = 7200 # time to wait for learn task to abort gracefully (5 min) learn_task_abort_timeout = 300 - # time for task acknowledgment (2 min) - learn_task_ack_timeout = 120 - # time for final result acknowledgment (5 min) - final_result_ack_timeout = 300 + # time for task acknowledgment incl. model weight streaming (10 min) + learn_task_ack_timeout = 600 + # time for final result acknowledgment (10 min) + final_result_ack_timeout = 600 # ids must map to corresponding components persistor_id = "persistor" diff --git a/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_server.conf b/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_server.conf index cc4d4a3d..1644b649 100644 --- a/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_server.conf +++ b/application/jobs/challenge_1DivideAndConquer/app/config/config_fed_server.conf @@ -17,12 +17,12 @@ workflows = [ args { # can also set aggregation clients and train clients, see class for all available args num_rounds = 20 - # time for clients to configure and start (10 min) - start_task_timeout = 600 - # max time without any progress before declaring failure (2 hours) - progress_timeout = 7200 - # time for clients to acknowledge configuration (5 min) - configure_task_timeout = 300 + # time for clients to configure and start (15 min) + start_task_timeout = 900 + # max time without any progress before declaring failure (4 hours) + progress_timeout = 14400 + # time for clients to acknowledge configuration (10 min) + configure_task_timeout = 600 # interval for clients to report status (5 min) max_status_report_interval = 300 } diff --git a/docker_config/master_template.yml b/docker_config/master_template.yml index be980911..d04b7ed5 100644 --- a/docker_config/master_template.yml +++ b/docker_config/master_template.yml @@ -803,6 +803,57 @@ docker_cln_sh: | ENV_VARS+=" --env LOG_DATASET_DETAILS=1" fi + # ── Live Sync Integration ────────────────────────────────────────── + # live_sync.sh is co-located in the startup directory (injected by + # _injectLiveSyncIntoStartupKits.sh) and syncs training artifacts to + # the central monitoring server for --local_training and --start_client + # modes. If live_sync.sh is not present the functions are a no-op. + KIT_ROOT="$(cd "$DIR/.." && pwd)" + SYNC_STATE_DIR="$DIR/.mediswarm_sync" + SITE_NAME_RESOLVED="{~~client_name~~}" + + _start_live_sync() { + local mode="$1" + if [ ! -f "$DIR/live_sync.sh" ]; then return; fi + mkdir -p "$SYNC_STATE_DIR" + + if [ "$mode" = "local" ]; then + "$DIR/live_sync.sh" \ + --mode local \ + --site-name "$SITE_NAME_RESOLVED" \ + --kit-root "$KIT_ROOT" \ + --startup-dir "$DIR" \ + --scratch-dir "${MY_SCRATCH_DIR:-}" & + LIVE_SYNC_PID=$! + elif [ "$mode" = "swarm" ]; then + local pid_file="$SYNC_STATE_DIR/swarm_sync.pid" + if [ -f "$pid_file" ]; then + local old_pid + old_pid="$(cat "$pid_file" 2>/dev/null || true)" + if [ -n "$old_pid" ] && kill -0 "$old_pid" 2>/dev/null; then + echo "Live sync daemon already running (PID $old_pid)" + return 0 + fi + fi + nohup "$DIR/live_sync.sh" \ + --mode swarm \ + --site-name "$SITE_NAME_RESOLVED" \ + --kit-root "$KIT_ROOT" \ + --startup-dir "$DIR" \ + --scratch-dir "${MY_SCRATCH_DIR:-}" \ + > "$SYNC_STATE_DIR/live_sync_daemon.log" 2>&1 < /dev/null & + echo $! > "$pid_file" + echo "Started live sync daemon (PID $(cat "$pid_file"))" + fi + } + + _stop_live_sync() { + if [ -n "${LIVE_SYNC_PID:-}" ] && kill -0 "$LIVE_SYNC_PID" 2>/dev/null; then + kill "$LIVE_SYNC_PID" || true + wait "$LIVE_SYNC_PID" || true + fi + } + # Execution modes if [ -n "$DUMMY_TRAINING" ]; then docker run --rm $TTY_OPT $DOCKER_OPTIONS $ENV_VARS --env TRAINING_MODE=local_training $DOCKER_IMAGE \ @@ -815,10 +866,13 @@ docker_cln_sh: | elif [ -n "$LOCAL_TRAINING" ]; then echo "[INFO] Local training using job: $JOB_NAME" + trap _stop_live_sync EXIT INT TERM + _start_live_sync local docker run --rm $TTY_OPT $DOCKER_OPTIONS $ENV_VARS --env TRAINING_MODE=local_training --env NUM_EPOCHS=100 $DOCKER_IMAGE \ /bin/bash -c "/MediSwarm/application/jobs/${JOB_NAME}/app/custom/main.py" elif [ -n "$START_CLIENT" ]; then + _start_live_sync swarm docker run -d -t --restart=on-failure:5 \ --health-cmd="nvidia-smi > /dev/null 2>&1 || exit 1" \ --health-interval=120s --health-start-period=180s --health-retries=3 \ diff --git a/kit_live_sync/build_heartbeat.sh b/kit_live_sync/build_heartbeat.sh index 90f744a9..7f38c871 100755 --- a/kit_live_sync/build_heartbeat.sh +++ b/kit_live_sync/build_heartbeat.sh @@ -11,6 +11,14 @@ OUT_FILE="${7:-/tmp/mediswarm_heartbeat.json}" timestamp="$(date -u +%FT%TZ)" +# Extract kit version from docker.sh (baked in at build time) +kit_version="" +docker_sh="${KIT_ROOT:+$KIT_ROOT/startup/docker.sh}" +if [ -n "$docker_sh" ] && [ -f "$docker_sh" ]; then + kit_version="$(grep -oP '(?<=MEDISWARM_VERSION=)\S+' "$docker_sh" 2>/dev/null | head -1 || true)" + [ -n "$kit_version" ] || kit_version="$(grep -oP '(?<=jefftud/odelia:)\S+' "$docker_sh" 2>/dev/null | head -1 || true)" +fi + log_file="" console_file="" global_model="" @@ -49,6 +57,7 @@ cat > "$OUT_FILE" </dev/null + rsync_cmd "$old_hb_final" "${REMOTE_USER}@${REMOTE_HOST}:${old_remote_dir}/heartbeat_final.json" || true + + # Final sync of the old run's artifacts + old_run_dir="" + if [ -n "$SCRATCHDIR" ] && [ -d "$SCRATCHDIR/runs/$SITE_NAME/$old_run" ]; then + old_run_dir="$SCRATCHDIR/runs/$SITE_NAME/$old_run" + elif [ -d "$STARTUP_DIR/runs/$SITE_NAME/$old_run" ]; then + old_run_dir="$STARTUP_DIR/runs/$SITE_NAME/$old_run" + fi + if [ -n "$old_run_dir" ]; then + rsync_cmd "$old_run_dir/" "${REMOTE_USER}@${REMOTE_HOST}:${old_remote_dir}/run_dir/" || true + fi +} + sync_local() { local run_name run_dir remote_dir hb_file now last run_name="$(find_latest_local_run_name || true)" [ -n "$run_name" ] || return 0 + # If the run changed (new training started), finalize the old one + if [ -n "$CURRENT_LOCAL_RUN" ] && [ "$CURRENT_LOCAL_RUN" != "$run_name" ]; then + _finalize_local_run "$CURRENT_LOCAL_RUN" || true + fi + CURRENT_LOCAL_RUN="$run_name" + # Determine run_dir: check scratch dir first, fall back to startup dir run_dir="" if [ -n "$SCRATCHDIR" ] && [ -d "$SCRATCHDIR/runs/$SITE_NAME/$run_name" ]; then @@ -105,6 +138,7 @@ sync_local() { ensure_remote_dir "$remote_dir" hb_file="$STATE_DIR/local_heartbeat.json" + export SCRATCHDIR "$SCRIPT_DIR/build_heartbeat.sh" "$SITE_NAME" "local" "$KIT_ROOT" "" "$run_name" "running" "$hb_file" >/dev/null rsync_cmd "$hb_file" "${REMOTE_USER}@${REMOTE_HOST}:${remote_dir}/heartbeat.json" || true @@ -210,6 +244,7 @@ final_sync() { remote_dir="$(build_remote_dir "$run_name")" hb_file="$STATE_DIR/local_heartbeat_final.json" + export SCRATCHDIR "$SCRIPT_DIR/build_heartbeat.sh" "$SITE_NAME" "local" "$KIT_ROOT" "" "$run_name" "finished" "$hb_file" >/dev/null rsync_cmd "$hb_file" "${REMOTE_USER}@${REMOTE_HOST}:${remote_dir}/heartbeat_final.json" || true if [ -n "$run_dir" ]; then diff --git a/scripts/build/_injectLiveSyncIntoStartupKits.sh b/scripts/build/_injectLiveSyncIntoStartupKits.sh index 4ad1a17f..94d5c6a5 100755 --- a/scripts/build/_injectLiveSyncIntoStartupKits.sh +++ b/scripts/build/_injectLiveSyncIntoStartupKits.sh @@ -37,116 +37,12 @@ find "$TARGET_FOLDER" -mindepth 1 -maxdepth 1 -type d | while read -r KIT_DIR; d chmod +x "$STARTUP_DIR/build_heartbeat.sh" "$STARTUP_DIR/live_sync.sh" - if [ ! -f "$STARTUP_DIR/docker_original.sh" ]; then - mv "$ORIGINAL_DOCKER_SH" "$STARTUP_DIR/docker_original.sh" - chmod +x "$STARTUP_DIR/docker_original.sh" + # Clean up legacy docker_original.sh wrapper if present from a previous build + if [ -f "$STARTUP_DIR/docker_original.sh" ]; then + rm -f "$STARTUP_DIR/docker_original.sh" fi - cat > "$STARTUP_DIR/docker.sh" <<'EOF' -#!/usr/bin/env bash -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -STARTUP_DIR="$SCRIPT_DIR" -KIT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" -ORIGINAL="$SCRIPT_DIR/docker_original.sh" -STATE_DIR="$STARTUP_DIR/.mediswarm_sync" -mkdir -p "$STATE_DIR" - -MODE="" -SCRATCHDIR="" -SITE_NAME_FROM_ARGS="" - -parse_args() { - while [ "$#" -gt 0 ]; do - case "$1" in - --local_training) - MODE="local" - shift - ;; - --start_client) - MODE="swarm" - shift - ;; - --scratch_dir) - SCRATCHDIR="${2:-}" - shift 2 - ;; - --site_name) - SITE_NAME_FROM_ARGS="${2:-}" - shift 2 - ;; - *) - shift - ;; - esac - done -} - -parse_args "$@" - -SITE_NAME_FALLBACK="$(basename "$KIT_ROOT")" -if [ -n "$SITE_NAME_FROM_ARGS" ]; then - export SITE_NAME="$SITE_NAME_FROM_ARGS" -elif [ -z "${SITE_NAME:-}" ]; then - export SITE_NAME="$SITE_NAME_FALLBACK" -fi - -start_local_sync() { - "$SCRIPT_DIR/live_sync.sh" \ - --mode local \ - --site-name "$SITE_NAME" \ - --kit-root "$KIT_ROOT" \ - --startup-dir "$STARTUP_DIR" \ - --scratch-dir "${SCRATCHDIR:-}" & - SYNC_PID=$! -} - -stop_local_sync() { - if [ -n "${SYNC_PID:-}" ] && kill -0 "$SYNC_PID" >/dev/null 2>&1; then - kill "$SYNC_PID" || true - wait "$SYNC_PID" || true - fi -} - -start_swarm_sync_daemon() { - local pid_file="$STATE_DIR/swarm_sync.pid" - - if [ -f "$pid_file" ]; then - old_pid="$(cat "$pid_file" 2>/dev/null || true)" - if [ -n "$old_pid" ] && kill -0 "$old_pid" >/dev/null 2>&1; then - echo "Live sync daemon already running with PID $old_pid" - return 0 - fi - fi - - nohup "$SCRIPT_DIR/live_sync.sh" \ - --mode swarm \ - --site-name "$SITE_NAME" \ - --kit-root "$KIT_ROOT" \ - --startup-dir "$STARTUP_DIR" \ - --scratch-dir "${SCRATCHDIR:-}" \ - > "$STATE_DIR/live_sync_daemon.log" 2>&1 < /dev/null & - - echo $! > "$pid_file" - echo "Started live sync daemon with PID $(cat "$pid_file")" -} - -if [ "$MODE" = "local" ]; then - trap stop_local_sync EXIT INT TERM - start_local_sync - exec "$ORIGINAL" "$@" -elif [ "$MODE" = "swarm" ]; then - start_swarm_sync_daemon - exec "$ORIGINAL" "$@" -else - exec "$ORIGINAL" "$@" -fi -EOF - - chmod +x "$STARTUP_DIR/docker.sh" - - echo "Patched $STARTUP_DIR/docker.sh" + echo "Injected live sync helpers into $STARTUP_DIR" done echo "Live sync injection finished" \ No newline at end of file diff --git a/scripts/ci/runIntegrationTests.sh b/scripts/ci/runIntegrationTests.sh index 8785164f..b445b39c 100755 --- a/scripts/ci/runIntegrationTests.sh +++ b/scripts/ci/runIntegrationTests.sh @@ -732,12 +732,25 @@ cleanup_synthetic_data () { } +# Helper: remove a directory that may contain root-owned files created by +# Docker containers. Falls back to a disposable Alpine container when the +# regular `rm` fails (which happens on the CI runner because NVFlare runs +# as root inside its container and bind-mounts the workspace). +_rm_rf () { + local dir="$1" + [ -z "$dir" ] && return 0 + [ ! -e "$dir" ] && return 0 + rm -rf "$dir" 2>/dev/null || \ + docker run --rm -v "$(cd "$(dirname "$dir")" && pwd)":/ws alpine \ + rm -rf "/ws/$(basename "$dir")" +} + cleanup_temporary_data () { echo "[Cleanup] Removing synthetic data directory, scratch directory, dummy workspace ..." - rm -rf "$SYNTHETIC_DATA_DIR" - rm -rf "$STAMP_SYNTHETIC_DATA_DIR" - rm -rf "$SCRATCH_DIR" - rm -rf "$PROJECT_DIR" + _rm_rf "$SYNTHETIC_DATA_DIR" + _rm_rf "$STAMP_SYNTHETIC_DATA_DIR" + _rm_rf "$SCRATCH_DIR" + _rm_rf "$PROJECT_DIR" } diff --git a/server_tools/app.py b/server_tools/app.py index a4feba5a..17165b5b 100644 --- a/server_tools/app.py +++ b/server_tools/app.py @@ -2,6 +2,16 @@ Serves a styled web UI that displays live training status, metrics charts, and artifact links for all sites synced by live_sync to /srv/mediswarm/live/. + +Features: +- Filters by site, mode, status, job_id +- Default sort by timestamp (newest first) +- Status inference (stale running → stale, very old → presumed finished) +- Server-side file paths with download links +- Job grouping for swarm runs +- Kit version column +- TensorBoard metric parsing (via tbparse) +- Enriched detail page with full artifact inventory """ from pathlib import Path @@ -15,8 +25,8 @@ from html import escape as html_escape -from fastapi import FastAPI, HTTPException -from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse +from fastapi import FastAPI, HTTPException, Query +from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, FileResponse # Optional: TensorBoard event parsing via tbparse try: @@ -37,28 +47,17 @@ def _safe_segment(value: str) -> str: - """Validate that a URL path segment is safe (no traversal, no slashes). - - Raises HTTPException 400 if the segment looks like a traversal attempt - or contains characters that could escape the BASE directory. - """ if not value or not _SAFE_SEGMENT_RE.match(value) or ".." in value: raise HTTPException(status_code=400, detail="Invalid path segment") return value def _resolve_run_dir(site: str, mode: str, run_id: str) -> Path: - """Build and validate a run directory path under BASE. - - Ensures the resolved path is strictly under BASE to prevent traversal. - Uses os.path.commonpath for robust containment checking. - """ site = _safe_segment(site) mode = _safe_segment(mode) run_id = _safe_segment(run_id) run_dir = (BASE / site / mode / run_id).resolve() base_resolved = BASE.resolve() - # Verify that the resolved path is actually under BASE try: common = os.path.commonpath([str(base_resolved), str(run_dir)]) except ValueError: @@ -84,6 +83,7 @@ def _resolve_run_dir(site: str, mode: str, run_id: str) -> Path: --orange: #e67e22; --gray: #95a5a6; --red: #c0392b; + --purple: #8e44ad; --border: #dfe6e9; --text: #2d3436; --text-light: #636e72; @@ -97,33 +97,62 @@ def _resolve_run_dir(site: str, mode: str, run_id: str) -> Path: header h1 { font-size: 1.4rem; font-weight: 600; } header .meta { font-size: 0.82rem; color: var(--gray); } header .meta a { color: var(--gray); text-decoration: underline; margin-left: 1rem; } -main { max-width: 1400px; margin: 1.5rem auto; padding: 0 1rem; } +main { max-width: 1600px; margin: 1.5rem auto; padding: 0 1rem; } + +/* Filter bar */ +.filter-bar { display: flex; flex-wrap: wrap; gap: 0.6rem; margin-bottom: 1.2rem; + align-items: center; } +.filter-bar label { font-size: 0.82rem; font-weight: 600; color: var(--text-light); } +.filter-bar select, .filter-bar input { font-size: 0.82rem; padding: 4px 10px; + border: 1px solid var(--border); border-radius: 6px; background: var(--card); } +.filter-bar .filter-group { display: flex; align-items: center; gap: 0.3rem; } +.filter-bar .btn-small { display: inline-block; padding: 4px 12px; border-radius: 6px; + background: var(--accent); color: #fff; font-size: 0.8rem; text-decoration: none; + cursor: pointer; border: none; } +.filter-bar .btn-small:hover { background: #16213e; } + table { width: 100%; border-collapse: collapse; background: var(--card); border-radius: 8px; overflow: hidden; box-shadow: 0 1px 4px rgba(0,0,0,0.08); } th { background: var(--accent); color: #fff; text-align: left; - padding: 0.7rem 0.9rem; font-size: 0.82rem; text-transform: uppercase; - letter-spacing: 0.04em; } -td { padding: 0.65rem 0.9rem; border-bottom: 1px solid var(--border); - font-size: 0.88rem; vertical-align: top; } + padding: 0.7rem 0.9rem; font-size: 0.78rem; text-transform: uppercase; + letter-spacing: 0.04em; cursor: pointer; user-select: none; white-space: nowrap; } +th:hover { background: #16213e; } +th .sort-arrow { font-size: 0.7rem; margin-left: 0.3rem; } +td { padding: 0.55rem 0.9rem; border-bottom: 1px solid var(--border); + font-size: 0.85rem; vertical-align: top; } tr:nth-child(even) td { background: #f9fafb; } tr:hover td { background: #eef2f7; } .badge { display: inline-block; padding: 2px 10px; border-radius: 12px; - font-size: 0.78rem; font-weight: 600; color: #fff; } + font-size: 0.75rem; font-weight: 600; color: #fff; } .badge-running { background: var(--green); } .badge-finished { background: var(--blue); } .badge-unknown { background: var(--gray); } .badge-error { background: var(--red); } -.artifact { font-size: 0.8rem; color: var(--text-light); } +.badge-stale { background: var(--orange); } +.artifact { font-size: 0.78rem; color: var(--text-light); } .artifact .yes { color: var(--green); font-weight: 600; } .artifact .no { color: var(--gray); } a { color: var(--accent); text-decoration: none; } a:hover { text-decoration: underline; } -.links a { margin-right: 0.7rem; font-size: 0.82rem; } -.run-id { font-family: var(--mono); font-size: 0.78rem; word-break: break-all; } +.links a { margin-right: 0.5rem; font-size: 0.8rem; } +.run-id { font-family: var(--mono); font-size: 0.75rem; word-break: break-all; } .run-name { font-weight: 500; } .age-stale { color: var(--orange); } .age-dead { color: var(--red); } .empty { text-align: center; padding: 3rem; color: var(--gray); } +.version { font-family: var(--mono); font-size: 0.75rem; color: var(--text-light); } +.job-id { font-family: var(--mono); font-size: 0.72rem; color: var(--purple); } + +/* Job group header */ +.job-group-row td { background: #e8e4f0 !important; font-weight: 600; + font-size: 0.82rem; color: var(--purple); padding: 0.5rem 0.9rem; } + +/* Summary stats */ +.stats-bar { display: flex; gap: 1rem; margin-bottom: 1rem; flex-wrap: wrap; } +.stat-card { background: var(--card); border-radius: 8px; padding: 0.6rem 1.2rem; + box-shadow: 0 1px 3px rgba(0,0,0,0.06); display: flex; align-items: center; gap: 0.5rem; } +.stat-card .stat-num { font-size: 1.4rem; font-weight: 700; color: var(--accent); } +.stat-card .stat-label { font-size: 0.78rem; color: var(--text-light); } /* Detail page */ .detail-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 1.5rem; margin-top: 1.2rem; } @@ -135,15 +164,28 @@ def _resolve_run_dir(site: str, mode: str, run_id: str) -> Path: overflow-x: auto; font-size: 0.78rem; line-height: 1.5; max-height: 400px; overflow-y: auto; } .card table { box-shadow: none; } -.card table th { background: var(--accent); } -.kv-table td:first-child { font-weight: 600; white-space: nowrap; width: 160px; } +.card table th { background: var(--accent); cursor: default; } +.kv-table td:first-child { font-weight: 600; white-space: nowrap; width: 180px; } +.kv-table td:last-child { font-family: var(--mono); font-size: 0.82rem; word-break: break-all; } .btn { display: inline-block; padding: 6px 14px; border-radius: 6px; background: var(--accent); color: #fff; font-size: 0.82rem; - text-decoration: none; margin-right: 0.5rem; } + text-decoration: none; margin-right: 0.5rem; margin-bottom: 0.3rem; } .btn:hover { background: #16213e; text-decoration: none; } -.chart-container { position: relative; width: 100%; height: 320px; } +.btn-download { background: var(--green); } +.btn-download:hover { background: #1e8449; } +.chart-container { position: relative; width: 100%; height: 350px; } .breadcrumb { font-size: 0.85rem; margin-bottom: 1rem; color: var(--text-light); } .breadcrumb a { color: var(--accent); } + +/* File list in detail */ +.file-list { list-style: none; } +.file-list li { padding: 0.3rem 0; border-bottom: 1px solid #f0f0f0; font-size: 0.82rem; + display: flex; align-items: center; gap: 0.5rem; } +.file-list li:last-child { border-bottom: none; } +.file-icon { font-size: 0.9rem; } +.file-path { font-family: var(--mono); font-size: 0.78rem; color: var(--text-light); + word-break: break-all; } +.file-size { font-size: 0.75rem; color: var(--gray); white-space: nowrap; } """ @@ -155,11 +197,12 @@ def _status_badge(status: str) -> str: cls = "badge-finished" elif status in ("error", "failed"): cls = "badge-error" + elif status == "stale": + cls = "badge-stale" return f'{html_escape(status)}' def _age_class(age_str: str) -> str: - """Return a CSS class for stale/dead heartbeats.""" try: secs = int(age_str.rstrip("s")) except (ValueError, AttributeError): @@ -171,7 +214,7 @@ def _age_class(age_str: str) -> str: return "" -def _html_page(title: str, body: str, *, refresh: int = 0) -> str: +def _html_page(title: str, body: str, *, refresh: int = 0, extra_head: str = "") -> str: refresh_tag = ( f'' if refresh else "" ) @@ -184,6 +227,7 @@ def _html_page(title: str, body: str, *, refresh: int = 0) -> str: {refresh_tag} {safe_title} + {extra_head} {body} @@ -191,6 +235,17 @@ def _html_page(title: str, body: str, *, refresh: int = 0) -> str: """ +def _format_size(size_bytes: int) -> str: + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MB" + else: + return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB" + + # --------------------------------------------------------------------------- # Data helpers # --------------------------------------------------------------------------- @@ -212,13 +267,25 @@ def parse_age(ts: str) -> str: return f"{secs}s" if secs < 3600: return f"{secs // 60}m {secs % 60}s" - return f"{secs // 3600}h {(secs % 3600) // 60}m" + if secs < 86400: + return f"{secs // 3600}h {(secs % 3600) // 60}m" + return f"{secs // 86400}d {(secs % 86400) // 3600}h" except Exception: return "unknown" +def _age_seconds(ts: str) -> int: + """Return age in seconds from an ISO timestamp, or -1 if unparseable.""" + if not ts: + return -1 + try: + dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) + return int((datetime.now(timezone.utc) - dt).total_seconds()) + except Exception: + return -1 + + def _read_heartbeat(run_dir: Path) -> dict[str, Any]: - """Read the best available heartbeat file (prefer final over live).""" for name in ["heartbeat_final.json", "heartbeat.json"]: p = run_dir / name if p.exists(): @@ -229,8 +296,36 @@ def _read_heartbeat(run_dir: Path) -> dict[str, Any]: return {} +def _infer_status(hb: dict[str, Any], run_dir: Path) -> str: + """Infer the effective status from heartbeat + file system state. + + Rules: + - If heartbeat_final.json exists -> use its status (typically "finished") + - If status is "running" but heartbeat is >5 min old -> "stale" + - If status is "running" but heartbeat is >1 hour old -> "finished" (presumed) + - Otherwise use heartbeat status as-is + """ + has_final = (run_dir / "heartbeat_final.json").exists() + raw_status = hb.get("status", "unknown") + + if has_final: + try: + final = json.loads((run_dir / "heartbeat_final.json").read_text()) + return final.get("status", raw_status) + except Exception: + pass + + if raw_status == "running": + age = _age_seconds(hb.get("timestamp", "")) + if age > 3600: + return "finished" + if age > 300: + return "stale" + + return raw_status + + def _find_csv_files(run_dir: Path) -> list[str]: - """Find class-probability CSV files under run_dir/.""" rd = run_dir / "run_dir" if not rd.exists(): return [] @@ -240,13 +335,49 @@ def _find_csv_files(run_dir: Path) -> list[str]: def _find_tb_events(run_dir: Path) -> list[Path]: - """Find TensorBoard event files under run_dir/.""" rd = run_dir / "run_dir" if not rd.exists(): return [] return sorted(rd.rglob("events.out.tfevents*")) +def _find_checkpoints(run_dir: Path) -> list[dict[str, Any]]: + """Find all checkpoint files under run_dir/run_dir/.""" + rd = run_dir / "run_dir" + results = [] + if not rd.exists(): + return results + for p in sorted(rd.rglob("*.ckpt")): + results.append({ + "name": p.name, + "rel_path": str(p.relative_to(run_dir)), + "size": p.stat().st_size if p.exists() else 0, + "server_path": str(p), + }) + return results + + +def _find_all_files(run_dir: Path) -> list[dict[str, Any]]: + """Find all files in the run directory with metadata.""" + results = [] + if not run_dir.exists(): + return results + for p in sorted(run_dir.rglob("*")): + if p.is_file(): + try: + stat = p.stat() + results.append({ + "name": p.name, + "rel_path": str(p.relative_to(run_dir)), + "size": stat.st_size, + "mtime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), + "server_path": str(p), + }) + except Exception: + pass + return results + + def rows() -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] if not BASE.exists(): @@ -258,6 +389,23 @@ def rows() -> list[dict[str, Any]]: hb = _read_heartbeat(run_dir) ts = hb.get("timestamp", "") age = parse_age(ts) + status = _infer_status(hb, run_dir) + csv_files = _find_csv_files(run_dir) + tb_events = _find_tb_events(run_dir) + checkpoints = _find_checkpoints(run_dir) + + # Count total files and size + total_files = 0 + total_size = 0 + rd = run_dir / "run_dir" + if rd.exists(): + for f in rd.rglob("*"): + if f.is_file(): + total_files += 1 + try: + total_size += f.stat().st_size + except Exception: + pass out.append( { @@ -266,20 +414,28 @@ def rows() -> list[dict[str, Any]]: "run_id": run_dir.name, "run_name": hb.get("run_name", ""), "job_id": hb.get("job_id", ""), - "status": hb.get("status", "unknown"), + "status": status, + "raw_status": hb.get("status", "unknown"), "timestamp": ts, "age": age, + "age_seconds": _age_seconds(ts), + "kit_version": hb.get("kit_version", ""), "has_console": (run_dir / "nohup.out").exists() or (run_dir / "local_training_console_output.txt").exists(), "has_log": (run_dir / "log.txt").exists(), - "last_ckpt": bool(hb.get("last_ckpt")), - "epoch_ckpt": bool(hb.get("epoch_ckpt")), - "global_model": bool(hb.get("global_model")), - "best_global_model": bool(hb.get("best_global_model")), - "csv_files": _find_csv_files(run_dir), - "tb_events": bool(_find_tb_events(run_dir)), + "has_global_model": (run_dir / "FL_global_model.pt").exists(), + "has_best_model": (run_dir / "best_FL_global_model.pt").exists(), + "checkpoints": len(checkpoints), + "csv_files": csv_files, + "tb_events": len(tb_events), + "total_files": total_files, + "total_size": total_size, + "server_path": str(run_dir), } ) + + # Sort by timestamp descending (newest first) + out.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return out @@ -293,11 +449,10 @@ def rows() -> list[dict[str, Any]]: def parse_console_metrics(text: str) -> dict[str, Any]: - """Extract epoch-level ACC and AUC_ROC from console output.""" data: dict[str, dict[int, dict[str, float]]] = {} for m in _EPOCH_RE.finditer(text): epoch = int(m.group(1)) - phase = m.group(2) # train / val / test + phase = m.group(2) acc = float(m.group(3)) auc = float(m.group(4)) data.setdefault(phase, {})[epoch] = {"acc": acc, "auc_roc": auc} @@ -325,15 +480,65 @@ def _get_console_text(site: str, mode: str, run_id: str) -> str: return "" +def _extract_training_summary(text: str) -> dict[str, Any]: + """Extract a training summary from console output.""" + summary: dict[str, Any] = {} + + # Total epochs + epochs = _EPOCH_RE.findall(text) + if epochs: + all_epochs = [int(e[0]) for e in epochs] + summary["total_epochs"] = max(all_epochs) + 1 + summary["last_epoch"] = max(all_epochs) + + # Best checkpoint + best_match = re.findall( + r"Epoch\s+(\d+)\s*-\s*val\s+ACC:\s*([\d.]+),\s*AUC_ROC:\s*([\d.]+)", text + ) + if best_match: + best_auc = max(best_match, key=lambda x: float(x[2])) + summary["best_val_epoch"] = int(best_auc[0]) + summary["best_val_acc"] = float(best_auc[1]) + summary["best_val_auc_roc"] = float(best_auc[2]) + + # Final train metrics + train_match = re.findall( + r"Epoch\s+(\d+)\s*-\s*train\s+ACC:\s*([\d.]+),\s*AUC_ROC:\s*([\d.]+)", text + ) + if train_match: + last_train = train_match[-1] + summary["final_train_acc"] = float(last_train[1]) + summary["final_train_auc_roc"] = float(last_train[2]) + + # NVFlare round info (swarm mode) + round_matches = re.findall(r"(?:Round|round)\s+(\d+)", text) + if round_matches: + summary["total_rounds"] = max(int(r) for r in round_matches) + + return summary + + # --------------------------------------------------------------------------- # Index page # --------------------------------------------------------------------------- @app.get("/", response_class=HTMLResponse) -def index(): +def index( + site_filter: str = Query("", alias="site"), + mode_filter: str = Query("", alias="mode"), + status_filter: str = Query("", alias="status"), + job_filter: str = Query("", alias="job"), + group_by_job: bool = Query(False, alias="group"), +): r = rows() - now_str = datetime.now().strftime("%H:%M:%S") + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # Collect unique values for filters + all_sites = sorted({x["site"] for x in r}) + all_modes = sorted({x["mode"] for x in r}) + all_statuses = sorted({x["status"] for x in r}) + all_jobs = sorted({x["job_id"] for x in r if x["job_id"]}) if not r: body = f""" @@ -345,64 +550,105 @@ def index():
No training runs found under {BASE}
""" return _html_page("MediSwarm Monitor", body, refresh=30) + # Apply filters + filtered = r + if site_filter: + filtered = [x for x in filtered if x["site"] == site_filter] + if mode_filter: + filtered = [x for x in filtered if x["mode"] == mode_filter] + if status_filter: + filtered = [x for x in filtered if x["status"] == status_filter] + if job_filter: + filtered = [x for x in filtered if x["job_id"] == job_filter] + + # Stats + n_total = len(filtered) + n_running = sum(1 for x in filtered if x["status"] == "running") + n_finished = sum(1 for x in filtered if x["status"] == "finished") + n_stale = sum(1 for x in filtered if x["status"] == "stale") + n_sites = len({x["site"] for x in filtered}) + + stats_html = f""" +
+
{n_total}Total Runs
+
{n_running}Running
+
{n_finished}Finished
+
{n_stale}Stale
+
{n_sites}Sites
+
""" + + # Filter bar + def _select_opts(name: str, values: list[str], current: str) -> str: + opts = '' + for v in values: + sel = " selected" if v == current else "" + opts += f'' + return f'' + + group_checked = " checked" if group_by_job else "" + filter_html = f""" +
+
+ {_select_opts("site", all_sites, site_filter)} +
+
+ {_select_opts("mode", all_modes, mode_filter)} +
+
+ {_select_opts("status", all_statuses, status_filter)} +
+
+ {_select_opts("job", all_jobs, job_filter)} +
+
+ +
+ Clear Filters +
""" + + # Build table rows table_rows = [] - for x in r: - # Links - links = [] - detail = f"/detail/{x['site']}/{x['mode']}/{x['run_id']}" - links.append(f'Details') - links.append( - f"heartbeat" - ) - if x["has_console"]: - label = "nohup" if x["mode"] == "swarm" else "console" - links.append( - f"{label}" + + if group_by_job and not job_filter: + # Group swarm runs by job_id, local runs standalone + from collections import OrderedDict + + job_groups: OrderedDict[str, list[dict]] = OrderedDict() + standalone: list[dict] = [] + + for x in filtered: + if x["job_id"]: + job_groups.setdefault(x["job_id"], []).append(x) + else: + standalone.append(x) + + for job_id, items in job_groups.items(): + n_items = len(items) + sites = ", ".join(sorted({x["site"] for x in items})) + statuses = {x["status"] for x in items} + job_status = ( + "running" + if "running" in statuses + else ("stale" if "stale" in statuses else "finished") ) - if x["has_log"]: - links.append( - f"log" + run_name = items[0].get("run_name", "") if items else "" + table_rows.append( + f""" + Job: {html_escape(job_id)} +  ·  {n_items} client(s): {html_escape(sites)} +  ·  {_status_badge(job_status)} + {f' · {html_escape(run_name)}' if run_name else ''} + """ ) + for x in items: + table_rows.append(_build_table_row(x)) - # Artifacts - arts = [] - if x["last_ckpt"]: - arts.append('last.ckpt') - if x["epoch_ckpt"]: - arts.append('epoch.ckpt') - if x["global_model"]: - arts.append('FL_global') - if x["best_global_model"]: - arts.append('best_FL') - if x["csv_files"]: - arts.append(f'{len(x["csv_files"])} CSV') - if x["tb_events"]: - arts.append('TFEvents') - if not arts: - arts.append('none') - - # Run display - run_display = "" - if x["run_name"]: - run_display = f'{html_escape(x["run_name"])}
' - run_display += f'{html_escape(x["run_id"])}' - - age_cls = _age_class(x["age"]) - age_td = ( - f'{x["age"]}' if age_cls else x["age"] - ) - - table_rows.append( - f""" - {html_escape(x['site'])} - {html_escape(x['mode'])} - {run_display} - {_status_badge(x['status'])} - {age_td} - {' · '.join(arts)} - {' '.join(links)} -""" - ) + for x in standalone: + table_rows.append(_build_table_row(x)) + else: + for x in filtered: + table_rows.append(_build_table_row(x)) body = f"""
@@ -412,10 +658,12 @@ def index(): · API
+{stats_html} +{filter_html} - + {''.join(table_rows)} @@ -425,6 +673,78 @@ def index(): return _html_page("MediSwarm Monitor", body, refresh=30) +def _build_table_row(x: dict[str, Any]) -> str: + """Build a single for the index table.""" + # Links + links = [] + detail = f"/detail/{x['site']}/{x['mode']}/{x['run_id']}" + links.append(f'Details') + if x["has_console"]: + label = "nohup" if x["mode"] == "swarm" else "console" + links.append( + f"{label}" + ) + if x["has_log"]: + links.append( + f"log" + ) + + # Artifacts + arts = [] + if x["checkpoints"]: + arts.append(f'{x["checkpoints"]} ckpt') + if x["has_global_model"]: + arts.append('FL_global') + if x["has_best_model"]: + arts.append('best_FL') + if x["csv_files"]: + arts.append(f'{len(x["csv_files"])} CSV') + if x["tb_events"]: + arts.append(f'{x["tb_events"]} TFE') + if not arts: + arts.append('none') + + # Run display + run_display = "" + if x["run_name"]: + run_display = ( + f'{html_escape(x["run_name"])}
' + ) + run_display += f'{html_escape(x["run_id"])}' + if x["job_id"]: + run_display += f'
job: {html_escape(x["job_id"][:8])}...' + + age_cls = _age_class(x["age"]) + age_td = ( + f'{x["age"]}' if age_cls else x["age"] + ) + + version = ( + f'{html_escape(x["kit_version"])}' + if x["kit_version"] + else '-' + ) + + size_str = _format_size(x["total_size"]) if x["total_size"] else "-" + server_path = ( + f'' + f'{html_escape(x["server_path"][-40:])}' + ) + + return f""" + + + + + + + + + + +""" + + # --------------------------------------------------------------------------- # Detail page # --------------------------------------------------------------------------- @@ -434,61 +754,168 @@ def index(): def detail(site: str, mode: str, run_id: str): run_dir = _resolve_run_dir(site, mode, run_id) hb = _read_heartbeat(run_dir) + status = _infer_status(hb, run_dir) console_text = _get_console_text(site, mode, run_id) metrics = parse_console_metrics(console_text) csv_files = _find_csv_files(run_dir) - has_tb = bool(_find_tb_events(run_dir)) - - # Heartbeat info card + tb_events = _find_tb_events(run_dir) + checkpoints = _find_checkpoints(run_dir) + all_files = _find_all_files(run_dir) + training_summary = _extract_training_summary(console_text) + + # -- Heartbeat info card -- + hb_display_keys = [ + ("site_name", "Site Name"), + ("mode", "Mode"), + ("job_id", "Job ID"), + ("run_name", "Run Name"), + ("timestamp", "Last Heartbeat"), + ("status", "Raw Status"), + ("kit_version", "Kit Version"), + ("kit_root", "Kit Root (client)"), + ("run_dir", "Run Dir (client)"), + ("log_file", "Log File (client)"), + ("console_file", "Console File (client)"), + ("global_model", "Global Model (client)"), + ("best_global_model", "Best Global Model (client)"), + ("last_ckpt", "Last Checkpoint (client)"), + ("epoch_ckpt", "Epoch Checkpoint (client)"), + ("tb_file", "TensorBoard File (client)"), + ] hb_rows = "" - for key in [ - "site_name", - "mode", - "job_id", - "run_name", - "timestamp", - "status", - "run_dir", - "last_ckpt", - "epoch_ckpt", - "global_model", - "best_global_model", - "tb_file", - ]: + # Add inferred status first + hb_rows += f"\n" + for key, label in hb_display_keys: val = hb.get(key, "") if val: - hb_rows += f"\n" + hb_rows += f"\n" + # Add age + ts = hb.get("timestamp", "") + if ts: + hb_rows += f"\n" if not hb_rows: hb_rows = '' - # CSV links - csv_links = "" + # -- Training summary card -- + summary_html = "" + if training_summary: + summary_rows = "" + if "total_epochs" in training_summary: + summary_rows += ( + f"" + ) + if "best_val_epoch" in training_summary: + summary_rows += ( + f"" + f"" + ) + if "final_train_acc" in training_summary: + summary_rows += ( + f"" + f"" + ) + if "total_rounds" in training_summary: + summary_rows += ( + f"" + ) + if summary_rows: + summary_html = f""" +
+

Training Summary

+
SiteModeRunStatusAgeArtifactsLinksVersionArtifactsSizeServer PathLinks
{html_escape(x['site'])}{html_escape(x['mode'])}{run_display}{_status_badge(x['status'])}{age_td}{version}{' · '.join(arts)}{size_str}{server_path}
Effective Status{_status_badge(status)}
{html_escape(str(key))}{html_escape(str(val))}
{html_escape(label)}{html_escape(str(val))}
Heartbeat Age{parse_age(ts)}
No heartbeat data available
Total Epochs{training_summary['total_epochs']}
Best ValidationEpoch {training_summary['best_val_epoch']} — " + f"ACC: {training_summary['best_val_acc']:.4f}, " + f"AUC_ROC: {training_summary['best_val_auc_roc']:.4f}
Final TrainingACC: {training_summary['final_train_acc']:.4f}, " + f"AUC_ROC: {training_summary['final_train_auc_roc']:.4f}
FL Rounds{training_summary['total_rounds']}
{summary_rows}
+""" + + # -- Server-side files card -- + file_list_html = "" + if all_files: + items = "" + for f in all_files: + size = _format_size(f["size"]) + dl_url = f"/download/{site}/{mode}/{run_id}/{f['rel_path']}" + icon = "📄" + if f["name"].endswith(".csv"): + icon = "📊" + elif f["name"].endswith(".ckpt") or f["name"].endswith(".pt"): + icon = "📦" + elif f["name"].endswith(".json") or f["name"].endswith(".yaml"): + icon = "📝" + elif "tfevents" in f["name"]: + icon = "📈" + items += ( + f'
  • {icon}' + f'' + f'{html_escape(f["rel_path"])}' + f'
    {html_escape(f["server_path"])}' + f'
    ' + f'{size}' + f'Download' + f'
  • ' + ) + total_size = _format_size(sum(f["size"] for f in all_files)) + file_list_html = f""" +
    +

    Server Files ({len(all_files)} files, {total_size} total)

    +

    + Server directory: {html_escape(str(run_dir))}

    +
      {items}
    +
    """ + + # -- CSV links card -- + csv_links_html = "" if csv_files: csv_items = "".join( - f'
  • {html_escape(f)}
  • ' + f'
  • {html_escape(f)}' + f' · Download
  • ' for f in csv_files ) - csv_links = f"" - else: - csv_links = "

    No CSV result files found.

    " + csv_links_html = f""" +
    +

    Result CSVs

    +
      {csv_items}
    +
    """ + + # -- Checkpoints card -- + ckpt_html = "" + if checkpoints: + ckpt_items = "".join( + f'
  • 📦' + f'{html_escape(c["name"])}' + f'
    {html_escape(c["server_path"])}
    ' + f'{_format_size(c["size"])}' + f'Download' + f'
  • ' + for c in checkpoints + ) + ckpt_html = f""" +
    +

    Checkpoints ({len(checkpoints)})

    +
      {ckpt_items}
    +
    """ - # Console snippet (last 200 lines) + # -- Console snippet (last 300 lines) -- console_lines = console_text.strip().split("\n") - console_tail = "\n".join(console_lines[-200:]) if console_lines else "No output." - # Escape HTML in console output - console_tail = ( + console_tail = "\n".join(console_lines[-300:]) if console_lines else "No output." + console_tail_escaped = ( console_tail.replace("&", "&").replace("<", "<").replace(">", ">") ) + console_len = len(console_lines) - # Chart section + # -- Chart section -- chart_html = "" if metrics["epochs"]: chart_html = f"""
    -

    Training Metrics

    +

    Training Metrics (from console)

    - """ - # TensorBoard metrics link + # -- TensorBoard metrics -- tb_html = "" - if has_tb and HAS_TBPARSE: - tb_html = f""" + if tb_events and HAS_TBPARSE: + # Parse and display inline + try: + reader = SummaryReader(str(tb_events[0].parent)) + df = reader.scalars + tags = sorted(df["tag"].unique()) if len(df) > 0 else [] + tb_data: dict[str, Any] = {} + for tag in tags: + subset = df[df["tag"] == tag].sort_values("step") + tb_data[tag] = { + "steps": subset["step"].tolist(), + "values": subset["value"].tolist(), + } + if tb_data: + tb_html = f""" +
    +

    TensorBoard Metrics ({len(tags)} tags)

    +
    +
    + Raw JSON +
    +
    +""" + except Exception as e: + tb_html = f"""

    TensorBoard Metrics

    -

    TensorBoard events available. - View raw JSON

    +

    Error parsing TensorBoard events: {html_escape(str(e))}

    + Try raw JSON
    """ - elif has_tb: - tb_html = """ + elif tb_events: + tb_html = f"""

    TensorBoard Metrics

    -

    TensorBoard events found but tbparse is not installed. - Install with pip install tbparse to enable parsing.

    +

    Found {len(tb_events)} TensorBoard event file(s).

    +

    tbparse is {'installed' if HAS_TBPARSE else + 'not installed — install with pip install tbparse to enable parsing'}.

    """ + # -- Models card -- + model_html = "" + model_files = [] + for mname in [ + "FL_global_model.pt", + "best_FL_global_model.pt", + "last_global_model.ckpt", + ]: + mp = run_dir / mname + if not mp.exists(): + mp = run_dir / "run_dir" / mname + if mp.exists(): + model_files.append( + { + "name": mname, + "size": mp.stat().st_size, + "path": str(mp), + "rel_path": str(mp.relative_to(run_dir)), + } + ) + if model_files: + model_items = "".join( + f'
  • 🧠' + f'{html_escape(m["name"])}' + f'
    {html_escape(m["path"])}
    ' + f'{_format_size(m["size"])}' + f'Download' + f"
  • " + for m in model_files + ) + model_html = f""" +
    +

    Models

    +
      {model_items}
    +
    """ + + log_btn = "" + if (run_dir / "log.txt").exists(): + log_btn = f'Full NVFlare Log' + body = f"""

    MediSwarm Live Monitor

    @@ -556,30 +1084,75 @@ def detail(site: str, mode: str, run_id: str):
    -

    Heartbeat

    +

    Heartbeat & Status

    {hb_rows}
    -
    - -
    -

    Artifacts & CSVs

    - {csv_links}
    - Raw heartbeat - Full console - {"Full log" if (run_dir / "log.txt").exists() else ""} + Raw Heartbeat JSON
    + {summary_html} + {csv_links_html} + {ckpt_html} + {model_html} + {chart_html} {tb_html} + {file_list_html}
    -

    Console Output (last 200 lines)

    -
    {console_tail}
    +

    Console Output (last 300 of {console_len} lines)

    +
    + Full Console Output + {log_btn} +
    +
    {console_tail_escaped}
    """ - return _html_page(f"{html_escape(site)}/{html_escape(mode)}/{html_escape(run_id)} — MediSwarm", body) + # Include Chart.js if any chart is rendered + needs_chartjs = bool(chart_html) or bool(tb_html and HAS_TBPARSE and tb_events) + chartjs_head = '' if needs_chartjs else "" + + return _html_page( + f"{html_escape(site)}/{html_escape(mode)}/{html_escape(run_id)} — MediSwarm", + body, + extra_head=chartjs_head, + ) + + +# --------------------------------------------------------------------------- +# File download endpoint +# --------------------------------------------------------------------------- + + +@app.get("/download/{site}/{mode}/{run_id}/{file_path:path}") +def download_file(site: str, mode: str, run_id: str, file_path: str): + """Download any file from a run directory.""" + run_dir = _resolve_run_dir(site, mode, run_id) + + # Prevent traversal in file_path + if ".." in file_path: + raise HTTPException(status_code=400, detail="Invalid file path") + + target = (run_dir / file_path).resolve() + + # Ensure target is under run_dir + try: + common = os.path.commonpath([str(run_dir.resolve()), str(target)]) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid path") + if common != str(run_dir.resolve()): + raise HTTPException(status_code=400, detail="Invalid path") + + if not target.exists() or not target.is_file(): + raise HTTPException(status_code=404, detail="File not found") + + return FileResponse( + path=str(target), + filename=target.name, + media_type="application/octet-stream", + ) # --------------------------------------------------------------------------- @@ -615,14 +1188,12 @@ def log(site: str, mode: str, run_id: str): @app.get("/metrics/{site}/{mode}/{run_id}", response_class=JSONResponse) def metrics(site: str, mode: str, run_id: str): - """Return parsed training metrics from console output as JSON.""" text = _get_console_text(site, mode, run_id) return parse_console_metrics(text) @app.get("/tb_metrics/{site}/{mode}/{run_id}", response_class=JSONResponse) def tb_metrics(site: str, mode: str, run_id: str): - """Return TensorBoard scalar metrics as JSON (requires tbparse).""" if not HAS_TBPARSE: return JSONResponse( {"error": "tbparse is not installed"}, status_code=501 @@ -630,11 +1201,12 @@ def tb_metrics(site: str, mode: str, run_id: str): validated_dir = _resolve_run_dir(site, mode, run_id) run_dir = validated_dir / "run_dir" - events = sorted(run_dir.rglob("events.out.tfevents*")) if run_dir.exists() else [] + events = ( + sorted(run_dir.rglob("events.out.tfevents*")) if run_dir.exists() else [] + ) if not events: return {"scalars": []} - # Parse the directory containing events try: reader = SummaryReader(str(events[0].parent)) df = reader.scalars @@ -654,15 +1226,12 @@ def tb_metrics(site: str, mode: str, run_id: str): @app.get("/csv/{site}/{mode}/{run_id}/{filename}", response_class=HTMLResponse) def csv_view(site: str, mode: str, run_id: str, filename: str): - """Render a CSV file as a styled HTML table.""" - # Sanitize filename to prevent directory traversal safe_name = Path(filename).name if not safe_name or safe_name != filename or ".." in filename or "/" in filename: return HTMLResponse("

    Invalid filename

    ", status_code=400) validated_dir = _resolve_run_dir(site, mode, run_id) rd = validated_dir / "run_dir" - # Search recursively for the file matches = list(rd.rglob(safe_name)) if rd.exists() else [] if not matches: return HTMLResponse("

    File not found

    ", status_code=404) @@ -680,7 +1249,7 @@ def csv_view(site: str, mode: str, run_id: str, filename: str): th = "".join(f"{html_escape(h)}" for h in headers) trs = "" - for row in data_rows[:500]: # limit display to 500 rows + for row in data_rows[:500]: tds = "".join(f"{html_escape(cell)}" for cell in row) trs += f"{tds}\n" @@ -690,6 +1259,8 @@ def csv_view(site: str, mode: str, run_id: str, filename: str): else "" ) + dl_url = f"/download/{site}/{mode}/{run_id}/run_dir/{safe_name}" + body = f"""

    MediSwarm Live Monitor

    @@ -703,6 +1274,10 @@ def csv_view(site: str, mode: str, run_id: str, filename: str):

    {html_escape(safe_name)}

    +
    + Download CSV + Server: {html_escape(str(csv_path))} +
    {truncated}
    {th}{trs}
    @@ -719,19 +1294,30 @@ def csv_view(site: str, mode: str, run_id: str, filename: str): @app.get("/api/runs", response_class=JSONResponse) def api_runs(): - """Return all runs as JSON.""" return rows() @app.get("/api/metrics/{site}/{mode}/{run_id}", response_class=JSONResponse) def api_metrics(site: str, mode: str, run_id: str): - """Return parsed training metrics as JSON (alias for /metrics/).""" text = _get_console_text(site, mode, run_id) return parse_console_metrics(text) @app.get("/api/heartbeat/{site}/{mode}/{run_id}", response_class=JSONResponse) def api_heartbeat(site: str, mode: str, run_id: str): - """Return heartbeat JSON directly.""" run_dir = _resolve_run_dir(site, mode, run_id) return _read_heartbeat(run_dir) + + +@app.get("/api/files/{site}/{mode}/{run_id}", response_class=JSONResponse) +def api_files(site: str, mode: str, run_id: str): + """Return all files in a run directory as JSON.""" + run_dir = _resolve_run_dir(site, mode, run_id) + return _find_all_files(run_dir) + + +@app.get("/api/summary/{site}/{mode}/{run_id}", response_class=JSONResponse) +def api_summary(site: str, mode: str, run_id: str): + """Return training summary extracted from console output.""" + text = _get_console_text(site, mode, run_id) + return _extract_training_summary(text)