diff --git a/data_preprocessing/base/distributed_data_util.py b/data_preprocessing/base/distributed_data_util.py
new file mode 100644
index 00000000..ad74b935
--- /dev/null
+++ b/data_preprocessing/base/distributed_data_util.py
@@ -0,0 +1,173 @@
+"""
+Utils for generating distributed datasets across different workers
+Usage:
+    python data_preprocessing/base/distributed_data_util.py
+    --dataset 20news
+    --data_file data/data_loaders/20news_data_loader.pkl
+    --partition_file data/partition/20news_partition.pkl
+    --partition_method uniform
+    --client_num_per_round 10
+    --comm_round 1000
+"""
+import argparse
+import json
+import logging
+import os
+import random
+import sys
+
+import tqdm
+from spacy.lang.en import STOP_WORDS
+
+sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "")))
+
+import data_preprocessing.AGNews.data_loader
+import data_preprocessing.SST_2.data_loader
+import data_preprocessing.SemEval2010Task8.data_loader
+import data_preprocessing.Sentiment140.data_loader
+import data_preprocessing.news_20.data_loader
+from data_preprocessing.base.utils import *
+
+
+def add_args(parser):
+    parser.add_argument('--dataset', type=str, default='sentiment140', metavar='N',
+                        help='dataset used for training')
+
+    parser.add_argument('--data_file', type=str, default='data/data_loaders/sentiment_140_data_loader.pkl',
+                        metavar="DF", help='data pickle file')
+
+    parser.add_argument('--partition_file', type=str, default='data/partition/sentiment_140_partition.pkl',
+                        metavar="PF", help='partition pickle file')
+
+    parser.add_argument('--partition_method', type=str, default='uniform', metavar='N',
+                        help='how to partition the dataset on local workers')
+
+    parser.add_argument('--client_num_per_round', type=int, default=4, metavar='NN',
+                        help='number of workers')
+
+    parser.add_argument('--comm_round', type=int, default=10,
+                        help='how many rounds of communication we should use')
+
+    args = parser.parse_args()
+    return args
+
+
+def load_data(args, idx=0):
+    dataset_name = args.dataset
+    print("load_data. dataset_name = %s" % dataset_name)
+    if dataset_name == "20news":
+        return data_preprocessing.news_20.data_loader. \
+            ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                             partition_method=args.partition_method, tokenize=True, client_idx=idx)
+    elif dataset_name == "agnews":
+        return data_preprocessing.AGNews.data_loader. \
+            ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                             partition_method=args.partition_method, tokenize=True, client_idx=idx)
+    elif dataset_name == "semeval_2010_task8":
+        return data_preprocessing.SemEval2010Task8.data_loader. \
+            ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                             partition_method=args.partition_method, tokenize=True, client_idx=idx)
+    elif dataset_name == "sentiment140":
+        return data_preprocessing.Sentiment140.data_loader. \
+            ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                             partition_method=args.partition_method, tokenize=True, client_idx=idx)
+    elif dataset_name == "sst_2":
+        return data_preprocessing.SST_2.data_loader. \
+            ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                             partition_method=args.partition_method, tokenize=True, client_idx=idx)
+    else:
+        raise Exception("No such dataset")
+
+
+def client_sampling_all_rounds(comm_round, client_num_in_total, client_num_per_round):
+
+    def client_sampling(round_idx):
+        if client_num_in_total == client_num_per_round:
+            client_indexes = [client_index for client_index in range(client_num_in_total)]
+        else:
+            num_clients = min(client_num_per_round, client_num_in_total)
+            # may select fixed random seeds for comparison, e.g. random.seed(round_idx)
+            client_indexes = random.sample(range(client_num_in_total), num_clients)
+        print("client_indexes = %s" % str(client_indexes))
+        return client_indexes
+
+    sample_lists = [[] for i in range(comm_round)]
+    for round_idx in range(comm_round):
+        sample_lists[round_idx] = client_sampling(round_idx)
+
+    return sample_lists
+
+
+def generate_source_vocab(args, do_remove_low_freq_words=5, do_remove_stop_words=0):
+    """
+    preprocess the global dataset to generate the source vocab
+    """
+    print("generate source vocab...")
+    # remove low frequency words and stop words
+    # build frequency vocabulary based on tokenized data
+    data_loader = load_data(args, None)
+    x = []
+    train_x = data_loader.get_train_batch_data()["X"]
+    test_x = data_loader.get_test_batch_data()["X"]
+    x.extend(train_x)
+    x.extend(test_x)
+    freq_vocab = build_freq_vocab(x)
+    print("frequency vocab size %d" % len(freq_vocab))
+
+    if do_remove_low_freq_words > 0:
+        print("remove low frequency words")
+        # build low frequency words set
+        low_freq_words = set()
+        for token, freq in freq_vocab.items():
+            if freq <= do_remove_low_freq_words:
+                low_freq_words.add(token)
+        train_x = remove_words(train_x, low_freq_words)
+        test_x = remove_words(test_x, low_freq_words)
+
+    if do_remove_stop_words:
+        print("remove stop words")
+        train_x = remove_words(train_x, STOP_WORDS)
+        test_x = remove_words(test_x, STOP_WORDS)
+
+    x.clear()
+    x.extend(train_x)
+    x.extend(test_x)
+    source_vocab = build_vocab(x)
+    print("source vocab size %d" % len(source_vocab))
+
+    return source_vocab
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser()
+    args = add_args(parser)
+
+    dataset = load_data(args)
+    attributes = dataset.get_attributes()
+
+    sample_lists = client_sampling_all_rounds(args.comm_round, attributes["n_clients"], args.client_num_per_round)
+
+    worker_list = list(map(list, zip(*sample_lists)))  # transpose
+    worker_num = len(worker_list)
+
+    os.makedirs('data/distributed/{}_distributed'.format(args.dataset), exist_ok=True)
+
+    info_file = 'data/distributed/{}_distributed/info.json'.format(args.dataset)
+    info = vars(args)
+    with open(info_file, 'w') as f:
+        json.dump(info, f)
+
+    for worker_idx in tqdm.tqdm(range(worker_num)):
+        worker_file = 'data/distributed/{}_distributed/{}.json'.format(args.dataset, worker_idx + 1)
+        with open(worker_file, 'w') as f:
+            json.dump(worker_list[worker_idx], f)
+
+    # source vocab
+    vocab_file = 'data/distributed/{}_distributed/vocab.json'.format(args.dataset)
+    if os.path.exists(vocab_file):
+        print("vocab_file %s exists, skip generating..." % vocab_file)
+    else:
+        source_vocab = generate_source_vocab(args)
+        with open(vocab_file, 'w') as f:
+            json.dump(source_vocab, f)
\ No newline at end of file
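For orientation, the directory that distributed_data_util.py writes (and that main_text_classification.py reads back below) looks roughly like the sketch here; the dataset name and worker count are illustrative:

    data/distributed/20news_distributed/
        info.json   # vars(args) of the generation run; checked against the training arguments at load time
        1.json      # for worker 1: the client index it handles in each communication round
        2.json
        ...
        vocab.json  # source vocabulary built from the global train/test data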
diff --git a/experiments/distributed/bilstm_exps/main_text_classification.py b/experiments/distributed/bilstm_exps/main_text_classification.py
index e7d5d6b7..8290aed9 100644
--- a/experiments/distributed/bilstm_exps/main_text_classification.py
+++ b/experiments/distributed/bilstm_exps/main_text_classification.py
@@ -1,4 +1,5 @@
 import argparse
+import json
 import logging
 import os
 import random
@@ -12,6 +13,8 @@ import wandb
 
 from spacy.lang.en import STOP_WORDS
 
+sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "")))
+
 from FedML.fedml_api.distributed.fedopt import FedOptAPI
 from FedML.fedml_api.distributed.fedavg import FedAvgAPI
 from FedML.fedml_api.distributed.utils.gpu_mapping import mapping_processes_to_gpu_device_from_yaml_file
@@ -82,7 +85,7 @@ def add_args(parser):
     parser.add_argument('--embedding_length', type=int, default=300, metavar="EL",
                         help='dimension of word embedding')
 
-    parser.add_argument('--optimizer', type=str, default='adam',
+    parser.add_argument('--client_optimizer', type=str, default='adam',
                         help='SGD with momentum; adam')
 
     parser.add_argument('--server_optimizer', type=str, default='sgd',
@@ -94,7 +97,7 @@ def add_args(parser):
     parser.add_argument('--server_lr', type=float, default=0.001,
                         help='server learning rate (default: 0.001)')
 
-    parser.add_argument('--fea_alg', type=str, default="fedavg", help="The training algorithm in federated learning")
+    parser.add_argument('--fed_alg', type=str, default="fedavg", help="The training algorithm in federated learning")
 
     parser.add_argument('--wd', help='weight decay parameter;',
                         type=float, default=0.001)
@@ -126,6 +129,13 @@ def add_args(parser):
     parser.add_argument('--do_remove_low_freq_words', type=int, default=5, metavar="RLW",
                         help='remove words in lower frequency')
 
+    parser.add_argument('--gpu_mapping_file', type=str, default="gpu_mapping.yaml",
+                        help='the gpu utilization file for servers and clients. If there is no \
+                        gpu_util_file, gpu will not be used.')
+
+    parser.add_argument('--gpu_mapping_key', type=str, default="mapping_default",
+                        help='the key in gpu utilization file')
+
     args = parser.parse_args()
     return args
 
@@ -197,6 +207,83 @@ def load_data(args, dataset_name):
     return dataset
 
 
+def load_distributed_data(process_id, args):
+    """
+    load distributed data from {dataset}_distributed generated by distributed_data_util.py
+    args:
+        process_id -- also referred to as worker_id; the index of the worker in each training round
+        args -- program arguments, including information such as dataset, comm_round, etc.
+    """
+    dataset_name = args.dataset
+    train_data_local_num_dict = dict()
+    train_data_local_dict = dict()
+    test_data_local_dict = dict()
+    client_list = None
+
+    info_file = 'data/distributed/{}_distributed/info.json'.format(args.dataset)
+    with open(info_file, 'r') as f:
+        info = json.load(f)
+        assert info['comm_round'] == args.comm_round
+        assert info['client_num_per_round'] == args.client_num_per_round
+        assert info['data_file'] == args.data_file
+        assert info['partition_file'] == args.partition_file
+
+    worker_file = 'data/distributed/{}_distributed/{}.json'.format(args.dataset, process_id)
+    with open(worker_file, 'r') as f:
+        client_list = list(set(json.load(f)))
+        if process_id - 1 not in client_list:
+            client_list = [process_id - 1] + client_list  # include client (process_id - 1) to bootstrap trainer init
+
+    logging.info("load_data. dataset_name = %s" % dataset_name)
+    client_data_loaders = []
+    if dataset_name == "20news":
+        for client_index in client_list:
+            data_loader = data_preprocessing.news_20.data_loader. \
+                ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                                 partition_method=args.partition_method, tokenize=True, client_idx=client_index)
+            train_data_local_num_dict[client_index] = data_loader.get_train_data_num()
+            train_data_local_dict[client_index] = data_loader.get_train_batch_data(args.batch_size)
+            test_data_local_dict[client_index] = data_loader.get_test_batch_data(args.batch_size)
+    elif dataset_name == "agnews":
+        for client_index in client_list:
+            data_loader = data_preprocessing.AGNews.data_loader. \
+                ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                                 partition_method=args.partition_method, tokenize=True, client_idx=client_index)
+            train_data_local_num_dict[client_index] = data_loader.get_train_data_num()
+            train_data_local_dict[client_index] = data_loader.get_train_batch_data(args.batch_size)
+            test_data_local_dict[client_index] = data_loader.get_test_batch_data(args.batch_size)
+    elif dataset_name == "semeval_2010_task8":
+        for client_index in client_list:
+            data_loader = data_preprocessing.SemEval2010Task8.data_loader. \
+                ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                                 partition_method=args.partition_method, tokenize=True, client_idx=client_index)
+            train_data_local_num_dict[client_index] = data_loader.get_train_data_num()
+            train_data_local_dict[client_index] = data_loader.get_train_batch_data(args.batch_size)
+            test_data_local_dict[client_index] = data_loader.get_test_batch_data(args.batch_size)
+    elif dataset_name == "sentiment140":
+        for client_index in client_list:
+            data_loader = data_preprocessing.Sentiment140.data_loader. \
+                ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                                 partition_method=args.partition_method, tokenize=True, client_idx=client_index)
+            train_data_local_num_dict[client_index] = data_loader.get_train_data_num()
+            train_data_local_dict[client_index] = data_loader.get_train_batch_data(args.batch_size)
+            test_data_local_dict[client_index] = data_loader.get_test_batch_data(args.batch_size)
+    elif dataset_name == "sst_2":
+        for client_index in client_list:
+            data_loader = data_preprocessing.SST_2.data_loader. \
+                ClientDataLoader(os.path.abspath(args.data_file), os.path.abspath(args.partition_file),
+                                 partition_method=args.partition_method, tokenize=True, client_idx=client_index)
+            train_data_local_num_dict[client_index] = data_loader.get_train_data_num()
+            train_data_local_dict[client_index] = data_loader.get_train_batch_data(args.batch_size)
+            test_data_local_dict[client_index] = data_loader.get_test_batch_data(args.batch_size)
+    else:
+        raise Exception("No such dataset")
+    attributes = data_loader.get_attributes()
+    dataset = [data_loader.get_train_data_num(), data_loader.get_test_data_num(),
+               train_data_local_num_dict, train_data_local_dict, test_data_local_dict, attributes]
+    return dataset
+
+
 def preprocess_data(args, dataset):
     """
     preprocessing data for further training, which includes load pretrianed embeddings, padding data and transforming
@@ -307,6 +394,73 @@ def __remove_words(word_set):
     return dataset
 
 
+def preprocess_distributed_data(args, dataset):
+    """
+    distributed version of preprocess_data; loads source_vocab instead of rebuilding it from train/test_global_data
+    """
+    logging.info("preprocess data")
+    [train_data_num, test_data_num, train_data_local_num_dict, train_data_local_dict,
+     test_data_local_dict, attributes] = dataset
+
+    target_vocab = attributes["target_vocab"]
+    source_vocab = dict()
+    vocab_file = 'data/distributed/{}_distributed/vocab.json'.format(args.dataset)
+    with open(vocab_file, 'r') as f:
+        source_vocab = json.load(f)
+    logging.info("source vocab size %d", len(source_vocab))
+
+    # load pretrained embeddings. Note that we use source vocabulary here to reduce the input size
+    embedding_weights = None
+    if args.embedding_name:
+        if args.embedding_name == "word2vec":
+            logging.info("load word embedding %s" % args.embedding_name)
+            source_vocab, embedding_weights = load_word2vec_embedding(os.path.abspath(args.embedding_file),
+                                                                      source_vocab)
+        elif args.embedding_name == "glove":
+            logging.info("load word embedding %s" % args.embedding_name)
+            source_vocab, embedding_weights = load_glove_embedding(os.path.abspath(args.embedding_file), source_vocab,
+                                                                   args.embedding_length)
+        else:
+            raise Exception("No such embedding")
+        embedding_weights = torch.tensor(embedding_weights, dtype=torch.float)
+
+    if args.max_seq_len == -1:
+        raise Exception("Max sequence length must be specified!")
+
+    new_train_data_local_dict = dict()
+    new_test_data_local_dict = dict()
+
+    # padding data and transforming token as well as label to index
+    for client_index in train_data_local_num_dict.keys():
+        new_train_data_local = list()
+        for i, batch_data in enumerate(train_data_local_dict[client_index]):
+            padding_x, seq_lens = padding_data(batch_data["X"], args.max_seq_len)
+            new_train_data_local.append(
+                {"X": token_to_idx(padding_x, source_vocab),
+                 "Y": label_to_idx(batch_data["Y"], target_vocab),
+                 "seq_lens": seq_lens})
+        new_train_data_local_dict[client_index] = new_train_data_local
+
+        new_test_data_local = list()
+        for i, batch_data in enumerate(test_data_local_dict[client_index]):
+            padding_x, seq_lens = padding_data(batch_data["X"], args.max_seq_len)
+            new_test_data_local.append(
+                {"X": token_to_idx(padding_x, source_vocab),
+                 "Y": label_to_idx(batch_data["Y"], target_vocab),
+                 "seq_lens": seq_lens})
+        new_test_data_local_dict[client_index] = new_test_data_local
+
+    train_global_data = []
+    test_global_data = []
+    for key in train_data_local_num_dict.keys():
+        train_global_data.extend(new_train_data_local_dict[key])
+        test_global_data.extend(new_test_data_local_dict[key])
+    logging.info("size of source vocab: %s, size of target vocab: %s" % (len(source_vocab), len(target_vocab)))
+    dataset = [train_data_num, test_data_num, train_global_data, test_global_data, train_data_local_num_dict,
+               new_train_data_local_dict, new_test_data_local_dict, source_vocab, target_vocab, embedding_weights]
+    return dataset
+
+
 def create_model(args, model_name, input_size, output_size, embedding_weights):
     logging.info("create_model. model_name = %s, input_size = %s, output_size = %s"
                  % (model_name, input_size, output_size))
@@ -323,22 +477,6 @@ def create_model(args, model_name, input_size, output_size, embedding_weights):
     return model
 
 
-def init_training_device(process_ID, fl_worker_num, gpu_num_per_machine):
-    # initialize the mapping from process ID to GPU ID:
-    if process_ID == 0:
-        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-        return device
-    process_gpu_dict = dict()
-    for client_index in range(fl_worker_num):
-        gpu_index = client_index % gpu_num_per_machine
-        process_gpu_dict[client_index] = gpu_index
-
-    logging.info(process_gpu_dict)
-    device = torch.device("cuda:" + str(process_gpu_dict[process_ID - 1]) if torch.cuda.is_available() else "cpu")
-    logging.info(device)
-    return device
-
-
 if __name__ == "__main__":
 
     logging.info("start")
@@ -396,11 +534,25 @@ def init_training_device(process_ID, fl_worker_num, gpu_num_per_machine):
     # check "gpu_mapping.yaml" to see how to define the topology
     device = mapping_processes_to_gpu_device_from_yaml_file(process_id, worker_number, \
                                                             args.gpu_mapping_file, args.gpu_mapping_key)
+    logging.info("process_id = %d, size = %d, device=%s" % (process_id, worker_number, str(device)))
 
     # load data
-    dataset = load_data(args, args.dataset)
-    dataset = preprocess_data(args, dataset)
+    preprocessed_sampling_lists = None
+    if process_id == 0:
+        dataset = load_data(args, args.dataset)
+        worker_lists = []
+        for i in range(args.client_num_per_round):
+            worker_idx = i + 1
+            worker_file = 'data/distributed/{}_distributed/{}.json'.format(args.dataset, worker_idx)
+            with open(worker_file, 'r') as f:
+                client_list = json.load(f)
+            worker_lists.append(client_list)
+        preprocessed_sampling_lists = list(map(list, zip(*worker_lists)))  # [[round 1 client list], [round 2 client list], ...]
+    else:
+        dataset = load_distributed_data(process_id, args)
+
+    dataset = preprocess_distributed_data(args, dataset)
     [train_data_num, test_data_num, train_data_global, test_data_global, train_data_local_num_dict,
      train_data_local_dict, test_data_local_dict, source_vocab, target_vocab, embedding_weights] = dataset
@@ -420,12 +572,12 @@ def init_training_device(process_ID, fl_worker_num, gpu_num_per_machine):
         FedAvgAPI.FedML_FedAvg_distributed(process_id, worker_number, device, comm, model, train_data_num,
                                            train_data_global, test_data_global, train_data_local_num_dict,
                                            train_data_local_dict, test_data_local_dict, args,
-                                           model_trainer)
+                                           model_trainer, preprocessed_sampling_lists=preprocessed_sampling_lists)
     elif args.fed_alg == "fedopt":
         FedOptAPI.FedML_FedOpt_distributed(process_id, worker_number, device, comm, model, train_data_num,
                                            train_data_global, test_data_global, train_data_local_num_dict,
                                            train_data_local_dict, test_data_local_dict, args,
-                                           model_trainer)
+                                           model_trainer, preprocessed_sampling_lists=preprocessed_sampling_lists)
     else:
         raise Exception("No such federated algorithm")
     logging.info("end")
diff --git a/experiments/distributed/bilstm_exps/run_bilstm_exps.sh b/experiments/distributed/bilstm_exps/run_bilstm_exps.sh
index fcd3bdbf..83ba3579 100644
--- a/experiments/distributed/bilstm_exps/run_bilstm_exps.sh
+++ b/experiments/distributed/bilstm_exps/run_bilstm_exps.sh
@@ -27,11 +27,20 @@ echo $PROCESS_NUM
 
 hostname > mpi_host_file
 
+python data_preprocessing/base/distributed_data_util.py \
+  --client_num_per_round $CLIENT_NUM \
+  --comm_round $ROUND \
+  --dataset $DATASET \
+  --data_file $DATA_FILE \
+  --partition_file $PARTITION_FILE
+
 if [ "$FED_ALG" = "fedavg" ]
 then
-  mpirun -np $PROCESS_NUM -hostfile ./mpi_host_file python3 experiments/distributed/bilstm_exps/main_fedavg.py \
+  mpirun -np $PROCESS_NUM -hostfile ./mpi_host_file python3 experiments/distributed/bilstm_exps/main_text_classification.py \
   --gpu_num_per_server $GPU_NUM_PER_SERVER \
   --gpu_server_num $SERVER_NUM \
+  --gpu_mapping_file "experiments/distributed/bilstm_exps/gpu_mapping.yaml" \
+  --gpu_mapping_key mapping_default \
   --dataset $DATASET \
   --data_file $DATA_FILE \
   --partition_file $PARTITION_FILE \
@@ -48,12 +57,15 @@ then
   --max_seq_len $MAX_SEQ_LEN \
   --do_remove_stop_words $REMOVE_WORD \
   --do_remove_low_freq_words $REMOVE_LOW_FREQ_WORD \
+  --fed_alg $FED_ALG \
   --ci $CI
 elif [ "$FED_ALG" = "fedopt" ]
 then
-  mpirun -np $PROCESS_NUM -hostfile ./mpi_host_file python3 experiments/distributed/bilstm_exps/main_fedopt.py \
+  mpirun -np $PROCESS_NUM -hostfile ./mpi_host_file python3 experiments/distributed/bilstm_exps/main_text_classification.py \
   --gpu_num_per_server $GPU_NUM_PER_SERVER \
   --gpu_server_num $SERVER_NUM \
+  --gpu_mapping_file "experiments/distributed/bilstm_exps/gpu_mapping.yaml" \
+  --gpu_mapping_key mapping_default \
   --dataset $DATASET \
   --data_file $DATA_FILE \
   --partition_file $PARTITION_FILE \
@@ -71,6 +83,7 @@ then
   --max_seq_len $MAX_SEQ_LEN \
   --do_remove_stop_words $REMOVE_WORD \
   --do_remove_low_freq_words $REMOVE_LOW_FREQ_WORD \
+  --fed_alg $FED_ALG \
   --ci $CI
 else
   echo "no such federated algorithm!"
diff --git a/training/nwp_rnn_trainer.py b/training/nwp_rnn_trainer.py
index 6b6711a9..98ea4710 100644
--- a/training/nwp_rnn_trainer.py
+++ b/training/nwp_rnn_trainer.py
@@ -63,7 +63,7 @@ def test(self, test_data, device, args):
                 test_loss += loss.item() * target.size(0)
                 test_total += target.size(0)
 
-        return test_acc, test_total, test_loss
+        return {'test_correct': test_acc, 'test_total': test_total, 'test_loss': test_loss}
 
     def test_on_the_server(self, train_data_local_dict, test_data_local_dict, device, args=None) -> bool:
         return False
\ No newline at end of file
diff --git a/training/text_classification_bilstm_trainer.py b/training/text_classification_bilstm_trainer.py
index d4e26f27..2f7c134b 100644
--- a/training/text_classification_bilstm_trainer.py
+++ b/training/text_classification_bilstm_trainer.py
@@ -14,7 +14,6 @@ def set_model_params(self, model_parameters):
 
     def train(self, train_data, device, args):
         model = self.model
-        model.to(device)
 
         model.train()
 
@@ -45,7 +44,6 @@ def train(self, train_data, device, args):
                 acc = 100.0 * num_corrects / x.size()[0]
                 loss.backward()
                 optimizer.step()
-                batch_loss.append(loss.item())
                 batch_acc.append(acc.item())
 
             if len(batch_loss) > 0:
@@ -81,4 +79,7 @@ def test(self, test_data, device, args):
                 test_loss += loss.item() * y.size(0)
                 test_total += y.size(0)
 
-        return test_acc, test_total, test_loss
+        return {'test_correct': test_acc, 'test_total': test_total, 'test_loss': test_loss}
+
+    def test_on_the_server(self, train_data_local_dict, test_data_local_dict, device, args=None) -> bool:
+        return False
\ No newline at end of file
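Note: both trainers now report test metrics as a dict instead of a positional tuple. As a minimal illustration of how a consumer of that dict can recover accuracy and average loss (the helper name and the sample numbers are illustrative, not part of the FedML trainer API):

    def summarize_test_metrics(metrics):
        """Reduce the dict returned by test() to accuracy and average per-sample loss."""
        accuracy = metrics['test_correct'] / metrics['test_total']
        avg_loss = metrics['test_loss'] / metrics['test_total']
        return accuracy, avg_loss

    # summarize_test_metrics({'test_correct': 87, 'test_total': 100, 'test_loss': 35.2}) -> (0.87, 0.352)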