Co-execution client for Google Cloud Platform Batch v1 #404
jmchilton merged 1 commit into galaxyproject:master
Conversation
```python
state = status.state
return {
    "status": gcp_state_to_pulsar_status(state),
    "complete": "true" if gcp_state_is_complete(state) else "false",  # Ancient John, what were you thinking?
```
For the ARC client implementation I am determining whether the job is complete using the Pulsar state rather than the ARC job state:

```python
def full_status(self):
    pulsar_state = self.get_status()
    return {
        "status": pulsar_state,
        "complete": "true" if manager_status.is_job_done(pulsar_state) else "false",
        # ancient John, what were you thinking? 👀
        "outputs_directory_contents": [],
        # it needs to be defined, otherwise `PulsarOutputs.has_outputs` fails; it is OK that it is empty because
        # ARC is responsible for staging the outputs (Galaxy does not have to collect any outputs)
    }
```

I get the Pulsar state via a mapping too (similar to `gcp_state_to_pulsar_status()`). What would be the advantage of using two mappings, `gcp_state_to_pulsar_status()` and `gcp_state_is_complete()`, as you do for the GCP client implementation? (Or what is my mistake?) Would I need to use something akin to `gcp_state_is_complete()`?
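To illustrate the two-mapping approach being asked about, here is a minimal sketch in which completeness is derived directly from the provider's job state rather than from the mapped Pulsar status. All state names, both function bodies, and the dictionaries are illustrative assumptions, not the actual GCP client code; the point is only that a terminal provider state without a distinct Pulsar equivalent can still be reported as done.

```python
# Hypothetical provider-to-Pulsar status mapping (names are assumptions).
PROVIDER_TO_PULSAR = {
    "QUEUED": "queued",
    "SCHEDULED": "queued",
    "RUNNING": "running",
    "SUCCEEDED": "complete",
    "FAILED": "failed",
}

# Terminal provider states; note one of them has no entry in the map above.
TERMINAL_PROVIDER_STATES = {"SUCCEEDED", "FAILED", "DELETION_IN_PROGRESS"}


def provider_state_to_pulsar_status(state: str) -> str:
    # Unknown states fall back to "running" so polling continues.
    return PROVIDER_TO_PULSAR.get(state, "running")


def provider_state_is_complete(state: str) -> bool:
    # Completeness is judged from the provider state itself, so a terminal
    # state with no Pulsar equivalent (e.g. deletion) still counts as done.
    return state in TERMINAL_PROVIDER_STATES
```

Under this sketch, a single mapping through the Pulsar status would report a deleted job as still "running", while the second mapping correctly reports it complete; that is one possible advantage of keeping the two mappings separate.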
```python
def __init__(self, destination_params, job_id, client_manager):
    super().__init__(destination_params, job_id, client_manager)
```
The client manager is created by Galaxy calling `pulsar.client.manager.build_client_manager()`. For the ARC client implementation I was forced to define a kwarg `arc_enabled` (and do some extra trickery); I assume this is not how things are meant to work. How are you getting Galaxy to create a `PollingJobClientManager` rather than a `ClientManager` when you want to use the polling job client?
```python
def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
    if 'job_manager' in kwargs:
        return ClientManager(**kwargs)  # TODO: Consider more separation here.
    elif kwargs.get('amqp_url', None):
        return MessageQueueClientManager(**kwargs)
    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
        return PollingJobClientManager(**kwargs)
    else:
        return ClientManager(**kwargs)
```
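The dispatch above can be sketched with stub classes to show which kwargs select which manager. The stub classes below are illustrative placeholders, not Pulsar's actual implementations; only the branch order mirrors the quoted function.

```python
from typing import Any


# Illustrative stubs standing in for Pulsar's manager classes.
class ClientManager:
    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class MessageQueueClientManager(ClientManager):
    pass


class PollingJobClientManager(ClientManager):
    pass


def build_client_manager(**kwargs: Any) -> ClientManager:
    # Same dispatch order as the quoted Pulsar code: an explicit job manager
    # wins, then an AMQP URL, then any polling-style backend flag.
    if "job_manager" in kwargs:
        return ClientManager(**kwargs)
    elif kwargs.get("amqp_url"):
        return MessageQueueClientManager(**kwargs)
    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
        return PollingJobClientManager(**kwargs)
    else:
        return ClientManager(**kwargs)


# A destination configured with arc_enabled selects the polling manager.
manager = build_client_manager(arc_enabled=True)
```

Presumably a GCP client would follow the same pattern by adding its own flag (name hypothetical) to the polling branch.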
I've gone this route too, more or less; the matching Pulsar client code looks something like https://github.com/galaxyproject/galaxy/compare/dev...jmchilton:galaxy:pulsar_2025?expand=1#diff-8232d66b380f49f27bbfbe089dad63ea193e3710939463c6294ee28d09fc0bafR1087.
I did galaxyproject/galaxy@6b65775, which is almost the same. However, I find your approach more attractive.
Could you mention me when you open a PR for this? I'd like to suggest something (merging self.client_manager_kwargs with the superclass' client_manager_kwargs) and adapt galaxyproject/galaxy#20598 to your PR.
Will do. I'm sorry I have not been very responsive but I do appreciate your attention to the details.
Force-pushed from 56f07ec to 691c84c
Implements #390.
From https://pulsar.readthedocs.io/en/latest/containers.html#galaxy-and-shared-file-systems, on the advantages of a Pulsar co-execution client (a container-native paradigm) over implementing a Galaxy job runner (an interface designed for shared file systems and DRMAA-style batch jobs, not containers):