Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/pr-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ on:
permissions:
contents: read

# Only one validation run at a time on the self-hosted runner to prevent
# resource contention (GPU, Docker, disk) that causes flaky test failures.
concurrency:
group: pr-validation-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
validate-swarm:
runs-on: self-hosted
Expand All @@ -36,6 +42,16 @@ jobs:
docker run --rm -v "$GITHUB_WORKSPACE":/ws alpine sh -c \
'find /ws -mindepth 1 -maxdepth 1 -not -name ".git" -exec chown -R '"$(id -u):$(id -g)"' {} + 2>/dev/null || true'

- name: Reclaim disk space (prune stale Docker resources)
run: |
echo "Disk usage before cleanup:"
df -h / | tail -1
# Remove stopped containers, dangling images, and build cache older
# than 24 h to prevent "no space left on device" failures.
docker system prune -f --filter "until=24h" || true
echo "Disk usage after cleanup:"
df -h / | tail -1

- name: Checkout repository (with submodules)
uses: actions/checkout@v4
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,11 @@ def map_key(unet_key):
print("No matching encoder weights found.")
return self

def create_model(num_classes: int = 3, n_input_channels = 1, spatial_dims=3, pretrained_path=None, loss_kwargs=None) -> BasicClassifier:
    """Build a ``ResidualEncoderClsLightning`` classifier.

    Args:
        num_classes: Number of output classes (``out_ch`` of the model).
        n_input_channels: Number of input channels (``in_ch`` of the model).
        spatial_dims: Passed through to the model constructor.
        pretrained_path: Optional path to pretrained UNet weights; when set,
            the encoder weights are loaded into the classifier.
        loss_kwargs: Optional kwargs forwarded to the model's loss
            (e.g. class weights). Defaults to an empty dict so the model
            is constructed consistently whether or not kwargs are given.

    Returns:
        The constructed (and optionally pretrained) classifier.
    """
    model = ResidualEncoderClsLightning(
        in_ch=n_input_channels, out_ch=num_classes, spatial_dims=spatial_dims,
        # Normalize None -> {} so downstream code always receives a dict.
        loss_kwargs=loss_kwargs if loss_kwargs is not None else {},
    )
    if pretrained_path:
        model.load_pretrained_unet_encoder(pretrained_path, verbose=True)
    return model
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,10 @@ def forward(self, x):
return cls_output # seg_output, cls_output, hidden_states


def create_model(img_size: int, num_classes: int = 3, n_input_channels = 1, spatial_dims=3, loss_kwargs=None) -> BasicClassifier:
    """Build a multi-task Swin-UNETR backbone wrapped as a classifier.

    Args:
        img_size: Input image size passed to ``SwinUNETRMultiTask``.
        num_classes: Number of classification classes.
        n_input_channels: Number of input channels.
        spatial_dims: Passed through to the backbone constructor.
        loss_kwargs: Optional kwargs forwarded to the wrapper's loss
            (e.g. class weights); normalized to ``{}`` when ``None``.

    Returns:
        A ``ModelWrapper`` around the multi-task backbone.
    """
    model = SwinUNETRMultiTask(img_size=img_size, in_channels=n_input_channels, out_seg_channels=2, out_cls_classes=num_classes, spatial_dims=spatial_dims)
    wrapped_model = ModelWrapper(
        backbone=model, in_ch=n_input_channels, num_classes=num_classes,
        # Normalize None -> {} so the wrapper always receives a dict.
        loss_kwargs=loss_kwargs if loss_kwargs is not None else {},
    )
    return wrapped_model
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,17 @@ def forward(self, volume: torch.Tensor):
return out #A.squeeze(-1) # logits, attention weights


def create_model(model_type = "swin", n_input_channels: int = 3, num_classes: int = 3, loss_kwargs=None) -> BasicClassifier:
    """Build an attention-based MIL classifier on a Swin backbone.

    Args:
        model_type: ``"swin_cross"`` selects the cross-modal attention
            variant; any other value selects the plain ABMIL model.
        n_input_channels: Number of input channels for the wrapper.
        num_classes: Number of classification classes.
        loss_kwargs: Optional kwargs forwarded to the wrapper's loss
            (e.g. class weights); normalized to ``{}`` when ``None``.

    Returns:
        A ``ModelWrapper`` around the selected backbone.
    """
    if model_type == "swin_cross":
        model = CrossModalAttentionABMIL_Swin(num_classes=num_classes)
    else:
        model = ABMIL_Swin(num_classes=num_classes)

    wrapped_model = ModelWrapper(
        backbone=model, in_ch=n_input_channels, num_classes=num_classes,
        # Normalize None -> {} so the wrapper always receives a dict.
        loss_kwargs=loss_kwargs if loss_kwargs is not None else {},
    )

    return wrapped_model
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x)


