From ce9e981ea91d2df1369092b2ba5e1a90dc4eea03 Mon Sep 17 00:00:00 2001 From: MohammedAljahdali Date: Sun, 30 Jun 2024 05:14:59 +0300 Subject: [PATCH 1/2] Add optional network simulator during Ray simulations --- slower/client/proxy/ray_client_proxy.py | 6 ++- .../proxy/ray_private_server_model_proxy.py | 12 ++++- slower/simulation/app.py | 19 ++++++- slower/simulation/utlis/__init__.py | 0 slower/simulation/utlis/network_simulator.py | 51 +++++++++++++++++++ 5 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 slower/simulation/utlis/__init__.py create mode 100644 slower/simulation/utlis/network_simulator.py diff --git a/slower/client/proxy/ray_client_proxy.py b/slower/client/proxy/ray_client_proxy.py index b8504ee..1ccf52a 100644 --- a/slower/client/proxy/ray_client_proxy.py +++ b/slower/client/proxy/ray_client_proxy.py @@ -13,6 +13,7 @@ from slower.server.server_model.proxy.ray_private_server_model_proxy import ( RayPrivateServerModelProxy ) +from slower.simulation.utlis.network_simulator import NetworkSimulator class RayClientProxy(RayActorClientProxy): @@ -21,10 +22,12 @@ class RayClientProxy(RayActorClientProxy): def __init__( self, server_model_manager: ServerModelManager, + network_simulator: Optional[NetworkSimulator] = None, **kwargs, ): super().__init__(**kwargs) self.server_model_manager = server_model_manager + self.network_simulator = network_simulator def fit( self, @@ -36,7 +39,8 @@ def fit( def fit(client: Client) -> common.FitRes: server_model_proxy = RayPrivateServerModelProxy( server_model, - request_queue_in_separate_thread=True + request_queue_in_separate_thread=True, + network_simulator=self.network_simulator ) # also return the server_model_proxy, so that we can store it outside the # ray actor to the shared ray memory diff --git a/slower/server/server_model/proxy/ray_private_server_model_proxy.py b/slower/server/server_model/proxy/ray_private_server_model_proxy.py index 5e3dbd4..ad3e217 100644 --- a/slower/server/server_model/proxy/ray_private_server_model_proxy.py +++ b/slower/server/server_model/proxy/ray_private_server_model_proxy.py @@ -1,3 +1,4 @@ +from typing import Optional from queue import SimpleQueue from typing import Iterator import threading @@ -5,6 +6,7 @@ from slower.server.server_model.server_model import ServerModel from slower.common import ControlCode, BatchData from slower.server.server_model.proxy.server_model_proxy import ServerModelProxy +from slower.simulation.utlis.network_simulator import NetworkSimulator class RayPrivateServerModelProxy(ServerModelProxy): @@ -13,22 +15,28 @@ class RayPrivateServerModelProxy(ServerModelProxy): def __init__( self, server_model: ServerModel, - request_queue_in_separate_thread: bool = True + request_queue_in_separate_thread: bool = True, + network_simulator: Optional[NetworkSimulator]=None ): super().__init__() self.server_model = server_model self.request_queue = None self.server_request_thread = None self.request_queue_in_separate_thread = request_queue_in_separate_thread + self.network_simulator = network_simulator def _blocking_request(self, method, batch_data, timeout): _ = (timeout,) + if self.network_simulator is not None: + self.network_simulator.simulate_network(batch_data=batch_data) res = getattr(self.server_model, method)(batch_data=batch_data) return res def _streaming_request(self, method, batch_data): - if self.request_queue is not None: + + if self.network_simulator is not None: + self.network_simulator.simulate_network(batch_data=batch_data) self.request_queue.put((method, batch_data)) else: self._blocking_request(method=method, batch_data=batch_data, timeout=None) diff --git a/slower/simulation/app.py b/slower/simulation/app.py index a3795e8..88511e2 100644 --- a/slower/simulation/app.py +++ b/slower/simulation/app.py @@ -22,6 +22,7 @@ from slower.simulation.ray_transport.split_learning_actor_pool import SplitLearningVirtualClientPool +from slower.simulation.utlis.network_simulator import NetworkSimulator from slower.client.typing import ClientFn from slower.client.proxy.ray_client_proxy import RayClientProxy from slower.server.server import Server @@ -45,6 +46,7 @@ def start_simulation( actor_type: Type[VirtualClientEngineActor] = DefaultActor, actor_kwargs: Optional[Dict[str, Any]] = None, actor_scheduling: Union[str, NodeAffinitySchedulingStrategy] = "DEFAULT", + network_simulator_kwargs: Optional[Dict[str, int]] = None, ) -> History: """Start a Ray-based Flower simulation server. @@ -115,7 +117,9 @@ def start_simulation( compute nodes (e.g. via NodeAffinitySchedulingStrategy). Please note this is an advanced feature. For all details, please refer to the Ray documentation: https://docs.ray.io/en/latest/ray-core/scheduling/index.html - + network_simulator_kwargs: Optional[Dict[str, int]] (default: None) + Optional dictionary containing arguments to configure the network simulator. If provided, + the network simulator will be enabled. Returns ------- hist : flwr.server.history.History @@ -257,13 +261,24 @@ def update_resources(f_stop: threading.Event) -> None: pool.num_actors, ) + if network_simulator_kwargs: + network_simulator = NetworkSimulator(**network_simulator_kwargs) + log( + INFO, + "Flower VCE: Network simulator enabled with %s", + network_simulator_kwargs, + ) + else: + network_simulator = None + # Register one RayClientProxy object for each client with the ClientManager for cid in cids: client_proxy = RayClientProxy( client_fn=client_fn, cid=cid, actor_pool=pool, - server_model_manager=initialized_server.server_model_manager + server_model_manager=initialized_server.server_model_manager, + network_simulator=network_simulator ) initialized_server.client_manager().register(client=client_proxy) diff --git a/slower/simulation/utlis/__init__.py b/slower/simulation/utlis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/slower/simulation/utlis/network_simulator.py b/slower/simulation/utlis/network_simulator.py new file mode 100644 index 0000000..bb783e1 --- /dev/null +++ b/slower/simulation/utlis/network_simulator.py @@ -0,0 +1,51 @@ +import time +import random +import sys +from slower.common import BatchData + + +class NetworkSimulator: + def __init__(self, avg_latency_ms, latency_variance_ms, avg_bandwidth_mbps, bandwidth_variance_mbps): + self.avg_latency_ms = avg_latency_ms + self.latency_variance_ms = latency_variance_ms + self.avg_bandwidth_mbps = avg_bandwidth_mbps + self.bandwidth_variance_mbps = bandwidth_variance_mbps + + def simulate_latency(self): + latency = random.gauss(self.avg_latency_ms, self.latency_variance_ms) + time.sleep(max(0, latency) / 1000) + + def simulate_bandwidth(self, data_size): + bandwidth_mbps = max(0, random.gauss(self.avg_bandwidth_mbps, self.bandwidth_variance_mbps)) + if bandwidth_mbps > 0: + transfer_time = data_size / (bandwidth_mbps * 1024 * 1024 / 8) + time.sleep(transfer_time) + + def simulate_network(self, batch_data: BatchData): + data_size = get_data_size(batch_data) + self.simulate_latency() + self.simulate_bandwidth(data_size) + + +def get_data_size(batch_data) -> int: + def get_size(obj, seen=None): + """Recursively finds size of objects""" + size = sys.getsizeof(obj) + if seen is None: + seen = set() + obj_id = id(obj) + if obj_id in seen: + return 0 + # Important mark as seen *before* entering recursion to gracefully handle + # self-referential objects + seen.add(obj_id) + if isinstance(obj, dict): + size += sum([get_size(v, seen) for v in obj.values()]) + size += sum([get_size(k, seen) for k in obj.keys()]) + elif hasattr(obj, '__dict__'): + size += get_size(vars(obj), seen) + elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)): + size += sum([get_size(i, seen) for i in obj]) + return size + + return get_size(batch_data) From dce57634b96d93b7393e60204f4d1d79a5c9a538 Mon Sep 17 00:00:00 2001 From: MohammedAljahdali Date: Mon, 1 Jul 2024 09:47:10 +0300 Subject: [PATCH 2/2] Changed where the streaming request latency is injected, such that it doesn't affect the client --- .../server_model/proxy/ray_private_server_model_proxy.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/slower/server/server_model/proxy/ray_private_server_model_proxy.py b/slower/server/server_model/proxy/ray_private_server_model_proxy.py index ad3e217..deaf89a 100644 --- a/slower/server/server_model/proxy/ray_private_server_model_proxy.py +++ b/slower/server/server_model/proxy/ray_private_server_model_proxy.py @@ -34,9 +34,6 @@ def _blocking_request(self, method, batch_data, timeout): def _streaming_request(self, method, batch_data): if self.request_queue is not None: - - if self.network_simulator is not None: - self.network_simulator.simulate_network(batch_data=batch_data) self.request_queue.put((method, batch_data)) else: self._blocking_request(method=method, batch_data=batch_data, timeout=None) @@ -51,6 +48,8 @@ def _iterate(server_proxy: ServerModelProxy, iterator: Iterator): for method, batch in iterator: if batch.control_code == ControlCode.DO_CLOSE_STREAM: break + if server_proxy.network_simulator is not None: + server_proxy.network_simulator.simulate_network(batch_data=batch) server_proxy._blocking_request(method, batch, None) self.request_queue = SimpleQueue()