diff --git a/examples/torch/pytorch_mnist_process_set.py b/examples/torch/pytorch_mnist_process_set.py new file mode 100644 index 0000000..caa7798 --- /dev/null +++ b/examples/torch/pytorch_mnist_process_set.py @@ -0,0 +1,190 @@ +from __future__ import print_function + +import argparse + +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +import torch.utils.data.distributed +from torchvision import datasets, transforms + +import horovod.torch as hvd +from grace_dl.torch.communicator.allgather import Allgather +from grace_dl.torch.compressor.topk import TopKCompressor +from grace_dl.torch.memory.residual import ResidualMemory + +# Training settings +parser = argparse.ArgumentParser(description='PyTorch MNIST Example') +parser.add_argument('--batch-size', type=int, default=64, metavar='N', + help='input batch size for training (default: 64)') +parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', + help='input batch size for testing (default: 1000)') +parser.add_argument('--epochs', type=int, default=10, metavar='N', + help='number of epochs to train (default: 10)') +parser.add_argument('--lr', type=float, default=0.01, metavar='LR', + help='learning rate (default: 0.01)') +parser.add_argument('--momentum', type=float, default=0.5, metavar='M', + help='SGD momentum (default: 0.5)') +parser.add_argument('--no-cuda', action='store_true', default=False, + help='disables CUDA training') +parser.add_argument('--seed', type=int, default=42, metavar='S', + help='random seed (default: 42)') +parser.add_argument('--log-interval', type=int, default=10, metavar='N', + help='how many batches to wait before logging training status') +parser.add_argument('--fp16-allreduce', action='store_true', default=False, + help='use fp16 compression during allreduce') +args = parser.parse_args() +args.cuda = not args.no_cuda and torch.cuda.is_available() + +# Horovod: initialize the library exactly once, registering both process sets. +even_set = hvd.ProcessSet([0, 2]) +odd_set = hvd.ProcessSet([1, 3]) +hvd.init(process_sets=[even_set, odd_set]) +torch.manual_seed(args.seed) + +if args.cuda: + # Horovod: pin GPU to local rank. + torch.cuda.set_device(hvd.local_rank()) + torch.cuda.manual_seed(args.seed) + +# Horovod: limit # of CPU threads to be used per worker. +torch.set_num_threads(1) + +kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {} +train_dataset = \ + datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) +# Horovod: use DistributedSampler to partition the training data. +train_sampler = torch.utils.data.distributed.DistributedSampler( + train_dataset, num_replicas=hvd.size(), rank=hvd.rank()) +train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs) + +test_dataset = \ + datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) +# Horovod: use DistributedSampler to partition the test data.
+test_sampler = torch.utils.data.distributed.DistributedSampler( + test_dataset, num_replicas=hvd.size(), rank=hvd.rank()) +test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size, + sampler=test_sampler, **kwargs) + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x, dim=1) + + +model = Net() + +if args.cuda: + # Move model to GPU. + model.cuda() + +# Horovod: scale learning rate by the number of GPUs. +optimizer = optim.SGD(model.parameters(), lr=args.lr * hvd.size(), + momentum=args.momentum) + +# Horovod: broadcast parameters & optimizer state. +hvd.broadcast_parameters(model.state_dict(), root_rank=0) +hvd.broadcast_optimizer_state(optimizer, root_rank=0) + +# GRACE: compression algorithm. +# grc = Allgather(TopKCompressor(0.3), ResidualMemory(), hvd.size()) + +from grace_dl.torch.helper import grace_from_params +params = {'compressor': 'topk', 'memory': 'residual', 'communicator': 'allgather'} + +# Pick the process set this rank belongs to. +this_set = even_set if even_set.included() else odd_set + +# Horovod: wrap optimizer with DistributedOptimizer, restricted to this rank's process set. +grc = grace_from_params(params, this_set) +optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(), + grace=grc, process_set=this_set) + + +def train(epoch): + model.train() + # Horovod: set epoch to sampler for shuffling. + train_sampler.set_epoch(epoch) + for batch_idx, (data, target) in enumerate(train_loader): + if args.cuda: + data, target = data.cuda(), target.cuda() + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + optimizer.step() + if batch_idx % args.log_interval == 0: + # Horovod: use train_sampler to determine the number of examples in + # this worker's partition. + print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( + epoch, batch_idx * len(data), len(train_sampler), 100. * batch_idx / len(train_loader), loss.item())) + + +def metric_average(val, name): + tensor = torch.tensor(val) + # Average the metric within this rank's process set only. + avg_tensor = hvd.allreduce(tensor, name=name, process_set=this_set) + return avg_tensor.item() + + +def test(): + model.eval() + test_loss = 0. + test_accuracy = 0. + for data, target in test_loader: + if args.cuda: + data, target = data.cuda(), target.cuda() + output = model(data) + # sum up batch loss + test_loss += F.nll_loss(output, target, reduction='sum').item() + # get the index of the max log-probability + pred = output.data.max(1, keepdim=True)[1] + test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum() + + # Horovod: use test_sampler to determine the number of examples in + # this worker's partition. + test_loss /= len(test_sampler) + test_accuracy /= len(test_sampler) + + # Horovod: average metric values across workers.
+ test_loss = metric_average(test_loss, 'avg_loss') + test_accuracy = metric_average(test_accuracy, 'avg_accuracy') + + # Horovod: print output only on the first rank of each process set. + if this_set.rank() == 0: + print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format( + test_loss, 100. * test_accuracy)) + + +for epoch in range(1, args.epochs + 1): + train(epoch) + test() diff --git a/grace_dl/torch/__init__.py b/grace_dl/torch/__init__.py index 12900df..4030ee4 100644 --- a/grace_dl/torch/__init__.py +++ b/grace_dl/torch/__init__.py @@ -36,23 +36,23 @@ def aggregate(self, tensors): class Communicator(ABC): @abstractmethod - def async_send(self, tensors, name): + def async_send(self, tensors, name, process_set): raise NotImplemented("async_send was not implemented.") @abstractmethod - def wait_receive(self, handles, ctx): + def wait_receive(self, handles, ctx, process_set): raise NotImplemented("wait_receive was not implemented.") def __init__(self, compressor, memory): self.compressor = compressor self.memory = memory - def send_step(self, tensor, name): + def send_step(self, tensor, name, process_set): tensor = self.memory.compensate(tensor, name) tensors_compressed, ctx = self.compressor.compress(tensor, name) self.memory.update(tensor, name, self.compressor, tensors_compressed, ctx) - handles = self.async_send(tensors_compressed, name) + handles = self.async_send(tensors_compressed, name, process_set) return handles, ctx - def receive_step(self, handles, ctx): - return self.wait_receive(handles, ctx) + def receive_step(self, handles, ctx, process_set): + return self.wait_receive(handles, ctx, process_set) diff --git a/grace_dl/torch/communicator/allgather.py b/grace_dl/torch/communicator/allgather.py index a598475..4bca0af 100644 --- a/grace_dl/torch/communicator/allgather.py +++ b/grace_dl/torch/communicator/allgather.py @@ -9,7 +9,7 @@ def __init__(self, compressor, memory, world_size): super().__init__(compressor, memory) self.world_size = world_size - def async_send(self, tensors_compressed, name): + def async_send(self, tensors_compressed, name, process_set): """ :param tensors_compressed: list of flat tensors to communicate :param name: for the all_gather operation @@ -26,17 +26,17 @@ def async_send(self, tensors_compressed, name): tensor_sizes = zip(*tensors_size_ag) # transpose else: tensors_size = torch.tensor(tensors_size) # TODO: set device - gathered = allgather(tensors_size) # tensor of tensor sizes per rank + gathered = allgather(tensor=tensors_size, process_set=process_set) # tensor of tensor sizes per rank tensor_sizes = gathered.view([self.world_size, -1]).t().tolist() # transpose, to list handles = [] for tensor_compressed in tensors_compressed: - handle = allgather_async(tensor_compressed) + handle = allgather_async(tensor=tensor_compressed, process_set=process_set) handles.append(handle) return handles, tensor_sizes - def wait_receive(self, result, ctx): + def wait_receive(self, result, ctx, process_set): handles, tensor_sizes = result tensors_ag = [] for handle, sizes in zip(handles, tensor_sizes): diff --git a/grace_dl/torch/helper.py b/grace_dl/torch/helper.py index a59a6b9..c2b170e 100644 --- a/grace_dl/torch/helper.py +++ b/grace_dl/torch/helper.py @@ -1,6 +1,6 @@ -def grace_from_params(params): - import horovod.torch as hvd - world_size = hvd.size()
+from horovod.torch.mpi_ops import global_process_set +def grace_from_params(params, process_set=global_process_set): + world_size = process_set.size() comp = params.get('compressor', 'none') mem = params.get('memory', 'none') comm = params.get('communicator', 'allreduce') diff --git a/patch_files/horovod/torch/__init__.py b/patch_files/horovod/torch/__init__.py index d17c61d..7ad827e 100644 --- a/patch_files/horovod/torch/__init__.py +++ b/patch_files/horovod/torch/__init__.py @@ -16,33 +16,44 @@ from horovod.common.util import check_extension +_MPI_LIB_AVAILABLE = True try: check_extension('horovod.torch', 'HOROVOD_WITH_PYTORCH', __file__, 'mpi_lib_v2') -except: - check_extension('horovod.torch', 'HOROVOD_WITH_PYTORCH', - __file__, 'mpi_lib', '_mpi_lib') - -from horovod.torch import elastic -from horovod.torch.compression import Compression -from horovod.torch.functions import allgather_object, broadcast_object, broadcast_optimizer_state, broadcast_parameters -from horovod.torch.mpi_ops import allreduce, allreduce_async, allreduce_, allreduce_async_ -from horovod.torch.mpi_ops import grouped_allreduce, grouped_allreduce_async, grouped_allreduce_, grouped_allreduce_async_ -from horovod.torch.mpi_ops import allgather, allgather_async -from horovod.torch.mpi_ops import broadcast, broadcast_async, broadcast_, broadcast_async_ -from horovod.torch.mpi_ops import alltoall, alltoall_async -from horovod.torch.mpi_ops import join -from horovod.torch.mpi_ops import poll, synchronize -from horovod.torch.mpi_ops import init, shutdown -from horovod.torch.mpi_ops import is_initialized, start_timeline, stop_timeline -from horovod.torch.mpi_ops import size, local_size, rank, local_rank -from horovod.torch.mpi_ops import mpi_threads_supported, mpi_enabled, mpi_built -from horovod.torch.mpi_ops import gloo_enabled, gloo_built -from horovod.torch.mpi_ops import nccl_built, ddl_built, ccl_built, cuda_built, rocm_built -from horovod.torch.mpi_ops import Average, Sum, Adasum -from horovod.torch.optimizer import DistributedOptimizer -from horovod.torch.sync_batch_norm import SyncBatchNorm +except Exception as e: + # The native MPI library is missing; pure-Python functionality is still available. + print(e) + print("Warning! MPI libs are missing, but Python applications are still available.") + _MPI_LIB_AVAILABLE = False +# Only import the following symbols when the MPI library is available.
+if _MPI_LIB_AVAILABLE: + from horovod.torch import elastic + from horovod.torch.compression import Compression + from horovod.torch.functions import allgather_object, broadcast_object, broadcast_optimizer_state, broadcast_parameters + from horovod.torch.mpi_ops import allreduce, allreduce_async, allreduce_, allreduce_async_ + from horovod.torch.mpi_ops import grouped_allreduce, grouped_allreduce_async, grouped_allreduce_, grouped_allreduce_async_ + from horovod.torch.mpi_ops import sparse_allreduce_async + from horovod.torch.mpi_ops import allgather, allgather_async + from horovod.torch.mpi_ops import grouped_allgather, grouped_allgather_async + from horovod.torch.mpi_ops import broadcast, broadcast_async, broadcast_, broadcast_async_ + from horovod.torch.mpi_ops import alltoall, alltoall_async + from horovod.torch.mpi_ops import reducescatter, reducescatter_async + from horovod.torch.mpi_ops import grouped_reducescatter, grouped_reducescatter_async + from horovod.torch.mpi_ops import join + from horovod.torch.mpi_ops import barrier + from horovod.torch.mpi_ops import poll, synchronize + from horovod.torch.mpi_ops import init, shutdown + from horovod.torch.mpi_ops import is_initialized, start_timeline, stop_timeline + from horovod.torch.mpi_ops import size, local_size, cross_size, rank, local_rank, cross_rank + from horovod.torch.mpi_ops import mpi_threads_supported, mpi_enabled, mpi_built + from horovod.torch.mpi_ops import gloo_enabled, gloo_built + from horovod.torch.mpi_ops import nccl_built, ddl_built, ccl_built, cuda_built, rocm_built + from horovod.torch.mpi_ops import ProcessSet, global_process_set, add_process_set, remove_process_set + from horovod.torch.mpi_ops import Average, Sum, Adasum, Min, Max, Product + from horovod.torch.mpi_ops import HorovodInternalError + from horovod.torch.optimizer import DistributedOptimizer + from horovod.torch.sync_batch_norm import SyncBatchNorm # Please run this function in a subprocess def _check_has_gpu(): diff --git a/patch_files/horovod/torch/mpi_ops.py b/patch_files/horovod/torch/mpi_ops.py index a4c16f7..8ee3474 100644 --- a/patch_files/horovod/torch/mpi_ops.py +++ b/patch_files/horovod/torch/mpi_ops.py @@ -20,25 +20,37 @@ import warnings -from horovod.torch import mpi_lib_v2 as mpi_lib from horovod.common.basics import HorovodBasics as _HorovodBasics -_NULL = "" -_basics = _HorovodBasics(__file__, 'mpi_lib_v2') - from horovod.common.exceptions import HorovodInternalError -from horovod.common.util import get_average_backwards_compatibility_fun, gpu_available, num_rank_is_power_2 +from horovod.common.process_sets import _setup as _setup_process_sets +from horovod.common.process_sets import ProcessSet, global_process_set, add_process_set, remove_process_set +from horovod.common.util import check_installed_version, get_average_backwards_compatibility_fun, gpu_available, num_rank_is_power_2 from horovod.torch.compression import Compression +# Check possible symbol not found error from pytorch version mismatch +try: + from horovod.torch import mpi_lib_v2 as mpi_lib +except Exception as e: + check_installed_version('pytorch', torch.__version__, e) + raise e +else: + check_installed_version('pytorch', torch.__version__) + +_NULL = "" + +_basics = _HorovodBasics(__file__, 'mpi_lib_v2') + # import basic methods -init = _basics.init is_initialized = _basics.is_initialized start_timeline = _basics.start_timeline stop_timeline = _basics.stop_timeline size = _basics.size local_size = _basics.local_size +cross_size = _basics.cross_size rank = 
_basics.rank local_rank = _basics.local_rank +cross_rank = _basics.cross_rank mpi_threads_supported = _basics.mpi_threads_supported mpi_enabled = _basics.mpi_enabled mpi_built = _basics.mpi_built @@ -49,19 +61,32 @@ ccl_built = _basics.ccl_built cuda_built = _basics.cuda_built rocm_built = _basics.rocm_built + def shutdown(*args, **kwargs): mpi_lib.horovod_torch_reset() return _basics.shutdown(*args, **kwargs) +def init(*args, **kwargs): + global _handle_map + _handle_map = {} + _basics.init(*args, **kwargs) + # Call set up again to make sure the basics is in sync + _setup_process_sets(_basics) + # import reduction op values Average = _basics.Average Sum = _basics.Sum Adasum = _basics.Adasum +Min = _basics.Min +Max = _basics.Max +Product = _basics.Product is_homogeneous = _basics.is_homogeneous handle_average_backwards_compatibility = get_average_backwards_compatibility_fun(_basics) +_setup_process_sets(_basics) + # Schema: handle -> input, output # We keep input in order to make sure it does not get garbage collected @@ -82,17 +107,19 @@ def _allreduce_function_factory(tensor): return 'horovod_torch_allreduce_async_' + tensor.type().replace('.', '_') -def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor): +def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set: ProcessSet): # Set the divisor for reduced gradients to average when necessary if op == Average: if rocm_built(): # For ROCm, perform averaging at framework level - divisor = size() + divisor = process_set.size() op = Sum else: divisor = 1 elif op == Adasum: + if process_set != global_process_set: + raise NotImplementedError("Adasum does not support non-global process sets yet.") if tensor.device.type != 'cpu' and gpu_available('torch'): if nccl_built(): if not is_homogeneous(): @@ -120,7 +147,7 @@ def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor try: handle = getattr(mpi_lib, function)(tensor, output, divisor, name.encode() if name is not None else _NULL, op, - prescale_factor, postscale_factor) + prescale_factor, postscale_factor, process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) _handle_map[handle] = (tensor, output) @@ -128,7 +155,8 @@ def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor def allreduce_async(tensor, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs asynchronous averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified. @@ -143,13 +171,16 @@ def allreduce_async(tensor, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A name of the reduction operation. - op: The reduction operation to combine tensors across different - ranks. Defaults to Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. 
Returns: A handle to the allreduce operation that can be used with `poll()` or @@ -157,30 +188,32 @@ def allreduce_async(tensor, average=None, name=None, op=None, """ op = handle_average_backwards_compatibility(op, average) output = tensor.new(tensor.shape) - return _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor) + return _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor, process_set) class HorovodAllreduce(torch.autograd.Function): """An autograd function that performs allreduce on a tensor.""" @staticmethod - def forward(ctx, tensor, average, name, op, prescale_factor, postscale_factor): + def forward(ctx, tensor, average, name, op, prescale_factor, postscale_factor, process_set): ctx.average = average ctx.op = op ctx.prescale_factor = prescale_factor ctx.postscale_factor = postscale_factor - handle = allreduce_async(tensor, average, name, op, prescale_factor, postscale_factor) + ctx.process_set = process_set + handle = allreduce_async(tensor, average, name, op, prescale_factor, postscale_factor, process_set) return synchronize(handle) @staticmethod def backward(ctx, grad_output): return allreduce(grad_output, average=ctx.average, op=ctx.op, prescale_factor=ctx.prescale_factor, - postscale_factor=ctx.postscale_factor), None, None, None, None, None + postscale_factor=ctx.postscale_factor, + process_set=ctx.process_set), None, None, None, None, None, None def allreduce(tensor, average=None, name=None, compression=Compression.none, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, process_set=global_process_set): """ A function that performs averaging or summation of the input tensor over all the Horovod processes. The input tensor is not modified. @@ -199,16 +232,19 @@ def allreduce(tensor, average=None, name=None, compression=Compression.none, op= average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A name of the reduction operation. compression: Compression algorithm used during allreduce to reduce the amount of data sent during the each parameter update step. Defaults to not using compression. - op: The reduction operation to combine tensors across different ranks. Defaults + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. 
Returns: A tensor of the same shape and type as `tensor`, averaged or summed across all @@ -216,12 +252,14 @@ def allreduce(tensor, average=None, name=None, compression=Compression.none, op= """ tensor_compressed, ctx = compression.compress(tensor) summed_tensor_compressed = HorovodAllreduce.apply(tensor_compressed, average, name, op, - prescale_factor, postscale_factor) + prescale_factor, postscale_factor, + process_set) return compression.decompress(summed_tensor_compressed, ctx) def allreduce_async_(tensor, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs asynchronous in-place averaging or summation of the input tensor over all the Horovod processes. @@ -236,24 +274,28 @@ def allreduce_async_(tensor, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A name of the reduction operation. - op: The reduction operation to combine tensors across different ranks. Defaults to - Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the allreduce operation that can be used with `poll()` or `synchronize()`. """ op = handle_average_backwards_compatibility(op, average) - return _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor) + return _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor, process_set) def allreduce_(tensor, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs in-place averaging or summation of the input tensor over all the Horovod processes. @@ -268,19 +310,22 @@ def allreduce_(tensor, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A name of the reduction operation. - op: The reduction operation to combine tensors across different ranks. Defaults to - Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A tensor of the same shape and type as `tensor`, averaged or summed across all processes. 
""" - handle = allreduce_async_(tensor, average, name, op, prescale_factor, postscale_factor) + handle = allreduce_async_(tensor, average, name, op, prescale_factor, postscale_factor, process_set) return synchronize(handle) @@ -288,16 +333,18 @@ def _grouped_allreduce_function_factory(tensor): return 'horovod_torch_grouped_allreduce_async_' + tensor.type().replace('.', '_') -def _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor): +def _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor, process_set: ProcessSet): # Set the divisor for reduced gradients to average when necessary if op == Average: if rocm_built(): # For ROCm, perform averaging at framework level - divisor = size() + divisor = process_set.size() op = Sum else: divisor = 1 elif op == Adasum: + if process_set != global_process_set: + raise NotImplementedError("Adasum does not support non-global process sets yet.") if tensors[0].device.type != 'cpu' and gpu_available('torch'): if nccl_built(): if not is_homogeneous(): @@ -325,7 +372,7 @@ def _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postsc try: handle = getattr(mpi_lib, function)(tensors, outputs, divisor, name.encode() if name is not None else _NULL, op, - prescale_factor, postscale_factor) + prescale_factor, postscale_factor, process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) _handle_map[handle] = (tuple(tensors), tuple(outputs)) @@ -333,7 +380,8 @@ def _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postsc def grouped_allreduce_async(tensors, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs asynchronous averaging or summation of the input tensor list over all the Horovod processes. The input tensors are not modified. @@ -350,13 +398,16 @@ def grouped_allreduce_async(tensors, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A base name to use for the group reduction operation. - op: The reduction operation to combine tensors across different - ranks. Defaults to Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. 
Returns: A handle to the group allreduce operation that can be used with `poll()` or @@ -364,31 +415,34 @@ def grouped_allreduce_async(tensors, average=None, name=None, op=None, """ op = handle_average_backwards_compatibility(op, average) outputs = [t.new(t.shape) for t in tensors] - return _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor) + return _grouped_allreduce_async(tensors, outputs, name, op, prescale_factor, postscale_factor, process_set) class HorovodGroupedAllreduce(torch.autograd.Function): """An autograd function that performs allreduce on a list of tensors.""" @staticmethod - def forward(ctx, average, name, op, prescale_factor, postscale_factor, *tensors): + def forward(ctx, average, name, op, prescale_factor, postscale_factor, process_set: ProcessSet, *tensors): ctx.average = average ctx.op = op ctx.prescale_factor = prescale_factor ctx.postscale_factor = postscale_factor - handle = grouped_allreduce_async(list(tensors), average, name, op, prescale_factor, postscale_factor) + ctx.process_set = process_set + handle = grouped_allreduce_async(list(tensors), average, name, op, prescale_factor, postscale_factor, + process_set) return synchronize(handle) @staticmethod def backward(ctx, *grad_output): grad_reduced = grouped_allreduce(list(grad_output), average=ctx.average, op=ctx.op, prescale_factor=ctx.prescale_factor, - postscale_factor=ctx.postscale_factor) - return (None, None, None, None, None, *grad_reduced) + postscale_factor=ctx.postscale_factor, + process_set=ctx.process_set) + return (None, None, None, None, None, None, *grad_reduced) def grouped_allreduce(tensors, average=None, name=None, compression=Compression.none, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, process_set=global_process_set): """ A function that performs averaging or summation of the input tensor list over all the Horovod processes. The input tensors are not modified. @@ -409,16 +463,19 @@ def grouped_allreduce(tensors, average=None, name=None, compression=Compression. average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A base name to use for the group reduction operation. compression: Compression algorithm used during allreduce to reduce the amount of data sent during the each parameter update step. Defaults to not using compression. - op: The reduction operation to combine tensors across different ranks. Defaults + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A list containing tensors of the same shape and type as in `tensors`, @@ -427,12 +484,13 @@ def grouped_allreduce(tensors, average=None, name=None, compression=Compression. 
tensors_compressed, ctxs = zip(*[compression.compress(t) for t in tensors]) summed_tensors_compressed = HorovodGroupedAllreduce.apply(average, name, op, prescale_factor, postscale_factor, - *tensors_compressed) + process_set, *tensors_compressed) return [compression.decompress(t, ctx) for t, ctx in zip(summed_tensors_compressed, ctxs)] def grouped_allreduce_async_(tensors, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs asynchronous in-place averaging or summation of the input tensors over all the Horovod processes. @@ -449,24 +507,28 @@ def grouped_allreduce_async_(tensors, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A base name to use for the group reduction operation. - op: The reduction operation to combine tensors across different ranks. Defaults to - Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the group allreduce operation that can be used with `poll()` or `synchronize()`. """ op = handle_average_backwards_compatibility(op, average) - return _grouped_allreduce_async(tensors, tensors, name, op, prescale_factor, postscale_factor) + return _grouped_allreduce_async(tensors, tensors, name, op, prescale_factor, postscale_factor, process_set) def grouped_allreduce_(tensors, average=None, name=None, op=None, - prescale_factor=1.0, postscale_factor=1.0): + prescale_factor=1.0, postscale_factor=1.0, + process_set=global_process_set): """ A function that performs in-place averaging or summation of the input tensors over all the Horovod processes. @@ -483,38 +545,66 @@ def grouped_allreduce_(tensors, average=None, name=None, op=None, average: .. warning:: .. deprecated:: 0.19.0 - Use `op` instead. Will be removed in v0.21.0. + Use `op` instead. Will be removed in v1.0. name: A base name to use for the group reduction operation. - op: The reduction operation to combine tensors across different ranks. Defaults to - Average if None is given. + op: The reduction operation to combine tensors across different ranks. + Supported op values are Sum, Average, Min, Max, and Product. Defaults + to Average if None is given. prescale_factor: Multiplicative factor to scale tensor before allreduce. postscale_factor: Multiplicative factor to scale tensor after allreduce. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A list containing tensors of the same shape and type as in `tensors`, averaged or summed across all processes. 
""" - handle = grouped_allreduce_async_(tensors, average, name, op, prescale_factor, postscale_factor) + handle = grouped_allreduce_async_(tensors, average, name, op, prescale_factor, postscale_factor, process_set) return synchronize(handle) +def sparse_allreduce_async(tensor, name, op, process_set=global_process_set): + # Allgather aggregates along the first dimension, so we need to transpose the + # indices to enforce correct concatenation behavior, then transpose back prior to + # constructing the new aggregated sparse gradient + t = tensor + indices_handle = allgather_async(t._indices().transpose(0, 1).contiguous(), name=f'{name}.indices', + process_set=process_set) + values_handle = allgather_async(t._values(), name=f'{name}.values', process_set=process_set) + + def handle(): + # We need to sync values handle firstly for torch nightly >= 10.0 + # Issue: https://github.com/horovod/horovod/issues/2961 + values = synchronize(values_handle) + indices = synchronize(indices_handle) + + values = (values / process_set.size()) if op == Average else values + + if indices.dim() == 0 or values.dim() == 0: + return t.new().resize_as_(t) + return t.new(indices.transpose(0, 1), values, t.size()) + + return handle + + def _allgather_function_factory(tensor): return 'horovod_torch_allgather_async_' + tensor.type().replace('.', '_') -def _allgather_async(tensor, output, name): +def _allgather_async(tensor, output, name, process_set: ProcessSet): function = _check_function(_allgather_function_factory, tensor) try: handle = getattr(mpi_lib, function)( - tensor, output, name.encode() if name is not None else _NULL) + tensor, output, name.encode() if name is not None else _NULL, + process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) _handle_map[handle] = (tensor, output) return handle -def allgather_async(tensor, name=None): +def allgather_async(tensor, name=None, process_set=global_process_set): """ A function that asynchronously concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified. @@ -526,44 +616,47 @@ def allgather_async(tensor, name=None): Arguments: tensor: A tensor to allgather. name: A name of the allgather operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the allgather operation that can be used with `poll()` or `synchronize()`. 
""" output = tensor.new() - return _allgather_async(tensor, output, name) + return _allgather_async(tensor, output, name, process_set) class HorovodAllgather(torch.autograd.Function): """An autograd function that performs allgather on a tensor.""" @staticmethod - def forward(ctx, tensor, name): + def forward(ctx, tensor, name, process_set: ProcessSet): ctx.dim = tensor.shape[0] - handle = allgather_async(tensor, name) + ctx.process_set = process_set + handle = allgather_async(tensor, name, process_set) return synchronize(handle) @staticmethod def backward(ctx, grad_output): - grad_reduced = allreduce(grad_output, average=False) + grad_reduced = allreduce(grad_output, average=False, process_set=ctx.process_set)#average CHANGE dim_t = torch.IntTensor([ctx.dim]) - dim = allgather(dim_t).view(size()) + dim = allgather(dim_t, process_set=ctx.process_set).view(ctx.process_set.size()) - r = rank() + r = ctx.process_set.rank() offset = torch.sum(dim.narrow(0, 0, r)).item() if r != 0 else 0 - return grad_reduced.narrow(0, offset, ctx.dim), None + return grad_reduced.narrow(0, offset, ctx.dim), None, None -def allgather(tensor, name=None): +def allgather(tensor, name=None, process_set=global_process_set): """ A function that concatenates the input tensor with the same input tensor on all other Horovod processes. The input tensor is not modified. - The concatenation is done on the first dimension, so the input tensors on the - different processes must have the same rank and shape, except for the first - dimension, which is allowed to be different. + The concatenation is done on the first dimension, so the corresponding + input tensors on the different processes must have the same rank and + shape, except for the first dimension, which is allowed to be different. This acts as a thin wrapper around an autograd function. If your input tensor requires gradients, then callings this function will allow gradients @@ -572,6 +665,8 @@ def allgather(tensor, name=None): Arguments: tensor: A tensor to allgather. name: A name of the allgather operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A tensor of the same type as `tensor`, concatenated on dimension zero @@ -579,25 +674,118 @@ def allgather(tensor, name=None): the first dimension, which may be greater and is the sum of all first dimensions of the tensors in different Horovod processes. """ - return HorovodAllgather.apply(tensor, name) + return HorovodAllgather.apply(tensor, name, process_set) + + +def _grouped_allgather_function_factory(tensor): + return 'horovod_torch_grouped_allgather_async_' + tensor.type().replace('.', '_') + + +def _grouped_allgather_async(tensors, outputs, name, process_set: ProcessSet): + function = _check_function(_grouped_allgather_function_factory, tensors[0]) + try: + handle = getattr(mpi_lib, function)( + tensors, outputs, name.encode() if name is not None else _NULL, + process_set.process_set_id) + except RuntimeError as e: + raise HorovodInternalError(e) + _handle_map[handle] = (tuple(tensors), tuple(outputs)) + return handle + + +def grouped_allgather_async(tensors, name=None, process_set=global_process_set): + """ + A function that asynchronously concatenates each input tensor with the corresponding + input tensor on all other Horovod processes for a list of input tensors. The input + tensors are not modified. 
+ + The concatenation is done on the first dimension, so the input tensors on the + different processes must have the same rank and shape, except for the first + dimension, which is allowed to be different. + + Arguments: + tensors: A list of tensors to allgather. + name: A base name to use for the group allgather operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. + + Returns: + A handle to the group allgather operation that can be used with `poll()` or + `synchronize()`. + """ + outputs = [t.new() for t in tensors] + return _grouped_allgather_async(tensors, outputs, name, process_set) + + +class HorovodGroupedAllgather(torch.autograd.Function): + """An autograd function that performs allgather on a list of tensors.""" + + @staticmethod + def forward(ctx, name, process_set: ProcessSet, *tensors): + ctx.dims = [t.shape[0] for t in tensors] + ctx.process_set = process_set + handle = grouped_allgather_async(list(tensors), name, process_set) + return synchronize(handle) + + @staticmethod + def backward(ctx, *grad_output): + grad_reduced = grouped_allreduce(list(grad_output), average=True, process_set=ctx.process_set) + + dim_ts = [torch.IntTensor([dim]) for dim in ctx.dims] + dims = [dt.view(ctx.process_set.size()) + for dt in grouped_allgather(dim_ts, process_set=ctx.process_set)] + + r = ctx.process_set.rank() + offsets = [torch.sum(dim.narrow(0, 0, r)).item() if r != 0 else 0 + for dim in dims] + result = [gr.narrow(0, offset, dim) + for gr, offset, dim in zip(grad_reduced, offsets, ctx.dims)] + return (None, None, *result) + + +def grouped_allgather(tensors, name=None, process_set=global_process_set): + """ + A function that concatenates each input tensor with the corresponding + input tensor on all other Horovod processes for a list of input tensors. + The input tensors are not modified. + + The concatenation is done on the first dimension, so the corresponding input + tensors on the different processes must have the same rank and shape, except + for the first dimension, which is allowed to be different. + + Arguments: + tensors: A list of tensors to allgather. + name: A base name to use for the group allgather operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. + + Returns: + A list containing tensors of the same type as in `tensors`. Each tensor + is concatenated on dimension zero across all processes. Its shape is + identical to the corresponding input shape, except for the first + dimension, which may be greater and is the sum of all first dimensions + of the corresponding tensor in different Horovod processes.
+ """ + return HorovodGroupedAllgather.apply(name, process_set, *tensors) def _broadcast_function_factory(tensor): return 'horovod_torch_broadcast_async_' + tensor.type().replace('.', '_') -def _broadcast_async(tensor, output, root_rank, name): +def _broadcast_async(tensor, output, root_rank, name, process_set: ProcessSet): function = _check_function(_broadcast_function_factory, tensor) try: handle = getattr(mpi_lib, function)( - tensor, output, root_rank, name.encode() if name is not None else _NULL) + tensor, output, root_rank, name.encode() if name is not None else _NULL, + process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) _handle_map[handle] = (tensor, output) return handle -def broadcast_async(tensor, root_rank, name=None): +def broadcast_async(tensor, root_rank, name=None, process_set=global_process_set): """ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified. @@ -611,33 +799,36 @@ def broadcast_async(tensor, root_rank, name=None): tensor: A tensor to broadcast. root_rank: The rank to broadcast the value from. name: A name of the broadcast operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the broadcast operation that can be used with `poll()` or `synchronize()`. """ output = tensor.new(tensor.shape) - return _broadcast_async(tensor, output, root_rank, name) + return _broadcast_async(tensor, output, root_rank, name, process_set) class HorovodBroadcast(torch.autograd.Function): """An autograd function that broadcasts a tensor.""" @staticmethod - def forward(ctx, tensor, root_rank, name): + def forward(ctx, tensor, root_rank, name, process_set: ProcessSet): ctx.root_rank = root_rank - handle = broadcast_async(tensor, root_rank, name) + ctx.process_set = process_set + handle = broadcast_async(tensor, root_rank, name, process_set) return synchronize(handle) @staticmethod def backward(ctx, grad_output): - grad_reduced = allreduce(grad_output, average=False) + grad_reduced = allreduce(grad_output, average=False, process_set=ctx.process_set)#True CHANGED if rank() != ctx.root_rank: grad_reduced *= 0 - return grad_reduced, None, None + return grad_reduced, None, None, None -def broadcast(tensor, root_rank, name=None): +def broadcast(tensor, root_rank, name=None, process_set=global_process_set): """ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The input tensor is not modified. @@ -655,15 +846,17 @@ def broadcast(tensor, root_rank, name=None): tensor: A tensor to broadcast. root_rank: The rank to broadcast the value from. name: A name of the broadcast operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A tensor of the same shape and type as `tensor`, with the value broadcasted from root rank. """ - return HorovodBroadcast.apply(tensor, root_rank, name) + return HorovodBroadcast.apply(tensor, root_rank, name, process_set) -def broadcast_async_(tensor, root_rank, name=None): +def broadcast_async_(tensor, root_rank, name=None, process_set=global_process_set): """ A function that asynchronously broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place. 
@@ -677,15 +870,17 @@ def broadcast_async_(tensor, root_rank, name=None): tensor: A tensor to broadcast. root_rank: The rank to broadcast the value from. name: A name of the broadcast operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the broadcast operation that can be used with `poll()` or `synchronize()`. """ - return _broadcast_async(tensor, tensor, root_rank, name) + return _broadcast_async(tensor, tensor, root_rank, name, process_set) -def broadcast_(tensor, root_rank, name=None): +def broadcast_(tensor, root_rank, name=None, process_set=global_process_set): """ A function that broadcasts the input tensor on root rank to the same input tensor on all other Horovod processes. The operation is performed in-place. @@ -699,35 +894,37 @@ def broadcast_(tensor, root_rank, name=None): tensor: A tensor to broadcast. root_rank: The rank to broadcast the value from. name: A name of the broadcast operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A tensor of the same shape and type as `tensor`, with the value broadcasted from root rank. """ - handle = broadcast_async_(tensor, root_rank, name) + handle = broadcast_async_(tensor, root_rank, name, process_set) return synchronize(handle) def _alltoall_function_factory(tensor): return 'horovod_torch_alltoall_async_' + tensor.type().replace('.', '_') -def _alltoall_async(tensor, splits, output, name): +def _alltoall_async(tensor, splits, output, output_received_splits, name, process_set: ProcessSet): if splits is None: # If splits not provided, create empty tensor as placeholder splits = torch.tensor([], dtype=torch.int32, device='cpu') elif not isinstance(splits, torch.Tensor): splits = torch.tensor(splits, dtype=torch.int32, device='cpu') - function = _check_function(_alltoall_function_factory, tensor) try: handle = getattr(mpi_lib, function)( - tensor, splits, output, name.encode() if name is not None else _NULL) + tensor, splits, output, output_received_splits, name.encode() if name is not None else _NULL, + process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) - _handle_map[handle] = (tensor, splits, output) + _handle_map[handle] = (tensor, splits, (output, output_received_splits)) return handle -def alltoall_async(tensor, splits=None, name=None): +def alltoall_async(tensor, splits=None, name=None, process_set=global_process_set): """ A function that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes. The input @@ -745,37 +942,45 @@ def alltoall_async(tensor, splits=None, name=None): not provided, the first dimension is split equally by the number of Horovod processes. name: A name of the alltoall operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: A handle to the alltoall operation that can be used with `poll()` or `synchronize()`. 
""" output = tensor.new() - return _alltoall_async(tensor, splits, output, name) + if isinstance(splits, torch.Tensor): + output_received_splits = splits.new() + else: + output_received_splits = torch.empty(size(), dtype=torch.int32, device='cpu') + return _alltoall_async(tensor, splits, output, output_received_splits, name, process_set) class HorovodAlltoall(torch.autograd.Function): """An autograd function that performs alltoall on a tensor.""" @staticmethod - def forward(ctx, tensor, splits, name): - ctx.tensor = tensor - ctx.splits = splits - handle = alltoall_async(tensor, splits, name) - return synchronize(handle) + def forward(ctx, tensor, splits, name, process_set: ProcessSet): + handle = alltoall_async(tensor, splits, name, process_set) + output, received_splits = synchronize(handle) + + ctx.process_set = process_set + ctx.recvsplits = received_splits + if splits is None: + return output + else: + ctx.mark_non_differentiable(received_splits) + return output, received_splits @staticmethod - def backward(ctx, grad_output): - recvsplits = None - if ctx.splits is not None: - recvsplits = alltoall(ctx.splits, splits=torch.ones(size(), dtype=torch.int32, device='cpu')) - else: - splits_equal = torch.ones(size(), dtype=torch.int32, device='cpu') * (ctx.tensor.size()[0] // size()) - recvsplits = alltoall(splits_equal, splits=torch.ones(size(), dtype=torch.int32, device='cpu')) - return alltoall(grad_output, splits=recvsplits), None, None + def backward(ctx, grad_output, *dead_gradients): + grad_wrt_tensor, _ = alltoall(grad_output, splits=ctx.recvsplits, + process_set=ctx.process_set) + return grad_wrt_tensor, None, None, None -def alltoall(tensor, splits=None, name=None): +def alltoall(tensor, splits=None, name=None, process_set=global_process_set): """ A function that scatters slices of the input tensor to all other Horovod processes and returns a tensor of gathered slices from all other Horovod processes. The input @@ -797,22 +1002,263 @@ def alltoall(tensor, splits=None, name=None): not provided, the first dimension is split equally by the number of Horovod processes. name: A name of the alltoall operation. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. Returns: - A tensor containing the gathered tensor data from all workers. + 1) A tensor containing the gathered tensor data from all workers. + 2) If `splits` has been provided: A tensor of integers in rank order + describing how many elements in the output tensor have been received + from each worker. 
+ """ + return HorovodAlltoall.apply(tensor, splits, name, process_set) + + +def _reducescatter_function_factory(tensor): + return 'horovod_torch_reducescatter_async_' + tensor.type().replace('.', '_') + + +def _reducescatter_async(tensor, output, name, op, process_set: ProcessSet, + prescale_factor, postscale_factor): + function = _check_function(_reducescatter_function_factory, tensor) + try: + handle = getattr(mpi_lib, function)(tensor, output, + name.encode() if name is not None else _NULL, + op, process_set.process_set_id, + prescale_factor, postscale_factor) + except RuntimeError as e: + raise HorovodInternalError(e) + _handle_map[handle] = (tensor, output) + return handle + + +def reducescatter_async(tensor, name=None, op=Average, process_set=global_process_set, + prescale_factor=1.0, postscale_factor=1.0): """ - return HorovodAlltoall.apply(tensor, splits, name) + A function that performs asynchronous reduction of the input tensor over all the + Horovod processes, then scatters the results across all Horovod processes. The + input tensor is not modified. + + The reduction operation is keyed by the name. If name is not provided, an incremented + auto-generated name is used. The tensor type and shape must be the same on all + Horovod processes for a given name. The reduction will not start until all processes + are ready to send and receive the tensor. + + The input tensors on the different processes must have the same rank and shape. The + output tensor will be the same rank on all processes, but the first dimension may + be different. + + Arguments: + tensor: A tensor to average and sum. + name: A name of the reduction operation. + op: The reduction operation to combine tensors across different ranks. + Defaults to Average. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. + prescale_factor: Multiplicative factor to scale tensor before reducescatter. + postscale_factor: Multiplicative factor to scale tensor after reducescatter. + + Returns: + A handle to the reducescatter operation that can be used with `poll()` or + `synchronize()`. + """ + output = tensor.new() + return _reducescatter_async(tensor, output, name, op, process_set, + prescale_factor, postscale_factor) + + +class HorovodReducescatter(torch.autograd.Function): + """An autograd function that performs reducescatter on a tensor.""" + + @staticmethod + def forward(ctx, tensor, name, op, process_set, prescale_factor, postscale_factor): + ctx.op = op + ctx.process_set = process_set + ctx.prescale_factor = prescale_factor + ctx.postscale_factor = postscale_factor + handle = reducescatter_async(tensor, name, op, process_set, prescale_factor, postscale_factor) + return synchronize(handle) + + @staticmethod + def backward(ctx, grad_output): + if ctx.op == Sum: + grad_output *= ctx.process_set.size() + if ctx.prescale_factor != 1.0: + grad_output *= ctx.prescale_factor + if ctx.postscale_factor != 1.0: + grad_output *= ctx.postscale_factor + + return allgather(grad_output, process_set=ctx.process_set), None, None, None, None, None + + +def reducescatter(tensor, name=None, compression=Compression.none, op=Average, + process_set=global_process_set, prescale_factor=1.0, postscale_factor=1.0): + """ + A function that performs reduction of the input tensor over all the Horovod + processes, then scatters the results across all Horovod processes. The input tensor + is not modified. + + The reduction operation is keyed by the name. 
If name is not provided, an incremented + auto-generated name is used. The tensor type and shape must be the same on all + Horovod processes for a given name. The reduction will not start until all processes + are ready to send and receive the tensor. + + Arguments: + tensor: A tensor to average/sum and scatter. + name: A name of the reduction operation. + compression: Compression algorithm used during reducescatter to reduce the amount + of data sent during each parameter update step. Defaults to + not using compression. + op: The reduction operation to combine tensors across different ranks. + Defaults to Average. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. + prescale_factor: Multiplicative factor to scale tensor before reducescatter. + postscale_factor: Multiplicative factor to scale tensor after reducescatter. + + Returns: + A tensor of the same rank and type as `tensor` across all processes. The shape + is identical to the input shape, except for the first dimension, which will be + divided across the different Horovod processes. + """ + tensor_compressed, ctx = compression.compress(tensor) + reduced_tensor_compressed = HorovodReducescatter.apply(tensor_compressed, name, op, process_set, + prescale_factor, postscale_factor) + return compression.decompress(reduced_tensor_compressed, ctx) + + +def _grouped_reducescatter_function_factory(tensor): + return 'horovod_torch_grouped_reducescatter_async_' + tensor.type().replace('.', '_') + + +def _grouped_reducescatter_async(tensors, outputs, name, op, process_set: ProcessSet, + prescale_factor, postscale_factor): + function = _check_function(_grouped_reducescatter_function_factory, tensors[0]) + try: + handle = getattr(mpi_lib, function)(tensors, outputs, + name.encode() if name is not None else _NULL, + op, process_set.process_set_id, + prescale_factor, postscale_factor) + except RuntimeError as e: + raise HorovodInternalError(e) + _handle_map[handle] = (tuple(tensors), tuple(outputs)) + return handle + + +def grouped_reducescatter_async(tensors, name=None, op=Average, process_set=global_process_set, + prescale_factor=1.0, postscale_factor=1.0): + """ + A function that performs asynchronous reduction of a list of input tensors over all the + Horovod processes, then scatters the results across all Horovod processes. The + input tensors are not modified. + + The reduction operation is keyed by the name. If name is not provided, an incremented + auto-generated name is used. For each of the input tensors, the type and shape must + be the same on all Horovod processes for a given name. The reduction will not start + until all processes are ready to send and receive the tensors. + + Each input tensor at a given position in the list must have the same rank and shape + on the different processes. The corresponding output tensor will have the same rank on + all processes, but the first dimension may be different. + + Arguments: + tensors: A list of tensors to average and sum. + name: A base name to use for the group reduction operation. + op: The reduction operation to combine tensors across different ranks. + Defaults to Average. + process_set: Process set object to limit this operation to a subset of + Horovod processes. Default is the global process set. + prescale_factor: Multiplicative factor to scale tensors before reducescatter. + postscale_factor: Multiplicative factor to scale tensors after reducescatter.
+
+    Returns:
+        A handle to the group reducescatter operation that can be used with `poll()` or
+        `synchronize()`.
+    """
+    outputs = [t.new() for t in tensors]
+    return _grouped_reducescatter_async(tensors, outputs, name, op, process_set,
+                                        prescale_factor, postscale_factor)
+
+
+class HorovodGroupedReducescatter(torch.autograd.Function):
+    """An autograd function that performs reducescatter on a list of tensors."""
+
+    @staticmethod
+    def forward(ctx, name, op, process_set, prescale_factor, postscale_factor, *tensors):
+        ctx.op = op
+        ctx.process_set = process_set
+        ctx.prescale_factor = prescale_factor
+        ctx.postscale_factor = postscale_factor
+        handle = grouped_reducescatter_async(list(tensors), name, op, process_set,
+                                             prescale_factor, postscale_factor)
+        return synchronize(handle)
+
+    @staticmethod
+    def backward(ctx, *grad_output):
+        if ctx.op == Sum:
+            grad_output = [g * ctx.process_set.size() for g in grad_output]
+        if ctx.prescale_factor != 1.0:
+            grad_output = [ctx.prescale_factor * g for g in grad_output]
+        if ctx.postscale_factor != 1.0:
+            grad_output = [ctx.postscale_factor * g for g in grad_output]
+
+        return (None, None, None, None, None,
+                *grouped_allgather(grad_output, process_set=ctx.process_set))
+
+
+def grouped_reducescatter(tensors, name=None, compression=Compression.none, op=Average,
+                          process_set=global_process_set, prescale_factor=1.0, postscale_factor=1.0):
+    """
+    A function that performs reduction of a list of input tensors over all the
+    Horovod processes, then scatters the results across all Horovod processes. The
+    input tensors are not modified.
+
+    The reduction operation is keyed by the name. If name is not provided, an incremented
+    auto-generated name is used. The tensor type and shape must be the same on all
+    Horovod processes for a given name. The reduction will not start until all processes
+    are ready to send and receive the tensors.
+
+    Arguments:
+        tensors: A list of tensors to average/sum and scatter.
+        name: A base name to use for the group reduction operation.
+        compression: Compression algorithm used during reducescatter to reduce the amount
+                     of data sent during each parameter update step. Defaults to
+                     not using compression.
+        op: The reduction operation to combine tensors across different ranks.
+            Defaults to Average.
+        process_set: Process set object to limit this operation to a subset of
+            Horovod processes. Default is the global process set.
+        prescale_factor: Multiplicative factor to scale tensors before reducescatter.
+        postscale_factor: Multiplicative factor to scale tensors after reducescatter.
+
+    Returns:
+        A list containing tensors of the same rank and type as in `tensors`. For each
+        tensor the shape is identical to the input shape, except for the first dimension,
+        which will be divided across the different Horovod processes.
+    """
+    tensors_compressed = []
+    ctxs = []
+    for tensor in tensors:
+        tensor_compressed, ctx = compression.compress(tensor)
+        tensors_compressed.append(tensor_compressed)
+        ctxs.append(ctx)
+    reduced_tensors_compressed = HorovodGroupedReducescatter.apply(name, op, process_set,
+                                                                   prescale_factor, postscale_factor,
+                                                                   *tensors_compressed)
+    return [compression.decompress(reduced_tensor_compressed, ctx)
+            for reduced_tensor_compressed, ctx in zip(reduced_tensors_compressed, ctxs)]
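A companion sketch for the grouped variant (again editorial, not part of the patch): fusing several reducescatters into one call amortizes launch overhead, and each output keeps its input's shape except for the scattered first dimension.

import torch
import horovod.torch as hvd

hvd.init()
n = hvd.size()
tensors = [torch.randn(n, 8), torch.randn(n, 16)]
outputs = hvd.grouped_reducescatter(tensors, op=hvd.Sum)
# One output per input; only the first dimension is divided across the ranks.
assert [tuple(o.shape) for o in outputs] == [(1, 8), (1, 16)]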
 

 def poll(handle):
     """
-    Polls an allreduce, allgather or broadcast handle to determine whether underlying
-    asynchronous operation has completed. After `poll()` returns `True`, `synchronize()`
-    will return without blocking.
+    Polls an allreduce, allgather, alltoall, broadcast, or reducescatter handle to
+    determine whether the underlying asynchronous operation has completed. After `poll()`
+    returns `True`, `synchronize()` will return without blocking.

     Arguments:
-        handle: A handle returned by an allreduce, allgather or broadcast asynchronous
-                operation.
+        handle: A handle returned by an allreduce, allgather, alltoall, broadcast, or
+                reducescatter asynchronous operation.

     Returns:
         A flag indicating whether the operation has completed.
@@ -822,15 +1268,15 @@ def poll(handle):

 def synchronize(handle):
     """
-    Synchronizes an asynchronous allreduce, allgather or broadcast operation until
-    it's completed. Returns the result of the operation.
+    Synchronizes an asynchronous allreduce, allgather, alltoall, broadcast, or
+    reducescatter operation until it's completed. Returns the result of the operation.

     Arguments:
-        handle: A handle returned by an allreduce, allgather or broadcast asynchronous
-                operation.
+        handle: A handle returned by an allreduce, allgather, alltoall, broadcast, or
+                reducescatter asynchronous operation.

     Returns:
-        An output tensor of the operation.
+        A single output tensor of the operation or a tuple of multiple output tensors.
     """
     if handle not in _handle_map:
         return
@@ -840,10 +1286,11 @@ def synchronize(handle):
         output = _handle_map.pop(handle)[-1]
         return output
     except RuntimeError as e:
+        _handle_map.pop(handle, None)
         raise HorovodInternalError(e)


-def join(device=-1):
+def join(device=-1) -> int:
     """A function that indicates that the rank finished processing data.

     All ranks that did not call join() continue to process allreduce operations.
@@ -855,7 +1302,32 @@ def join(device=-1):
     Returns:
         Id of the rank that joined last.
     """
+    output = torch.tensor(-1, dtype=torch.int, device=torch.device("cpu"))
+    try:
+        handle = mpi_lib.horovod_torch_join(output, device)
+    except RuntimeError as e:
+        raise HorovodInternalError(e)
+
+    _handle_map[handle] = (None, output)
+
+    return synchronize(handle).item()
+
+
+def barrier(process_set=global_process_set):
+    """
+    A function that acts as a simple synchronization point for the ranks specified
+    in the given process set (defaults to the global process set). Ranks that reach
+    this function call will stall until all other ranks have reached it.
+
+    Arguments:
+        process_set: Process set object to limit this operation to a subset of
+            Horovod processes. Default is the global process set.
+    """
+    try:
-        return mpi_lib.horovod_torch_join(device)
+        handle = mpi_lib.horovod_torch_barrier(process_set.process_set_id)
     except RuntimeError as e:
         raise HorovodInternalError(e)
+
+    _handle_map[handle] = (None, None)
+
+    synchronize(handle)
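The two synchronization helpers added above can be exercised as follows (editorial sketch, not part of the patch; join() takes a CUDA device id and defaults to -1 for CPU):

import horovod.torch as hvd

hvd.init()
hvd.barrier()       # every rank in the global process set stalls here until all arrive
last = hvd.join()   # drain outstanding collectives; now returns the last joining rank's id
if hvd.rank() == 0:
    print('last rank to join:', last)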
+ """ + try: - return mpi_lib.horovod_torch_join(device) + handle = mpi_lib.horovod_torch_barrier(process_set.process_set_id) except RuntimeError as e: raise HorovodInternalError(e) + + _handle_map[handle] = (None, None) + + synchronize(handle) diff --git a/patch_files/horovod/torch/optimizer.py b/patch_files/horovod/torch/optimizer.py index ffe7d42..bf72158 100644 --- a/patch_files/horovod/torch/optimizer.py +++ b/patch_files/horovod/torch/optimizer.py @@ -25,28 +25,32 @@ from horovod.torch.compression import Compression from horovod.torch.functions import broadcast_object -from horovod.torch.mpi_ops import allreduce_async_, grouped_allreduce_async_ +from horovod.torch.mpi_ops import allreduce_async_, grouped_allreduce_async_, sparse_allreduce_async from horovod.torch.mpi_ops import synchronize from horovod.torch.mpi_ops import size from horovod.torch.mpi_ops import Average, Adasum, Sum from horovod.torch.mpi_ops import rocm_built +from horovod.torch.mpi_ops import ProcessSet, global_process_set class _DistributedOptimizer(torch.optim.Optimizer): - def __init__(self, params, named_parameters, grace, compression, + def __init__(self, params, named_parameters,grace, compression, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, - num_groups=0): + groups=None, + sparse_as_dense=False, + process_set=global_process_set): super(self.__class__, self).__init__(params) - self._compression = compression self._grace = grace + self._process_set=process_set + self._compression = compression if named_parameters is not None: named_parameters = list(named_parameters) else: - named_parameters = [('allreduce.noname.%s' % i, v) - for param_group in self.param_groups - for i, v in enumerate(param_group['params'])] + named_parameters = [(f'allreduce.noname.{i}.{j}', v) + for i, param_group in enumerate(self.param_groups) + for j, v in enumerate(param_group['params'])] # make sure that named_parameters are tuples if any([not isinstance(p, tuple) for p in named_parameters]): raise ValueError('named_parameters should be a sequence of ' @@ -74,15 +78,33 @@ def __init__(self, params, named_parameters, grace, compression, for _, v in sorted(named_parameters)} self.op = op self.gradient_predivide_factor = gradient_predivide_factor + self.sparse_as_dense = sparse_as_dense + self.process_set = process_set + self._handles = {} self._grad_accs = [] self._requires_update = set() self._synchronized = False self._should_synchronize = True - self._num_groups = num_groups + + if groups is not None: + if not (isinstance(groups, list) or groups > 0): + raise ValueError('groups should be a non-negative integer or ' + 'a list of list of torch.Tensor.') + if isinstance(groups, list): + grouped_parameter_ids = set() + for l in groups: + for p in l: + if not isinstance(p, torch.Tensor): + raise ValueError('groups must consist of torch.Tensor.') + if id(p) in grouped_parameter_ids: + raise ValueError('A parameter can only appear once in groups.') + grouped_parameter_ids.add(id(p)) + self._groups = groups self._p_to_group = {} self._group_counts = {} - if size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1': + + if self.process_set.included() and (size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1'): self._register_hooks() def load_state_dict(self, *args, **kwargs): @@ -109,8 +131,7 @@ def set_backward_passes_per_step(self, passes): self._allreduce_delay[p] = self.backward_passes_per_step def _register_hooks(self): - - if self._num_groups > 0: + if self._groups is not None: p_list = [] # Get list of 
             for param_group in self.param_groups:
@@ -121,16 +142,29 @@ def _register_hooks(self):
             # To ensure parameter order and group formation is consistent, broadcast p_list order
             # from rank 0 and use for every worker
             p_list_names = [self._parameter_names.get(p) for p in p_list]
-            p_list_names = broadcast_object(p_list_names, root_rank=0)
-            p_list = sorted(p_list, key=lambda p : p_list_names.index(self._parameter_names.get(p)))
+            p_list_names = broadcast_object(p_list_names, root_rank=0, process_set=self.process_set)
+            p_list = sorted(p_list, key=lambda p: p_list_names.index(self._parameter_names.get(p)))

             # Form groups
-            p_groups = split_list(p_list, self._num_groups)
+            if isinstance(self._groups, list):
+                p_groups = []
+                grouped_id = set()
+                p_list_ids = [id(p) for p in p_list]
+                for group in self._groups:
+                    p_groups.append([p for p in group if id(p) in p_list_ids])
+                    for p in p_groups[-1]:
+                        grouped_id.add(id(p))
+                for p in p_list:
+                    if id(p) not in grouped_id:
+                        p_groups.append([p])
+            else:
+                p_groups = split_list(p_list, self._groups)
+
+            p_groups = [tuple(p) for p in p_groups]
             for group in p_groups:
                 for p in group:
                     self._p_to_group[p] = group
                 self._group_counts[group] = 0

         for param_group in self.param_groups:
             for p in param_group['params']:
@@ -143,16 +177,33 @@ def _register_hooks(self):
             self._grad_accs.append(grad_acc)

     def _allreduce_grad_async(self, p):
+        if p.grad is None:
+            # Gradient was not computed, but we still need to submit a tensor to allreduce
+            # as one of the other ranks may have computed it (due to dynamic forward functions).
+            #
+            # NOTE: this will not work if the gradient is sparse and we perform an allgather.
+            # Unfortunately, there doesn't appear to be a good way to detect that the parameter will
+            # produce sparse gradients before computing the gradient.
+            p.grad = p.data.new(p.size()).zero_()
+
         name = self._parameter_names.get(p)
         tensor = p.grad
-        if self._grace and self._num_groups == 0 and self.op == Average:
-            handle, ctx = self._grace.send_step(tensor, name)
+
+        if p.grad.is_sparse:
+            if self.sparse_as_dense:
+                tensor = tensor.to_dense()
+            else:
+                return self._sparse_allreduce_grad_async(p, name)
+
+        if self._grace and self._groups is None and self.op == Average:
+            handle, ctx = self._grace.send_step(tensor, name, self._process_set)
+            postscale_factor = self.gradient_predivide_factor
         else:
+            tensor_compressed, ctx = self._compression.compress(tensor)
             if self.op == Average:
                 # Split average operation across pre/postscale factors
                 # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
                 prescale_factor = 1.0 / self.gradient_predivide_factor
                 postscale_factor = self.gradient_predivide_factor
             else:
@@ -160,17 +211,24 @@
                 postscale_factor = 1.0

             handle = allreduce_async_(tensor_compressed, name=name, op=self.op,
-                                      prescale_factor=prescale_factor,
-                                      postscale_factor=postscale_factor)
+                                      prescale_factor=prescale_factor,
+                                      postscale_factor=postscale_factor,
+                                      process_set=self.process_set)
         return handle, ctx

     def _grouped_allreduce_grad_async(self, ps):
         name = self._parameter_names.get(ps[0])
         tensors_compressed, ctxs = zip(*[self._compression.compress(p.grad) for p in ps])

-        handle = grouped_allreduce_async_(tensors_compressed, name=name, op=self.op)
+        handle = grouped_allreduce_async_(tensors_compressed, name=name, op=self.op,
+                                          process_set=self.process_set)
         return handle, ctxs

+    def _sparse_allreduce_grad_async(self, p, name):
+        handle = sparse_allreduce_async(p.grad, name=name, op=self.op,
+                                        process_set=self.process_set)
+        return handle, None
+
     def _make_hook(self, p):
         def hook(*ignore):
             if p in self._handles and self._handles[p][0] is not None:
@@ -185,7 +243,7 @@ def hook(*ignore):
                 handle, ctx = None, None
             self._allreduce_delay[p] -= 1
             if self._allreduce_delay[p] == 0:
-                if self._num_groups > 0:
+                if self._groups is not None:
                     group = self._p_to_group[p]
                     self._group_counts[group] += 1
                     if self._group_counts[group] == len(group):
@@ -202,6 +260,10 @@ def hook(*ignore):
         return hook

     def synchronize(self):
+        if not self.process_set.included():
+            self._synchronized = True
+            return
+
         completed = set()
         for x in self._handles.keys():
             completed.update(x) if isinstance(x, tuple) else completed.add(x)
@@ -214,23 +276,37 @@ def synchronize(self):
             if handle is None:
                 handle, ctx = self._allreduce_grad_async(p)
                 self._handles[p] = (handle, ctx)
-        for p, (handle, ctx) in self._handles.items():
+
+        for p, (handle, ctx) in self._handles.items():
             if isinstance(p, tuple):  # This was a grouped result, need to unpack
                 outputs = synchronize(handle)
                 for gp, output, gctx in zip(p, outputs, ctx):
                     self._allreduce_delay[gp] = self.backward_passes_per_step
                     gp.grad.set_(self._compression.decompress(output, gctx))
+                if self._groups is not None and self._group_counts[p] != 0:
+                    self._group_counts[p] = 0
             else:
-                if self._grace and self._num_groups == 0 and self.op == Average:
-                    # in GRACE, p is not tuple, but handle is.
-                    output = self._grace.receive_step(handle, ctx)
-                    self._allreduce_delay[p] = self.backward_passes_per_step
-                    p.grad.set_(output)
+                if self._grace and self._groups is None and self.op == Average:
+                    output = self._grace.receive_step(handle, ctx, self._process_set)
+                    self._allreduce_delay[p] = self.backward_passes_per_step
+                    p.grad.set_(output)
                 else:
-                    output = synchronize(handle)
+                    # When handle is a callable function, it returns the aggregated tensor result
+                    output = synchronize(handle) if not callable(handle) else handle()
                     self._allreduce_delay[p] = self.backward_passes_per_step
+                    if self._groups is not None:
+                        group = self._p_to_group[p]
+                        if self._group_counts[group] != 0:
+                            self._group_counts[group] = 0
-                    p.grad.set_(self._compression.decompress(output, ctx))
+                    if p.grad.is_sparse:
+                        aggregated = self._compression.decompress(output, ctx)
+                        if not aggregated.is_sparse:
+                            # When sparse_as_dense=True we need to convert the grad back to sparse before update
+                            aggregated = aggregated.to_sparse()
+
+                        # Sparse grads do not support set_ for some reason, so we do this as an equivalent
+                        p.grad.zero_().add_(aggregated)
+                    else:
+                        p.grad.set_(self._compression.decompress(output, ctx))

         self._handles.clear()
@@ -448,13 +524,14 @@ def zero_grad(self):
         return super(self.__class__, self).zero_grad()


-def DistributedOptimizer(optimizer,
-                         grace=None, named_parameters=None,
+def DistributedOptimizer(optimizer, named_parameters=None, grace=None,
                          compression=Compression.none,
                          backward_passes_per_step=1,
                          op=Average,
                          gradient_predivide_factor=1.0,
-                         num_groups=0):
+                         num_groups=0, groups=None,
+                         sparse_as_dense=False,
+                         process_set=global_process_set):
     """
     An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
     combine gradient values before applying gradients to model weights.
@@ -499,6 +576,18 @@ def DistributedOptimizer(optimizer, named_parameters=None, grace=None,
                                    gradient_predivide_factor / size after the sum.
         num_groups: Number of groups to assign gradient allreduce ops to for explicit
                     grouping. Defaults to no explicit groups.
+        groups: The parameter to group the gradient allreduce ops. Accepted values are a
+                non-negative integer or a list of lists of torch.Tensor.
+                If groups is a non-negative integer, it is the number of groups to assign
+                gradient allreduce ops to for explicit grouping.
+                If groups is a list of lists of torch.Tensor, tensors in the same
+                inner list will be assigned to the same group, while any parameter that
+                does not appear in any list will form a group of its own.
+                Defaults to None, i.e. no explicit groups.
+        sparse_as_dense: If set to True, convert all sparse gradients to dense and perform
+                         allreduce, then convert back to sparse before applying the update.
+        process_set: Gradients will only be reduced over Horovod processes belonging
+                     to this process set. Defaults to the global process set.
     """
     # We dynamically create a new class that inherits from the optimizer that was passed in.
     # The goal is to override the `step()` method with an allreduce implementation.
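To make the new arguments concrete, here is an editorial sketch (not part of the patch; the two-layer model is invented for illustration): groups fuses the allreduces of the listed parameters, and sparse_as_dense densifies sparse gradients, e.g. from nn.Embedding, before reduction.

import torch
import horovod.torch as hvd

hvd.init()
model = torch.nn.Sequential(torch.nn.Linear(10, 10), torch.nn.Linear(10, 2))
opt = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
opt = hvd.DistributedOptimizer(
    opt,
    named_parameters=model.named_parameters(),
    # one explicit fusion group per layer; any parameter left out would form its own group
    groups=[list(model[0].parameters()), list(model[1].parameters())],
    sparse_as_dense=True)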
@@ -508,12 +597,23 @@ def DistributedOptimizer(optimizer, named_parameters=None, grace=None,
         if op != Average:
             raise ValueError('gradient_predivide_factor not supported with op != Average')

+    if num_groups != 0:
+        warnings.warn('Parameter `num_groups` has been replaced by `groups` '
+                      'and will be removed in v0.23.0.', DeprecationWarning)
+        if groups is None:
+            groups = num_groups
+
+    if backward_passes_per_step <= 0:
+        raise ValueError("backward_passes_per_step must be > 0")
+
     if op != Adasum or size() == 1:
         cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                    dict(_DistributedOptimizer.__dict__))
-        return cls(optimizer.param_groups, named_parameters, grace, compression, backward_passes_per_step, op,
-                   gradient_predivide_factor, num_groups)
+        return cls(optimizer.param_groups, named_parameters, grace, compression, backward_passes_per_step, op,
+                   gradient_predivide_factor, groups, sparse_as_dense, process_set)
     else:
+        if process_set != global_process_set:
+            raise NotImplementedError("Adasum does not support non-global process sets yet.")
         cls = type(optimizer.__class__.__name__, (optimizer.__class__,),
                    dict(_DistributedAdasumOptimizer.__dict__))
         return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)
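Finally, the deprecation path in schematic form (editorial sketch, not part of the patch; base_opt and model are hypothetical, and a given optimizer should only be wrapped once; the two calls are shown side by side purely to contrast the spellings):

# deprecated: emits a DeprecationWarning and is mapped onto groups=2 internally
opt = hvd.DistributedOptimizer(base_opt, named_parameters=model.named_parameters(), num_groups=2)
# preferred
opt = hvd.DistributedOptimizer(base_opt, named_parameters=model.named_parameters(), groups=2)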