From e395c4656c05a3fa8f3df75deddcb6ed2e257e92 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sat, 9 Apr 2022 04:11:31 -0700 Subject: [PATCH 01/14] added table state and connector class, table state to feature frame (not sure what that really does though, it still gets passed into feature frame) --- ralf/state/table_state.py | 4 +-- ralf/v2/api.py | 11 +++++- ralf/v2/connector.py | 30 +++++++++++++++++ ralf/v2/dict_connector.py | 39 +++++++++++++++++++++ ralf/v2/examples/counter.py | 67 ++++++++++++++++++++++++++++++++++--- ralf/v2/operator.py | 1 + ralf/v2/record.py | 40 ++++++++++++++++++++-- ralf/v2/table_state.py | 66 ++++++++++++++++++++++++++++++++++++ 8 files changed, 248 insertions(+), 10 deletions(-) create mode 100644 ralf/v2/connector.py create mode 100644 ralf/v2/dict_connector.py create mode 100644 ralf/v2/table_state.py diff --git a/ralf/state/table_state.py b/ralf/state/table_state.py index 8a03504..9061679 100644 --- a/ralf/state/table_state.py +++ b/ralf/state/table_state.py @@ -1,7 +1,7 @@ from typing import List -from ralf.record import Record, Schema -from ralf.state.connector import Connector +from ralf.v2.record import Record, Schema +from ralf.v2.connector import Connector # Maintains table values diff --git a/ralf/v2/api.py b/ralf/v2/api.py index eeff756..e9e2e56 100644 --- a/ralf/v2/api.py +++ b/ralf/v2/api.py @@ -9,6 +9,7 @@ from typing import Deque, Dict, Iterable, List, Optional, Type, Union from typing_extensions import Literal +from ralf.v2.table_state import TableState from ralf.v2.manager import LocalManager, RalfManager, RayManager, SimpyManager from ralf.v2.operator import OperatorConfig @@ -85,16 +86,23 @@ def __init__( self, transform_object: BaseTransform, scheduler: BaseScheduler = FIFO(), + table_state: TableState = None, operator_config: OperatorConfig = OperatorConfig(), ): self.transform_object = transform_object self.scheduler = scheduler + self.table_state = table_state + # Adding component's access to each other's states in the feature frame + self.transform_object.table_state = table_state + self.transform_object.scheduler = scheduler + self.scheduler.table_state = table_state self.config = operator_config self.children: List["FeatureFrame"] = [] logger.msg( "Created FeatureFrame", transform=transform_object, scheduler=scheduler, + table_state=table_state, config=operator_config if operator_config != OperatorConfig() else "default", @@ -104,10 +112,11 @@ def transform( self, transform_object: BaseTransform, scheduler: BaseScheduler = FIFO(), + table_state: TableState = None, operator_config: OperatorConfig = OperatorConfig(), ) -> "FeatureFrame": """Apply a transformation to this feature frame, with scheduler.""" - frame = FeatureFrame(transform_object, scheduler, operator_config) + frame = FeatureFrame(transform_object, scheduler, table_state, operator_config) self.children.append(frame) return frame diff --git a/ralf/v2/connector.py b/ralf/v2/connector.py new file mode 100644 index 0000000..8a3a152 --- /dev/null +++ b/ralf/v2/connector.py @@ -0,0 +1,30 @@ +from abc import ABC, abstractmethod +from typing import List, Union + +from ralf.record import Record, Schema + + +class Connector(ABC): + @abstractmethod + def add_table(self, schema: Schema): + pass + + @abstractmethod + def update(self, schema: Schema, record: Record): + pass + + @abstractmethod + def delete(self, schema: Schema, key: str): + pass + + @abstractmethod + def get_one(self, schema: Schema, key: str) -> Union[Record, None]: + pass + + @abstractmethod + def get_all(self, schema: Schema) -> List[Record]: + pass + + @abstractmethod + def count(self, schema: Schema) -> int: + pass diff --git a/ralf/v2/dict_connector.py b/ralf/v2/dict_connector.py new file mode 100644 index 0000000..269dccf --- /dev/null +++ b/ralf/v2/dict_connector.py @@ -0,0 +1,39 @@ +from typing import Dict, List, Union + +from ralf.v2.record import Record, Schema +from ralf.v2.connector import Connector + + +class DictConnector(Connector): + def __init__(self): + self.tables = dict() + + def add_table(self, schema: Schema): + self.tables[schema.name] = dict() + + def get_records(self, schema: Schema) -> Dict: + return self.tables[schema.get_name()] + + def update(self, schema: Schema, record: Record): + records = self.get_records(schema) + key = getattr(record.entry, schema.primary_key) + records[key] = record + + def delete(self, schema: Schema, key: str): + records = self.get_records(schema) + if key in records: + records.pop(key, None) + + def get_one(self, schema: Schema, key) -> Union[Record, None]: + records = self.get_records(schema) + if key in records: + return records[key] + return None + + def get_all(self, schema: Schema) -> List[Record]: + records = self.get_records(schema) + return list(records.values()) + + def count(self, schema: Schema) -> int: + records = self.get_records(schema) + return len(records.items()) diff --git a/ralf/v2/examples/counter.py b/ralf/v2/examples/counter.py index 381748b..4dee554 100644 --- a/ralf/v2/examples/counter.py +++ b/ralf/v2/examples/counter.py @@ -2,6 +2,9 @@ from collections import defaultdict from dataclasses import dataclass from typing import List +from ralf.v2.dict_connector import DictConnector +from ralf.v2.record import Schema +from ralf.v2.table_state import TableState from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record from ralf.v2.operator import OperatorConfig, RayOperatorConfig @@ -53,15 +56,44 @@ def on_event(self, _: Record) -> List[Record[SourceValue]]: class Sum(BaseTransform): def __init__(self): - self.total = defaultdict(lambda: 0) + self.total = 0 def on_event(self, record: Record) -> Record[SumValue]: - self.total[record.entry.key] += record.entry.value - print(f"Record {record.entry.key}, value {self.total[record.entry.key]}") + self.total += record.entry.value + print(f"Record {record.entry.key}, value {str(self.total)}") return Record( - entry=SumValue(key=record.entry.key, value=self.total[record.entry.key]) + entry=SumValue(key=record.entry.key, value=self.total) ) +class UpdateDict(BaseTransform): + def __init__(self): + self.count = 0 + + def on_event(self, record: Record) -> None: + print("single update table", self.table_state.connector.tables) + self.table_state.update(record) + return None + +class BatchUpdate(BaseTransform): + def __init__(self ,batch_size: int): + self.batch_size = batch_size + self.count = 0 + self.records = [] + + def on_event(self, record: Record) -> None: + self.records.append(record) + self.count += 1 + + if self.count >= self.batch_size: + self.count = 0 + for r in self.records: + print(f"batch update, processing {r}") + self.table_state.update(r) + self.records = [] + print("batch table", self.table_state.connector.tables) + + return None + if __name__ == "__main__": @@ -70,7 +102,7 @@ def on_event(self, record: Record) -> Record[SumValue]: app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) source_ff = app.source( - FakeSource(10000), + FakeSource(10), operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), @@ -83,5 +115,30 @@ def on_event(self, record: Record) -> Record[SumValue]: ), ) + dict_schema = Schema("key", {"key": str, "value": int}) + dict_schema_1 = Schema("key", {"key": str, "value": int}) + dict_schema.name = "single" + dict_schema_1.name = "batch" + dict_conn = DictConnector() + + dict_table_state = TableState(dict_schema, dict_conn) + batch_table_state = TableState(dict_schema_1, dict_conn) + + update_ff = sum_ff.transform( + UpdateDict(), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=dict_table_state + ) + + batch_update_ff = sum_ff.transform( + BatchUpdate(3), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=batch_table_state + ) + app.deploy() app.wait() diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index c548518..73fcbeb 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -145,6 +145,7 @@ def time_and_count(metric_name, labels={}): # If this operator got StopIteration, it should exit elif next_event.is_stop_iteration(): + self.transform_object.on_stop(next_event) raise StopIteration() else: diff --git a/ralf/v2/record.py b/ralf/v2/record.py index 94fa59c..6a2cea4 100644 --- a/ralf/v2/record.py +++ b/ralf/v2/record.py @@ -1,8 +1,10 @@ import enum import threading -from dataclasses import dataclass, is_dataclass +import json +from string import ascii_lowercase +from dataclasses import dataclass, is_dataclass, asdict from time import time_ns -from typing import Generic, Optional, TypeVar, Union +from typing import Generic, Optional, TypeVar, Union, Dict, Type class RecordType(enum.Enum): @@ -71,3 +73,37 @@ def is_wait_event(self) -> bool: def wait(self): assert self.is_wait_event() self.entry.wait() + + +# Schema validation +class Schema: + def __init__(self, primary_key: str, columns: Dict[str, Type]): + self.primary_key = primary_key + self.columns = columns + self.name = self.compute_name() + + def validate_record(self, record: Record): + record_dict = record.entry.__dict__ + # print(record_dict) + schema_columns = set(self.columns.keys()).union(set([self.primary_key])) + record_columns = set(record_dict.keys()) + assert ( + schema_columns == record_columns + ), f"schema columns are {schema_columns} but record here {str(record)} has {record_columns}, {str(record_dict)}" + #type checking + for key, type in self.columns.items(): + assert(isinstance(record_dict[key], type)), f"schema key {key} has type {type} but record here {str(record)} has type {str(type(record_dict[key]))}" + + def compute_name(self) -> str: + dump = json.dumps(list(self.columns.keys()), sort_keys=True) + hash_val = str(abs(hash(dump))) + name = "" + for c in hash_val: + name += ascii_lowercase[int(c)] + return name + + def get_name(self) -> str: + return self.name + + def __hash__(self) -> int: + return hash(self.name) diff --git a/ralf/v2/table_state.py b/ralf/v2/table_state.py new file mode 100644 index 0000000..f5c7269 --- /dev/null +++ b/ralf/v2/table_state.py @@ -0,0 +1,66 @@ +from collections import defaultdict +from typing import List +import time + +from ralf.v2.record import Record, Schema +from ralf.v2.connector import Connector + + +# Maintains table values +# TODO: This should eventually be a wrapper around a DB connection +class TableState: + def __init__(self, schema: Schema, connector: Connector): + self.schema = schema + self.connector = connector + self.connector.add_table(schema) + + self.times = defaultdict(float) + self.counts = defaultdict(int) + + self.num_updates: int = 0 + self.num_deletes: int = 0 + self.num_records: int = 0 + + def debug_state(self): + self.num_records = self.connector.count(self.schema) + return { + "num_updates": self.num_updates, + "num_deletes": self.num_deletes, + "num_records": self.num_records, + } + + def update(self, record: Record): + self.schema.validate_record(record) + t1 = time.time() + self.connector.update(self.schema, record) + t2 = time.time() + + self.times["update"] += (t2-t1) + self.counts["update"] += 1 + self.num_updates += 1 + + def delete(self, key: str): + t1 = time.time() + self.connector.delete(self.schema, key) + t2 = time.time() + + self.times["delete"] += (t2-t1) + self.counts["delete"] += 1 + self.num_deletes += 1 + + def point_query(self, key) -> Record: + t1 = time.time() + val = self.connector.get_one(self.schema, key) + t2 = time.time() + + self.times["point_query"] += (t2-t1) + self.counts["point_query"] += 1 + if not val: + raise KeyError(f"Key {key} not found.") + return val + + def bulk_query(self) -> List[Record]: + return self.connector.get_all(self.schema) + + def get_schema(self) -> Schema: + return self.schema From ae42aa31f626343db665da0d87838b767c00162d Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Wed, 4 May 2022 15:29:56 -0700 Subject: [PATCH 02/14] latency local testing errors --- ralf/v2/benchmark/latency.py | 132 +++++++++++++++++++++++++++++++++++ ralf/v2/table_state.py | 9 ++- 2 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 ralf/v2/benchmark/latency.py diff --git a/ralf/v2/benchmark/latency.py b/ralf/v2/benchmark/latency.py new file mode 100644 index 0000000..432f4ad --- /dev/null +++ b/ralf/v2/benchmark/latency.py @@ -0,0 +1,132 @@ +import time +from collections import defaultdict +from dataclasses import dataclass +from typing import List + +from nbformat import read +from ralf.v2.dict_connector import DictConnector +from ralf.v2.record import Schema +from ralf.v2.table_state import TableState + +from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record +from ralf.v2.operator import OperatorConfig, RayOperatorConfig + + +@dataclass +class SourceValue: + key: str + value: int + timestamp: float + +LOAD = 10 + +class FakeSource(BaseTransform): + def __init__(self, total: int) -> None: + self.count = 0 + self.total = total + self.num_keys = 1000 + + def on_event(self, _: Record) -> List[Record[SourceValue]]: + + if self.count >= self.total: + print("completed iteration") + raise StopIteration() + + self.count += 1 + key = str(self.count % self.num_keys) + + # sleep to slow down send rate + time.sleep(1) + + # return list of records (wrapped around dataclass) + return [ + Record( + entry=SourceValue( + key=key, + value=self.count, + timestamp=time.time(), + ), + shard_key=key, # key to shard/query + ) + ] + +#write/update +class AddOne(BaseTransform): + def on_event(self, record: Record) -> Record: + record.entry.value += 1 + self.table_state.update(record) + return record + + def on_stop(self, record:Record) -> Record: + print("average update latency:" , self.table_state.times["update"]/self.table_state.counts["update"]) + return record + +class ReadFromDict(BaseTransform): + def __init__(self, total: int) -> None: + self.total = total + + def on_event(self, record: Record) -> Record: + self.table_state.point_query(record.entry.key) + return record + + def on_stop(self, record:Record) -> Record: + print("average query latency:" , self.table_state.times["point_query"]/self.table_state.counts["point_query"]) + return record + +class DeleteFromDict(BaseTransform): + def __init__(self, total: int) -> None: + self.total = total + + def on_event(self, record: Record) -> Record: + self.table_state.delete(record.entry.key) + return record + + def on_stop(self, record:Record) -> Record: + print("average delete latency:" , self.table_state.times["delete"]/self.table_state.counts["delete"]) + return record + +if __name__ == "__main__": + + deploy_mode = "ray" + connector = "dict" + # deploy_mode = "local" + app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) + + source_ff = app.source( + FakeSource(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + ) + + dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + dict_conn = DictConnector() + table_state = TableState(dict_schema, dict_conn) + + addOne_ff = source_ff.transform( + AddOne(), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + read_ff = addOne_ff.transform( + ReadFromDict(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + delete_ff = read_ff.transform( + DeleteFromDict(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + + app.deploy() + app.wait() diff --git a/ralf/v2/table_state.py b/ralf/v2/table_state.py index f5c7269..b4fbd2c 100644 --- a/ralf/v2/table_state.py +++ b/ralf/v2/table_state.py @@ -34,7 +34,8 @@ def update(self, record: Record): t1 = time.time() self.connector.update(self.schema, record) t2 = time.time() - + print('update') + print("table: ", self.connector.tables) self.times["update"] += (t2-t1) self.counts["update"] += 1 self.num_updates += 1 @@ -43,6 +44,8 @@ def delete(self, key: str): t1 = time.time() self.connector.delete(self.schema, key) t2 = time.time() + print('delete') + print("table: ", self.connector.tables) self.times["delete"] += (t2-t1) self.counts["delete"] += 1 @@ -52,7 +55,9 @@ def point_query(self, key) -> Record: t1 = time.time() val = self.connector.get_one(self.schema, key) t2 = time.time() - + print('read') + print("table: ", self.connector.tables) + self.times["point_query"] += (t2-t1) self.counts["point_query"] += 1 if not val: From d5eae139d4366383b486ff5f968cb974f7f4991f Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 01:35:21 -0700 Subject: [PATCH 03/14] benchmark tests for connectors --- ralf/v2/api.py | 33 ++++++-- ralf/v2/benchmark/latency.py | 45 ++++++----- ralf/v2/benchmark/throughput.py | 135 ++++++++++++++++++++++++++++++++ ralf/v2/connector.py | 2 +- ralf/v2/dict_connector.py | 2 +- ralf/v2/table_state.py | 2 +- 6 files changed, 191 insertions(+), 28 deletions(-) create mode 100644 ralf/v2/benchmark/throughput.py diff --git a/ralf/v2/api.py b/ralf/v2/api.py index e9e2e56..c80136f 100644 --- a/ralf/v2/api.py +++ b/ralf/v2/api.py @@ -71,12 +71,17 @@ def __repr__(self): return self.__class__.__name__ def get(self, key): - """Get current feature value for key. Returns null by default. + """Get current feature value for key. :param key: key to lookup feature value :type key: str """ - return None + return self.getFF().get(key) + + def getFF(self): + """Get the feature frame transform is being applied to. + """ + return self.feature_frame class FeatureFrame: @@ -92,12 +97,17 @@ def __init__( self.transform_object = transform_object self.scheduler = scheduler self.table_state = table_state + # Adding component's access to each other's states in the feature frame - self.transform_object.table_state = table_state - self.transform_object.scheduler = scheduler - self.scheduler.table_state = table_state + self.transform_object.feature_frame = self + + self.scheduler.feature_frame = self + self.config = operator_config - self.children: List["FeatureFrame"] = [] + + self.children: List["FeatureFrame"] = [] + self.parent: "FeatureFrame" = None + logger.msg( "Created FeatureFrame", transform=transform_object, @@ -118,11 +128,22 @@ def transform( """Apply a transformation to this feature frame, with scheduler.""" frame = FeatureFrame(transform_object, scheduler, table_state, operator_config) self.children.append(frame) + frame.parent = self return frame def __repr__(self) -> str: return f"FeatureFrame({repr(self.transform_object)})" + def get(self, key:str): + return self.table_state.point_query(key) + + def update(self, record:Record): + self.table_state.update(record) + + def delete(self, key:str): + self.table_state.delete(key) + + #TODO: Interface to be able to query feature frame class RalfApplication: """An end to end feature processing pipeline in Ralf.""" diff --git a/ralf/v2/benchmark/latency.py b/ralf/v2/benchmark/latency.py index 432f4ad..6f5c7b3 100644 --- a/ralf/v2/benchmark/latency.py +++ b/ralf/v2/benchmark/latency.py @@ -10,7 +10,9 @@ from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record from ralf.v2.operator import OperatorConfig, RayOperatorConfig +from ralf.v2.utils import get_logger +logger = get_logger() @dataclass class SourceValue: @@ -54,11 +56,12 @@ def on_event(self, _: Record) -> List[Record[SourceValue]]: class AddOne(BaseTransform): def on_event(self, record: Record) -> Record: record.entry.value += 1 - self.table_state.update(record) + self.getFF().update(record) return record def on_stop(self, record:Record) -> Record: - print("average update latency:" , self.table_state.times["update"]/self.table_state.counts["update"]) + avg = self.getFF().table_state.times["update"]/self.getFF().table_state.counts["update"] + logger.msg(f"AVERAGE UPDATE LATENCY: {avg}") return record class ReadFromDict(BaseTransform): @@ -66,11 +69,12 @@ def __init__(self, total: int) -> None: self.total = total def on_event(self, record: Record) -> Record: - self.table_state.point_query(record.entry.key) + self.getFF().parent.get(str(int(record.entry.key) - 1)) return record def on_stop(self, record:Record) -> Record: - print("average query latency:" , self.table_state.times["point_query"]/self.table_state.counts["point_query"]) + avg = self.getFF().parent.table_state.times["point_query"]/self.getFF().parent.table_state.counts["point_query"] + logger.msg(f"AVERAGE READ LATENCY: {avg}") return record class DeleteFromDict(BaseTransform): @@ -78,11 +82,12 @@ def __init__(self, total: int) -> None: self.total = total def on_event(self, record: Record) -> Record: - self.table_state.delete(record.entry.key) + self.getFF().delete(record.entry.key) return record def on_stop(self, record:Record) -> Record: - print("average delete latency:" , self.table_state.times["delete"]/self.table_state.counts["delete"]) + avg = self.table_state.times["delete"]/self.table_state.counts["delete"] + logger.msg(f"AVERAGE DELETE LATENCY: {avg}") return record if __name__ == "__main__": @@ -99,16 +104,18 @@ def on_stop(self, record:Record) -> Record: ), ) - dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) - dict_conn = DictConnector() - table_state = TableState(dict_schema, dict_conn) - + table_States = [] + for _ in range(3): + dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + dict_conn = DictConnector() + table_States.append(TableState(dict_schema, dict_conn)) + addOne_ff = source_ff.transform( AddOne(), operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), - table_state=table_state + table_state=table_States[0] ) read_ff = addOne_ff.transform( @@ -116,16 +123,16 @@ def on_stop(self, record:Record) -> Record: operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), - table_state=table_state + table_state=table_States[1] ) - delete_ff = read_ff.transform( - DeleteFromDict(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_state - ) + # delete_ff = read_ff.transform( + # DeleteFromDict(LOAD), + # operator_config=OperatorConfig( + # ray_config=RayOperatorConfig(num_replicas=1), + # ), + # table_state=table_States[2] + # ) app.deploy() diff --git a/ralf/v2/benchmark/throughput.py b/ralf/v2/benchmark/throughput.py new file mode 100644 index 0000000..794937d --- /dev/null +++ b/ralf/v2/benchmark/throughput.py @@ -0,0 +1,135 @@ +import time +from collections import defaultdict +from dataclasses import dataclass +from typing import List + +from nbformat import read +from ralf.v2.dict_connector import DictConnector +from ralf.v2.record import Schema +from ralf.v2.table_state import TableState + +from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record +from ralf.v2.operator import OperatorConfig, RayOperatorConfig + + +@dataclass +class SourceValue: + key: str + value: int + timestamp: float + +LOAD = 100 + +class FakeSource(BaseTransform): + def __init__(self, total: int) -> None: + self.count = 0 + self.total = total + self.num_keys = 1000 + + def on_event(self, _: Record) -> List[Record[SourceValue]]: + + if self.count >= self.total: + print("completed iteration") + raise StopIteration() + + self.count += 1 + key = str(self.count % self.num_keys) + + # sleep to slow down send rate + time.sleep(1) + + # return list of records (wrapped around dataclass) + return [ + Record( + entry=SourceValue( + key=key, + value=self.count, + timestamp=time.time(), + ), + shard_key=key, # key to shard/query + ) + ] + + +class AddOne(BaseTransform): + def on_event(self, record: Record) -> None: + record.entry.value += 1 + self.table_state.update(record) + return None + + def on_stop(self, record:Record) -> Record: + print("average update throughput:" , self.table_state.counts["update"]/self.table_state.times["update"]) + return record + +class ReadFromDict(BaseTransform): + def __init__(self, total: int) -> None: + self.curr_key = 0 + self.total = total + + def on_event(self, record: Record) -> None: + self.table_state.point_query(self.curr_key) + self.curr_key += 1 + + def on_stop(self, record:Record) -> Record: + print("average query latency:" , self.table_state.counts["point_query"]/self.table_state.times["point_query"]) + return record + +class DeleteFromDict(BaseTransform): + def __init__(self, total: int) -> None: + self.curr_key = 0 + self.total = total + + def on_event(self, record: Record) -> None: + self.table_state.delete(self.curr_key) + self.curr_key += 1 + return None + + def on_stop(self, record:Record) -> Record: + print("average delete latency:" , self.table_state.counts["delete"]/self.table_state.times["delete"]) + return record + +if __name__ == "__main__": + + deploy_mode = "ray" + connector = "dict" + # deploy_mode = "local" + app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) + + source_ff = app.source( + FakeSource(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + ) + + dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + dict_conn = DictConnector() + table_state = TableState(dict_schema, dict_conn) + + addOne_ff = source_ff.transform( + AddOne(), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + read_ff = addOne_ff.transform( + ReadFromDict(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + delete_ff = read_ff.transform( + DeleteFromDict(LOAD), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_state + ) + + + app.deploy() + app.wait() diff --git a/ralf/v2/connector.py b/ralf/v2/connector.py index 8a3a152..4878b5b 100644 --- a/ralf/v2/connector.py +++ b/ralf/v2/connector.py @@ -18,7 +18,7 @@ def delete(self, schema: Schema, key: str): pass @abstractmethod - def get_one(self, schema: Schema, key: str) -> Union[Record, None]: + def get(self, schema: Schema, key: str) -> Union[Record, None]: pass @abstractmethod diff --git a/ralf/v2/dict_connector.py b/ralf/v2/dict_connector.py index 269dccf..33e6586 100644 --- a/ralf/v2/dict_connector.py +++ b/ralf/v2/dict_connector.py @@ -24,7 +24,7 @@ def delete(self, schema: Schema, key: str): if key in records: records.pop(key, None) - def get_one(self, schema: Schema, key) -> Union[Record, None]: + def get(self, schema: Schema, key) -> Union[Record, None]: records = self.get_records(schema) if key in records: return records[key] diff --git a/ralf/v2/table_state.py b/ralf/v2/table_state.py index b4fbd2c..27e436f 100644 --- a/ralf/v2/table_state.py +++ b/ralf/v2/table_state.py @@ -53,7 +53,7 @@ def delete(self, key: str): def point_query(self, key) -> Record: t1 = time.time() - val = self.connector.get_one(self.schema, key) + val = self.connector.get(self.schema, key) t2 = time.time() print('read') print("table: ", self.connector.tables) From f079e432bc34266b155265a1cc1150886b90fb55 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 01:37:59 -0700 Subject: [PATCH 04/14] revert changes made to non v2 files --- ralf/state/table_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ralf/state/table_state.py b/ralf/state/table_state.py index 9061679..8a03504 100644 --- a/ralf/state/table_state.py +++ b/ralf/state/table_state.py @@ -1,7 +1,7 @@ from typing import List -from ralf.v2.record import Record, Schema -from ralf.v2.connector import Connector +from ralf.record import Record, Schema +from ralf.state.connector import Connector # Maintains table values From c40bcad5411fb874bcd07e5826d9c87a52b6b354 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 14:41:33 -0700 Subject: [PATCH 05/14] latency test completed for all three modes, generates test results --- ralf/v2/api.py | 34 ++++ ralf/v2/benchmark/benchmark.py | 153 ++++++++++++++++++ ralf/v2/benchmark/latency.py | 139 ---------------- .../benchmark/results/1655069767.430462.txt | 6 + ralf/v2/benchmark/throughput.py | 135 ---------------- ralf/v2/examples/counter.py | 18 ++- ralf/v2/operator.py | 7 +- ralf/v2/table_state.py | 5 +- 8 files changed, 210 insertions(+), 287 deletions(-) create mode 100644 ralf/v2/benchmark/benchmark.py delete mode 100644 ralf/v2/benchmark/latency.py create mode 100644 ralf/v2/benchmark/results/1655069767.430462.txt delete mode 100644 ralf/v2/benchmark/throughput.py diff --git a/ralf/v2/api.py b/ralf/v2/api.py index 9895b62..a3d9da8 100644 --- a/ralf/v2/api.py +++ b/ralf/v2/api.py @@ -67,6 +67,17 @@ def on_event(self, record: Record) -> Union[None, Record, Iterable[Record]]: """ raise NotImplementedError("To be implemented by subclass.") + def on_stop(self, record: Record) -> Union[None, Record, Iterable[Record]]: + """ + Function called when a Stop Iteration event is received. Executes this on_stop method. + + A StopIteration exception is raised after, and the transform process will terminate. + A StopIteration special record will still be propogated to downstream feature frames. + + This function should be expected to be called once. + """ + pass + def prepare(self): pass @@ -86,6 +97,20 @@ def getFF(self): """ return self.feature_frame + def get(self, key:str): + return self.getFF().get(key) + + def update(self, record:Record): + self.getFF().update(record) + + def delete(self, key:str): + self.getFF().delete(key) + + def get_all(self): + return self.getFF().get_all() + + def get_schema(self): + return self.getFF().get_schema() class FeatureFrame: """Encapsulate a feature transformation and its related policies configuration.""" @@ -145,6 +170,15 @@ def update(self, record:Record): def delete(self, key:str): self.table_state.delete(key) + def get_all(self): + return self.table_state.bulk_query() + + def get_schema(self): + return self.table_state.get_schema + + def prepare(self): + if self.table_state: + self.table_state.prepare() #TODO: Interface to be able to query feature frame class RalfApplication: diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py new file mode 100644 index 0000000..8cc6bde --- /dev/null +++ b/ralf/v2/benchmark/benchmark.py @@ -0,0 +1,153 @@ +import time +from collections import defaultdict +from dataclasses import dataclass +from typing import List + +from nbformat import read +from ralf.v2.connectors.dict_connector import DictConnector +from ralf.v2.connectors.redis_connector import RedisConnector +from ralf.v2.connectors.sqlite3_connector import SQLConnector +from ralf.v2.record import Schema +from ralf.v2.table_state import TableState + +from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record +from ralf.v2.operator import OperatorConfig, RayOperatorConfig +from ralf.v2.utils import get_logger + +logger = get_logger() + +@dataclass +class SumValue: + key: str + value: int + +@dataclass +class SourceValue: + key: str + value: int + timestamp: float + +#Number of records we're processing +TEST_SIZE = 50 + +latencies = defaultdict(dict) +throughputs = defaultdict(dict) + +#functions that create the connectors depending on specified connector mode +existing_connectors = { + "dict": DictConnector, + "redis": lambda: RedisConnector(host='127.0.0.1', port='6379', password=''), + "sqlite": lambda: SQLConnector(dbname="key.db") +} + +class FakeSource(BaseTransform): + def __init__(self, total: int) -> None: + self.count = 0 + self.total = total + self.num_keys = 1000 + + def on_event(self, _: Record) -> List[Record[SourceValue]]: + + if self.count >= self.total: + print("completed iteration") + raise StopIteration() + + self.count += 1 + key = str(self.count % self.num_keys) + + # sleep to slow down send rate + time.sleep(1) + + # return list of records (wrapped around dataclass) + return [ + Record( + entry=SourceValue( + key=key, + value=self.count, + timestamp=time.time(), + ), + shard_key=key, # key to shard/query + ) + ] + +#write/update +class UpdateReadDelete(BaseTransform): + def __init__(self, connector_type): + self.connector_type = connector_type + + def on_event(self, record: Record) -> Record: + record.entry.value += 1 + + self.update(record) + self.get(record.entry.key) + self.delete(record.entry.key) + + return record + + def on_stop(self, record:Record) -> Record: + update_latency = self.getFF().table_state.times["update"]/self.getFF().table_state.counts["update"] + read_latency = self.getFF().table_state.times["point_query"]/self.getFF().table_state.counts["point_query"] + delete_latency = self.getFF().table_state.times["delete"]/self.getFF().table_state.counts["delete"] + + update_throughput = self.getFF().table_state.counts["update"]/self.getFF().table_state.times["update"] + read_throughput = self.getFF().table_state.counts["point_query"]/self.getFF().table_state.times["point_query"] + delete_throughput = self.getFF().table_state.counts["delete"]/self.getFF().table_state.times["delete"] + + logger.msg(f"AVERAGE UPDATE LATENCY: {update_latency * 1000} ms per update") + logger.msg(f"AVERAGE READ LATENCY: {read_latency * 1000} ms per read") + logger.msg(f"AVERAGE DELETE LATENCY: {delete_latency * 1000} ms per delete") + + logger.msg(f"AVERAGE UPDATE LATENCY: {update_throughput} updates per second") + logger.msg(f"AVERAGE READ LATENCY: {read_throughput} reads per second") + logger.msg(f"AVERAGE DELETE LATENCY: {delete_throughput} deletes per second") + + latencies[self.connector_type]["update"] = update_latency + latencies[self.connector_type]["read"] = read_latency + latencies[self.connector_type]["delete"] = delete_latency + + throughputs[self.connector_type]["update"] = update_throughput + throughputs[self.connector_type]["read"] = read_throughput + throughputs[self.connector_type]["delete"] = delete_throughput + + f = open(f"benchmark/results/{time.time()}.txt", "a") + results = "mode: ray\nconnector_mode: dict\nlatencies:\n" + for k,v in latencies.items(): + results += (f"{k}: {v}\n") + results += "throughputs:\n" + for k,v in throughputs.items(): + results += (f"{k}: {v}\n") + f.write(results) + f.close() + + return record + +if __name__ == "__main__": + + deploy_mode = "ray" + connector_mode = "dict" + # deploy_mode = "local" + app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) + + source_ff = app.source( + FakeSource(TEST_SIZE), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + ) + + table_States = [] + for _ in range(3): + dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + dict_conn = existing_connectors[connector_mode]() + table_States.append(TableState(dict_schema, dict_conn, dataclass)) + + updateReadDelete_ff = source_ff.transform( + UpdateReadDelete(connector_mode), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state=table_States[0] + ) + + app.deploy() + app.wait() diff --git a/ralf/v2/benchmark/latency.py b/ralf/v2/benchmark/latency.py deleted file mode 100644 index 6f5c7b3..0000000 --- a/ralf/v2/benchmark/latency.py +++ /dev/null @@ -1,139 +0,0 @@ -import time -from collections import defaultdict -from dataclasses import dataclass -from typing import List - -from nbformat import read -from ralf.v2.dict_connector import DictConnector -from ralf.v2.record import Schema -from ralf.v2.table_state import TableState - -from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record -from ralf.v2.operator import OperatorConfig, RayOperatorConfig -from ralf.v2.utils import get_logger - -logger = get_logger() - -@dataclass -class SourceValue: - key: str - value: int - timestamp: float - -LOAD = 10 - -class FakeSource(BaseTransform): - def __init__(self, total: int) -> None: - self.count = 0 - self.total = total - self.num_keys = 1000 - - def on_event(self, _: Record) -> List[Record[SourceValue]]: - - if self.count >= self.total: - print("completed iteration") - raise StopIteration() - - self.count += 1 - key = str(self.count % self.num_keys) - - # sleep to slow down send rate - time.sleep(1) - - # return list of records (wrapped around dataclass) - return [ - Record( - entry=SourceValue( - key=key, - value=self.count, - timestamp=time.time(), - ), - shard_key=key, # key to shard/query - ) - ] - -#write/update -class AddOne(BaseTransform): - def on_event(self, record: Record) -> Record: - record.entry.value += 1 - self.getFF().update(record) - return record - - def on_stop(self, record:Record) -> Record: - avg = self.getFF().table_state.times["update"]/self.getFF().table_state.counts["update"] - logger.msg(f"AVERAGE UPDATE LATENCY: {avg}") - return record - -class ReadFromDict(BaseTransform): - def __init__(self, total: int) -> None: - self.total = total - - def on_event(self, record: Record) -> Record: - self.getFF().parent.get(str(int(record.entry.key) - 1)) - return record - - def on_stop(self, record:Record) -> Record: - avg = self.getFF().parent.table_state.times["point_query"]/self.getFF().parent.table_state.counts["point_query"] - logger.msg(f"AVERAGE READ LATENCY: {avg}") - return record - -class DeleteFromDict(BaseTransform): - def __init__(self, total: int) -> None: - self.total = total - - def on_event(self, record: Record) -> Record: - self.getFF().delete(record.entry.key) - return record - - def on_stop(self, record:Record) -> Record: - avg = self.table_state.times["delete"]/self.table_state.counts["delete"] - logger.msg(f"AVERAGE DELETE LATENCY: {avg}") - return record - -if __name__ == "__main__": - - deploy_mode = "ray" - connector = "dict" - # deploy_mode = "local" - app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) - - source_ff = app.source( - FakeSource(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - ) - - table_States = [] - for _ in range(3): - dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) - dict_conn = DictConnector() - table_States.append(TableState(dict_schema, dict_conn)) - - addOne_ff = source_ff.transform( - AddOne(), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_States[0] - ) - - read_ff = addOne_ff.transform( - ReadFromDict(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_States[1] - ) - - # delete_ff = read_ff.transform( - # DeleteFromDict(LOAD), - # operator_config=OperatorConfig( - # ray_config=RayOperatorConfig(num_replicas=1), - # ), - # table_state=table_States[2] - # ) - - - app.deploy() - app.wait() diff --git a/ralf/v2/benchmark/results/1655069767.430462.txt b/ralf/v2/benchmark/results/1655069767.430462.txt new file mode 100644 index 0000000..d65134a --- /dev/null +++ b/ralf/v2/benchmark/results/1655069767.430462.txt @@ -0,0 +1,6 @@ +mode: ray +connector_mode: dict +latencies: +dict: {'update': 1.1305809020996094e-05, 'read': 4.425048828125e-06, 'delete': 3.604888916015625e-06} +throughputs: +dict: {'update': 88450.1054407423, 'read': 225986.2068965517, 'delete': 277401.0582010582} diff --git a/ralf/v2/benchmark/throughput.py b/ralf/v2/benchmark/throughput.py deleted file mode 100644 index 794937d..0000000 --- a/ralf/v2/benchmark/throughput.py +++ /dev/null @@ -1,135 +0,0 @@ -import time -from collections import defaultdict -from dataclasses import dataclass -from typing import List - -from nbformat import read -from ralf.v2.dict_connector import DictConnector -from ralf.v2.record import Schema -from ralf.v2.table_state import TableState - -from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record -from ralf.v2.operator import OperatorConfig, RayOperatorConfig - - -@dataclass -class SourceValue: - key: str - value: int - timestamp: float - -LOAD = 100 - -class FakeSource(BaseTransform): - def __init__(self, total: int) -> None: - self.count = 0 - self.total = total - self.num_keys = 1000 - - def on_event(self, _: Record) -> List[Record[SourceValue]]: - - if self.count >= self.total: - print("completed iteration") - raise StopIteration() - - self.count += 1 - key = str(self.count % self.num_keys) - - # sleep to slow down send rate - time.sleep(1) - - # return list of records (wrapped around dataclass) - return [ - Record( - entry=SourceValue( - key=key, - value=self.count, - timestamp=time.time(), - ), - shard_key=key, # key to shard/query - ) - ] - - -class AddOne(BaseTransform): - def on_event(self, record: Record) -> None: - record.entry.value += 1 - self.table_state.update(record) - return None - - def on_stop(self, record:Record) -> Record: - print("average update throughput:" , self.table_state.counts["update"]/self.table_state.times["update"]) - return record - -class ReadFromDict(BaseTransform): - def __init__(self, total: int) -> None: - self.curr_key = 0 - self.total = total - - def on_event(self, record: Record) -> None: - self.table_state.point_query(self.curr_key) - self.curr_key += 1 - - def on_stop(self, record:Record) -> Record: - print("average query latency:" , self.table_state.counts["point_query"]/self.table_state.times["point_query"]) - return record - -class DeleteFromDict(BaseTransform): - def __init__(self, total: int) -> None: - self.curr_key = 0 - self.total = total - - def on_event(self, record: Record) -> None: - self.table_state.delete(self.curr_key) - self.curr_key += 1 - return None - - def on_stop(self, record:Record) -> Record: - print("average delete latency:" , self.table_state.counts["delete"]/self.table_state.times["delete"]) - return record - -if __name__ == "__main__": - - deploy_mode = "ray" - connector = "dict" - # deploy_mode = "local" - app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) - - source_ff = app.source( - FakeSource(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - ) - - dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) - dict_conn = DictConnector() - table_state = TableState(dict_schema, dict_conn) - - addOne_ff = source_ff.transform( - AddOne(), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_state - ) - - read_ff = addOne_ff.transform( - ReadFromDict(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_state - ) - - delete_ff = read_ff.transform( - DeleteFromDict(LOAD), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_state - ) - - - app.deploy() - app.wait() diff --git a/ralf/v2/examples/counter.py b/ralf/v2/examples/counter.py index 3551b9d..424a1b2 100644 --- a/ralf/v2/examples/counter.py +++ b/ralf/v2/examples/counter.py @@ -8,6 +8,7 @@ from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record from ralf.v2.operator import OperatorConfig, RayOperatorConfig +from ralf.v2.utils import get_logger @dataclass @@ -22,6 +23,7 @@ class SumValue: key: str value: int +logger = get_logger() class FakeSource(BaseTransform): def __init__(self, total: int) -> None: @@ -68,20 +70,20 @@ def on_event(self, record: Record) -> Record[SumValue]: class UpdateDict(BaseTransform): def __init__(self): self.count = 0 - self.table_state = self.getFF().table_state def on_event(self, record: Record) -> None: - print("single update table", self.table_state.connector.tables) - self.table_state.update(record) + self.update(record) return None + def on_stop(self, record: Record): + logger.msg(self.get_all()) + class BatchUpdate(BaseTransform): def __init__(self, batch_size: int): self.batch_size = batch_size self.count = 0 self.records = [] - self.table_state = self.getFF().table_state - + def on_event(self, record: Record) -> None: self.records.append(record) self.count += 1 @@ -90,11 +92,13 @@ def on_event(self, record: Record) -> None: self.count = 0 for r in self.records: print(f"batch update, processing {r}") - self.table_state.update(r) + self.update(r) self.records = [] - print("batch table", self.table_state.connector.tables) return None + + def on_stop(self, record: Record): + logger.msg(self.get_all()) if __name__ == "__main__": diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index 90487cf..716ba4c 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -78,7 +78,7 @@ def __init__( ) logger.msg("Preparing transform object") - + self.frame.prepare() self.worker_thread = Thread(target=self._run_forever, daemon=True) self.worker_thread.start() @@ -89,10 +89,7 @@ def _run_forever(self): thread="worker", ) - if hasattr(self.transform_object, "table_state"): - self.transform_object.table_state.connector.prepare() - self.transform_object.prepare() - + # self.frame.prepare() db_path = f"{self.config.metrics_dir}/{str(self.frame.transform_object)}_{self.context['shard_idx']}.db" metrics_connection = event_metrics.MetricConnection( diff --git a/ralf/v2/table_state.py b/ralf/v2/table_state.py index e693f2c..a639dff 100644 --- a/ralf/v2/table_state.py +++ b/ralf/v2/table_state.py @@ -44,7 +44,7 @@ def delete(self, key: str): def point_query(self, key) -> Record: t1 = time.time() - val = self.connector.get_one(self.schema, key, self.dataclass) + val = self.connector.get(self.schema, key, self.dataclass) t2 = time.time() if not val: @@ -61,3 +61,6 @@ def get_schema(self) -> Schema: def recordQuery(self, queryType: String, timeTaken): self.times[queryType] += timeTaken self.counts[queryType] += 1 + + def prepare(self): + self.connector.prepare() From 7d55a6184d1e7f14541a3f17bc19da963100e548 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 14:45:46 -0700 Subject: [PATCH 06/14] lint tests --- ralf/v2/api.py | 19 +++++------ ralf/v2/scheduler.py | 77 -------------------------------------------- 2 files changed, 8 insertions(+), 88 deletions(-) diff --git a/ralf/v2/api.py b/ralf/v2/api.py index a3d9da8..81e861f 100644 --- a/ralf/v2/api.py +++ b/ralf/v2/api.py @@ -78,12 +78,17 @@ def on_stop(self, record: Record) -> Union[None, Record, Iterable[Record]]: """ pass - def prepare(self): - pass - def __repr__(self): return self.__class__.__name__ + def getFF(self): + """Get the feature frame transform is being applied to. + """ + return self.feature_frame + + def prepare(self): + pass + def get(self, key): """Get current feature value for key. @@ -92,14 +97,6 @@ def get(self, key): """ return self.getFF().get(key) - def getFF(self): - """Get the feature frame transform is being applied to. - """ - return self.feature_frame - - def get(self, key:str): - return self.getFF().get(key) - def update(self, record:Record): self.getFF().update(record) diff --git a/ralf/v2/scheduler.py b/ralf/v2/scheduler.py index e952467..dfbd996 100644 --- a/ralf/v2/scheduler.py +++ b/ralf/v2/scheduler.py @@ -117,83 +117,6 @@ def pop_event(self) -> Record: def qsize(self) -> int: return len(self.queue) - -@total_ordering -class KeyCount: - def __init__(self, key, num_processed, record): - self.key = key - self.records = [record] - self.num_processed = num_processed - - def process(self): - self.num_processed += 1 - return self.records.pop() - - def add_record(self, record): - self.records.append(record) - - def __eq__(self, other): - return self.key == other.key and self.num_processed == other.num_processed - - def __lt__(self, other): - return ( - int(len(self.records) == 0) * 1000000000 + self.num_processed - < other.num_processed + int(len(other.records) == 0) * 1000000000 - ) - - def __gt__(self, other): - return ( - int(len(self.records) == 0) * 1000000000 + self.num_processed - > other.num_processed + int(len(other.records) == 0) * 1000000000 - ) - - def __repr__(self): - return f"KeyCount(key : {self.key}, num_processed : {self.num_processed}, # of records : {len(self.records)}" - - -class LeastUpdate(BaseScheduler): - def __init__(self) -> None: - self.seen_keys = dict() - self.queue: List[KeyCount] = [] - self.waker: Optional[threading.Event] = None - self.num_unprocessed = 0 - - def push_event(self, record: Record): - if record.is_stop_iteration(): - kc = KeyCount("stop", 0, record) - self.queue.insert(0, kc) - return - key = record.entry.key - if key not in self.seen_keys: - kc = KeyCount(key, 0, record) - self.seen_keys[key] = kc - heapq.heappush(self.queue, kc) - else: - kc = self.seen_keys[key] - kc.add_record(record) - - heapq.heapify(self.queue) - # print(self.queue[0]) - if len(self.queue[0].records) == 0: - print( - self.num_unprocessed, - sum([len(i.records) for i in self.queue]), - [[len(i.records), i.key, i.num_processed] for i in self.queue], - ) - self.wake_waiter_if_needed() - - def pop_event(self) -> Record: - if not self.queue or len(self.queue[0].records) == 0: - return Record.make_wait_event(self.new_waker()) - least_updated = heapq.heappop(self.queue) - if len(least_updated.records) == 0: - return Record.make_wait_event(self.new_waker()) - record = least_updated.process() - # self.num_unprocessed -= 1 - heapq.heappush(self.queue, least_updated) - return record - - @total_ordering class KeyCount: def __init__(self, key, num_processed, record): From 6d9a405a7b39898eaca1b2e2841bc3686c923379 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 15:00:00 -0700 Subject: [PATCH 07/14] added queue size to LeastUpdate scheduler to pass scheduler_test --- ralf/v2/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ralf/v2/scheduler.py b/ralf/v2/scheduler.py index dfbd996..b369baf 100644 --- a/ralf/v2/scheduler.py +++ b/ralf/v2/scheduler.py @@ -192,6 +192,8 @@ def pop_event(self) -> Record: heapq.heappush(self.queue, least_updated) return record + def qsize(self) -> int: + return len(self.queue) @dataclass class DummyEntry: From c5ace3da893019ebb5c9dde2cc45c330fd82121b Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Sun, 12 Jun 2022 15:10:41 -0700 Subject: [PATCH 08/14] Changed simpy implementations of operators to pass test_api simpy_lifo and simple_lifo tests --- ralf/v2/operator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index 716ba4c..ed66481 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -310,7 +310,9 @@ def enqueue_events(self, records: List[Record]): def dump_transform_state(self): return self.recordings - + + def get(self, key): + pass class SimpyOperator(RalfOperator): """Skip running transform, but record the event ordering""" @@ -377,6 +379,8 @@ def enqueue_events(self, records: List[Record]): for record in records: self.scheduler.push_event(record) + def get(self, key): + return self.transform_object.get(key) @dataclass class RayOperatorConfig: From a8237c00134c6b567c8827a007ef688fef270463 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Tue, 21 Jun 2022 03:10:11 -0700 Subject: [PATCH 09/14] resolved sqlite thread error, all benchmark tests working now --- .gitignore | 2 +- ralf/v2/benchmark/benchmark.py | 109 ++++++++++-------- .../benchmark/results/1655069767.430462.txt | 6 - ralf/v2/connectors/sqlite3_connector.py | 6 +- ralf/v2/operator.py | 4 +- 5 files changed, 66 insertions(+), 61 deletions(-) delete mode 100644 ralf/v2/benchmark/results/1655069767.430462.txt diff --git a/.gitignore b/.gitignore index 71c6c94..3e2cc13 100644 --- a/.gitignore +++ b/.gitignore @@ -53,7 +53,7 @@ coverage.xml *.py,cover .hypothesis/ .pytest_cache/ - +ralf/v2/benchmark/results/*.txt # Translations *.mo *.pot diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index 8cc6bde..15c2839 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -1,9 +1,12 @@ import time from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, make_dataclass from typing import List +import os +from matplotlib.pyplot import table +import redis + -from nbformat import read from ralf.v2.connectors.dict_connector import DictConnector from ralf.v2.connectors.redis_connector import RedisConnector from ralf.v2.connectors.sqlite3_connector import SQLConnector @@ -14,12 +17,9 @@ from ralf.v2.operator import OperatorConfig, RayOperatorConfig from ralf.v2.utils import get_logger -logger = get_logger() +IntValue = make_dataclass("IntValue", ["value"]) -@dataclass -class SumValue: - key: str - value: int +logger = get_logger() @dataclass class SourceValue: @@ -28,7 +28,10 @@ class SourceValue: timestamp: float #Number of records we're processing -TEST_SIZE = 50 +TEST_SIZE = 5 +deploy_mode = "ray" +test_config = f"DEPLOY MODE: {deploy_mode}\nTEST SIZE: {TEST_SIZE}\n\n" +result_path = f"benchmark/results/size_{TEST_SIZE}_{time.time()}.txt" latencies = defaultdict(dict) throughputs = defaultdict(dict) @@ -78,9 +81,9 @@ def __init__(self, connector_type): def on_event(self, record: Record) -> Record: record.entry.value += 1 - self.update(record) - self.get(record.entry.key) - self.delete(record.entry.key) + self.getFF().table_state.update(record) + self.getFF().table_state.point_query(record.entry.key) + self.getFF().table_state.delete(record.entry.key) return record @@ -101,53 +104,59 @@ def on_stop(self, record:Record) -> Record: logger.msg(f"AVERAGE READ LATENCY: {read_throughput} reads per second") logger.msg(f"AVERAGE DELETE LATENCY: {delete_throughput} deletes per second") - latencies[self.connector_type]["update"] = update_latency - latencies[self.connector_type]["read"] = read_latency - latencies[self.connector_type]["delete"] = delete_latency + latencies[self.connector_type]["update"] = update_latency * 1000 + latencies[self.connector_type]["read"] = read_latency * 1000 + latencies[self.connector_type]["delete"] = delete_latency * 1000 throughputs[self.connector_type]["update"] = update_throughput throughputs[self.connector_type]["read"] = read_throughput throughputs[self.connector_type]["delete"] = delete_throughput - f = open(f"benchmark/results/{time.time()}.txt", "a") - results = "mode: ray\nconnector_mode: dict\nlatencies:\n" - for k,v in latencies.items(): + results = "-"*50 + f"\nCONNECTOR_MODE: {self.connector_type}\nLATENCIES (ms per action):\n" + for k,v in latencies[self.connector_type].items(): results += (f"{k}: {v}\n") - results += "throughputs:\n" - for k,v in throughputs.items(): + results += "THROUGHPUTS(number of actions per second):\n" + for k,v in throughputs[self.connector_type].items(): results += (f"{k}: {v}\n") - f.write(results) - f.close() + record_benchmark_results(results, result_path) return record +def record_benchmark_results(results, path): + f = open(path, "a") + f.write(results) + f.close() -if __name__ == "__main__": +def flush_testing_env(): + if os.path.exists("key.db"): + os.remove("key.db") + r = redis.Redis() + r.flushdb() - deploy_mode = "ray" - connector_mode = "dict" - # deploy_mode = "local" - app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) - - source_ff = app.source( - FakeSource(TEST_SIZE), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - ) - - table_States = [] - for _ in range(3): - dict_schema = Schema("key", {"key": str, "value": int, "timestamp": float}) - dict_conn = existing_connectors[connector_mode]() - table_States.append(TableState(dict_schema, dict_conn, dataclass)) - - updateReadDelete_ff = source_ff.transform( - UpdateReadDelete(connector_mode), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state=table_States[0] - ) - - app.deploy() - app.wait() +if __name__ == "__main__": + record_benchmark_results(test_config, result_path) + for connector_mode in existing_connectors: + flush_testing_env() + app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) + + source_ff = app.source( + FakeSource(TEST_SIZE), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + ) + + schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + conn = existing_connectors[connector_mode]() + table_state = TableState(schema, conn, SourceValue) + + updateReadDelete_ff = source_ff.transform( + UpdateReadDelete(connector_mode), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state = table_state + ) + + app.deploy() + app.wait() + \ No newline at end of file diff --git a/ralf/v2/benchmark/results/1655069767.430462.txt b/ralf/v2/benchmark/results/1655069767.430462.txt deleted file mode 100644 index d65134a..0000000 --- a/ralf/v2/benchmark/results/1655069767.430462.txt +++ /dev/null @@ -1,6 +0,0 @@ -mode: ray -connector_mode: dict -latencies: -dict: {'update': 1.1305809020996094e-05, 'read': 4.425048828125e-06, 'delete': 3.604888916015625e-06} -throughputs: -dict: {'update': 88450.1054407423, 'read': 225986.2068965517, 'delete': 277401.0582010582} diff --git a/ralf/v2/connectors/sqlite3_connector.py b/ralf/v2/connectors/sqlite3_connector.py index 835eb40..4a4286a 100644 --- a/ralf/v2/connectors/sqlite3_connector.py +++ b/ralf/v2/connectors/sqlite3_connector.py @@ -25,7 +25,7 @@ def get_conn(self): class SQLConnector(Connector): def __init__(self, dbname: str): self.dbname = dbname - + def create_connection(self): # This function will return a new connection # if the connector is such that it cannot be shared @@ -38,7 +38,7 @@ def add_table(self, schema: Schema): if hasattr(self, "conn"): conn = self.conn else: - conn = sqlite3.connect(self.dbname) + conn = sqlite3.connect(self.dbname, check_same_thread=False) table_name = schema.get_name() curr = conn.cursor() if schema.columns[schema.primary_key] not in sql_types: @@ -95,4 +95,4 @@ def count(self, schema: Schema) -> int: return count def prepare(self): - self.conn = sqlite3.connect(self.dbname) \ No newline at end of file + self.conn = sqlite3.connect(self.dbname, check_same_thread=False) \ No newline at end of file diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index ed66481..40a8db5 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -89,7 +89,9 @@ def _run_forever(self): thread="worker", ) - # self.frame.prepare() + if hasattr(self.transform_object, "table_state"): + self.transform_object.table_state.connector.prepare() + self.transform_object.prepare() db_path = f"{self.config.metrics_dir}/{str(self.frame.transform_object)}_{self.context['shard_idx']}.db" metrics_connection = event_metrics.MetricConnection( From 7c225e0a73538c23705dd52e980231808184ab3d Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Tue, 21 Jun 2022 03:25:08 -0700 Subject: [PATCH 10/14] testing for multiple test sizes --- ralf/v2/benchmark/benchmark.py | 66 +++++++++++++++++----------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index 15c2839..d35e399 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -3,10 +3,10 @@ from dataclasses import dataclass, make_dataclass from typing import List import os +from unittest import result from matplotlib.pyplot import table import redis - from ralf.v2.connectors.dict_connector import DictConnector from ralf.v2.connectors.redis_connector import RedisConnector from ralf.v2.connectors.sqlite3_connector import SQLConnector @@ -28,10 +28,10 @@ class SourceValue: timestamp: float #Number of records we're processing -TEST_SIZE = 5 +TEST_SIZES = [1000, 100000, 10000000] deploy_mode = "ray" -test_config = f"DEPLOY MODE: {deploy_mode}\nTEST SIZE: {TEST_SIZE}\n\n" -result_path = f"benchmark/results/size_{TEST_SIZE}_{time.time()}.txt" +sizes_str = "_".join(TEST_SIZES) +result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt" latencies = defaultdict(dict) throughputs = defaultdict(dict) @@ -115,12 +115,13 @@ def on_stop(self, record:Record) -> Record: results = "-"*50 + f"\nCONNECTOR_MODE: {self.connector_type}\nLATENCIES (ms per action):\n" for k,v in latencies[self.connector_type].items(): results += (f"{k}: {v}\n") - results += "THROUGHPUTS(number of actions per second):\n" + results += "THROUGHPUTS (number of requests per second):\n" for k,v in throughputs[self.connector_type].items(): results += (f"{k}: {v}\n") record_benchmark_results(results, result_path) return record + def record_benchmark_results(results, path): f = open(path, "a") f.write(results) @@ -133,30 +134,31 @@ def flush_testing_env(): r.flushdb() if __name__ == "__main__": - record_benchmark_results(test_config, result_path) - for connector_mode in existing_connectors: - flush_testing_env() - app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) - - source_ff = app.source( - FakeSource(TEST_SIZE), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - ) - - schema = Schema("key", {"key": str, "value": int, "timestamp": float}) - conn = existing_connectors[connector_mode]() - table_state = TableState(schema, conn, SourceValue) - - updateReadDelete_ff = source_ff.transform( - UpdateReadDelete(connector_mode), - operator_config=OperatorConfig( - ray_config=RayOperatorConfig(num_replicas=1), - ), - table_state = table_state - ) - - app.deploy() - app.wait() - \ No newline at end of file + for test_size in TEST_SIZES: + record_benchmark_results(f"DEPLOY MODE: {deploy_mode}\nTEST SIZE: {test_size}\n\n", result_path) + for connector_mode in existing_connectors: + flush_testing_env() + app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) + + source_ff = app.source( + FakeSource(test_size), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + ) + + schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + conn = existing_connectors[connector_mode]() + table_state = TableState(schema, conn, SourceValue) + + updateReadDelete_ff = source_ff.transform( + UpdateReadDelete(connector_mode), + operator_config=OperatorConfig( + ray_config=RayOperatorConfig(num_replicas=1), + ), + table_state = table_state + ) + + app.deploy() + app.wait() + record_benchmark_results("\n\n", result_path) \ No newline at end of file From 1ccc8720d4ba8debbdaa93e215b666d3e22e8683 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Tue, 21 Jun 2022 03:27:42 -0700 Subject: [PATCH 11/14] nit: unnecessary imports --- ralf/v2/benchmark/benchmark.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index d35e399..7714299 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -3,8 +3,6 @@ from dataclasses import dataclass, make_dataclass from typing import List import os -from unittest import result -from matplotlib.pyplot import table import redis from ralf.v2.connectors.dict_connector import DictConnector From 2c917c6b796e0cb6d4d415668262991b31b884fe Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Tue, 21 Jun 2022 03:28:47 -0700 Subject: [PATCH 12/14] result path --- ralf/v2/benchmark/benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index 7714299..0bb68dd 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -28,7 +28,7 @@ class SourceValue: #Number of records we're processing TEST_SIZES = [1000, 100000, 10000000] deploy_mode = "ray" -sizes_str = "_".join(TEST_SIZES) +sizes_str = "_".join([str(s) for s in TEST_SIZES]) result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt" latencies = defaultdict(dict) From 781ffac427d655d412bc7300a2dc9d3db2324ded Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Tue, 21 Jun 2022 03:42:05 -0700 Subject: [PATCH 13/14] test edit --- ralf/v2/benchmark/benchmark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index 0bb68dd..9ab1f5f 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -26,7 +26,7 @@ class SourceValue: timestamp: float #Number of records we're processing -TEST_SIZES = [1000, 100000, 10000000] +TEST_SIZES = [1000, 100000] deploy_mode = "ray" sizes_str = "_".join([str(s) for s in TEST_SIZES]) result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt" @@ -45,7 +45,7 @@ class FakeSource(BaseTransform): def __init__(self, total: int) -> None: self.count = 0 self.total = total - self.num_keys = 1000 + self.num_keys = max(TEST_SIZES) #not sharding for now def on_event(self, _: Record) -> List[Record[SourceValue]]: From 7e77546a7d2679308c376fa773f3446a533219b4 Mon Sep 17 00:00:00 2001 From: Jeffery Cheng Date: Wed, 31 Aug 2022 00:07:14 -0700 Subject: [PATCH 14/14] load testing for benchmarks --- ralf/v2/benchmark/benchmark.py | 107 ++++++++++++++++++++++----------- ralf/v2/manager.py | 4 +- ralf/v2/operator.py | 2 +- 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py index 9ab1f5f..093fca3 100644 --- a/ralf/v2/benchmark/benchmark.py +++ b/ralf/v2/benchmark/benchmark.py @@ -1,3 +1,5 @@ +from multiprocessing.sharedctypes import Value +from sqlite3 import Timestamp import time from collections import defaultdict from dataclasses import dataclass, make_dataclass @@ -15,8 +17,6 @@ from ralf.v2.operator import OperatorConfig, RayOperatorConfig from ralf.v2.utils import get_logger -IntValue = make_dataclass("IntValue", ["value"]) - logger = get_logger() @dataclass @@ -25,8 +25,16 @@ class SourceValue: value: int timestamp: float +@dataclass +class LargeSourceValue: + key: str + value: str + timestamp: float + +LONG_TEXT_SIZE = 10 ** 6 + #Number of records we're processing -TEST_SIZES = [1000, 100000] +TEST_SIZES = [5,1000,1000000] deploy_mode = "ray" sizes_str = "_".join([str(s) for s in TEST_SIZES]) result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt" @@ -56,9 +64,6 @@ def on_event(self, _: Record) -> List[Record[SourceValue]]: self.count += 1 key = str(self.count % self.num_keys) - # sleep to slow down send rate - time.sleep(1) - # return list of records (wrapped around dataclass) return [ Record( @@ -70,6 +75,32 @@ def on_event(self, _: Record) -> List[Record[SourceValue]]: shard_key=key, # key to shard/query ) ] +class LargeFakeSource(BaseTransform): + def __init__(self, total: int) -> None: + self.count = 0 + self.total = total + self.num_keys = max(TEST_SIZES) #not sharding for now + + def on_event(self, _: Record) -> List[Record[SourceValue]]: + + if self.count >= self.total: + print("completed iteration") + raise StopIteration() + + self.count += 1 + key = str(self.count % self.num_keys) + + # return list of records (wrapped around dataclass) + return [ + Record( + entry=LargeSourceValue( + key=key, + value=key*LONG_TEXT_SIZE, + timestamp=time.time(), + ), + shard_key=key, # key to shard/query + ) + ] #write/update class UpdateReadDelete(BaseTransform): @@ -77,38 +108,17 @@ def __init__(self, connector_type): self.connector_type = connector_type def on_event(self, record: Record) -> Record: - record.entry.value += 1 - self.getFF().table_state.update(record) self.getFF().table_state.point_query(record.entry.key) self.getFF().table_state.delete(record.entry.key) + if int(record.entry.key) % 50000 == 0: + logger.msg(f"Processed record: {record.entry.key}") + log_and_get_metrics(self) return record def on_stop(self, record:Record) -> Record: - update_latency = self.getFF().table_state.times["update"]/self.getFF().table_state.counts["update"] - read_latency = self.getFF().table_state.times["point_query"]/self.getFF().table_state.counts["point_query"] - delete_latency = self.getFF().table_state.times["delete"]/self.getFF().table_state.counts["delete"] - - update_throughput = self.getFF().table_state.counts["update"]/self.getFF().table_state.times["update"] - read_throughput = self.getFF().table_state.counts["point_query"]/self.getFF().table_state.times["point_query"] - delete_throughput = self.getFF().table_state.counts["delete"]/self.getFF().table_state.times["delete"] - - logger.msg(f"AVERAGE UPDATE LATENCY: {update_latency * 1000} ms per update") - logger.msg(f"AVERAGE READ LATENCY: {read_latency * 1000} ms per read") - logger.msg(f"AVERAGE DELETE LATENCY: {delete_latency * 1000} ms per delete") - - logger.msg(f"AVERAGE UPDATE LATENCY: {update_throughput} updates per second") - logger.msg(f"AVERAGE READ LATENCY: {read_throughput} reads per second") - logger.msg(f"AVERAGE DELETE LATENCY: {delete_throughput} deletes per second") - - latencies[self.connector_type]["update"] = update_latency * 1000 - latencies[self.connector_type]["read"] = read_latency * 1000 - latencies[self.connector_type]["delete"] = delete_latency * 1000 - - throughputs[self.connector_type]["update"] = update_throughput - throughputs[self.connector_type]["read"] = read_throughput - throughputs[self.connector_type]["delete"] = delete_throughput + latencies, throughputs = log_and_get_metrics(self) results = "-"*50 + f"\nCONNECTOR_MODE: {self.connector_type}\nLATENCIES (ms per action):\n" for k,v in latencies[self.connector_type].items(): @@ -120,6 +130,33 @@ def on_stop(self, record:Record) -> Record: return record +def log_and_get_metrics(transform): + update_latency = transform.getFF().table_state.times["update"]/transform.getFF().table_state.counts["update"] + read_latency = transform.getFF().table_state.times["point_query"]/transform.getFF().table_state.counts["point_query"] + delete_latency = transform.getFF().table_state.times["delete"]/transform.getFF().table_state.counts["delete"] + + update_throughput = transform.getFF().table_state.counts["update"]/transform.getFF().table_state.times["update"] + read_throughput = transform.getFF().table_state.counts["point_query"]/transform.getFF().table_state.times["point_query"] + delete_throughput = transform.getFF().table_state.counts["delete"]/transform.getFF().table_state.times["delete"] + + logger.msg(f"AVERAGE UPDATE LATENCY: {update_latency * 1000} ms per update") + logger.msg(f"AVERAGE READ LATENCY: {read_latency * 1000} ms per read") + logger.msg(f"AVERAGE DELETE LATENCY: {delete_latency * 1000} ms per delete") + + logger.msg(f"AVERAGE UPDATE THROUGHPUTS: {update_throughput} updates per second") + logger.msg(f"AVERAGE READ THROUGHPUTS: {read_throughput} reads per second") + logger.msg(f"AVERAGE DELETE THROUGHPUTS: {delete_throughput} deletes per second") + + latencies[transform.connector_type]["update"] = update_latency * 1000 + latencies[transform.connector_type]["read"] = read_latency * 1000 + latencies[transform.connector_type]["delete"] = delete_latency * 1000 + + throughputs[transform.connector_type]["update"] = update_throughput + throughputs[transform.connector_type]["read"] = read_throughput + throughputs[transform.connector_type]["delete"] = delete_throughput + + return latencies, throughputs + def record_benchmark_results(results, path): f = open(path, "a") f.write(results) @@ -133,21 +170,21 @@ def flush_testing_env(): if __name__ == "__main__": for test_size in TEST_SIZES: - record_benchmark_results(f"DEPLOY MODE: {deploy_mode}\nTEST SIZE: {test_size}\n\n", result_path) + record_benchmark_results(f"TIME: {time.localtime()}\nRECORD SIZE: {LONG_TEXT_SIZE}\nDEPLOY MODE: {deploy_mode}\nTEST SIZE: {test_size}\n\n", result_path) for connector_mode in existing_connectors: flush_testing_env() app = RalfApplication(RalfConfig(deploy_mode=deploy_mode)) source_ff = app.source( - FakeSource(test_size), + LargeFakeSource(test_size), operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), ) - schema = Schema("key", {"key": str, "value": int, "timestamp": float}) + schema = Schema("key", {"key": str, "value": str, "timestamp": float}) conn = existing_connectors[connector_mode]() - table_state = TableState(schema, conn, SourceValue) + table_state = TableState(schema, conn, LargeSourceValue) updateReadDelete_ff = source_ff.transform( UpdateReadDelete(connector_mode), diff --git a/ralf/v2/manager.py b/ralf/v2/manager.py index 83c32c7..d6301f6 100644 --- a/ralf/v2/manager.py +++ b/ralf/v2/manager.py @@ -72,11 +72,11 @@ def wait(self): for handle in operator.pool.handles: refs.append(handle.wait_for_exit.remote()) while True: - _, not_done = ray.wait(refs, num_returns=len(refs), timeout=0.5) + _, not_done = ray.wait(refs, num_returns=len(refs), timeout=1) # print("Waiting for", not_done) if len(not_done) == 0: break - time.sleep(1) + time.sleep(0.1) class SimpyManager(RalfManager): diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index 40a8db5..f2a0deb 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -196,7 +196,7 @@ async def wait_for_exit(self): if not self.worker_thread.is_alive(): logger.msg("Worker thread exited!", frame=self.frame) break - + logger.msg("exiting...") class OperatorActorPool: """Contains a set number of Ray Actors."""