diff --git a/.github/workflows/pr-test.yaml b/.github/workflows/pr-test.yaml index 6f192a76..6d094a95 100644 --- a/.github/workflows/pr-test.yaml +++ b/.github/workflows/pr-test.yaml @@ -11,6 +11,12 @@ on: permissions: contents: read +# Only one validation run at a time on the self-hosted runner to prevent +# resource contention (GPU, Docker, disk) that causes flaky test failures. +concurrency: + group: pr-validation-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: validate-swarm: runs-on: self-hosted @@ -36,6 +42,16 @@ jobs: docker run --rm -v "$GITHUB_WORKSPACE":/ws alpine sh -c \ 'find /ws -mindepth 1 -maxdepth 1 -not -name ".git" -exec chown -R '"$(id -u):$(id -g)"' {} + 2>/dev/null || true' + - name: Reclaim disk space (prune stale Docker resources) + run: | + echo "Disk usage before cleanup:" + df -h / | tail -1 + # Remove stopped containers, dangling images, and build cache older + # than 24 h to prevent "no space left on device" failures. + docker system prune -f --filter "until=24h" || true + echo "Disk usage after cleanup:" + df -h / | tail -1 + - name: Checkout repository (with submodules) uses: actions/checkout@v4 with: diff --git a/application/jobs/_shared/custom/models/challenge/1DivideAndConquer/model.py b/application/jobs/_shared/custom/models/challenge/1DivideAndConquer/model.py index 1d73c33a..b51cd084 100644 --- a/application/jobs/_shared/custom/models/challenge/1DivideAndConquer/model.py +++ b/application/jobs/_shared/custom/models/challenge/1DivideAndConquer/model.py @@ -405,8 +405,11 @@ def map_key(unet_key): print("No matching encoder weights found.") return self -def create_model(num_classes: int = 3, n_input_channels = 1, spatial_dims=3, pretrained_path=None) -> BasicClassifier: - model = ResidualEncoderClsLightning(in_ch=n_input_channels, out_ch=num_classes, spatial_dims=spatial_dims) +def create_model(num_classes: int = 3, n_input_channels = 1, spatial_dims=3, pretrained_path=None, loss_kwargs=None) -> BasicClassifier: + model = ResidualEncoderClsLightning( + in_ch=n_input_channels, out_ch=num_classes, spatial_dims=spatial_dims, + loss_kwargs=loss_kwargs if loss_kwargs is not None else {}, + ) if pretrained_path: model.load_pretrained_unet_encoder(pretrained_path, verbose=True) return model diff --git a/application/jobs/_shared/custom/models/challenge/2bcnaim/swinunetr.py b/application/jobs/_shared/custom/models/challenge/2bcnaim/swinunetr.py index 9032665c..f6475975 100644 --- a/application/jobs/_shared/custom/models/challenge/2bcnaim/swinunetr.py +++ b/application/jobs/_shared/custom/models/challenge/2bcnaim/swinunetr.py @@ -1150,7 +1150,10 @@ def forward(self, x): return cls_output # seg_output, cls_output, hidden_states -def create_model(img_size: int, num_classes: int = 3, n_input_channels = 1, spatial_dims=3) -> BasicClassifier: +def create_model(img_size: int, num_classes: int = 3, n_input_channels = 1, spatial_dims=3, loss_kwargs=None) -> BasicClassifier: model = SwinUNETRMultiTask(img_size=img_size, in_channels=n_input_channels, out_seg_channels=2, out_cls_classes=num_classes, spatial_dims=spatial_dims) - wrapped_model = ModelWrapper(backbone=model, in_ch=n_input_channels, num_classes=num_classes) + wrapped_model = ModelWrapper( + backbone=model, in_ch=n_input_channels, num_classes=num_classes, + loss_kwargs=loss_kwargs if loss_kwargs is not None else {}, + ) return wrapped_model diff --git a/application/jobs/_shared/custom/models/challenge/4abmil/model.py b/application/jobs/_shared/custom/models/challenge/4abmil/model.py index a430eba5..34441445 100644 --- a/application/jobs/_shared/custom/models/challenge/4abmil/model.py +++ b/application/jobs/_shared/custom/models/challenge/4abmil/model.py @@ -215,14 +215,17 @@ def forward(self, volume: torch.Tensor): return out #A.squeeze(-1) # logits, attention weights -def create_model(model_type = "swin", n_input_channels: int = 3, num_classes: int = 3) -> BasicClassifier: - +def create_model(model_type = "swin", n_input_channels: int = 3, num_classes: int = 3, loss_kwargs=None) -> BasicClassifier: + #config = pd.read_csv(config_path, skip_blank_lines=True, na_values=['NaN']).iloc[0] if model_type =="swin_cross": model = CrossModalAttentionABMIL_Swin(num_classes=num_classes) else: model = ABMIL_Swin(num_classes=num_classes) - wrapped_model = ModelWrapper(backbone=model, in_ch=n_input_channels, num_classes=num_classes) + wrapped_model = ModelWrapper( + backbone=model, in_ch=n_input_channels, num_classes=num_classes, + loss_kwargs=loss_kwargs if loss_kwargs is not None else {}, + ) return wrapped_model diff --git a/application/jobs/_shared/custom/models/challenge/5pimed/model.py b/application/jobs/_shared/custom/models/challenge/5pimed/model.py index f286200a..ddf859ea 100644 --- a/application/jobs/_shared/custom/models/challenge/5pimed/model.py +++ b/application/jobs/_shared/custom/models/challenge/5pimed/model.py @@ -55,7 +55,10 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x) -def create_model(model_name: str, num_classes: int = 3, n_input_channels = 1, spatial_dims = 3, norm: str = "batch") -> nn.Module: +def create_model(model_name: str, num_classes: int = 3, n_input_channels = 1, spatial_dims = 3, norm: str = "batch", loss_kwargs=None) -> nn.Module: model = Resnet(model_name=model_name, num_classes=num_classes, norm=norm) - wrapped_model = ModelWrapper(backbone=model, in_ch=n_input_channels, num_classes=num_classes) + wrapped_model = ModelWrapper( + backbone=model, in_ch=n_input_channels, num_classes=num_classes, + loss_kwargs=loss_kwargs if loss_kwargs is not None else {}, + ) return wrapped_model diff --git a/application/jobs/_shared/custom/models/models_config.py b/application/jobs/_shared/custom/models/models_config.py index 40a08741..e42b0461 100644 --- a/application/jobs/_shared/custom/models/models_config.py +++ b/application/jobs/_shared/custom/models/models_config.py @@ -199,11 +199,30 @@ def create_model(logger=None, model_name: str = None, num_classes: int = 3, if "pretrained_path" in persistor_args: rel_path = persistor_args["pretrained_path"].rstrip(".") - persistor_args["pretrained_path"] = os.path.join( + # Primary: look relative to the model's challenge directory + primary_path = os.path.join( base_dir, "challenge", team_name, rel_path ) + # Fallback: Docker image stores weights at /MediSwarm/pretrained_weights/ + # to avoid bloating NVFlare job transfers (see _cacheAndCopyPretrainedModelWeights.sh) + fallback_path = os.path.join("/MediSwarm/pretrained_weights", rel_path) + if os.path.isfile(primary_path): + persistor_args["pretrained_path"] = primary_path + elif os.path.isfile(fallback_path): + persistor_args["pretrained_path"] = fallback_path + logger.info(f'Using fallback pretrained weights from Docker image: {fallback_path}') + else: + persistor_args["pretrained_path"] = primary_path # let it fail with original path logger.info(f'__________ model path is : {persistor_args["pretrained_path"]}') + # Forward loss_kwargs (e.g. class weights) so the challenge model + # registers the same buffers as the server-side persistor model. + # Without this, _class_weight may appear as a buffer on one side + # but not the other, causing state_dict mismatches during + # federated aggregation. + if loss_kwargs: + persistor_args["loss_kwargs"] = loss_kwargs + factory_fn = getattr(module, func_name) logger.info(f"Now access {persistor_args} from module {module}") return factory_fn(**persistor_args) diff --git a/deploy_and_test.sh b/deploy_and_test.sh index dfb73b04..0d4a14f8 100755 --- a/deploy_and_test.sh +++ b/deploy_and_test.sh @@ -244,9 +244,16 @@ cmd_start_server() { } cmd_start_clients() { + local model_name="${1:-}" step "Starting NVFlare clients on remote sites" check_dependencies + local model_flag="" + if [[ -n "$model_name" ]]; then + model_flag="--model_name '$model_name'" + info "Using MODEL_NAME=$model_name" + fi + for site in "${SITES[@]}"; do local site_name host deploy_dir datadir scratchdir gpu site_name=$(site_var "$site" SITE_NAME) @@ -264,7 +271,7 @@ cmd_start_clients() { export SITE_NAME='$site_name' && \ export DATADIR='$datadir' && \ export SCRATCHDIR='$scratchdir' && \ - ./docker.sh --data_dir '$datadir' --scratch_dir '$scratchdir' --GPU '$gpu' --start_client" + ./docker.sh --data_dir '$datadir' --scratch_dir '$scratchdir' --GPU '$gpu' $model_flag --start_client" ok " Client started on $site_name" done @@ -272,6 +279,20 @@ cmd_start_clients() { ok "All clients started" } +# Map job directory names to the MODEL_NAME env var expected by the model factory. +job_to_model_name() { + local job="$1" + case "$job" in + challenge_1DivideAndConquer) echo "1DivideAndConquer" ;; + challenge_2BCN_AIM) echo "2BCN_AIM" ;; + challenge_3agaldran) echo "3agaldran" ;; + challenge_4abmil) echo "4LME_ABMIL" ;; + challenge_5pimed) echo "5Pimed" ;; + ODELIA_ternary_classification) echo "MST" ;; + *) echo "MST" ;; + esac +} + cmd_submit() { local job_name="${1:-$DEFAULT_JOB}" step "Submitting job: $job_name" @@ -426,15 +447,17 @@ cmd_stop() { cmd_all() { local job_name="${1:-$DEFAULT_JOB}" + local model_name + model_name=$(job_to_model_name "$job_name") step "Full deployment pipeline" - info "Job: $job_name" + info "Job: $job_name (MODEL_NAME=$model_name)" echo "" cmd_build cmd_push cmd_deploy cmd_start_server - cmd_start_clients + cmd_start_clients "$model_name" info "Waiting 15s for clients to register with server..." sleep 15 @@ -484,7 +507,7 @@ case "$COMMAND" in push) cmd_push ;; deploy) cmd_deploy ;; start-server) cmd_start_server ;; - start-clients) cmd_start_clients ;; + start-clients) cmd_start_clients "${1:-}" ;; submit) cmd_submit "${1:-}" ;; status) cmd_status ;; logs) cmd_logs "${1:-}" ;; diff --git a/docker_config/Dockerfile_ODELIA b/docker_config/Dockerfile_ODELIA index 26cef726..24d753d1 100644 --- a/docker_config/Dockerfile_ODELIA +++ b/docker_config/Dockerfile_ODELIA @@ -277,6 +277,14 @@ COPY ./torch_home_cache /torch_home # Copy the source code for local training and deploying to the swarm COPY ./MediSwarm /MediSwarm +# Replace symlinks with actual copies so NVFlare's os.walk()-based job signing +# and zip utilities can traverse all directories (os.walk does not follow symlinks) +RUN find /MediSwarm -type l | while read link; do \ + target=$(readlink -f "$link") && \ + rm "$link" && \ + if [ -d "$target" ]; then cp -r "$target" "$link"; \ + elif [ -f "$target" ]; then cp "$target" "$link"; fi; \ + done RUN mkdir -p /fl_admin/transfer && ln -s /MediSwarm /fl_admin/transfer/MediSwarm # allow creating home directory for local user inside container if needed diff --git a/docker_config/Dockerfile_STAMP b/docker_config/Dockerfile_STAMP index ea676aff..9cbb8a3d 100644 --- a/docker_config/Dockerfile_STAMP +++ b/docker_config/Dockerfile_STAMP @@ -36,6 +36,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ git \ libgl1-mesa-glx \ libglib2.0-0 \ + zip \ + unzip \ && rm -rf /var/lib/apt/lists/* # --------------------------------------------------------------------------- diff --git a/scripts/ci/runIntegrationTests.sh b/scripts/ci/runIntegrationTests.sh index 102cbd1c..8785164f 100755 --- a/scripts/ci/runIntegrationTests.sh +++ b/scripts/ci/runIntegrationTests.sh @@ -527,15 +527,33 @@ verify_wrong_certificates_are_rejected () { run_dummy_training_in_swarm () { - echo "[Run] Dummy training in swarm (result will be checked after 2 minutes) ..." + echo "[Run] Dummy training in swarm (polling for completion, up to 5 minutes) ..." cd "$PROJECT_DIR"/prod_00 cd admin@test.odelia/startup expect -f "$CWD"/tests/integration_tests/_submitDummyTraining.exp docker kill odelia_swarm_admin_$CONTAINER_VERSION_SUFFIX - sleep 120 cd "$CWD" + # Poll for completion instead of a fixed sleep. The server log will + # contain "Server runner finished." once all rounds are done. We check + # every 10 seconds for up to 5 minutes (30 iterations). + local server_log="$PROJECT_DIR/prod_00/localhost/startup/nohup.out" + local max_attempts=30 + local attempt=0 + echo " Waiting for swarm training to finish (checking every 10s, max ${max_attempts}0s) ..." + while [ $attempt -lt $max_attempts ]; do + if [ -f "$server_log" ] && grep -q 'Server runner finished\.' "$server_log" 2>/dev/null; then + echo " ✅ Server runner finished detected after $((attempt * 10))s" + break + fi + attempt=$((attempt + 1)) + sleep 10 + done + if [ $attempt -eq $max_attempts ]; then + echo " ⚠️ Timed out after ${max_attempts}0s waiting for swarm completion — proceeding to assertions" + fi + # check for expected output in server log (clients joined, job ID assigned, 5 rounds, start of round logged, finished training logged) cd "$PROJECT_DIR"/prod_00/localhost/startup CONSOLE_OUTPUT=nohup.out @@ -634,15 +652,33 @@ run_3dcnn_local_training () { run_3dcnn_training_in_swarm () { - echo "[Run] 3DCNN training in swarm (result will be checked after 60 minutes) ..." + echo "[Run] 3DCNN training in swarm (polling for completion, up to 60 minutes) ..." cd "$PROJECT_DIR"/prod_00 cd admin@test.odelia/startup expect -f "$CWD"/tests/integration_tests/_submit3DCNNTraining.exp docker kill odelia_swarm_admin_$CONTAINER_VERSION_SUFFIX - sleep 3600 cd "$CWD" + # Poll for completion instead of a fixed sleep. The server log will + # contain "Server runner finished." once all rounds are done. We check + # every 30 seconds for up to 60 minutes (120 iterations). + local server_log="$PROJECT_DIR/prod_00/localhost/startup/nohup.out" + local max_attempts=120 + local attempt=0 + echo " Waiting for 3DCNN swarm training to finish (checking every 30s, max 60min) ..." + while [ $attempt -lt $max_attempts ]; do + if [ -f "$server_log" ] && grep -q 'Server runner finished\.' "$server_log" 2>/dev/null; then + echo " ✅ Server runner finished detected after $((attempt * 30))s" + break + fi + attempt=$((attempt + 1)) + sleep 30 + done + if [ $attempt -eq $max_attempts ]; then + echo " ⚠️ Timed out after 60min waiting for 3DCNN swarm completion — proceeding to assertions" + fi + # check for expected output in server log (clients joined, job ID assigned, 20 rounds) cd "$PROJECT_DIR"/prod_00/localhost/startup CONSOLE_OUTPUT=nohup.out