From ac33bf32cc4d9a8835b9ed47666206ba520a2632 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 11 Nov 2024 10:29:39 +0100 Subject: [PATCH 01/15] Replace types-pkg-resources with types-setuptools --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 5351cb14..1a36a4e5 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -21,7 +21,7 @@ flake8 mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192 types-paramiko -types-pkg-resources +types-setuptools types-PyYAML types-pycurl types-requests From 098ef6d984d97306966b194136edc5e107b37e60 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 29 Apr 2024 13:05:32 +0200 Subject: [PATCH 02/15] Draft of JSON manifest collector This should be the general strategy for collecting input and output files for ARC, DIRAC, AWS batch etc. --- pulsar/client/action_mapper.py | 44 +++++++++++++++++++ pulsar/client/client.py | 9 ++-- pulsar/client/staging/down.py | 6 +++ pulsar/client/staging/up.py | 5 +++ test/action_mapper_test.py | 11 +++++ ..._test_cli_submit.py => test_cli_submit.py} | 0 test/transfer_action_test.py | 30 ++++++++++++- 7 files changed, 100 insertions(+), 5 deletions(-) rename test/{integration_test_cli_submit.py => test_cli_submit.py} (100%) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ce2f54ec..ff02af64 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -190,6 +190,9 @@ def __init__(self, client=None, config=None): self.ssh_port = config.get("ssh_port", None) self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) + self.actions = [] + # Might want to make the working directory available here so that we know where to place archive + # for archive action def action(self, source, type, mapper=None): path = source.get("path", None) @@ -202,8 +205,12 @@ def action(self, source, type, 
mapper=None): action_kwds = mapper.action_kwds action = action_class(source, file_lister=file_lister, **action_kwds) self.__process_action(action, type) + self.actions.append(action) return action + def finalize(self): + return [_ for _ in (action.finalize() for action in self.actions) if _] + def unstructured_mappers(self): """ Return mappers that will map 'unstructured' files (i.e. go beyond mapping inputs, outputs, and config files). @@ -342,6 +349,9 @@ def _extend_base_dict(self, **kwds): base_dict.update(**kwds) return base_dict + def finalize(self): + pass + def to_dict(self): return self._extend_base_dict() @@ -513,6 +523,38 @@ def write_from_path(self, pulsar_path): tus_upload_file(self.url, pulsar_path) +class JsonTransferAction(BaseAction): + """ + This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an + external system that can stage files in and out of the compute environment. + """ + inject_url = True + whole_directory_transfer_supported = True + action_type = "json_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, source, file_lister=None, url=None): + super().__init__(source, file_lister) + self.url = url + self._path = None + + @classmethod + def from_dict(cls, action_dict): + return JsonTransferAction(source=action_dict["source"], url=action_dict["url"]) + + def to_dict(self): + return self._extend_base_dict(url=self.url) + + def write_to_path(self, path): + self._path = path + + def write_from_path(self, pulsar_path: str): + self._path = pulsar_path + + def finalize(self): + return {"url": self.url, "path": self.path} + + class RemoteObjectStoreCopyAction(BaseAction): """ """ @@ -664,6 +706,7 @@ def write_to_path(self, path): DICTIFIABLE_ACTION_CLASSES = [ + JsonTransferAction, RemoteCopyAction, RemoteTransferAction, RemoteTransferTusAction, @@ -844,6 +887,7 @@ def unstructured_map(self, path): ACTION_CLASSES: List[Type[BaseAction]] = [ NoneAction, + JsonTransferAction, 
RewriteAction, TransferAction, CopyAction, diff --git a/pulsar/client/client.py b/pulsar/client/client.py index e28d84ea..939c7ef0 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface): self.job_manager_interface = job_manager_interface def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -405,7 +405,7 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.shell = shell def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -491,7 +491,8 @@ def launch( dynamic_file_sources=None, container_info=None, token_endpoint=None, - pulsar_app_config=None + pulsar_app_config=None, + staging_manifest=None ) -> Optional[ExternalId]: """ """ diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index ea8f8aa1..82f9d348 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -72,6 +72,9 @@ def collect(self): self.__collect_other_working_directory_files() 
self.__collect_metadata_directory_files() self.__collect_job_directory_files() + # Give actions that require a final action, like those that write a manifest, to write out their content + self.__finalize_action_mapper() + # finalize collection here for executors that need this ? return self.exception_tracker.collection_failure_exceptions def __collect_working_directory_outputs(self): @@ -134,6 +137,9 @@ def __collect_job_directory_files(self): 'output_jobdir', ) + def __finalize_action_mapper(self): + self.action_mapper.finalize() + def __realized_dynamic_file_source_references(self): references = {"filename": [], "extra_files": []} diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 9f08bef8..57b7b300 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -71,6 +71,11 @@ def submit_job(client, client_job_description, job_config=None): # it needs to be in the response to Pulsar even Pulsar is inititing staging actions launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources launch_kwds["token_endpoint"] = client.token_endpoint + + staging_manifest = file_stager.action_mapper.finalize() + if staging_manifest: + launch_kwds["staging_manifest"] = staging_manifest + # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external # id from the submission process (e.g. to TES). 
launch_response = client.launch(**launch_kwds) diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py index 2131cb65..7783696f 100644 --- a/test/action_mapper_test.py +++ b/test/action_mapper_test.py @@ -4,6 +4,17 @@ ) +def test_action_mapper_finalization(): + client = _client("json_transfer") + mapper = FileActionMapper(client) + mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') + mapper.action({'path': '/the_file'}, 'input') + mapper_summary = mapper.finalize() + assert len(mapper_summary) == 2 + assert mapper_summary[0]["path"] == '/opt/galaxy/tools/filters/catWrapper.py' + assert mapper_summary[1]["path"] == '/the_file' + + def test_endpoint_validation(): client = _min_client("remote_transfer") mapper = FileActionMapper(client) diff --git a/test/integration_test_cli_submit.py b/test/test_cli_submit.py similarity index 100% rename from test/integration_test_cli_submit.py rename to test/test_cli_submit.py diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py index 30b927f9..0cf3053d 100644 --- a/test/transfer_action_test.py +++ b/test/transfer_action_test.py @@ -1,7 +1,35 @@ import os from .test_utils import files_server -from pulsar.client.action_mapper import RemoteTransferAction +from pulsar.client.action_mapper import ( + JsonTransferAction, + RemoteTransferAction, +) + + +def test_write_to_path_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "remote_get") + + to_path = os.path.join(directory, "local_get") + url = server.application_url + "?path=%s" % from_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_to_path(to_path) + assert action.path == to_path + assert action.url == url + assert action.finalize() == {"path": to_path, "url": url} + + +def test_write_from_file_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "local_post") + to_path = os.path.join(directory, "remote_post") + url = 
server.application_url + "?path=%s" % to_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_from_path(from_path) + assert action.path == to_path + assert action.url == url + assert action.finalize() == {"path": to_path, "url": url} def test_write_to_file(): From f0a467719c159cd2c27277a4fcc8e012439dcc07 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 12 Nov 2024 15:47:37 +0100 Subject: [PATCH 03/15] Discriminate input from output in staging manifest --- pulsar/client/action_mapper.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ff02af64..91b2dfa1 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -536,7 +536,8 @@ class JsonTransferAction(BaseAction): def __init__(self, source, file_lister=None, url=None): super().__init__(source, file_lister) self.url = url - self._path = None + self._from_path = None + self._to_path = None @classmethod def from_dict(cls, action_dict): @@ -546,13 +547,16 @@ def to_dict(self): return self._extend_base_dict(url=self.url) def write_to_path(self, path): - self._path = path + self._to_path = path def write_from_path(self, pulsar_path: str): - self._path = pulsar_path + self._from_path = pulsar_path def finalize(self): - return {"url": self.url, "path": self.path} + if self._to_path: + return {"url": self.url, "to_path": self._to_path} + else: + return {"url": self.url, "from_path": self._from_path} class RemoteObjectStoreCopyAction(BaseAction): From 1cf6b5c3fa794587bc6a664e676f213684ec9a06 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 13 Nov 2024 15:51:24 +0100 Subject: [PATCH 04/15] Implement output collection --- pulsar/managers/base/__init__.py | 5 ++- pulsar/managers/staging/post.py | 6 +-- pulsar/scripts/collect_output_manifest.py | 45 +++++++++++++++++++++++ setup.py | 1 + 4 files changed, 52 insertions(+), 5 deletions(-) create mode 100644 
pulsar/scripts/collect_output_manifest.py diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 6ca07e91..b7d323a7 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -232,7 +232,7 @@ class JobDirectory(RemoteJobDirectory): def __init__( self, staging_directory, - job_id, + job_id=None, lock_manager=None, directory_maker=None ): @@ -240,7 +240,8 @@ def __init__( self._directory_maker = directory_maker or DirectoryMaker() self.lock_manager = lock_manager # Assert this job id isn't hacking path somehow. - assert job_id == basename(job_id) + if job_id: + assert job_id == basename(job_id) def _job_file(self, name): return os.path.join(self.job_directory, name) diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 6ca59390..3c5fdd32 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -20,14 +20,14 @@ def postprocess(job_directory, action_executor, was_cancelled): staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None) else: staging_config = None - collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled) + file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled) return collected finally: job_directory.write_file("postprocessed", "") return False -def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled): +def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled): collected = True if "action_mapper" in staging_config: file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"]) @@ -39,7 +39,7 @@ def __collect_outputs(job_directory, staging_config, action_executor, was_cancel if collection_failure_exceptions: log.warn("Failures collecting results %s" % collection_failure_exceptions) collected = False - return collected + return 
file_action_mapper, collected def realized_dynamic_file_sources(job_directory): diff --git a/pulsar/scripts/collect_output_manifest.py b/pulsar/scripts/collect_output_manifest.py new file mode 100644 index 00000000..72a7a1a4 --- /dev/null +++ b/pulsar/scripts/collect_output_manifest.py @@ -0,0 +1,45 @@ +import argparse +import json + +from pulsar.managers.base import JobDirectory +from pulsar.managers.staging.post import _collect_outputs +from pulsar.managers.util.retry import RetryActionExecutor + + +def make_parser(): + """Construct an argument parser used to call the script from the command line.""" + + parser = argparse.ArgumentParser(description="Create output staging manifest") + + parser.add_argument("--job-directory") + parser.add_argument("--staging-config-path", help="Path to staging config JSON file") + parser.add_argument("--output-manifest-path") + + return parser + + +def collect_outputs(job_directory: str, staging_config_path: str, output_manifest_path: str): + job_directory_ = JobDirectory(job_directory) + with open(staging_config_path) as staging_fh: + staging_config = json.load(staging_fh) + + action_mapper, _ = _collect_outputs( + job_directory_, + staging_config=staging_config, + action_executor=RetryActionExecutor(), + was_cancelled=lambda: False + ) + new_manifest = action_mapper.finalize() + with open(output_manifest_path, "w") as manifest_fh: + json.dump(new_manifest, manifest_fh) + + +def main(): + """Run the script from the command line.""" + parser = make_parser() + args = parser.parse_args() + collect_outputs(args.job_directory, args.staging_config_path, args.output_manifest_path) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 68faeed3..843f02cd 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ pulsar-submit=pulsar.scripts.submit:main pulsar-finish=pulsar.scripts.finish:main pulsar-run=pulsar.scripts.run:main + pulsar-create-output-manifest=pulsar.scripts.collect_output_manifest:main 
_pulsar-conda-init=pulsar.scripts._conda_init:main _pulsar-configure-slurm=pulsar.scripts._configure_slurm:main _pulsar-configure-galaxy-cvmfs=pulsar.scripts._configure_galaxy_cvmfs:main From f924f1aed85f216edc27c43a55fc11a219fcd11d Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 13 Nov 2024 16:19:53 +0100 Subject: [PATCH 05/15] Track file_type in transfer action --- pulsar/client/action_mapper.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 91b2dfa1..10fdc186 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -21,6 +21,7 @@ Any, Dict, List, + Optional, Type, ) from urllib.parse import urlencode @@ -203,7 +204,7 @@ def action(self, source, type, mapper=None): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(source, file_lister=file_lister, **action_kwds) + action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds) self.__process_action(action, type) self.actions.append(action) return action @@ -272,6 +273,7 @@ def __process_action(self, action, file_type): """ Extension point to populate extra action information after an action has been created. 
""" + action.file_type = file_type if getattr(action, "inject_url", False): self.__inject_url(action, file_type) if getattr(action, "inject_ssh_properties", False): @@ -307,10 +309,12 @@ class BaseAction: whole_directory_transfer_supported = False action_spec: Dict[str, Any] = {} action_type: str + file_type: Optional[str] = None - def __init__(self, source, file_lister=None): + def __init__(self, source, file_lister=None, file_type=None): self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.file_type = file_type @property def path(self): @@ -400,8 +404,8 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.source_directory = source_directory self.destination_directory = destination_directory @@ -477,8 +481,8 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -505,8 +509,8 @@ class RemoteTransferTusAction(BaseAction): action_type = "remote_transfer_tus" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -533,8 +537,8 @@ class 
JsonTransferAction(BaseAction): action_type = "json_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister, file_type) self.url = url self._from_path = None self._to_path = None From d8cf083bafc41d2d4a4ed0c1bb5072fcb4710c3b Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 13 Nov 2024 16:24:21 +0100 Subject: [PATCH 06/15] Include only inputs in staging manifest --- pulsar/client/staging/up.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 57b7b300..3b8f8eaa 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -72,7 +72,15 @@ def submit_job(client, client_job_description, job_config=None): launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources launch_kwds["token_endpoint"] = client.token_endpoint - staging_manifest = file_stager.action_mapper.finalize() + # populate `to_path` + staging_manifest = [] + for action in file_stager.action_mapper.actions: + if action.file_type not in ("output", "output_workdir"): + name = basename(action.path) + path = file_stager.job_directory.calculate_path(name, action.file_type) + action.write_to_path(path) + staging_manifest.append(action.finalize()) + if staging_manifest: launch_kwds["staging_manifest"] = staging_manifest From 3b4577dca4f9ff019c5e6aa82201d40c2f3717f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 14 Nov 2024 12:40:58 +0100 Subject: [PATCH 07/15] Change base image in coexecutor Dockerfile Change base image from `conda/miniconda3` (based off Debian Stretch) to `python:3.12-bookworm`. Miniconda is not required in the base image. 
Add the Galaxy Depot repository, which provides SLURM DRMAA packages for Debian Buster and newer releases. Do not install the package `apt-transport-https`, it is now a dummy package, see https://packages.debian.org/en/bookworm/apt-transport-https. Install the package `slurm` instead of `slurm-llnl`. Newer versions of the `munge` package include the binary `/usr/sbin/mungekey` instead of `/usr/sbin/create-munge-key`. Nevertheless, the key seems to be created automatically when installing the package, as running `mungekey` yields 'mungekey: Error: Failed to create "/etc/munge/munge.key": File exists'. --- docker/coexecutor/Dockerfile | 57 ++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile index e94d9a49..88a11bd4 100644 --- a/docker/coexecutor/Dockerfile +++ b/docker/coexecutor/Dockerfile @@ -1,26 +1,45 @@ -FROM conda/miniconda3 +FROM python:3.12-bookworm ENV PYTHONUNBUFFERED 1 +ENV PIP_ROOT_USER_ACTION=ignore ENV DEBIAN_FRONTEND noninteractive ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local +# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases) +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates curl gnupg \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \ + && echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. 
/etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list + +# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm) +RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \ +cat < /etc/apt/preferences.d/bullseye.pref +Package: * +Pin: release n=bullseye +Pin-Priority: -1 + +Package: libslurm36, slurm +Pin: release n=bullseye +Pin-Priority: 100 +EOF + +# set up CVMFS repository +RUN apt-get update \ + && apt-get install -y --no-install-recommends lsb-release wget \ + && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ + && dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb + # wget, gcc, pip - to build and install Pulsar. # bzip2 for Miniconda. # TODO: pycurl stuff... -RUN apt-get update \ - && apt-get install -y --no-install-recommends apt-transport-https \ +RUN apt-get update && apt-get install -y --no-install-recommends \ # Install CVMFS client - && apt-get install -y --no-install-recommends lsb-release wget \ - && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ - && dpkg -i cvmfs-release-latest_all.deb \ - && rm -f cvmfs-release-latest_all.deb \ + cvmfs cvmfs-config-default \ # Install packages - && apt-get update \ - && apt-get install -y --no-install-recommends gcc \ - libcurl4-openssl-dev \ - cvmfs cvmfs-config-default \ - slurm-llnl slurm-drmaa-dev \ - bzip2 \ + gcc libcurl4-openssl-dev \ + munge libmunge-dev slurm slurm-drmaa-dev \ + bzip2 \ # Install Pulsar Python requirements && pip install --no-cache-dir -U pip \ && pip install --no-cache-dir drmaa wheel kombu pykube pycurl \ @@ -28,14 +47,14 @@ RUN apt-get update \ # Remove build deps and cleanup && apt-get -y remove gcc wget lsb-release \ && apt-get -y autoremove \ - && apt-get autoclean \ - && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \ - && 
/usr/sbin/create-munge-key + && apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log + +ADD pulsar_app-*-py2.py3-none-any.whl / -ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl +SHELL ["/bin/bash", "-c"] -RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade -RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl +RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography +RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl RUN pip install --upgrade 'importlib-metadata<5.0' RUN _pulsar-configure-galaxy-cvmfs RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda From f217ea246ab3ea662c94cf26542f2e99cd354d00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 14 Nov 2024 13:05:26 +0100 Subject: [PATCH 08/15] Build Pulsar wheel in coexecutor Dockerfile Build wheel automatically when building the Docker image. Exclude the source code from the output image through a multistage build. --- docker/coexecutor/Dockerfile | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile index 88a11bd4..9d3a5374 100644 --- a/docker/coexecutor/Dockerfile +++ b/docker/coexecutor/Dockerfile @@ -1,3 +1,22 @@ +# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile` + +FROM python:3.12-bookworm as build_wheel + +ENV PIP_ROOT_USER_ACTION=ignore + +WORKDIR /build + +# install requirements +COPY requirements.txt . +COPY dev-requirements.txt . +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt + +# build Pulsar wheel +COPY . . 
+RUN python setup.py sdist bdist_wheel + + FROM python:3.12-bookworm ENV PYTHONUNBUFFERED 1 @@ -49,7 +68,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && apt-get -y autoremove \ && apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log -ADD pulsar_app-*-py2.py3-none-any.whl / +COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl / SHELL ["/bin/bash", "-c"] From 7c8371ae5e4e606eaed33506d6554bf9f0051af7 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 23 Jan 2025 11:21:26 +0100 Subject: [PATCH 09/15] Move `get_pulsar_app_config()` and `_ensure_manager_config()` from `CoexecutionLaunchMixin` to `BaseRemoteConfiguredJobClient` --- pulsar/client/client.py | 148 +++++++++++++++++++++++++--------------- pulsar/client/util.py | 2 +- 2 files changed, 93 insertions(+), 57 deletions(-) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 939c7ef0..273dc1e7 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -374,6 +374,90 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params["setup_params"] = setup_params return launch_params + def get_pulsar_app_config( + self, + pulsar_app_config, + container, + wait_after_submission, + manager_name, + manager_type, + dependencies_description, + ): + + pulsar_app_config = pulsar_app_config or {} + manager_config = self._ensure_manager_config( + pulsar_app_config, + manager_name, + manager_type, + ) + + if ( + "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config + ): + pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY + + if self.amqp_key_prefix: + pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix + + if "monitor" not in manager_config: + manager_config["monitor"] = ( + MonitorStyle.BACKGROUND.value + if wait_after_submission + else MonitorStyle.NONE.value + ) + if "persistence_directory" not in pulsar_app_config: + 
pulsar_app_config["persistence_directory"] = os.path.join( + CONTAINER_STAGING_DIRECTORY, "persisted_data" + ) + elif "manager" in pulsar_app_config and manager_name != "_default_": + log.warning( + "'manager' set in app config but client has non-default manager '%s', this will cause communication" + " failures, remove `manager` from app or client config to fix", + manager_name, + ) + + using_dependencies = container is None and dependencies_description is not None + if using_dependencies and "dependency_resolution" not in pulsar_app_config: + # Setup default dependency resolution for container above... + dependency_resolution = { + "cache": False, + "use": True, + "default_base_path": "/pulsar_dependencies", + "cache_dir": "/pulsar_dependencies/_cache", + "resolvers": [ + { # TODO: add CVMFS resolution... + "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + }, + { + "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + "versionless": True, + }, + ], + } + pulsar_app_config["dependency_resolution"] = dependency_resolution + return pulsar_app_config + + def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): + if "manager" in pulsar_app_config: + manager_config = pulsar_app_config["manager"] + elif "managers" in pulsar_app_config: + managers_config = pulsar_app_config["managers"] + if manager_name not in managers_config: + managers_config[manager_name] = {} + manager_config = managers_config[manager_name] + else: + manager_config = {} + pulsar_app_config["manager"] = manager_config + if "type" not in manager_config: + manager_config["type"] = manager_type + return manager_config + class MessagingClientManagerProtocol(ClientManagerProtocol): status_cache: Dict[str, Dict[str, Any]] @@ -492,7 +576,7 @@ def launch( container_info=None, token_endpoint=None, pulsar_app_config=None, - staging_manifest=None + staging_manifest=None, ) -> 
Optional[ExternalId]: """ """ @@ -514,48 +598,15 @@ def launch( manager_name = self.client_manager.manager_name manager_type = "coexecution" if container is not None else "unqueued" - pulsar_app_config = pulsar_app_config or {} - manager_config = self._ensure_manager_config( - pulsar_app_config, manager_name, manager_type, + pulsar_app_config = self.get_pulsar_app_config( + pulsar_app_config=pulsar_app_config, + container=container, + wait_after_submission=wait_after_submission, + manager_name=manager_name, + manager_type=manager_type, + dependencies_description=dependencies_description, ) - if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config: - pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY - - if self.amqp_key_prefix: - pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix - - if "monitor" not in manager_config: - manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value - if "persistence_directory" not in pulsar_app_config: - pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data") - elif "manager" in pulsar_app_config and manager_name != '_default_': - log.warning( - "'manager' set in app config but client has non-default manager '%s', this will cause communication" - " failures, remove `manager` from app or client config to fix", manager_name) - - using_dependencies = container is None and dependencies_description is not None - if using_dependencies and "dependency_resolution" not in pulsar_app_config: - # Setup default dependency resolution for container above... - dependency_resolution = { - "cache": False, - "use": True, - "default_base_path": "/pulsar_dependencies", - "cache_dir": "/pulsar_dependencies/_cache", - "resolvers": [{ # TODO: add CVMFS resolution... 
- "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - }, { - "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - "versionless": True, - }] - } - pulsar_app_config["dependency_resolution"] = dependency_resolution base64_message = to_base64_json(launch_params) base64_app_conf = to_base64_json(pulsar_app_config) pulsar_container_image = self.pulsar_container_image @@ -607,21 +658,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf]) return manager_args - def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): - if "manager" in pulsar_app_config: - manager_config = pulsar_app_config["manager"] - elif "managers" in pulsar_app_config: - managers_config = pulsar_app_config["managers"] - if manager_name not in managers_config: - managers_config[manager_name] = {} - manager_config = managers_config[manager_name] - else: - manager_config = {} - pulsar_app_config["manager"] = manager_config - if "type" not in manager_config: - manager_config["type"] = manager_type - return manager_config - def _launch_containers( self, pulsar_submit_container: CoexecutionContainerCommand, diff --git a/pulsar/client/util.py b/pulsar/client/util.py index e10531ed..09cedb69 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -60,7 +60,7 @@ def _copy_and_close(object, output): @wraps(_b64encode) def b64encode(val, **kwargs): try: - return _b64encode(val, **kwargs) + return _b64encode(val, **kwargs).decode("utf-8") except TypeError: return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8') From b1480f2884e2cfbc1895e654499766c5c40be266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Wed, 16 Jul 2025 10:38:39 +0200 Subject: [PATCH 10/15] Construct staging and output manifests from `FileStager` and 
`ResultsCollector` At the moment, JSON staging and outputs manifests are constructed by tracking all actions mapped by the `FileActionMapper` using a list `FileActionMapper.actions`. This makes the `FileActionMapper` stateful, requires including `file_type` as keyword argument for `BaseAction` and its children, requires defining a finalize()` method for `FileActionMapper` and for `BaseAction` and its children. Paying the small price of refactoring `JsonTransferAction`, generate the staging manifest from `FileStager.transfer_tracker.remote_staging_actions` and the output manifest as `ResultsCollector` collects the outputs. --- pulsar/client/action_mapper.py | 64 +++++++++++------------ pulsar/client/staging/down.py | 16 +++--- pulsar/client/staging/up.py | 17 +++--- pulsar/managers/staging/post.py | 8 ++- pulsar/scripts/collect_output_manifest.py | 5 +- 5 files changed, 58 insertions(+), 52 deletions(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 10fdc186..cd06c54b 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -21,7 +21,6 @@ Any, Dict, List, - Optional, Type, ) from urllib.parse import urlencode @@ -191,9 +190,6 @@ def __init__(self, client=None, config=None): self.ssh_port = config.get("ssh_port", None) self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) - self.actions = [] - # Might want to make the working directory available here so that we know where to place archive - # for archive action def action(self, source, type, mapper=None): path = source.get("path", None) @@ -204,14 +200,10 @@ def action(self, source, type, mapper=None): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds) + action = action_class(source, file_lister=file_lister, **action_kwds) self.__process_action(action, type) - 
self.actions.append(action) return action - def finalize(self): - return [_ for _ in (action.finalize() for action in self.actions) if _] - def unstructured_mappers(self): """ Return mappers that will map 'unstructured' files (i.e. go beyond mapping inputs, outputs, and config files). @@ -273,7 +265,6 @@ def __process_action(self, action, file_type): """ Extension point to populate extra action information after an action has been created. """ - action.file_type = file_type if getattr(action, "inject_url", False): self.__inject_url(action, file_type) if getattr(action, "inject_ssh_properties", False): @@ -309,12 +300,10 @@ class BaseAction: whole_directory_transfer_supported = False action_spec: Dict[str, Any] = {} action_type: str - file_type: Optional[str] = None - def __init__(self, source, file_lister=None, file_type=None): + def __init__(self, source, file_lister=None): self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER - self.file_type = file_type @property def path(self): @@ -353,9 +342,6 @@ def _extend_base_dict(self, **kwds): base_dict.update(**kwds) return base_dict - def finalize(self): - pass - def to_dict(self): return self._extend_base_dict() @@ -404,8 +390,8 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None): - super().__init__(source, file_lister=file_lister, file_type=file_type) + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): + super().__init__(source, file_lister=file_lister) self.source_directory = source_directory self.destination_directory = destination_directory @@ -481,8 +467,8 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None, file_type=None): - super().__init__(source, file_lister=file_lister, 
file_type=file_type) + def __init__(self, source, file_lister=None, url=None): + super().__init__(source, file_lister=file_lister) self.url = url def to_dict(self): @@ -509,8 +495,8 @@ class RemoteTransferTusAction(BaseAction): action_type = "remote_transfer_tus" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None, file_type=None): - super().__init__(source, file_lister=file_lister, file_type=file_type) + def __init__(self, source, file_lister=None, url=None): + super().__init__(source, file_lister=file_lister) self.url = url def to_dict(self): @@ -537,30 +523,39 @@ class JsonTransferAction(BaseAction): action_type = "json_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None, file_type=None): - super().__init__(source, file_lister, file_type) + def __init__(self, source, file_lister=None, url=None, from_path=None, to_path=None): + super().__init__(source, file_lister) self.url = url - self._from_path = None - self._to_path = None + + self._from_path = from_path + self._to_path = to_path + # `from_path` and `to_path` are mutually exclusive, only one of them should be set @classmethod def from_dict(cls, action_dict): - return JsonTransferAction(source=action_dict["source"], url=action_dict["url"]) + return JsonTransferAction( + source=action_dict["source"], + url=action_dict["url"], + from_path=action_dict.get("from_path"), + to_path=action_dict.get("to_path") + ) def to_dict(self): - return self._extend_base_dict(url=self.url) + return self._extend_base_dict(**self.to_staging_manifest_entry()) def write_to_path(self, path): - self._to_path = path + self._from_path, self._to_path = None, path def write_from_path(self, pulsar_path: str): - self._from_path = pulsar_path + self._from_path, self._to_path = pulsar_path, None - def finalize(self): + def to_staging_manifest_entry(self): + staging_manifest_entry = dict(url=self.url) + if self._from_path: + staging_manifest_entry["from_path"] = 
self._from_path if self._to_path: - return {"url": self.url, "to_path": self._to_path} - else: - return {"url": self.url, "from_path": self._from_path} + staging_manifest_entry["to_path"] = self._to_path + return staging_manifest_entry class RemoteObjectStoreCopyAction(BaseAction): @@ -911,6 +906,7 @@ def unstructured_map(self, path): __all__ = ( 'FileActionMapper', + 'JsonTransferAction', 'path_type', 'from_dict', 'MessageAction', diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index 82f9d348..114b2ab9 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -3,12 +3,13 @@ from contextlib import contextmanager from json import loads from logging import getLogger +from typing import Optional from os.path import ( join, relpath, ) -from ..action_mapper import FileActionMapper +from ..action_mapper import FileActionMapper, JsonTransferAction from ..staging import COMMAND_VERSION_FILENAME log = getLogger(__name__) @@ -64,16 +65,17 @@ def __init__(self, output_collector, action_mapper, client_outputs, pulsar_outpu self.working_directory_contents = pulsar_outputs.working_directory_contents or [] self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or [] self.job_directory_contents = pulsar_outputs.job_directory_contents or [] + self.output_manifest: Optional[list] = None def collect(self): + self.output_manifest = [] + self.__collect_working_directory_outputs() self.__collect_outputs() self.__collect_version_file() self.__collect_other_working_directory_files() self.__collect_metadata_directory_files() self.__collect_job_directory_files() - # Give actions that require a final action, like those that write a manifest, to write out their content - self.__finalize_action_mapper() # finalize collection here for executors that need this ? 
return self.exception_tracker.collection_failure_exceptions @@ -137,9 +139,6 @@ def __collect_job_directory_files(self): 'output_jobdir', ) - def __finalize_action_mapper(self): - self.action_mapper.finalize() - def __realized_dynamic_file_source_references(self): references = {"filename": [], "extra_files": []} @@ -212,7 +211,10 @@ def _attempt_collect_output(self, output_type, path, name=None): def _collect_output(self, output_type, action, name): log.info("collecting output {} with action {}".format(name, action)) try: - return self.output_collector.collect_output(self, output_type, action, name) + collect_result = self.output_collector.collect_output(self, output_type, action, name) + if isinstance(action, JsonTransferAction): + self.output_manifest.append(action.to_staging_manifest_entry()) + return collect_result except Exception as e: if _allow_collect_failure(output_type): log.warning( diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 3b8f8eaa..ea850a33 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -16,6 +16,7 @@ from ..action_mapper import ( FileActionMapper, + JsonTransferAction, MessageAction, path_type, ) @@ -72,15 +73,19 @@ def submit_job(client, client_job_description, job_config=None): launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources launch_kwds["token_endpoint"] = client.token_endpoint - # populate `to_path` + # generate staging manifest staging_manifest = [] - for action in file_stager.action_mapper.actions: - if action.file_type not in ("output", "output_workdir"): + for action_description in remote_staging_actions: + action_dict = action_description["action"] + is_json_transfer_action = action_dict.get("action_type") == JsonTransferAction.action_type + is_not_output_action = action_description.get("type") not in ("output", "output_workdir") + if is_json_transfer_action and is_not_output_action: + file_type = action_description.get("type") + 
action = JsonTransferAction.from_dict(action_dict) name = basename(action.path) - path = file_stager.job_directory.calculate_path(name, action.file_type) + path = file_stager.job_directory.calculate_path(name, file_type) action.write_to_path(path) - staging_manifest.append(action.finalize()) - + staging_manifest.append(action.to_staging_manifest_entry()) if staging_manifest: launch_kwds["staging_manifest"] = staging_manifest diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 3c5fdd32..23b4ed0f 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -20,7 +20,9 @@ def postprocess(job_directory, action_executor, was_cancelled): staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None) else: staging_config = None - file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled) + file_action_mapper, _, collected = _collect_outputs( + job_directory, staging_config, action_executor, was_cancelled + ) return collected finally: job_directory.write_file("postprocessed", "") @@ -29,6 +31,7 @@ def postprocess(job_directory, action_executor, was_cancelled): def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled): collected = True + output_manifest = None if "action_mapper" in staging_config: file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"]) client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"]) @@ -39,7 +42,8 @@ def _collect_outputs(job_directory, staging_config, action_executor, was_cancell if collection_failure_exceptions: log.warn("Failures collecting results %s" % collection_failure_exceptions) collected = False - return file_action_mapper, collected + output_manifest = results_collector.output_manifest + return file_action_mapper, output_manifest, collected def realized_dynamic_file_sources(job_directory): diff --git 
a/pulsar/scripts/collect_output_manifest.py b/pulsar/scripts/collect_output_manifest.py index 72a7a1a4..d8300c8c 100644 --- a/pulsar/scripts/collect_output_manifest.py +++ b/pulsar/scripts/collect_output_manifest.py @@ -23,15 +23,14 @@ def collect_outputs(job_directory: str, staging_config_path: str, output_manifes with open(staging_config_path) as staging_fh: staging_config = json.load(staging_fh) - action_mapper, _ = _collect_outputs( + action_mapper, output_manifest, _ = _collect_outputs( job_directory_, staging_config=staging_config, action_executor=RetryActionExecutor(), was_cancelled=lambda: False ) - new_manifest = action_mapper.finalize() with open(output_manifest_path, "w") as manifest_fh: - json.dump(new_manifest, manifest_fh) + json.dump(output_manifest, manifest_fh) def main(): From 86c1ea9c28237dfda924e54e883ac9012db12a00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Wed, 16 Jul 2025 10:46:06 +0200 Subject: [PATCH 11/15] Disable whole directory transfers for `JsonTransferAction` Set `JsonTransferAction.whole_directory_transfer_supported` to `False`, as the job files API is not capable of serving directories. --- pulsar/client/action_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index cd06c54b..3dd17c41 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -519,7 +519,7 @@ class JsonTransferAction(BaseAction): external system that can stage files in and out of the compute environment. 
""" inject_url = True - whole_directory_transfer_supported = True + whole_directory_transfer_supported = False action_type = "json_transfer" staging = STAGING_ACTION_REMOTE From 148006183ee94935cd427b63701e2c17e439fe70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 17 Jul 2025 15:24:17 +0200 Subject: [PATCH 12/15] Calculate staging manifest path from action description Using `basename(action.path)` creates a flat structure for each file type (e.g. job_directory/unstructured/human.fa), but Pulsar expects tree structures to work (e.g. job_directory/unstructured/f0d0164494db6cbf92c12aeb6119ac38/bwa/human.fa). --- pulsar/client/staging/up.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index ea850a33..6b509275 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -82,7 +82,7 @@ def submit_job(client, client_job_description, job_config=None): if is_json_transfer_action and is_not_output_action: file_type = action_description.get("type") action = JsonTransferAction.from_dict(action_dict) - name = basename(action.path) + name = action_description["name"] path = file_stager.job_directory.calculate_path(name, file_type) action.write_to_path(path) staging_manifest.append(action.to_staging_manifest_entry()) From 8b19f7d391ede022d7b6f19b6c4892e0a978fc74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 18 Mar 2025 15:07:46 +0100 Subject: [PATCH 13/15] Pulsar client for ARC Implement a Pulsar client that runs jobs on computing infrastructure using the Advanced Resource Connector (ARC) middleware. 
--- pulsar/client/client.py | 370 +++++++++++++++++++++++++++++++++++- pulsar/client/manager.py | 7 +- pulsar/client/test/check.py | 8 + pulsar/managers/util/arc.py | 89 +++++++++ test/integration_test.py | 2 +- 5 files changed, 472 insertions(+), 4 deletions(-) create mode 100644 pulsar/managers/util/arc.py diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 273dc1e7..74762179 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -1,6 +1,9 @@ import logging import os +import tempfile from enum import Enum +from pathlib import Path +from textwrap import dedent from typing import ( Any, Callable, @@ -10,7 +13,15 @@ Optional, ) from typing_extensions import Protocol +from urllib.parse import urlparse, urlunparse +from xml.etree.ElementTree import ( + Element, + fromstring, + SubElement, + tostring, +) +from pulsar.managers.util.arc import ensure_pyarc, pyarcrest, arc_state_to_pulsar_state from pulsar.managers.util.tes import ( ensure_tes_client, TesClient, @@ -68,8 +79,11 @@ sh $path; echo 'ran script'""" - -PULSAR_CONTAINER_IMAGE = "galaxy/pulsar-pod-staging:0.15.0.0" +PULSAR_CONTAINER_VERSION = "0.16.0" +PULSAR_CONTAINER_IMAGE = f"galaxy/pulsar-pod-staging:{PULSAR_CONTAINER_VERSION}" +PULSAR_SINGULARITY_IMAGE = ( + f"https://github.com/galaxyproject/pulsar/releases/download/{PULSAR_CONTAINER_VERSION}/pulsar.sif" +) CONTAINER_STAGING_DIRECTORY = "/pulsar_staging/" @@ -561,6 +575,301 @@ class ExecutionType(str, Enum): PARALLEL = "parallel" +class ARCLaunchMixin(BaseRemoteConfiguredJobClient): + """Execute containers sequentially using ARC.""" + + ensure_library_available = ensure_pyarc + execution_type = ExecutionType.SEQUENTIAL + + default_tool_container_image: str = "docker://python:slim" + + _arc_job_id: str + # Holds the ARC job id after launch, used to kill the job and to get status updates. 
+ + def launch( + self, + command_line, + dependencies_description=None, + env=None, + remote_staging=None, + job_config=None, + dynamic_file_sources=None, + container_info=None, + token_endpoint=None, + pulsar_app_config=None, + staging_manifest=None, + ) -> Optional[ExternalId]: + container = container_info["container_id"] if container_info else None + guest_ports = [int(p) for p in container_info["guest_ports"]] if container_info else None + + # prepare auxiliary artifacts + auxiliary_artifacts_directory = Path(tempfile.mkdtemp(prefix="pulsar_arc_")) + input_manifest_path = auxiliary_artifacts_directory / "input_manifest.json" + staging_config_path = auxiliary_artifacts_directory / "staging_config.json" + with open(input_manifest_path, "w") as input_manifest_fh: + input_manifest_fh.write(json_dumps(staging_manifest)) + with open(staging_config_path, mode="w") as staging_config_fh: + staging_config_fh.write(json_dumps(remote_staging)) + + # build Pulsar submit command + launch_params = self._build_setup_message( + command_line, + dependencies_description=dependencies_description, + env=env, + remote_staging=remote_staging, + job_config=job_config, + dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, + ) + pulsar_app_config = self.get_pulsar_app_config( + pulsar_app_config=pulsar_app_config, + container=container, + wait_after_submission=False, + manager_name="_default_", + manager_type="unqueued", + dependencies_description=dependencies_description, + ) + base64_message = to_base64_json(launch_params) + base64_app_conf = to_base64_json(pulsar_app_config) + pulsar_submit = CoexecutionContainerCommand( + self.pulsar_container_image, + "pulsar-submit", + [ + "--base64", + base64_message, + "--app_conf_base64", + base64_app_conf, + "--no_wait", + ], + job_config["job_directory"], + None, + ) + + # build tool command + tool_container = CoexecutionContainerCommand( + container or self.default_tool_container_image, + "bash", + 
[f"{job_config['job_directory']}/command.sh"], + job_config["job_directory"], + guest_ports, + ) + + # build Pulsar output manifest command + output_manifest_path = "output_manifest.json" + output_manifest = CoexecutionContainerCommand( + self.pulsar_container_image, + "pulsar-create-output-manifest", + [ + "--job-directory", + job_config["job_directory"], + "--staging-config-path", + staging_config_path.name, + "--output-manifest-path", + output_manifest_path, + ], + job_config["job_directory"], + ) + + # build ARC job + executable_path = Path(auxiliary_artifacts_directory) / "job.sh" + with open(executable_path, "wb") as executable_fh: + executable: bytes = self._generate_executable( + job_directory=job_config["job_directory"], + output_manifest_path=output_manifest_path, + persistence_directory=pulsar_app_config["persistence_directory"], + staging_directory=str( + Path(pulsar_app_config["staging_directory"]) / Path(job_config["job_directory"]).name + ), + pulsar_submit_command=pulsar_submit, + tool_container_command=tool_container, + pulsar_manifest_command=output_manifest, + ) + executable_fh.write(executable) + job_description: bytes = self._generate_job_description( + job_directory=job_config["job_directory"], + input_manifest=staging_manifest, + executable_path=executable_path, + staging_config_path=Path(staging_config_path), + ) + + # submit arc job + arc_endpoint = self.destination_params["arc_url"] + arc_oidc_token = self.destination_params["arc_oidc_token"] + arc_job_id = self._launch_arc_job( + arc_endpoint, + arc_oidc_token, + job_description.decode("utf-8"), + ) + self._arc_job_id = arc_job_id.external_id + + return arc_job_id + + @staticmethod + def _generate_executable( + job_directory: str, + output_manifest_path: str, + persistence_directory: str, + staging_directory: str, + pulsar_submit_command: CoexecutionContainerCommand, + tool_container_command: CoexecutionContainerCommand, + pulsar_manifest_command: CoexecutionContainerCommand, + ) -> 
bytes: + return dedent(f""" + #!/bin/bash + set -e + + # clear SLURM variables (isolate the job's environment from ARC's internal infrastructure) + # https://hpcc.umd.edu/hpcc/help/slurmenv.html + unset SLURM_CPUS_ON_NODE + unset SLURM_CPUS_PER_TASK + unset SLURM_JOB_ID + unset SLURM_JOBID + unset SLURM_JOB_NAME + unset SLURM_JOB_NODELIST + unset SLURM_JOB_NUM_NODES + unset SLURM_LOCALID + unset SLURM_NODEID + unset SLURM_NNODES + unset SLURM_NODELIST + unset SLURM_NTASKS + unset SLURM_SUBMIT_DIR + unset SLURM_SUBMIT_HOST + unset SLURM_TASKS_PER_NODE + + mkdir -p job_directory + mkdir -p persistence_directory + + singularity run \\ + --no-mount bind-paths \\ + --bind /grid:/grid \\ + --bind "job_directory":{staging_directory} \\ + --bind "persistence_directory":{persistence_directory} \\ + --pwd {staging_directory} \\ + {pulsar_submit_command.image} \\ + {pulsar_submit_command.command} {" ".join(pulsar_submit_command.args)} + + singularity run \\ + --no-mount bind-paths \\ + --bind /grid:/grid \\ + --bind "job_directory":{job_directory} \\ + --bind "job_directory":{staging_directory} \\ + --bind "persistence_directory":{persistence_directory} \\ + --pwd {tool_container_command.working_directory} \\ + {tool_container_command.image} \\ + {tool_container_command.command} {" ".join(tool_container_command.args)} + + mv staging_config.json job_directory + singularity run \\ + --no-mount bind-paths \\ + --bind /grid:/grid \\ + --bind "job_directory":{job_directory} \\ + --bind "job_directory":{staging_directory} \\ + --bind "persistence_directory":{persistence_directory} \\ + --pwd {tool_container_command.working_directory} \\ + {pulsar_manifest_command.image} \\ + {pulsar_manifest_command.command} {" ".join(pulsar_manifest_command.args)} + + cat job_directory/{output_manifest_path} | jq -r '.[] | "\(.from_path | sub("^{job_directory}/"; "job_directory/")) \(.url | capture("^(?[^:]+://[^/]+)(?/.*)") | "\(.protocolurlhostport);overwrite=yes\(.fileandmetadataoptions)" )"' > 
output.files + """).encode("utf-8") + + def _generate_job_description( + self, + job_directory: str, + input_manifest: dict, + executable_path: Path, + staging_config_path: Path, + ) -> bytes: + # job_directory = Path(self.job_directory.job_directory) + # metadata_directory = Path(self.job_directory.metadata_directory) + + activity_description = Element("ActivityDescription") + activity_description.set("xmlns", "http://www.eu-emi.eu/es/2010/12/adl") + activity_description.set("xmlns:emiestypes", "http://www.eu-emi.eu/es/2010/12/types") + activity_description.set("xmlns:nordugrid-adl", "http://www.nordugrid.org/es/2011/12/nordugrid-adl") + + activity_identification = SubElement(activity_description, "ActivityIdentification") + activity_identification_name = SubElement(activity_identification, "Name") + activity_identification_name.text = f"Galaxy job {self.job_id}" + + application = SubElement(activity_description, "Application") + application_executable = SubElement(application, "Executable") + application_executable_path = SubElement(application_executable, "Path") + application_executable_path.text = executable_path.name + application_output = SubElement(application, "Output") + application_output.text = "arc.out" + application_error = SubElement(application, "Error") + application_error.text = "arc.err" + + # resources = SubElement(activity_description, "Resources") + # resources_cpu_time = SubElement(resources, "IndividualCPUTime") + # resources_cpu_time.text = self.cpu_time + # resources_memory = SubElement(resources, "IndividualPhysicalMemory") + # resources_memory.text = self.memory + + data_staging = SubElement(activity_description, "DataStaging") + staging_config = SubElement(data_staging, "InputFile") + staging_config_name = SubElement(staging_config, "Name") + staging_config_name.text = staging_config_path.name + staging_config_uri = SubElement(staging_config, "URI") + staging_config_uri.text = f"file://{staging_config_path.absolute()}" + executable = 
SubElement(data_staging, "InputFile") + executable_name = SubElement(executable, "Name") + executable_name.text = executable_path.name + executable_uri = SubElement(executable, "URI") + executable_uri.text = f"file://{executable_path.absolute()}" + for input_ in input_manifest: + input_file = SubElement(data_staging, "InputFile") + name = SubElement(input_file, "Name") + name.text = str(Path("job_directory") / Path(input_["to_path"]).relative_to(job_directory)) + source = SubElement(input_file, "Source") + uri = SubElement(source, "URI") + uri_parts = urlparse(input_["url"]) + uri.text = urlunparse(( + uri_parts.scheme, + f"{uri_parts.netloc};cache=copy;readonly=no", + uri_parts.path, + uri_parts.params, + uri_parts.query, + uri_parts.fragment + )) + output_file = SubElement(data_staging, "OutputFile") + output_file_name = SubElement(output_file, "Name") + output_file_name.text = f"@output.files" + + return tostring(activity_description, encoding="UTF-8", method="xml") + + @staticmethod + def _launch_arc_job( + arc_endpoint: str, + arc_oidc_token: str, + job_description: str, + ) -> ExternalId: + client = pyarcrest.arc.ARCRest.getClient(url=arc_endpoint, token=arc_oidc_token) + delegation_id = client.createDelegation() + + results = client.createJobs(job_description, delegationID=delegation_id)[0].value + if isinstance(results, Exception): + raise results + arc_job_id, status = results + + job_description_tree_root = fromstring(job_description) + inputs = { + node.find("{http://www.eu-emi.eu/es/2010/12/adl}Name").text: + node.find("{http://www.eu-emi.eu/es/2010/12/adl}URI").text + for node in job_description_tree_root.findall( + "./{http://www.eu-emi.eu/es/2010/12/adl}DataStaging/{http://www.eu-emi.eu/es/2010/12/adl}InputFile" + ) + if node.find("{http://www.eu-emi.eu/es/2010/12/adl}URI") is not None and node.find( + "{http://www.eu-emi.eu/es/2010/12/adl}URI").text.startswith("file://") + } + + upload_errors = client.uploadJobFiles([arc_job_id], [inputs])[0] + if 
upload_errors: # input upload error + raise Exception("Error uploading job files to ARC") + + return ExternalId(str(arc_job_id)) + + class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient): execution_type: ExecutionType pulsar_container_image: str @@ -793,6 +1102,63 @@ def raw_check_complete(self) -> Dict[str, Any]: } +class ARCPollingSequentialJobClient(BasePollingCoexecutionJobClient, ARCLaunchMixin): + """A client that (sequentially) executes containers in ARC and does not depend on AMQP.""" + + def __init__(self, destination_params, job_id, client_manager): + super().__init__(destination_params, job_id, client_manager) + self.pulsar_container_image = destination_params.get("pulsar_container_image", PULSAR_SINGULARITY_IMAGE) + + def get_status(self): + arc_endpoint = self.destination_params["arc_url"] + arc_oidc_token = self.destination_params["arc_oidc_token"] + arc_job_id = self._arc_job_id + # injected by `PulsarARCJobRunner` when creating the client from the job's state, or set when launching the job + + arc_client = pyarcrest.arc.ARCRest.getClient(url=arc_endpoint, token=arc_oidc_token) + + arc_state = arc_client.getJobsStatus([arc_job_id])[0].value + if isinstance(arc_state, (pyarcrest.errors.ARCHTTPError, pyarcrest.errors.NoValueInARCResult)): + pulsar_state = manager_status.LOST + else: + pulsar_state = arc_state_to_pulsar_state(arc_state) + + return pulsar_state + + def raw_check_complete(self): + pulsar_state = self.get_status() + return { + "status": pulsar_state, + "complete": "true" if manager_status.is_job_done(pulsar_state) else "false", + # ancient John, what were you thinking? 
👀 + } + + def full_status(self): + return { + **self.raw_check_complete(), + "outputs_directory_contents": [], + # it needs to be defined, otherwise `PulsarOutputs.has_outputs` fails; it is ok that it is empty because + # ARC is responsible for staging the outputs (Galaxy does not have to collect any outputs) + } + + def kill(self) -> None: + arc_endpoint = self.destination_params["arc_url"] + arc_oidc_token = self.destination_params["arc_oidc_token"] + + job_id = self.job_id + arc_jobid = self._arc_job_id + # injected by `PulsarARCJobRunner` when creating the client from the job's state, or set when launching the job + + client = pyarcrest.arc.ARCRest.getClient(url=arc_endpoint, token=arc_oidc_token) + try: + client.killJobs([arc_jobid])[0].value + except Exception as e: + raise Exception(f"Attempt to kill job with ARC id {arc_jobid} and Galaxy id {job_id} failed.") from e + + def clean(self): + pass + + class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin): """A client that co-executes pods via GA4GH TES and depends on amqp for status updates.""" diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 9c429734..1716b173 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -29,6 +29,7 @@ MessageJobClient, TesMessageCoexecutionJobClient, TesPollingCoexecutionJobClient, + ARCPollingSequentialJobClient, ) from .destination import url_to_destination_params from .object_client import ObjectStoreClient @@ -240,6 +241,8 @@ def get_client(self, destination_params, job_id, **kwargs): return K8sMessageCoexecutionJobClient(destination_params, job_id, self) elif destination_params.get("tes_url", False): return TesMessageCoexecutionJobClient(destination_params, job_id, self) + elif destination_params.get("arc_enabled", False): + return ARCPollingSequentialJobClient(destination_params, job_id, self) else: return MessageJobClient(destination_params, job_id, self) @@ -256,6 +259,8 @@ def 
get_client(self, destination_params, job_id, **kwargs): return K8sPollingCoexecutionJobClient(destination_params, job_id, self) elif destination_params.get("tes_url", False): return TesPollingCoexecutionJobClient(destination_params, job_id, self) + elif destination_params.get("arc_enabled", False): + return ARCPollingSequentialJobClient(destination_params, job_id, self) else: raise Exception("Unknown client configuration") @@ -268,7 +273,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface: return ClientManager(**kwargs) # TODO: Consider more separation here. elif kwargs.get('amqp_url', None): return MessageQueueClientManager(**kwargs) - elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"): + elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"): return PollingJobClientManager(**kwargs) else: return ClientManager(**kwargs) diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index 5ddc00f4..b01a4b52 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -501,6 +501,12 @@ def extract_client_options(options): client_options["k8s_enabled"] = options.k8s_enabled if hasattr(options, "tes_url"): client_options["tes_url"] = options.tes_url + if hasattr(options, "arc_enabled"): + client_options["arc_enabled"] = options.arc_enabled + if hasattr(options, "arc_url"): + client_options["arc_url"] = options.arc_url + if hasattr(options, "arc_oidc_token"): + client_options["arc_oidc_token"] = options.arc_oidc_token if hasattr(options, "container"): client_options["container"] = options.container return client_options @@ -532,6 +538,8 @@ def client_manager_from_args(options): manager_args['tes_url'] = options.tes_url if getattr(options, "k8s_enabled", None): manager_args['k8s_enabled'] = options.k8s_enabled + if getattr(options, "arc_enabled", None): + manager_args['arc_enabled'] = options.arc_enabled cm = build_client_manager(**manager_args) return cm diff --git 
a/pulsar/managers/util/arc.py b/pulsar/managers/util/arc.py new file mode 100644 index 00000000..f2d06c72 --- /dev/null +++ b/pulsar/managers/util/arc.py @@ -0,0 +1,89 @@ +import logging +from enum import Enum +from typing import Optional + +from pulsar.managers import status as state + +try: + import pyarcrest + import pyarcrest.arc +except ImportError: + pyarcrest = None + +__all__ = ("ARCState", "arc_state_to_pulsar_state", "ensure_pyarc", "pyarcrest") + + +log = logging.getLogger(__name__) + +PYARCREST_UNAVAILABLE_MESSAGE = ( + "Pulsar ARC client requires the Python package `pyarcrest` - but it is unavailable. Please install `pyarcrest`." +) + +def ensure_pyarc(): + if pyarcrest is None: + raise ImportError(PYARCREST_UNAVAILABLE_MESSAGE) + + +class ARCState(str, Enum): + """ + ARC job states that the REST interface may report. + + References: + - [1] https://www.nordugrid.org/arc/arc7/tech/rest/rest.html#rest-interface-job-states + """ + + ACCEPTING = "ACCEPTING" + ACCEPTED = "ACCEPTED" + PREPARING = "PREPARING" + PREPARED = "PREPARED" + SUBMITTING = "SUBMITTING" + QUEUING = "QUEUING" + RUNNING = "RUNNING" + HELD = "HELD" + EXITINGLRMS = "EXITINGLRMS" + OTHER = "OTHER" + EXECUTED = "EXECUTED" + FINISHING = "FINISHING" + FINISHED = "FINISHED" + FAILED = "FAILED" + KILLING = "KILLING" + KILLED = "KILLED" + WIPED = "WIPED" + + +ARC_STATE_TO_PULSAR_STATE_MAP = { + # Mapping from ARC REST interface job states to Pulsar job states. + ARCState.ACCEPTING: state.PREPROCESSING, # Session created, files can be uploaded; not yet processed. + ARCState.ACCEPTED: state.PREPROCESSING, # Detected by A-REX, can't proceed yet. + ARCState.PREPARING: state.PREPROCESSING, # Data stage-in, input data gathering. + ARCState.PREPARED: state.QUEUED, # Waiting in queue for batch submission. + ARCState.SUBMITTING: state.QUEUED, # Preparing for submission. + ARCState.QUEUING: state.QUEUED, # In batch system queue. + ARCState.RUNNING: state.RUNNING, # Running. 
+    ARCState.HELD: state.RUNNING,  # On hold/suspended; still counts as running from Pulsar's perspective.
+    ARCState.EXITINGLRMS: state.RUNNING,  # Finishing execution in batch system.
+    ARCState.OTHER: state.RUNNING,  # Unknown intermediate state; assume still running.
+    ARCState.EXECUTED: state.POSTPROCESSING,  # Completed, waiting for post-processing.
+    ARCState.FINISHING: state.POSTPROCESSING,  # Data stage-out, cleaning up.
+    ARCState.FINISHED: state.COMPLETE,  # Successful completion.
+    ARCState.FAILED: state.FAILED,  # Failed.
+    ARCState.KILLING: state.CANCELLED,  # Being cancelled.
+    ARCState.KILLED: state.CANCELLED,  # Killed by user.
+    ARCState.WIPED: state.LOST,  # Data deleted, treat as lost.
+}
+
+
+def arc_state_to_pulsar_state(arc_state: Optional[ARCState]) -> str:
+    """
+    Map ARC REST interface job states to Pulsar job states.
+
+    Assign the Pulsar state FAILED to jobs whose ARC state does not match any of the states from the mapping
+    ``ARC_STATE_TO_PULSAR_STATE_MAP``.
+    """
+    pulsar_state = ARC_STATE_TO_PULSAR_STATE_MAP.get(arc_state)
+
+    if pulsar_state is None:
+        log.warning(f"Unknown ARC state encountered [{arc_state}]")
+        return state.FAILED
+
+    return pulsar_state

diff --git a/test/integration_test.py b/test/integration_test.py
index 1c7420b2..8d7cc701 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -480,7 +480,7 @@ def _run_direct(test_configuration, **kwds):
 def _update_options_for_app(options, app, **kwds):
     if kwds.get("local_setup", False):
         staging_directory = app.staging_directory
-        is_coexecution = kwds.get("k8s_enabled") or kwds.get("tes_url")
+        is_coexecution = kwds.get("k8s_enabled") or kwds.get("tes_url") or kwds.get("arc_enabled")
         if is_coexecution:
             # Update client to not require this - seems silly.
options["jobs_directory"] = "/pulsar_staging" From 25c843bca54a983452c7a7a7edeeb60d4eadf6dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 17 Jul 2025 16:16:23 +0200 Subject: [PATCH 14/15] Add integration test for Pulsar polling client for ARC --- test/integration_test.py | 42 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/test/integration_test.py b/test/integration_test.py index 8d7cc701..ed467658 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -237,7 +237,45 @@ def test_integration_cli_slurm(integration_test_configuration: IntegrationTestCo ) -# Test and Kubernetes tests without MQ +# Tes, Kubernetes and ARC tests without MQ + + +@integration_test +@skip_unless_environ("ARC_URL") +@skip_unless_environ("ARC_OIDC_TOKEN") +def test_arc_polling_integration(external_queue_test_configuration: IntegrationTestConfiguration): + """ + Integration test for the Advanced Resource Connector (polling client ``ARCPollingSequentialJobClient``). + + One way to run this integration test is to set up the test machine so that it can be contacted by an ARC instance + running on an external server. To do so, first expose your Pulsar test file server to a network that the ARC server + can access. A simple way to do it is using localhost.run:: + + ssh -R 80:localhost:8095 localhost.run + + It will generate a URL like this one ``https://99c68d2225e0ab.lhr.life``. 
Then configure the following environment + variables in the test environment:: + + ARC_URL=https://arc-galaxy-ce1.cern-test.uiocloud.no + ARC_OIDC_TOKEN=********************************.................................******************************** + PULSAR_TEST_FILE_SERVER_HOST=127.0.0.1 + PULSAR_TEST_FILE_SERVER_PORT=8095 + PULSAR_TEST_INTERNAL_JOB_FILES_URL=https://99c68d2225e0ab.lhr.life/ + """ + arc_url = environ["ARC_URL"] + arc_oidc_token = environ["ARC_OIDC_TOKEN"] + external_queue_test_configuration.set_app_conf_props(arc_url=arc_url, arc_oidc_token=arc_oidc_token) + run_job( + external_queue_test_configuration, + private_token=None, + local_setup=True, + default_file_action="json_transfer", + inject_files_endpoint=True, + arc_url=arc_url, + arc_enabled=True, + arc_oidc_token=arc_oidc_token, + expecting_full_metadata=False, + ) @integration_test @@ -305,7 +343,7 @@ def test_coexecution_kubernetes_polling_integration(external_queue_test_configur ) -# Test and Kubernetes tests with MQ +# Tes and Kubernetes tests with MQ # PULSAR_RABBIT_MQ_CONNECTION="amqp://guest:guest@localhost:5672" From 7c247fdf5f15541151a71786752ad4e168588941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 17 Jul 2025 16:26:51 +0200 Subject: [PATCH 15/15] Increase integration tests timeout to accommodate ARC test --- test/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_utils.py b/test/test_utils.py index e41f9130..df704b01 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -68,7 +68,7 @@ def wrapper(*args, **kwargs): return outer_wrapper -INTEGRATION_MAXIMUM_TEST_TIME = 120 +INTEGRATION_MAXIMUM_TEST_TIME = 180 integration_test = timed(INTEGRATION_MAXIMUM_TEST_TIME) TEST_DIR = dirname(__file__)