def create_model(model_name: str, num_classes: int = 3, n_input_channels = 1, spatial_dims = 3, norm: str = "batch", loss_kwargs=None) -> nn.Module:
    """Build a ResNet classifier wrapped in a ``ModelWrapper``.

    Args:
        model_name: ResNet variant name passed to the ``Resnet`` backbone.
        num_classes: Number of classification classes.
        n_input_channels: Number of input channels for the wrapper.
        spatial_dims: Accepted for a uniform factory signature; not used by
            the ``Resnet`` backbone here.
        norm: Normalization layer type passed to the backbone.
        loss_kwargs: Optional kwargs forwarded to the wrapper's loss
            (e.g. class weights); normalized to ``{}`` when ``None``.

    Returns:
        A ``ModelWrapper`` around the ResNet backbone.
    """
    model = Resnet(model_name=model_name, num_classes=num_classes, norm=norm)
    wrapped_model = ModelWrapper(
        backbone=model, in_ch=n_input_channels, num_classes=num_classes,
        # Normalize None -> {} so the wrapper always receives a dict.
        loss_kwargs=loss_kwargs if loss_kwargs is not None else {},
    )
    return wrapped_model
21 changes: 20 additions & 1 deletion application/jobs/_shared/custom/models/models_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,30 @@ def create_model(logger=None, model_name: str = None, num_classes: int = 3,

if "pretrained_path" in persistor_args:
rel_path = persistor_args["pretrained_path"].rstrip(".")
persistor_args["pretrained_path"] = os.path.join(
# Primary: look relative to the model's challenge directory
primary_path = os.path.join(
base_dir, "challenge", team_name, rel_path
)
# Fallback: Docker image stores weights at /MediSwarm/pretrained_weights/
# to avoid bloating NVFlare job transfers (see _cacheAndCopyPretrainedModelWeights.sh)
fallback_path = os.path.join("/MediSwarm/pretrained_weights", rel_path)
if os.path.isfile(primary_path):
persistor_args["pretrained_path"] = primary_path
elif os.path.isfile(fallback_path):
persistor_args["pretrained_path"] = fallback_path
logger.info(f'Using fallback pretrained weights from Docker image: {fallback_path}')
else:
persistor_args["pretrained_path"] = primary_path # let it fail with original path
logger.info(f'__________ model path is : {persistor_args["pretrained_path"]}')

# Forward loss_kwargs (e.g. class weights) so the challenge model
# registers the same buffers as the server-side persistor model.
# Without this, _class_weight may appear as a buffer on one side
# but not the other, causing state_dict mismatches during
# federated aggregation.
if loss_kwargs:
persistor_args["loss_kwargs"] = loss_kwargs

factory_fn = getattr(module, func_name)
logger.info(f"Now access {persistor_args} from module {module}")
return factory_fn(**persistor_args)
Expand Down
31 changes: 27 additions & 4 deletions deploy_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,16 @@ cmd_start_server() {
}

cmd_start_clients() {
local model_name="${1:-}"
step "Starting NVFlare clients on remote sites"
check_dependencies

local model_flag=""
if [[ -n "$model_name" ]]; then
model_flag="--model_name '$model_name'"
info "Using MODEL_NAME=$model_name"
fi

for site in "${SITES[@]}"; do
local site_name host deploy_dir datadir scratchdir gpu
site_name=$(site_var "$site" SITE_NAME)
Expand All @@ -264,14 +271,28 @@ cmd_start_clients() {
export SITE_NAME='$site_name' && \
export DATADIR='$datadir' && \
export SCRATCHDIR='$scratchdir' && \
./docker.sh --data_dir '$datadir' --scratch_dir '$scratchdir' --GPU '$gpu' --start_client"
./docker.sh --data_dir '$datadir' --scratch_dir '$scratchdir' --GPU '$gpu' $model_flag --start_client"

ok " Client started on $site_name"
done

ok "All clients started"
}

# Translate an NVFlare job directory name into the MODEL_NAME value that the
# model factory expects; unrecognized jobs fall back to "MST".
job_to_model_name() {
    local job_dir="$1"
    local model="MST"   # default, also covers ODELIA_ternary_classification
    case "$job_dir" in
        challenge_1DivideAndConquer) model="1DivideAndConquer" ;;
        challenge_2BCN_AIM)          model="2BCN_AIM" ;;
        challenge_3agaldran)         model="3agaldran" ;;
        challenge_4abmil)            model="4LME_ABMIL" ;;
        challenge_5pimed)            model="5Pimed" ;;
    esac
    echo "$model"
}

cmd_submit() {
local job_name="${1:-$DEFAULT_JOB}"
step "Submitting job: $job_name"
Expand Down Expand Up @@ -426,15 +447,17 @@ cmd_stop() {

cmd_all() {
local job_name="${1:-$DEFAULT_JOB}"
local model_name
model_name=$(job_to_model_name "$job_name")
step "Full deployment pipeline"
info "Job: $job_name"
info "Job: $job_name (MODEL_NAME=$model_name)"
echo ""

cmd_build
cmd_push
cmd_deploy
cmd_start_server
cmd_start_clients
cmd_start_clients "$model_name"

info "Waiting 15s for clients to register with server..."
sleep 15
Expand Down Expand Up @@ -484,7 +507,7 @@ case "$COMMAND" in
push) cmd_push ;;
deploy) cmd_deploy ;;
start-server) cmd_start_server ;;
start-clients) cmd_start_clients ;;
start-clients) cmd_start_clients "${1:-}" ;;
submit) cmd_submit "${1:-}" ;;
status) cmd_status ;;
logs) cmd_logs "${1:-}" ;;
Expand Down
8 changes: 8 additions & 0 deletions docker_config/Dockerfile_ODELIA
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ COPY ./torch_home_cache /torch_home

