From 210ec281ac206936b8d1f2833ab909815b399b06 Mon Sep 17 00:00:00 2001 From: Tomaz Vieira Date: Wed, 18 Dec 2019 16:08:37 +0100 Subject: [PATCH 1/5] [WIP]Adds distributed support to opFormattedDataExport --- lazyflow/metaDict.py | 4 ++ .../ioOperators/opFormattedDataExport.py | 54 ++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/lazyflow/metaDict.py b/lazyflow/metaDict.py index 3eed92b85..f92298433 100644 --- a/lazyflow/metaDict.py +++ b/lazyflow/metaDict.py @@ -25,6 +25,7 @@ import copy import numpy from collections import OrderedDict, defaultdict +from ndstructs import Shape5D class MetaDict(defaultdict): @@ -153,6 +154,9 @@ def getTaggedShape(self): keys = self.getAxisKeys() return OrderedDict(list(zip(keys, self.shape))) + def getShape5D(self): + return Shape5D(**self.getTaggedShape()) + def getAxisKeys(self): assert self.axistags is not None return [tag.key for tag in self.axistags] diff --git a/lazyflow/operators/ioOperators/opFormattedDataExport.py b/lazyflow/operators/ioOperators/opFormattedDataExport.py index 707964655..cdff6266c 100644 --- a/lazyflow/operators/ioOperators/opFormattedDataExport.py +++ b/lazyflow/operators/ioOperators/opFormattedDataExport.py @@ -27,6 +27,11 @@ import collections import warnings import numpy +from typing import Tuple +from pathlib import Path + +import z5py +from ndstructs import Shape5D, Slice5D from lazyflow.utility import format_known_keys from lazyflow.graph import Operator, InputSlot, OutputSlot @@ -34,6 +39,7 @@ from lazyflow.operators.generic import OpSubRegion, OpPixelOperator from lazyflow.operators.valueProviders import OpMetadataInjector from lazyflow.operators.opReorderAxes import OpReorderAxes +from lazyflow.utility.pathHelpers import PathComponents from .opExportSlot import OpExportSlot @@ -125,7 +131,7 @@ def __init__(self, *args, **kwargs): self.FormatSelectionErrorMsg.connect(self._opExportSlot.FormatSelectionErrorMsg) self.progressSignal = self._opExportSlot.progressSignal - def setupOutputs(self): + def get_new_roi(self) -> Tuple[Tuple, Tuple]: # Prepare subregion operator total_roi = roiFromShape(self.Input.meta.shape) total_roi = list(map(tuple, total_roi)) @@ -150,7 +156,22 @@ def setupOutputs(self): ) new_start, new_stop = tuple(clipped_start), tuple(clipped_stop) + return new_start, new_stop + + def get_cutout(self) -> Slice5D: + input_axiskeys = self.Input.meta.getAxisKeys() + cutout_start, cutout_stop = self.get_new_roi() + cutout_slices = tuple(slice(start, stop) for start, stop in zip(cutout_start, cutout_stop)) + return Slice5D.zero(**{axis: slc for axis, slc in zip(input_axiskeys, cutout_slices)}) + def set_cutout(self, cutout: Slice5D): + input_axiskeys = self.Input.meta.getAxisKeys() + start = cutout.start.to_tuple(input_axiskeys, int) + stop = cutout.stop.to_tuple(input_axiskeys, int) + self._opSubRegion.Roi.setValue((start, stop)) + + def setupOutputs(self): + new_start, new_stop = self.get_new_roi() # If we're in the process of switching input data, # then the roi dimensionality might not match up. # Just leave the roi disconnected for now. @@ -258,3 +279,34 @@ def run_export(self): def run_export_to_array(self): return self._opExportSlot.run_export_to_array() + + def run_distributed_export(self, block_shape: Shape5D): + # orchestrator = TaskOrchestrator() + n5_file_path = Path(self.OutputFilenameFormat.value).with_suffix(".n5") + if True: # orchestrator.rank == 0: + output_meta = self.ImageToExport.meta + output_shape = output_meta.getShape5D() + block_shape = block_shape.clamped(maximum=output_shape) + + with z5py.File(n5_file_path, "w") as f: + ds = f.create_dataset( + self.OutputInternalPath.value, + shape=output_meta.shape, + chunks=block_shape.to_tuple(output_meta.getAxisKeys()), + dtype=output_meta.dtype.__name__, + ) + ds[...] = 1 # FIXME: for some reason setting to 0 does nothing + + cutout = self.get_cutout() + # orchestrator.orchestrate(cutout.to_slice_5d().split(block_shape=block_shape)) + def process_tile(tile: Slice5D, rank: int): + self.set_cutout(tile) + slices = tile.to_slices(output_meta.getAxisKeys()) + with z5py.File(n5_file_path, "r+") as n5_file: + print("+++++++++++++++++++++ " + self.OutputInternalPath.value) + dataset = n5_file[self.OutputInternalPath.value] + dataset[slices] = self.ImageToExport.value + + for tile in cutout.split(block_shape=block_shape): + process_tile(tile, 1) + # orchestrator.start_as_worker(process_tile) From 493c075f3ccd62e514e72d3374263216ba7a14f5 Mon Sep 17 00:00:00 2001 From: Tomaz Vieira Date: Wed, 18 Dec 2019 16:23:01 +0100 Subject: [PATCH 2/5] Moves TaskOrchestrator class out of ilastik and into lazyflow --- lazyflow/distributed/TaskOrchestrator.py | 57 ++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 lazyflow/distributed/TaskOrchestrator.py diff --git a/lazyflow/distributed/TaskOrchestrator.py b/lazyflow/distributed/TaskOrchestrator.py new file mode 100644 index 000000000..b743db1e5 --- /dev/null +++ b/lazyflow/distributed/TaskOrchestrator.py @@ -0,0 +1,57 @@ +from mpi4py import MPI +from threading import Thread +import enum +from typing import Generator, TypeVar, Generic, Callable + +TASK_DATUM = TypeVar("TASK_DATUM") + + +class TaskOrchestrator(Generic[TASK_DATUM]): + @enum.unique + class Tags(enum.IntEnum): + TASK_DONE = 13 + TO_WORKER = enum.auto() + + END = "END" + + def __init__(self, comm=None): + self.comm = comm or MPI.COMM_WORLD + self.rank = self.comm.Get_rank() + self.num_workers = self.comm.size - 1 + if self.num_workers <= 0: + raise Exception("Trying to orchestrate tasks with no workers!!!") + + def orchestrate(self, task_data: Generator[TASK_DATUM, None, None]): + print(f"ORCHESTRATOR: Starting orchestration of {self.num_workers}...") + stopped_workers = self.num_workers + for worker_idx in range(1, self.num_workers + 1): + try: + datum = next(task_data) + self.comm.send(datum, dest=worker_idx, tag=self.Tags.TO_WORKER) + stopped_workers -= 1 + print(f"Sent datum {datum} to worker {worker_idx}...") + except StopIteration: + break + + while True: + status = MPI.Status() + self.comm.recv(source=MPI.ANY_SOURCE, tag=self.Tags.TASK_DONE, status=status) + worker_idx = status.Get_source() + + try: + self.comm.send(next(task_data), dest=worker_idx, tag=self.Tags.TO_WORKER) + except StopIteration: + self.comm.send(self.END, dest=worker_idx, tag=self.Tags.TO_WORKER) + stopped_workers += 1 + if stopped_workers == self.num_workers: + break + + def start_as_worker(self, target: Callable[[TASK_DATUM, int], None]): + print(f"WORKER {self.rank}: Started") + while True: + status = MPI.Status() + datum = self.comm.recv(source=MPI.ANY_SOURCE, tag=self.Tags.TO_WORKER, status=status) + if datum == self.END: + break + self.comm.send(target(datum, self.rank), dest=status.Get_source(), tag=self.Tags.TASK_DONE) + print(f"WORKER {self.rank}: Terminated") From dcaa9985258b31d2e315b80641cc36c4d13c37e8 Mon Sep 17 00:00:00 2001 From: Tomaz Vieira Date: Wed, 18 Dec 2019 16:25:32 +0100 Subject: [PATCH 3/5] Implements distributed using mpi. Removes debug prints --- .../ioOperators/opFormattedDataExport.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/lazyflow/operators/ioOperators/opFormattedDataExport.py b/lazyflow/operators/ioOperators/opFormattedDataExport.py index cdff6266c..cdb11fa2a 100644 --- a/lazyflow/operators/ioOperators/opFormattedDataExport.py +++ b/lazyflow/operators/ioOperators/opFormattedDataExport.py @@ -281,10 +281,12 @@ def run_export_to_array(self): return self._opExportSlot.run_export_to_array() def run_distributed_export(self, block_shape: Shape5D): - # orchestrator = TaskOrchestrator() + from lazyflow.distributed.TaskOrchestrator import TaskOrchestrator + + orchestrator = TaskOrchestrator() n5_file_path = Path(self.OutputFilenameFormat.value).with_suffix(".n5") - if True: # orchestrator.rank == 0: - output_meta = self.ImageToExport.meta + output_meta = self.ImageToExport.meta + if orchestrator.rank == 0: output_shape = output_meta.getShape5D() block_shape = block_shape.clamped(maximum=output_shape) @@ -298,15 +300,14 @@ def run_distributed_export(self, block_shape: Shape5D): ds[...] = 1 # FIXME: for some reason setting to 0 does nothing cutout = self.get_cutout() - # orchestrator.orchestrate(cutout.to_slice_5d().split(block_shape=block_shape)) + orchestrator.orchestrate(cutout.split(block_shape=block_shape)) + else: + def process_tile(tile: Slice5D, rank: int): self.set_cutout(tile) slices = tile.to_slices(output_meta.getAxisKeys()) with z5py.File(n5_file_path, "r+") as n5_file: - print("+++++++++++++++++++++ " + self.OutputInternalPath.value) dataset = n5_file[self.OutputInternalPath.value] dataset[slices] = self.ImageToExport.value - for tile in cutout.split(block_shape=block_shape): - process_tile(tile, 1) - # orchestrator.start_as_worker(process_tile) + orchestrator.start_as_worker(process_tile) From ba748916d9896b47ea2df88cee23dce6ea62b581 Mon Sep 17 00:00:00 2001 From: Tomaz Vieira Date: Tue, 14 Jan 2020 15:29:14 +0100 Subject: [PATCH 4/5] Adds 'axes' to attributes.json of .n5 distributed output --- lazyflow/operators/ioOperators/opFormattedDataExport.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lazyflow/operators/ioOperators/opFormattedDataExport.py b/lazyflow/operators/ioOperators/opFormattedDataExport.py index cdb11fa2a..a0551a1bd 100644 --- a/lazyflow/operators/ioOperators/opFormattedDataExport.py +++ b/lazyflow/operators/ioOperators/opFormattedDataExport.py @@ -297,6 +297,7 @@ def run_distributed_export(self, block_shape: Shape5D): chunks=block_shape.to_tuple(output_meta.getAxisKeys()), dtype=output_meta.dtype.__name__, ) + ds.attrs["axes"] = list(reversed(output_meta.getAxisKeys())) ds[...] = 1 # FIXME: for some reason setting to 0 does nothing cutout = self.get_cutout() From 0ba97e28f216b6129402ab1e3e28138bd9dc7b69 Mon Sep 17 00:00:00 2001 From: Tomaz Vieira Date: Mon, 20 Jan 2020 13:48:21 +0100 Subject: [PATCH 5/5] Orchestrator does not hang on more workers than tasks --- lazyflow/distributed/TaskOrchestrator.py | 75 +++++++++++++++--------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/lazyflow/distributed/TaskOrchestrator.py b/lazyflow/distributed/TaskOrchestrator.py index b743db1e5..375d6a426 100644 --- a/lazyflow/distributed/TaskOrchestrator.py +++ b/lazyflow/distributed/TaskOrchestrator.py @@ -1,57 +1,78 @@ from mpi4py import MPI from threading import Thread import enum -from typing import Generator, TypeVar, Generic, Callable +from typing import Generator, TypeVar, Generic, Callable, Tuple +import uuid +END = "END-eb23ae13-709e-4ac3-931d-99ab059ef0c2" TASK_DATUM = TypeVar("TASK_DATUM") -class TaskOrchestrator(Generic[TASK_DATUM]): - @enum.unique - class Tags(enum.IntEnum): - TASK_DONE = 13 - TO_WORKER = enum.auto() +@enum.unique +class Tags(enum.IntEnum): + TASK_DONE = 13 + TO_WORKER = enum.auto() + + +class _Worker(Generic[TASK_DATUM]): + def __init__(self, comm, rank: int): + self.comm = comm + self.rank = rank + self.stopped = False + + def send(self, datum: TASK_DATUM): + print(f"Sending datum {datum} to worker {self.rank}...") + self.comm.send(datum, dest=self.rank, tag=Tags.TO_WORKER) + + def stop(self): + self.send(END) + self.stopped = True - END = "END" +class TaskOrchestrator(Generic[TASK_DATUM]): def __init__(self, comm=None): self.comm = comm or MPI.COMM_WORLD self.rank = self.comm.Get_rank() - self.num_workers = self.comm.size - 1 - if self.num_workers <= 0: + num_workers = self.comm.size - 1 + if num_workers <= 0: raise Exception("Trying to orchestrate tasks with no workers!!!") + self.workers = {rank: _Worker(self.comm, rank) for rank in range(1, num_workers + 1)} + + def get_finished_worker(self) -> _Worker[TASK_DATUM]: + status = MPI.Status() + self.comm.recv(source=MPI.ANY_SOURCE, tag=Tags.TASK_DONE, status=status) + return self.workers[status.Get_source()] def orchestrate(self, task_data: Generator[TASK_DATUM, None, None]): - print(f"ORCHESTRATOR: Starting orchestration of {self.num_workers}...") - stopped_workers = self.num_workers - for worker_idx in range(1, self.num_workers + 1): + print(f"ORCHESTRATOR: Starting orchestration of {len(self.workers)}...") + num_busy_workers = 0 + for worker in self.workers.values(): try: - datum = next(task_data) - self.comm.send(datum, dest=worker_idx, tag=self.Tags.TO_WORKER) - stopped_workers -= 1 - print(f"Sent datum {datum} to worker {worker_idx}...") + worker.send(next(task_data)) + num_busy_workers += 1 except StopIteration: break while True: - status = MPI.Status() - self.comm.recv(source=MPI.ANY_SOURCE, tag=self.Tags.TASK_DONE, status=status) - worker_idx = status.Get_source() - + worker = self.get_finished_worker() try: - self.comm.send(next(task_data), dest=worker_idx, tag=self.Tags.TO_WORKER) + worker.send(next(task_data)) except StopIteration: - self.comm.send(self.END, dest=worker_idx, tag=self.Tags.TO_WORKER) - stopped_workers += 1 - if stopped_workers == self.num_workers: + worker.stop() + num_busy_workers -= 1 + if num_busy_workers == 0: break + for worker in self.workers.values(): + if not worker.stopped: + worker.stop() + def start_as_worker(self, target: Callable[[TASK_DATUM, int], None]): print(f"WORKER {self.rank}: Started") while True: status = MPI.Status() - datum = self.comm.recv(source=MPI.ANY_SOURCE, tag=self.Tags.TO_WORKER, status=status) - if datum == self.END: + datum = self.comm.recv(source=MPI.ANY_SOURCE, tag=Tags.TO_WORKER, status=status) + if datum == END: break - self.comm.send(target(datum, self.rank), dest=status.Get_source(), tag=self.Tags.TASK_DONE) + self.comm.send(target(datum, self.rank), dest=status.Get_source(), tag=Tags.TASK_DONE) print(f"WORKER {self.rank}: Terminated")