From 46fd3416a095deb8ae97817b1d814e8dcca71163 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sat, 18 Oct 2025 16:06:21 +0000 Subject: [PATCH 01/31] Add Kubernetes backend support in segmentation algorithms - Introduced a new backend option for Kubernetes in the Backends enum. - Updated segmentation algorithms to accept optional keyword arguments for Kubernetes backend, enhancing flexibility for users. --- brats/constants.py | 3 +++ brats/core/segmentation_algorithms.py | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/brats/constants.py b/brats/constants.py index 1570b8f..c88b739 100644 --- a/brats/constants.py +++ b/brats/constants.py @@ -13,6 +13,9 @@ class Backends(str, Enum): SINGULARITY = "singularity" """Run the algorithms using Singularity containers.""" + KUBERNETES = "kubernetes" + """Run the algorithms using Kubernetes Jobs.""" + class Task(str, Enum): """Available tasks.""" diff --git a/brats/core/segmentation_algorithms.py b/brats/core/segmentation_algorithms.py index dd07e0e..926d799 100644 --- a/brats/core/segmentation_algorithms.py +++ b/brats/core/segmentation_algorithms.py @@ -168,6 +168,7 @@ def infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """Perform segmentation on a single subject with the provided images and save the result to the output file. @@ -179,6 +180,7 @@ def infer_single( output_file (Path | str): Path to save the segmentation log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ self._infer_single( @@ -186,6 +188,7 @@ def infer_single( output_file=output_file, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) def infer_batch( @@ -194,6 +197,7 @@ def infer_batch( output_folder: Path | str, log_file: Path | str | None = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """Perform segmentation on a batch of subjects with the provided images and save the results to the output folder. \n Requires the following structure:\n @@ -213,6 +217,7 @@ def infer_batch( output_folder (Path | str): Output folder to save the segmentations log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ return self._infer_batch( @@ -220,6 +225,7 @@ def infer_batch( output_folder=output_folder, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) @@ -439,6 +445,7 @@ def infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform segmentation on a single subject with the provided T1C image and save the result to the output file. @@ -448,6 +455,7 @@ def infer_single( output_file (Path | str): Output file to save the segmentation. log_file (Optional[Path | str], optional): Save logs to this file. Defaults to None. backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ self._infer_single( @@ -455,6 +463,7 @@ def infer_single( output_file=output_file, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) def infer_batch( @@ -463,6 +472,7 @@ def infer_batch( output_folder: Path | str, log_file: Path | str | None = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform segmentation on a batch of subjects with the provided T1C images and save the results to the output folder. \n @@ -482,6 +492,7 @@ def infer_batch( output_folder (Path | str): Output folder to save the segmentations log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ return self._infer_batch( @@ -489,4 +500,5 @@ def infer_batch( output_folder=output_folder, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) From e9eece3dd250afcc449fdbd55cdaab129a5324b8 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sat, 18 Oct 2025 18:11:21 +0000 Subject: [PATCH 02/31] Update README to include Kubernetes support details for remote algorithm execution - Added a new section outlining how to configure and use the Kubernetes backend for running algorithms. - Provided examples for setting the KUBECONFIG environment variable and specifying backend options in the inference method. --- README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/README.md b/README.md index d8cd4dc..f6fdd07 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,39 @@ segmenter.infer_single( ) ``` +## Kubernetes Support +BraTS orchestrator also supports Kubernetes to run the algorithms remotely, as an alternative to local execution with Docker or Singularity. +To use Kubernetes for execution, ensure that the `KUBECONFIG` environment variable is set to the location of your kubeconfig file, as is standard when interacting with a Kubernetes cluster. +```bash +export KUBECONFIG=/path/to/kubeconfig +``` +Then, specify the backend to use as `kubernetes` (or the corresponding enum value `Backends.KUBERNETES`) when running the inference: +```python +from brats.constants import Backends +segmenter.infer_single( + t1c="path/to/t1c.nii.gz", + output_file="path/to/segmentation.nii.gz", + backend=Backends.KUBERNETES +) +``` +By default, as shown above, the algorithm runs in the default Kubernetes namespace. It uses the default StorageClass and automatically creates a 1Gi PersistentVolumeClaim (PVC) to manage input and output data. If needed, you can customize settings such as the namespace, PVC name, storage size, storage class, job name, and mount path by providing related keyword arguments to the `infer_single` method. The `mount_path` parameter determines where the PVC will be mounted inside the Pod. +When using Kubernetes, the algorithm is executed inside a Kubernetes Job. Input data is first uploaded to a PersistentVolume, which is mounted into the Pod running the job. After the algorithm finishes running in the Pod, the output data is transferred back from the cluster to your local machine. +```python +segmenter.infer_single( + t1c="path/to/t1c.nii.gz", + output_file="path/to/segmentation.nii.gz", + backend=Backends.KUBERNETES, + kubernetes_kwargs={ + "namespace": "brats", + "pvc_name": "brats-iwydw55ej7qm-pvc", + "pvc_storage_size": "2Gi", + "pvc_storage_class": "brats-pvc-storage-class", + "job_name": "brats-oxh24nu4dhk9-job", + "mount_path": "/data", + } +) +``` + ## Available Algorithms and Usage > [!IMPORTANT] From a538d420b99db0eae4f851969dbf100ffa0bd410 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sat, 18 Oct 2025 18:11:29 +0000 Subject: [PATCH 03/31] Add Kubernetes support in BraTSAlgorithm for inference runs - Integrated Kubernetes backend option into the BraTSAlgorithm class. - Updated methods to accept optional keyword arguments for Kubernetes, allowing for enhanced configuration during inference. - Added error handling to ensure Kubernetes kwargs are only used with the Kubernetes backend. --- brats/core/brats_algorithm.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index bfef1ac..1b1442b 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -9,6 +9,7 @@ from brats.core.docker import run_container as run_docker_container from brats.core.singularity import run_container as run_singularity_container +from brats.core.kubernetes import run_job as run_kubernetes_job from brats.utils.algorithm_config import load_algorithms from brats.constants import OUTPUT_NAME_SCHEMA, Algorithms, Task, Backends from brats.utils.data_handling import InferenceSetup @@ -169,6 +170,7 @@ def _get_backend_runner(self, backend: Backends | str) -> Callable: backend_dispatch = { Backends.DOCKER: run_docker_container, Backends.SINGULARITY: run_singularity_container, + Backends.KUBERNETES: run_kubernetes_job, } runner = backend_dispatch.get(backend) return runner @@ -179,6 +181,7 @@ def _infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Backends | str = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform a single inference run with the provided inputs and save the output in the specified file. @@ -188,6 +191,7 @@ def _infer_single( output_file (Path | str): File to save the output log_file (Optional[Path | str], optional): Log file with extra information. Defaults to None. backend (Backends | str, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ with InferenceSetup(log_file=log_file) as (tmp_data_folder, tmp_output_folder): logger.info(f"Performing single inference") @@ -203,13 +207,22 @@ def _infer_single( ) runner = self._get_backend_runner(backend) - runner( + runner_kwargs = dict( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, cuda_devices=self.cuda_devices, force_cpu=self.force_cpu, ) + if kubernetes_kwargs is not None: + logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") + if runner is not run_kubernetes_job: + raise ValueError( + "Kubernetes kwargs can only be used with the Kubernetes backend (run_kubernetes_job)." + ) + for key, value in kubernetes_kwargs.items(): + runner_kwargs[key] = value + runner(**runner_kwargs) self._process_single_output( tmp_output_folder=tmp_output_folder, subject_id=subject_id, @@ -223,6 +236,7 @@ def _infer_batch( output_folder: Path | str, log_file: Optional[Path | str] = None, backend: Backends = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ): """Perform a batch inference run with the provided inputs and save the outputs in the specified folder. @@ -231,6 +245,7 @@ def _infer_batch( output_folder (Path | str): Folder to save the outputs log_file (Optional[Path | str], optional): Log file with extra information. Defaults to None. backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ with InferenceSetup(log_file=log_file) as (tmp_data_folder, tmp_output_folder): @@ -250,7 +265,7 @@ def _infer_batch( if runner is None: raise ValueError(f"Unsupported backend: {backend}") # run inference in container - runner( + runner_kwargs = dict( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, @@ -258,6 +273,15 @@ def _infer_batch( force_cpu=self.force_cpu, internal_external_name_map=internal_external_name_map, ) + if kubernetes_kwargs is not None: + logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") + if runner is not run_kubernetes_job: + raise ValueError( + "Kubernetes kwargs can only be used with the Kubernetes backend (run_kubernetes_job)." + ) + for key, value in kubernetes_kwargs.items(): + runner_kwargs[key] = value + runner(**runner_kwargs) self._process_batch_output( tmp_output_folder=tmp_output_folder, From cead76ba13ae6009321704d1d3bbf9466bfca560 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sat, 18 Oct 2025 18:13:30 +0000 Subject: [PATCH 04/31] Add Kubernetes dependency in pyproject.toml - Updated pyproject.toml to include the Kubernetes package with a minimum version of 34.1.0, enabling support for Kubernetes features in the project. --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index d0ff35f..0bcf08b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ exclude = [ [tool.poetry.dependencies] spython = ">=0.3.14" +kubernetes = ">=34.1.0" python = ">=3.8" docker = ">=7.0.0" rich = ">=13.0.0" From 39ccaffc5b07737a145058611f8e22f1b2b7212e Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sun, 19 Oct 2025 07:14:14 +0000 Subject: [PATCH 05/31] Update README to reflect parameter name change for PVC mount path in Kubernetes configuration - Changed the parameter name from `mount_path` to `data_mount_path` in the documentation to accurately describe its function in the `infer_single` method for Kubernetes backend usage. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f6fdd07..223f7b6 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ segmenter.infer_single( backend=Backends.KUBERNETES ) ``` -By default, as shown above, the algorithm runs in the default Kubernetes namespace. It uses the default StorageClass and automatically creates a 1Gi PersistentVolumeClaim (PVC) to manage input and output data. If needed, you can customize settings such as the namespace, PVC name, storage size, storage class, job name, and mount path by providing related keyword arguments to the `infer_single` method. The `mount_path` parameter determines where the PVC will be mounted inside the Pod. +By default, as shown above, the algorithm runs in the default Kubernetes namespace. It uses the default StorageClass and automatically creates a 1Gi PersistentVolumeClaim (PVC) to manage input and output data. If needed, you can customize settings such as the namespace, PVC name, storage size, storage class, job name, and mount path by providing related keyword arguments to the `infer_single` method. The `data_mount_path` parameter determines where the PVC will be mounted inside the Pod. When using Kubernetes, the algorithm is executed inside a Kubernetes Job. Input data is first uploaded to a PersistentVolume, which is mounted into the Pod running the job. After the algorithm finishes running in the Pod, the output data is transferred back from the cluster to your local machine. ```python segmenter.infer_single( @@ -93,7 +93,7 @@ segmenter.infer_single( "pvc_storage_size": "2Gi", "pvc_storage_class": "brats-pvc-storage-class", "job_name": "brats-oxh24nu4dhk9-job", - "mount_path": "/data", + "data_mount_path": "/data", } ) ``` From 573842eabb7c8455ca55b076a9d09467bb86db1e Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sun, 19 Oct 2025 07:14:39 +0000 Subject: [PATCH 06/31] Add Kubernetes job management functionality in kubernetes.py - Implemented functions for creating and managing Kubernetes jobs, including PVC creation, job execution, and output handling. - Added methods for downloading additional files from Zenodo and checking file presence in pods. - Enhanced logging for better traceability during job execution and output verification. - Integrated command execution within pods to facilitate file uploads and downloads, ensuring smooth operation of the Kubernetes backend for algorithm inference. --- brats/core/kubernetes.py | 890 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 890 insertions(+) create mode 100644 brats/core/kubernetes.py diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py new file mode 100644 index 0000000..c8b647d --- /dev/null +++ b/brats/core/kubernetes.py @@ -0,0 +1,890 @@ +from __future__ import annotations + +import time +from pathlib import Path +from typing import Dict, Optional +import random +import string +import base64 +from loguru import logger +from kubernetes import client, config +from kubernetes.config import load_kube_config +from kubernetes.stream import stream +from brats.constants import PARAMETERS_DIR +from brats.utils.algorithm_config import AlgorithmData +from brats.core.docker import ( + _log_algorithm_info, + _get_container_user, + _handle_device_requests, + _get_parameters_arg, + _sanity_check_output, +) +import io +import tarfile +from typing import List +from brats.utils.zenodo import ( + _get_zenodo_metadata_and_archive_url, + ZenodoException, + get_dummy_path, +) + + +def _build_command_args( + algorithm: AlgorithmData, + additional_files_path: str, + data_path: str, + output_path: str, + mount_path: str = "/data", +) -> List[str]: + """Build the command arguments for the Kubernetes job. + + Args: + algorithm (AlgorithmData): The algorithm data + additional_files_path (Path): The path to the additional files + data_path (Path): The path to the input data + output_path (Path): The path to save the output + mount_path (str): The path to mount the PVC to. Defaults to "/data". + Returns: + command_args: The command arguments + """ + command_args = f"--data_path={str(data_path)} --output_path={str(output_path)}" + if algorithm.additional_files is not None: + for i, param in enumerate(algorithm.additional_files.param_name): + additional_files_arg = f"--{param}={str(additional_files_path)}" + if algorithm.additional_files.param_path: + additional_files_arg += f"/{algorithm.additional_files.param_path[i]}" + command_args += f" {additional_files_arg}" + + params_arg = _get_parameters_arg(algorithm=algorithm) + if params_arg: + command_args += params_arg.replace( + "/mlcube_io3", str(Path(mount_path).joinpath("parameters")) + ) + + return command_args + + +def _observe_job_output(pod_name: str, namespace: str) -> str: + """Observe the output of a running job. + Args: + pod_name (str): The name of the pod to observe the output of + namespace (str): The namespace of the pod to observe the output of + """ + v1 = client.CoreV1Api() + + for _ in range(300): # up to 10 minutes + pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + statuses = pod.status.container_statuses + is_running = False + if statuses: + for status in statuses: + if ( + status.name == "job-container" + and status.state + and status.state.running + ): + is_running = True + break + if pod_phase == "Running" and is_running: + break + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + break + time.sleep(2) + else: + logger.error( + f"Timed out waiting for main container in pod '{pod_name}' to be running" + ) + return "" + try: + log = v1.read_namespaced_pod_log( + name=pod_name, + namespace=namespace, + container="job-container", + follow=True, + _preload_content=True, + ) + return log + except Exception as e: + logger.error( + f"Failed to fetch logs from pod '{pod_name}' in namespace '{namespace}': {e}" + ) + return "" + + +def _execute_command_in_pod( + pod_name: str, + namespace: str, + command: List[str], + container: str, + stderr: bool = True, + stdin: bool = False, + stdout: bool = True, + tty: bool = False, + _preload_content: bool = True, +) -> str: + """Execute a command in a pod. + Args: + pod_name (str): The name of the pod to execute the command in + namespace (str): The namespace of the pod to execute the command in + command (List[str]): The command to execute + container (str): The container to execute the command in + stderr (bool): Whether to capture stderr. Defaults to True. + stdin (bool): Whether to capture stdin. Defaults to False. + stdout (bool): Whether to capture stdout. Defaults to True. + tty (bool): Whether to use a TTY. Defaults to False. + _preload_content (bool): Whether to preload the content. Defaults to True. + Returns: + str: The output of the command + """ + v1 = client.CoreV1Api() + output = stream( + v1.connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=command, + container=container, + stderr=stderr, + stdin=stdin, + stdout=stdout, + tty=tty, + _preload_content=_preload_content, + ) + logger.debug( + f"Command '{command}' executed successfully in pod '{pod_name}' in namespace '{namespace}'.\nOutput:\n{output}" + ) + return output + + +def _download_additional_files( + algorithm: AlgorithmData, pod_name: str, namespace: str, mount_path: str = "/data" +) -> Path: + """Download additional files from Zenodo. + Args: + algorithm (AlgorithmData): The algorithm data + pod_name (str): The name of the pod to download the additional files to + namespace (str): The namespace of the pod to download the additional files to + mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + if algorithm.additional_files is not None: + zenodo_response = _get_zenodo_metadata_and_archive_url( + record_id=algorithm.additional_files.record_id + ) + if not zenodo_response: + msg = "Additional files not found locally and Zenodo could not be reached. Exiting..." + logger.error(msg) + raise ZenodoException(msg) + + zenodo_metadata, archive_url = zenodo_response + record_folder = str( + Path(mount_path).joinpath( + f"{algorithm.additional_files.record_id}_v{zenodo_metadata['version']}" + ) + ) + + commands = [ + "sh", + "-c", + ( + f'if [ ! -d {record_folder} ] || [ -z "$(ls -A {record_folder})" ]; then ' + f" mkdir -p {record_folder} && " + f" wget -O {record_folder}/archive.zip {archive_url} && " + f" apk add --no-cache unzip && " + f" unzip {record_folder}/archive.zip -d {record_folder} && " + f" rm {record_folder}/archive.zip && " + f" for f in {record_folder}/*.zip; do " + f' if [ -f "$f" ]; then unzip "$f" -d {record_folder} && rm "$f"; fi; ' + f" done " + f"else " + f" echo 'Additional files already present in {record_folder}, skipping download.'; " + f"fi" + ), + ] + logger.info(f"Downloading additional files to {record_folder}...") + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + logger.info( + f"Additional files downloaded successfully to pod '{pod_name}' in namespace '{namespace}'." + ) + logger.info(f"Contents of {record_folder}:\n{output}") + return Path(record_folder) + else: + return get_dummy_path() + + +def _check_files_in_pod( + pod_name: str, namespace: str, paths: List[Path], mount_path: str = "/data" +) -> None: + """Check if all the local files are present in the mounted path inside the pod. + Args: + pod_name (str): Name of the pod to check files in + namespace (str): The Kubernetes namespace to check files in + paths (List[Path]): List of local files to check + mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + logger.debug( + f"Checking files in pod '{pod_name}' in namespace '{namespace}' with mount path '{mount_path}'." + ) + for path in paths: + for file in path.glob("**/*"): + if file.is_file(): + commands = [ + "ls", + "-la", + str(Path(mount_path).joinpath("input", file.relative_to(path))), + ] + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + if "No such file or directory" in output: + logger.warning( + f"File '{file.relative_to(path)}' is not present in pod '{pod_name}' in namespace '{namespace}'. Uploading it now..." + ) + _upload_files_to_pod( + pod_name=pod_name, + namespace=namespace, + paths=[file], + mount_path=mount_path, + relative_to=path, + parent_dir="input", + ) + + +def _download_folder_from_pod( + pod_name, namespace, container, remote_paths, local_base_dir="." +): + v1 = client.CoreV1Api() + + for path in remote_paths: + folder_name = str(path) + + command = ["sh", "-c", f"tar cf - -C {folder_name} . | base64"] + resp = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=command, + container=container, + stderr=True, + stdin=False, + stdout=True, + _preload_content=False, + ) + + base64_chunks = [] + + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + chunk = resp.read_stdout() + if isinstance(chunk, str): + chunk = chunk.encode("utf-8") + base64_chunks.append(chunk) + if resp.peek_stderr(): + err = resp.read_stderr() + if err: + print("STDERR:", err) + resp.close() + + full_base64 = b"".join(base64_chunks) + tar_data = base64.b64decode(full_base64) + + base_folder_name = Path(folder_name).name + print(f"base_folder_name: {local_base_dir}") + tarfile_path = local_base_dir / f"{base_folder_name}" + with open(tarfile_path, "wb") as tarfile_obj: + tarfile_obj.write(tar_data) + + with tarfile.open(tarfile_path, "r") as tar: + tar.extractall(path=local_base_dir) + + tarfile_path.unlink() + + +def _upload_files_to_pod( + pod_name: str, + namespace: str, + paths: List[Path], + mount_path: str = "/data", + relative_to: Path = None, + parent_dir: Path = None, +) -> None: + """Upload files to a pod in the specified namespace. + Args: + pod_name (str): Name of the pod to upload files to + namespace (str): The Kubernetes namespace to upload files to + paths (List[Path]): List of local files or directories to upload + mount_path (str): The path to mount the PVC to. Defaults to "/data". + relative_to (Path): The path to relativize the files to. Defaults to None. + parent_dir (Path): The parent directory of the files to upload. Defaults to None. + """ + + tar_stream = io.BytesIO() + with tarfile.open(fileobj=tar_stream, mode="w") as tar: + for path in paths: + if path.is_file(): + tar.add( + path, + arcname=( + Path(parent_dir).joinpath(path.relative_to(relative_to)) + if parent_dir + else path.relative_to(relative_to) + ), + ) + else: + for file in path.glob("**/*"): + if file.is_file(): + tar.add( + file, + arcname=( + Path(parent_dir).joinpath(file.relative_to(path)) + if parent_dir + else file.relative_to(path) + ), + ) + tar_stream.seek(0) + commands = ["tar", "xmf", "-", "-C", mount_path] + resp = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + stdin=True, + _preload_content=False, + ) + + resp.write_stdin(tar_stream.read()) + resp.close() + logger.info( + f"File uploaded successfully to pod '{pod_name}' in namespace '{namespace}'." + ) + + +def _create_namespaced_pvc( + pvc_name: str, namespace: str, storage_size: str = "1Gi", storage_class: str = None +) -> None: + """Create a namespaced PersistentVolumeClaim (PVC) in the specified namespace. If the PVC already exists, it will be skipped. + + Args: + pvc_name (str): Name of the PVC to create + namespace (str): The Kubernetes namespace to create the PVC in + storage_size (str): The size of the storage to request + storage_class (str): The storage class to use for the PVC. If None, the default storage class will be used. + """ + core_v1_api = client.CoreV1Api() + pvc_list = core_v1_api.list_namespaced_persistent_volume_claim(namespace=namespace) + if len(pvc_list.items) > 0: + for pvc in pvc_list.items: + if pvc.metadata.name == pvc_name: + logger.debug( + f"PVC '{pvc_name}' already exists in namespace '{namespace}'. Skipping creation." + ) + return + + if storage_class is None: + pvc_spec = client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=client.V1ResourceRequirements(requests={"storage": storage_size}), + ) + else: + pvc_spec = client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=client.V1ResourceRequirements(requests={"storage": storage_size}), + storage_class_name=storage_class, + ) + + core_v1_api = client.CoreV1Api() + core_v1_api.create_namespaced_persistent_volume_claim( + namespace=namespace, + body=client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta(name=pvc_name), spec=pvc_spec + ), + ) + + +def _create_finalizer_job( + job_name: str, namespace: str, pvc_name: str, mount_path: str = "/data" +) -> None: + """Create a finalizer job in the specified namespace. + Args: + job_name (str): Name of the Job to create + namespace (str): The Kubernetes namespace to create the Job in + pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job + mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + batch_v1_api = client.BatchV1Api() + job_list = batch_v1_api.list_namespaced_job(namespace=namespace) + if len(job_list.items) > 0: + for job in job_list.items: + if job.metadata.name == job_name: + logger.warning( + f"Job '{job_name}' already exists in namespace '{namespace}'. Deleting it." + ) + batch_v1_api.delete_namespaced_job(name=job_name, namespace=namespace) + + batch_v1_api.create_namespaced_job( + namespace=namespace, + body=client.V1Job( + metadata=client.V1ObjectMeta(name=job_name), + spec=client.V1JobSpec( + template=client.V1PodTemplateSpec( + spec=client.V1PodSpec( + restart_policy="Never", + volumes=[ + client.V1Volume( + name=pvc_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name + ), + ) + ], + containers=[ + client.V1Container( + name="finalizer-container", + image="alpine:latest", + command=[ + "sh", + "-c", + "while [ ! -f /etc/content_verified ]; do sleep 1; done", + ], + volume_mounts=[ + client.V1VolumeMount( + name=pvc_name, mount_path=mount_path + ) + ], + ) + ], + ) + ) + ), + ), + ) + + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_name = None + for _ in range(60): + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + if pod_list.items: + # If more than one pod, pick the first (common for most K8s jobs) + # Pick the latest created pod (by creation timestamp) + latest_pod = max( + pod_list.items, key=lambda pod: pod.metadata.creation_timestamp + ) + pod_name = latest_pod.metadata.name + break + time.sleep(2) + if pod_name is None: + raise RuntimeError( + f"Timed out waiting for pod to be created for job {job_name}" + ) + return pod_name + + +def _create_namespaced_job( + job_name: str, + namespace: str, + pvc_name: str, + image: str, + device_requests: List[client.V1DeviceRequest], + pv_mounts: Dict[str, str], + args: List[str] = None, + shm_size: str = None, + user: str = None, +) -> None: + """Create a namespaced Job in the specified namespace. + + Args: + job_name (str): Name of the Job to create + namespace (str): The Kubernetes namespace to create the Job in + pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job + image (str): The image to use for the Job + device_requests (List[client.V1DeviceRequest]): The device requests to use for the Job + pv_mounts (Dict[str, str]): The PersistentVolumeClaims (PVCs) to mount to the Job. + args (List[str]): The arguments to use for the Job. Defaults to None. + shm_size (str): The size of the shared memory to use for the Job. Defaults to None. + user (str): The user to run the Job as. Defaults to None (root is used if not specified). + """ + batch_v1_api = client.BatchV1Api() + job_list = batch_v1_api.list_namespaced_job(namespace=namespace) + if len(job_list.items) > 0: + for job in job_list.items: + if job.metadata.name == job_name: + logger.warning( + f"Job '{job_name}' already exists in namespace '{namespace}'. Deleting it." + ) + batch_v1_api.delete_namespaced_job(name=job_name, namespace=namespace) + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + for pod in pod_list.items: + pod_name_to_delete = pod.metadata.name + logger.warning( + f"Deleting pod '{pod_name_to_delete}' in namespace '{namespace}' associated with job '{job_name}'." + ) + try: + core_v1_api.delete_namespaced_pod( + name=pod_name_to_delete, namespace=namespace + ) + except Exception as e: + logger.error( + f"Failed to delete pod '{pod_name_to_delete}' in namespace '{namespace}': {e}" + ) + + # user_id = int(user.split(":")[0]) if user else 0 + # group_id = int(user.split(":")[1]) if user else 0 + volume_mounts = [] + for pvc_name, mount_path in pv_mounts.items(): + volume_mounts.append(client.V1VolumeMount(name=pvc_name, mount_path=mount_path)) + container_spec = client.V1Container( + name="job-container", + image=image, + volume_mounts=volume_mounts, + # security_context=client.V1SecurityContext(run_as_user=user_id, run_as_group=group_id) + ) + if len(device_requests) > 0: + container_spec.resources = client.V1ResourceRequirements( + requests={"nvidia.com/gpu": 1}, limits={"nvidia.com/gpu": 1} + ) + volumes = [ + client.V1Volume( + name=pvc_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name + ), + ) + ] + if shm_size is not None: + shm_size_formatted = shm_size.replace("gb", "Gi") + volumes.append( + client.V1Volume( + name="shm", + empty_dir=client.V1EmptyDirVolumeSource( + medium="Memory", size_limit=shm_size_formatted + ), + ) + ) + container_spec.volume_mounts.append( + client.V1VolumeMount(name="shm", mount_path="/dev/shm") + ) + + if args is not None: + container_spec.args = args + batch_v1_api.create_namespaced_job( + namespace=namespace, + body=client.V1Job( + metadata=client.V1ObjectMeta(name=job_name), + spec=client.V1JobSpec( + template=client.V1PodTemplateSpec( + spec=client.V1PodSpec( + restart_policy="Never", + volumes=volumes, + init_containers=[ + client.V1Container( + name="init-container", + image="alpine:latest", + command=[ + "sh", + "-c", + "while [ ! -f /etc/content_verified ]; do sleep 1; done", + ], + volume_mounts=volume_mounts, + ) + ], + containers=[container_spec], + ) + ) + ), + ), + ) + + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_name = None + for _ in range(60): + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + if pod_list.items: + latest_pod = max( + pod_list.items, key=lambda pod: pod.metadata.creation_timestamp + ) + pod_name = latest_pod.metadata.name + break + time.sleep(2) + if pod_name is None: + raise RuntimeError( + f"Timed out waiting for pod to be created for job {job_name}" + ) + return pod_name + + +def run_job( + algorithm: AlgorithmData, + data_path: Path, + output_path: Path, + cuda_devices: str, + force_cpu: bool, + internal_external_name_map: Optional[Dict[str, str]] = None, + namespace: Optional[str] = "default", + pvc_name: Optional[str] = None, + pvc_storage_size: Optional[str] = "1Gi", + pvc_storage_class: Optional[str] = None, + job_name: Optional[str] = None, + data_mount_path: Optional[str] = "/data", +): + """Run a Kubernetes job for the provided algorithm. + + Args: + algorithm (AlgorithmData): The data of the algorithm to run + data_path (Path | str): The path to the input data + output_path (Path | str): The path to save the output + cuda_devices (str): The CUDA devices to use + force_cpu (bool): Whether to force CPU execution + internal_external_name_map (Dict[str, str]): Dictionary mapping internal name (in standardized format) to external subject name provided by user (only used for batch inference) + namespace (Optional[str], optional): The Kubernetes namespace to run the job in. Defaults to "default". + pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this job. If the PVC does not already exist, it will be created; otherwise, it must already contain the input data required for running the algorithm. + pvc_storage_size (str): The size of the storage to request for the PVC. Defaults to "1Gi". + pvc_storage_class (str): The storage class to use for the PVC. If None, the default storage class will be used. + job_name (str): Name of the Job to create. If None, a random name will be generated. + data_mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + if pvc_name is None: + pvc_name = ( + "brats-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=12)) + + "-pvc" + ) + if job_name is None: + job_name = ( + "brats-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=12)) + + "-job" + ) + + logger.debug(f"Job name: {job_name}") + logger.debug(f"PersistentVolumeClaim name: {pvc_name}") + config.load_kube_config() + _create_namespaced_pvc( + pvc_name=pvc_name, + namespace=namespace, + storage_size=pvc_storage_size, + storage_class=pvc_storage_class, + ) + _log_algorithm_info(algorithm=algorithm) + + device_requests = _handle_device_requests( + algorithm=algorithm, + cuda_devices=cuda_devices, + force_cpu=force_cpu, + ) + logger.debug(f"GPU Device requests: {device_requests}") + user = _get_container_user(algorithm=algorithm) + logger.debug(f"Container user: {user if user else 'root (required by algorithm)'}") + + output_mount_path = Path(data_mount_path).joinpath("output") + if algorithm.meta.year > 2024: + data_mount_path = "/input" + output_mount_path = "/output" + + if algorithm.meta.year <= 2024: + if algorithm.additional_files is not None: + zenodo_response = _get_zenodo_metadata_and_archive_url( + record_id=algorithm.additional_files.record_id + ) + if not zenodo_response: + msg = "Additional files not found locally and Zenodo could not be reached. Exiting..." + logger.error(msg) + raise ZenodoException(msg) + + zenodo_metadata, archive_url = zenodo_response + additional_files_path = Path(data_mount_path).joinpath( + f"{algorithm.additional_files.record_id}_v{zenodo_metadata['version']}" + ) + else: + additional_files_path = get_dummy_path() + + command_args = _build_command_args( + algorithm=algorithm, + additional_files_path=additional_files_path, + data_path=Path(data_mount_path).joinpath("input"), + output_path=output_mount_path, + mount_path=data_mount_path, + ) + command = ["infer", *command_args.split(" ")] + pv_mounts = { + pvc_name: data_mount_path, + } + else: + command = None + _create_namespaced_pvc( + pvc_name=pvc_name + "-output", + namespace=namespace, + storage_size=pvc_storage_size, + storage_class=pvc_storage_class, + ) + + pv_mounts = { + pvc_name: data_mount_path, + pvc_name + "-output": output_mount_path, + } + + pod_name = _create_namespaced_job( + job_name=job_name, + namespace=namespace, + pvc_name=pvc_name, + pv_mounts=pv_mounts, + image=algorithm.run_args.docker_image, + device_requests=device_requests, + args=command, + shm_size=algorithm.run_args.shm_size, + user=user, + ) + logger.debug(f"Pod name: {pod_name}") + + core_v1_api = client.CoreV1Api() + logger.info(f"Waiting for Pod '{pod_name}' to be running...") + for _ in range(300): # wait up to 10 minutes + pod = core_v1_api.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + if pod.status.init_container_statuses: + exit_loop = False + for init_status in pod.status.init_container_statuses: + state = init_status.state + if state and state.running: + logger.info( + f"Pod '{pod_name}' initContainer '{init_status.name}' is running." + ) + exit_loop = True + break + if exit_loop: + break + else: + if pod_phase == "Running": + logger.info(f"Pod '{pod_name}' is running.") + break + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning( + f"Pod '{pod_name}' entered terminal phase: {pod_phase}" + ) + break + else: + if pod_phase == "Running": + logger.info(f"Pod '{pod_name}' is running.") + break + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + break + time.sleep(2) + else: + raise RuntimeError(f"Timed out waiting for pod {pod_name} to be running") + _check_files_in_pod( + pod_name=pod_name, + namespace=namespace, + paths=[Path(data_path)], + mount_path=data_mount_path, + ) + logger.debug( + f"Files checked successfully in pod '{pod_name}' in namespace '{namespace}'." + ) + + if algorithm.meta.year <= 2024: + additional_files_path = _download_additional_files( + algorithm=algorithm, + pod_name=pod_name, + namespace=namespace, + mount_path=data_mount_path, + ) + _upload_files_to_pod( + pod_name=pod_name, + namespace=namespace, + paths=[PARAMETERS_DIR], + mount_path=data_mount_path, + parent_dir="parameters", + ) + commands = ["tree", data_mount_path] + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + commands = ["touch", "/etc/content_verified"] + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + + output_path.mkdir(parents=True, exist_ok=True) + + logger.info(f"{'Starting inference'}") + start_time = time.time() + + time.sleep(2) + job_output = _observe_job_output(pod_name=pod_name, namespace=namespace) + + core_v1_api = client.CoreV1Api() + for _ in range(300): # Wait up to 10 minutes + pod = core_v1_api.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + if pod_phase in ("Succeeded", "Failed"): + logger.info(f"Finalizer pod '{pod_name}' finished with phase: {pod_phase}") + break + time.sleep(2) + else: + raise RuntimeError( + f"Timed out waiting for finalizer pod '{pod_name}' to complete." + ) + + pvc_name_output = pvc_name + mount_path = str(data_mount_path) + if algorithm.meta.year > 2024: + pvc_name_output = pvc_name + "-output" + mount_path = str(output_mount_path) + + pod_name_finalizer = _create_finalizer_job( + job_name=job_name + "-finalizer", + namespace=namespace, + pvc_name=pvc_name_output, + mount_path=mount_path, + ) + time.sleep(2) + _download_folder_from_pod( + pod_name=pod_name_finalizer, + namespace=namespace, + container="finalizer-container", + remote_paths=[output_mount_path], + local_base_dir=output_path, + ) + + commands = ["touch", "/etc/content_verified"] + output = _execute_command_in_pod( + pod_name=pod_name_finalizer, + namespace=namespace, + command=commands, + container="finalizer-container", + ) + + _sanity_check_output( + data_path=data_path, + output_path=output_path, + container_output=job_output, + internal_external_name_map=internal_external_name_map, + ) + + logger.debug(f"Job output: \n\r{job_output}") + + logger.info(f"Finished inference in {time.time() - start_time:.2f} seconds") From e99a8374ebdeb4775cf0e2fc79c4df68f850811f Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Sun, 19 Oct 2025 15:00:01 +0000 Subject: [PATCH 07/31] Remove debug print statement for base folder name in _download_folder_from_pod function in kubernetes.py --- brats/core/kubernetes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index c8b647d..79aafff 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -297,7 +297,6 @@ def _download_folder_from_pod( tar_data = base64.b64decode(full_base64) base_folder_name = Path(folder_name).name - print(f"base_folder_name: {local_base_dir}") tarfile_path = local_base_dir / f"{base_folder_name}" with open(tarfile_path, "wb") as tarfile_obj: tarfile_obj.write(tar_data) From a59b90e14a5465ef7bc67a7e290964da0192ebcb Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 19:09:10 +0000 Subject: [PATCH 08/31] Add comprehensive unit tests for Kubernetes integration in `test_kubernetes.py`. Tests cover command argument building, file handling, job creation, and PVC management for different algorithm configurations. --- tests/core/test_kubernetes.py | 476 ++++++++++++++++++++++++++++++++++ 1 file changed, 476 insertions(+) create mode 100644 tests/core/test_kubernetes.py diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py new file mode 100644 index 0000000..bf37be5 --- /dev/null +++ b/tests/core/test_kubernetes.py @@ -0,0 +1,476 @@ +import io +import tarfile +import base64 +import types +import time +from pathlib import Path +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch, call + +import pytest + +# Target module +import brats.core.kubernetes as k8s + + +@pytest.fixture +def dummy_algorithm(): + class AddFiles: + def __init__(self): + self.record_id = "12345" + self.param_name = ["weights", "config"] + self.param_path = ["w.bin", "c.yaml"] + + class Meta: + def __init__(self, year): + self.year = year + + class RunArgs: + def __init__(self): + self.docker_image = "alpine:latest" + self.shm_size = "1gb" + self.parameters_file = None + + class Algo: + def __init__(self, year=2024, with_additional=True): + self.meta = Meta(year) + self.run_args = RunArgs() + self.additional_files = AddFiles() if with_additional else None + + return Algo + + +@pytest.fixture +def tmp_tree(tmp_path): + # creates input dir structure with one file + input_dir = tmp_path / "input" + input_dir.mkdir(parents=True, exist_ok=True) + (input_dir / "case1").mkdir(parents=True, exist_ok=True) + f = input_dir / "case1" / "image.nii.gz" + f.write_bytes(b"dummy") + return tmp_path + + +def _mk_pod(name="pod-1", phase="Running", init_running=True, creation_ts=None): + pod = types.SimpleNamespace() + pod.metadata = types.SimpleNamespace() + pod.metadata.name = name + pod.metadata.creation_timestamp = creation_ts or datetime.now(timezone.utc) + pod.status = types.SimpleNamespace() + pod.status.phase = phase + # container_statuses for _observe_job_output + cstat = types.SimpleNamespace() + cstate = types.SimpleNamespace() + cstate.running = True + cstat.name = "job-container" + cstat.state = cstate + pod.status.container_statuses = [cstat] + # init container statuses for run_job wait loop + istat = types.SimpleNamespace() + istat.name = "init-container" + istate = types.SimpleNamespace() + istate.running = True if init_running else None + istat.state = istate + pod.status.init_container_statuses = [istat] if init_running is not None else None + return pod + + +### _build_command_args + +def test_build_command_args_with_additional_and_params(dummy_algorithm, monkeypatch, tmp_path): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm: " --params=/mlcube_io3/foo.json") + + cmd = k8s._build_command_args( + algorithm=algo, + additional_files_path="/data/weights_dir", + data_path="/data/input", + output_path="/data/output", + mount_path="/data", + ) + + # must include basic paths + assert "--data_path=/data/input" in cmd + assert "--output_path=/data/output" in cmd + # must include additional files args with per-param path + assert "--weights=/data/weights_dir/w.bin" in cmd + assert "--config=/data/weights_dir/c.yaml" in cmd + # must rewrite /mlcube_io3 to /data/parameters + assert "/mlcube_io3" not in cmd + assert "/data/parameters/foo.json" in cmd + + +def test_build_command_args_without_additional_and_params(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=False) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm: "") + + cmd = k8s._build_command_args( + algorithm=algo, + additional_files_path="/ignored", + data_path="/data/input", + output_path="/data/output", + mount_path="/data", + ) + + assert "--data_path=/data/input" in cmd + assert "--output_path=/data/output" in cmd + assert "--weights=" not in cmd + assert "--config=" not in cmd + + +### _execute_command_in_pod + +def test_execute_command_in_pod(monkeypatch): + mock_core = MagicMock() + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + mock_stream = MagicMock(return_value="OK") + monkeypatch.setattr(k8s, "stream", mock_stream) + + out = k8s._execute_command_in_pod( + pod_name="p", namespace="ns", command=["echo", "hi"], container="job-container" + ) + + assert out == "OK" + mock_stream.assert_called_once() + # ensure connect_get_namespaced_pod_exec passed into stream + assert mock_stream.call_args.kwargs.get("_preload_content") is True + + +### _download_additional_files + +def test_download_additional_files_success(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr( + k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: ({"version": "2"}, "http://example/archive.zip") + ) + exec_calls = [] + def fake_exec(**kwargs): + exec_calls.append(kwargs) + return "ok" + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: fake_exec(**kw)) + p = k8s._download_additional_files(algorithm=algo, pod_name="p", namespace="ns", mount_path="/data") + assert str(p) == "/data/12345_v2" + assert exec_calls and exec_calls[0]["container"] == "init-container" + + +def test_download_additional_files_zenodo_failure(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr(k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: None) + with pytest.raises(k8s.ZenodoException): + k8s._download_additional_files(algorithm=algo, pod_name="p", namespace="ns", mount_path="/data") + + +### _check_files_in_pod + +def test_check_files_in_pod_uploads_missing(monkeypatch, tmp_tree): + # Simulate ls output saying missing + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "No such file or directory") + uploaded = {"called": False, "args": None} + def fake_upload(**kwargs): + uploaded["called"] = True + uploaded["args"] = kwargs + monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: fake_upload(**kw)) + + k8s._check_files_in_pod( + pod_name="p", namespace="ns", paths=[tmp_tree / "input"], mount_path="/data" + ) + + assert uploaded["called"] is True + assert uploaded["args"]["parent_dir"] == "input" + + +### _download_folder_from_pod + +def test_download_folder_from_pod(monkeypatch, tmp_path): + # Create a tar stream that contains one file "foo.txt" + tar_bytes = io.BytesIO() + with tarfile.open(fileobj=tar_bytes, mode="w") as tar: + data = io.BytesIO(b"hello") + info = tarfile.TarInfo(name="foo.txt") + info.size = len(b"hello") + tar.addfile(info, data) + tar_bytes.seek(0) + b64 = base64.b64encode(tar_bytes.getvalue()) + + # Make fake streaming response + class FakeResp: + def __init__(self): + self._stdout_chunks = [b64] + self._stderr_chunks = [] + self._open = True + def is_open(self): + return self._open + def update(self, timeout=1): + # close after delivering stdout + self._open = False + def peek_stdout(self): + return bool(self._stdout_chunks) + def read_stdout(self): + return self._stdout_chunks.pop(0) + def peek_stderr(self): + return False + def read_stderr(self): + return "" + def close(self): + self._open = False + + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: FakeResp()) + + k8s._download_folder_from_pod( + pod_name="p", + namespace="ns", + container="finalizer-container", + remote_paths=[Path("/output")], + local_base_dir=tmp_path, + ) + + # Expect foo.txt extracted into tmp_path + expect = tmp_path / "foo.txt" + assert expect.exists() + assert expect.read_text() == "hello" + + +### _upload_files_to_pod + +def test_upload_files_to_pod(monkeypatch, tmp_path): + f = tmp_path / "a.txt" + f.write_text("hi") + class FakeResp: + def __init__(self): + self.stdin = io.BytesIO() + def write_stdin(self, data): + # consume some bytes + assert isinstance(data, (bytes, bytearray)) + def close(self): + pass + monkeypatch.setattr( + k8s, "_execute_command_in_pod", lambda **kw: FakeResp() + ) + + k8s._upload_files_to_pod( + pod_name="p", + namespace="ns", + paths=[f], + mount_path="/data", + relative_to=tmp_path, + parent_dir="input", + ) + + +### _create_namespaced_pvc + +def test_create_pvc_skips_if_exists(monkeypatch): + mock_core = MagicMock() + pvc = types.SimpleNamespace() + pvc.metadata = types.SimpleNamespace(name="my-pvc") + mock_core.list_namespaced_persistent_volume_claim.return_value = types.SimpleNamespace(items=[pvc]) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + k8s._create_namespaced_pvc("my-pvc", "default", "5Gi", None) + mock_core.create_namespaced_persistent_volume_claim.assert_not_called() + + +def test_create_pvc_creates_with_and_without_storage_class(monkeypatch): + mock_core = MagicMock() + mock_core.list_namespaced_persistent_volume_claim.return_value = types.SimpleNamespace(items=[]) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + k8s._create_namespaced_pvc("p1", "ns", "1Gi", None) + k8s._create_namespaced_pvc("p2", "ns", "2Gi", "fast") + + assert mock_core.create_namespaced_persistent_volume_claim.call_count == 2 + + +### _create_finalizer_job + +def test_create_finalizer_job(monkeypatch): + mock_batch = MagicMock() + mock_core = MagicMock() + + # No existing jobs + mock_batch.list_namespaced_job.return_value = types.SimpleNamespace(items=[]) + # pods appear after first poll + pod = _mk_pod(name="final-pod", phase="Running", init_running=None) + mock_core.list_namespaced_pod.side_effect = [ + types.SimpleNamespace(items=[]), + types.SimpleNamespace(items=[pod]), + ] + + monkeypatch.setattr(k8s.client, "BatchV1Api", lambda: mock_batch) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + name = k8s._create_finalizer_job(job_name="job-final", namespace="ns", pvc_name="pvc", mount_path="/data") + assert name == "final-pod" + assert mock_batch.create_namespaced_job.call_count == 1 + + +### _create_namespaced_job + +def test_create_namespaced_job_deletes_old_pod_and_creates_new(monkeypatch): + mock_batch = MagicMock() + mock_core = MagicMock() + + # Existing job with same name to be deleted + mock_batch.list_namespaced_job.return_value = types.SimpleNamespace(items=[types.SimpleNamespace(metadata=types.SimpleNamespace(name="job-1"))]) + + # Existing pod to be deleted + old_pod = _mk_pod(name="old-pod") + mock_core.list_namespaced_pod.return_value = types.SimpleNamespace(items=[old_pod]) + + # When polling for new pod, first empty then one appears + new_pod = _mk_pod(name="new-pod") + mock_core.list_namespaced_pod.side_effect = [ + types.SimpleNamespace(items=[old_pod]), # for deletion scan + types.SimpleNamespace(items=[]), # first poll + types.SimpleNamespace(items=[new_pod]), # second poll -> found + ] + + monkeypatch.setattr(k8s.client, "BatchV1Api", lambda: mock_batch) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + name = k8s._create_namespaced_job( + job_name="job-1", + namespace="ns", + pvc_name="pvc", + image="alpine:latest", + device_requests=[], + pv_mounts={"pvc": "/data"}, + args=["echo", "hi"], + shm_size="1gb", + user=None, + ) + + assert name == "new-pod" + assert mock_batch.delete_namespaced_job.call_count == 1 + assert mock_core.delete_namespaced_pod.call_count == 1 + assert mock_batch.create_namespaced_job.call_count == 1 + + +### run_job (two branches) + +def test_run_job_year_2024_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm): + algo = dummy_algorithm(year=2024, with_additional=True) + + # Config load + monkeypatch.setattr(k8s.config, "load_kube_config", lambda: None) + + # Helper functions + monkeypatch.setattr(k8s, "_log_algorithm_info", lambda **kw: None) + monkeypatch.setattr(k8s, "_handle_device_requests", lambda **kw: []) + monkeypatch.setattr(k8s, "_get_container_user", lambda **kw: None) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm=None: "") + monkeypatch.setattr(k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: ({"version": "1"}, "http://example/zip")) + monkeypatch.setattr(k8s, "PARAMETERS_DIR", tmp_tree / "params") + (tmp_tree / "params").mkdir(exist_ok=True) + monkeypatch.setattr(k8s, "get_dummy_path", lambda: Path("/dummy")) + monkeypatch.setattr(k8s, "_sanity_check_output", lambda **kw: None) + + # PVC creation + monkeypatch.setattr(k8s, "_create_namespaced_pvc", lambda **kw: None) + + # Job + pod lifecycle + def fake_create_job(**kwargs): + return "job-pod" + monkeypatch.setattr(k8s, "_create_namespaced_job", lambda **kw: fake_create_job(**kw)) + + # CoreV1Api read_namespaced_pod progression + mock_core = MagicMock() + # First loop waits until init container running + mock_core.read_namespaced_pod.return_value = _mk_pod(name="job-pod", phase="Running", init_running=True) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + # File checking + monkeypatch.setattr(k8s, "_check_files_in_pod", lambda **kw: None) + + # Additional files download and parameters upload + monkeypatch.setattr(k8s, "_download_additional_files", lambda **kw: Path("/data/12345_v1")) + monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: None) + + # Exec commands in init container + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "ok") + + # Observe logs + monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") + + # Pod completes + def read_pod_done(name, namespace): + return _mk_pod(name=name, phase="Succeeded", init_running=True) + mock_core.read_namespaced_pod.side_effect = [ + _mk_pod(name="job-pod", phase="Running", init_running=True), # wait loop + _mk_pod(name="job-pod", phase="Succeeded", init_running=True), # completion loop + ] + + # Finalizer job + monkeypatch.setattr(k8s, "_create_finalizer_job", lambda **kw: "final-pod") + monkeypatch.setattr(k8s, "_download_folder_from_pod", lambda **kw: None) + + out_dir = tmp_path / "out" + k8s.run_job( + algorithm=algo, + data_path=tmp_tree / "input", + output_path=out_dir, + cuda_devices="", + force_cpu=True, + internal_external_name_map=None, + namespace="default", + pvc_name="mypvc", + pvc_storage_size="1Gi", + pvc_storage_class=None, + job_name="myjob", + data_mount_path="/data", + ) + + # Ensure output dir created + assert out_dir.exists() + + +def test_run_job_year_2025_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm): + algo = dummy_algorithm(year=2025, with_additional=False) + monkeypatch.setattr(k8s.config, "load_kube_config", lambda: None) + monkeypatch.setattr(k8s, "_log_algorithm_info", lambda **kw: None) + monkeypatch.setattr(k8s, "_handle_device_requests", lambda **kw: []) + monkeypatch.setattr(k8s, "_get_container_user", lambda **kw: None) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm=None: "") + monkeypatch.setattr(k8s, "_sanity_check_output", lambda **kw: None) + # PVCs + created = [] + def fake_create_pvc(**kw): + created.append(kw["pvc_name"]) + monkeypatch.setattr(k8s, "_create_namespaced_pvc", lambda **kw: fake_create_pvc(**kw)) + + # Job creation + monkeypatch.setattr(k8s, "_create_namespaced_job", lambda **kw: "job-pod") + + # Pod running/completion + mock_core = MagicMock() + mock_core.read_namespaced_pod.side_effect = [ + _mk_pod(name="job-pod", phase="Running", init_running=True), + _mk_pod(name="job-pod", phase="Succeeded", init_running=True), + ] + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + # File checks skipped in behavior but called; mock anyway + monkeypatch.setattr(k8s, "_check_files_in_pod", lambda **kw: None) + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "ok") + monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") + monkeypatch.setattr(k8s, "_create_finalizer_job", lambda **kw: "final-pod") + monkeypatch.setattr(k8s, "_download_folder_from_pod", lambda **kw: None) + + out_dir = tmp_path / "out2" + k8s.run_job( + algorithm=algo, + data_path=tmp_tree / "input", + output_path=out_dir, + cuda_devices="", + force_cpu=True, + namespace="default", + pvc_name="mypvc2", + pvc_storage_size="1Gi", + job_name="myjob2", + data_mount_path="/data", + ) + + # For year > 2024, an extra output PVC should be created + assert "mypvc2" in created + assert "mypvc2-output" in created + assert out_dir.exists() \ No newline at end of file From 90a16c56fbb1c416452286bec8da6247760e44a1 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 19:09:23 +0000 Subject: [PATCH 09/31] Update `_download_folder_from_pod` function to use absolute path for `local_base_dir` parameter --- brats/core/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 79aafff..83da5bc 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -259,7 +259,7 @@ def _check_files_in_pod( def _download_folder_from_pod( - pod_name, namespace, container, remote_paths, local_base_dir="." + pod_name, namespace, container, remote_paths, local_base_dir=Path(".").absolute() ): v1 = client.CoreV1Api() From cf340a2f117f4c8c840e9fa79735a1ee1665a296 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 19:32:09 +0000 Subject: [PATCH 10/31] Add optional `kubernetes_kwargs` parameter to `BraTSAlgorithm` for enhanced Kubernetes backend configuration --- brats/core/brats_algorithm.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index bf763c9..e7bdf4c 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -175,6 +175,7 @@ def _infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Backends = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform a single inference run with the provided inputs and save the output in the specified file. @@ -183,7 +184,7 @@ def _infer_single( inputs (dict[str, Path | str]): Input Images for the task output_file (Path | str): File to save the output log_file (Optional[Path | str], optional): Log file with extra information. Defaults to None. - backend (Backends | str, optional): Backend to use for inference. Defaults to Backends.DOCKER. + backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ with InferenceSetup(log_file=log_file) as (tmp_data_folder, tmp_output_folder): @@ -202,7 +203,7 @@ def _infer_single( runner = self._get_backend_runner(backend) if runner is None: raise ValueError(f"Unsupported backend: {backend}") - runner( + runner_kwargs = dict( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, From f399c7e996d34e8b62e7d9d4e666c6f0fc2b5f35 Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 19:34:40 +0000 Subject: [PATCH 11/31] Autoformat with black --- tests/core/test_kubernetes.py | 107 +++++++++++++++++++++++++++------- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py index bf37be5..d5bcc82 100644 --- a/tests/core/test_kubernetes.py +++ b/tests/core/test_kubernetes.py @@ -77,9 +77,14 @@ def _mk_pod(name="pod-1", phase="Running", init_running=True, creation_ts=None): ### _build_command_args -def test_build_command_args_with_additional_and_params(dummy_algorithm, monkeypatch, tmp_path): + +def test_build_command_args_with_additional_and_params( + dummy_algorithm, monkeypatch, tmp_path +): algo = dummy_algorithm(year=2024, with_additional=True) - monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm: " --params=/mlcube_io3/foo.json") + monkeypatch.setattr( + k8s, "_get_parameters_arg", lambda algorithm: " --params=/mlcube_io3/foo.json" + ) cmd = k8s._build_command_args( algorithm=algo, @@ -120,6 +125,7 @@ def test_build_command_args_without_additional_and_params(dummy_algorithm, monke ### _execute_command_in_pod + def test_execute_command_in_pod(monkeypatch): mock_core = MagicMock() monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) @@ -138,37 +144,53 @@ def test_execute_command_in_pod(monkeypatch): ### _download_additional_files + def test_download_additional_files_success(dummy_algorithm, monkeypatch): algo = dummy_algorithm(year=2024, with_additional=True) monkeypatch.setattr( - k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: ({"version": "2"}, "http://example/archive.zip") + k8s, + "_get_zenodo_metadata_and_archive_url", + lambda record_id: ({"version": "2"}, "http://example/archive.zip"), ) exec_calls = [] + def fake_exec(**kwargs): exec_calls.append(kwargs) return "ok" + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: fake_exec(**kw)) - p = k8s._download_additional_files(algorithm=algo, pod_name="p", namespace="ns", mount_path="/data") + p = k8s._download_additional_files( + algorithm=algo, pod_name="p", namespace="ns", mount_path="/data" + ) assert str(p) == "/data/12345_v2" assert exec_calls and exec_calls[0]["container"] == "init-container" def test_download_additional_files_zenodo_failure(dummy_algorithm, monkeypatch): algo = dummy_algorithm(year=2024, with_additional=True) - monkeypatch.setattr(k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: None) + monkeypatch.setattr( + k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: None + ) with pytest.raises(k8s.ZenodoException): - k8s._download_additional_files(algorithm=algo, pod_name="p", namespace="ns", mount_path="/data") + k8s._download_additional_files( + algorithm=algo, pod_name="p", namespace="ns", mount_path="/data" + ) ### _check_files_in_pod + def test_check_files_in_pod_uploads_missing(monkeypatch, tmp_tree): # Simulate ls output saying missing - monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "No such file or directory") + monkeypatch.setattr( + k8s, "_execute_command_in_pod", lambda **kw: "No such file or directory" + ) uploaded = {"called": False, "args": None} + def fake_upload(**kwargs): uploaded["called"] = True uploaded["args"] = kwargs + monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: fake_upload(**kw)) k8s._check_files_in_pod( @@ -181,6 +203,7 @@ def fake_upload(**kwargs): ### _download_folder_from_pod + def test_download_folder_from_pod(monkeypatch, tmp_path): # Create a tar stream that contains one file "foo.txt" tar_bytes = io.BytesIO() @@ -198,19 +221,26 @@ def __init__(self): self._stdout_chunks = [b64] self._stderr_chunks = [] self._open = True + def is_open(self): return self._open + def update(self, timeout=1): # close after delivering stdout self._open = False + def peek_stdout(self): return bool(self._stdout_chunks) + def read_stdout(self): return self._stdout_chunks.pop(0) + def peek_stderr(self): return False + def read_stderr(self): return "" + def close(self): self._open = False @@ -232,20 +262,23 @@ def close(self): ### _upload_files_to_pod + def test_upload_files_to_pod(monkeypatch, tmp_path): f = tmp_path / "a.txt" f.write_text("hi") + class FakeResp: def __init__(self): self.stdin = io.BytesIO() + def write_stdin(self, data): # consume some bytes assert isinstance(data, (bytes, bytearray)) + def close(self): pass - monkeypatch.setattr( - k8s, "_execute_command_in_pod", lambda **kw: FakeResp() - ) + + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: FakeResp()) k8s._upload_files_to_pod( pod_name="p", @@ -259,11 +292,14 @@ def close(self): ### _create_namespaced_pvc + def test_create_pvc_skips_if_exists(monkeypatch): mock_core = MagicMock() pvc = types.SimpleNamespace() pvc.metadata = types.SimpleNamespace(name="my-pvc") - mock_core.list_namespaced_persistent_volume_claim.return_value = types.SimpleNamespace(items=[pvc]) + mock_core.list_namespaced_persistent_volume_claim.return_value = ( + types.SimpleNamespace(items=[pvc]) + ) monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) k8s._create_namespaced_pvc("my-pvc", "default", "5Gi", None) @@ -272,7 +308,9 @@ def test_create_pvc_skips_if_exists(monkeypatch): def test_create_pvc_creates_with_and_without_storage_class(monkeypatch): mock_core = MagicMock() - mock_core.list_namespaced_persistent_volume_claim.return_value = types.SimpleNamespace(items=[]) + mock_core.list_namespaced_persistent_volume_claim.return_value = ( + types.SimpleNamespace(items=[]) + ) monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) k8s._create_namespaced_pvc("p1", "ns", "1Gi", None) @@ -283,6 +321,7 @@ def test_create_pvc_creates_with_and_without_storage_class(monkeypatch): ### _create_finalizer_job + def test_create_finalizer_job(monkeypatch): mock_batch = MagicMock() mock_core = MagicMock() @@ -299,19 +338,24 @@ def test_create_finalizer_job(monkeypatch): monkeypatch.setattr(k8s.client, "BatchV1Api", lambda: mock_batch) monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) - name = k8s._create_finalizer_job(job_name="job-final", namespace="ns", pvc_name="pvc", mount_path="/data") + name = k8s._create_finalizer_job( + job_name="job-final", namespace="ns", pvc_name="pvc", mount_path="/data" + ) assert name == "final-pod" assert mock_batch.create_namespaced_job.call_count == 1 ### _create_namespaced_job + def test_create_namespaced_job_deletes_old_pod_and_creates_new(monkeypatch): mock_batch = MagicMock() mock_core = MagicMock() # Existing job with same name to be deleted - mock_batch.list_namespaced_job.return_value = types.SimpleNamespace(items=[types.SimpleNamespace(metadata=types.SimpleNamespace(name="job-1"))]) + mock_batch.list_namespaced_job.return_value = types.SimpleNamespace( + items=[types.SimpleNamespace(metadata=types.SimpleNamespace(name="job-1"))] + ) # Existing pod to be deleted old_pod = _mk_pod(name="old-pod") @@ -321,7 +365,7 @@ def test_create_namespaced_job_deletes_old_pod_and_creates_new(monkeypatch): new_pod = _mk_pod(name="new-pod") mock_core.list_namespaced_pod.side_effect = [ types.SimpleNamespace(items=[old_pod]), # for deletion scan - types.SimpleNamespace(items=[]), # first poll + types.SimpleNamespace(items=[]), # first poll types.SimpleNamespace(items=[new_pod]), # second poll -> found ] @@ -348,6 +392,7 @@ def test_create_namespaced_job_deletes_old_pod_and_creates_new(monkeypatch): ### run_job (two branches) + def test_run_job_year_2024_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm): algo = dummy_algorithm(year=2024, with_additional=True) @@ -359,7 +404,11 @@ def test_run_job_year_2024_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm monkeypatch.setattr(k8s, "_handle_device_requests", lambda **kw: []) monkeypatch.setattr(k8s, "_get_container_user", lambda **kw: None) monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm=None: "") - monkeypatch.setattr(k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: ({"version": "1"}, "http://example/zip")) + monkeypatch.setattr( + k8s, + "_get_zenodo_metadata_and_archive_url", + lambda record_id: ({"version": "1"}, "http://example/zip"), + ) monkeypatch.setattr(k8s, "PARAMETERS_DIR", tmp_tree / "params") (tmp_tree / "params").mkdir(exist_ok=True) monkeypatch.setattr(k8s, "get_dummy_path", lambda: Path("/dummy")) @@ -371,19 +420,26 @@ def test_run_job_year_2024_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm # Job + pod lifecycle def fake_create_job(**kwargs): return "job-pod" - monkeypatch.setattr(k8s, "_create_namespaced_job", lambda **kw: fake_create_job(**kw)) + + monkeypatch.setattr( + k8s, "_create_namespaced_job", lambda **kw: fake_create_job(**kw) + ) # CoreV1Api read_namespaced_pod progression mock_core = MagicMock() # First loop waits until init container running - mock_core.read_namespaced_pod.return_value = _mk_pod(name="job-pod", phase="Running", init_running=True) + mock_core.read_namespaced_pod.return_value = _mk_pod( + name="job-pod", phase="Running", init_running=True + ) monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) # File checking monkeypatch.setattr(k8s, "_check_files_in_pod", lambda **kw: None) # Additional files download and parameters upload - monkeypatch.setattr(k8s, "_download_additional_files", lambda **kw: Path("/data/12345_v1")) + monkeypatch.setattr( + k8s, "_download_additional_files", lambda **kw: Path("/data/12345_v1") + ) monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: None) # Exec commands in init container @@ -395,9 +451,12 @@ def fake_create_job(**kwargs): # Pod completes def read_pod_done(name, namespace): return _mk_pod(name=name, phase="Succeeded", init_running=True) + mock_core.read_namespaced_pod.side_effect = [ _mk_pod(name="job-pod", phase="Running", init_running=True), # wait loop - _mk_pod(name="job-pod", phase="Succeeded", init_running=True), # completion loop + _mk_pod( + name="job-pod", phase="Succeeded", init_running=True + ), # completion loop ] # Finalizer job @@ -434,9 +493,13 @@ def test_run_job_year_2025_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm monkeypatch.setattr(k8s, "_sanity_check_output", lambda **kw: None) # PVCs created = [] + def fake_create_pvc(**kw): created.append(kw["pvc_name"]) - monkeypatch.setattr(k8s, "_create_namespaced_pvc", lambda **kw: fake_create_pvc(**kw)) + + monkeypatch.setattr( + k8s, "_create_namespaced_pvc", lambda **kw: fake_create_pvc(**kw) + ) # Job creation monkeypatch.setattr(k8s, "_create_namespaced_job", lambda **kw: "job-pod") @@ -473,4 +536,4 @@ def fake_create_pvc(**kw): # For year > 2024, an extra output PVC should be created assert "mypvc2" in created assert "mypvc2-output" in created - assert out_dir.exists() \ No newline at end of file + assert out_dir.exists() From a78638bb6f803546a7599514a11a463ceb528d43 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 20:21:25 +0000 Subject: [PATCH 12/31] Update dependencies in `pyproject.toml` and `poetry.lock` to include new packages: `cachetools`, `durationpy`, `google-auth`, `kubernetes`, `oauthlib`, `pyasn1`, `pyasn1-modules`, `requests-oauthlib`, `rsa`, and `websocket-client`. Adjust version constraints and add optional dependencies for improved functionality. --- poetry.lock | 220 +++++++++++++++++++++++++++++++++++++++++++++++-- pyproject.toml | 2 + 2 files changed, 217 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 31bdbc2..5d65d83 100644 --- a/poetry.lock +++ b/poetry.lock @@ -209,6 +209,32 @@ itk-elastix = ["itk-elastix (>=0.20.0,<0.21.0)"] picsl-greedy = ["picsl_greedy (>=0.0.6,<0.0.7)"] synthstrip = ["nipreps-synthstrip (>=0.0.1,<0.0.2)"] +[[package]] +name = "cachetools" +version = "5.5.2" +description = "Extensible memoizing collections and decorators" +optional = false +python-versions = ">=3.7" +groups = ["main"] +markers = "python_version <= \"3.9\"" +files = [ + {file = "cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a"}, + {file = "cachetools-5.5.2.tar.gz", hash = "sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4"}, +] + +[[package]] +name = "cachetools" +version = "6.2.1" +description = "Extensible memoizing collections and decorators" +optional = false +python-versions = ">=3.9" +groups = ["main"] +markers = "python_version >= \"3.10\"" +files = [ + {file = "cachetools-6.2.1-py3-none-any.whl", hash = "sha256:09868944b6dde876dfd44e1d47e18484541eaf12f26f29b7af91b26cc892d701"}, + {file = "cachetools-6.2.1.tar.gz", hash = "sha256:3f391e4bd8f8bf0931169baf7456cc822705f4e2a31f840d218f445b9a854201"}, +] + [[package]] name = "certifi" version = "2024.7.4" @@ -676,6 +702,18 @@ files = [ {file = "docutils-0.20.1.tar.gz", hash = "sha256:f08a4e276c3a1583a86dce3e34aba3fe04d02bba2dd51ed16106244e8a923e3b"}, ] +[[package]] +name = "durationpy" +version = "0.10" +description = "Module for converting between datetime.timedelta and Go's Duration strings." +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286"}, + {file = "durationpy-0.10.tar.gz", hash = "sha256:1fa6893409a6e739c9c72334fc65cca1f355dbdd93405d30f726deb5bde42fba"}, +] + [[package]] name = "exceptiongroup" version = "1.2.2" @@ -863,6 +901,33 @@ pygments = ">=2.7" sphinx = ">=6.0,<9.0" sphinx-basic-ng = ">=1.0.0.beta2" +[[package]] +name = "google-auth" +version = "2.42.1" +description = "Google Authentication Library" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "google_auth-2.42.1-py2.py3-none-any.whl", hash = "sha256:eb73d71c91fc95dbd221a2eb87477c278a355e7367a35c0d84e6b0e5f9b4ad11"}, + {file = "google_auth-2.42.1.tar.gz", hash = "sha256:30178b7a21aa50bffbdc1ffcb34ff770a2f65c712170ecd5446c4bef4dc2b94e"}, +] + +[package.dependencies] +cachetools = ">=2.0.0,<7.0" +pyasn1-modules = ">=0.2.1" +rsa = ">=3.1.4,<5" + +[package.extras] +aiohttp = ["aiohttp (>=3.6.2,<4.0.0)", "requests (>=2.20.0,<3.0.0)"] +enterprise-cert = ["cryptography", "pyopenssl"] +pyjwt = ["cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (>=38.0.3)", "pyjwt (>=2.0)"] +pyopenssl = ["cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"] +reauth = ["pyu2f (>=0.1.5)"] +requests = ["requests (>=2.20.0,<3.0.0)"] +testing = ["aiohttp (<3.10.0)", "aiohttp (>=3.6.2,<4.0.0)", "aioresponses", "cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (>=38.0.3)", "cryptography (>=38.0.3)", "flask", "freezegun", "grpcio", "mock", "oauth2client", "packaging", "pyjwt (>=2.0)", "pyopenssl (<24.3.0)", "pyopenssl (>=20.0.0)", "pytest", "pytest-asyncio", "pytest-cov", "pytest-localserver", "pyu2f (>=0.1.5)", "requests (>=2.20.0,<3.0.0)", "responses", "urllib3"] +urllib3 = ["packaging", "urllib3"] + [[package]] name = "idna" version = "3.8" @@ -1123,6 +1188,33 @@ files = [ {file = "kiwisolver-1.4.9.tar.gz", hash = "sha256:c3b22c26c6fd6811b0ae8363b95ca8ce4ea3c202d3d0975b2914310ceb1bcc4d"}, ] +[[package]] +name = "kubernetes" +version = "34.1.0" +description = "Kubernetes python client" +optional = false +python-versions = ">=3.6" +groups = ["main"] +files = [ + {file = "kubernetes-34.1.0-py2.py3-none-any.whl", hash = "sha256:bffba2272534e224e6a7a74d582deb0b545b7c9879d2cd9e4aae9481d1f2cc2a"}, + {file = "kubernetes-34.1.0.tar.gz", hash = "sha256:8fe8edb0b5d290a2f3ac06596b23f87c658977d46b5f8df9d0f4ea83d0003912"}, +] + +[package.dependencies] +certifi = ">=14.05.14" +durationpy = ">=0.7" +google-auth = ">=1.0.1" +python-dateutil = ">=2.5.3" +pyyaml = ">=5.4.1" +requests = "*" +requests-oauthlib = "*" +six = ">=1.9.0" +urllib3 = ">=1.24.2,<2.4.0" +websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" + +[package.extras] +adal = ["adal (>=1.0.2)"] + [[package]] name = "lazy-loader" version = "0.4" @@ -2032,6 +2124,23 @@ files = [ {file = "nvidia_nvtx_cu12-12.8.90-py3-none-win_amd64.whl", hash = "sha256:619c8304aedc69f02ea82dd244541a83c3d9d40993381b3b590f1adaed3db41e"}, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +description = "A generic, spec-compliant, thorough implementation of the OAuth request-signing logic" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1"}, + {file = "oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9"}, +] + +[package.extras] +rsa = ["cryptography (>=3.0.0)"] +signals = ["blinker (>=1.4.0)"] +signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] + [[package]] name = "packaging" version = "24.1" @@ -2311,6 +2420,33 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "pyasn1" +version = "0.6.1" +description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, + {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, +] + +[[package]] +name = "pyasn1-modules" +version = "0.4.2" +description = "A collection of ASN.1-based protocols modules" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a"}, + {file = "pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6"}, +] + +[package.dependencies] +pyasn1 = ">=0.6.1,<0.7.0" + [[package]] name = "pycodestyle" version = "2.9.1" @@ -2412,10 +2548,9 @@ testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] name = "python-dateutil" version = "2.9.0.post0" description = "Extensions to the standard Python datetime module" -optional = true +optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" groups = ["main"] -markers = "python_version >= \"3.10\" and extra == \"preprocessing\"" files = [ {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, @@ -2553,6 +2688,25 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +description = "OAuthlib authentication support for Requests." +optional = false +python-versions = ">=3.4" +groups = ["main"] +files = [ + {file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, + {file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, +] + +[package.dependencies] +oauthlib = ">=3.0.0" +requests = ">=2.0.0" + +[package.extras] +rsa = ["oauthlib[signedtoken] (>=3.0.0)"] + [[package]] name = "rich" version = "13.8.0" @@ -2573,6 +2727,21 @@ typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9 [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "rsa" +version = "4.9.1" +description = "Pure-Python RSA implementation" +optional = false +python-versions = "<4,>=3.6" +groups = ["main"] +files = [ + {file = "rsa-4.9.1-py3-none-any.whl", hash = "sha256:68635866661c6836b8d39430f97a996acbd61bfa49406748ea243539fe239762"}, + {file = "rsa-4.9.1.tar.gz", hash = "sha256:e7bdbfdb5497da4c07dfd35530e1a902659db6ff241e39d9953cad06ebd0ae75"}, +] + +[package.dependencies] +pyasn1 = ">=0.1.3" + [[package]] name = "scikit-image" version = "0.25.2" @@ -2891,10 +3060,9 @@ files = [ name = "six" version = "1.17.0" description = "Python 2 and 3 compatibility utilities" -optional = true +optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" groups = ["main"] -markers = "python_version >= \"3.10\" and extra == \"preprocessing\"" files = [ {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, @@ -3137,6 +3305,12 @@ files = [ {file = "statsmodels-0.14.5-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a085d47c8ef5387279a991633883d0e700de2b0acc812d7032d165888627bef"}, {file = "statsmodels-0.14.5-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:9f866b2ebb2904b47c342d00def83c526ef2eb1df6a9a3c94ba5fe63d0005aec"}, {file = "statsmodels-0.14.5-cp313-cp313-win_amd64.whl", hash = "sha256:2a06bca03b7a492f88c8106103ab75f1a5ced25de90103a89f3a287518017939"}, + {file = "statsmodels-0.14.5-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:07c4dad25bbb15864a31b4917a820f6d104bdc24e5ddadcda59027390c3bed9e"}, + {file = "statsmodels-0.14.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:babb067c852e966c2c933b79dbb5d0240919d861941a2ef6c0e13321c255528d"}, + {file = "statsmodels-0.14.5-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:110194b137286173cc676d7bad0119a197778de6478fc6cbdc3b33571165ac1e"}, + {file = "statsmodels-0.14.5-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9c8a9c384a60c80731b278e7fd18764364c8817f4995b13a175d636f967823d1"}, + {file = "statsmodels-0.14.5-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:557df3a870a57248df744fdfcc444ecbc5bdbf1c042b8a8b5d8e3e797830dc2a"}, + {file = "statsmodels-0.14.5-cp314-cp314-win_amd64.whl", hash = "sha256:95af7a9c4689d514f4341478b891f867766f3da297f514b8c4adf08f4fa61d03"}, {file = "statsmodels-0.14.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b23b8f646dd78ef5e8d775d879208f8dc0a73418b41c16acac37361ff9ab7738"}, {file = "statsmodels-0.14.5-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4e5e26b21d2920905764fb0860957d08b5ba2fae4466ef41b1f7c53ecf9fc7fa"}, {file = "statsmodels-0.14.5-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4a060c7e0841c549c8ce2825fd6687e6757e305d9c11c9a73f6c5a0ce849bb69"}, @@ -3548,6 +3722,42 @@ files = [ {file = "webcolors-24.11.1.tar.gz", hash = "sha256:ecb3d768f32202af770477b8b65f318fa4f566c22948673a977b00d589dd80f6"}, ] +[[package]] +name = "websocket-client" +version = "1.8.0" +description = "WebSocket client for Python with low level API options" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version <= \"3.9\"" +files = [ + {file = "websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526"}, + {file = "websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da"}, +] + +[package.extras] +docs = ["Sphinx (>=6.0)", "myst-parser (>=2.0.0)", "sphinx-rtd-theme (>=1.1.0)"] +optional = ["python-socks", "wsaccel"] +test = ["websockets"] + +[[package]] +name = "websocket-client" +version = "1.9.0" +description = "WebSocket client for Python with low level API options" +optional = false +python-versions = ">=3.9" +groups = ["main"] +markers = "python_version >= \"3.10\"" +files = [ + {file = "websocket_client-1.9.0-py3-none-any.whl", hash = "sha256:af248a825037ef591efbf6ed20cc5faa03d3b47b9e5a2230a529eeee1c1fc3ef"}, + {file = "websocket_client-1.9.0.tar.gz", hash = "sha256:9e813624b6eb619999a97dc7958469217c3176312b3a16a4bd1bc7e08a46ec98"}, +] + +[package.extras] +docs = ["Sphinx (>=6.0)", "myst-parser (>=2.0.0)", "sphinx_rtd_theme (>=1.1.0)"] +optional = ["python-socks", "wsaccel"] +test = ["pytest", "websockets"] + [[package]] name = "win32-setctime" version = "1.1.0" @@ -3591,4 +3801,4 @@ preprocessing = ["brainles_preprocessing"] [metadata] lock-version = "2.1" python-versions = ">=3.8,<4.0" -content-hash = "8406391fff7ba2f2a396a9e0aac3e19348f3e8e9bdc6d6a3a576f77a9098a1dc" +content-hash = "7e5c48c393f1cc64d83005d4b891dbdf59dd6214a7577aafd9fe66d94f5b2e85" diff --git a/pyproject.toml b/pyproject.toml index c4f9929..f23e29d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "nibabel>=5.0.0", "numpy>=1.21.0; python_version<='3.9'", "numpy>=1.26.0; python_version>='3.10'", + "kubernetes (>=34.1.0,<35.0.0)", ] [project.optional-dependencies] @@ -64,6 +65,7 @@ flake8 = ">=5.0.0" [tool.pytest.ini_options] pythonpath = ["."] + [tool.poetry.group.docs] optional = true From 3527cf19e37e72828b2f8412d8d8ab82a62cf2cd Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 20:38:58 +0000 Subject: [PATCH 13/31] Refactor Kubernetes integration: remove unused imports, adjust output path handling based on algorithm year, and streamline command execution logging. --- brats/core/kubernetes.py | 11 +++++------ tests/core/test_kubernetes.py | 3 +-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 83da5bc..2217044 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -8,7 +8,6 @@ import base64 from loguru import logger from kubernetes import client, config -from kubernetes.config import load_kube_config from kubernetes.stream import stream from brats.constants import PARAMETERS_DIR from brats.utils.algorithm_config import AlgorithmData @@ -399,7 +398,6 @@ def _create_namespaced_pvc( storage_class_name=storage_class, ) - core_v1_api = client.CoreV1Api() core_v1_api.create_namespaced_persistent_volume_claim( namespace=namespace, body=client.V1PersistentVolumeClaim( @@ -692,10 +690,11 @@ def run_job( user = _get_container_user(algorithm=algorithm) logger.debug(f"Container user: {user if user else 'root (required by algorithm)'}") - output_mount_path = Path(data_mount_path).joinpath("output") if algorithm.meta.year > 2024: data_mount_path = "/input" output_mount_path = "/output" + else: + output_mount_path = Path(data_mount_path).joinpath("output") if algorithm.meta.year <= 2024: if algorithm.additional_files is not None: @@ -813,14 +812,14 @@ def run_job( parent_dir="parameters", ) commands = ["tree", data_mount_path] - output = _execute_command_in_pod( + _execute_command_in_pod( pod_name=pod_name, namespace=namespace, command=commands, container="init-container", ) commands = ["touch", "/etc/content_verified"] - output = _execute_command_in_pod( + _execute_command_in_pod( pod_name=pod_name, namespace=namespace, command=commands, @@ -829,7 +828,7 @@ def run_job( output_path.mkdir(parents=True, exist_ok=True) - logger.info(f"{'Starting inference'}") + logger.info("Starting inference") start_time = time.time() time.sleep(2) diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py index d5bcc82..8ad365a 100644 --- a/tests/core/test_kubernetes.py +++ b/tests/core/test_kubernetes.py @@ -2,10 +2,9 @@ import tarfile import base64 import types -import time from pathlib import Path from datetime import datetime, timezone -from unittest.mock import MagicMock, patch, call +from unittest.mock import MagicMock import pytest From a7c97617dff6967a930e31ed60ce3e9ea8f87f3c Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 20:46:00 +0000 Subject: [PATCH 14/31] Update `pyproject.toml` to correct dependency format for `kubernetes` and modify type hints in `kubernetes.py` for clarity. Enhance logging in `_download_folder_from_pod` and add TODO comments for future security context implementation. Remove commented-out code in `run_job` and adjust test cases in `test_kubernetes.py` to reflect changes. --- brats/core/kubernetes.py | 18 +++++++++--------- pyproject.toml | 2 +- tests/core/test_kubernetes.py | 3 --- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 2217044..6b9d139 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -39,9 +39,9 @@ def _build_command_args( Args: algorithm (AlgorithmData): The algorithm data - additional_files_path (Path): The path to the additional files - data_path (Path): The path to the input data - output_path (Path): The path to save the output + additional_files_path (str): The path to the additional files + data_path (str): The path to the input data + output_path (str): The path to save the output mount_path (str): The path to mount the PVC to. Defaults to "/data". Returns: command_args: The command arguments @@ -260,7 +260,6 @@ def _check_files_in_pod( def _download_folder_from_pod( pod_name, namespace, container, remote_paths, local_base_dir=Path(".").absolute() ): - v1 = client.CoreV1Api() for path in remote_paths: folder_name = str(path) @@ -289,7 +288,7 @@ def _download_folder_from_pod( if resp.peek_stderr(): err = resp.read_stderr() if err: - print("STDERR:", err) + logger.error(f"STDERR: {err}") resp.close() full_base64 = b"".join(base64_chunks) @@ -539,8 +538,8 @@ def _create_namespaced_job( f"Failed to delete pod '{pod_name_to_delete}' in namespace '{namespace}': {e}" ) - # user_id = int(user.split(":")[0]) if user else 0 - # group_id = int(user.split(":")[1]) if user else 0 + # user_id = int(user.split(":")[0]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. + # group_id = int(user.split(":")[1]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. volume_mounts = [] for pvc_name, mount_path in pv_mounts.items(): volume_mounts.append(client.V1VolumeMount(name=pvc_name, mount_path=mount_path)) @@ -549,6 +548,7 @@ def _create_namespaced_job( image=image, volume_mounts=volume_mounts, # security_context=client.V1SecurityContext(run_as_user=user_id, run_as_group=group_id) + # TODO: Implement security_context for container if/when user/group IDs are required. ) if len(device_requests) > 0: container_spec.resources = client.V1ResourceRequirements( @@ -798,7 +798,7 @@ def run_job( ) if algorithm.meta.year <= 2024: - additional_files_path = _download_additional_files( + _download_additional_files( algorithm=algorithm, pod_name=pod_name, namespace=namespace, @@ -869,7 +869,7 @@ def run_job( ) commands = ["touch", "/etc/content_verified"] - output = _execute_command_in_pod( + _execute_command_in_pod( pod_name=pod_name_finalizer, namespace=namespace, command=commands, diff --git a/pyproject.toml b/pyproject.toml index f23e29d..2bed7f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ dependencies = [ "nibabel>=5.0.0", "numpy>=1.21.0; python_version<='3.9'", "numpy>=1.26.0; python_version>='3.10'", - "kubernetes (>=34.1.0,<35.0.0)", + "kubernetes>=34.1.0,<35.0.0", ] [project.optional-dependencies] diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py index 8ad365a..31edd68 100644 --- a/tests/core/test_kubernetes.py +++ b/tests/core/test_kubernetes.py @@ -447,9 +447,6 @@ def fake_create_job(**kwargs): # Observe logs monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") - # Pod completes - def read_pod_done(name, namespace): - return _mk_pod(name=name, phase="Succeeded", init_running=True) mock_core.read_namespaced_pod.side_effect = [ _mk_pod(name="job-pod", phase="Running", init_running=True), # wait loop From 27950c48e2b60d8cc8694cfaade3b29f684b3e4d Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 20:47:53 +0000 Subject: [PATCH 15/31] Autoformat with black --- tests/core/test_kubernetes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py index 31edd68..a0b853c 100644 --- a/tests/core/test_kubernetes.py +++ b/tests/core/test_kubernetes.py @@ -447,7 +447,6 @@ def fake_create_job(**kwargs): # Observe logs monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") - mock_core.read_namespaced_pod.side_effect = [ _mk_pod(name="job-pod", phase="Running", init_running=True), # wait loop _mk_pod( From 2948475bb67a0f29b9e4eedbd048386f2638ead3 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 20:49:11 +0000 Subject: [PATCH 16/31] Enhance `_download_folder_from_pod` function with detailed docstring, specifying parameters and their types for improved clarity and usability. --- brats/core/kubernetes.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 6b9d139..feb8bbc 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -258,8 +258,16 @@ def _check_files_in_pod( def _download_folder_from_pod( - pod_name, namespace, container, remote_paths, local_base_dir=Path(".").absolute() + pod_name: str, namespace: str, container: str, remote_paths: List[Path], local_base_dir: Path = Path(".").absolute() ): + """Download a folder from a pod to a local directory. + Args: + pod_name (str): The name of the pod to download the folder from + namespace (str): The namespace of the pod to download the folder from + container (str): The container to download the folder from + remote_paths (List[Path]): The paths to the remote folder to download + local_base_dir (Path): The base directory to download the folder to. Defaults to the current directory. + """ for path in remote_paths: folder_name = str(path) From 09fa7a2181260f6a63f7f8ff62f2332a05379d21 Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 20:50:27 +0000 Subject: [PATCH 17/31] Autoformat with black --- brats/core/kubernetes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index feb8bbc..a2f8349 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -258,7 +258,11 @@ def _check_files_in_pod( def _download_folder_from_pod( - pod_name: str, namespace: str, container: str, remote_paths: List[Path], local_base_dir: Path = Path(".").absolute() + pod_name: str, + namespace: str, + container: str, + remote_paths: List[Path], + local_base_dir: Path = Path(".").absolute(), ): """Download a folder from a pod to a local directory. Args: From b1d42e4782b67b26259f5728753137f82e10b48a Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 3 Nov 2025 20:51:49 +0000 Subject: [PATCH 18/31] Update content hash in `poetry.lock` to reflect changes in dependencies. --- poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 5d65d83..67ffbf1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3801,4 +3801,4 @@ preprocessing = ["brainles_preprocessing"] [metadata] lock-version = "2.1" python-versions = ">=3.8,<4.0" -content-hash = "7e5c48c393f1cc64d83005d4b891dbdf59dd6214a7577aafd9fe66d94f5b2e85" +content-hash = "4cd1580101551d78abe4a19fc29c310f10e74a1723025a49f85c4a15e3d9a4c1" From 978ca74648069beca424e9c968053bc86d889a1c Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 08:47:00 +0000 Subject: [PATCH 19/31] Refactor Kubernetes job resource allocation and mount paths Updated GPU resource allocation to dynamically set the count based on device requests. Adjusted volume claim names and mount paths for input and output based on the algorithm's year, ensuring compatibility with future versions. --- brats/core/kubernetes.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index a2f8349..b580b0a 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -563,16 +563,18 @@ def _create_namespaced_job( # TODO: Implement security_context for container if/when user/group IDs are required. ) if len(device_requests) > 0: + gpu_count = len(device_requests) container_spec.resources = client.V1ResourceRequirements( - requests={"nvidia.com/gpu": 1}, limits={"nvidia.com/gpu": 1} + requests={"nvidia.com/gpu": gpu_count}, limits={"nvidia.com/gpu": gpu_count} ) volumes = [ client.V1Volume( name=pvc_name, persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( - claim_name=pvc_name + claim_name=pvc ), ) + for pvc in pv_mounts.keys() ] if shm_size is not None: shm_size_formatted = shm_size.replace("gb", "Gi") @@ -703,7 +705,7 @@ def run_job( logger.debug(f"Container user: {user if user else 'root (required by algorithm)'}") if algorithm.meta.year > 2024: - data_mount_path = "/input" + input_mount_path = "/input" output_mount_path = "/output" else: output_mount_path = Path(data_mount_path).joinpath("output") @@ -746,7 +748,7 @@ def run_job( ) pv_mounts = { - pvc_name: data_mount_path, + pvc_name: input_mount_path, pvc_name + "-output": output_mount_path, } @@ -803,7 +805,7 @@ def run_job( pod_name=pod_name, namespace=namespace, paths=[Path(data_path)], - mount_path=data_mount_path, + mount_path=data_mount_path if algorithm.meta.year <= 2024 else input_mount_path, ) logger.debug( f"Files checked successfully in pod '{pod_name}' in namespace '{namespace}'." @@ -823,7 +825,7 @@ def run_job( mount_path=data_mount_path, parent_dir="parameters", ) - commands = ["tree", data_mount_path] + commands = ["tree", data_mount_path if algorithm.meta.year <= 2024 else input_mount_path] _execute_command_in_pod( pod_name=pod_name, namespace=namespace, @@ -860,7 +862,7 @@ def run_job( ) pvc_name_output = pvc_name - mount_path = str(data_mount_path) + mount_path = str(data_mount_path if algorithm.meta.year <= 2024 else input_mount_path) if algorithm.meta.year > 2024: pvc_name_output = pvc_name + "-output" mount_path = str(output_mount_path) From 0f5d10fae03f58176866a15a37f2cd70032dbd2d Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 08:48:13 +0000 Subject: [PATCH 20/31] Refactor variable names for clarity in Kubernetes job volume mounts Updated variable names in the volume mount loop for better readability, changing `pvc_name` to `pvc_mount_name` and `mount_path` to `pvc_mount_path`. --- brats/core/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index b580b0a..1d5cde7 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -553,8 +553,8 @@ def _create_namespaced_job( # user_id = int(user.split(":")[0]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. # group_id = int(user.split(":")[1]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. volume_mounts = [] - for pvc_name, mount_path in pv_mounts.items(): - volume_mounts.append(client.V1VolumeMount(name=pvc_name, mount_path=mount_path)) + for pvc_mount_name, pvc_mount_path in pv_mounts.items(): + volume_mounts.append(client.V1VolumeMount(name=pvc_mount_name, mount_path=pvc_mount_path)) container_spec = client.V1Container( name="job-container", image=image, From 4143fe40b7e8eb95d399a766444de701060c92fc Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 08:49:48 +0000 Subject: [PATCH 21/31] Update Kubernetes backend validation in BraTSAlgorithm Refactored the condition for validating Kubernetes kwargs to ensure they are only used with the Kubernetes backend, improving error messaging for clarity. --- brats/core/brats_algorithm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index e7bdf4c..87defcc 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -212,9 +212,9 @@ def _infer_single( ) if kubernetes_kwargs is not None: logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") - if runner is not run_kubernetes_job: + if backend is not Backends.KUBERNETES: raise ValueError( - "Kubernetes kwargs can only be used with the Kubernetes backend (run_kubernetes_job)." + "Kubernetes kwargs can only be used with the Kubernetes backend." ) for key, value in kubernetes_kwargs.items(): runner_kwargs[key] = value From 0a26019fd31c43fd01f8cfb78dd2fba439824ad9 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 08:53:01 +0000 Subject: [PATCH 22/31] Refactor command argument return type and improve pod selection logic in Kubernetes job Changed the return type of the _build_command_args function from List[str] to str for better clarity. Updated the pod selection logic to ensure the latest created pod is chosen when multiple pods are present, enhancing the overall functionality of the Kubernetes job handling. --- brats/core/kubernetes.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 1d5cde7..0bd162d 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -34,7 +34,7 @@ def _build_command_args( data_path: str, output_path: str, mount_path: str = "/data", -) -> List[str]: +) -> str: """Build the command arguments for the Kubernetes job. Args: @@ -483,8 +483,7 @@ def _create_finalizer_job( namespace=namespace, label_selector=label_selector ) if pod_list.items: - # If more than one pod, pick the first (common for most K8s jobs) - # Pick the latest created pod (by creation timestamp) + # If more than one pod, pick the latest created pod (by creation timestamp) latest_pod = max( pod_list.items, key=lambda pod: pod.metadata.creation_timestamp ) From c1eea9bb54370a4db490de03968bdb44d6483674 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 08:55:11 +0000 Subject: [PATCH 23/31] Enhance Kubernetes execution instructions in README and update device request type in Kubernetes job Updated the README to clarify that the orchestrator uses the default kubeconfig location and provided instructions for setting the KUBECONFIG environment variable. Changed the device_requests parameter type in the _create_namespaced_job function from List[client.V1DeviceRequest] to List[docker.types.DeviceRequest] for better compatibility with Docker integration. --- README.md | 2 +- brats/core/kubernetes.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8faeec5..0d44969 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ segmenter.infer_single( ## Kubernetes Support BraTS orchestrator also supports Kubernetes to run the algorithms remotely, as an alternative to local execution with Docker or Singularity. -To use Kubernetes for execution, ensure that the `KUBECONFIG` environment variable is set to the location of your kubeconfig file, as is standard when interacting with a Kubernetes cluster. +To use Kubernetes for execution, the orchestrator will automatically use your kubeconfig file from the default location (`~/.kube/config`). If your kubeconfig file is not in the default location, set the `KUBECONFIG` environment variable to the location of your kubeconfig file: ```bash export KUBECONFIG=/path/to/kubeconfig ``` diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 0bd162d..5fae749 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -6,6 +6,7 @@ import random import string import base64 +import docker from loguru import logger from kubernetes import client, config from kubernetes.stream import stream @@ -502,7 +503,7 @@ def _create_namespaced_job( namespace: str, pvc_name: str, image: str, - device_requests: List[client.V1DeviceRequest], + device_requests: List[docker.types.DeviceRequest], pv_mounts: Dict[str, str], args: List[str] = None, shm_size: str = None, From 7c1e5df7a33b4a6b7b2a086acd06b85b580c8f00 Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 08:55:53 +0000 Subject: [PATCH 24/31] Autoformat with black --- brats/core/kubernetes.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 5fae749..62348be 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -554,7 +554,9 @@ def _create_namespaced_job( # group_id = int(user.split(":")[1]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. volume_mounts = [] for pvc_mount_name, pvc_mount_path in pv_mounts.items(): - volume_mounts.append(client.V1VolumeMount(name=pvc_mount_name, mount_path=pvc_mount_path)) + volume_mounts.append( + client.V1VolumeMount(name=pvc_mount_name, mount_path=pvc_mount_path) + ) container_spec = client.V1Container( name="job-container", image=image, @@ -825,7 +827,10 @@ def run_job( mount_path=data_mount_path, parent_dir="parameters", ) - commands = ["tree", data_mount_path if algorithm.meta.year <= 2024 else input_mount_path] + commands = [ + "tree", + data_mount_path if algorithm.meta.year <= 2024 else input_mount_path, + ] _execute_command_in_pod( pod_name=pod_name, namespace=namespace, @@ -862,7 +867,9 @@ def run_job( ) pvc_name_output = pvc_name - mount_path = str(data_mount_path if algorithm.meta.year <= 2024 else input_mount_path) + mount_path = str( + data_mount_path if algorithm.meta.year <= 2024 else input_mount_path + ) if algorithm.meta.year > 2024: pvc_name_output = pvc_name + "-output" mount_path = str(output_mount_path) From 8f457e7e543f7ee9e5b906aaf20b0681758669af Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 09:09:04 +0000 Subject: [PATCH 25/31] Update run_job function to accept Union types for data_path and output_path, enhancing flexibility. Improve documentation for argument types and clarify logging messages related to job pod completion. --- brats/core/kubernetes.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 62348be..e3d6b01 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -2,7 +2,7 @@ import time from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, Union import random import string import base64 @@ -645,8 +645,8 @@ def _create_namespaced_job( def run_job( algorithm: AlgorithmData, - data_path: Path, - output_path: Path, + data_path: Union[Path, str], + output_path: Union[Path, str], cuda_devices: str, force_cpu: bool, internal_external_name_map: Optional[Dict[str, str]] = None, @@ -661,17 +661,17 @@ def run_job( Args: algorithm (AlgorithmData): The data of the algorithm to run - data_path (Path | str): The path to the input data - output_path (Path | str): The path to save the output + data_path (Union[Path, str]): The path to the input data + output_path (Union[Path, str]): The path to save the output cuda_devices (str): The CUDA devices to use force_cpu (bool): Whether to force CPU execution - internal_external_name_map (Dict[str, str]): Dictionary mapping internal name (in standardized format) to external subject name provided by user (only used for batch inference) - namespace (Optional[str], optional): The Kubernetes namespace to run the job in. Defaults to "default". - pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this job. If the PVC does not already exist, it will be created; otherwise, it must already contain the input data required for running the algorithm. - pvc_storage_size (str): The size of the storage to request for the PVC. Defaults to "1Gi". - pvc_storage_class (str): The storage class to use for the PVC. If None, the default storage class will be used. - job_name (str): Name of the Job to create. If None, a random name will be generated. - data_mount_path (str): The path to mount the PVC to. Defaults to "/data". + internal_external_name_map (Optional[Dict[str, str]]): Dictionary mapping internal name (in standardized format) to external subject name provided by user (only used for batch inference) + namespace (Optional[str]): The Kubernetes namespace to run the job in. Defaults to "default". + pvc_name (Optional[str]): Name of the PersistentVolumeClaim (PVC) to use for this job. If the PVC does not already exist, it will be created; otherwise, it must already contain the input data required for running the algorithm. + pvc_storage_size (Optional[str]): The size of the storage to request for the PVC. Defaults to "1Gi". + pvc_storage_class (Optional[str]): The storage class to use for the PVC. If None, the default storage class will be used. + job_name (Optional[str]): Name of the Job to create. If None, a random name will be generated. + data_mount_path (Optional[str]): The path to mount the PVC to. Defaults to "/data". """ if pvc_name is None: pvc_name = ( @@ -858,12 +858,12 @@ def run_job( pod = core_v1_api.read_namespaced_pod(name=pod_name, namespace=namespace) pod_phase = pod.status.phase if pod_phase in ("Succeeded", "Failed"): - logger.info(f"Finalizer pod '{pod_name}' finished with phase: {pod_phase}") + logger.info(f"Job pod '{pod_name}' finished with phase: {pod_phase}") break time.sleep(2) else: raise RuntimeError( - f"Timed out waiting for finalizer pod '{pod_name}' to complete." + f"Timed out waiting for job pod '{pod_name}' to complete." ) pvc_name_output = pvc_name From 20eae49f5bc5d1955bc923af4f7506b1eaf4822a Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 09:13:04 +0000 Subject: [PATCH 26/31] Refactor Kubernetes backend validation and improve logging messages Updated the condition for validating Kubernetes kwargs in the BraTSAlgorithm class to ensure they are only used with the Kubernetes backend. Enhanced the logging message for successful file uploads to indicate multiple files are uploaded. Additionally, refactored variable names in the volume mount loop for better clarity. --- brats/core/brats_algorithm.py | 4 ++-- brats/core/kubernetes.py | 15 +++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index 87defcc..36dd003 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -271,9 +271,9 @@ def _infer_batch( ) if kubernetes_kwargs is not None: logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") - if runner is not run_kubernetes_job: + if backend is not Backends.KUBERNETES: raise ValueError( - "Kubernetes kwargs can only be used with the Kubernetes backend (run_kubernetes_job)." + "Kubernetes kwargs can only be used with the Kubernetes backend." ) for key, value in kubernetes_kwargs.items(): runner_kwargs[key] = value diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index e3d6b01..8201293 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -2,11 +2,13 @@ import time from pathlib import Path -from typing import Dict, Optional, Union +from typing import Dict, Optional, Union, List import random import string import base64 import docker +import io +import tarfile from loguru import logger from kubernetes import client, config from kubernetes.stream import stream @@ -19,9 +21,6 @@ _get_parameters_arg, _sanity_check_output, ) -import io -import tarfile -from typing import List from brats.utils.zenodo import ( _get_zenodo_metadata_and_archive_url, ZenodoException, @@ -373,7 +372,7 @@ def _upload_files_to_pod( resp.write_stdin(tar_stream.read()) resp.close() logger.info( - f"File uploaded successfully to pod '{pod_name}' in namespace '{namespace}'." + f"Files uploaded successfully to pod '{pod_name}' in namespace '{namespace}'." ) @@ -571,12 +570,12 @@ def _create_namespaced_job( ) volumes = [ client.V1Volume( - name=pvc_name, + name=pvc_mount_name, persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( - claim_name=pvc + claim_name=pvc_mount_name ), ) - for pvc in pv_mounts.keys() + for pvc_mount_name in pv_mounts.keys() ] if shm_size is not None: shm_size_formatted = shm_size.replace("gb", "Gi") From 915a5569e429ddb100ac443eef5ab84a96bdf604 Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 09:14:01 +0000 Subject: [PATCH 27/31] Autoformat with black --- brats/core/kubernetes.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 8201293..1d89236 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -861,9 +861,7 @@ def run_job( break time.sleep(2) else: - raise RuntimeError( - f"Timed out waiting for job pod '{pod_name}' to complete." - ) + raise RuntimeError(f"Timed out waiting for job pod '{pod_name}' to complete.") pvc_name_output = pvc_name mount_path = str( From 379dacec5128e6e605a283e0f82859f9ca8ba29d Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 09:46:58 +0000 Subject: [PATCH 28/31] Refactor backend validation and path handling in BraTSAlgorithm and run_job Updated the condition for validating Kubernetes kwargs in the BraTSAlgorithm class to use '!=' for clarity. Modified the run_job function to ensure output_path and data_path are wrapped in Path objects for consistent path handling. --- brats/core/brats_algorithm.py | 4 ++-- brats/core/kubernetes.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index 36dd003..1e3bbc8 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -212,7 +212,7 @@ def _infer_single( ) if kubernetes_kwargs is not None: logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") - if backend is not Backends.KUBERNETES: + if backend != Backends.KUBERNETES: raise ValueError( "Kubernetes kwargs can only be used with the Kubernetes backend." ) @@ -271,7 +271,7 @@ def _infer_batch( ) if kubernetes_kwargs is not None: logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") - if backend is not Backends.KUBERNETES: + if backend != Backends.KUBERNETES: raise ValueError( "Kubernetes kwargs can only be used with the Kubernetes backend." ) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 8201293..9cb7745 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -844,7 +844,7 @@ def run_job( container="init-container", ) - output_path.mkdir(parents=True, exist_ok=True) + Path(output_path).mkdir(parents=True, exist_ok=True) logger.info("Starting inference") start_time = time.time() @@ -897,7 +897,7 @@ def run_job( ) _sanity_check_output( - data_path=data_path, + data_path=Path(data_path), output_path=output_path, container_output=job_output, internal_external_name_map=internal_external_name_map, From bdd5288c67a9373bc7060acd379f7623e1e17017 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 10:17:17 +0000 Subject: [PATCH 29/31] Refactor Kubernetes job functions to improve clarity and add new utility function Updated the return type of _create_finalizer_job and _create_namespaced_job functions to return job names for better usability. Introduced a new utility function _check_pod_terminal_or_running to streamline pod status checks, enhancing code readability and maintainability. --- brats/core/kubernetes.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index cfbd8b2..1b70185 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -419,13 +419,15 @@ def _create_namespaced_pvc( def _create_finalizer_job( job_name: str, namespace: str, pvc_name: str, mount_path: str = "/data" -) -> None: +) -> str: """Create a finalizer job in the specified namespace. Args: job_name (str): Name of the Job to create namespace (str): The Kubernetes namespace to create the Job in pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job mount_path (str): The path to mount the PVC to. Defaults to "/data". + Returns: + str: The name of the finalizer job """ batch_v1_api = client.BatchV1Api() job_list = batch_v1_api.list_namespaced_job(namespace=namespace) @@ -515,11 +517,13 @@ def _create_namespaced_job( namespace (str): The Kubernetes namespace to create the Job in pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job image (str): The image to use for the Job - device_requests (List[client.V1DeviceRequest]): The device requests to use for the Job + device_requests (List[docker.types.DeviceRequest]): The device requests to use for the Job pv_mounts (Dict[str, str]): The PersistentVolumeClaims (PVCs) to mount to the Job. args (List[str]): The arguments to use for the Job. Defaults to None. shm_size (str): The size of the shared memory to use for the Job. Defaults to None. user (str): The user to run the Job as. Defaults to None (root is used if not specified). + Returns: + str: The name of the pod created for the Job """ batch_v1_api = client.BatchV1Api() job_list = batch_v1_api.list_namespaced_job(namespace=namespace) @@ -642,6 +646,23 @@ def _create_namespaced_job( return pod_name +def _check_pod_terminal_or_running(pod_phase: str, pod_name: str) -> bool: + """Check if the pod is in a terminal phase or running. + Args: + pod_phase (str): The phase of the pod + pod_name (str): The name of the pod + Returns: + bool: True if the pod is in a terminal phase or running, False otherwise + """ + if pod_phase == "Running": + logger.info(f"Pod '{pod_name}' is running.") + return True + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + return True + else: + return False + def run_job( algorithm: AlgorithmData, data_path: Union[Path, str], @@ -784,20 +805,10 @@ def run_job( if exit_loop: break else: - if pod_phase == "Running": - logger.info(f"Pod '{pod_name}' is running.") - break - elif pod_phase in ["Failed", "Succeeded"]: - logger.warning( - f"Pod '{pod_name}' entered terminal phase: {pod_phase}" - ) + if _check_pod_terminal_or_running(pod_phase, pod_name): break else: - if pod_phase == "Running": - logger.info(f"Pod '{pod_name}' is running.") - break - elif pod_phase in ["Failed", "Succeeded"]: - logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + if _check_pod_terminal_or_running(pod_phase, pod_name): break time.sleep(2) else: From 23cc7580f15ce6a4fc326ca2893b71b6b4976378 Mon Sep 17 00:00:00 2001 From: Simone Bendazzoli Date: Mon, 10 Nov 2025 10:21:57 +0000 Subject: [PATCH 30/31] Enhance Kubernetes job functions with improved documentation and additional checks Updated the _build_command_args function to ensure additional file paths are only appended if they exist. Enhanced the documentation for _observe_job_output and _create_namespaced_job functions to clarify return values and error handling, improving overall code clarity and usability. --- brats/core/kubernetes.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 1b70185..2416a47 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -50,7 +50,7 @@ def _build_command_args( if algorithm.additional_files is not None: for i, param in enumerate(algorithm.additional_files.param_name): additional_files_arg = f"--{param}={str(additional_files_path)}" - if algorithm.additional_files.param_path: + if algorithm.additional_files.param_path and len(algorithm.additional_files.param_path) > i: additional_files_arg += f"/{algorithm.additional_files.param_path[i]}" command_args += f" {additional_files_arg}" @@ -68,6 +68,8 @@ def _observe_job_output(pod_name: str, namespace: str) -> str: Args: pod_name (str): The name of the pod to observe the output of namespace (str): The namespace of the pod to observe the output of + Returns: + str: The output of the job """ v1 = client.CoreV1Api() @@ -509,7 +511,7 @@ def _create_namespaced_job( args: List[str] = None, shm_size: str = None, user: str = None, -) -> None: +) -> str: """Create a namespaced Job in the specified namespace. Args: @@ -523,7 +525,7 @@ def _create_namespaced_job( shm_size (str): The size of the shared memory to use for the Job. Defaults to None. user (str): The user to run the Job as. Defaults to None (root is used if not specified). Returns: - str: The name of the pod created for the Job + str: The name of the pod created for the Job. If the pod creation fails, an exception is raised. """ batch_v1_api = client.BatchV1Api() job_list = batch_v1_api.list_namespaced_job(namespace=namespace) From 56215a3dd66745c3b8008267a29ab54be08cdb87 Mon Sep 17 00:00:00 2001 From: "brainless-bot[bot]" <153751247+brainless-bot[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 10:24:28 +0000 Subject: [PATCH 31/31] Autoformat with black --- brats/core/kubernetes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py index 2416a47..76b7c46 100644 --- a/brats/core/kubernetes.py +++ b/brats/core/kubernetes.py @@ -50,7 +50,10 @@ def _build_command_args( if algorithm.additional_files is not None: for i, param in enumerate(algorithm.additional_files.param_name): additional_files_arg = f"--{param}={str(additional_files_path)}" - if algorithm.additional_files.param_path and len(algorithm.additional_files.param_path) > i: + if ( + algorithm.additional_files.param_path + and len(algorithm.additional_files.param_path) > i + ): additional_files_arg += f"/{algorithm.additional_files.param_path[i]}" command_args += f" {additional_files_arg}" @@ -665,6 +668,7 @@ def _check_pod_terminal_or_running(pod_phase: str, pod_name: str) -> bool: else: return False + def run_job( algorithm: AlgorithmData, data_path: Union[Path, str],