# Copy the source code for local training and deploying to the swarm
COPY ./MediSwarm /MediSwarm
# Replace symlinks with actual copies so NVFlare's os.walk()-based job signing
# and zip utilities can traverse all directories (os.walk does not follow symlinks)
RUN find /MediSwarm -type l | while read link; do \
target=$(readlink -f "$link") && \
rm "$link" && \
if [ -d "$target" ]; then cp -r "$target" "$link"; \
elif [ -f "$target" ]; then cp "$target" "$link"; fi; \
done
RUN mkdir -p /fl_admin/transfer && ln -s /MediSwarm /fl_admin/transfer/MediSwarm

# allow creating home directory for local user inside container if needed
Expand Down
2 changes: 2 additions & 0 deletions docker_config/Dockerfile_STAMP
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
git \
libgl1-mesa-glx \
libglib2.0-0 \
zip \
unzip \
&& rm -rf /var/lib/apt/lists/*

# ---------------------------------------------------------------------------
Expand Down
44 changes: 40 additions & 4 deletions scripts/ci/runIntegrationTests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,33 @@ verify_wrong_certificates_are_rejected () {


run_dummy_training_in_swarm () {
echo "[Run] Dummy training in swarm (result will be checked after 2 minutes) ..."
echo "[Run] Dummy training in swarm (polling for completion, up to 5 minutes) ..."

cd "$PROJECT_DIR"/prod_00
cd admin@test.odelia/startup
expect -f "$CWD"/tests/integration_tests/_submitDummyTraining.exp
docker kill odelia_swarm_admin_$CONTAINER_VERSION_SUFFIX
sleep 120
cd "$CWD"

# Poll for completion instead of a fixed sleep. The server log will
# contain "Server runner finished." once all rounds are done. We check
# every 10 seconds for up to 5 minutes (30 iterations).
local server_log="$PROJECT_DIR/prod_00/localhost/startup/nohup.out"
local max_attempts=30
local attempt=0
echo " Waiting for swarm training to finish (checking every 10s, max ${max_attempts}0s) ..."
while [ $attempt -lt $max_attempts ]; do
if [ -f "$server_log" ] && grep -q 'Server runner finished\.' "$server_log" 2>/dev/null; then
echo " ✅ Server runner finished detected after $((attempt * 10))s"
break
fi
attempt=$((attempt + 1))
sleep 10
done
if [ $attempt -eq $max_attempts ]; then
echo " ⚠️ Timed out after ${max_attempts}0s waiting for swarm completion — proceeding to assertions"
fi

# check for expected output in server log (clients joined, job ID assigned, 5 rounds, start of round logged, finished training logged)
cd "$PROJECT_DIR"/prod_00/localhost/startup
CONSOLE_OUTPUT=nohup.out
Expand Down Expand Up @@ -634,15 +652,33 @@ run_3dcnn_local_training () {


run_3dcnn_training_in_swarm () {
echo "[Run] 3DCNN training in swarm (result will be checked after 60 minutes) ..."
echo "[Run] 3DCNN training in swarm (polling for completion, up to 60 minutes) ..."

cd "$PROJECT_DIR"/prod_00
cd admin@test.odelia/startup
expect -f "$CWD"/tests/integration_tests/_submit3DCNNTraining.exp
docker kill odelia_swarm_admin_$CONTAINER_VERSION_SUFFIX
sleep 3600
cd "$CWD"

# Poll for completion instead of a fixed sleep. The server log will
# contain "Server runner finished." once all rounds are done. We check
# every 30 seconds for up to 60 minutes (120 iterations).
local server_log="$PROJECT_DIR/prod_00/localhost/startup/nohup.out"
local max_attempts=120
local attempt=0
echo " Waiting for 3DCNN swarm training to finish (checking every 30s, max 60min) ..."
while [ $attempt -lt $max_attempts ]; do
if [ -f "$server_log" ] && grep -q 'Server runner finished\.' "$server_log" 2>/dev/null; then
echo " ✅ Server runner finished detected after $((attempt * 30))s"
break
fi
attempt=$((attempt + 1))
sleep 30
done
if [ $attempt -eq $max_attempts ]; then
echo " ⚠️ Timed out after 60min waiting for 3DCNN swarm completion — proceeding to assertions"
fi

# check for expected output in server log (clients joined, job ID assigned, 20 rounds)
cd "$PROJECT_DIR"/prod_00/localhost/startup
CONSOLE_OUTPUT=nohup.out
Expand Down
Loading