diff --git a/.gitignore b/.gitignore index cd2af6a..05ea011 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,7 @@ logs data build *.pdf -torch_wheels \ No newline at end of file +torch_wheels +Miniforge3-Linux-x86_64.sh +/examples/**/*.txt +/examples/**/*.json diff --git a/examples/flwr_tutorial_1_6/colext_config.yaml b/examples/flwr_tutorial_1_6/colext_config.yaml index c5eafc0..10311bb 100644 --- a/examples/flwr_tutorial_1_6/colext_config.yaml +++ b/examples/flwr_tutorial_1_6/colext_config.yaml @@ -1,4 +1,4 @@ -project: colext_example # project name should not have spaces +project: network_exp # project name should not have spaces job_name: "SOTA FL experiment" # deployer: local_py @@ -13,15 +13,15 @@ code: args: "--num_clients=${COLEXT_N_CLIENTS} --num_rounds=3" devices: - - { device_type: JetsonAGXOrin, count: 2 } - - { device_type: JetsonOrinNano, count: 2 } - - { device_type: JetsonXavierNX, count: 2 } + - { dev_type: JetsonAGXOrin, count: 1 } + - { dev_type: JetsonOrinNano, count: 1 } + - { dev_type: JetsonXavierNX, count: 2 } # - { device_type: JetsonNano, count: 6 } # - { device_type: LattePandaDelta3, count: 2 } # - { device_type: OrangePi5B, count: 8 } # Monitoring defaults -# monitoring: -# live_metrics: True # True/False -# push_interval: 10 # in seconds -# scraping_interval: 0.3 # in seconds +monitoring: + live_metrics: True # True/False + push_interval: 10 # in seconds + scraping_interval: 0.3 # in seconds diff --git a/examples/flwr_tutorial_1_8/colext_config.yaml b/examples/flwr_tutorial_1_8/colext_config.yaml index ef89f95..ba00a25 100644 --- a/examples/flwr_tutorial_1_8/colext_config.yaml +++ b/examples/flwr_tutorial_1_8/colext_config.yaml @@ -1,7 +1,7 @@ -project: colext_example # project name should not have spaces +project: network_exp # project name should not have spaces job_name: "SOTA FL experiment" -# deployer: local_py +deployer: sbc # python_version: "3.10" code: @@ -16,31 +16,57 @@ code: command: >- python3 ./server.py --num_clients=${COLEXT_N_CLIENTS} - --num_rounds=3 + --num_rounds=30 -clients: - # - dev_type: LattePandaDelta3 - # count: 4 - # add_args: "--max_step_count=50" +network: + - tag: default + upstream: + bandwidth: 100Mbps + latency: 1ms + downstream: + bandwidth: 100Mbps + latency: 1ms + - tag: slow + upstream: + bandwidth: 1Mbps + latency: 1ms + downstream: + bandwidth: 1Mbps + latency: 1ms + - tag: veryslow + upstream: + bandwidth: 100Kbps + latency: 1ms + downstream: + bandwidth: 100Kbps + latency: 1ms + - - dev_type: JetsonOrinNano - count: 4 - add_args: "--max_step_count=200" +clients: + # - dev_type: LattePandaDelta3 + # count: 4 + #add_args: "--max_step_count=50" + #network: veryslow - - dev_type: OrangePi5B - add_args: "--max_step_count=100" + - dev_type: JetsonOrinNano + add_args: "--max_step_count=200" + network: slow - - dev_type: OrangePi5B - count: 2 - add_args: "--max_step_count=50" + - dev_type: OrangePi5B + add_args: "--max_step_count=100" + network: default - # - { dev_type: JetsonAGXOrin, count: 1 } + - dev_type: OrangePi5B + count: 1 + add_args: "--max_step_count=50" + network: default + # - { dev_type: JetsonAGXOrin, count: 1 } # - { dev_type: JetsonOrinNano, count: 2 } # - { dev_type: JetsonXavierNX, count: 2 } # - { dev_type: JetsonNano, count: 6 } -# Monitoring defaults -# monitoring: -# live_metrics: True # True/False -# push_interval: 10 # in seconds -# scraping_interval: 0.3 # in seconds + # Monitoring defaults +monitoring: + live_metrics: True # True/False + push_interval: 10 # in seconds + scraping_interval: 0.3 # in 
seconds diff --git a/examples/flwr_tutorial_1_8/networktemp/group-0/networkrules.txt b/examples/flwr_tutorial_1_8/networktemp/group-0/networkrules.txt new file mode 100644 index 0000000..0d68a27 --- /dev/null +++ b/examples/flwr_tutorial_1_8/networktemp/group-0/networkrules.txt @@ -0,0 +1,2 @@ +tcset eth0 --direction outgoing ['rate 1Mbps', 'delay 1ms'] --change +tcset eth0 --direction incoming ['rate 1Mbps', 'delay 1ms'] --change diff --git a/examples/flwr_tutorial_1_8/networktemp/group-1/networkrules.txt b/examples/flwr_tutorial_1_8/networktemp/group-1/networkrules.txt new file mode 100644 index 0000000..8b083bf --- /dev/null +++ b/examples/flwr_tutorial_1_8/networktemp/group-1/networkrules.txt @@ -0,0 +1,2 @@ +tcset eth0 --direction outgoing ['rate 100Mbps', 'delay 1ms'] --change +tcset eth0 --direction incoming ['rate 100Mbps', 'delay 1ms'] --change diff --git a/examples/flwr_tutorial_1_8/networktemp/group-2/networkrules.txt b/examples/flwr_tutorial_1_8/networktemp/group-2/networkrules.txt new file mode 100644 index 0000000..8b083bf --- /dev/null +++ b/examples/flwr_tutorial_1_8/networktemp/group-2/networkrules.txt @@ -0,0 +1,2 @@ +tcset eth0 --direction outgoing ['rate 100Mbps', 'delay 1ms'] --change +tcset eth0 --direction incoming ['rate 100Mbps', 'delay 1ms'] --change diff --git a/examples/flwr_tutorial_1_8_network/client.py b/examples/flwr_tutorial_1_8_network/client.py new file mode 100644 index 0000000..1147792 --- /dev/null +++ b/examples/flwr_tutorial_1_8_network/client.py @@ -0,0 +1,183 @@ +# Copied from: https://github.com/adap/flower/blob/dcffb484fb7d1e712f65d414fb31aa021f0a760e/examples/quickstart-pytorch/client.py +import argparse +import warnings +from collections import OrderedDict + +from flwr.client import NumPyClient, ClientApp +from flwr_datasets import FederatedDataset +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.utils.data import DataLoader +from torchvision.transforms import Compose, Normalize, ToTensor +from tqdm import tqdm + +from colext import MonitorFlwrClient + +# ############################################################################# +# 1. 
Regular PyTorch pipeline: nn.Module, train, test, and DataLoader +# ############################################################################# + +warnings.filterwarnings("ignore", category=UserWarning) +DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + +class Net(nn.Module): + """Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz')""" + + def __init__(self) -> None: + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + return self.fc3(x) + + +def train(net, trainloader, epochs): + """Train the model on the training set.""" + criterion = torch.nn.CrossEntropyLoss() + optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + for _ in range(epochs): + step = 0 + + for batch in tqdm(trainloader, "Training"): + images = batch["img"] + labels = batch["label"] + optimizer.zero_grad() + criterion(net(images.to(DEVICE)), labels.to(DEVICE)).backward() + optimizer.step() + + if step >= max_step_count: + break + else: + step += 1 + + +def test(net, testloader): + """Validate the model on the test set.""" + criterion = torch.nn.CrossEntropyLoss() + correct, loss = 0, 0.0 + with torch.no_grad(): + step = 0 + + for batch in tqdm(testloader, "Testing"): + images = batch["img"].to(DEVICE) + labels = batch["label"].to(DEVICE) + outputs = net(images) + loss += criterion(outputs, labels).item() + correct += (torch.max(outputs.data, 1)[1] == labels).sum().item() + + if step >= max_step_count: + break + else: + step += 1 + accuracy = correct / len(testloader.dataset) + return loss, accuracy + + +def load_data(partition_id): + """Load partition CIFAR10 data.""" + fds = FederatedDataset(dataset="cifar10", partitioners={"train": 3}) + partition = fds.load_partition(partition_id) + # Divide data on each node: 80% train, 20% test + partition_train_test = partition.train_test_split(test_size=0.2) + pytorch_transforms = Compose( + [ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] + ) + + def apply_transforms(batch): + """Apply transforms to the partition from FederatedDataset.""" + batch["img"] = [pytorch_transforms(img) for img in batch["img"]] + return batch + + partition_train_test = partition_train_test.with_transform(apply_transforms) + trainloader = DataLoader(partition_train_test["train"], batch_size=32, shuffle=True) + testloader = DataLoader(partition_train_test["test"], batch_size=32) + return trainloader, testloader + + +# ############################################################################# +# 2. 
Federation of the pipeline with Flower
+# #############################################################################
+
+# Get partition id
+parser = argparse.ArgumentParser(description="Flower")
+parser.add_argument(
+    "--partition-id",
+    choices=[0, 1, 2],
+    default=0,
+    type=int,
+    help="Partition of the dataset divided into 3 iid partitions created artificially.",
+)
+partition_id = parser.parse_known_args()[0].partition_id
+
+# Load model and data (simple CNN, CIFAR-10)
+net = Net().to(DEVICE)
+trainloader, testloader = load_data(partition_id=partition_id)
+
+# Define Flower client
+# The decorator does nothing outside the CoLExT environment
+@MonitorFlwrClient
+class FlowerClient(NumPyClient):
+    def get_parameters(self, config):
+        return [val.cpu().numpy() for _, val in net.state_dict().items()]
+
+    def set_parameters(self, parameters):
+        params_dict = zip(net.state_dict().keys(), parameters)
+        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
+        net.load_state_dict(state_dict, strict=True)
+
+    def fit(self, parameters, config):
+        self.set_parameters(parameters)
+        train(net, trainloader, epochs=1)
+        return self.get_parameters(config={}), len(trainloader.dataset), {}
+
+    def evaluate(self, parameters, config):
+        self.set_parameters(parameters)
+        loss, accuracy = test(net, testloader)
+        return loss, len(testloader.dataset), {"accuracy": accuracy}
+
+
+def client_fn(cid: str):
+    """Create and return an instance of Flower `Client`."""
+    return FlowerClient().to_client()
+
+
+# Flower ClientApp
+app = ClientApp(
+    client_fn=client_fn,
+)
+
+def get_args():
+    parser = argparse.ArgumentParser(
+        prog='FL Client',
+        description='Starts the FL client')
+
+    parser.add_argument('--flserver_address', type=str, default="127.0.0.1:8080", help="FL server address ip:port")
+    parser.add_argument('--max_step_count', default=3000, type=int, help="Configure number of steps for train and test")
+    args = parser.parse_args()
+    return args
+
+# Legacy mode
+if __name__ == "__main__":
+    from flwr.client import start_client
+
+    args = get_args()
+
+    flserver_address = args.flserver_address
+    max_step_count = args.max_step_count
+
+    start_client(
+        server_address=flserver_address,
+        client=FlowerClient().to_client(),
+    )
\ No newline at end of file
diff --git a/examples/flwr_tutorial_1_8_network/colext_config.yaml b/examples/flwr_tutorial_1_8_network/colext_config.yaml
new file mode 100644
index 0000000..d03f33b
--- /dev/null
+++ b/examples/flwr_tutorial_1_8_network/colext_config.yaml
@@ -0,0 +1,88 @@
+project: network_exp # project name should not have spaces
+job_name: "SOTA FL experiment"
+
+deployer: sbc
+# python_version: "3.10"
+
+code:
+  # path:
+  # if `path` is omitted it defaults to the config file location
+  client:
+    # Assumes relative paths from path
+    command: >-
+      python3 ./client.py
+      --flserver_address=${COLEXT_SERVER_ADDRESS}
+  server:
+    command: >-
+      python3 ./server.py
+      --num_clients=${COLEXT_N_CLIENTS}
+      --num_rounds=30
+
+network:
+  - tag: default
+    upstream:
+      bandwidth: 50Mbps
+    downstream:
+      bandwidth: 50Mbps
+  - tag: slow
+    upstream:
+      bandwidth: 1Mbps
+      latency: 1ms
+    downstream:
+      bandwidth: 1Mbps
+      latency: 1ms
+  - tag: veryslow
+    upstream:
+      bandwidth: 100Kbps
+      latency: 1ms
+    downstream:
+      bandwidth: 100Kbps
+      latency: 1ms
+  - tag: DynFast
+    dynamic:
+      - iterator: time
+        structure: [bandwidth, latency, delay-distribution]
+        commands:
+          - [10, set, outgoing, -1, 1ms, normal]
+          - [20, set, outgoing, 1000Mbps, 0.1ms, normal]
+          - [100, del, outgoing, 1000Mbps, 0.1ms, normal]
+          - [100, set, outgoing, 1000Mbps, 0.1ms, normal]
+          - [120, set, outgoing, 10Mbps, 0.1ms, normal]
+
+      - iterator: epoch # in epochs
+        commands:
+          - [10, set, outgoing, 1Mbps, 10ms]
+          - [20, set, outgoing, 1000Mbps, 0.1ms]
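+          # Each command row follows the declared structure:
+          #   [iterator value, set/del, direction, <rule values...>]
+          # With structure [bandwidth, latency, delay-distribution], the row
+          # [20, set, outgoing, 1000Mbps, 0.1ms, normal] sets the outgoing link
+          # to 1000Mbps with a 0.1ms, normally distributed delay once the
+          # iterator reaches 20 (see README-network.md in this change).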
+
+clients:
+  # - dev_type: LattePandaDelta3
+  #   count: 4
+  #   add_args: "--max_step_count=50"
+  #   network: veryslow
+
+  # - dev_type: JetsonOrinNano
+  #   count: 2
+  #   add_args: "--max_step_count=200"
+  #   network: slow
+
+  - dev_type: OrangePi5B
+    count: 2
+    add_args: "--max_step_count=100"
+    network:
+      - default
+      - DynFast
+
+  # - dev_type: OrangePi5B
+  #   count: 2
+  #   add_args: "--max_step_count=50"
+  #   network: default
+  # - { dev_type: JetsonAGXOrin, count: 1 }
+  # - { dev_type: JetsonOrinNano, count: 2 }
+  # - { dev_type: JetsonXavierNX, count: 2 }
+  # - { dev_type: JetsonNano, count: 6 }
+
+# Monitoring defaults
+monitoring:
+  live_metrics: True # True/False
+  push_interval: 10 # in seconds
+  scraping_interval: 0.3 # in seconds
diff --git a/examples/flwr_tutorial_1_8_network/network_scripts/mobiPerf.py b/examples/flwr_tutorial_1_8_network/network_scripts/mobiPerf.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/flwr_tutorial_1_8_network/requirements.txt b/examples/flwr_tutorial_1_8_network/requirements.txt
new file mode 100644
index 0000000..21c805b
--- /dev/null
+++ b/examples/flwr_tutorial_1_8_network/requirements.txt
@@ -0,0 +1,10 @@
+flwr==1.8.0
+torch
+torchvision
+torchaudio
+flwr-datasets[vision]>=0.0.2, <1.0.0
+tqdm~=4.66.1
+# --extra-index-url https://download.pytorch.org/whl/cu116
+# fltb # Omitting this for now
+# With this dependency we need to install fltb before we install the user requirements
+# This forces the user code requirements to be reinstalled every time we change the colext package code
\ No newline at end of file
diff --git a/examples/flwr_tutorial_1_8_network/run.sh b/examples/flwr_tutorial_1_8_network/run.sh
new file mode 100644
index 0000000..1dc7bb8
--- /dev/null
+++ b/examples/flwr_tutorial_1_8_network/run.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+# Based on https://github.com/adap/flower/blob/main/examples/quickstart-pytorch/run.sh
+
+# Enable CTRL+C to stop all background processes
+trap "echo 'Cleaning up' && trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
+
+set -e
+# Run commands as if launched on the script folder
+cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"/
+
+# Add our python folder to the pythonpath so that python can find our package
+# Only works because we already cded into the folder with this script
+export PYTHONPATH="$(dirname `pwd`)/src:"
+
+log_folder="./logs"
+rm -fr $log_folder
+mkdir $log_folder
+
+# Hardcoded env vars to test
+export COLEXT_ENV=1
+export COLEXT_SERVER_ADDRESS="0.0.0.0:8080"
+export COLEXT_JOB_ID=1
+export COLEXT_CLIENT_DB_ID=25
+export COLEXT_DEVICE_TYPE=FLServer
+# export COLEXT_DATA_HOME_FOLDER=/colext/datasets
+# export COLEXT_PYTORCH_DATASETS=/colext/pytorch_datasets
+export COLEXT_MONITORING_LIVE_METRICS=True
+export COLEXT_MONITORING_PUSH_INTERVAL=10
+export COLEXT_MONITORING_SCRAPE_INTERVAL=1
+export COLEXT_LOG_LEVEL=DEBUG
+
+num_clients=1
+num_rounds=1
+echo ""
+echo "Starting server"
+python server.py -n $num_clients -r $num_rounds > ${log_folder}/server.out 2>&1 &
+sleep 3 # Sleep for 3s to give the server enough time to start
+
+for i in `seq 0 $((num_clients - 1))`; do
+    echo "Starting client $i"
+    export COLEXT_CLIENT_ID=$i
+    python client.py --tiny_rounds > ${log_folder}/client_${i}.out 2>&1 &
+
+    if [ "$i" -eq 0 ]; then
+        # Delay
start of next client + # For some reason if 2 clients connect at the same time, the system won't start + sleep 4 + fi +done + +tail -f ${log_folder}/client_0.out +# tail -f ${log_folder}/server.out + +# Wait for all background processes to complete +wait \ No newline at end of file diff --git a/examples/flwr_tutorial_1_8_network/server.py b/examples/flwr_tutorial_1_8_network/server.py new file mode 100644 index 0000000..5fe9e4d --- /dev/null +++ b/examples/flwr_tutorial_1_8_network/server.py @@ -0,0 +1,69 @@ +# Based on: https://github.com/adap/flower/blob/dcffb484fb7d1e712f65d414fb31aa021f0a760e/examples/quickstart-pytorch/server.py +import argparse +from typing import List, Tuple + +from flwr.server import ServerApp, ServerConfig +from flwr.server.strategy import FedAvg +from flwr.common import Metrics +from colext import MonitorFlwrStrategy + +# Define metric aggregation function +def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: + # Multiply accuracy of each client by number of examples used + accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics] + examples = [num_examples for num_examples, _ in metrics] + + # Aggregate and return custom metric (weighted average) + return {"accuracy": sum(accuracies) / sum(examples)} + + +@MonitorFlwrStrategy +class FlowerStrategy(FedAvg): + pass + +# Define config +config = ServerConfig(num_rounds=3) + + +# Flower ServerApp +# app = ServerApp( +# config=config, +# strategy=strategy, +# ) + +def get_args(): + parser = argparse.ArgumentParser( + prog='FL Server', + description='Starts the FL server') + + parser.add_argument('-n', '--num_clients', type=int, required=True, help="number of FL clients") + parser.add_argument('-r', '--num_rounds', type=int, required=True, help="number of FL rounds") + + args = parser.parse_args() + + assert args.num_clients > 0, "Number of clients should be larger than 0" + assert args.num_rounds > 0, "Number of rounds should be larger than 0" + + return args + +# Legacy mode +if __name__ == "__main__": + from flwr.server import start_server + + args = get_args() + + n_clients = args.num_clients + n_rounds = args.num_rounds + + strategy = FlowerStrategy( + min_fit_clients=n_clients, + min_evaluate_clients=n_clients, + min_available_clients=n_clients, + evaluate_metrics_aggregation_fn=weighted_average) + + + start_server( + server_address="0.0.0.0:8080", + config=ServerConfig(num_rounds=n_rounds), + strategy=strategy, + ) \ No newline at end of file diff --git a/exit b/exit new file mode 100644 index 0000000..23aacbb --- /dev/null +++ b/exit @@ -0,0 +1,645 @@ +commit ab32a68849ba5126b3283c15643bc6daeb8c01cd (HEAD) +Author: o0n1x <127759471+o0n1x@users.noreply.github.com> +Date: Tue Mar 25 16:53:01 2025 +0300 + + Delete Miniforge3-Linux-x86_64.sh + +commit c6969d0a89d2ae2c4e9e5c31848a17125212bf82 +Author: abdullah-alamoudi +Date: Tue Mar 25 16:52:02 2025 +0300 + + removed miniforge + +commit 0c07585c65b94bbd3839cf9d5f7fd82045451081 +Author: abdullah-alamoudi +Date: Tue Mar 25 16:00:30 2025 +0300 + + the modifications done for the static part + +commit b35ae20ac807a9626beb6d1ca8ed87197a4e42fb +Author: Amandio Faustino +Date: Wed Feb 26 18:09:56 2025 +0300 + + Update CoLExT schema image. Added white background. 
+ +commit 6871c08879ddf49556185245306603467aba130f +Author: Amândio Faustino +Date: Mon Feb 17 15:59:31 2025 +0300 + + Update README.md + +commit df41b21d1bccd3d3800f686ee16111b1bac6395c +Author: Nandinski +Date: Wed Feb 5 09:42:53 2025 +0300 + + Readded CoLExT datasets env to local deployer + +commit 816630aebdd60edfdf6984608623d4893d7881c5 +Author: Nandinski +Date: Tue Feb 4 16:06:50 2025 +0300 + + Fix code path to be relative to config file location. + +commit eaca76566c5ab1381bae23ac03e14086dfa99523 +Author: Amândio Faustino +Date: Tue Feb 4 15:08:40 2025 +0300 + + Update README.md + +commit 2af2e52cbef0500980e9a890172c7785ee56262f +Author: Nandinski +Date: Tue Feb 4 15:07:04 2025 +0300 + + Improve specification of clients + Add option to add client specific args + +commit 27f14beefaeedb4398dbf223c5e2dcab3d84da57 +Author: Nandinski +Date: Sun Jan 12 17:16:56 2025 +0300 + + Renamed CoLExT datasets folder to COLEXT_DATASETS + +commit e321cb67d432d52ea66640a15277b293a56a1c0d +Author: Nandinski +Date: Sun Jan 12 10:36:30 2025 +0300 + + Specify default user for DB access when running colext_launch_job + +commit dc2599bdd73d3b7d3467615c861fcef2a8888ae2 +Author: Nandinski +Date: Thu Jan 9 16:39:43 2025 +0300 + + Merged entrypoint + args into command in config file + +commit c0a00a299f54f0366baf68b6cadfbb4cd7bda430 +Author: Amândio Faustino +Date: Wed Jan 8 10:44:16 2025 +0300 + + Update README.md + + Fixed typos + +commit 0e78e1a9ef27efad77b452e704ef0a28c427c565 +Author: Nandinski +Date: Sun Jan 5 09:41:38 2025 +0300 + + Remove figures from jupyter notebook + +commit 13b75760b1c0e7c2be89f25a3bd95f9b0bae70f3 +Author: Amândio Faustino +Date: Tue Dec 17 07:59:48 2024 +0300 + + Update README.md + +commit bfddec605ace94604e643dda12674ed17959fa5e +Author: Nandinski +Date: Mon Dec 16 16:39:59 2024 +0300 + + Added smart plug energy measurements with tapo package + +commit 707e50272bdf7b4c88f76416719eb2eb572a0698 +Author: Nandinski +Date: Mon Dec 16 16:39:19 2024 +0300 + + Fix for flwr 1.8 + +commit 99ee733a4b36b77fee5ae20286bfcc6ffb562863 +Author: Nandinski +Date: Mon Dec 16 16:37:04 2024 +0300 + + Fix plotting issues + +commit a71a6dc35368929f09ca9291b837ac04fd967499 +Author: Nandinski +Date: Wed Nov 13 17:18:49 2024 +0300 + + Remove ssh req when installing package in dockerfile + +commit 80b13cd7f2161d88c7685bef410da55b9db5910b +Author: Amândio Faustino +Date: Wed Nov 13 14:35:02 2024 +0300 + + Update README.md + +commit 8d7dd3d4068b02c8164b2257814115746e933605 +Author: Nandinski +Date: Wed Nov 13 14:04:37 2024 +0300 + + Added support for flwr 1.8 + +commit 48a31df0f30eba6a6ca683c502f20d2dd4c9dce3 +Author: Nandinski +Date: Wed Nov 13 12:29:10 2024 +0300 + + Fix db issues with local deployer + +commit 9cbe4377b878a6a00345f3c570733e5538cfc056 +Author: Nandinski +Date: Sun Nov 10 13:47:41 2024 +0300 + + Improve DB logic + Misc changes + +commit a907196fc1916ecdbef520b671494f1204daeb27 +Author: Nandinski +Date: Wed Oct 16 16:40:31 2024 +0300 + + Updated README + +commit b9fdf11291be93d8880994b427d3fd43ac8e7015 +Author: Amândio Faustino +Date: Thu Sep 26 16:41:35 2024 +0300 + + Update README.md + +commit 595fca472cdd53ab39e0bd67e8104e2fbb559325 +Author: Amândio Faustino +Date: Thu Sep 26 16:29:40 2024 +0300 + + Update README.md + +commit 178abcd01a0dfc5fc0d33d640a5da74e69b0ae06 +Author: Nandinski +Date: Tue Sep 3 19:04:54 2024 +0300 + + CoLexT is now public! 
+ +commit 5bc3495094aa432a5e35cd44d3d299318d6a21a0 +Author: Nandinski +Date: Tue Sep 3 18:49:04 2024 +0300 + + improve database security + +commit 5f916fe277836684526f70c83d33a3a93c687601 +Author: Nandinski +Date: Sun Sep 1 15:27:38 2024 +0300 + + quick fix hostname change + +commit 31399e1634a822b9227e602b187b2a14cc790315 +Author: Amândio Faustino +Date: Sun Aug 4 14:21:04 2024 +0300 + + Update README.md + +commit 7c819e56c362fefa030ec29789f483264a0d0944 +Author: Amândio Faustino +Date: Wed Jul 31 16:37:22 2024 +0300 + + Update README.md + +commit cef04f9306001a29833f1131f97506576bcd8c6f +Author: Amândio Faustino +Date: Wed Jul 31 10:29:39 2024 +0300 + + Update README.md + +commit 4e015005d63899fddb1a55c63b3b33f6cc6b4df1 +Author: Nandinski +Date: Tue Jul 30 17:02:38 2024 +0300 + + Associate projects with jobs + updated readme + +commit 9222bf1c1c38c58a6d676ebfc6e93dec2bc66807 +Author: Nandinski +Date: Tue Jul 30 11:25:18 2024 +0300 + + Updated README + +commit 8a7550d719c73a7f153064699ea018b6444315ed +Author: Nandinski +Date: Tue Jul 30 09:54:51 2024 +0300 + + saving grafana dashboard snapshot + +commit 44dc8420b045dc58ef3e17736fa69d7263342fbd +Author: Nandinski +Date: Mon Jul 29 10:20:39 2024 +0300 + + Improved units for ps plots + +commit a5c97393fed22b1e84e95b530317c7e190d4d4f0 +Author: Amândio Faustino +Date: Mon Jul 29 12:12:39 2024 +0300 + + Update README.md + +commit 70f89d7b505b79a2e4ac27f1bf9620eb26aaf9a7 +Author: Nandinski +Date: Sun Jul 28 22:53:28 2024 +0300 + + Update README + small adjustments + +commit 3afab397ec1c03c22c9afab841a6a4cb1b85db31 +Author: Nandinski +Date: Sun Jul 28 22:51:59 2024 +0300 + + Get metrics now produces summary and plots + +commit f4ef6ffca8796268130aee23fd6846f9e88c5eb6 +Author: Nandinski +Date: Mon Jul 22 15:41:30 2024 +0300 + + Cleaned up plots + +commit 3de0617446d161979278964e1008050fa3e3551f +Merge: f4a8e47 77366d0 +Author: Amândio Faustino +Date: Mon Jul 22 15:34:32 2024 +0300 + + Merge pull request #3 from sands-lab/sbc + + Make SBC the main branch - merge with Android later + +commit 77366d0964c9823cf44aa2bb64749680fd0c4feb +Merge: e889fec f4a8e47 +Author: Amândio Faustino +Date: Mon Jul 22 15:34:15 2024 +0300 + + Merge branch 'main' into sbc + +commit e889fec0f147e8f59d9c3dfdfdc71bc14a00deb3 +Merge: 93db080 53d2e25 +Author: Amândio Faustino +Date: Mon Jul 22 15:31:37 2024 +0300 + + Merge pull request #2 from sands-lab/Nandinski-patch-1 + + Update README.md + +commit 53d2e252c64fd2fbbfc7b85b6337f3719e8c4a23 +Author: Amândio Faustino +Date: Mon Jul 22 15:30:38 2024 +0300 + + Update README.md + +commit 93db0807e4b49d68d4f16702b61b1f5750c0fe03 +Author: Nandinski +Date: Mon Jul 22 14:41:00 2024 +0300 + + Code folder reorganization + cleaned up README + +commit a5bb4bb7061b085b77e39e1e085aff7e7d79a093 +Author: Nandinski +Date: Fri Jun 21 14:26:14 2024 +0300 + + Bug fixes. 
CPU + memory metrics were not correct, measure self was always true + +commit 6e6908c570cbc7535f1e407b8112567aa94c4835 +Author: Nandinski +Date: Sat Jun 15 21:20:41 2024 +0300 + + Checkpoint plotting code + +commit 1f8c4e071ea404a26d3d2f6bd0f39a1a03552c63 +Author: Nandinski +Date: Thu Jun 13 10:40:03 2024 +0300 + + Added missing env COLEXT_N_CLIENTS to local_deployer + +commit 2d4c32272f9e37fc6a53d6eec6fc2bed9d152674 +Author: Nandinski +Date: Thu Jun 13 10:30:24 2024 +0300 + + hacky fix for fjord - passing accuracy as None string - now we try and cast acc to float before sending it to db + +commit b9b27c703f50999a30f0eccdedd2b85f9ba90bb4 +Author: Nandinski +Date: Wed Jun 12 22:14:03 2024 +0300 + + Add support for jetson nano with pytorch 1.13 + python 3.10 + +commit 4cf187d5a3a18a91cfedf9e77e36858078c875fa +Author: Nandinski +Date: Wed Jun 12 15:51:18 2024 +0300 + + save plotting code + +commit ec9e886d48e03809242b2af421bb627a2ac7564a +Author: Nandinski +Date: Wed Jun 12 15:50:43 2024 +0300 + + minor tweaks + +commit 8d00798d47c0815ee24f8e1b1fda841ca4bfeceb +Author: Nandinski +Date: Sat Jun 8 20:08:33 2024 +0300 + + Skip copying original requirements.txt after modification + +commit b91fd931fcf6ff29b107bc9cf6429021f279cf7f +Author: Nandinski +Date: Sat Jun 8 20:07:39 2024 +0300 + + DB now keeps track of srv_accuracy + distributed acc + +commit 99965785baabdaa401caa0c25acbb0dae0a7cb95 +Author: Nandinski +Date: Sat Jun 8 16:16:26 2024 +0300 + + Fix setuptools + minor tweaks + +commit 9f21bc6cc8950161cc5029f59ad6c866c6ae3e9c +Author: Nandinski +Date: Thu Jun 6 12:21:19 2024 +0300 + + Automatically collect accuracy data from flwr decorator + +commit 6661bd78f7b2c2ba7b05789c78fd4f96db847922 +Author: Nandinski +Date: Wed Jun 5 12:28:36 2024 +0300 + + local_deploy timing fix + +commit f1df65658978cd4fe47bf7e68c40b338de50d527 +Author: Nandinski +Date: Wed Jun 5 12:28:29 2024 +0300 + + rm unnecessary docker ignores + +commit fe1e70a3175c7daadbb304a149d68c527537d4e4 +Author: Nandinski +Date: Wed Jun 5 10:49:47 2024 +0300 + + Allow pods to restart once - temp fix for client pods crashing at the beginning due to grpc + +commit 872cf0429a8449305db78a290d922778be5e90d1 +Author: Nandinski +Date: Wed Jun 5 10:34:27 2024 +0300 + + Terminate exp if clients keep running too long after server finished + +commit ebe93440f5f4bcca061789eb24ce884c6d78d178 +Author: Nandinski +Date: Mon Jun 3 18:07:34 2024 +0300 + + Big repo restructure. Added Deployers. 
Add local_py deployer + +commit e575d827f5ea098e6e6151dc108a981bd4208035 +Author: Nandinski +Date: Sun Jun 2 13:08:21 2024 +0300 + + clean up ignore files + +commit 7baa9a9bfa59e9653d4f520d464004ab262c28cd +Author: Nandinski +Date: Sun Jun 2 13:07:36 2024 +0300 + + Add support for python3.10 for jetson nano + +commit f14d8cd8bd807ba1dff8751f5c0e83efd529011b +Author: Nandinski +Date: Sun Jun 2 13:07:18 2024 +0300 + + Added plotting code + +commit 1ce81cdc45474e3ea8e08b7e424201f2d26a1632 +Author: Nandinski +Date: Thu May 30 16:21:15 2024 +0300 + + speed up pod termination + +commit f999686cf93646f560b5b5251339735d2e77edff +Author: Nandinski +Date: Thu May 30 16:11:58 2024 +0300 + + fix evaluate round timings + +commit 1f2d53774acde8d9f6c80321219853225c869f50 +Author: Nandinski +Date: Wed May 29 17:34:11 2024 +0300 + + Add plotting code + +commit c3bb47171d69d42de46a381f3369029b7267c54f +Author: Nandinski +Date: Wed May 29 17:32:57 2024 +0300 + + Added support for multiple examples + +commit a3266266172cceabb3d3f83320c60ec3847f302c +Author: Nandinski +Date: Wed May 29 17:29:34 2024 +0300 + + hw scraper is now properly seperated from the main app + +commit c9d2fbf1c4624233e29b8c8c6bcfcb6a0c48922a +Author: Nandinski +Date: Wed May 22 16:43:02 2024 +0300 + + cleanup exp_dispatcher + +commit ebdd09778284b08f07b95e1e8b9e707731ca8adb +Author: Nandinski +Date: Wed May 22 16:42:04 2024 +0300 + + Updated README + +commit c0cfe3b5ad5277ba96512c90f463ea568af0dcf2 +Author: Nandinski +Date: Mon May 20 13:35:07 2024 +0300 + + skip eval round when there's no client_instructions + +commit 6d423735675a40615fa1f0c31373d08a63ea41ce +Author: Nandinski +Date: Mon May 20 13:34:10 2024 +0300 + + client will now fail if it does not receive CIR_ID from server + +commit 617b9a46c0168436a5dbe36f5a308a13b10cb258 +Author: Nandinski +Date: Sun May 19 17:20:20 2024 +0300 + + Move location of ARG COLEXT_COMMIT_HASH down to reuse caching + +commit aca3b9eb74a77d84fcc7e2aaa11d8b0534934edb +Author: Nandinski +Date: Sun May 19 17:14:33 2024 +0300 + + Give server more time to boot + +commit f1e763de16d6111173d2665a40b680e50e4cf307 +Author: Nandinski +Date: Sun May 19 13:19:29 2024 +0300 + + Support for python3.8 + pass colext commit to dockerfile + +commit b031255306152f1e715de23fe8a709d8330a9540 +Author: Nandinski +Date: Sun May 19 10:34:58 2024 +0300 + + colext_config code path defaults to path to config + +commit 04cb02d24e4d5bb583b9230bc0714f963a73af32 +Merge: 2ec7ee0 5d8c902 +Author: Nandinski +Date: Tue May 14 16:08:38 2024 +0300 + + Merge branch 'sbc' of github.com:sands-lab/colext into sbc + +commit 2ec7ee02835a4f5fd3276d31083f3ef335e71379 +Author: Nandinski +Date: Tue May 14 14:15:46 2024 +0300 + + Empty spaces + add verify feasibility + +commit 6648b22e099670b005b9e55e918f619eb50a176c +Author: Nandinski +Date: Tue May 14 14:01:45 2024 +0300 + + Now supporting 28 SBCs + +commit 5d8c9024815b680e68445374c41fab6668011e59 +Author: Amândio Faustino +Date: Wed May 8 10:39:16 2024 +0300 + + Update README.md + +commit 8e19a5fbe6ae783dafa7e8c5a6442308d8dbca8d +Author: Nandinski +Date: Tue May 7 19:11:31 2024 +0300 + + Only create required containers for req devices (not all) + +commit 61b6f7c088e0e8064b9343091d54fe8613dec04a +Author: Nandinski +Date: Mon May 6 12:33:41 2024 +0300 + + Update client model. Using a larger model that can be easily scaled. 
+ +commit a8e96b32da6bf9d3be4c5c31dd4b0e1c481b27a3 +Author: Nandinski +Date: Mon May 6 12:32:10 2024 +0300 + + Update docker bake file for simpler setup + +commit c181f0686a4c89d36d7f5990589da4b334478b31 +Author: Nandinski +Date: Mon May 6 12:31:00 2024 +0300 + + Update readme + +commit 5c130fb3cb72278aa96d98fc888299b0886de465 +Author: Nandinski +Date: Sun May 5 14:39:32 2024 +0300 + + Collect energy measurements from LattePanda + +commit 0a1a4020190413cc9099a32234cde6f6e05647f7 +Author: Nandinski +Date: Wed Apr 17 22:43:57 2024 +0300 + + Add suport for jetson nano! + SBC deployer + +commit 1a0dde38180d315084b9c9b76af31d8f5006a3a4 +Author: Nandinski +Date: Sun Apr 14 17:34:50 2024 +0300 + + revert back to rss. vms is not as accurate when cuda is used (overallocations) + +commit 2da79efd3d9b6f7e2c5f0b21e34acc960e2a7ddb +Author: Nandinski +Date: Sun Apr 14 14:54:11 2024 +0300 + + Fix depency versions + +commit a0f0e8ca6f62f1f09d461a033fecf8dc99c06b06 +Author: Nandinski +Date: Sun Apr 14 14:34:16 2024 +0300 + + Split generic image into cpu and gpu version + +commit 7eda15e2c12914370f8477b6f6bce72816c7e2a2 +Author: Nandinski +Date: Sun Apr 14 13:11:39 2024 +0300 + + Forward ssh agent to docker build for requirements installation + +commit 01b263a89abc47d9fc603a78586233b00ad83f94 +Author: Nandinski +Date: Sun Apr 14 13:11:10 2024 +0300 + + Recording VMS instead of RMS + +commit 71919a188c317157d5b0c25e3896c774faff0de2 +Author: Nandinski +Date: Fri Apr 12 16:03:44 2024 +0300 + + Changed pytorch to be base image for generic devices + +commit ff701c6d3fa9e53768d4fd9bafa340f3898cc63b +Author: Nandinski +Date: Mon Apr 8 18:45:47 2024 +0300 + + fix CIR_ID mapping + +commit 8042f12860ea5b43703cb156f0de44c10424dd08 +Author: Nandinski +Date: Mon Apr 8 13:28:45 2024 +0300 + + Fix parallel metric push with DB pool + +commit dc2b24752c526f9bd8c29b83b6bb12b8fd9c07a5 +Author: Nandinski +Date: Thu Apr 4 17:50:49 2024 +0300 + + Added client round timings to DB + colext_get_metrics + +commit 898713788630a9abaecc310562cef1257988e8eb +Author: Nandinski +Date: Wed Apr 3 13:21:30 2024 +0300 + + Added network monitoring + +commit 5ce0a149a757fc6efa858f95a27e1ceac6bea66b +Author: Nandinski +Date: Tue Apr 2 15:58:58 2024 +0300 + + colext_get_metrics also returns client_info + +commit 541b52129ff7ab0c7ef4b242417e8e049f7b0d8d +Author: Nandinski +Date: Tue Apr 2 13:12:54 2024 +0300 + + colext_get_metrics also retrieves round timestamps + +commit 11fac3d87a99dc52aadf10931a8bacc32001e13b +Author: Nandinski +Date: Wed Mar 20 15:04:11 2024 +0300 + + Renames + record job finish times + +commit f4a8e472f663c781d709f0fabe00d2a425e919a3 +Author: Amândio Faustino +Date: Wed Mar 20 15:10:22 2024 +0300 + + Update README.md + +commit 24bfa0cb5777e9f3e29451e7dfcc59a797ca1efa +Author: Nandinski +Date: Mon Mar 11 10:43:12 2024 +0300 + + Renamed project to colext + metric retrieval command + +commit b89a8a47ff2de7fb41309e9f191098f8e4ffc5c5 +Author: Nandinski +Date: Thu Feb 8 15:20:37 2024 +0300 + + Initial python package with fltb_launch_job command + +commit 9ff9e66e4fa4f4f4f2a1a92676d25e73576551f5 +Author: Nandinski +Date: Tue Jan 16 12:36:02 2024 +0300 + + Base ansible automation + +commit 0bb36e27fdf741320c8c300b82ad65ccbfe4287a +Author: Amândio Faustino +Date: Sun Oct 29 12:44:08 2023 +0300 + + SBC initial version + +commit 293a3ef13360b8a3ecbf7232dc6ed356b06a17de +Author: Amândio Faustino +Date: Sun Oct 29 12:44:08 2023 +0300 + + Initial commit diff --git a/src/colext/exp_deployers/sbc_deployer/.sbc_deployer.py.swp 
b/src/colext/exp_deployers/sbc_deployer/.sbc_deployer.py.swp new file mode 100644 index 0000000..500c82d Binary files /dev/null and b/src/colext/exp_deployers/sbc_deployer/.sbc_deployer.py.swp differ diff --git a/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_general.Dockerfile b/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_general.Dockerfile index 6c1b873..e28d658 100644 --- a/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_general.Dockerfile +++ b/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_general.Dockerfile @@ -15,6 +15,10 @@ RUN apt update && apt install -y git # && install -m 0600 -d ~/.ssh \ # && ssh-keyscan -p 443 ssh.github.com >> ~/.ssh/known_hosts +#Network Setup +RUN apt install -y iproute2 && mkdir -p /network +RUN python3 -m pip install --no-cache-dir tcconfig + ARG COLEXT_COMMIT_HASH # Install the colext package RUN python3 -m pip install \ @@ -33,4 +37,4 @@ RUN python3 -m pip install --no-cache-dir -r ./user_code/requirements.txt # https://github.com/aws-neuron/aws-neuron-sdk/issues/893 RUN python3 -m pip install --no-cache-dir setuptools==69.5.1 COPY --exclude=colext_config.yaml,requirements.txt . ./user_code -WORKDIR /fl_testbed/user_code \ No newline at end of file +WORKDIR /fl_testbed/user_code diff --git a/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_test.Dockerfile b/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_test.Dockerfile index 9dbb7b2..a178783 100644 --- a/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_test.Dockerfile +++ b/src/colext/exp_deployers/sbc_deployer/Dockerfiles/pip/colext_test.Dockerfile @@ -11,6 +11,10 @@ WORKDIR /fl_testbed RUN apt update && apt install -y git gcc RUN python3 -m pip install --no-cache-dir --upgrade pip==24.0 setuptools==69.5.1 +#Network Setup +RUN apt install -y iproute2 +RUN python3 -m pip install --no-cache-dir tcconfig pika + # DOCKER file assumes the context is set to root of the fltb project COPY $TEST_REL_DIR/requirements.txt test_code/requirements.txt COPY ./requirements.txt . @@ -31,4 +35,5 @@ COPY $TEST_REL_DIR/ test_code/ COPY --exclude=colext_config.yaml,requirements.txt . . RUN python3 -m pip install .${INSTALL_OPTIONS} -WORKDIR /fl_testbed/test_code \ No newline at end of file + +WORKDIR /fl_testbed/test_code diff --git a/src/colext/exp_deployers/sbc_deployer/kubernetes_utils.py b/src/colext/exp_deployers/sbc_deployer/kubernetes_utils.py index 6307372..4d2a9f5 100644 --- a/src/colext/exp_deployers/sbc_deployer/kubernetes_utils.py +++ b/src/colext/exp_deployers/sbc_deployer/kubernetes_utils.py @@ -3,8 +3,10 @@ from typing import Tuple from colext.common.logger import log from enum import Enum +import os FL_NAMESPACE = "default" +FL_NETWORK_NAMESPACE = "default" FL_SERVICE_PREFIX = "fl-server-svc" class KubernetesUtils: @@ -31,6 +33,48 @@ def create_from_yaml(self, yaml_file): def create_from_dict(self, dict_obj): kubernetes.utils.create_from_dict(self.k8s_api, dict_obj) + + #TODO: NOT USED ATM delete or reuse. 
potentially used to simplify the next configmap creation function + def create_config_map(self, name, data_file): + with open(data_file, 'r') as f: + data = f.read() + + config_map = kubernetes.client.V1ConfigMap( + api_version="v1", + kind="ConfigMap", + metadata=kubernetes.client.V1ObjectMeta(name=name), + data={f"tcconfig_rules.txt": data} + ) + self.k8s_core_v1.create_namespaced_config_map(FL_NETWORK_NAMESPACE, config_map) + + def create_config_map_from_dict(self, name, folder_path): + data = {} + + for file_name in os.listdir(folder_path): + file_path = os.path.join(folder_path, file_name) + + if os.path.isfile(file_path): + with open(file_path, "r") as f: + data[file_name] = f.read() + config_map = kubernetes.client.V1ConfigMap( + api_version="v1", + kind="ConfigMap", + metadata=kubernetes.client.V1ObjectMeta(name=name), + data=data + ) + self.k8s_core_v1.create_namespaced_config_map(FL_NETWORK_NAMESPACE, config_map) + + + + def delete_config_map(self,name): + self.k8s_core_v1.delete_namespaced_config_map(name, FL_NETWORK_NAMESPACE) + + def delete_all_config_maps(self): + configmaps = self.k8s_core_v1.list_namespaced_config_map(FL_NETWORK_NAMESPACE).items + + for configmap in configmaps: + log.info(f"Deleteing configmap {configmap.metadata.name}") + self.k8s_core_v1.delete_namespaced_config_map(configmap.metadata.name, FL_NETWORK_NAMESPACE) def delete_experiment_pods(self): pods = self.k8s_core_v1.list_namespaced_pod(FL_NAMESPACE).items diff --git a/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-configmap.yaml b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-configmap.yaml new file mode 100644 index 0000000..f25d1ea --- /dev/null +++ b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-configmap.yaml @@ -0,0 +1,16 @@ +# ConfigMap for RabbitMQ Broker +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-config + namespace: rabbitmq-system + + labels: + app: rabbitmq +data: + rabbitmq.conf: | + listeners.tcp.default = 6942 + management.tcp.port = 15672 + loopback_users = none + default_user = guest + default_pass = guest diff --git a/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-deploy.yaml b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-deploy.yaml new file mode 100644 index 0000000..542a550 --- /dev/null +++ b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-deploy.yaml @@ -0,0 +1,47 @@ +# RabbitMQ Deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rabbitmq + namespace: rabbitmq-system + + labels: + app: rabbitmq +spec: + replicas: 1 + selector: + matchLabels: + app: rabbitmq + template: + metadata: + labels: + app: rabbitmq + spec: + containers: + - name: rabbitmq + image: rabbitmq:3.9-management + ports: + - containerPort: 6942 + name: amqp + - containerPort: 15672 + name: management + volumeMounts: + - name: config-volume + mountPath: /etc/rabbitmq/rabbitmq.conf + subPath: rabbitmq.conf + readinessProbe: + tcpSocket: + port: 6942 + initialDelaySeconds: 10 + periodSeconds: 10 + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "200m" + volumes: + - name: config-volume + configMap: + name: rabbitmq-config diff --git a/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-mngmt-service.yaml b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-mngmt-service.yaml new file mode 100644 
index 0000000..12297d7 --- /dev/null +++ b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-mngmt-service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq-management + namespace: rabbitmq-system + labels: + app: rabbitmq +spec: + type: NodePort + selector: + app: rabbitmq + ports: + - port: 15672 + targetPort: 15672 + nodePort: 31672 + name: management \ No newline at end of file diff --git a/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-service.yaml b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-service.yaml new file mode 100644 index 0000000..3343a96 --- /dev/null +++ b/src/colext/exp_deployers/sbc_deployer/microk8s/rabbitmq-broker/rabbitmq-broker-service.yaml @@ -0,0 +1,19 @@ +# Service to expose RabbitMQ AMQP port +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq-broker + namespace: rabbitmq-system + + labels: + app: rabbitmq +spec: + selector: + app: rabbitmq + ports: + - port: 6942 + targetPort: 6942 + name: amqp + - port: 15672 + targetPort: 15672 + name: management \ No newline at end of file diff --git a/src/colext/exp_deployers/sbc_deployer/microk8s/templates/client_pod.yaml.jinja b/src/colext/exp_deployers/sbc_deployer/microk8s/templates/client_pod.yaml.jinja index a3a33b8..794f885 100644 --- a/src/colext/exp_deployers/sbc_deployer/microk8s/templates/client_pod.yaml.jinja +++ b/src/colext/exp_deployers/sbc_deployer/microk8s/templates/client_pod.yaml.jinja @@ -76,6 +76,20 @@ spec: - mountPath: /run/jtop.sock name: jtop-socket {% endif %} + {% if network_volumeMount %} + - name: "{{network_volumeMount.name}}" + mountPath: "{{network_volumeMount.mountPath}}" + subPath: "{{network_volumeMount.subPath}}" + - name: lib-modules + mountPath: /lib/modules + - name: dev + mountPath: /dev + securityContext: + capabilities: + add: + - NET_ADMIN + {% endif %} + volumes: - name: pgcreds secret: @@ -95,10 +109,23 @@ spec: path: /run/jtop.sock type: Socket {% endif %} + {% if network_volume %} + - name: "{{network_volume.name}}" + configMap: + name: "{{network_volume.configMap.name}}" + - name: lib-modules + hostPath: + path: /lib/modules + type: Directory + - name: dev + hostPath: + path: /dev + type: Directory + {% endif %} {# restartPolicy: Never #} restartPolicy: OnFailure backoffLimit: 1 imagePullSecrets: - name: regcred nodeSelector: - kubernetes.io/hostname: {{ device_hostname }} \ No newline at end of file + kubernetes.io/hostname: {{ device_hostname }} diff --git a/src/colext/exp_deployers/sbc_deployer/sbc_deployer.py b/src/colext/exp_deployers/sbc_deployer/sbc_deployer.py index 8b6f5bc..e8a6678 100644 --- a/src/colext/exp_deployers/sbc_deployer/sbc_deployer.py +++ b/src/colext/exp_deployers/sbc_deployer/sbc_deployer.py @@ -32,6 +32,9 @@ def __init__(self, config, test_env=False) -> None: self.server_template = jinja_env.get_template("server.yaml.jinja") self.server_service_path = os.path.join(dirname, 'microk8s/server_service.yaml') + #network pubsub broker paths + + # Get Docker bake file dir parent_dir = Path(__file__).parent.resolve() self.hcl_file_dir = os.path.join(parent_dir, "Dockerfiles", "pip") @@ -87,7 +90,9 @@ def containerize_app(self, config_dict, test_env): push=True) def clear_prev_experiment(self) -> None: + log.info("Clearing previous experiment") + self.k_utils.delete_all_config_maps() self.k_utils.delete_fl_service() self.k_utils.delete_experiment_pods() @@ -125,20 +130,151 @@ def prepare_client(client_prototype, client_id): 
pod_config["monitoring_push_interval"] = self.config["monitoring"]["push_interval"] pod_config["monitoring_scrape_interval"] = self.config["monitoring"]["scraping_interval"] pod_config["monitoring_measure_self"] = self.config["monitoring"]["measure_self"] + + # add volume for the configmap + pod_config["network_volumeMount"] = {"name": f"group-{client_prototype['group_id']}-network-config", + "mountPath": "/fl_testbed/test_code/network"} + pod_config["network_volume"] = {"name": f"group-{client_prototype['group_id']}-network-config", + "configMap": {"name": f"group-{client_prototype['group_id']}-network-config"}} # Add IP of smartplug in case it exists pod_config["SP_IP_ADDRESS"] = SMART_PLUGS_HOST_SP_IP_MAP.get(dev_hostname, None) return pod_config + + + # this function is called for every client group to generate the network configmap for it + # clientgroup is the name of the client group aka client_prototype + def generate_network_configmap_folder(clientgroup,group_id): + if os.path.exists(f"networktemp/group-{group_id}"): + for file in os.listdir(f"networktemp/group-{group_id}"): + os.remove(os.path.join(f"networktemp/group-{group_id}", file)) + else: + os.makedirs(f"networktemp/group-{group_id}") + + group_path = f"networktemp/group-{group_id}" + + log.info(f"Generating network configmap for {group_id}") + log.debug(f"group dict: {clientgroup}") + + network_tags = {} + if 'network' in clientgroup.keys(): + network_tags = clientgroup['network'] + + #save the network rules for each client group in a folder + + with open(f"{group_path}/networkrules.txt", 'w') as f: + f.write("") + if isinstance(network_tags,str): # if there is only one network make it a list + network_tags = [network_tags] + #check each network and make sure its a string to verify + for i in range(len(network_tags)): + if not isinstance(network_tags[i],str): + log.error(f"network tags should be a string or a list of strings") + sys.exit(1) + #TODO the function can be divided into 2 functions + #variable to store all the network static rules for each client group + static_network_rules = {"upstream": [], "downstream": []} + for network in network_tags: + log.debug(f"network tag: {network_tags}") + log.info(f"group {group_id} is in {network}") + #save dynamic network configs first + + log.debug(f"network config: {self.config['networks'][network]}") + log.debug(f" network keys: {self.config['networks'][network].keys()}") + + if "dynamic" in self.config["networks"][network].keys(): + dynamic = self.config["networks"][network]['dynamic'] + for iter in dynamic.keys(): + + log.debug(f"saving scripts of : {network} {dynamic[iter]}") + # check if script is provided then save script else save the dict as json to be used + if dynamic[iter]["script"] != False : + log.debug(f"saving script: {dynamic[iter]['script']} into {group_path}/{iter}_{network}_script.py") + script = "" + with open(dynamic[iter]["script"], 'r') as s: + script = s.read() + with open(f"{group_path}/{iter}_{network}_script.py", "w") as f: + f.write(script) + log.debug(f"script saved") + else: + json_str = json.dumps(dynamic[iter], indent=4) + log.debug(f"saving json: {json_str} into {group_path}/{iter}_{network}_dynrules.json") + with open(f"{group_path}/{iter}_{network}_dynrules.json", "w") as f: + f.write(json_str) + log.debug(f"json saved") + # if dynamic doesnt exist then save static of that network + else: + #save static rules + static_network_rules["upstream"] = self.config["networks"][network]["commands"]["upstream"] + static_network_rules["downstream"] = 
self.config["networks"][network]["commands"]["downstream"] + + # merge the static rules into one rule for each client group + upstream, downstream = merge_static_network_rules(static_network_rules) + # save the merged rules to the configmap + with open(f"{group_path}/networkrules.txt", 'a') as f: + f.write(f"tcset eth0 --direction outgoing {upstream} \n") + f.write(f"tcset eth0 --direction incoming {downstream} --change \n") + + def merge_static_network_rules(network): + # given a network tag output 2 static rules for upstream and downstream + #TODO this is not comptible with input ips and ports yet + + upstream_list = network["upstream"] + downstream_list = network["downstream"] + log.debug(upstream_list) + log.debug(downstream_list) + + + merged_upstream_dict = {} + merged_downstream_dict = {} + + for rules in upstream_list: + rule, value = rules.split() + merged_upstream_dict[rule] = value + for rules in downstream_list: + rule, value = rules.split() + merged_downstream_dict[rule] = value + + # merge the rules into one rule + upstream = " ".join([f"--{key} {value}" for key, value in merged_upstream_dict.items()]) + downstream = " ".join([f"--{key} {value}" for key, value in merged_downstream_dict.items()]) + + return upstream, downstream + + + + + pod_configs = [] + pod_configs_volumes = [] client_id = 0 + group_id = 0 # needed to map the netowkr configmap to the client groups + + # check if networktemp file exists + #delete previous networktemp files if it does exist + if os.path.exists("networktemp"): + for file in os.listdir("networktemp"): + for file2 in os.listdir(os.path.join("networktemp",file)): + os.remove(os.path.join("networktemp", file,file2)) + else: + os.makedirs("networktemp") + for client_prototype in self.config["clients"]: + + client_prototype["group_id"] = group_id + generate_network_configmap_folder(client_prototype,group_id) + + group_id += 1 for _ in range(client_prototype["count"]): pod_configs.append(prepare_client(client_prototype, client_id)) client_id += 1 + + log.debug(f"Generated {len(pod_configs)} pod configs") + log.debug(f"config: {pod_configs}") return pod_configs IMAGE_BY_DEV_TYPE = { @@ -170,7 +306,7 @@ def deploy_setup(self, job_id: int) -> None: Launch experiment in kubernetes cluster Prepares and deployes client pods and fl service """ - + self.clear_prev_experiment() log.info(f"Deploying FL server and service") @@ -178,14 +314,26 @@ def deploy_setup(self, job_id: int) -> None: server_pod_dict = yaml.safe_load(self.server_template.render(server_pod_config)) self.k_utils.create_from_dict(server_pod_dict) self.k_utils.create_from_yaml(self.server_service_path) - + log.debug(f"Server pod config: {server_pod_config}") + log.debug(f"Server pod dict: {server_pod_dict}") log.debug(f"Deploying Client pods") + + + client_pod_configs = self.prepare_clients_for_launch(job_id) + + + + + # create config map for each client group + for client_prototype in self.config["clients"]: + self.k_utils.create_config_map_from_dict(f"group-{client_prototype['group_id']}-network-config", f"networktemp/group-{client_prototype['group_id']}") + for pod_config in client_pod_configs: # log.debug(f"Deploying Client pod = {pod_config}") client_pod_dict = yaml.safe_load(self.client_template.render(pod_config)) self.k_utils.create_from_dict(client_pod_dict) - + log.info(f"Experiment deployed. 
+
+## Guide on config syntax
+
+We first define the network configurations and then apply those configurations to the target clients.
+
+Defining a static network:
+
+```yaml
+network:
+  - tag: network_name # just a reference name
+    upstream: # rules for packets leaving the client
+      rule_name: value
+    downstream: # rules for packets incoming to the client
+      rule_name: value
+```
+The tag is the name of the network configuration defined below it; this tag is used to assign the configuration to clients later.
+When defining a static network, two directions can be defined: upstream and downstream. Specific network rules and their values can then be specified for each network direction.
+
+Note: you can define either or both network directions.
+
+This is the list of network rules that can be defined:
+- Speed:
+  - rate: bandwidth rate in bits per second - (G/M/K)bps
+- Latency:
+  - delay: round-trip network delay, valid range 0ms-60min (decimals are allowed)
+  - delay-distribution: valid inputs - normal, pareto, paretonormal
+- Packets:
+  - loss: round-trip packet loss rate (%)
+  - duplicate: round-trip packet duplication rate (%)
+  - corrupt: packet corruption rate (%)
+  - reordering: packet reordering rate (%)
+  - limit: limits the number of packets the qdisc may hold when doing delay
+
+Note: network rules follow tcconfig formatting (https://tcconfig.readthedocs.io/en/latest/pages/usage/tcset/index.html).
+
+Defining a dynamic network:
+
+A dynamic network with a defined ruleset:
+```yaml
+network:
+  - tag: network_name
+    dynamic:
+      - iterator: iterator # can be either time or epoch
+        structure: [bandwidth, latency] # a list of network rules defining the command structure below
+        commands:
+          # - [iter value, set/del, direction, bandwidth value, latency value]
+          - [5, set, outgoing, 100Mbps, 100ms]
+          - [10, set, outgoing, -1, 1ms]
+          - [20, set, outgoing, 1000Mbps, 0.1s]
+          - [100, del, outgoing]
+```
+When defining dynamic networks, include the dynamic key instead of upstream/downstream.
+
+In a dynamic network, three things are defined:
+- iterator: the type of iterator to use (time or epoch)
+- structure: a list of network rules defining the structure that the commands below will follow; by default the structure is [bandwidth, latency] if it is not defined.
+- commands/script: a set of rules applied at set iterator times, following the structure defined above.
+
+The commands follow this structure: [iterator time, command type, direction, rule values...]
+- iterator time: the time the command is executed, depending on the iterator value
+- command type: either set or del - setting and/or updating rules, or deleting rules in a given direction, respectively
+- direction: either incoming or outgoing
+- rule values: the values of the rules in the structure defined above.
+
+Applying networks to nodes:
+
+```yaml
+clients:
+  - dev_type: JetsonOrinNano
+    add_args: "--max_step_count=200"
+    network: slow
+
+  - dev_type: OrangePi5B
+    add_args: "--max_step_count=100"
+    network:
+      - default
+      - DynFast
+```
+
+When defining the clients in the clients section of the CoLExT config file, you can assign the defined networks to each client by adding a network parameter with either a single network tag or a list of network tags, as shown above.
+
+Note: make sure the network tags are written exactly as defined; they are case-sensitive.
+
+# How the system works
+
+The system has three main parts. It first reads and validates the CoLExT config file in experiment_dispatcher.py. Then it translates and deploys those network rules as files to the nodes via the sbc_deployer folder. Lastly, the network rules are executed at the specified time by the network manager.
+
+## experiment_dispatcher.py
+
+- static
+  - read static
+  - validate static
+- dynamic
+  - validate structure
+  - validate commands
+
+Before processing a network, the read_network() function checks whether it is static or dynamic before calling the suitable function.
+
+**static:**
+Static network processing is divided into two separate functions. read_static() accepts a dict of the static network. For each direction, it checks whether it is a string (simplified input) or a dictionary (complete input) and converts each direction into a single network rule to be applied.
+
+The resulting network rule commands are then passed to the validate_static_commands() function, which validates each command using the constant mappings COMMAND_MAPPING and VALIDATION_MAPPING for the rule and its value, respectively. It then outputs either a correctly formatted command or an error for invalid inputs.
+
+**dynamic:**
+Dynamic network processing is packed into a single function with two helpers. The main function, read_validate_dynamic(), accepts a dict of the dynamic network and loops through all the key-value pairs in the dynamic section.
+It checks three main sections:
+
+- iterator: checks that it is in the VALID_ITERS constant
+- structure: validates the structure by validating each rule name in it using the helper function check_rules()
+- commands/script: if it is a script, the script path is passed on; otherwise the commands are parsed given the structure, similar to the static rule parsing, but saved as a dict.
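+
+As a rough illustration of the static validation step (hypothetical mappings; the
+real COMMAND_MAPPING and VALIDATION_MAPPING constants live in
+experiment_dispatcher.py and may differ):
+
+```python
+import re
+import sys
+
+# Hypothetical stand-ins for the real constants
+COMMAND_MAPPING = {"bandwidth": "rate", "latency": "delay"}  # config name -> tcset rule
+VALIDATION_MAPPING = {
+    "rate": re.compile(r"^\d+(\.\d+)?[GMK]?bps$"),
+    "delay": re.compile(r"^\d+(\.\d+)?(ms|s|min)$"),
+}
+
+def validate_static_rule(rule: str, value: str) -> str:
+    """Map a config rule name to its tcset name and validate its value."""
+    tc_rule = COMMAND_MAPPING.get(rule, rule)
+    pattern = VALIDATION_MAPPING.get(tc_rule)
+    if pattern is None or not pattern.match(str(value)):
+        sys.exit(f"Invalid network rule: {rule}={value}")
+    return f"{tc_rule} {value}"
+
+print(validate_static_rule("bandwidth", "1Mbps"))  # -> "rate 1Mbps"
+```
+
+The "rule value" strings produced this way match what merge_static_network_rules()
+in sbc_deployer.py later expects to split and merge.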
+
+## sbc_deployer folder
+
+- rabbitmq-broker
+- kubernetes_utils.py
+- sbc_deployer.py
+
+**rabbitmq broker**
+For publish/subscribe to work, we need a broker to receive and send messages between the publishers and subscribers. The rabbitmq-broker folder has all the files needed to start a broker with the specified ports and service name.
+
+**kubernetes functions**
+Two functions in kubernetes_utils.py are used to create and delete configmaps, respectively.
+
+**sbc_deployment**
+During deployment, the prepare_clients_for_launch function includes two network functions that finalize and generate the files to be sent via configmaps. generate_network_configmap_folder is called with the dict of a client group and its group id (the group id is created right before this function is called). This function generates files for each network assigned to the client group, both static and dynamic. Static networks are merged before being converted to a file by the merge_static_network_rules() function.
+
+Note: the group id is created and saved as a key entry for each client group. It is used to define the name of the files and their location locally before they are converted to a configmap.
+
+After the files for each client group are created, they are converted to a configmap using the create_config_map_from_dict() function from kubernetes_utils, tied to the client group id they belong to.
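+
+To make the merge step concrete, here is a worked example of
+merge_static_network_rules() from sbc_deployer.py (illustrative values):
+
+```python
+# The merge keys a dict by rule name, so a later entry for the same
+# rule overwrites an earlier one.
+static_network_rules = {
+    "upstream":   ["rate 1Mbps", "delay 1ms", "rate 50Mbps"],
+    "downstream": ["rate 1Mbps", "delay 1ms"],
+}
+upstream, downstream = merge_static_network_rules(static_network_rules)
+# upstream   == "--rate 50Mbps --delay 1ms"
+# downstream == "--rate 1Mbps --delay 1ms"
+# These strings end up in networkrules.txt as:
+#   tcset eth0 --direction outgoing --rate 50Mbps --delay 1ms
+#   tcset eth0 --direction incoming --rate 1Mbps --delay 1ms --change
+```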
+
+## sbc_deployer folder
+
+- rabbitmq-broker
+- kubernetes_utils.py
+- sbc_deployer.py
+
+**rabbitmq broker**
+For publish/subscribe to work, we need a broker to receive and forward messages between the publishers and subscribers. The rabbitmq-broker folder has all the files needed to start a broker with the expected ports and service name.
+
+**kubernetes functions**
+2 main functions in kubernetes_utils.py are used to create and delete configmaps respectively.
+
+**sbc_deployment**
+During deployment, prepare_clients_for_launch() includes 2 network functions that finalize the rules and generate the files to be sent via configmaps. generate_network_configmap_folder() is called with the dict of a client group and its group id (the group id is created right before this function is called, at the end). This function generates files for every network assigned to the client group, both static and dynamic. Static networks are merged into a single file beforehand using the merge_static_network_rules() function.
+
+Note: the group id is created and saved as a key entry for each client group. It defines the names of the files and their local location before they are converted into a configmap.
+
+After the files for each client group are created, they are converted into a configmap using the create_config_map_from_dict() function from kubernetes_utils, bound to the client group id they belong to.
+
+## network_manager.py
+
+### There are 3 classes:
+
+- NetworkGenerator: a class holding all the functions related to generators
+- NetworkManager: the main class managing all generators and subscribers
+- NetworkPubSub: a wrapper class around the publish/subscribe functions of pika (the Python package for RabbitMQ)
+
+### publisher/server side
+
+The server decorator creates a NetworkPubSub object for each iterator type and publishes accordingly.
+Both the time and epoch publishers publish 0 in the init function of the decorator.
+
+Note: the publishers per iterator type are hardcoded, since each type needs its own publishing mechanism; with only two types this is sufficient.
+
+The time publisher publishes only once more, time=1, at the start of the first round.
+
+The epoch publisher publishes at the start of each round in the record_start_round() function.
+
+### subscriber/client side
+
+The client decorator creates a NetworkManager object and calls 2 functions: ParseStaticRules and ParseDynamicRules.
+
+Note: all files sent via configmaps end up in the network folder on the node.
+
+**Static Rules:**
+
+ParseStaticRules accepts a file (expected to be a .txt file), parses it, and executes every line in a subprocess.
+
+**Dynamic Rules:**
+
+When a NetworkManager is created, it automatically fetches all the files from the network folder, converts every file ending with .json or .py (dynamic rules and dynamic scripts respectively) into generators, and stores them in a dictionary with iterator types as keys and generators as values.
+
+Note: the set of iterators is not hardcoded and depends entirely on the file names, since the start of a file name is its iterator type, for example: time_DynFast_...
+
+ParseDynamicRules loops over the dictionary keys (that is, all available iterators), creates a subscriber for each, and passes it a custom callback generated with the create_callback_for_type function.
+
+create_callback_for_type returns an anonymous callback that wraps the CreateCallback function (a callback with the generators and the iterator type passed as parameters).
+
+The callback is called every time the subscriber receives a value; it loops through the generators efficiently, using a state dict to buffer results generated ahead of their iteration (see the sketch after this section).
+
+time_loop is called when we receive time=1; it then steps through the time-iterator generators once per second in the same manner.
+
+Note: since time is only published twice (time=0 and time=1), time_loop is triggered at time=1 and therefore runs only once.
+
+Dynamic rules are applied in the same way as static rules, by running the resulting commands in subprocesses.
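+
+The following is a minimal Python sketch of that prefetch pattern (names are illustrative; the real callbacks key the state dict by file name and also drive the time loop):
+
+```python
+import subprocess
+
+# Each generator yields (iter_value, [tc commands]); a batch is buffered in
+# `state` until its iteration value is published.
+state = {}
+
+def on_publish(current_iter, generators):
+    for name, gen in list(generators.items()):
+        buffered = state.setdefault(name, {})
+        for cmd in buffered.pop(str(current_iter), []):  # due now: execute
+            subprocess.run(cmd.split(), capture_output=True, text=True)
+        if not buffered:                                 # prefetch the next batch
+            try:
+                iter_value, commands = next(gen)
+                buffered[str(iter_value)] = commands
+            except StopIteration:
+                del generators[name]                     # this ruleset is finished
+
+# e.g. with a generator yielding ("1", ["true"]), on_publish(0, gens) buffers
+# the batch and on_publish(1, gens) executes it.
+```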
+
+# Missing features and odd bugs
+
+### bugs to be fixed
+
+A dynamic network does not seem to be able to hold more than about 20 statically typed commands; having more than 20 prevents the commands from executing on the node, for reasons still unknown.
+
+Having only a dynamic network without also including a static network does not seem to work and prevents the dynamic rules from executing.
+
+### missing features:
+
+Scripts for dynamic networks: not fully implemented yet; the surrounding system is ready and is only missing the conversion of scripts to generators.
+
+System is done by: Abdullah Alamoudi, with the guidance of Eng. Amandio and the supervision of Prof. Marco Canini.
diff --git a/src/colext/metric_collection/decorators/flwr_client_decorator.py b/src/colext/metric_collection/decorators/flwr_client_decorator.py
index e7dd80a..0f1b0ee 100644
--- a/src/colext/metric_collection/decorators/flwr_client_decorator.py
+++ b/src/colext/metric_collection/decorators/flwr_client_decorator.py
@@ -7,6 +7,9 @@
 from colext.common.utils import get_colext_env_var_or_exit
 from colext.metric_collection.metric_manager import MetricManager
 from colext.metric_collection.typing import StageMetrics
+import subprocess
+from colext.metric_collection.network_manager import NetworkManager, NetworkPubSub
+
 # Class inheritence inside a decorator was inspired by:
 # https://stackoverflow.com/a/18938008
@@ -34,6 +37,18 @@ def __init__(self, *args, **kwargs):
         self.mm_proc.start()
         # Wait for metric manager to finish startup
         mm_proc_ready_event.wait()
+
+        # Network setup
+        net_mngr = NetworkManager()
+        # apply the static tc rules shipped to the node via configmap
+        net_mngr.ParseStaticRules("network/networkrules.txt")
+        # subscribe to the time/epoch publications that drive the dynamic rules
+        net_mngr.ParseDynamicRules(str(self.client_id))
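+
+        # At this point the static rules are applied and the dynamic-rule
+        # subscribers run on daemon threads, waiting for the server decorator
+        # to publish time/epoch values.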
+
+
 # We might be able to cleanup better if the server tells us this is the last round
 atexit.register(self.clean_up)
diff --git a/src/colext/metric_collection/decorators/flwr_server_decorator.py b/src/colext/metric_collection/decorators/flwr_server_decorator.py
index 7f77f66..82f9c24 100644
--- a/src/colext/metric_collection/decorators/flwr_server_decorator.py
+++ b/src/colext/metric_collection/decorators/flwr_server_decorator.py
@@ -10,6 +10,10 @@
 from colext.common.logger import log
 from colext.common.utils import get_colext_env_var_or_exit
+from colext.metric_collection.network_manager import NetworkPubSub
+import time
+import threading
+
 # Class inheritence inside a decorator was inspired by:
 # https://stackoverflow.com/a/18938008
 def MonitorFlwrStrategy(FlwrStrategy):
@@ -28,6 +32,15 @@ def __init__(self, *args, **kwargs):
         self.DB_CONNECTION = self.create_db_connection()
         self.clients_cid_to_db_id = {}
+        # publishing: one publisher per iterator type
+        self.pub_epoch = NetworkPubSub("epoch")
+        self.pub_time = NetworkPubSub("time")
+
+        # time=0 marks the initialization point for the clients
+        self.pub_time.publish(0)
+        log.debug("Publishing time 0")
+
+
 # Temp variable to hold eval round id between fit/evaluate and configure_[fit/evaluate]
 self.current_round_id = None
@@ -52,6 +65,13 @@ def record_start_round(self, server_round: int, stage: str):
         self.DB_CONNECTION.commit()
         cursor.close()
+        if server_round == 0:
+            # publish time=1 at the start of the first round; it is the first
+            # time iteration used on the client side
+            self.pub_time.publish(1)
+            log.debug("Publishing time 1")
+        # network recording: publish the round number for the epoch subscribers
+        self.pub_epoch.publish(server_round)
+        log.debug(f"Publishing epoch {server_round}")
         return round_id
     def record_end_round(self, server_round: int, round_type: str, dist_accuracy: float = None, srv_accuracy: float = None):
@@ -101,7 +121,8 @@ def configure_clients_in_round(self, client_instructions: List[Tuple[ClientProxy
         fit_ins.config["COLEXT_ROUND_ID"] = self.current_round_id
         return client_instructions
-
+
+
     # ====== Flower functions ======
     def configure_fit(self, server_round: int, parameters: Parameters, client_manager: ClientManager) -> List[Tuple[ClientProxy, FitIns]]:
diff --git a/src/colext/metric_collection/network_manager.py b/src/colext/metric_collection/network_manager.py
new file mode 100644
index 0000000..8273ff7
--- /dev/null
+++ b/src/colext/metric_collection/network_manager.py
@@ -0,0 +1,376 @@
+import json
+import os
+import subprocess
+import threading
+import time
+
+import pika
+
+from colext.common.logger import log
+
+
+class NetworkGenerator:
+
+    def __init__(self, file):
+        '''
+        Given the path to a rules file (.json) or a script (.py),
+        create the matching generator.
+        '''
+        self.generator = None
+        self.file = file
+        self.rules = None
+        self.struct = None
+        self.script = True
+        filename = os.path.basename(file)
+        # determine the file type from the file name
+        filesplit = filename.split("_")
+        if filesplit[-1].endswith(".json"):
+            self.filetype = "json"
+        elif filesplit[-1].endswith(".py"):
+            self.filetype = "script"
+
+        # the iter type is the first part of the file name, regardless of file type
+        self.type = filesplit[0]
+
+        log.debug(f"Network generator created with file {self.file} and type {self.type}")
+        if self.filetype == "json":
+            self.json_parser()
+            self.MakeGenerator()
+            log.debug(f"Network generator with json : {self.generator}")
+        elif self.filetype == "script":
+            self.MakeGenerator()
+            log.debug(f"Network generator with script : {self.generator}")
+
+    def json_parser(self):
+        '''
+        Parse the input json file into a dictionary of command line strings,
+        keyed by iterator value.
+        '''
+        with open(self.file, 'r') as f:
+            jsondict = json.load(f)
+        self.rules = jsondict['commands_dict']
+        self.struct = jsondict['structure']
+        self.script = False
+        # convert every rule row to a command line string
+        rules = {}
+        for iter_value in self.rules:
+            rules[iter_value] = [self.converter(rule, self.struct)
+                                 for rule in self.rules[iter_value]]
+        self.rules = rules
+
+    def converter(self, rule, struct):
+        '''
+        Convert a single rule row [set/del, direction, values...] into a
+        tcset/tcdel command line string.
+        '''
+        if rule[0] == "del":
+            return "tcdel eth0 --direction " + rule[1]
+        if rule[0] == "set":
+            output = "tcset eth0 --direction " + rule[1]
+            for i in range(2, len(rule)):
+                if rule[i] == -1:
+                    # -1 means "leave this rule unchanged"
+                    continue
+                output += " --" + struct[i - 2] + " " + str(rule[i])
+            return output
+
+    def MakeGenerator(self):
+        '''
+        Create the generator object: wrap the parsed json rules, or, for
+        scripts, wrap the script itself.
+        '''
+        if self.script:
+            self.generator = self.ScriptGenerator()
+        else:
+            self.generator = self.JsonGenerator()
+
+    def JsonGenerator(self):
+        '''
+        Yield (iterator value, [command strings]) pairs from the parsed rules.
+        '''
+        rules = self.rules
+        for key in rules:
+            yield key, rules[key]
+
+    def ScriptGenerator(self):
+        '''
+        Wrap a user script as a generator. Not implemented yet; see
+        "missing features" in README-network.md.
+        '''
+        pass
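+
+# Illustrative example (file name and contents are hypothetical): a deployer-
+# generated file "network/epoch_DynFast_1.json" containing
+#   {"structure": ["rate", "delay"], "script": false,
+#    "commands_dict": {"5": [["set", "outgoing", "100Mbps", "100ms"]]}}
+# yields, through NetworkGenerator(...).generator, the pair
+#   ("5", ["tcset eth0 --direction outgoing --rate 100Mbps --delay 100ms"])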
+
+
+class NetworkManager:
+
+    def __init__(self, folder_path="network/"):
+        '''
+        Scan folder_path for rule files, create a generator per file and
+        group the generators by iterator type.
+        '''
+        self.generators = {}
+        self.generatorstype = {"epoch": {},
+                               "time": {}}
+
+        self.Subscribers = {}
+
+        # get all the json and script files in the folder
+        files = os.listdir(folder_path)
+        files = [f for f in files if f.endswith(".json") or f.endswith(".py")]
+        # create a generator for each file and group it by its iterator type
+        # (iterator type validation already happened in the dispatcher)
+        for file in files:
+            self.generators[file] = NetworkGenerator(folder_path + file)
+            gen_type = self.generators[file].type
+            self.generatorstype.setdefault(gen_type, {})[file] = self.generators[file]
+
+    def ParseStaticRules(self, file):
+        if os.path.exists(file):
+            log.info(f"Applying network configuration from {file}")
+            try:
+                with open(file, "r") as f:
+                    tc_commands = [line.strip() for line in f if line.strip() and not line.startswith('#')]
+
+                for cmd in tc_commands:
+                    log.debug(f"Running network command: {cmd}")
+                    result = subprocess.run(cmd.split(), capture_output=True, text=True)
+                    if result.returncode != 0:
+                        log.error(f"Network command failed: {result.stderr}")
+                    else:
+                        log.debug(f"Network command output: {result.stdout}")
+            except Exception as e:
+                log.error(f"Failed to apply network configuration: {e}")
+        else:
+            log.info(f"No network configuration found at {file}")
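+
+    # ParseStaticRules expects a plain-text file with one complete tc command
+    # per line; '#' lines are skipped. Illustrative contents (the real file is
+    # generated by the deployer):
+    #   tcset eth0 --direction outgoing --rate 1Mbps --delay 1ms
+    #   tcset eth0 --direction incoming --rate 1Mbps --delay 1ms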
+
+    def ParseDynamicRules(self, client_id="None"):
+        '''
+        Create a subscriber for each iterator type and hand it a callback
+        that steps the matching generators.
+        '''
+        log.info("Parsing dynamic rules")
+        for iter_type in self.generatorstype:
+            self.Subscribers[iter_type] = NetworkPubSub(iter_type)
+            self.Subscribers[iter_type].subscribe(
+                create_callback_for_type(self.generatorstype[iter_type], iter_type),
+                queue_prefix=client_id)
+            log.info(f"Subscriber for {iter_type} type created")
+
+
+# function specifically to avoid the late-binding issue of lambdas in loops
+def create_callback_for_type(generators, type_name):
+    return lambda ch, method, properties, body: CreateCallback(
+        ch, method, properties, body, generators, type_name)
+
+
+state = {}
+
+def CreateCallback(ch, method, properties, body, generators, type_iter=None):
+    """
+    Callback that iterates over all the generators of a specific iterator
+    type. The generators are passed as a dictionary keyed by file name.
+    """
+    current_iter = int(body.decode('utf-8'))
+    global state
+    if state is None:
+        state = {}
+
+    if type_iter == "time":
+        if current_iter > 0 and current_iter < 2:
+            # start the time loop
+            time_loop(generators, state)
+        # time messages with other values are silently ignored
+        # TODO: support scripts for time iters, i.e. time = 0, 1 and so on
+        return
+
+    keys_to_remove = []
+    for key, gen in generators.items():
+        if key not in state:
+            state[key] = {}
+        # execute the command batches buffered for the current iteration
+        if str(current_iter) in state[key]:
+            for batch in state[key][str(current_iter)]:
+                for cmd in batch:
+                    log.info(f"Executing command for iter {current_iter}: {cmd}")
+                    result = subprocess.run(cmd.split(), capture_output=True, text=True)
+                    if result.returncode != 0:
+                        log.error(f"Network command failed: {result.stderr}")
+            del state[key][str(current_iter)]
+
+        try:
+            if state[key] == {}:
+                keygen, command = next(gen.generator)
+                # buffer the batch until its iteration is published
+                if keygen not in state[key]:
+                    state[key][keygen] = []
+                state[key][keygen].append(command)
+        except StopIteration:
+            # no more commands in this generator
+            keys_to_remove.append(key)
+
+    # drop the exhausted generators
+    for key in keys_to_remove:
+        del generators[key]
+
+
+def time_loop(generators, state):
+    """
+    Loop started once time=1 is received; it advances the time iterator once
+    per second until all time-based generators and buffered commands are done.
+    """
+    current_iter = 0
+    start_time = time.time()
+    next_time = start_time + 1
+    rules_not_done = True
+    while rules_not_done:
+        # check whether all the generators are done
+        rules_not_done = False
+        # TODO: there is a logical error here: the generators are advanced
+        # regardless of time and state, so once a generator is exhausted it is
+        # not called again and pending rules may not be executed
+        keys_to_remove = []
+        for key, gen in generators.items():
+            if key not in state:
+                state[key] = {}
+            # execute the command batches buffered for the current second
+            if str(current_iter) in state[key]:
+                for batch in state[key][str(current_iter)]:
+                    for cmd in batch:
+                        log.info(f"Executing command for time {current_iter}: {cmd}")
+                        result = subprocess.run(cmd.split(), capture_output=True, text=True)
+                        if result.returncode != 0:
+                            log.error(f"Network command failed: {result.stderr}")
+                rules_not_done = True
+                del state[key][str(current_iter)]
+
+            try:
+                if state[key] == {}:
+                    keygen, command = next(gen.generator)
+                    # buffer the batch until its second comes up
+                    if keygen not in state[key]:
+                        state[key][keygen] = []
+                    state[key][keygen].append(command)
+                    rules_not_done = True
+            except StopIteration:
+                # no more commands in this generator
+                keys_to_remove.append(key)
+
+        # drop the exhausted generators
+        for key in keys_to_remove:
+            del generators[key]
+
+        if state != {}:
+            rules_not_done = True
+
+        # sleep until the next whole second
+        time_to_sleep = next_time - time.time()
+        if time_to_sleep > 0:
+            time.sleep(time_to_sleep)
+        current_iter += 1
+        next_time = start_time + current_iter + 1
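+
+# Illustrative usage (assumes the RabbitMQ broker from the setup guide is running):
+#   server side:  NetworkPubSub("epoch").publish(3)
+#   client side:  NetworkPubSub("epoch").subscribe(callback, queue_prefix=str(client_id))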
+
+
+class NetworkPubSub:
+
+    HOST = "rabbitmq-broker.rabbitmq-system"
+    PORT = 6942
+
+    def __init__(self, topic):
+        '''
+        topic is the iterator type: "epoch" or "time"
+        '''
+        self.topic = topic
+        self.connection = pika.BlockingConnection(
+            pika.ConnectionParameters(host=self.HOST, port=self.PORT))
+        self.channel = self.connection.channel()
+        self.channel.exchange_declare(exchange='network', exchange_type='topic')
+
+        self.consumer_thread = None
+        self.running = False
+
+    def subscribe(self, callback, queue_prefix=None):
+        '''
+        Subscribe to the topic and call the callback whenever a message is received.
+        '''
+        queue_name = queue_prefix + "-" + self.topic
+        self.channel.queue_declare(queue=queue_name, durable=True)
+
+        self.channel.queue_bind(exchange='network', queue=queue_name, routing_key=f'sync.{self.topic}')
+        self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
+        log.info(f" [*] Subscribed to topic {self.topic}")
+        log.info(f" [*] Waiting for messages in {self.topic} topic.")
+
+        # consume on a daemon thread so it exits together with the main program
+        self.running = True
+        self.consumer_thread = threading.Thread(target=self.consume_thread)
+        self.consumer_thread.daemon = True
+        self.consumer_thread.start()
+
+    def consume_thread(self):
+        self.channel.start_consuming()
+
+    def stop_consuming(self):
+        if self.running:
+            self.running = False
+            if self.channel and self.channel.is_open:
+                self.channel.stop_consuming()
+            if self.consumer_thread and self.consumer_thread.is_alive():
+                self.consumer_thread.join(timeout=1.0)
+
+    def publish(self, message):
+        # the broker payload is always sent as a string
+        if not isinstance(message, str):
+            message = str(message)
+
+        log.info(f"Publishing message '{message}' to topic {self.topic}")
+
+        self.channel.basic_publish(exchange='network', routing_key=f'sync.{self.topic}',
+                                   body=message, properties=pika.BasicProperties(
+                                       delivery_mode=2,  # make message persistent
+                                   ))
+
+    def close(self):
+        self.stop_consuming()
+        if self.connection and self.connection.is_open:
+            self.connection.close()
diff --git a/src/colext/scripts/experiment_dispatcher.py b/src/colext/scripts/experiment_dispatcher.py
index 9ee5b6f..3d1cfce 100644
--- a/src/colext/scripts/experiment_dispatcher.py
+++ b/src/colext/scripts/experiment_dispatcher.py
@@ -6,6 +6,38 @@
 from colext.common.logger import log
 from colext.exp_deployers import get_deployer
 from colext.exp_deployers.db_utils import DBUtils
+import re
+
+
+# Network variables
+
+network_dir = "./network_scripts"
+
+# Global mappings for commands and validation regexes
+COMMAND_MAPPING = {
+    "bandwidth": "rate", "speed": "rate", "rate": "rate",
+    "delay": "delay", "delay-time": "delay", "latency": "delay", "latency-time": "delay",
+    "delay-distribution": "delay-distribution", "delay-distro": "delay-distro",
+    "loss": "loss", "duplicate": "duplicate", "corrupt": "corrupt",
+    "reordering": "reordering", "reorder": "reordering", "limit": "limit"
+}
+
+TIME_UNITS = r"(h|hour|hours|m|min|mins|minute|minutes|s|sec|secs|second|seconds|ms|msec|msecs|millisecond|milliseconds|us|usec|usecs|microsecond|microseconds)"
+VALIDATION_MAPPING = {
+    "rate": r"^\d{1,4}(\.\d+)?(Kbps|Mbps|Gbps)$",
+    "delay": r"^\d+(\.\d+)?" + TIME_UNITS + "$",
+    "delay-distro": r"^\d+(\.\d+)?" + TIME_UNITS + "$",
+    "delay-distribution": r"^(normal|pareto|paretonormal|Normal|Pareto|ParetoNormal)$",
+    "loss": r"^\d+(\.\d+)?%$",
+    "duplicate": r"^\d+(\.\d+)?%$",
+    "corrupt": r"^\d+(\.\d+)?%$",
+    "reordering": r"^\d+(\.\d+)?%$",
+    "limit": r"^\d+$"
+}
+
+VALID_ITERS = ["time", "epoch"]
+
+
 def get_args():
     parser = argparse.ArgumentParser(description='Run an experiment on the FL Testbed')
@@ -20,6 +52,7 @@
                         help="Change deployer without changing config. Example: --deployer=local_py")
     parser.add_argument('-w', '--wait_for_experiment', default=True, action='store_true',
                         help="Wait for experiment to finish.")
+    parser.add_argument('-n', '--network_dir', type=str, default=network_dir,
+                        help="Directory containing dynamic network scripts.")
     # parser.add_argument('-d', '--delete_on_end', default=True, action='store_true', help="Delete FL pods .")
     args = parser.parse_args()
@@ -104,10 +137,219 @@ def read_config(config_file, args):
     config_dict["req_dev_types"] = list(set([client["dev_type"] for client in config_dict["clients"]]))
     config_dict["n_clients"] = sum(client["count"] for client in config_dict["clients"])
+
+    config_dict["networks"] = read_network(config_dict)
     print("CoLExT configuration read successfully")
     return config_dict
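+
+# Illustrative shape of the dict returned by read_network, assuming one static
+# and one dynamic network (tags are hypothetical):
+#   {"slow":    {"commands": {"upstream": ["rate 1Mbps", "delay 1ms"],
+#                             "downstream": ["rate 1Mbps", "delay 1ms"]}},
+#    "DynFast": {"dynamic": {"epoch": {"structure": ["rate", "delay"],
+#                                      "script": False,
+#                                      "commands_dict": {5: [["set", "outgoing", "100Mbps", "100ms"]]}}}}}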
+
+# Given the input config dict, output a new dict with all the networks.
+def read_network(config):
+    networks = config['network']
+    clients = config['clients']
+
+    # Create a mapping from network tag to the rules defined under that tag.
+    network_tags = {}
+
+    # Process static network commands
+    for net in networks:
+        tag = net['tag']
+        network_tags[tag] = {}
+        if "dynamic" in net:
+            network_tags[tag]["dynamic"] = {}  # prepare dict for dynamic config
+            continue
+        network_tags[tag]["commands"] = read_static(net)
+
+    # Validate commands for non-dynamic networks
+    for tag, net in network_tags.items():
+        if "dynamic" not in net:
+            for direction in ["upstream", "downstream"]:
+                net["commands"][direction] = validate_static_commands(net["commands"][direction], tag)
+
+    # Process and validate the dynamic network configurations
+    for net in [n for n in networks if "dynamic" in n]:
+        tag = net['tag']
+        network_tags[tag]["dynamic"] = read_validate_dynamic(net)
+
+    print("Network tags:", network_tags)
+    return network_tags
+
+def read_static(net):
+    # Build commands for the upstream and downstream directions
+    network_commands = {"upstream": [], "downstream": []}
+
+    for direction in ["upstream", "downstream"]:
+        cmd_value = net.get(direction)
+        if isinstance(cmd_value, str):
+            # simplified input, e.g. upstream: "2000Mbps 3ms 50% normal"
+            # positional tokens: rate, delay, loss, delay-distribution
+            tokens = cmd_value.split()
+            if len(tokens) > 0:
+                network_commands[direction].append(f"rate {tokens[0]}")
+            if len(tokens) > 1:
+                network_commands[direction].append(f"delay {tokens[1]}")
+            if len(tokens) > 2:
+                network_commands[direction].append(f"loss {tokens[2]}")
+            if len(tokens) > 3:
+                network_commands[direction].append(f"delay-distribution {tokens[3]}")
+        elif isinstance(cmd_value, dict):
+            for rule, value in cmd_value.items():
+                network_commands[direction].append(f"{rule} {value}")
+    return network_commands
+
+def validate_static_commands(commands, network_name):
+    """
+    Validate a list of command strings for a given network.
+    Input commands should be a list of "rule value" strings.
+    Returns a list of validated and normalized command strings.
+    """
+    validated = []
+    for command in commands:
+        command_split = command.split()
+        if len(command_split) < 2:
+            print(f"Missing value for command in network:{network_name}")
+            sys.exit(1)
+        if len(command_split) > 2:
+            print(f"Too many tokens in command in network:{network_name}")
+            sys.exit(1)
+
+        rule_name = command_split[0]
+        if rule_name not in COMMAND_MAPPING:
+            print(f"Invalid command {rule_name} in network:{network_name}")
+            sys.exit(1)
+
+        rule_name = COMMAND_MAPPING[rule_name]
+        if not re.match(VALIDATION_MAPPING[rule_name], command_split[1]):
+            print(f"Invalid {rule_name} format: {command_split[1]} in network:{network_name}")
+            sys.exit(1)
+
+        validated.append(f"{rule_name} {command_split[1]}")
+    return validated
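+
+# e.g. validate_static_commands(["bandwidth 100Mbps", "latency 1ms"], "default")
+#      -> ["rate 100Mbps", "delay 1ms"]   (aliases normalized via COMMAND_MAPPING)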
+
+def read_validate_dynamic(net):
+    '''
+    Read and validate the dynamic network configuration from the given dict.
+    Input net should be the original network config dict for one tag.
+    Returns a dict with the validated and formatted dynamic config, keyed by iterator.
+    '''
+    dynamic_config = {}
+    tag = net['tag']
+    # loop through the dynamic config and validate each entry,
+    # i.e. each defined iterator with its list of commands
+    for entry in net['dynamic']:
+
+        # validate the iterator
+        iterator = entry.get('iterator')
+        if not iterator or iterator not in VALID_ITERS:
+            print(f"Invalid or missing iterator in {tag}")
+            sys.exit(1)
+        if iterator in dynamic_config:
+            print(f"Iterator {iterator} already exists in {tag}, ignoring this entry")
+            continue
+
+        entry_temp = {}
+        # validate the structure; default to [rate, delay] if it is not defined
+        structure = entry.get("structure", ['rate', 'delay'])
+        corrected = check_rules(structure)
+
+        entry_temp["structure"] = corrected
+
+        # check for a script, otherwise take the commands
+        if 'script' in entry:
+            if not os.path.exists(os.path.join(network_dir, entry["script"])):
+                print(f"Script file {entry['script']} does not exist.")
+                sys.exit(1)
+            entry_temp["script"] = entry['script']
+        elif 'commands' in entry:
+            entry_temp["script"] = False
+            entry_temp["commands_dict"] = {}
+
+            for command in entry["commands"]:
+                # parse it into key: values
+                if not isinstance(command, list) or len(command) < 3:
+                    print(f"Invalid command format: {command} in {tag}")
+                    sys.exit(1)
+                key = command[0]
+                value = command[1:]
+
+                if key in ['structure', 'iterator']:
+                    continue
+                if not check_command(value, entry_temp["structure"]):
+                    print(f"Invalid command: {value} in {tag}")
+                    sys.exit(1)
+
+                # if the key is already there, append to its list of commands
+                if key in entry_temp["commands_dict"]:
+                    entry_temp["commands_dict"][key].append(value)
+                else:
+                    entry_temp["commands_dict"][key] = [value]
+
+        dynamic_config[iterator] = entry_temp
+    return dynamic_config
+
+def check_command(command, structure):
+    '''
+    Check whether a command follows the given structure.
+    Returns True if the command is valid, else False.
+    '''
+    command_split = command.split() if isinstance(command, str) else command
+    # need at least [set/del, direction]
+    if len(command_split) < 2:
+        print("Invalid command length")
+        return False
+
+    if command_split[0] not in ['set', 'del']:
+        print(f"Invalid command name: {command_split[0]} in {command}, only 'set' and 'del' are allowed")
+        return False
+
+    if command_split[1] not in ["incoming", "outgoing"]:
+        print(f"Invalid direction: {command_split[1]} in {command}, only 'incoming' and 'outgoing' are allowed")
+        return False
+
+    # del does not take any values, so the remaining checks can be skipped
+    if command_split[0] == "del":
+        return True
+    if len(command_split) != len(structure) + 2:
+        print(f"Invalid command length: {command} for {structure}")
+        return False
+
+    for i, rule in enumerate(structure):
+        if command_split[i + 2] == -1:
+            continue
+        if not re.match(VALIDATION_MAPPING[rule], command_split[i + 2]):
+            print(f"Invalid {rule} format: {command_split[i + 2]}")
+            return False
+    return True
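+
+# e.g. check_command(["set", "outgoing", "100Mbps", "100ms"], ["rate", "delay"]) -> True
+#      check_command(["del", "incoming"], ["rate", "delay"])                     -> True
+#      check_command(["set", "outgoing", "100Mbps"], ["rate", "delay"])          -> False (length)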
+
+def check_rules(structure):
+    """
+    Validate and normalize a list of rule names using COMMAND_MAPPING.
+    Returns the corrected structure; exits on an invalid rule name.
+    """
+    corrected = []
+    for rule in structure:
+        if rule not in COMMAND_MAPPING:
+            print(f"Invalid rule: {rule}. Valid rules are: {', '.join(COMMAND_MAPPING.keys())}")
+            sys.exit(1)
+        corrected.append(COMMAND_MAPPING[rule])
+    return corrected
+
+
 def print_err(msg):
     print(f"ERR: {msg}")
@@ -127,7 +369,7 @@ def launch_experiment():
     args = get_args()
+    # network_dir is module-level state read by read_validate_dynamic(), so it
+    # must be set from the CLI argument before read_config() parses the networks
+    global network_dir
+    network_dir = args.network_dir
     config_dict = read_config(args.config_file, args)
-
     Deployer = get_deployer(config_dict["deployer"])
     deployer = Deployer(config_dict, args.test_env)