From b5b6953674e71b53a7bb69a431c36f948dc16e16 Mon Sep 17 00:00:00 2001 From: joyce-yuan Date: Tue, 11 Mar 2025 18:20:06 +0000 Subject: [PATCH 1/3] malicious configs and experiments script --- src/configs/algo_config_malicious.py | 334 +++++++++++++++++++++++ src/configs/sys_config_malicious.py | 384 +++++++++++++++++++++++++++ src/run_malicious_experiments.py | 148 +++++++++++ 3 files changed, 866 insertions(+) create mode 100644 src/configs/algo_config_malicious.py create mode 100644 src/configs/sys_config_malicious.py create mode 100644 src/run_malicious_experiments.py diff --git a/src/configs/algo_config_malicious.py b/src/configs/algo_config_malicious.py new file mode 100644 index 00000000..242b97b0 --- /dev/null +++ b/src/configs/algo_config_malicious.py @@ -0,0 +1,334 @@ +from typing import Dict, List +from .malicious_config import malicious_config_list +import random +from utils.types import ConfigType + + +def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, str]: + """ + Assign a random malicious type to a single node. 
+ """ + malicious_type = random.choice(malicious_config_list) + return malicious_type # type: ignore + + +# Algorithm Configuration + +iid_dispfl_clients_new: ConfigType = { + "algo": "dispfl", + "exp_type": "iid_dispfl", + "neighbors": 2, + "active_rate": 0.8, + "dense_ratio": 0.5, + "erk_power_scale": 1, + "anneal_factor": 0.5, + "epochs": 1000, + "model": "resnet34", + "model_lr": 3e-4, + "batch_size": 128, +} + +traditional_fl: ConfigType = { + # Collaboration setup + "algo": "fedavg", + "rounds": 2, + + # Model parameters + "model": "resnet10", + "model_lr": 3e-4, + "batch_size": 256, +} + +test_fl_inversion: ConfigType = { + # Collaboration setup + "algo": "fedavg", + "rounds": 5, + "optimizer": "sgd", + # Model parameters + "model": "resnet10", + "model_lr": 3e-4, + # "batch_size": 256, + "gia": True, +} + +fedweight: ConfigType = { + "algo": "fedweight", + "num_rep": 1, + # Client selection + "target_users": 3, + "similarity": "CosineSimilarity", # "EuclideanDistance", "CosineSimilarity", + # "community_type": "dataset", + "with_sim_consensus": True, + # Learning setup + "rounds": 10, + "epochs_per_round": 5, + "warmup_epochs": 50, + "model": "resnet10", + "local_train_after_aggr": True, + # "pretrained": True, + # "train_only_fc": True, + "model_lr": 1e-4, + "batch_size": 16, + # Knowledge transfer params + "average_last_layer": True, + "mask_finetune_last_layer": False, + # params for model + "position": 0, +} + +defkt: ConfigType = { + "algo": "defkt", + "central_client": 1, + "mask_last_layer": False, + "fine_tune_last_layer": False, + "epochs_per_round": 5, + "rounds": 10, + "epochs": 10, + "model": "resnet10", + "model_lr": 1e-4, + "batch_size": 16, + "num_teachers": 1, + # params for model + "position": 0, + "inp_shape": [128, 3, 32, 32], # This should be a List[int] +} + +fedavg_object_detect: ConfigType = { + "algo": "fedavg", + "exp_type": "", + # Learning setup + "epochs": 10, + "model": "yolo", + "model_lr": 1e-5, + "batch_size": 8, +} + 
+fediso: ConfigType = { + "algo": "fediso", + "num_rep": 1, + # Learning setup + "rounds": 100, + "epochs_per_round": 5, + "model": "resnet10", + "model_lr": 1e-4, + "batch_size": 16, + # params for model + "position": 0, +} + +L2C_users: int = 3 +L2C: ConfigType = { + "algo": "l2c", + "sharing": "weights", + "alpha_lr": 0.1, + "alpha_weight_decay": 0.01, + # Clients selection + "target_users_before_T_0": 0, # Only used if adapted_to_assumption True otherwise all users are kept + "target_users_after_T_0": round((L2C_users - 1) * 0.1), + "T_0": 10, # round after which only target_users_after_T_0 peers are kept + "epochs_per_round": 5, + "warmup_epochs": 5, + "rounds": 210, + "model": "resnet10", + "average_last_layer": True, + "model_lr": 1e-4, + "batch_size": 32, + "weight_decay": 5e-4, + "adapted_to_assumption": False, + # params for model + "position": 0, + "inp_shape": [128, 3, 32, 32], # This should be a List[int] +} + +fedcentral: ConfigType = { + "algo": "centralized", + "mask_last_layer": False, + "fine_tune_last_layer": False, + "epochs_per_round": 5, + "rounds": 100, + "model": "resnet10", + "model_lr": 1e-4, + "batch_size": 16, + # params for model + "position": 0, + "inp_shape": [128, 3, 32, 32], +} + +fedval: ConfigType = { + "algo": "fedval", + "num_rep": 1, + # Clients selection + "selection_strategy": "highest", # lowest, + "target_users_before_T_0": 1, + "target_users_after_T_0": 1, + "T_0": 400, # round after which only target_users_after_T_0 peers are kept + "community_type": None, # "dataset", + # Learning setup + "rounds": 200, + "epochs_per_round": 5, + "model": "resnet10", + "local_train_after_aggr": False, + "model_lr": 1e-4, + "batch_size": 16, + # Knowledge transfer params + "average_last_layer": True, + "mask_finetune_last_layer": False, + # params for model + "position": 0, +} + +swarm_users: int = 3 +swarm: ConfigType = { + "algo": "swarm", + "num_rep": 1, + # Clients selection + "target_users": 2, + "similarity": "CosineSimilarity", # 
"EuclideanDistance", "CosineSimilarity", + "with_sim_consensus": True, + # Learning setup + "epochs": 210, + "rounds": 210, + "epochs_per_round": 5, + "model": "resnet10", + "local_train_after_aggr": True, + "model_lr": 1e-4, + "batch_size": 16, + # Knowledge transfer params + "average_last_layer": True, + "mask_finetune_last_layer": False, + # params for model + "position": 0, +} + +fedstatic: ConfigType = { + # Collaboration setup + "algo": "fedstatic", + "topology": {"name": "ring"}, # type: ignore + "rounds": 200, + # Model parameters + "optimizer": "sgd", # TODO comment out for real training + "model": "resnet10", + "model_lr": 0.1, # 3e-4, + "batch_size": 256, +} + +swift: ConfigType = { + # Collaboration setup + "algo": "swift", + "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore + "rounds": 20, + + # Model parameters + "model": "resnet10", + "model_lr": 3e-4, + "batch_size": 256, +} + +fed_dynamic_weights: ConfigType = { + # Collaboration setup + "algo": "feddynamic", + # comparison describes the metric or algorithm used to compare the weights of the models + # sampling describes the method used to sample the neighbors after the comparison + "topology": {"comparison": "weights_l2", "sampling": "closest"}, # type: ignore + "rounds": 200, + + # Model parameters + "optimizer": "sgd", + "model": "resnet10", + "model_lr": 0.1, + "batch_size": 256, +} + +fed_dynamic_loss: ConfigType = { + # Collaboration setup + "algo": "feddynamic", + "topology": {"comparison": "loss", "sampling": "closest"}, # type: ignore + "rounds": 20, + + # Model parameters + "model": "resnet6", + "model_lr": 3e-4, + "batch_size": 256, +} + +fedavgpush: ConfigType = { + # Collaboration setup + "algo": "fedavgpush", + "rounds": 2, + + # Model parameters + "model": "resnet10", + "model_lr": 3e-4, + "batch_size": 256, +} + +metaL2C_cifar10: ConfigType = { + "algo": "metal2c", + "sharing": "weights", # "updates" + # Client selection + "target_users_before_T_0": 0, + 
"target_users_after_T_0": 1, + "K_0": 0, # number of peers to keep as neighbors at T_0 (!) inverse that in L2C paper + "T_0": 250, # round after wich only K_0 peers are kept + "alpha_lr": 0.1, + "alpha_weight_decay": 0.01, + "epochs_per_round": 5, + "rounds": 3, + "model": "resnet18", + "average_last_layer": False, + "model_lr": 1e-4, + "batch_size": 64, + "optimizer": "sgd", + "weight_decay": 5e-4, + # params for model + "position": 0, + "inp_shape": [128, 3, 32, 32], +} + + +# Malicious Algorithm Configuration +malicious_traditional_model_update_attack: ConfigType = { + **traditional_fl, + **malicious_config_list["bad_weights"], +} + +malicious_traditional_data_poisoning_attack: ConfigType = { + **traditional_fl, + **malicious_config_list["data_poisoning"], +} + +malicious_traditional_model_poisoning_attack: ConfigType = { + **traditional_fl, + **malicious_config_list["backdoor_attack"], +} + + + +# List of algorithm configurations +algo_config_list: List[ConfigType] = [ + iid_dispfl_clients_new, + traditional_fl, + malicious_traditional_data_poisoning_attack, + malicious_traditional_model_poisoning_attack, + malicious_traditional_model_update_attack, + fedweight, + defkt, + fedavg_object_detect, + fediso, + L2C, + fedcentral, + fedval, + swarm, + fedstatic, + metaL2C_cifar10, +] + +# Malicious List of algorithm configurations +malicious_algo_config_list: List[ConfigType] = [ + fedstatic, + malicious_traditional_data_poisoning_attack, + malicious_traditional_model_poisoning_attack, + malicious_traditional_model_update_attack, +] + +default_config_list: List[ConfigType] = malicious_algo_config_list diff --git a/src/configs/sys_config_malicious.py b/src/configs/sys_config_malicious.py new file mode 100644 index 00000000..560b1c0a --- /dev/null +++ b/src/configs/sys_config_malicious.py @@ -0,0 +1,384 @@ +# System Configuration +# TODO: Set up multiple non-iid configurations here. 
The goal of a separate system config +# is to simulate different real-world scenarios without changing the algorithm configuration. +from typing import Any, Dict, List, Literal, Optional +import random +from utils.types import ConfigType + +# from utils.config_utils import get_sliding_window_support, get_device_ids +from .algo_config import ( + malicious_algo_config_list, + default_config_list, + fedstatic, # type: ignore + traditional_fl, # type: ignore + swift, # type: ignore + fedavgpush, # type: ignore + fed_dynamic_weights, # type: ignore + fed_dynamic_loss, # type: ignore +) + +sliding_window_8c_4cpc_support = { + "1": [0, 1, 2, 3], + "2": [1, 2, 3, 4], + "3": [2, 3, 4, 5], + "4": [3, 4, 5, 6], + "5": [4, 5, 6, 7], + "6": [5, 6, 7, 8], + "7": [6, 7, 8, 9], + "8": [7, 8, 9, 0], +} + + +def get_device_ids(num_users: int, gpus_available: List[int | Literal["cpu"]]) -> Dict[str, List[int | Literal["cpu"]]]: + """ + Get the GPU device IDs for the users. + """ + # TODO: Make it multi-host + device_ids: Dict[str, List[int | Literal["cpu"]]] = {} + for i in range(num_users + 1): # +1 for the super-node + index = i % len(gpus_available) + gpu_id = gpus_available[index] + device_ids[f"node_{i}"] = [gpu_id] + return device_ids + + +def get_algo_configs( + num_users: int, + algo_configs: List[ConfigType], + assignment_method: Literal[ + "sequential", "random", "mapping", "distribution" + ] = "sequential", + seed: Optional[int] = 1, + mapping: Optional[List[int]] = None, + distribution: Optional[Dict[int, int]] = None, +) -> Dict[str, ConfigType]: + """ + Assign an algorithm configuration to each node, allowing for repetition. 
+ sequential: Assigns the algo_configs sequentially to the nodes + random: Assigns the algo_configs randomly to the nodes + mapping: Assigns the algo_configs based on the mapping of node index to algo index provided + distribution: Assigns the algo_configs based on the distribution of algo index to number of nodes provided + """ + algo_config_map: Dict[str, ConfigType] = {} + algo_config_map["node_0"] = algo_configs[0] # Super-node + if assignment_method == "sequential": + for i in range(1, num_users + 1): + algo_config_map[f"node_{i}"] = algo_configs[i % len(algo_configs)] + elif assignment_method == "random": + for i in range(1, num_users + 1): + algo_config_map[f"node_{i}"] = random.choice(algo_configs) + elif assignment_method == "mapping": + if not mapping: + raise ValueError("Mapping must be provided for assignment method 'mapping'") + assert len(mapping) == num_users + for i in range(1, num_users + 1): + algo_config_map[f"node_{i}"] = algo_configs[mapping[i - 1]] + elif assignment_method == "distribution": + if not distribution: + raise ValueError( + "Distribution must be provided for assignment method 'distribution'" + ) + total_users = sum(distribution.values()) + assert total_users == num_users + + # List of node indices to assign + node_indices = list(range(1, total_users + 1)) + # Seed for reproducibility + random.seed(seed) + # Shuffle the node indices based on the seed + random.shuffle(node_indices) + + # Assign nodes based on the shuffled indices + current_index = 0 + for algo_index, num_nodes in distribution.items(): + for i in range(num_nodes): + node_id = node_indices[current_index] + algo_config_map[f"node_{node_id}"] = algo_configs[algo_index] + current_index += 1 + else: + raise ValueError(f"Invalid assignment method: {assignment_method}") + # print("algo config mapping is: ", algo_config_map) + return algo_config_map + + +def get_domain_support( + num_users: int, base: str, domains: List[int] | List[str] +) -> Dict[str, str]: + assert 
num_users % len(domains) == 0 + + users_per_domain = num_users // len(domains) + support: Dict[str, str] = {} + support["0"] = f"{base}_{domains[0]}" + for i in range(1, num_users + 1): + support[str(i)] = f"{base}_{domains[(i-1) // users_per_domain]}" + return support + + +DOMAINNET_DMN = ["real", "sketch", "clipart"] + + +def get_domainnet_support(num_users: int, domains: List[str] = DOMAINNET_DMN): + return get_domain_support(num_users, "domainnet", domains) + + +domainnet_base_dir = "/u/abhi24/matlaberp2/p2p/imgs/domainnet/" +domainnet_dpath = { + "domainnet_real": domainnet_base_dir, + "domainnet_sketch": domainnet_base_dir, + "domainnet_clipart": domainnet_base_dir, + "domainnet_infograph": domainnet_base_dir, + "domainnet_quickdraw": domainnet_base_dir, + "domainnet_painting": domainnet_base_dir, +} + +CAMELYON17_DMN = [0, 3, 4] # + 1, 2 in test set +CAMELYON17_DMN_EXT = [0, 1, 2, 3, 4] # + 1, 2 in test set + + +def get_camelyon17_support(num_users: int, domains: List[int] = CAMELYON17_DMN): + return get_domain_support(num_users, "wilds_camelyon17", domains) + + +DIGIT_FIVE_2 = ["svhn", "mnist_m"] +DIGIT_FIVE = ["svhn", "mnist_m", "synth_digits"] +DIGIT_FIVE_5 = ["mnist", "usps", "svhn", "mnist_m", "synth_digits"] + + +def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE): + return get_domain_support(num_users, "", domains) + + +digit_five_dpath = { + "mnist": "./imgs/mnist", + "usps": "./imgs/usps", + "svhn": "./imgs/svhn", + "mnist_m": "./imgs/MNIST-M", + "synth_digits": "./imgs/syn_digit", +} + +CIFAR10_DSET = "cifar10" +CIAR10_DPATH = "./datasets/imgs/cifar10/" + +NUM_COLLABORATORS = 1 +DUMP_DIR = "/tmp/" + +num_users = 9 +mpi_system_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "num_users": num_users, + "num_collaborators": NUM_COLLABORATORS, + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "seed": 32, + # node_0 is a server currently + # The device_ids dictionary depicts the GPUs on 
which the nodes reside. + # For a single-GPU environment, the config will look as follows (as it follows a 0-based indexing): + # "device_ids": {"node_0": [0], "node_1": [0], "node_2": [0], "node_3": [0]}, + "device_ids": get_device_ids(num_users=3, gpus_available=[1, 2]), + # use this when the list needs to be imported from the algo_config + # "algo": get_algo_configs(num_users=3, algo_configs=algo_configs_list), + "algos": get_algo_configs( + num_users=3, + algo_configs=default_config_list + ), # type: ignore + "samples_per_user": 5555, # TODO: To model scenarios where different users have different number of samples + # we need to make this a dictionary with user_id as key and number of samples as value + "train_label_distribution": "iid", # Either "iid", "non_iid" "support" + "test_label_distribution": "iid", # Either "iid", "non_iid" "support" + "test_samples_per_user": 200, # Only for non_iid test distribution + "exp_keys": [], +} + +mpi_non_iid_sys_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "seed": 1, + "num_collaborators": NUM_COLLABORATORS, + # "experiment_path": "./experiments/", + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "load_existing": False, + "device_ids": get_device_ids(num_users=3, gpus_available=[0, 3]), + "algo": get_algo_configs(num_users=3, algo_configs=default_config_list), # type: ignore + "train_label_distribution": "non_iid", # Either "iid", "non_iid" "support", + "test_label_distribution": "non_iid", # Either "iid" "support", + "samples_per_user": 256, + "test_samples_per_user": 100, + "exp_keys": [], +} + +L2C_users = 3 +mpi_L2C_sys_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "seed": 1, + "num_collaborators": NUM_COLLABORATORS, + # "experiment_path": "./experiments/", + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "load_existing": False, + "device_ids": get_device_ids(num_users=3, gpus_available=[1, 2]), + "algo": 
get_algo_configs(num_users=3, algo_configs=default_config_list), # type: ignore + "train_label_distribution": "iid", # Either "iid", "non_iid" "support", + "test_label_distribution": "iid", # Either "iid" "support", + "samples_per_user": 32, + "test_samples_per_user": 32, + "validation_prop": 0.05, + "exp_keys": [], +} + +mpi_metaL2C_support_sys_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "seed": 1, + "num_collaborators": NUM_COLLABORATORS, + # "experiment_path": "./experiments/", + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "load_existing": False, + "device_ids": get_device_ids(num_users=3, gpus_available=[1, 2]), + "algo": get_algo_configs(num_users=3, algo_configs=default_config_list), # type: ignore + "train_label_distribution": "support", # Either "iid", "non_iid" "support", + "test_label_distribution": "support", # Either "iid" "support", + "support": sliding_window_8c_4cpc_support, + "samples_per_user": 32, + "test_samples_per_user": 32, + "validation_prop": 0.05, + "exp_keys": [], +} + +mpi_digitfive_sys_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "seed": 1, + "num_collaborators": NUM_COLLABORATORS, + "load_existing": False, + "dump_dir": DUMP_DIR, + "device_ids": get_device_ids(num_users=3, gpus_available=[6, 7]), + "algo": get_algo_configs(num_users=3, algo_configs=default_config_list), # type: ignore + # Dataset params + "dset": get_digit_five_support( + 3 + ), # get_camelyon17_support(fedcentral_client), #get_domainnet_support(fedcentral_client), + "dpath": digit_five_dpath, # wilds_dpath,#domainnet_dpath, + "train_label_distribution": "iid", # Either "iid", "shard" "support", + "test_label_distribution": "iid", # Either "iid" "support", + "samples_per_user": 256, + "test_samples_per_class": 100, + "community_type": "dataset", + "exp_keys": [], +} + +swarm_users = 3 +mpi_domainnet_sys_config: ConfigType = { + "exp_id": "", + "comm": {"type": "MPI"}, + "seed": 1, + 
"num_collaborators": NUM_COLLABORATORS, + "load_existing": False, + "dump_dir": DUMP_DIR, + "device_ids": get_device_ids(num_users=swarm_users, gpus_available=[3, 4]), + "algo": get_algo_configs(num_users=swarm_users, algo_configs=default_config_list), # type: ignore + # Dataset params + "dset": get_domainnet_support( + swarm_users + ), # get_camelyon17_support(fedcentral_client), #get_domainnet_support(fedcentral_client), + "dpath": domainnet_dpath, # wilds_dpath,#domainnet_dpath, + "train_label_distribution": "iid", # Either "iid", "shard" "support", + "test_label_distribution": "iid", # Either "iid" "support", + "samples_per_user": 32, + "test_samples_per_class": 100, + "community_type": "dataset", + "exp_keys": [], +} + +object_detect_system_config: ConfigType = { + "exp_id": "", + "num_users": 1, + "num_collaborators": NUM_COLLABORATORS, + "experiment_path": "./experiments/", + "dset": "pascal", + "dump_dir": DUMP_DIR, + "dpath": "./datasets/pascal/VOCdevkit/VOC2012/", + "seed": 37, + # node_0 is a server currently + # The device_ids dictionary depicts the GPUs on which the nodes reside. 
+ # For a single-GPU environment, the config will look as follows (as it follows a 0-based indexing): + "device_ids": {"node_0": [1], "node_1": [2]}, + "algo": get_algo_configs(num_users=2, algo_configs=default_config_list), # type: ignore + "samples_per_user": 100, # TODO: To model scenarios where different users have different number of samples + # we need to make this a dictionary with user_id as key and number of samples as value + "train_label_distribution": "iid", + "test_label_distribution": "iid", + "exp_keys": [], +} + +dropout_dict: Any = { + "distribution_dict": { # leave dict empty to disable dropout + "method": "uniform", # "uniform", "normal" + "parameters": {} # "mean": 0.5, "std": 0.1 in case of normal distribution + }, + "dropout_rate": 0.0, # cutoff for dropout: [0,1] + "dropout_correlation": 0.0, # correlation between dropouts of successive rounds: [0,1] +} + +dropout_dict = {} #empty dict to disable dropout +dropout_dicts: Any = {"node_0": {}} +for i in range(1, num_users + 1): + dropout_dicts[f"node_{i}"] = dropout_dict + +# for swift or fedavgpush, just modify the algo_configs list +# for swift, synchronous should preferably be False +gpu_ids = [0, 1, 2, 3] +num_malicious = 4 +grpc_system_config: ConfigType = { + "exp_id": "dynamic_test", + "num_users": num_users, + "num_collaborators": NUM_COLLABORATORS, + "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:32048"]}, # The super-node + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "seed": 2, + "device_ids": get_device_ids(num_users, gpu_ids), + "assign_based_on_host": True, + # "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore + "algos": get_algo_configs(num_users=num_users, algo_configs=malicious_algo_config_list, assignment_method="distribution", distribution={0: num_users - num_malicious, 1: num_malicious}), # type: ignore + "samples_per_user": 50000 // num_users, # distributed equally +
"train_label_distribution": "non_iid", + "alpha_data": 0.1, + "test_label_distribution": "iid", + "exp_keys": [], + "dropout_dicts": dropout_dicts, + "test_samples_per_user": 200, + "log_memory": True, + "streaming_aggregation": True, # Make it true for fedstatic +} + +grpc_system_config_gia: ConfigType = { + "exp_id": "static", + "num_users": num_users, + "num_collaborators": NUM_COLLABORATORS, + "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:50048"]}, # The super-node + "dset": CIFAR10_DSET, + "dump_dir": DUMP_DIR, + "dpath": CIAR10_DPATH, + "seed": 2, + "device_ids": get_device_ids(num_users, gpu_ids), + # "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore + "algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore + "samples_per_user": 50000 // num_users, # distributed equally + "train_label_distribution": "iid", + "test_label_distribution": "iid", + "exp_keys": [], + "dropout_dicts": dropout_dicts, + "gia":True, + "gia_attackers":[1] +} + +current_config = grpc_system_config diff --git a/src/run_malicious_experiments.py b/src/run_malicious_experiments.py new file mode 100644 index 00000000..92861aa9 --- /dev/null +++ b/src/run_malicious_experiments.py @@ -0,0 +1,148 @@ +""" +Given a set of experiment keys to run, +this module writes the config files for each experiment key +and runs the main.py script for each experiment +""" + +import argparse +import subprocess +from typing import List + +from utils.types import ConfigType +from utils.config_utils import process_config +from utils.post_hoc_plot_utils2 import aggregate_metrics_across_users, plot_all_metrics + +from configs.sys_config import get_algo_configs, get_device_ids +from configs.algo_config import fedstatic +from configs.malicious_config import malicious_config_list +from configs.sys_config import grpc_system_config +import socket +import time + +# Get the hostname +hostname = socket.gethostname() 
+superhost_name = "" # Fill in the superhost name +full_hostname = "" # Fill in the full hostname + +post_hoc_plot: bool = True + +algo_to_algo_index = { + "data_poisoning": 0, + "gradient_attack": 1, + "backdoor_attack": 2, + "bad_weights": 3, + "sign_flip": 4, + "label_flip": 5, +} + +# for each experiment key, write the modifications to the config file +gpu_ids = [0, 1, 2, 3, 4, 5, 6, 7] +exp_dict = {} +num_nodes = 36 +for num_collaborators in [num_nodes, 1]: + for algo_name, algo_index in algo_to_algo_index.items(): + for topo in ["ring", "torus", "fully_connected", "erdos_renyi"]: + for m in [0, 1, 4]: + topo_config = {"name": topo} + if topo == "erdos_renyi": + topo_config["p"] = 0.13 + exp_dict[f"topo_{topo}x{algo_name}_{m}_malicious_{num_collaborators}_colab_3_5"] = { + "algo_config": fedstatic, + "sys_config": grpc_system_config, + "malicious_config": malicious_config_list[algo_name], + "num_malicious": m, + "algo": { + "topology": topo_config, + }, + "sys": { + "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["matlaber1.media.mit.edu:1112"]}, + "num_users": num_nodes, + "num_collaborators": num_collaborators, + "samples_per_user": 50000 // num_nodes, + "seed": 2, + "assign_based_on_host": True, + }, + } + +# parse the arguments +parser = argparse.ArgumentParser(description="host address of the nodes") + +args = parser.parse_args() + +skip = True +for exp_id, exp_config in exp_dict.items(): + if skip: + skip = False + continue + print(f"Running experiment {exp_config}") + # update the algo config with config settings + base_algo_config = exp_config["algo_config"].copy() + base_algo_config.update(exp_config["algo"]) + + # update the sys config with config settings + base_sys_config = exp_config["sys_config"].copy() + base_sys_config.update(exp_config["sys"]) + + # update the malicious config with config settings + base_malicious_config = exp_config["malicious_config"].copy() + base_malicious_config.update(base_algo_config) + + # set up the full 
config file by combining the algo and sys config + n: int = base_sys_config["num_users"] + seed: int = base_sys_config["seed"] + m: int = exp_config["num_malicious"] + base_sys_config["algos"] = get_algo_configs(num_users=n, algo_configs=[base_algo_config, base_malicious_config], seed=seed, assignment_method="distribution", distribution={0: n-m, 1: m}) + base_sys_config["device_ids"] = get_device_ids(n, gpu_ids) + + full_config = base_sys_config.copy() + full_config["exp_id"] = exp_id + + # write the config file as python file configs/temp_config.py + temp_config_path = "./configs/temp_config.py" + with open(temp_config_path, "w") as f: + f.write("current_config = ") + f.write(str(full_config)) + + superprocess = None + all_processes = [] + + # start the supernode + if hostname == superhost_name: + print("Starting supernode") + supernode_command: List[str] = ["python", "main.py", "-host", full_hostname, "-super", "true", "-s", temp_config_path] + superprocess = subprocess.Popen(supernode_command) + else: + print("Waiting for supernode to start") + time.sleep(10) + + # start the nodes + command_list: List[str] = ["python", "main.py", "-host", full_hostname, "-s", temp_config_path] + for i in range(num_nodes): + print(f"Starting process for user {i} exp {exp_id}") + # start a Popen process + all_processes.append(subprocess.Popen(command_list)) + + # once the experiment is done, run the next experiment + # Wait for the supernode process to finish + if superprocess: + superprocess.wait() + else: + # wait for all the processes to finish + for process in all_processes: + process.wait() + # wait for 5 more minutes + print("Processes done, waiting for 5 minutes") + time.sleep(300) + + # run the post-hoc analysis + if post_hoc_plot and superprocess is not None: + full_config = process_config(full_config) # this populates the results path + logs_dir = full_config["results_path"] + '/logs/' + + # aggregate metrics across all users + aggregate_metrics_across_users(logs_dir) + 
# plot all metrics + plot_all_metrics(logs_dir) + + # Continue with the next set of commands after supernode finishes + print(f"Supernode process {exp_id} finished. Proceeding to next set of commands.") \ No newline at end of file From 0a1bf8826f9c6c911a5f5c1fb4660c423eda9dfe Mon Sep 17 00:00:00 2001 From: joyce-yuan Date: Tue, 11 Mar 2025 18:27:27 +0000 Subject: [PATCH 2/3] small tweaks for malicious nodes --- src/algos/base_class.py | 27 ++++++++++++++++++++++++++- src/run_malicious_experiments.py | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/algos/base_class.py b/src/algos/base_class.py index 899fcd47..c10ac0ac 100644 --- a/src/algos/base_class.py +++ b/src/algos/base_class.py @@ -29,6 +29,12 @@ TransformDataset, CorruptDataset, ) + +# import the possible attacks +from algos.attack_add_noise import AddNoiseAttack +from algos.attack_bad_weights import BadWeightsAttack +from algos.attack_sign_flip import SignFlipAttack + from utils.log_utils import LogUtils from utils.model_utils import ModelUtils from utils.community_utils import ( @@ -123,6 +129,8 @@ def __init__( if "gia" in config and self.node_id in config["gia_attackers"]: self.gia_attacker = True + + self.malicious_type = config.get("malicious_type", "normal") self.log_memory = config.get("log_memory", False) @@ -266,7 +274,7 @@ def set_shared_exp_parameters(self, config: ConfigType) -> None: def local_round_done(self) -> None: self.round += 1 - def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, Any]]: + def get_model_weights(self, chop_model:bool=False, get_external_repr:bool=True) -> Dict[str, int|Dict[str, Any]]: """ Share the model weights params: @@ -275,6 +283,9 @@ def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, An if chop_model: model, _ = self.model_utils.get_split_model(self.model, self.config["split_layer"]) model = model.state_dict() + elif get_external_repr and self.malicious_type != "normal": + # Get the 
external representation of the malicious model + model = self.get_malicious_model_weights() else: model = self.model.state_dict() message: Dict[str, int|Dict[str, Any]] = {"sender": self.node_id, "round": self.round, "model": model} @@ -290,6 +301,20 @@ def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, An message["model"][key] = message["model"][key].to("cpu") return message + + def get_malicious_model_weights(self) -> Dict[str, Tensor]: + """ + Get the external representation of the model based on the malicious type. + """ + if self.malicious_type == "sign_flip": + return SignFlipAttack(self.config, self.model.state_dict()).get_representation() + elif self.malicious_type == "bad_weights": + # print("bad weights attack") + return BadWeightsAttack(self.config, self.model.state_dict()).get_representation() + elif self.malicious_type == "add_noise": + return AddNoiseAttack(self.config, self.model.state_dict()).get_representation() + else: + return self.model.state_dict() def get_local_rounds(self) -> int: return self.round diff --git a/src/run_malicious_experiments.py b/src/run_malicious_experiments.py index 92861aa9..315d6a2f 100644 --- a/src/run_malicious_experiments.py +++ b/src/run_malicious_experiments.py @@ -10,7 +10,7 @@ from utils.types import ConfigType from utils.config_utils import process_config -from utils.post_hoc_plot_utils2 import aggregate_metrics_across_users, plot_all_metrics +from utils.post_hoc_plot_utils import aggregate_metrics_across_users, plot_all_metrics from configs.sys_config import get_algo_configs, get_device_ids from configs.algo_config import fedstatic From ef7a56c2f3ae7d099934f3b1f9b0328f3a2970bd Mon Sep 17 00:00:00 2001 From: joyce-yuan Date: Tue, 11 Mar 2025 18:28:42 +0000 Subject: [PATCH 3/3] small fix --- src/configs/sys_config_malicious.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/configs/sys_config_malicious.py b/src/configs/sys_config_malicious.py index 560b1c0a..e8c278d0 
100644 --- a/src/configs/sys_config_malicious.py +++ b/src/configs/sys_config_malicious.py @@ -6,7 +6,7 @@ from utils.types import ConfigType # from utils.config_utils import get_sliding_window_support, get_device_ids -from .algo_config import ( +from .algo_config_malicious import ( malicious_algo_config_list, default_config_list, fedstatic, # type: ignore