From 89bc359fd2322777ced50ba9b92f6d8795d18b5a Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Mon, 22 Oct 2018 16:33:53 +0200 Subject: [PATCH 1/8] implementation of sparsifiedSGD optimizer --- mlbench/refimpls/pytorch/optim/optimizer.py | 181 +++++++++++++++++++- 1 file changed, 180 insertions(+), 1 deletion(-) diff --git a/mlbench/refimpls/pytorch/optim/optimizer.py b/mlbench/refimpls/pytorch/optim/optimizer.py index e4191d6..b629c80 100644 --- a/mlbench/refimpls/pytorch/optim/optimizer.py +++ b/mlbench/refimpls/pytorch/optim/optimizer.py @@ -1,4 +1,7 @@ +import torch import torch.optim as optim +from torch.optim.optimizer import Optimizer, required +import random def get_optimizer(options, model): @@ -15,7 +18,8 @@ def get_optimizer(options, model): :rtype: optimizer :raises: NotImplementedError """ - lr = options.lr if hasattr(options, 'lr') else options.lr_per_sample * options.batch_size + # lr = options.lr if hasattr(options, 'lr') else options.lr_per_sample * options.batch_size + lr = options.lr if options.lr else options.lr_per_sample * options.batch_size if options.opt_name == 'sgd': optimizer = optim.SGD(model.parameters(), @@ -23,8 +27,183 @@ def get_optimizer(options, model): momentum=options.momentum, weight_decay=options.weight_decay, nesterov=options.nesterov) + elif options.opt_name == 'sparsified_sgd': + optimizer = sparsified_SGD(model.parameters(), + lr=lr, + weight_decay=options.weight_decay, + sparse_grad_size=options.sparse_grad_size) else: raise NotImplementedError("The optimizer `{}` specified by `options` is not implemented." .format(options.opt_name)) return optimizer + +class sparsified_SGD(Optimizer): + r"""Implements sparsified version of stochastic gradient descent. + + Args: + params (iterable): iterable of parameters to optimize or dicts defining + parameter groups + lr (float): learning rate + weight_decay (float, optional): weight decay (L2 penalty) (default: 0) + sparse_grad_size (int): Size of the sparsified gradients vector. 
+ + """ + + def __init__(self, params, lr=required, weight_decay=0, sparse_grad_size=10): + + if lr is not required and lr < 0.0: + raise ValueError("Invalid learning rate: {}".format(lr)) + if weight_decay < 0.0: + raise ValueError("Invalid weight_decay value: {}".format(weight_decay)) + + defaults = dict(lr=lr, weight_decay=weight_decay) + + super(sparsified_SGD, self).__init__(params, defaults) + + self.__create_gradients_memory() + self.__create_weighted_average_params() + + self.num_coordinates = sparse_grad_size + self.current_block = -1 + + + def __setstate__(self, state): + super(sparsified_SGD, self).__setstate__(state) + + def __create_weighted_average_params(self): + + for group in self.param_groups: + for p in group['params']: + param_state = self.state[p] + param_state['estimated_w'] = torch.zeros_like(p.data) + + def __create_gradients_memory(self): + """ Create a memory for parameters. """ + for group in self.param_groups: + for p in group['params']: + param_state = self.state[p] + param_state['memory'] = torch.zeros_like(p.data) + + def step(self, closure=None): + """Performs a single optimization step. + + Arguments: + closure (callable, optional): A closure that reevaluates the model + and returns the loss. 
+ """ + loss = None + if closure is not None: + loss = closure() + + for group in self.param_groups: + + weight_decay = group['weight_decay'] + + for p in group['params']: + + if p.grad is None: + continue + d_p = p.grad.data + + if weight_decay != 0: + d_p.add_(weight_decay, p.data) + p.data.add_(-d_p) + + return loss + + def sparsify_gradients(self, model, lr, random_sparse): + + if random_sparse: + return self._random_sparsify(model, lr) + else: + return self._block_sparsify(model, lr) + + def _block_sparsify(self, model, lr): + """ + Sparsify the gradients vector by choosing a block of of them + :param model: learning model + :param lr: learning rate + :return: sparsified gradients vector (a block of gradients and the beginning index of the block) + """ + params_sparse_tensors = [] + + for ind, param in enumerate(model.parameters()): + + param_size = param.data.size()[1] + gradients = param.grad.data * lr[0] + self.state[param]['memory'] + self.state[param]['memory'] += param.grad.data * lr[0] + + num_blocks = int(param_size / self.num_coordinates) + + if self.current_block == -1: + self.current_block = random.randint(0, num_blocks - 1) + elif self.current_block == num_blocks: + self.current_block = 0 + + begin = self.current_block * self.num_coordinates + end = begin + self.num_coordinates + #TODO do something for last block! + # if self.current_block == (num_blocks - 1): + # end = param_size + # else: + # end = begin + self.num_coordinates + + self.state[param]['memory'][begin:end] = 0 + + sparse_tensor = torch.zeros([1, self.num_coordinates + 1]) + sparse_tensor[0, 0:self.num_coordinates] = gradients[0, begin:end] + sparse_tensor[0, self.num_coordinates] = begin + + params_sparse_tensors.append(sparse_tensor) + + self.current_block += 1 + + return params_sparse_tensors + + def _random_sparsify(self, model, lr): + """ + Sparsify the gradients vector by selecting 'k' of them randomly. 
+ param model: learning model + param lr: learning rate + return: sparsified gradients vector ('k' gradients and their indices) + """ + params_sparse_tensors = [] + + for ind, param in enumerate(model.parameters()): + + gradients = param.grad.data * lr[0] + self.state[param]['memory'] + self.state[param]['memory'] += param.grad.data * lr[0] + + indices = [] + sparse_tensor = torch.zeros([2, self.num_coordinates]) + + for i in range(self.num_coordinates): + indices.append(random.randint(self.num_coordinates)) + sparse_tensor[1, i] = gradients[0, i] + self.state[param]['memory'][i] = 0 + sparse_tensor[0, :] = torch.tensor(indices) + + params_sparse_tensors.append(sparse_tensor) + + self.current_block += 1 + return params_sparse_tensors + + def update_estimated_weights(self, model, iteration, sparse_vector_size): + """ Updates the estimated parameters """ + t = iteration + for ind, param in enumerate(model.parameters()): + tau = param.data.size()[1] / sparse_vector_size + rho = 6 * ((t + tau) ** 2) / ((1 + t) * (6 * (tau ** 2) + t + 6 * tau * t + 2 * (t ** 2))) + self.state[param]['estimated_w'] = self.state[param]['estimated_w'] * (1 - rho) + param.data * rho + + def get_estimated_weights(self, model): + estimated_params = [] + for param in model.parameters(): + estimated_params.append(self.state[param]['estimated_w']) + return estimated_params + + + + + From 30301c583137bc716870cd70411cfc03304431c2 Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Mon, 22 Oct 2018 16:36:09 +0200 Subject: [PATCH 2/8] Add a new aggregate function to handle sparsifiedSGD --- .../refimpls/pytorch/utils/communication.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/mlbench/refimpls/pytorch/utils/communication.py b/mlbench/refimpls/pytorch/utils/communication.py index 8b53952..db2c243 100644 --- a/mlbench/refimpls/pytorch/utils/communication.py +++ b/mlbench/refimpls/pytorch/utils/communication.py @@ -16,10 +16,32 @@ def aggregate_gradients(model, world_size): 
param.grad.data /= world_size +def aggregate_sparsified_gradients(model, world_size, sparse_vector_size, random_sparse, optimizer, lr): + """Make the gradients vector sparse and average sparsified gradients of models across all processes.""" + + params_sparse_tensors = optimizer.sparsify_gradients(model, lr, random_sparse) + + for ind, param in enumerate(model.parameters()): + + gathered_list = [torch.zeros_like(params_sparse_tensors[ind]) for _ in range(world_size)] + # all gather. + dist.all_gather(gathered_list, params_sparse_tensors[ind]) + + avg_grads = torch.zeros_like(param.data) + + for grad_tensor in gathered_list: + begin = int(grad_tensor[0, sparse_vector_size]) + avg_grads[0, begin:begin + sparse_vector_size] += grad_tensor[0, 0:sparse_vector_size] + + avg_grads /= world_size + param.grad.data = avg_grads + + def global_average(sum, count): def helper(array): array = torch.FloatTensor(array) dist.all_reduce(array, op=dist.reduce_op.SUM) return array[0] / array[1] + avg = helper([sum, count]) return avg From c2522242e3f5fcf2944bc639d052314df1a6a9c8 Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Mon, 22 Oct 2018 16:42:22 +0200 Subject: [PATCH 3/8] Add learning scheduler for both sgd and sparsifiedSGD --- mlbench/refimpls/pytorch/optim/lr.py | 95 ++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 18 deletions(-) diff --git a/mlbench/refimpls/pytorch/optim/lr.py b/mlbench/refimpls/pytorch/optim/lr.py index 151c8b1..8afc91a 100644 --- a/mlbench/refimpls/pytorch/optim/lr.py +++ b/mlbench/refimpls/pytorch/optim/lr.py @@ -45,7 +45,7 @@ class SchedulerParser(argparse.ArgumentParser): def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamma=True, warmup=True, warmup_init_lr=True, warmup_linear_scaling=True, warmup_durations=True, clr_cycle_length=True, clr_base_lr=True, clr_max_lr=True, clr_mode=True, - clr_gamma=True, clr_extra=True): + clr_gamma=True, clr_extra=True, sgd_lr_alpha=True, sgd_lr_beta=True, 
sgd_lr_gamma=True): super(SchedulerParser, self).__init__(add_help=add_help) if multisteplr_milestones: @@ -60,8 +60,8 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm if warmup: self.add_argument("--warmup", default=False, action="store_true", help="[default: %(default)s] linearly warmup learning rate before other scheduling." - "For the moment, only implemented for multistep learning rate with warmup." - "The warmup is used for training with more than one process.") + "For the moment, only implemented for multistep learning rate with warmup." + "The warmup is used for training with more than one process.") if warmup_init_lr: warmup_init_lr_group = self.add_mutually_exclusive_group() @@ -70,32 +70,32 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm warmup_init_lr_group.add_argument("--warmup_init_lr_nonscale", action='store_true', default=False, help="[default: %(default)s] Use nonscaled lr for initial warmup lr" - "for training. If this flag is true, then ignore") + "for training. If this flag is true, then ignore") if warmup_linear_scaling: self.add_argument("--warmup_linear_scaling", action='store_true', default=False, help="[default: %(default)s] scale the learning rate by a factor after warmup." - "For linear scaling rule, this factor is the number of machines.") + "For linear scaling rule, this factor is the number of machines.") if warmup_durations: self.add_argument("--warmup_durations", type=parse_batch_epoch, default={'batch': 1}, metavar='', help="[default: % (default)s] duration for the warmup." - "The warumup should be a batch level.") + "The warumup should be a batch level.") if clr_cycle_length: self.add_argument("--clr_cycle_length", type=parse_batch_epoch, default='batch:2000', metavar='', help="[default: %(default)s] cycle length in a cyclical learning rates training." 
- "It can be `batch:int_batches` or `epoch:float_epochs`.") + "It can be `batch:int_batches` or `epoch:float_epochs`.") if clr_base_lr: self.add_argument("--clr_base_lr", type=float, default=0.001, metavar='', help="[default: %(default)s] minimum and initial learning rate in cyclical" - "learning rates training.") + "learning rates training.") if clr_max_lr: self.add_argument("--clr_max_lr", type=float, default=0.1, metavar='', help="[default: %(default)s] maximum learning rate in cyclical" - "learning rates training. Note this maximum value might not be reached " - "depending on the chosen scaling mode.") + "learning rates training. Note this maximum value might not be reached " + "depending on the chosen scaling mode.") if clr_mode: self.add_argument("--clr_mode", type=str, default='triangular', metavar='', @@ -104,12 +104,25 @@ def __init__(self, add_help=False, multisteplr_milestones=True, multisteplr_gamm if clr_gamma: self.add_argument("--clr_gamma", type=float, default=0.99, metavar='', help="[default: %(default)s] constant in 'exp_range' scaling function" - " in cyclical learning rate schedule.") + " in cyclical learning rate schedule.") if clr_extra: self.add_argument("--clr_extra", type=float, default=0.1, metavar='', help="[default: %(default)s] Extra number of iterations of training for one cycle.") + if sgd_lr_alpha: + self.add_argument("--alpha", type=float, default=100, + help="[default: %(default)s] Constant is used to calculate the optimal learning rate for" + "SGD ( alpha / beta + t).") + if sgd_lr_beta: + self.add_argument("--beta", type=float, default=100, + help="[default: %(default)s] Constant is used to calculate the optimal learning rate for" + "SGD ( alpha / beta + t).") + if sgd_lr_gamma: + self.add_argument("--sgd_lr_gamma", type=float, default=100, + help="[default: %(default)s] Constant is used to calculate the optimal learning rate for" + "sparsified SGD ( gamma / (a + t) * lambda).") + def const(optimizer): return LambdaLR(optimizer, 
lr_lambda=lambda x: 1.0) @@ -140,16 +153,16 @@ def triangular_learning_rates(optimizer, base_lr, max_lr, cycle_length, scale_fn def f(iterations): if iterations <= cycle_length: cycle = np.floor(1 + iterations / (2 * step_size)) - x = np.abs(iterations/step_size - 2 * cycle + 1) - lr = base_lr + (max_lr-base_lr) * np.maximum(0, (1-x)) * scale_fn(cycle, iterations) + x = np.abs(iterations / step_size - 2 * cycle + 1) + lr = base_lr + (max_lr - base_lr) * np.maximum(0, (1 - x)) * scale_fn(cycle, iterations) else: lr = base_lr * extra return lr / base_lr else: def f(iterations): cycle = np.floor(1 + iterations / (2 * step_size)) - x = np.abs(iterations/step_size - 2 * cycle + 1) - lr = base_lr + (max_lr-base_lr) * np.maximum(0, (1-x)) * scale_fn(cycle, iterations) + x = np.abs(iterations / step_size - 2 * cycle + 1) + lr = base_lr + (max_lr - base_lr) * np.maximum(0, (1 - x)) * scale_fn(cycle, iterations) return lr / base_lr # Use base_lr to overwrite the --lr @@ -175,11 +188,14 @@ def cyclical_learning_rates(options, optimizer): mode = options.clr_mode gamma = options.clr_gamma if mode in ['linear', 'triangular', 'one_cycle']: - def scale_fn(cycle, iterations): return 1. + def scale_fn(cycle, iterations): + return 1. elif mode == 'triangular2': - def scale_fn(cycle, iterations): return 1 / (2. ** (cycle - 1)) + def scale_fn(cycle, iterations): + return 1 / (2. 
** (cycle - 1)) elif mode == 'exp_range': - def scale_fn(cycle, iterations): return gamma ** iterations + def scale_fn(cycle, iterations): + return gamma ** iterations else: raise ValueError("Cycle mode {} not support.".format(mode)) @@ -249,6 +265,45 @@ def f(durations): return LambdaLR(optimizer, lr_lambda=f) +def sgd_optimal_learning_rates(options, optimizer): + """ + Learning rate schedule for SGD (alpha / (t + beta)) + :param options: all configs + :param optimizer: optimizer associated with the scheduler + """ + beta = options.beta + alpha = options.alpha + + def f(iterations): + return beta / (beta + iterations) + + for group in optimizer.param_groups: + group['initial_lr'] = alpha / beta + + optimizer.base_lrs = [alpha / beta for _ in optimizer.param_groups] + return LambdaLR(optimizer, lr_lambda=f) + + +def sparsified_sgd_optimal_learning_rate(options, optimizer): + """ + Learning rate schedule for sparsifiedSGD (gamma / lambda * (t + a)) + param options: all configs + param optimizer: optimizer associated with the scheduler + """ + #TODO get feature from config file + a = 2000 / options.sparse_grad_size + l2_coef = options.l2_coef + gamma = options.sgd_lr_gamma + + def f(iterations): + return 1 / max(1, (a + iterations)) + + for group in optimizer.param_groups: + group['initial_lr'] = gamma / l2_coef + + return LambdaLR(optimizer, lr_lambda=f) + + def get_scheduler(options, optimizer): if options.lr_scheduler == 'const': return const(optimizer) @@ -256,6 +311,10 @@ def get_scheduler(options, optimizer): return cyclical_learning_rates(options, optimizer) elif options.lr_scheduler == 'MultiStepLRW': return multistep_learning_rates_with_warmup(options, optimizer) + elif options.lr_scheduler == 'sgd_optimal': + return sgd_optimal_learning_rates(options, optimizer) + elif options.lr_scheduler == 'sparsified_sgd': + return sparsified_sgd_optimal_learning_rate(options, optimizer) else: raise NotImplementedError From 2efaca4ce044b351dfdcd2e06a541dd885e72715 Mon 
Sep 17 00:00:00 2001 From: Negar Foroutan Date: Mon, 22 Oct 2018 16:47:12 +0200 Subject: [PATCH 4/8] Modifications to support sparsifiedSGD --- .../pytorch/controlflow/controlflow.py | 60 ++++++++++++++++- .../pytorch/datasets/load_libsvm_dataset.py | 4 +- .../refimpls/pytorch/models/linear_models.py | 1 + mlbench/refimpls/pytorch/utils/criterions.py | 4 +- mlbench/refimpls/pytorch/utils/parser.py | 66 ++++++++++++------- 5 files changed, 104 insertions(+), 31 deletions(-) diff --git a/mlbench/refimpls/pytorch/controlflow/controlflow.py b/mlbench/refimpls/pytorch/controlflow/controlflow.py index ad56280..b748779 100644 --- a/mlbench/refimpls/pytorch/controlflow/controlflow.py +++ b/mlbench/refimpls/pytorch/controlflow/controlflow.py @@ -1,11 +1,12 @@ import torch import torch.distributed as dist +import numpy as np from utils import checkpoint from utils import log from utils.metrics import AverageMeter from utils.helper import Timeit, maybe_range, update_best_runtime_metric -from utils.communication import aggregate_gradients, global_average +from utils.communication import aggregate_gradients, global_average, aggregate_sparsified_gradients from utils.utils import convert_dtype from datasets import create_dataset @@ -32,9 +33,28 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): output = model(data) loss = criterion(output, target) loss.backward() - aggregate_gradients(model, options.world_size) + + if options.opt_name == 'sparsified_sgd': + aggregate_sparsified_gradients(model, options.world_size, + options.sparse_grad_size, + options.random_sparse, + optimizer, + scheduler.get_lr()) + else: + aggregate_gradients(model, options.world_size) + optimizer.step() + if options.model_name == 'logistic_regression' and options.train_validate: + t = options.runtime['current_epoch'] * options.train_num_samples_per_device + batch_idx * options.batch_size + optimizer.update_estimated_weights(model, t, options.sparse_grad_size) + + if t % 
options.compute_loss_every == 0: + print("Train validation....") + timeit.pause() + train_validate(optimizer, model, options) + timeit.resume() + with torch.no_grad(): loss = loss.item() loss = global_average(loss, 1).item() @@ -46,6 +66,40 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): timeit.resume() +def train_validate(optimizer, model, options): + """ Validation on train data by using estimated weights """ + estimated_weights = optimizer.get_estimated_weights(model) + num_samples = 0 + l1 = options.l1_coef + l2 = options.l2_coef + + loss = 0 + + for batch_idx, (data, target) in zip(maybe_range(options.max_batch_per_epoch), + options.train_loader): + data = convert_dtype(options.dtype, data) + if options.force_target_dtype: + target = convert_dtype(options.dtype, target) + + if options.use_cuda: + data, target = data.cuda(), target.cuda() + + for weight in estimated_weights: + loss += np.log(1 + np.exp(-target * (data @ weight.transpose(0, 1)))).sum()[0].item() + + l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights)[0].item() + loss += l2 / 2 * l2_loss + l1_loss = sum(weight.norm(1) for weight in estimated_weights)[0].item() + loss += l1 * l1_loss + num_samples += data.size()[0] + + train_loss = global_average(loss, num_samples) + print("Global Train Loss: " + str(train_loss[0].item())) + + with open(options.ckpt_run_dir + "/" + str(dist.get_rank()) + "_train_validation.txt", "a+") as file: + file.write(str(train_loss[0].item()) + "\n") + + def validate(model, optimizer, criterion, metrics, options): model.eval() @@ -123,7 +177,7 @@ def __call__(self, model, optimizer, criterion, metrics, scheduler, options): options.batch_size), 0) # train the model and evaluate the model per args.eval_freq - max_epochs = min(options.train_epochs, options.max_train_steps)\ + max_epochs = min(options.train_epochs, options.max_train_steps) \ if options.max_train_steps else options.train_epochs start_epoch = 
options.runtime['current_epoch'] if options.resume else 0 options.runtime['records'] = options.runtime.get('records', []) diff --git a/mlbench/refimpls/pytorch/datasets/load_libsvm_dataset.py b/mlbench/refimpls/pytorch/datasets/load_libsvm_dataset.py index 168ddff..f8637de 100644 --- a/mlbench/refimpls/pytorch/datasets/load_libsvm_dataset.py +++ b/mlbench/refimpls/pytorch/datasets/load_libsvm_dataset.py @@ -76,6 +76,7 @@ def f(x): for ind, (from_index, to_index) in from_to_indices: if from_index <= x and x < to_index: return ind, x - from_index + return f def _get_matched_index(self, index): @@ -200,7 +201,8 @@ def get_dataset_info(name): def load_libsvm_lmdb(name, lmdb_path): stats = get_dataset_info(name) dataset = IMDBPT(lmdb_path, transform=maybe_transform_sparse(stats), - target_transform=lambda x: torch.Tensor(x), is_image=False) + # target_transform=lambda x: torch.Tensor(x), is_image=False) + target_transform=None, is_image=False) return dataset diff --git a/mlbench/refimpls/pytorch/models/linear_models.py b/mlbench/refimpls/pytorch/models/linear_models.py index 23f5103..dea9fe6 100644 --- a/mlbench/refimpls/pytorch/models/linear_models.py +++ b/mlbench/refimpls/pytorch/models/linear_models.py @@ -2,6 +2,7 @@ import math import torch.nn.functional as F from torch.nn.parameter import Parameter +import numpy as np class LogisticRegression(torch.nn.Module): diff --git a/mlbench/refimpls/pytorch/utils/criterions.py b/mlbench/refimpls/pytorch/utils/criterions.py index d8b8267..1bbd82c 100644 --- a/mlbench/refimpls/pytorch/utils/criterions.py +++ b/mlbench/refimpls/pytorch/utils/criterions.py @@ -17,7 +17,7 @@ def __init__(self, weight=None, size_average=None, reduce=None, l1=0.0, l2=0.0, def forward(self, input, target): output = F.binary_cross_entropy(input, target, weight=self.weight, reduction=self.reduction) - l2_loss = sum(param.norm(2)**2 for param in self.model.parameters()) + l2_loss = sum(param.norm(2) ** 2 for param in self.model.parameters()) output 
+= self.l2 / 2 * l2_loss l1_loss = sum(param.norm(1) for param in self.model.parameters()) output += self.l1 * l1_loss @@ -38,7 +38,7 @@ def __init__(self, weight=None, size_average=None, reduce=None, l1=0.0, l2=0.0, def forward(self, input, target): output = F.mse_loss(input, target, reduction=self.reduction) - l2_loss = sum(param.norm(2)**2 for param in self.model.parameters()) + l2_loss = sum(param.norm(2) ** 2 for param in self.model.parameters()) output += self.l2 / 2 * l2_loss l1_loss = sum(param.norm(1) for param in self.model.parameters()) output += self.l1 * l1_loss diff --git a/mlbench/refimpls/pytorch/utils/parser.py b/mlbench/refimpls/pytorch/utils/parser.py index 2b18dc0..a419218 100644 --- a/mlbench/refimpls/pytorch/utils/parser.py +++ b/mlbench/refimpls/pytorch/utils/parser.py @@ -49,7 +49,7 @@ def __init__(self, add_help=True, use_cuda=True, communication_backend=True, run if cudnn_deterministic: self.add_argument('--cudnn_deterministic', action='store_true', default=False, help="[default: %(default)s] enable deterministic cudnn training." - "WARNING: it may slows down training.") + "WARNING: it may slows down training.") if checkpoint_root: self.add_argument('--checkpoint_root', type=str, default='/checkpoint', metavar='', @@ -77,17 +77,17 @@ def __init__(self, add_help=True, num_parallel_workers=True, use_synthetic_data= if num_parallel_workers: self.add_argument("--num_parallel_workers", "-npw", type=int, default=4, help="[default: %(default)s] The number of records that are " - "processed in parallel during input processing. This can be " - "optimized per data set but for generally homogeneous data " - "sets, should be approximately the number of available CPU " - "cores.", + "processed in parallel during input processing. 
This can be " + "optimized per data set but for generally homogeneous data " + "sets, should be approximately the number of available CPU " + "cores.", metavar="") if use_synthetic_data: self.add_argument("--use_synthetic_data", action="store_true", default=False, help="[default: %(default)s] If set, use fake data (zeroes) instead of a real dataset. " - "This mode is useful for performance debugging, as it removes " - "input processing steps, but will not learn anything.") + "This mode is useful for performance debugging, as it removes " + "input processing steps, but will not learn anything.") if max_train_steps: self.add_argument("--max_train_steps", type=int, default=None, metavar="", @@ -101,8 +101,8 @@ def __init__(self, add_help=True, num_parallel_workers=True, use_synthetic_data= if dtype: self.add_argument("--dtype", type=str, default="fp32", choices=list(DTYPE_MAP.keys()), help="[default: %(default)s] {%(choices)s} The PyTorch datatype " - "used for calculations. Variables may be cast to a higher" - "precision on a case-by-case basis for numerical stability.", + "used for calculations. Variables may be cast to a higher" + "precision on a case-by-case basis for numerical stability.", metavar="
") if force_target_dtype: self.add_argument("--force_target_dtype", action='store_true', default=False, @@ -115,7 +115,7 @@ def __init__(self, add_help=True, num_parallel_workers=True, use_synthetic_data= if dont_post_to_dashboard: self.add_argument('--dont_post_to_dashboard', action='store_true', default=False, help='[default: %(default)s] decide whether or not post metrics to dashboard ' - 'if there is one. This can be usefule when lauching the mpi jobs from worker.') + 'if there is one. This can be usefule when lauching the mpi jobs from worker.') class DatasetParser(argparse.ArgumentParser): @@ -135,7 +135,7 @@ def __init__(self, add_help=True, batch_size=True, val_batch_size=True, root_dat if root_data_dir: self.add_argument("--root_data_dir", type=str, default="/datasets/torch", metavar="
", help="[default: %(default)s] root directory to all datasets." - "If the given dataset name, its directory is root_data_dir/dataset_name.") + "If the given dataset name, its directory is root_data_dir/dataset_name.") if name: self.add_argument("--dataset_name", type=str, default='mnist', metavar="", @@ -152,12 +152,12 @@ def __init__(self, add_help=True, batch_size=True, val_batch_size=True, root_dat if repartition_per_epoch: self.add_argument("--repartition_per_epoch", action='store_true', default=True, help="[default: %(default)s] repartition datasets among workers at the end of each" - "epoch.") + "epoch.") if shuffle_partition_indices: self.add_argument("--shuffle_partition_indices", action='store_true', default=True, help="[default: %(default)s] reshuffle indices of datasets for the partitions." - "If False, repartition_per_epoch gives same partition.") + "If False, repartition_per_epoch gives same partition.") if libsvm_dataset: self.add_argument("--libsvm_dataset", action='store_true', default=False, @@ -170,7 +170,7 @@ def __init__(self, add_help=True, batch_size=True, val_batch_size=True, root_dat if lmdb: self.add_argument("--lmdb", action='store_true', default=False, help="[default: %(default)s] The dataset is already in lmdb database. 
" - "root_data_dir is the lmdb database.") + "root_data_dir is the lmdb database.") class ModelParser(argparse.ArgumentParser): @@ -178,7 +178,8 @@ class ModelParser(argparse.ArgumentParser): def __init__(self, add_help=True, lr=True, lr_per_sample=True, momentum=True, criterion=True, nesterov=True, weight_decay=True, opt_name=True, model_name=True, model_version=True, - lr_scheduler=True, lr_scheduler_level=True, metrics=True): + lr_scheduler=True, lr_scheduler_level=True, metrics=True, sparsified_sgd=True, random_sparse=True, + param_estimate=True, train_validation=True): super(ModelParser, self).__init__(add_help=add_help) lr_group = self.add_mutually_exclusive_group() @@ -196,17 +197,17 @@ def __init__(self, add_help=True, lr=True, lr_per_sample=True, momentum=True, cr help="[default: %(default)s] version of model.") if lr: - lr_group.add_argument("--lr", type=float, default=0.1, metavar='', + lr_group.add_argument("--lr", type=float, default=None, metavar='', help="[default: %(default)s] initial learning rate for the optimizer." - "This learning rate is mutual exclusive with lr_per_sample." - "Note that if warmup is applied, then this lr means the lr after warmup.") + "This learning rate is mutual exclusive with lr_per_sample." + "Note that if warmup is applied, then this lr means the lr after warmup.") if lr_per_sample: lr_group.add_argument("--lr_per_sample", type=float, default=None, metavar='', help="[default: %(default)s] initial learning rate per sample for the optimizer." - "This learning rate is mutual exclusive with --lr." - "Note that lr_per_sample is similar to --lr in usage except for the batch size." - "The batch size here refers to --minibatch, not (--minibatch * machines).") + "This learning rate is mutual exclusive with --lr." + "Note that lr_per_sample is similar to --lr in usage except for the batch size." 
+ "The batch size here refers to --minibatch, not (--minibatch * machines).") if lr_scheduler: self.add_argument("--lr_scheduler", type=str, default='const', metavar='', @@ -224,7 +225,7 @@ def __init__(self, add_help=True, lr=True, lr_per_sample=True, momentum=True, cr if criterion: self.add_argument("--criterion", type=str, default='CrossEntropyLoss', metavar='', help="[default: %(default)s] name of training loss function." - "Support loss functions from `torch.nn.modules.loss`.") + "Support loss functions from `torch.nn.modules.loss`.") if metrics: self.add_argument("--metrics", type=str, default='topk', metavar='', @@ -237,6 +238,21 @@ def __init__(self, add_help=True, lr=True, lr_per_sample=True, momentum=True, cr if weight_decay: self.add_argument("--weight_decay", type=float, default=5e-4, metavar='', help="[default: %(default)s] weight decay of the optimizer.") + if sparsified_sgd: + self.add_argument("--sparse_grad_size", type=int, default=10, + help="[default: %(default)s] size of the sparse gradients vector.") + if random_sparse: + self.add_argument("--random_sparse", action='store_true', default=False, + help="[default: %(default)s] Boolean flag to indicates the gradients sparsified method" + "(random or blockwise).") + if param_estimate: + self.add_argument("--param_estimate", type=str, default='final', + help="[default: %(default)s] parameter estimation method (final or weighted average).") + if train_validation: + self.add_argument("--train_validate", action='store_true', default=True, + help="[default: %(default)s] to have validation on train dataset.") + self.add_argument("--compute_loss_every", type=int, default=50000, + help="[default: %(default)s] determine when the train loss will be calculated.") class ControlflowParser(argparse.ArgumentParser): @@ -250,7 +266,7 @@ def __init__(self, add_help=True, train_epochs=True, epochs_between_evals=True, if epochs_between_evals: self.add_argument("--epochs_between_evals", type=int, default=1, metavar="", 
help="[default: %(default)s] The number of training epochs to run " - "between evaluations.") + "between evaluations.") if validation: parser = self.add_mutually_exclusive_group() parser.add_argument('--validation', dest='validation', action='store_true', @@ -266,12 +282,12 @@ def __init__(self, add_help=True, l1_coef=True, l2_coef=True): if l1_coef: self.add_argument("--l1_coef", type=float, default=0.0, metavar='', help="[default: %(default)s] Coefficient of L1 regularizer. This coefficient is used" - "in the criterion.") + "in the criterion.") if l2_coef: self.add_argument("--l2_coef", type=float, default=0.0, metavar='', help="[default: %(default)s] Coefficient of L1 regularizer. This coefficient is used" - "in the criterion. Note that weight_decay should be 0 to avoid duplication.") + "in the criterion. Note that weight_decay should be 0 to avoid duplication.") class MainParser(argparse.ArgumentParser): From fce28a9961731dd76596b73c6bf7f74a163be318 Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Tue, 23 Oct 2018 18:02:31 +0200 Subject: [PATCH 5/8] change aggregate function to support random sparsification. --- .../refimpls/pytorch/utils/communication.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/mlbench/refimpls/pytorch/utils/communication.py b/mlbench/refimpls/pytorch/utils/communication.py index db2c243..9e35018 100644 --- a/mlbench/refimpls/pytorch/utils/communication.py +++ b/mlbench/refimpls/pytorch/utils/communication.py @@ -1,5 +1,6 @@ import torch import torch.distributed as dist +import timeit def elementwise_min(tensor): @@ -12,7 +13,12 @@ def aggregate_gradients(model, world_size): # all_reduce the gradients. for ind, param in enumerate(model.parameters()): # all reduce. 
+ # start = timeit.default_timer() dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) + # end = timeit.default_timer() - start + # with open(str(dist.get_rank()) + "_communication_time.txt", "a+") as file: + # file.write(str(end) + "\n") + param.grad.data /= world_size @@ -25,13 +31,22 @@ def aggregate_sparsified_gradients(model, world_size, sparse_vector_size, random gathered_list = [torch.zeros_like(params_sparse_tensors[ind]) for _ in range(world_size)] # all gather. + # start = timeit.default_timer() dist.all_gather(gathered_list, params_sparse_tensors[ind]) + # end = timeit.default_timer() - start + # with open(str(dist.get_rank()) + "_communication_time.txt", "a+") as file: + # file.write(str(end) + "\n") avg_grads = torch.zeros_like(param.data) - for grad_tensor in gathered_list: - begin = int(grad_tensor[0, sparse_vector_size]) - avg_grads[0, begin:begin + sparse_vector_size] += grad_tensor[0, 0:sparse_vector_size] + if random_sparse: + for grad_tensor in gathered_list: + for index in range(grad_tensor.size()[1]): + avg_grads[0, int(grad_tensor[0, index])] += grad_tensor[1, index] + else: + for grad_tensor in gathered_list: + begin = int(grad_tensor[0, sparse_vector_size]) + avg_grads[0, begin:begin + sparse_vector_size] += grad_tensor[0, 0:sparse_vector_size] avg_grads /= world_size param.grad.data = avg_grads From c399a29867f9cbffbf7bb8e59f8686fc66afeb0b Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Tue, 23 Oct 2018 18:04:02 +0200 Subject: [PATCH 6/8] fix a problem in _random_sparsify function --- mlbench/refimpls/pytorch/optim/optimizer.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/mlbench/refimpls/pytorch/optim/optimizer.py b/mlbench/refimpls/pytorch/optim/optimizer.py index b629c80..64f0769 100644 --- a/mlbench/refimpls/pytorch/optim/optimizer.py +++ b/mlbench/refimpls/pytorch/optim/optimizer.py @@ -38,6 +38,7 @@ def get_optimizer(options, model): return optimizer + class sparsified_SGD(Optimizer): 
r"""Implements sparsified version of stochastic gradient descent. @@ -67,7 +68,6 @@ def __init__(self, params, lr=required, weight_decay=0, sparse_grad_size=10): self.num_coordinates = sparse_grad_size self.current_block = -1 - def __setstate__(self, state): super(sparsified_SGD, self).__setstate__(state) @@ -77,6 +77,8 @@ def __create_weighted_average_params(self): for p in group['params']: param_state = self.state[p] param_state['estimated_w'] = torch.zeros_like(p.data) + p.data.normal_(0, 0.01) + param_state['estimated_w'].normal_(0, 0.01) def __create_gradients_memory(self): """ Create a memory for parameters. """ @@ -143,7 +145,7 @@ def _block_sparsify(self, model, lr): begin = self.current_block * self.num_coordinates end = begin + self.num_coordinates - #TODO do something for last block! + # TODO do something for last block! # if self.current_block == (num_blocks - 1): # end = param_size # else: @@ -179,14 +181,14 @@ def _random_sparsify(self, model, lr): sparse_tensor = torch.zeros([2, self.num_coordinates]) for i in range(self.num_coordinates): - indices.append(random.randint(self.num_coordinates)) - sparse_tensor[1, i] = gradients[0, i] + random_index = random.randint(0, self.num_coordinates) + indices.append(random_index) + sparse_tensor[1, i] = gradients[0, random_index] self.state[param]['memory'][i] = 0 sparse_tensor[0, :] = torch.tensor(indices) params_sparse_tensors.append(sparse_tensor) - self.current_block += 1 return params_sparse_tensors def update_estimated_weights(self, model, iteration, sparse_vector_size): @@ -202,8 +204,3 @@ def get_estimated_weights(self, model): for param in model.parameters(): estimated_params.append(self.state[param]['estimated_w']) return estimated_params - - - - - From d3fb819303102344e1609ea1b8e13a0069b2d3a3 Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Tue, 23 Oct 2018 18:05:40 +0200 Subject: [PATCH 7/8] minor changes --- .../pytorch/controlflow/controlflow.py | 27 ++++++++++++------- 
mlbench/refimpls/pytorch/utils/parser.py | 2 +- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/mlbench/refimpls/pytorch/controlflow/controlflow.py b/mlbench/refimpls/pytorch/controlflow/controlflow.py index b748779..105de9f 100644 --- a/mlbench/refimpls/pytorch/controlflow/controlflow.py +++ b/mlbench/refimpls/pytorch/controlflow/controlflow.py @@ -1,6 +1,7 @@ import torch import torch.distributed as dist import numpy as np +import torch.nn.functional as F from utils import checkpoint from utils import log @@ -54,6 +55,7 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): timeit.pause() train_validate(optimizer, model, options) timeit.resume() + print("Train validation is done!") with torch.no_grad(): loss = loss.item() @@ -76,7 +78,7 @@ def train_validate(optimizer, model, options): loss = 0 for batch_idx, (data, target) in zip(maybe_range(options.max_batch_per_epoch), - options.train_loader): + options.val_loader): data = convert_dtype(options.dtype, data) if options.force_target_dtype: target = convert_dtype(options.dtype, target) @@ -84,20 +86,27 @@ def train_validate(optimizer, model, options): if options.use_cuda: data, target = data.cuda(), target.cuda() + # target = target * 2 - 1 + for weight in estimated_weights: - loss += np.log(1 + np.exp(-target * (data @ weight.transpose(0, 1)))).sum()[0].item() + w = weight.squeeze() + loss += (np.log(1 + np.exp(data @ w)) - target * (data @ w)).sum() + # loss += np.log(1 + np.exp((data @ weight.transpose(0, 1)))) - target.transpose(1, 0) * data @ weight.transpose(0, 1) + # loss += np.log(1 + np.exp(-target * (data @ weight.transpose(0, 1)))).sum()[0].item() - l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights)[0].item() - loss += l2 / 2 * l2_loss - l1_loss = sum(weight.norm(1) for weight in estimated_weights)[0].item() - loss += l1 * l1_loss num_samples += data.size()[0] - train_loss = global_average(loss, num_samples) - print("Global Train Loss: " + 
str(train_loss[0].item())) + train_loss = global_average(loss, num_samples).item() + + l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights)[0].item() + train_loss += l2 / 2 * l2_loss + l1_loss = sum(weight.norm(1) for weight in estimated_weights)[0].item() + train_loss += l1 * l1_loss + + print("Global Train Loss: " + str(train_loss)) with open(options.ckpt_run_dir + "/" + str(dist.get_rank()) + "_train_validation.txt", "a+") as file: - file.write(str(train_loss[0].item()) + "\n") + file.write(str(train_loss) + "\n") def validate(model, optimizer, criterion, metrics, options): diff --git a/mlbench/refimpls/pytorch/utils/parser.py b/mlbench/refimpls/pytorch/utils/parser.py index a419218..d22b933 100644 --- a/mlbench/refimpls/pytorch/utils/parser.py +++ b/mlbench/refimpls/pytorch/utils/parser.py @@ -249,7 +249,7 @@ def __init__(self, add_help=True, lr=True, lr_per_sample=True, momentum=True, cr self.add_argument("--param_estimate", type=str, default='final', help="[default: %(default)s] parameter estimation method (final or weighted average).") if train_validation: - self.add_argument("--train_validate", action='store_true', default=True, + self.add_argument("--train_validate", action='store_true', default=False, help="[default: %(default)s] to have validation on train dataset.") self.add_argument("--compute_loss_every", type=int, default=50000, help="[default: %(default)s] determine when the train loss will be calculated.") From 839039cd8bd01ae99fbd596cd9071cd8b707daa8 Mon Sep 17 00:00:00 2001 From: Negar Foroutan Date: Thu, 8 Nov 2018 11:28:29 +0100 Subject: [PATCH 8/8] minor modifications and cleaning the code --- .../pytorch/controlflow/controlflow.py | 18 ++-- .../pytorch/datasets/partition_data.py | 3 +- mlbench/refimpls/pytorch/optim/lr.py | 3 +- mlbench/refimpls/pytorch/optim/optimizer.py | 87 ++++++++----------- .../refimpls/pytorch/utils/communication.py | 18 ++-- 5 files changed, 59 insertions(+), 70 deletions(-) diff --git 
a/mlbench/refimpls/pytorch/controlflow/controlflow.py b/mlbench/refimpls/pytorch/controlflow/controlflow.py index 105de9f..5131cb6 100644 --- a/mlbench/refimpls/pytorch/controlflow/controlflow.py +++ b/mlbench/refimpls/pytorch/controlflow/controlflow.py @@ -24,6 +24,7 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): scheduler.step() data = convert_dtype(options.dtype, data) + if options.force_target_dtype: target = convert_dtype(options.dtype, target) @@ -55,8 +56,6 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): timeit.pause() train_validate(optimizer, model, options) timeit.resume() - print("Train validation is done!") - with torch.no_grad(): loss = loss.item() loss = global_average(loss, 1).item() @@ -69,7 +68,7 @@ def train_epoch(model, optimizer, criterion, scheduler, options, timeit): def train_validate(optimizer, model, options): - """ Validation on train data by using estimated weights """ + """ Validation on train data by using weighted average of parameters """ estimated_weights = optimizer.get_estimated_weights(model) num_samples = 0 l1 = options.l1_coef @@ -85,22 +84,20 @@ def train_validate(optimizer, model, options): if options.use_cuda: data, target = data.cuda(), target.cuda() - - # target = target * 2 - 1 + target = target * 2 - 1 for weight in estimated_weights: w = weight.squeeze() - loss += (np.log(1 + np.exp(data @ w)) - target * (data @ w)).sum() - # loss += np.log(1 + np.exp((data @ weight.transpose(0, 1)))) - target.transpose(1, 0) * data @ weight.transpose(0, 1) - # loss += np.log(1 + np.exp(-target * (data @ weight.transpose(0, 1)))).sum()[0].item() + batch_loss = np.log(1 + np.exp(-target * (data @ w))) + loss += batch_loss.sum().item() num_samples += data.size()[0] train_loss = global_average(loss, num_samples).item() - l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights)[0].item() + l2_loss = sum(weight.norm(2) ** 2 for weight in estimated_weights).item() train_loss 
+= l2 / 2 * l2_loss - l1_loss = sum(weight.norm(1) for weight in estimated_weights)[0].item() + l1_loss = sum(weight.norm(1) for weight in estimated_weights).item() train_loss += l1 * l1_loss print("Global Train Loss: " + str(train_loss)) @@ -197,6 +194,7 @@ def __call__(self, model, optimizer, criterion, metrics, scheduler, options): timeit = Timeit(0 if len(options.runtime['cumu_time_val']) == 0 else options.runtime['cumu_time_val'][-1]) + for epoch in range(start_epoch, max_epochs): options.runtime['current_epoch'] = epoch diff --git a/mlbench/refimpls/pytorch/datasets/partition_data.py b/mlbench/refimpls/pytorch/datasets/partition_data.py index afb9213..6fb0ed2 100644 --- a/mlbench/refimpls/pytorch/datasets/partition_data.py +++ b/mlbench/refimpls/pytorch/datasets/partition_data.py @@ -39,7 +39,8 @@ class DataPartitioner(Partitioner): def __init__(self, data, rank, shuffle, sizes=[0.7, 0.2, 0.1]): # prepare info. self.data = data - self.data_size = len(self.data) + # Drop a few of the last samples to make the dataset size divisible by the number of workers + self.data_size = int(len(self.data)/len(sizes)) * len(sizes) self.partitions = [] # get shuffled/unshuffled data. 
diff --git a/mlbench/refimpls/pytorch/optim/lr.py b/mlbench/refimpls/pytorch/optim/lr.py index 8afc91a..e7c835b 100644 --- a/mlbench/refimpls/pytorch/optim/lr.py +++ b/mlbench/refimpls/pytorch/optim/lr.py @@ -290,7 +290,7 @@ def sparsified_sgd_optimal_learning_rate(options, optimizer): param options: all configs param optimizer: optimizer associated with the scheduler """ - #TODO get feature from config file + # TODO get feature size from the config file a = 2000 / options.sparse_grad_size l2_coef = options.l2_coef gamma = options.sgd_lr_gamma @@ -298,6 +298,7 @@ def sparsified_sgd_optimal_learning_rate(options, optimizer): def f(iterations): return 1 / max(1, (a + iterations)) + optimizer.base_lrs = [gamma / l2_coef for _ in optimizer.param_groups] for group in optimizer.param_groups: group['initial_lr'] = gamma / l2_coef diff --git a/mlbench/refimpls/pytorch/optim/optimizer.py b/mlbench/refimpls/pytorch/optim/optimizer.py index 64f0769..d2371d5 100644 --- a/mlbench/refimpls/pytorch/optim/optimizer.py +++ b/mlbench/refimpls/pytorch/optim/optimizer.py @@ -1,7 +1,7 @@ import torch import torch.optim as optim from torch.optim.optimizer import Optimizer, required -import random +import numpy as np def get_optimizer(options, model): @@ -18,7 +18,6 @@ def get_optimizer(options, model): :rtype: optimizer :raises: NotImplementedError """ - # lr = options.lr if hasattr(options, 'lr') else options.lr_per_sample * options.batch_size lr = options.lr if options.lr else options.lr_per_sample * options.batch_size if options.opt_name == 'sgd': @@ -66,22 +65,21 @@ def __init__(self, params, lr=required, weight_decay=0, sparse_grad_size=10): self.__create_weighted_average_params() self.num_coordinates = sparse_grad_size - self.current_block = -1 def __setstate__(self, state): super(sparsified_SGD, self).__setstate__(state) def __create_weighted_average_params(self): - + """ Create a memory to keep the weighted average of parameters in each iteration """ for group in 
self.param_groups: for p in group['params']: param_state = self.state[p] param_state['estimated_w'] = torch.zeros_like(p.data) p.data.normal_(0, 0.01) - param_state['estimated_w'].normal_(0, 0.01) + param_state['estimated_w'].copy_(p.data) def __create_gradients_memory(self): - """ Create a memory for parameters. """ + """ Create a memory to keep gradients that are not used in each iteration """ for group in self.param_groups: for p in group['params']: param_state = self.state[p] @@ -115,80 +113,66 @@ def step(self, closure=None): return loss def sparsify_gradients(self, model, lr, random_sparse): - + """ Calls one of the sparsification functions (random or blockwise)""" if random_sparse: return self._random_sparsify(model, lr) else: return self._block_sparsify(model, lr) - def _block_sparsify(self, model, lr): + def _random_sparsify(self, model, lr): """ - Sparsify the gradients vector by choosing a block of of them - :param model: learning model - :param lr: learning rate - :return: sparsified gradients vector (a block of gradients and the beginning index of the block) + Sparsify the gradients vector by selecting 'k' of them randomly. + param model: learning model + param lr: learning rate + return: sparsified gradients vector ('k' gradients and their indices) """ params_sparse_tensors = [] for ind, param in enumerate(model.parameters()): - - param_size = param.data.size()[1] - gradients = param.grad.data * lr[0] + self.state[param]['memory'] self.state[param]['memory'] += param.grad.data * lr[0] - num_blocks = int(param_size / self.num_coordinates) - - if self.current_block == -1: - self.current_block = random.randint(0, num_blocks - 1) - elif self.current_block == num_blocks: - self.current_block = 0 - - begin = self.current_block * self.num_coordinates - end = begin + self.num_coordinates - # TODO do something for last block! 
- # if self.current_block == (num_blocks - 1): - # end = param_size - # else: - # end = begin + self.num_coordinates + indices = np.random.choice(param.data.size()[1], self.num_coordinates, replace=False) + sparse_tensor = torch.zeros(2, self.num_coordinates) - self.state[param]['memory'][begin:end] = 0 - - sparse_tensor = torch.zeros([1, self.num_coordinates + 1]) - sparse_tensor[0, 0:self.num_coordinates] = gradients[0, begin:end] - sparse_tensor[0, self.num_coordinates] = begin + for i, random_index in enumerate(indices): + sparse_tensor[1, i] = self.state[param]['memory'][0, random_index] + self.state[param]['memory'][0, random_index] = 0 + sparse_tensor[0, :] = torch.tensor(indices) params_sparse_tensors.append(sparse_tensor) - self.current_block += 1 - return params_sparse_tensors - def _random_sparsify(self, model, lr): + def _block_sparsify(self, model, lr): """ - Sparsify the gradients vector by selecting 'k' of them randomly. + Sparsify the gradients vector by selecting a block of them. 
param model: learning model param lr: learning rate - return: sparsified gradients vector ('k' gradients and their indices) + return: sparsified gradients vector """ params_sparse_tensors = [] - for ind, param in enumerate(model.parameters()): - - gradients = param.grad.data * lr[0] + self.state[param]['memory'] + for param in model.parameters(): self.state[param]['memory'] += param.grad.data * lr[0] - indices = [] - sparse_tensor = torch.zeros([2, self.num_coordinates]) + num_block = int(param.data.size()[1] / self.num_coordinates) - for i in range(self.num_coordinates): - random_index = random.randint(0, self.num_coordinates) - indices.append(random_index) - sparse_tensor[1, i] = gradients[0, random_index] - self.state[param]['memory'][i] = 0 - sparse_tensor[0, :] = torch.tensor(indices) + self.current_block = np.random.randint(0, num_block) + begin_index = self.current_block * self.num_coordinates - params_sparse_tensors.append(sparse_tensor) + if self.current_block == (num_block - 1): + end_index = param.data.size()[1] - 1 + else: + end_index = begin_index + self.num_coordinates - 1 + output_size = 1 + end_index - begin_index + 1 + sparse_tensor = torch.zeros(1, output_size) + sparse_tensor[0, 0] = begin_index + sparse_tensor[0, 1:] = self.state[param]['memory'][0, begin_index: end_index + 1] + self.state[param]['memory'][0, begin_index: end_index + 1] = 0 + + self.current_block += 1 + params_sparse_tensors.append(sparse_tensor) return params_sparse_tensors def update_estimated_weights(self, model, iteration, sparse_vector_size): @@ -200,6 +184,7 @@ def update_estimated_weights(self, model, iteration, sparse_vector_size): self.state[param]['estimated_w'] = self.state[param]['estimated_w'] * (1 - rho) + param.data * rho def get_estimated_weights(self, model): + """ Returns the weighted average parameter tensor """ estimated_params = [] for param in model.parameters(): estimated_params.append(self.state[param]['estimated_w']) diff --git 
a/mlbench/refimpls/pytorch/utils/communication.py b/mlbench/refimpls/pytorch/utils/communication.py index 9e35018..4980bac 100644 --- a/mlbench/refimpls/pytorch/utils/communication.py +++ b/mlbench/refimpls/pytorch/utils/communication.py @@ -1,6 +1,6 @@ import torch import torch.distributed as dist -import timeit +import time def elementwise_min(tensor): @@ -13,9 +13,11 @@ def aggregate_gradients(model, world_size): # all_reduce the gradients. for ind, param in enumerate(model.parameters()): # all reduce. - # start = timeit.default_timer() + + # dist.barrier() + # start = time.time() dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) - # end = timeit.default_timer() - start + # end = time.time() - start # with open(str(dist.get_rank()) + "_communication_time.txt", "a+") as file: # file.write(str(end) + "\n") @@ -31,9 +33,10 @@ def aggregate_sparsified_gradients(model, world_size, sparse_vector_size, random gathered_list = [torch.zeros_like(params_sparse_tensors[ind]) for _ in range(world_size)] # all gather. - # start = timeit.default_timer() + # dist.barrier() + # start = time.time() dist.all_gather(gathered_list, params_sparse_tensors[ind]) - # end = timeit.default_timer() - start + # end = time.time() - start # with open(str(dist.get_rank()) + "_communication_time.txt", "a+") as file: # file.write(str(end) + "\n") @@ -45,8 +48,9 @@ def aggregate_sparsified_gradients(model, world_size, sparse_vector_size, random avg_grads[0, int(grad_tensor[0, index])] += grad_tensor[1, index] else: for grad_tensor in gathered_list: - begin = int(grad_tensor[0, sparse_vector_size]) - avg_grads[0, begin:begin + sparse_vector_size] += grad_tensor[0, 0:sparse_vector_size] + tensor_size = grad_tensor.size()[1] + begin = int(grad_tensor[0, 0]) + avg_grads[0, begin:(begin + tensor_size - 1)] += grad_tensor[0, 1:] avg_grads /= world_size param.grad.data = avg_grads