diff --git a/README.md b/README.md index ddfda6e..0d44969 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,39 @@ segmenter.infer_single( ) ``` +## Kubernetes Support +BraTS orchestrator also supports Kubernetes to run the algorithms remotely, as an alternative to local execution with Docker or Singularity. +When Kubernetes is used for execution, the orchestrator automatically reads your kubeconfig file from the default location (`~/.kube/config`). If your kubeconfig file is not in the default location, set the `KUBECONFIG` environment variable to the location of your kubeconfig file: +```bash +export KUBECONFIG=/path/to/kubeconfig +``` +Then, specify the backend to use as `kubernetes` (or the corresponding enum value `Backends.KUBERNETES`) when running the inference: +```python +from brats.constants import Backends +segmenter.infer_single( + t1c="path/to/t1c.nii.gz", + output_file="path/to/segmentation.nii.gz", + backend=Backends.KUBERNETES +) +``` +By default, as shown above, the algorithm runs in the default Kubernetes namespace. It uses the default StorageClass and automatically creates a 1Gi PersistentVolumeClaim (PVC) to manage input and output data. If needed, you can customize settings such as the namespace, PVC name, storage size, storage class, job name, and mount path by passing the corresponding entries in the `kubernetes_kwargs` dictionary of the `infer_single` method (see the example below). The `data_mount_path` entry determines where the PVC will be mounted inside the Pod. +When using Kubernetes, the algorithm is executed inside a Kubernetes Job. Input data is first uploaded to a PersistentVolume, which is mounted into the Pod running the job. After the algorithm finishes running in the Pod, the output data is transferred back from the cluster to your local machine. 
+```python +segmenter.infer_single( + t1c="path/to/t1c.nii.gz", + output_file="path/to/segmentation.nii.gz", + backend=Backends.KUBERNETES, + kubernetes_kwargs={ + "namespace": "brats", + "pvc_name": "brats-iwydw55ej7qm-pvc", + "pvc_storage_size": "2Gi", + "pvc_storage_class": "brats-pvc-storage-class", + "job_name": "brats-oxh24nu4dhk9-job", + "data_mount_path": "/data", + } +) +``` + ## Available Algorithms and Usage > [!IMPORTANT] diff --git a/brats/constants.py b/brats/constants.py index 1570b8f..c88b739 100644 --- a/brats/constants.py +++ b/brats/constants.py @@ -13,6 +13,9 @@ class Backends(str, Enum): SINGULARITY = "singularity" """Run the algorithms using Singularity containers.""" + KUBERNETES = "kubernetes" + """Run the algorithms using Kubernetes Jobs.""" + class Task(str, Enum): """Available tasks.""" diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index 83ba317..1e3bbc8 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -9,6 +9,7 @@ from brats.core.docker import run_container as run_docker_container from brats.core.singularity import run_container as run_singularity_container +from brats.core.kubernetes import run_job as run_kubernetes_job from brats.utils.algorithm_config import load_algorithms from brats.constants import OUTPUT_NAME_SCHEMA, Algorithms, Task, Backends from brats.utils.data_handling import InferenceSetup @@ -163,6 +164,7 @@ def _get_backend_runner(self, backend: Backends) -> Optional[Callable]: backend_dispatch = { Backends.DOCKER: run_docker_container, Backends.SINGULARITY: run_singularity_container, + Backends.KUBERNETES: run_kubernetes_job, } runner = backend_dispatch.get(backend, None) return runner @@ -173,6 +175,7 @@ def _infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Backends = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform a single inference run with the provided inputs and save the output in 
the specified file. @@ -181,7 +184,8 @@ def _infer_single( inputs (dict[str, Path | str]): Input Images for the task output_file (Path | str): File to save the output log_file (Optional[Path | str], optional): Log file with extra information. Defaults to None. - backend (Backends | str, optional): Backend to use for inference. Defaults to Backends.DOCKER. + backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ with InferenceSetup(log_file=log_file) as (tmp_data_folder, tmp_output_folder): logger.info(f"Performing single inference") @@ -199,13 +203,22 @@ def _infer_single( runner = self._get_backend_runner(backend) if runner is None: raise ValueError(f"Unsupported backend: {backend}") - runner( + runner_kwargs = dict( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, cuda_devices=self.cuda_devices, force_cpu=self.force_cpu, ) + if kubernetes_kwargs is not None: + logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") + if backend != Backends.KUBERNETES: + raise ValueError( + "Kubernetes kwargs can only be used with the Kubernetes backend." + ) + for key, value in kubernetes_kwargs.items(): + runner_kwargs[key] = value + runner(**runner_kwargs) self._process_single_output( tmp_output_folder=tmp_output_folder, subject_id=subject_id, @@ -219,6 +232,7 @@ def _infer_batch( output_folder: Path | str, log_file: Optional[Path | str] = None, backend: Backends = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ): """Perform a batch inference run with the provided inputs and save the outputs in the specified folder. @@ -227,6 +241,7 @@ def _infer_batch( output_folder (Path | str): Folder to save the outputs log_file (Optional[Path | str], optional): Log file with extra information. Defaults to None. backend (Backends, optional): Backend to use for inference. 
Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ with InferenceSetup(log_file=log_file) as (tmp_data_folder, tmp_output_folder): @@ -246,7 +261,7 @@ def _infer_batch( if runner is None: raise ValueError(f"Unsupported backend: {backend}") # run inference in container - runner( + runner_kwargs = dict( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, @@ -254,6 +269,15 @@ def _infer_batch( force_cpu=self.force_cpu, internal_external_name_map=internal_external_name_map, ) + if kubernetes_kwargs is not None: + logger.debug(f"Adding Kubernetes kwargs: {kubernetes_kwargs}") + if backend != Backends.KUBERNETES: + raise ValueError( + "Kubernetes kwargs can only be used with the Kubernetes backend." + ) + for key, value in kubernetes_kwargs.items(): + runner_kwargs[key] = value + runner(**runner_kwargs) self._process_batch_output( tmp_output_folder=tmp_output_folder, diff --git a/brats/core/kubernetes.py b/brats/core/kubernetes.py new file mode 100644 index 0000000..76b7c46 --- /dev/null +++ b/brats/core/kubernetes.py @@ -0,0 +1,923 @@ +from __future__ import annotations + +import time +from pathlib import Path +from typing import Dict, Optional, Union, List +import random +import string +import base64 +import docker +import io +import tarfile +from loguru import logger +from kubernetes import client, config +from kubernetes.stream import stream +from brats.constants import PARAMETERS_DIR +from brats.utils.algorithm_config import AlgorithmData +from brats.core.docker import ( + _log_algorithm_info, + _get_container_user, + _handle_device_requests, + _get_parameters_arg, + _sanity_check_output, +) +from brats.utils.zenodo import ( + _get_zenodo_metadata_and_archive_url, + ZenodoException, + get_dummy_path, +) + + +def _build_command_args( + algorithm: AlgorithmData, + additional_files_path: str, + data_path: str, + output_path: str, + 
mount_path: str = "/data", +) -> str: + """Build the command arguments for the Kubernetes job. + + Args: + algorithm (AlgorithmData): The algorithm data + additional_files_path (str): The path to the additional files + data_path (str): The path to the input data + output_path (str): The path to save the output + mount_path (str): The path to mount the PVC to. Defaults to "/data". + Returns: + command_args: The command arguments + """ + command_args = f"--data_path={str(data_path)} --output_path={str(output_path)}" + if algorithm.additional_files is not None: + for i, param in enumerate(algorithm.additional_files.param_name): + additional_files_arg = f"--{param}={str(additional_files_path)}" + if ( + algorithm.additional_files.param_path + and len(algorithm.additional_files.param_path) > i + ): + additional_files_arg += f"/{algorithm.additional_files.param_path[i]}" + command_args += f" {additional_files_arg}" + + params_arg = _get_parameters_arg(algorithm=algorithm) + if params_arg: + command_args += params_arg.replace( + "/mlcube_io3", str(Path(mount_path).joinpath("parameters")) + ) + + return command_args + + +def _observe_job_output(pod_name: str, namespace: str) -> str: + """Observe the output of a running job. 
+ Args: + pod_name (str): The name of the pod to observe the output of + namespace (str): The namespace of the pod to observe the output of + Returns: + str: The output of the job + """ + v1 = client.CoreV1Api() + + for _ in range(300): # up to 10 minutes + pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + statuses = pod.status.container_statuses + is_running = False + if statuses: + for status in statuses: + if ( + status.name == "job-container" + and status.state + and status.state.running + ): + is_running = True + break + if pod_phase == "Running" and is_running: + break + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + break + time.sleep(2) + else: + logger.error( + f"Timed out waiting for main container in pod '{pod_name}' to be running" + ) + return "" + try: + log = v1.read_namespaced_pod_log( + name=pod_name, + namespace=namespace, + container="job-container", + follow=True, + _preload_content=True, + ) + return log + except Exception as e: + logger.error( + f"Failed to fetch logs from pod '{pod_name}' in namespace '{namespace}': {e}" + ) + return "" + + +def _execute_command_in_pod( + pod_name: str, + namespace: str, + command: List[str], + container: str, + stderr: bool = True, + stdin: bool = False, + stdout: bool = True, + tty: bool = False, + _preload_content: bool = True, +) -> str: + """Execute a command in a pod. + Args: + pod_name (str): The name of the pod to execute the command in + namespace (str): The namespace of the pod to execute the command in + command (List[str]): The command to execute + container (str): The container to execute the command in + stderr (bool): Whether to capture stderr. Defaults to True. + stdin (bool): Whether to capture stdin. Defaults to False. + stdout (bool): Whether to capture stdout. Defaults to True. + tty (bool): Whether to use a TTY. Defaults to False. 
+ _preload_content (bool): Whether to preload the content. Defaults to True. + Returns: + str: The output of the command + """ + v1 = client.CoreV1Api() + output = stream( + v1.connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=command, + container=container, + stderr=stderr, + stdin=stdin, + stdout=stdout, + tty=tty, + _preload_content=_preload_content, + ) + logger.debug( + f"Command '{command}' executed successfully in pod '{pod_name}' in namespace '{namespace}'.\nOutput:\n{output}" + ) + return output + + +def _download_additional_files( + algorithm: AlgorithmData, pod_name: str, namespace: str, mount_path: str = "/data" +) -> Path: + """Download additional files from Zenodo. + Args: + algorithm (AlgorithmData): The algorithm data + pod_name (str): The name of the pod to download the additional files to + namespace (str): The namespace of the pod to download the additional files to + mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + if algorithm.additional_files is not None: + zenodo_response = _get_zenodo_metadata_and_archive_url( + record_id=algorithm.additional_files.record_id + ) + if not zenodo_response: + msg = "Additional files not found locally and Zenodo could not be reached. Exiting..." + logger.error(msg) + raise ZenodoException(msg) + + zenodo_metadata, archive_url = zenodo_response + record_folder = str( + Path(mount_path).joinpath( + f"{algorithm.additional_files.record_id}_v{zenodo_metadata['version']}" + ) + ) + + commands = [ + "sh", + "-c", + ( + f'if [ ! 
-d {record_folder} ] || [ -z "$(ls -A {record_folder})" ]; then ' + f" mkdir -p {record_folder} && " + f" wget -O {record_folder}/archive.zip {archive_url} && " + f" apk add --no-cache unzip && " + f" unzip {record_folder}/archive.zip -d {record_folder} && " + f" rm {record_folder}/archive.zip && " + f" for f in {record_folder}/*.zip; do " + f' if [ -f "$f" ]; then unzip "$f" -d {record_folder} && rm "$f"; fi; ' + f" done " + f"else " + f" echo 'Additional files already present in {record_folder}, skipping download.'; " + f"fi" + ), + ] + logger.info(f"Downloading additional files to {record_folder}...") + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + logger.info( + f"Additional files downloaded successfully to pod '{pod_name}' in namespace '{namespace}'." + ) + logger.info(f"Contents of {record_folder}:\n{output}") + return Path(record_folder) + else: + return get_dummy_path() + + +def _check_files_in_pod( + pod_name: str, namespace: str, paths: List[Path], mount_path: str = "/data" +) -> None: + """Check if all the local files are present in the mounted path inside the pod. + Args: + pod_name (str): Name of the pod to check files in + namespace (str): The Kubernetes namespace to check files in + paths (List[Path]): List of local files to check + mount_path (str): The path to mount the PVC to. Defaults to "/data". + """ + logger.debug( + f"Checking files in pod '{pod_name}' in namespace '{namespace}' with mount path '{mount_path}'." 
+ ) + for path in paths: + for file in path.glob("**/*"): + if file.is_file(): + commands = [ + "ls", + "-la", + str(Path(mount_path).joinpath("input", file.relative_to(path))), + ] + output = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + if "No such file or directory" in output: + logger.warning( + f"File '{file.relative_to(path)}' is not present in pod '{pod_name}' in namespace '{namespace}'. Uploading it now..." + ) + _upload_files_to_pod( + pod_name=pod_name, + namespace=namespace, + paths=[file], + mount_path=mount_path, + relative_to=path, + parent_dir="input", + ) + + +def _download_folder_from_pod( + pod_name: str, + namespace: str, + container: str, + remote_paths: List[Path], + local_base_dir: Path = Path(".").absolute(), +): + """Download a folder from a pod to a local directory. + Args: + pod_name (str): The name of the pod to download the folder from + namespace (str): The namespace of the pod to download the folder from + container (str): The container to download the folder from + remote_paths (List[Path]): The paths to the remote folder to download + local_base_dir (Path): The base directory to download the folder to. Defaults to the current directory. + """ + + for path in remote_paths: + folder_name = str(path) + + command = ["sh", "-c", f"tar cf - -C {folder_name} . 
| base64"] + resp = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=command, + container=container, + stderr=True, + stdin=False, + stdout=True, + _preload_content=False, + ) + + base64_chunks = [] + + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + chunk = resp.read_stdout() + if isinstance(chunk, str): + chunk = chunk.encode("utf-8") + base64_chunks.append(chunk) + if resp.peek_stderr(): + err = resp.read_stderr() + if err: + logger.error(f"STDERR: {err}") + resp.close() + + full_base64 = b"".join(base64_chunks) + tar_data = base64.b64decode(full_base64) + + base_folder_name = Path(folder_name).name + tarfile_path = local_base_dir / f"{base_folder_name}" + with open(tarfile_path, "wb") as tarfile_obj: + tarfile_obj.write(tar_data) + + with tarfile.open(tarfile_path, "r") as tar: + tar.extractall(path=local_base_dir) + + tarfile_path.unlink() + + +def _upload_files_to_pod( + pod_name: str, + namespace: str, + paths: List[Path], + mount_path: str = "/data", + relative_to: Path = None, + parent_dir: Path = None, +) -> None: + """Upload files to a pod in the specified namespace. + Args: + pod_name (str): Name of the pod to upload files to + namespace (str): The Kubernetes namespace to upload files to + paths (List[Path]): List of local files or directories to upload + mount_path (str): The path to mount the PVC to. Defaults to "/data". + relative_to (Path): The path to relativize the files to. Defaults to None. + parent_dir (Path): The parent directory of the files to upload. Defaults to None. 
+ """ + + tar_stream = io.BytesIO() + with tarfile.open(fileobj=tar_stream, mode="w") as tar: + for path in paths: + if path.is_file(): + tar.add( + path, + arcname=( + Path(parent_dir).joinpath(path.relative_to(relative_to)) + if parent_dir + else path.relative_to(relative_to) + ), + ) + else: + for file in path.glob("**/*"): + if file.is_file(): + tar.add( + file, + arcname=( + Path(parent_dir).joinpath(file.relative_to(path)) + if parent_dir + else file.relative_to(path) + ), + ) + tar_stream.seek(0) + commands = ["tar", "xmf", "-", "-C", mount_path] + resp = _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + stdin=True, + _preload_content=False, + ) + + resp.write_stdin(tar_stream.read()) + resp.close() + logger.info( + f"Files uploaded successfully to pod '{pod_name}' in namespace '{namespace}'." + ) + + +def _create_namespaced_pvc( + pvc_name: str, namespace: str, storage_size: str = "1Gi", storage_class: str = None +) -> None: + """Create a namespaced PersistentVolumeClaim (PVC) in the specified namespace. If the PVC already exists, it will be skipped. + + Args: + pvc_name (str): Name of the PVC to create + namespace (str): The Kubernetes namespace to create the PVC in + storage_size (str): The size of the storage to request + storage_class (str): The storage class to use for the PVC. If None, the default storage class will be used. + """ + core_v1_api = client.CoreV1Api() + pvc_list = core_v1_api.list_namespaced_persistent_volume_claim(namespace=namespace) + if len(pvc_list.items) > 0: + for pvc in pvc_list.items: + if pvc.metadata.name == pvc_name: + logger.debug( + f"PVC '{pvc_name}' already exists in namespace '{namespace}'. Skipping creation." 
+ ) + return + + if storage_class is None: + pvc_spec = client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=client.V1ResourceRequirements(requests={"storage": storage_size}), + ) + else: + pvc_spec = client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=client.V1ResourceRequirements(requests={"storage": storage_size}), + storage_class_name=storage_class, + ) + + core_v1_api.create_namespaced_persistent_volume_claim( + namespace=namespace, + body=client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta(name=pvc_name), spec=pvc_spec + ), + ) + + +def _create_finalizer_job( + job_name: str, namespace: str, pvc_name: str, mount_path: str = "/data" +) -> str: + """Create a finalizer job in the specified namespace. + Args: + job_name (str): Name of the Job to create + namespace (str): The Kubernetes namespace to create the Job in + pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job + mount_path (str): The path to mount the PVC to. Defaults to "/data". + Returns: + str: The name of the finalizer job + """ + batch_v1_api = client.BatchV1Api() + job_list = batch_v1_api.list_namespaced_job(namespace=namespace) + if len(job_list.items) > 0: + for job in job_list.items: + if job.metadata.name == job_name: + logger.warning( + f"Job '{job_name}' already exists in namespace '{namespace}'. Deleting it." 
+ ) + batch_v1_api.delete_namespaced_job(name=job_name, namespace=namespace) + + batch_v1_api.create_namespaced_job( + namespace=namespace, + body=client.V1Job( + metadata=client.V1ObjectMeta(name=job_name), + spec=client.V1JobSpec( + template=client.V1PodTemplateSpec( + spec=client.V1PodSpec( + restart_policy="Never", + volumes=[ + client.V1Volume( + name=pvc_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name + ), + ) + ], + containers=[ + client.V1Container( + name="finalizer-container", + image="alpine:latest", + command=[ + "sh", + "-c", + "while [ ! -f /etc/content_verified ]; do sleep 1; done", + ], + volume_mounts=[ + client.V1VolumeMount( + name=pvc_name, mount_path=mount_path + ) + ], + ) + ], + ) + ) + ), + ), + ) + + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_name = None + for _ in range(60): + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + if pod_list.items: + # If more than one pod, pick the latest created pod (by creation timestamp) + latest_pod = max( + pod_list.items, key=lambda pod: pod.metadata.creation_timestamp + ) + pod_name = latest_pod.metadata.name + break + time.sleep(2) + if pod_name is None: + raise RuntimeError( + f"Timed out waiting for pod to be created for job {job_name}" + ) + return pod_name + + +def _create_namespaced_job( + job_name: str, + namespace: str, + pvc_name: str, + image: str, + device_requests: List[docker.types.DeviceRequest], + pv_mounts: Dict[str, str], + args: List[str] = None, + shm_size: str = None, + user: str = None, +) -> str: + """Create a namespaced Job in the specified namespace. 
+ + Args: + job_name (str): Name of the Job to create + namespace (str): The Kubernetes namespace to create the Job in + pvc_name (str): Name of the PersistentVolumeClaim (PVC) to use for this Job + image (str): The image to use for the Job + device_requests (List[docker.types.DeviceRequest]): The device requests to use for the Job + pv_mounts (Dict[str, str]): The PersistentVolumeClaims (PVCs) to mount to the Job. + args (List[str]): The arguments to use for the Job. Defaults to None. + shm_size (str): The size of the shared memory to use for the Job. Defaults to None. + user (str): The user to run the Job as. Defaults to None (root is used if not specified). + Returns: + str: The name of the pod created for the Job. If the pod creation fails, an exception is raised. + """ + batch_v1_api = client.BatchV1Api() + job_list = batch_v1_api.list_namespaced_job(namespace=namespace) + if len(job_list.items) > 0: + for job in job_list.items: + if job.metadata.name == job_name: + logger.warning( + f"Job '{job_name}' already exists in namespace '{namespace}'. Deleting it." + ) + batch_v1_api.delete_namespaced_job(name=job_name, namespace=namespace) + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + for pod in pod_list.items: + pod_name_to_delete = pod.metadata.name + logger.warning( + f"Deleting pod '{pod_name_to_delete}' in namespace '{namespace}' associated with job '{job_name}'." + ) + try: + core_v1_api.delete_namespaced_pod( + name=pod_name_to_delete, namespace=namespace + ) + except Exception as e: + logger.error( + f"Failed to delete pod '{pod_name_to_delete}' in namespace '{namespace}': {e}" + ) + + # user_id = int(user.split(":")[0]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. 
+ # group_id = int(user.split(":")[1]) if user else 0 # TODO: Implement security_context for container if/when user/group IDs are required. + volume_mounts = [] + for pvc_mount_name, pvc_mount_path in pv_mounts.items(): + volume_mounts.append( + client.V1VolumeMount(name=pvc_mount_name, mount_path=pvc_mount_path) + ) + container_spec = client.V1Container( + name="job-container", + image=image, + volume_mounts=volume_mounts, + # security_context=client.V1SecurityContext(run_as_user=user_id, run_as_group=group_id) + # TODO: Implement security_context for container if/when user/group IDs are required. + ) + if len(device_requests) > 0: + gpu_count = len(device_requests) + container_spec.resources = client.V1ResourceRequirements( + requests={"nvidia.com/gpu": gpu_count}, limits={"nvidia.com/gpu": gpu_count} + ) + volumes = [ + client.V1Volume( + name=pvc_mount_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_mount_name + ), + ) + for pvc_mount_name in pv_mounts.keys() + ] + if shm_size is not None: + shm_size_formatted = shm_size.replace("gb", "Gi") + volumes.append( + client.V1Volume( + name="shm", + empty_dir=client.V1EmptyDirVolumeSource( + medium="Memory", size_limit=shm_size_formatted + ), + ) + ) + container_spec.volume_mounts.append( + client.V1VolumeMount(name="shm", mount_path="/dev/shm") + ) + + if args is not None: + container_spec.args = args + batch_v1_api.create_namespaced_job( + namespace=namespace, + body=client.V1Job( + metadata=client.V1ObjectMeta(name=job_name), + spec=client.V1JobSpec( + template=client.V1PodTemplateSpec( + spec=client.V1PodSpec( + restart_policy="Never", + volumes=volumes, + init_containers=[ + client.V1Container( + name="init-container", + image="alpine:latest", + command=[ + "sh", + "-c", + "while [ ! 
-f /etc/content_verified ]; do sleep 1; done", + ], + volume_mounts=volume_mounts, + ) + ], + containers=[container_spec], + ) + ) + ), + ), + ) + + core_v1_api = client.CoreV1Api() + label_selector = f"job-name={job_name}" + pod_name = None + for _ in range(60): + pod_list = core_v1_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + if pod_list.items: + latest_pod = max( + pod_list.items, key=lambda pod: pod.metadata.creation_timestamp + ) + pod_name = latest_pod.metadata.name + break + time.sleep(2) + if pod_name is None: + raise RuntimeError( + f"Timed out waiting for pod to be created for job {job_name}" + ) + return pod_name + + +def _check_pod_terminal_or_running(pod_phase: str, pod_name: str) -> bool: + """Check if the pod is in a terminal phase or running. + Args: + pod_phase (str): The phase of the pod + pod_name (str): The name of the pod + Returns: + bool: True if the pod is in a terminal phase or running, False otherwise + """ + if pod_phase == "Running": + logger.info(f"Pod '{pod_name}' is running.") + return True + elif pod_phase in ["Failed", "Succeeded"]: + logger.warning(f"Pod '{pod_name}' entered terminal phase: {pod_phase}") + return True + else: + return False + + +def run_job( + algorithm: AlgorithmData, + data_path: Union[Path, str], + output_path: Union[Path, str], + cuda_devices: str, + force_cpu: bool, + internal_external_name_map: Optional[Dict[str, str]] = None, + namespace: Optional[str] = "default", + pvc_name: Optional[str] = None, + pvc_storage_size: Optional[str] = "1Gi", + pvc_storage_class: Optional[str] = None, + job_name: Optional[str] = None, + data_mount_path: Optional[str] = "/data", +): + """Run a Kubernetes job for the provided algorithm. 
+ + Args: + algorithm (AlgorithmData): The data of the algorithm to run + data_path (Union[Path, str]): The path to the input data + output_path (Union[Path, str]): The path to save the output + cuda_devices (str): The CUDA devices to use + force_cpu (bool): Whether to force CPU execution + internal_external_name_map (Optional[Dict[str, str]]): Dictionary mapping internal name (in standardized format) to external subject name provided by user (only used for batch inference) + namespace (Optional[str]): The Kubernetes namespace to run the job in. Defaults to "default". + pvc_name (Optional[str]): Name of the PersistentVolumeClaim (PVC) to use for this job. If None, a random name will be generated. If the PVC does not already exist, it will be created; otherwise, the existing PVC is reused and any input files missing from it are uploaded before the algorithm runs. + pvc_storage_size (Optional[str]): The size of the storage to request for the PVC. Defaults to "1Gi". + pvc_storage_class (Optional[str]): The storage class to use for the PVC. If None, the default storage class will be used. + job_name (Optional[str]): Name of the Job to create. If None, a random name will be generated. + data_mount_path (Optional[str]): The path to mount the PVC to. Defaults to "/data". 
+ """ + if pvc_name is None: + pvc_name = ( + "brats-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=12)) + + "-pvc" + ) + if job_name is None: + job_name = ( + "brats-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=12)) + + "-job" + ) + + logger.debug(f"Job name: {job_name}") + logger.debug(f"PersistentVolumeClaim name: {pvc_name}") + config.load_kube_config() + _create_namespaced_pvc( + pvc_name=pvc_name, + namespace=namespace, + storage_size=pvc_storage_size, + storage_class=pvc_storage_class, + ) + _log_algorithm_info(algorithm=algorithm) + + device_requests = _handle_device_requests( + algorithm=algorithm, + cuda_devices=cuda_devices, + force_cpu=force_cpu, + ) + logger.debug(f"GPU Device requests: {device_requests}") + user = _get_container_user(algorithm=algorithm) + logger.debug(f"Container user: {user if user else 'root (required by algorithm)'}") + + if algorithm.meta.year > 2024: + input_mount_path = "/input" + output_mount_path = "/output" + else: + output_mount_path = Path(data_mount_path).joinpath("output") + + if algorithm.meta.year <= 2024: + if algorithm.additional_files is not None: + zenodo_response = _get_zenodo_metadata_and_archive_url( + record_id=algorithm.additional_files.record_id + ) + if not zenodo_response: + msg = "Additional files not found locally and Zenodo could not be reached. Exiting..." 
+ logger.error(msg) + raise ZenodoException(msg) + + zenodo_metadata, archive_url = zenodo_response + additional_files_path = Path(data_mount_path).joinpath( + f"{algorithm.additional_files.record_id}_v{zenodo_metadata['version']}" + ) + else: + additional_files_path = get_dummy_path() + + command_args = _build_command_args( + algorithm=algorithm, + additional_files_path=additional_files_path, + data_path=Path(data_mount_path).joinpath("input"), + output_path=output_mount_path, + mount_path=data_mount_path, + ) + command = ["infer", *command_args.split(" ")] + pv_mounts = { + pvc_name: data_mount_path, + } + else: + command = None + _create_namespaced_pvc( + pvc_name=pvc_name + "-output", + namespace=namespace, + storage_size=pvc_storage_size, + storage_class=pvc_storage_class, + ) + + pv_mounts = { + pvc_name: input_mount_path, + pvc_name + "-output": output_mount_path, + } + + pod_name = _create_namespaced_job( + job_name=job_name, + namespace=namespace, + pvc_name=pvc_name, + pv_mounts=pv_mounts, + image=algorithm.run_args.docker_image, + device_requests=device_requests, + args=command, + shm_size=algorithm.run_args.shm_size, + user=user, + ) + logger.debug(f"Pod name: {pod_name}") + + core_v1_api = client.CoreV1Api() + logger.info(f"Waiting for Pod '{pod_name}' to be running...") + for _ in range(300): # wait up to 10 minutes + pod = core_v1_api.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + if pod.status.init_container_statuses: + exit_loop = False + for init_status in pod.status.init_container_statuses: + state = init_status.state + if state and state.running: + logger.info( + f"Pod '{pod_name}' initContainer '{init_status.name}' is running." 
+ ) + exit_loop = True + break + if exit_loop: + break + else: + if _check_pod_terminal_or_running(pod_phase, pod_name): + break + else: + if _check_pod_terminal_or_running(pod_phase, pod_name): + break + time.sleep(2) + else: + raise RuntimeError(f"Timed out waiting for pod {pod_name} to be running") + _check_files_in_pod( + pod_name=pod_name, + namespace=namespace, + paths=[Path(data_path)], + mount_path=data_mount_path if algorithm.meta.year <= 2024 else input_mount_path, + ) + logger.debug( + f"Files checked successfully in pod '{pod_name}' in namespace '{namespace}'." + ) + + if algorithm.meta.year <= 2024: + _download_additional_files( + algorithm=algorithm, + pod_name=pod_name, + namespace=namespace, + mount_path=data_mount_path, + ) + _upload_files_to_pod( + pod_name=pod_name, + namespace=namespace, + paths=[PARAMETERS_DIR], + mount_path=data_mount_path, + parent_dir="parameters", + ) + commands = [ + "tree", + data_mount_path if algorithm.meta.year <= 2024 else input_mount_path, + ] + _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + commands = ["touch", "/etc/content_verified"] + _execute_command_in_pod( + pod_name=pod_name, + namespace=namespace, + command=commands, + container="init-container", + ) + + Path(output_path).mkdir(parents=True, exist_ok=True) + + logger.info("Starting inference") + start_time = time.time() + + time.sleep(2) + job_output = _observe_job_output(pod_name=pod_name, namespace=namespace) + + core_v1_api = client.CoreV1Api() + for _ in range(300): # Wait up to 10 minutes + pod = core_v1_api.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_phase = pod.status.phase + if pod_phase in ("Succeeded", "Failed"): + logger.info(f"Job pod '{pod_name}' finished with phase: {pod_phase}") + break + time.sleep(2) + else: + raise RuntimeError(f"Timed out waiting for job pod '{pod_name}' to complete.") + + pvc_name_output = pvc_name + mount_path = str( + 
data_mount_path if algorithm.meta.year <= 2024 else input_mount_path + ) + if algorithm.meta.year > 2024: + pvc_name_output = pvc_name + "-output" + mount_path = str(output_mount_path) + + pod_name_finalizer = _create_finalizer_job( + job_name=job_name + "-finalizer", + namespace=namespace, + pvc_name=pvc_name_output, + mount_path=mount_path, + ) + time.sleep(2) + _download_folder_from_pod( + pod_name=pod_name_finalizer, + namespace=namespace, + container="finalizer-container", + remote_paths=[output_mount_path], + local_base_dir=output_path, + ) + + commands = ["touch", "/etc/content_verified"] + _execute_command_in_pod( + pod_name=pod_name_finalizer, + namespace=namespace, + command=commands, + container="finalizer-container", + ) + + _sanity_check_output( + data_path=Path(data_path), + output_path=output_path, + container_output=job_output, + internal_external_name_map=internal_external_name_map, + ) + + logger.debug(f"Job output: \n\r{job_output}") + + logger.info(f"Finished inference in {time.time() - start_time:.2f} seconds") diff --git a/brats/core/segmentation_algorithms.py b/brats/core/segmentation_algorithms.py index ea69138..b140f5a 100644 --- a/brats/core/segmentation_algorithms.py +++ b/brats/core/segmentation_algorithms.py @@ -167,6 +167,7 @@ def infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """Perform segmentation on a single subject with the provided images and save the result to the output file. @@ -178,6 +179,7 @@ def infer_single( output_file (Path | str): Path to save the segmentation log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. 
""" self._infer_single( @@ -185,6 +187,7 @@ def infer_single( output_file=output_file, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) def infer_batch( @@ -193,6 +196,7 @@ def infer_batch( output_folder: Path | str, log_file: Path | str | None = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """Perform segmentation on a batch of subjects with the provided images and save the results to the output folder. \n Requires the following structure:\n @@ -212,6 +216,7 @@ def infer_batch( output_folder (Path | str): Output folder to save the segmentations log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ return self._infer_batch( @@ -219,6 +224,7 @@ def infer_batch( output_folder=output_folder, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) @@ -438,6 +444,7 @@ def infer_single( output_file: Path | str, log_file: Optional[Path | str] = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform segmentation on a single subject with the provided T1C image and save the result to the output file. @@ -447,6 +454,7 @@ def infer_single( output_file (Path | str): Output file to save the segmentation. log_file (Optional[Path | str], optional): Save logs to this file. Defaults to None. backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. 
""" self._infer_single( @@ -454,6 +462,7 @@ def infer_single( output_file=output_file, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) def infer_batch( @@ -462,6 +471,7 @@ def infer_batch( output_folder: Path | str, log_file: Path | str | None = None, backend: Optional[Backends] = Backends.DOCKER, + kubernetes_kwargs: Optional[Dict] = None, ) -> None: """ Perform segmentation on a batch of subjects with the provided T1C images and save the results to the output folder. \n @@ -481,6 +491,7 @@ def infer_batch( output_folder (Path | str): Output folder to save the segmentations log_file (Path | str, optional): Save logs to this file backend (Backends, optional): Backend to use for inference. Defaults to Backends.DOCKER. + kubernetes_kwargs (Optional[Dict], optional): Optional keyword arguments for Kubernetes Backend. Defaults to None. """ return self._infer_batch( @@ -488,4 +499,5 @@ def infer_batch( output_folder=output_folder, log_file=log_file, backend=backend, + kubernetes_kwargs=kubernetes_kwargs, ) diff --git a/poetry.lock b/poetry.lock index 31bdbc2..67ffbf1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -209,6 +209,32 @@ itk-elastix = ["itk-elastix (>=0.20.0,<0.21.0)"] picsl-greedy = ["picsl_greedy (>=0.0.6,<0.0.7)"] synthstrip = ["nipreps-synthstrip (>=0.0.1,<0.0.2)"] +[[package]] +name = "cachetools" +version = "5.5.2" +description = "Extensible memoizing collections and decorators" +optional = false +python-versions = ">=3.7" +groups = ["main"] +markers = "python_version <= \"3.9\"" +files = [ + {file = "cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a"}, + {file = "cachetools-5.5.2.tar.gz", hash = "sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4"}, +] + +[[package]] +name = "cachetools" +version = "6.2.1" +description = "Extensible memoizing collections and decorators" +optional = false +python-versions = ">=3.9" +groups = ["main"] 
+markers = "python_version >= \"3.10\"" +files = [ + {file = "cachetools-6.2.1-py3-none-any.whl", hash = "sha256:09868944b6dde876dfd44e1d47e18484541eaf12f26f29b7af91b26cc892d701"}, + {file = "cachetools-6.2.1.tar.gz", hash = "sha256:3f391e4bd8f8bf0931169baf7456cc822705f4e2a31f840d218f445b9a854201"}, +] + [[package]] name = "certifi" version = "2024.7.4" @@ -676,6 +702,18 @@ files = [ {file = "docutils-0.20.1.tar.gz", hash = "sha256:f08a4e276c3a1583a86dce3e34aba3fe04d02bba2dd51ed16106244e8a923e3b"}, ] +[[package]] +name = "durationpy" +version = "0.10" +description = "Module for converting between datetime.timedelta and Go's Duration strings." +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286"}, + {file = "durationpy-0.10.tar.gz", hash = "sha256:1fa6893409a6e739c9c72334fc65cca1f355dbdd93405d30f726deb5bde42fba"}, +] + [[package]] name = "exceptiongroup" version = "1.2.2" @@ -863,6 +901,33 @@ pygments = ">=2.7" sphinx = ">=6.0,<9.0" sphinx-basic-ng = ">=1.0.0.beta2" +[[package]] +name = "google-auth" +version = "2.42.1" +description = "Google Authentication Library" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "google_auth-2.42.1-py2.py3-none-any.whl", hash = "sha256:eb73d71c91fc95dbd221a2eb87477c278a355e7367a35c0d84e6b0e5f9b4ad11"}, + {file = "google_auth-2.42.1.tar.gz", hash = "sha256:30178b7a21aa50bffbdc1ffcb34ff770a2f65c712170ecd5446c4bef4dc2b94e"}, +] + +[package.dependencies] +cachetools = ">=2.0.0,<7.0" +pyasn1-modules = ">=0.2.1" +rsa = ">=3.1.4,<5" + +[package.extras] +aiohttp = ["aiohttp (>=3.6.2,<4.0.0)", "requests (>=2.20.0,<3.0.0)"] +enterprise-cert = ["cryptography", "pyopenssl"] +pyjwt = ["cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (>=38.0.3)", "pyjwt (>=2.0)"] +pyopenssl = ["cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography 
(>=38.0.3)", "pyopenssl (>=20.0.0)"] +reauth = ["pyu2f (>=0.1.5)"] +requests = ["requests (>=2.20.0,<3.0.0)"] +testing = ["aiohttp (<3.10.0)", "aiohttp (>=3.6.2,<4.0.0)", "aioresponses", "cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (<39.0.0) ; python_version < \"3.8\"", "cryptography (>=38.0.3)", "cryptography (>=38.0.3)", "flask", "freezegun", "grpcio", "mock", "oauth2client", "packaging", "pyjwt (>=2.0)", "pyopenssl (<24.3.0)", "pyopenssl (>=20.0.0)", "pytest", "pytest-asyncio", "pytest-cov", "pytest-localserver", "pyu2f (>=0.1.5)", "requests (>=2.20.0,<3.0.0)", "responses", "urllib3"] +urllib3 = ["packaging", "urllib3"] + [[package]] name = "idna" version = "3.8" @@ -1123,6 +1188,33 @@ files = [ {file = "kiwisolver-1.4.9.tar.gz", hash = "sha256:c3b22c26c6fd6811b0ae8363b95ca8ce4ea3c202d3d0975b2914310ceb1bcc4d"}, ] +[[package]] +name = "kubernetes" +version = "34.1.0" +description = "Kubernetes python client" +optional = false +python-versions = ">=3.6" +groups = ["main"] +files = [ + {file = "kubernetes-34.1.0-py2.py3-none-any.whl", hash = "sha256:bffba2272534e224e6a7a74d582deb0b545b7c9879d2cd9e4aae9481d1f2cc2a"}, + {file = "kubernetes-34.1.0.tar.gz", hash = "sha256:8fe8edb0b5d290a2f3ac06596b23f87c658977d46b5f8df9d0f4ea83d0003912"}, +] + +[package.dependencies] +certifi = ">=14.05.14" +durationpy = ">=0.7" +google-auth = ">=1.0.1" +python-dateutil = ">=2.5.3" +pyyaml = ">=5.4.1" +requests = "*" +requests-oauthlib = "*" +six = ">=1.9.0" +urllib3 = ">=1.24.2,<2.4.0" +websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" + +[package.extras] +adal = ["adal (>=1.0.2)"] + [[package]] name = "lazy-loader" version = "0.4" @@ -2032,6 +2124,23 @@ files = [ {file = "nvidia_nvtx_cu12-12.8.90-py3-none-win_amd64.whl", hash = "sha256:619c8304aedc69f02ea82dd244541a83c3d9d40993381b3b590f1adaed3db41e"}, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +description = "A generic, spec-compliant, thorough implementation of the OAuth 
request-signing logic" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1"}, + {file = "oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9"}, +] + +[package.extras] +rsa = ["cryptography (>=3.0.0)"] +signals = ["blinker (>=1.4.0)"] +signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] + [[package]] name = "packaging" version = "24.1" @@ -2311,6 +2420,33 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "pyasn1" +version = "0.6.1" +description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, + {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, +] + +[[package]] +name = "pyasn1-modules" +version = "0.4.2" +description = "A collection of ASN.1-based protocols modules" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a"}, + {file = "pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6"}, +] + +[package.dependencies] +pyasn1 = ">=0.6.1,<0.7.0" + [[package]] name = "pycodestyle" version = "2.9.1" @@ -2412,10 +2548,9 @@ testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] name = "python-dateutil" version = "2.9.0.post0" description = "Extensions to the standard Python datetime module" -optional = true +optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" groups = ["main"] 
-markers = "python_version >= \"3.10\" and extra == \"preprocessing\"" files = [ {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, @@ -2553,6 +2688,25 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +description = "OAuthlib authentication support for Requests." +optional = false +python-versions = ">=3.4" +groups = ["main"] +files = [ + {file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, + {file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, +] + +[package.dependencies] +oauthlib = ">=3.0.0" +requests = ">=2.0.0" + +[package.extras] +rsa = ["oauthlib[signedtoken] (>=3.0.0)"] + [[package]] name = "rich" version = "13.8.0" @@ -2573,6 +2727,21 @@ typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9 [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "rsa" +version = "4.9.1" +description = "Pure-Python RSA implementation" +optional = false +python-versions = "<4,>=3.6" +groups = ["main"] +files = [ + {file = "rsa-4.9.1-py3-none-any.whl", hash = "sha256:68635866661c6836b8d39430f97a996acbd61bfa49406748ea243539fe239762"}, + {file = "rsa-4.9.1.tar.gz", hash = "sha256:e7bdbfdb5497da4c07dfd35530e1a902659db6ff241e39d9953cad06ebd0ae75"}, +] + +[package.dependencies] +pyasn1 = ">=0.1.3" + [[package]] name = "scikit-image" version = "0.25.2" @@ -2891,10 +3060,9 @@ files = [ name = "six" version = "1.17.0" description = "Python 2 and 3 compatibility utilities" -optional = true +optional = false python-versions = 
"!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" groups = ["main"] -markers = "python_version >= \"3.10\" and extra == \"preprocessing\"" files = [ {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, @@ -3137,6 +3305,12 @@ files = [ {file = "statsmodels-0.14.5-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a085d47c8ef5387279a991633883d0e700de2b0acc812d7032d165888627bef"}, {file = "statsmodels-0.14.5-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:9f866b2ebb2904b47c342d00def83c526ef2eb1df6a9a3c94ba5fe63d0005aec"}, {file = "statsmodels-0.14.5-cp313-cp313-win_amd64.whl", hash = "sha256:2a06bca03b7a492f88c8106103ab75f1a5ced25de90103a89f3a287518017939"}, + {file = "statsmodels-0.14.5-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:07c4dad25bbb15864a31b4917a820f6d104bdc24e5ddadcda59027390c3bed9e"}, + {file = "statsmodels-0.14.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:babb067c852e966c2c933b79dbb5d0240919d861941a2ef6c0e13321c255528d"}, + {file = "statsmodels-0.14.5-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:110194b137286173cc676d7bad0119a197778de6478fc6cbdc3b33571165ac1e"}, + {file = "statsmodels-0.14.5-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9c8a9c384a60c80731b278e7fd18764364c8817f4995b13a175d636f967823d1"}, + {file = "statsmodels-0.14.5-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:557df3a870a57248df744fdfcc444ecbc5bdbf1c042b8a8b5d8e3e797830dc2a"}, + {file = "statsmodels-0.14.5-cp314-cp314-win_amd64.whl", hash = "sha256:95af7a9c4689d514f4341478b891f867766f3da297f514b8c4adf08f4fa61d03"}, {file = "statsmodels-0.14.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:b23b8f646dd78ef5e8d775d879208f8dc0a73418b41c16acac37361ff9ab7738"}, {file = "statsmodels-0.14.5-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4e5e26b21d2920905764fb0860957d08b5ba2fae4466ef41b1f7c53ecf9fc7fa"}, {file = "statsmodels-0.14.5-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4a060c7e0841c549c8ce2825fd6687e6757e305d9c11c9a73f6c5a0ce849bb69"}, @@ -3548,6 +3722,42 @@ files = [ {file = "webcolors-24.11.1.tar.gz", hash = "sha256:ecb3d768f32202af770477b8b65f318fa4f566c22948673a977b00d589dd80f6"}, ] +[[package]] +name = "websocket-client" +version = "1.8.0" +description = "WebSocket client for Python with low level API options" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version <= \"3.9\"" +files = [ + {file = "websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526"}, + {file = "websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da"}, +] + +[package.extras] +docs = ["Sphinx (>=6.0)", "myst-parser (>=2.0.0)", "sphinx-rtd-theme (>=1.1.0)"] +optional = ["python-socks", "wsaccel"] +test = ["websockets"] + +[[package]] +name = "websocket-client" +version = "1.9.0" +description = "WebSocket client for Python with low level API options" +optional = false +python-versions = ">=3.9" +groups = ["main"] +markers = "python_version >= \"3.10\"" +files = [ + {file = "websocket_client-1.9.0-py3-none-any.whl", hash = "sha256:af248a825037ef591efbf6ed20cc5faa03d3b47b9e5a2230a529eeee1c1fc3ef"}, + {file = "websocket_client-1.9.0.tar.gz", hash = "sha256:9e813624b6eb619999a97dc7958469217c3176312b3a16a4bd1bc7e08a46ec98"}, +] + +[package.extras] +docs = ["Sphinx (>=6.0)", "myst-parser (>=2.0.0)", "sphinx_rtd_theme (>=1.1.0)"] +optional = ["python-socks", "wsaccel"] +test = ["pytest", "websockets"] + [[package]] name = "win32-setctime" version = "1.1.0" @@ -3591,4 
+3801,4 @@ preprocessing = ["brainles_preprocessing"] [metadata] lock-version = "2.1" python-versions = ">=3.8,<4.0" -content-hash = "8406391fff7ba2f2a396a9e0aac3e19348f3e8e9bdc6d6a3a576f77a9098a1dc" +content-hash = "4cd1580101551d78abe4a19fc29c310f10e74a1723025a49f85c4a15e3d9a4c1" diff --git a/pyproject.toml b/pyproject.toml index c4f9929..2bed7f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "nibabel>=5.0.0", "numpy>=1.21.0; python_version<='3.9'", "numpy>=1.26.0; python_version>='3.10'", + "kubernetes>=34.1.0,<35.0.0", ] [project.optional-dependencies] @@ -64,6 +65,7 @@ flake8 = ">=5.0.0" [tool.pytest.ini_options] pythonpath = ["."] + [tool.poetry.group.docs] optional = true diff --git a/tests/core/test_kubernetes.py b/tests/core/test_kubernetes.py new file mode 100644 index 0000000..a0b853c --- /dev/null +++ b/tests/core/test_kubernetes.py @@ -0,0 +1,534 @@ +import io +import tarfile +import base64 +import types +from pathlib import Path +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import pytest + +# Target module +import brats.core.kubernetes as k8s + + +@pytest.fixture +def dummy_algorithm(): + class AddFiles: + def __init__(self): + self.record_id = "12345" + self.param_name = ["weights", "config"] + self.param_path = ["w.bin", "c.yaml"] + + class Meta: + def __init__(self, year): + self.year = year + + class RunArgs: + def __init__(self): + self.docker_image = "alpine:latest" + self.shm_size = "1gb" + self.parameters_file = None + + class Algo: + def __init__(self, year=2024, with_additional=True): + self.meta = Meta(year) + self.run_args = RunArgs() + self.additional_files = AddFiles() if with_additional else None + + return Algo + + +@pytest.fixture +def tmp_tree(tmp_path): + # creates input dir structure with one file + input_dir = tmp_path / "input" + input_dir.mkdir(parents=True, exist_ok=True) + (input_dir / "case1").mkdir(parents=True, exist_ok=True) + f = input_dir / "case1" / 
"image.nii.gz" + f.write_bytes(b"dummy") + return tmp_path + + +def _mk_pod(name="pod-1", phase="Running", init_running=True, creation_ts=None): + pod = types.SimpleNamespace() + pod.metadata = types.SimpleNamespace() + pod.metadata.name = name + pod.metadata.creation_timestamp = creation_ts or datetime.now(timezone.utc) + pod.status = types.SimpleNamespace() + pod.status.phase = phase + # container_statuses for _observe_job_output + cstat = types.SimpleNamespace() + cstate = types.SimpleNamespace() + cstate.running = True + cstat.name = "job-container" + cstat.state = cstate + pod.status.container_statuses = [cstat] + # init container statuses for run_job wait loop + istat = types.SimpleNamespace() + istat.name = "init-container" + istate = types.SimpleNamespace() + istate.running = True if init_running else None + istat.state = istate + pod.status.init_container_statuses = [istat] if init_running is not None else None + return pod + + +### _build_command_args + + +def test_build_command_args_with_additional_and_params( + dummy_algorithm, monkeypatch, tmp_path +): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr( + k8s, "_get_parameters_arg", lambda algorithm: " --params=/mlcube_io3/foo.json" + ) + + cmd = k8s._build_command_args( + algorithm=algo, + additional_files_path="/data/weights_dir", + data_path="/data/input", + output_path="/data/output", + mount_path="/data", + ) + + # must include basic paths + assert "--data_path=/data/input" in cmd + assert "--output_path=/data/output" in cmd + # must include additional files args with per-param path + assert "--weights=/data/weights_dir/w.bin" in cmd + assert "--config=/data/weights_dir/c.yaml" in cmd + # must rewrite /mlcube_io3 to /data/parameters + assert "/mlcube_io3" not in cmd + assert "/data/parameters/foo.json" in cmd + + +def test_build_command_args_without_additional_and_params(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=False) + 
monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm: "") + + cmd = k8s._build_command_args( + algorithm=algo, + additional_files_path="/ignored", + data_path="/data/input", + output_path="/data/output", + mount_path="/data", + ) + + assert "--data_path=/data/input" in cmd + assert "--output_path=/data/output" in cmd + assert "--weights=" not in cmd + assert "--config=" not in cmd + + +### _execute_command_in_pod + + +def test_execute_command_in_pod(monkeypatch): + mock_core = MagicMock() + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + mock_stream = MagicMock(return_value="OK") + monkeypatch.setattr(k8s, "stream", mock_stream) + + out = k8s._execute_command_in_pod( + pod_name="p", namespace="ns", command=["echo", "hi"], container="job-container" + ) + + assert out == "OK" + mock_stream.assert_called_once() + # ensure connect_get_namespaced_pod_exec passed into stream + assert mock_stream.call_args.kwargs.get("_preload_content") is True + + +### _download_additional_files + + +def test_download_additional_files_success(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr( + k8s, + "_get_zenodo_metadata_and_archive_url", + lambda record_id: ({"version": "2"}, "http://example/archive.zip"), + ) + exec_calls = [] + + def fake_exec(**kwargs): + exec_calls.append(kwargs) + return "ok" + + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: fake_exec(**kw)) + p = k8s._download_additional_files( + algorithm=algo, pod_name="p", namespace="ns", mount_path="/data" + ) + assert str(p) == "/data/12345_v2" + assert exec_calls and exec_calls[0]["container"] == "init-container" + + +def test_download_additional_files_zenodo_failure(dummy_algorithm, monkeypatch): + algo = dummy_algorithm(year=2024, with_additional=True) + monkeypatch.setattr( + k8s, "_get_zenodo_metadata_and_archive_url", lambda record_id: None + ) + with pytest.raises(k8s.ZenodoException): + 
k8s._download_additional_files( + algorithm=algo, pod_name="p", namespace="ns", mount_path="/data" + ) + + +### _check_files_in_pod + + +def test_check_files_in_pod_uploads_missing(monkeypatch, tmp_tree): + # Simulate ls output saying missing + monkeypatch.setattr( + k8s, "_execute_command_in_pod", lambda **kw: "No such file or directory" + ) + uploaded = {"called": False, "args": None} + + def fake_upload(**kwargs): + uploaded["called"] = True + uploaded["args"] = kwargs + + monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: fake_upload(**kw)) + + k8s._check_files_in_pod( + pod_name="p", namespace="ns", paths=[tmp_tree / "input"], mount_path="/data" + ) + + assert uploaded["called"] is True + assert uploaded["args"]["parent_dir"] == "input" + + +### _download_folder_from_pod + + +def test_download_folder_from_pod(monkeypatch, tmp_path): + # Create a tar stream that contains one file "foo.txt" + tar_bytes = io.BytesIO() + with tarfile.open(fileobj=tar_bytes, mode="w") as tar: + data = io.BytesIO(b"hello") + info = tarfile.TarInfo(name="foo.txt") + info.size = len(b"hello") + tar.addfile(info, data) + tar_bytes.seek(0) + b64 = base64.b64encode(tar_bytes.getvalue()) + + # Make fake streaming response + class FakeResp: + def __init__(self): + self._stdout_chunks = [b64] + self._stderr_chunks = [] + self._open = True + + def is_open(self): + return self._open + + def update(self, timeout=1): + # close after delivering stdout + self._open = False + + def peek_stdout(self): + return bool(self._stdout_chunks) + + def read_stdout(self): + return self._stdout_chunks.pop(0) + + def peek_stderr(self): + return False + + def read_stderr(self): + return "" + + def close(self): + self._open = False + + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: FakeResp()) + + k8s._download_folder_from_pod( + pod_name="p", + namespace="ns", + container="finalizer-container", + remote_paths=[Path("/output")], + local_base_dir=tmp_path, + ) + + # Expect foo.txt 
extracted into tmp_path + expect = tmp_path / "foo.txt" + assert expect.exists() + assert expect.read_text() == "hello" + + +### _upload_files_to_pod + + +def test_upload_files_to_pod(monkeypatch, tmp_path): + f = tmp_path / "a.txt" + f.write_text("hi") + + class FakeResp: + def __init__(self): + self.stdin = io.BytesIO() + + def write_stdin(self, data): + # consume some bytes + assert isinstance(data, (bytes, bytearray)) + + def close(self): + pass + + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: FakeResp()) + + k8s._upload_files_to_pod( + pod_name="p", + namespace="ns", + paths=[f], + mount_path="/data", + relative_to=tmp_path, + parent_dir="input", + ) + + +### _create_namespaced_pvc + + +def test_create_pvc_skips_if_exists(monkeypatch): + mock_core = MagicMock() + pvc = types.SimpleNamespace() + pvc.metadata = types.SimpleNamespace(name="my-pvc") + mock_core.list_namespaced_persistent_volume_claim.return_value = ( + types.SimpleNamespace(items=[pvc]) + ) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + k8s._create_namespaced_pvc("my-pvc", "default", "5Gi", None) + mock_core.create_namespaced_persistent_volume_claim.assert_not_called() + + +def test_create_pvc_creates_with_and_without_storage_class(monkeypatch): + mock_core = MagicMock() + mock_core.list_namespaced_persistent_volume_claim.return_value = ( + types.SimpleNamespace(items=[]) + ) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + k8s._create_namespaced_pvc("p1", "ns", "1Gi", None) + k8s._create_namespaced_pvc("p2", "ns", "2Gi", "fast") + + assert mock_core.create_namespaced_persistent_volume_claim.call_count == 2 + + +### _create_finalizer_job + + +def test_create_finalizer_job(monkeypatch): + mock_batch = MagicMock() + mock_core = MagicMock() + + # No existing jobs + mock_batch.list_namespaced_job.return_value = types.SimpleNamespace(items=[]) + # pods appear after first poll + pod = _mk_pod(name="final-pod", phase="Running", 
init_running=None) + mock_core.list_namespaced_pod.side_effect = [ + types.SimpleNamespace(items=[]), + types.SimpleNamespace(items=[pod]), + ] + + monkeypatch.setattr(k8s.client, "BatchV1Api", lambda: mock_batch) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + name = k8s._create_finalizer_job( + job_name="job-final", namespace="ns", pvc_name="pvc", mount_path="/data" + ) + assert name == "final-pod" + assert mock_batch.create_namespaced_job.call_count == 1 + + +### _create_namespaced_job + + +def test_create_namespaced_job_deletes_old_pod_and_creates_new(monkeypatch): + mock_batch = MagicMock() + mock_core = MagicMock() + + # Existing job with same name to be deleted + mock_batch.list_namespaced_job.return_value = types.SimpleNamespace( + items=[types.SimpleNamespace(metadata=types.SimpleNamespace(name="job-1"))] + ) + + # Existing pod to be deleted + old_pod = _mk_pod(name="old-pod") + mock_core.list_namespaced_pod.return_value = types.SimpleNamespace(items=[old_pod]) + + # When polling for new pod, first empty then one appears + new_pod = _mk_pod(name="new-pod") + mock_core.list_namespaced_pod.side_effect = [ + types.SimpleNamespace(items=[old_pod]), # for deletion scan + types.SimpleNamespace(items=[]), # first poll + types.SimpleNamespace(items=[new_pod]), # second poll -> found + ] + + monkeypatch.setattr(k8s.client, "BatchV1Api", lambda: mock_batch) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + name = k8s._create_namespaced_job( + job_name="job-1", + namespace="ns", + pvc_name="pvc", + image="alpine:latest", + device_requests=[], + pv_mounts={"pvc": "/data"}, + args=["echo", "hi"], + shm_size="1gb", + user=None, + ) + + assert name == "new-pod" + assert mock_batch.delete_namespaced_job.call_count == 1 + assert mock_core.delete_namespaced_pod.call_count == 1 + assert mock_batch.create_namespaced_job.call_count == 1 + + +### run_job (two branches) + + +def test_run_job_year_2024_flow(monkeypatch, tmp_tree, 
tmp_path, dummy_algorithm): + algo = dummy_algorithm(year=2024, with_additional=True) + + # Config load + monkeypatch.setattr(k8s.config, "load_kube_config", lambda: None) + + # Helper functions + monkeypatch.setattr(k8s, "_log_algorithm_info", lambda **kw: None) + monkeypatch.setattr(k8s, "_handle_device_requests", lambda **kw: []) + monkeypatch.setattr(k8s, "_get_container_user", lambda **kw: None) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm=None: "") + monkeypatch.setattr( + k8s, + "_get_zenodo_metadata_and_archive_url", + lambda record_id: ({"version": "1"}, "http://example/zip"), + ) + monkeypatch.setattr(k8s, "PARAMETERS_DIR", tmp_tree / "params") + (tmp_tree / "params").mkdir(exist_ok=True) + monkeypatch.setattr(k8s, "get_dummy_path", lambda: Path("/dummy")) + monkeypatch.setattr(k8s, "_sanity_check_output", lambda **kw: None) + + # PVC creation + monkeypatch.setattr(k8s, "_create_namespaced_pvc", lambda **kw: None) + + # Job + pod lifecycle + def fake_create_job(**kwargs): + return "job-pod" + + monkeypatch.setattr( + k8s, "_create_namespaced_job", lambda **kw: fake_create_job(**kw) + ) + + # CoreV1Api read_namespaced_pod progression + mock_core = MagicMock() + # First loop waits until init container running + mock_core.read_namespaced_pod.return_value = _mk_pod( + name="job-pod", phase="Running", init_running=True + ) + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + # File checking + monkeypatch.setattr(k8s, "_check_files_in_pod", lambda **kw: None) + + # Additional files download and parameters upload + monkeypatch.setattr( + k8s, "_download_additional_files", lambda **kw: Path("/data/12345_v1") + ) + monkeypatch.setattr(k8s, "_upload_files_to_pod", lambda **kw: None) + + # Exec commands in init container + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "ok") + + # Observe logs + monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") + + mock_core.read_namespaced_pod.side_effect = 
[ + _mk_pod(name="job-pod", phase="Running", init_running=True), # wait loop + _mk_pod( + name="job-pod", phase="Succeeded", init_running=True + ), # completion loop + ] + + # Finalizer job + monkeypatch.setattr(k8s, "_create_finalizer_job", lambda **kw: "final-pod") + monkeypatch.setattr(k8s, "_download_folder_from_pod", lambda **kw: None) + + out_dir = tmp_path / "out" + k8s.run_job( + algorithm=algo, + data_path=tmp_tree / "input", + output_path=out_dir, + cuda_devices="", + force_cpu=True, + internal_external_name_map=None, + namespace="default", + pvc_name="mypvc", + pvc_storage_size="1Gi", + pvc_storage_class=None, + job_name="myjob", + data_mount_path="/data", + ) + + # Ensure output dir created + assert out_dir.exists() + + +def test_run_job_year_2025_flow(monkeypatch, tmp_tree, tmp_path, dummy_algorithm): + algo = dummy_algorithm(year=2025, with_additional=False) + monkeypatch.setattr(k8s.config, "load_kube_config", lambda: None) + monkeypatch.setattr(k8s, "_log_algorithm_info", lambda **kw: None) + monkeypatch.setattr(k8s, "_handle_device_requests", lambda **kw: []) + monkeypatch.setattr(k8s, "_get_container_user", lambda **kw: None) + monkeypatch.setattr(k8s, "_get_parameters_arg", lambda algorithm=None: "") + monkeypatch.setattr(k8s, "_sanity_check_output", lambda **kw: None) + # PVCs + created = [] + + def fake_create_pvc(**kw): + created.append(kw["pvc_name"]) + + monkeypatch.setattr( + k8s, "_create_namespaced_pvc", lambda **kw: fake_create_pvc(**kw) + ) + + # Job creation + monkeypatch.setattr(k8s, "_create_namespaced_job", lambda **kw: "job-pod") + + # Pod running/completion + mock_core = MagicMock() + mock_core.read_namespaced_pod.side_effect = [ + _mk_pod(name="job-pod", phase="Running", init_running=True), + _mk_pod(name="job-pod", phase="Succeeded", init_running=True), + ] + monkeypatch.setattr(k8s.client, "CoreV1Api", lambda: mock_core) + + # File checks skipped in behavior but called; mock anyway + monkeypatch.setattr(k8s, 
"_check_files_in_pod", lambda **kw: None) + monkeypatch.setattr(k8s, "_execute_command_in_pod", lambda **kw: "ok") + monkeypatch.setattr(k8s, "_observe_job_output", lambda **kw: "LOGS") + monkeypatch.setattr(k8s, "_create_finalizer_job", lambda **kw: "final-pod") + monkeypatch.setattr(k8s, "_download_folder_from_pod", lambda **kw: None) + + out_dir = tmp_path / "out2" + k8s.run_job( + algorithm=algo, + data_path=tmp_tree / "input", + output_path=out_dir, + cuda_devices="", + force_cpu=True, + namespace="default", + pvc_name="mypvc2", + pvc_storage_size="1Gi", + job_name="myjob2", + data_mount_path="/data", + ) + + # For year > 2024, an extra output PVC should be created + assert "mypvc2" in created + assert "mypvc2-output" in created + assert out_dir.exists()