
Bridge send without client#57

Open
benoitmartin88 wants to merge 9 commits into main from
bridge_send_without_client

Conversation

@benoitmartin88
Contributor

No description provided.

@benoitmartin88 benoitmartin88 self-assigned this May 4, 2026
Comment thread src/deisa/dask/deisa.py
logger.info(f"Closing deisa. wait_for_bridges={wait_for_bridges}")
if wait_for_bridges:
self._has_close_been_called = True
if not self._has_close_been_called and wait_for_bridges:
Collaborator


Should we split this conditional statement? Otherwise __wait_for_bridges is never reached: the flag is set just above, so the combined check is always false.

Suggested change
if not self._has_close_been_called and wait_for_bridges:
if not self._has_close_been_called:
self._has_close_been_called = True
if wait_for_bridges:
self.__wait_for_bridges()
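The split guard above can be sketched in isolation. This is a minimal stand-in (the Closer class and its attributes are illustrative names, not the deisa API), showing that setting the flag first and checking wait_for_bridges separately keeps close() idempotent while still reaching the wait branch:

```python
# Minimal sketch of the split guard; Closer and its members are
# stand-in names, not part of the deisa codebase.
class Closer:
    def __init__(self):
        self._has_close_been_called = False
        self.waited = False

    def _wait_for_bridges(self):
        self.waited = True  # placeholder for the real blocking wait

    def close(self, wait_for_bridges=True):
        if not self._has_close_been_called:
            self._has_close_been_called = True  # close() stays idempotent
            if wait_for_bridges:
                self._wait_for_bridges()  # now reachable on first close

c = Closer()
c.close(wait_for_bridges=True)
print(c.waited)  # True: the wait branch runs on the first close
c.close(wait_for_bridges=True)  # second call is a no-op
```

With the original combined condition, the flag assignment above the check made the check always false, so the wait never ran.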

Comment thread src/deisa/dask/bridge.py
data2 = valmap(to_serialize, data)

_, who_has, nbytes = await scatter_to_workers(workers, data2, self.client.rpc)
_, who_has, nbytes = await scatter_to_workers(workers, data2)
Collaborator


Why the change? Should we modify setup.py to pin the minimum version of distributed (e.g. distributed>=2021.04)?
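The pin suggested above would look something like the fragment below. This is a hypothetical setup.py fragment; the floor version 2021.04 comes from the reviewer's suggestion and has not been verified against the scatter_to_workers signature change:

```python
# Hypothetical setup.py fragment; the exact floor version is an
# assumption taken from the review comment above.
install_requires = [
    "distributed>=2021.04",
]
```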

Comment on lines -153 to +129
if self.handshake_actor and self.handshake_actor.get_max_bridges().result() > 0:
Event(Handshake.DEISA_WAIT_FOR_DONE_EVENT, client=self.client).wait()
Event(Handshake.DEISA_WAIT_FOR_DONE_EVENT, client=self.client).wait()
Collaborator


Should we add a safety timeout if we remove the guard?
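The timeout pattern in question can be sketched with threading.Event as a stand-in for distributed.Event (both expose a wait(timeout) that returns a bool), so the caller can log and proceed instead of hanging forever:

```python
# Sketch of the proposed safety timeout; threading.Event stands in for
# Event(Handshake.DEISA_WAIT_FOR_DONE_EVENT, client=...).
import threading

done = threading.Event()

# Nobody sets the event here, so the wait times out instead of blocking.
was_set = done.wait(timeout=0.05)
print(was_set)  # False: timed out; the caller can warn and continue
```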

Comment thread src/deisa/dask/bridge.py
Comment on lines +92 to +114
if self.id == 0:
# only id 0 has a real dask client
self.client = self.system_metadata['connection']
assert self.client is not None, "client cannot be None for Bridge id 0."
# get all workers from scheduler
self.workers = self.client.scheduler_info(n_workers=-1)["workers"]
# add rpc handlers to scheduler to handle collective ops
# TODO: move this somewhere else ? Also, this is only needed if CommClient is used.
self.client.run_on_scheduler(setup_comm, size=self.system_metadata['nb_bridges'])
# Use existing rpc connection to the scheduler. Also, use existing client event loop.
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=self.client.scheduler,
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
else:
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=rpc(
self.system_metadata['connection'].scheduler.addr),
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
time.sleep(.1) # wait for setup_comm to run on scheduler #TODO: find a better way
Collaborator


Workaround for the time.sleep(.1): call .set() when id is 0, so that bridges whose id is not 0 do not proceed past the .wait() until setup_comm has run on the scheduler.

Suggested change
if self.id == 0:
# only id 0 has a real dask client
self.client = self.system_metadata['connection']
assert self.client is not None, "client cannot be None for Bridge id 0."
# get all workers from scheduler
self.workers = self.client.scheduler_info(n_workers=-1)["workers"]
# add rpc handlers to scheduler to handle collective ops
# TODO: move this somewhere else ? Also, this is only needed if CommClient is used.
self.client.run_on_scheduler(setup_comm, size=self.system_metadata['nb_bridges'])
# Use existing rpc connection to the scheduler. Also, use existing client event loop.
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=self.client.scheduler,
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
else:
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=rpc(
self.system_metadata['connection'].scheduler.addr),
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
time.sleep(.1) # wait for setup_comm to run on scheduler #TODO: find a better way
if self.id == 0:
# only id 0 has a real dask client
self.client = self.system_metadata['connection']
assert self.client is not None, "client cannot be None for Bridge id 0."
# get all workers from scheduler
self.workers = self.client.scheduler_info(n_workers=-1)["workers"]
# add rpc handlers to scheduler to handle collective ops
# TODO: move this somewhere else ? Also, this is only needed if CommClient is used.
self.client.run_on_scheduler(setup_comm, size=self.system_metadata['nb_bridges'])
Event("deisa_comm_ready", client=self.client).set()
# Use existing rpc connection to the scheduler. Also, use existing client event loop.
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=self.client.scheduler,
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
else:
connection = self.system_metadata['connection']
Event("deisa_comm_ready", client=connection).wait()
self.bridge_comm: ICommunicator = resolve_comm(comm, use_mpi_if_available=True,
comm_state_rpc=rpc(
self.system_metadata['connection'].scheduler.addr),
client=self.client,
size=self.system_metadata['nb_bridges'],
*args, **kwargs)
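The set()/wait() handshake proposed above can be sketched with threading.Event standing in for distributed.Event("deisa_comm_ready"): bridge 0 runs the setup and then sets the event, and every other bridge blocks on wait() instead of sleeping a fixed 0.1 s. All names below are illustrative, not the deisa API:

```python
# Minimal sketch of the handshake; threading.Event stands in for
# distributed.Event("deisa_comm_ready"), and bridge() for Bridge setup.
import threading

comm_ready = threading.Event()
order = []

def bridge(bridge_id):
    if bridge_id == 0:
        order.append("setup_comm")  # run_on_scheduler(setup_comm, ...)
        comm_ready.set()            # Event("deisa_comm_ready").set()
    else:
        comm_ready.wait()           # replaces time.sleep(.1)
        order.append(f"bridge{bridge_id}")

threads = [threading.Thread(target=bridge, args=(i,)) for i in (1, 2, 0)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(order[0])  # setup_comm: no bridge proceeds before the setup ran
```

Unlike the fixed sleep, the wait adapts to however long setup_comm actually takes, and never releases the other bridges early.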

Comment thread src/deisa/dask/bridge.py
Comment on lines +277 to +278
else:
return asyncio.run(self.__scatter(data, workers=workers, hash=hash))
Collaborator


A change that keeps asyncio (to stay lightweight and avoid managing the context and extra communication) but reuses the already existing RPC connection of CommClient, through a coroutine scheduled on its event loop.

Suggested change
else:
return asyncio.run(self.__scatter(data, workers=workers, hash=hash))
else:
f = asyncio.run_coroutine_threadsafe(
self.__scatter(data, workers=workers, hash=hash),
self.bridge_comm._loop)
return f.result()
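The scheduling pattern in the suggestion can be sketched self-contained: a coroutine is submitted from a plain thread onto an already-running loop, and .result() blocks until it completes. Note that bridge_comm._loop in the suggestion is a private attribute, so relying on it is an assumption about CommClient internals:

```python
# Sketch of asyncio.run_coroutine_threadsafe against a loop running in
# a background thread, as the suggested change does with
# self.bridge_comm._loop (a private attribute, assumed stable here).
import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def scatter(data):
    await asyncio.sleep(0)  # stands in for the real scatter RPC
    return len(data)

# run_coroutine_threadsafe returns a concurrent.futures.Future;
# .result() blocks the calling thread until the coroutine finishes.
f = asyncio.run_coroutine_threadsafe(scatter([1, 2, 3]), loop)
print(f.result())  # 3
loop.call_soon_threadsafe(loop.stop)
```

Compared with asyncio.run(), this avoids spinning up and tearing down a fresh loop (and its connections) on every scatter call.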
