diff --git a/.gitignore b/.gitignore
index 71c6c94..3e2cc13 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,7 +53,7 @@ coverage.xml
 *.py,cover
 .hypothesis/
 .pytest_cache/
-
+ralf/v2/benchmark/results/*.txt
 # Translations
 *.mo
 *.pot
diff --git a/ralf/v2/api.py b/ralf/v2/api.py
index 34bcded..81e861f 100644
--- a/ralf/v2/api.py
+++ b/ralf/v2/api.py
@@ -67,20 +67,47 @@ def on_event(self, record: Record) -> Union[None, Record, Iterable[Record]]:
         """
         raise NotImplementedError("To be implemented by subclass.")
 
-    def prepare(self):
+    def on_stop(self, record: Record) -> Union[None, Record, Iterable[Record]]:
+        """
+        Function called when a StopIteration event is received; this on_stop method is executed.
+
+        A StopIteration exception is raised afterwards, and the transform process will terminate.
+        A StopIteration special record will still be propagated to downstream feature frames.
+
+        This function is expected to be called only once.
+        """
         pass
 
     def __repr__(self):
         return self.__class__.__name__
 
-    def get(self, key):
-        """Get current feature value for key. Returns null by default.
+    def getFF(self):
+        """Get the feature frame this transform is being applied to.
+        """
+        return self.feature_frame
+
+    def prepare(self):
+        pass
+
+    def get(self, key):
+        """Get current feature value for key.
 
         :param key: key to lookup feature value
         :type key: str
         """
-        return None
+        return self.getFF().get(key)
+
+    def update(self, record: Record):
+        self.getFF().update(record)
+
+    def delete(self, key: str):
+        self.getFF().delete(key)
 
+    def get_all(self):
+        return self.getFF().get_all()
+
+    def get_schema(self):
+        return self.getFF().get_schema()
 
 class FeatureFrame:
     """Encapsulate a feature transformation and its related policies configuration."""
@@ -95,8 +122,16 @@ def __init__(
         self.transform_object = transform_object
         self.scheduler = scheduler
         self.table_state = table_state
+
+        # Adding component's access to this feature frame's resources
+        self.transform_object.feature_frame = self
+        self.scheduler.feature_frame = self
+
         self.config = operator_config
-        self.children: List["FeatureFrame"] = []
+
+        self.children: List["FeatureFrame"] = []
+        self.parent: "FeatureFrame" = None
+
         logger.msg(
             "Created FeatureFrame",
             transform=transform_object,
@@ -117,11 +152,31 @@ def transform(
         """Apply a transformation to this feature frame, with scheduler."""
         frame = FeatureFrame(transform_object, scheduler, table_state, operator_config)
         self.children.append(frame)
+        frame.parent = self
         return frame
 
     def __repr__(self) -> str:
         return f"FeatureFrame({repr(self.transform_object)})"
 
+    def get(self, key: str):
+        return self.table_state.point_query(key)
+
+    def update(self, record: Record):
+        self.table_state.update(record)
+
+    def delete(self, key: str):
+        self.table_state.delete(key)
+
+    def get_all(self):
+        return self.table_state.bulk_query()
+
+    def get_schema(self):
+        return self.table_state.get_schema
+
+    def prepare(self):
+        if self.table_state:
+            self.table_state.prepare()
+    # TODO: Interface to be able to query feature frame
 
 class RalfApplication:
    """An end to end feature processing pipeline in Ralf."""
diff --git a/ralf/v2/benchmark/benchmark.py b/ralf/v2/benchmark/benchmark.py
new file mode 100644
index 0000000..093fca3
--- /dev/null
+++ b/ralf/v2/benchmark/benchmark.py
@@ -0,0 +1,199 @@
+from multiprocessing.sharedctypes import Value
+from sqlite3 import Timestamp
+import time
+from collections import defaultdict
+from dataclasses import dataclass, make_dataclass
+from typing import List
+import os
+import redis
+
+from ralf.v2.connectors.dict_connector import DictConnector
+from ralf.v2.connectors.redis_connector import RedisConnector
+from ralf.v2.connectors.sqlite3_connector import SQLConnector
+from ralf.v2.record import Schema
+from ralf.v2.table_state import TableState
+
+from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record
+from ralf.v2.operator import OperatorConfig, RayOperatorConfig
+from ralf.v2.utils import get_logger
+
+logger = get_logger()
+
+@dataclass
+class SourceValue:
+    key: str
+    value: int
+    timestamp: float
+
+@dataclass
+class LargeSourceValue:
+    key: str
+    value: str
+    timestamp: float
+
+LONG_TEXT_SIZE = 10 ** 6
+
+# Number of records we're processing
+TEST_SIZES = [5, 1000, 1000000]
+deploy_mode = "ray"
+sizes_str = "_".join([str(s) for s in TEST_SIZES])
+result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt"
+
+latencies = defaultdict(dict)
+throughputs = defaultdict(dict)
+
+# Functions that create the connectors depending on specified connector mode
+existing_connectors = {
+    "dict": DictConnector,
+    "redis": lambda: RedisConnector(host='127.0.0.1', port='6379', password=''),
+    "sqlite": lambda: SQLConnector(dbname="key.db")
+}
+
+class FakeSource(BaseTransform):
+    def __init__(self, total: int) -> None:
+        self.count = 0
+        self.total = total
+        self.num_keys = max(TEST_SIZES)  # not sharding for now
+
+    def on_event(self, _: Record) -> List[Record[SourceValue]]:
+
+        if self.count >= self.total:
+            print("completed iteration")
+            raise StopIteration()
+
+        self.count += 1
+        key = str(self.count % self.num_keys)
+
+        # return list of records (wrapped around dataclass)
+        return [
+            Record(
+                entry=SourceValue(
+                    key=key,
+                    value=self.count,
+                    timestamp=time.time(),
+                ),
+                shard_key=key,  # key to shard/query
+            )
+        ]
+
+class LargeFakeSource(BaseTransform):
+    def __init__(self, total: int) -> None:
+        self.count = 0
+        self.total = total
+        self.num_keys = max(TEST_SIZES)  # not sharding for now
+
+    def on_event(self, _: Record) -> List[Record[SourceValue]]:
+
+        if self.count >= self.total:
+            print("completed iteration")
+            raise StopIteration()
+
+        self.count += 1
+        key = str(self.count % self.num_keys)
+
+        # return list of records (wrapped around dataclass)
+        return [
+            Record(
+                entry=LargeSourceValue(
+                    key=key,
+                    value=key*LONG_TEXT_SIZE,
+                    timestamp=time.time(),
+                ),
+                shard_key=key,  # key to shard/query
+            )
+        ]
+
+# write/update
+class UpdateReadDelete(BaseTransform):
+    def __init__(self, connector_type):
+        self.connector_type = connector_type
+
+    def on_event(self, record: Record) -> Record:
+        self.getFF().table_state.update(record)
+        self.getFF().table_state.point_query(record.entry.key)
+        self.getFF().table_state.delete(record.entry.key)
+
+        if int(record.entry.key) % 50000 == 0:
+            logger.msg(f"Processed record: {record.entry.key}")
+            log_and_get_metrics(self)
+        return record
+
+    def on_stop(self, record: Record) -> Record:
+        latencies, throughputs = log_and_get_metrics(self)
+
+        results = "-"*50 + f"\nCONNECTOR_MODE: {self.connector_type}\nLATENCIES (ms per action):\n"
+        for k, v in latencies[self.connector_type].items():
+            results += (f"{k}: {v}\n")
+        results += "THROUGHPUTS (number of requests per second):\n"
+        for k, v in throughputs[self.connector_type].items():
+            results += (f"{k}: {v}\n")
+        record_benchmark_results(results, result_path)
+
+        return record
+
+def log_and_get_metrics(transform):
+    update_latency = transform.getFF().table_state.times["update"]/transform.getFF().table_state.counts["update"]
+    read_latency = transform.getFF().table_state.times["point_query"]/transform.getFF().table_state.counts["point_query"]
+    delete_latency = transform.getFF().table_state.times["delete"]/transform.getFF().table_state.counts["delete"]
+
+    update_throughput = transform.getFF().table_state.counts["update"]/transform.getFF().table_state.times["update"]
+    read_throughput = transform.getFF().table_state.counts["point_query"]/transform.getFF().table_state.times["point_query"]
+    delete_throughput = transform.getFF().table_state.counts["delete"]/transform.getFF().table_state.times["delete"]
+
+    logger.msg(f"AVERAGE UPDATE LATENCY: {update_latency * 1000} ms per update")
+    logger.msg(f"AVERAGE READ LATENCY: {read_latency * 1000} ms per read")
+    logger.msg(f"AVERAGE DELETE LATENCY: {delete_latency * 1000} ms per delete")
+
+    logger.msg(f"AVERAGE UPDATE THROUGHPUTS: {update_throughput} updates per second")
+    logger.msg(f"AVERAGE READ THROUGHPUTS: {read_throughput} reads per second")
+    logger.msg(f"AVERAGE DELETE THROUGHPUTS: {delete_throughput} deletes per second")
+
+    latencies[transform.connector_type]["update"] = update_latency * 1000
+    latencies[transform.connector_type]["read"] = read_latency * 1000
+    latencies[transform.connector_type]["delete"] = delete_latency * 1000
+
+    throughputs[transform.connector_type]["update"] = update_throughput
+    throughputs[transform.connector_type]["read"] = read_throughput
+    throughputs[transform.connector_type]["delete"] = delete_throughput
+
+    return latencies, throughputs
+
+def record_benchmark_results(results, path):
+    f = open(path, "a")
+    f.write(results)
+    f.close()
+
+def flush_testing_env():
+    if os.path.exists("key.db"):
+        os.remove("key.db")
+    r = redis.Redis()
+    r.flushdb()
+
+if __name__ == "__main__":
+    for test_size in TEST_SIZES:
+        record_benchmark_results(f"TIME: {time.localtime()}\nRECORD SIZE: {LONG_TEXT_SIZE}\nDEPLOY MODE: {deploy_mode}\nTEST SIZE: {test_size}\n\n", result_path)
+        for connector_mode in existing_connectors:
+            flush_testing_env()
+            app = RalfApplication(RalfConfig(deploy_mode=deploy_mode))
+
+            source_ff = app.source(
+                LargeFakeSource(test_size),
+                operator_config=OperatorConfig(
+                    ray_config=RayOperatorConfig(num_replicas=1),
+                ),
+            )
+
+            schema = Schema("key", {"key": str, "value": str, "timestamp": float})
+            conn = existing_connectors[connector_mode]()
+            table_state = TableState(schema, conn, LargeSourceValue)
+
+            updateReadDelete_ff = source_ff.transform(
+                UpdateReadDelete(connector_mode),
+                operator_config=OperatorConfig(
+                    ray_config=RayOperatorConfig(num_replicas=1),
+                ),
+                table_state = table_state
+            )
+
+            app.deploy()
+            app.wait()
+        record_benchmark_results("\n\n", result_path)
\ No newline at end of file
diff --git a/ralf/v2/connector.py b/ralf/v2/connector.py
index 653c10b..8aa3fdb 100644
--- a/ralf/v2/connector.py
+++ b/ralf/v2/connector.py
@@ -22,7 +22,7 @@ def delete(self, schema: Schema, key: str):
         pass
 
     @abstractmethod
-    def get_one(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
+    def get(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
         pass
 
     @abstractmethod
diff --git a/ralf/v2/connectors/dict_connector.py b/ralf/v2/connectors/dict_connector.py
index f6b1a80..dbe557b 100644
--- a/ralf/v2/connectors/dict_connector.py
+++ b/ralf/v2/connectors/dict_connector.py
@@ -27,7 +27,7 @@ def delete(self, schema: Schema, key: str):
         if key in records:
             records.pop(key, None)
 
-    def get_one(self, schema: Schema, key, _) -> Union[Record, None]:
+    def get(self, schema: Schema, key, _) -> Union[Record, None]:
         records = self.get_records(schema)
         if key in records:
             return records[key]
diff --git a/ralf/v2/connectors/redis_connector.py b/ralf/v2/connectors/redis_connector.py
index 932c64a..57d57df 100644
--- a/ralf/v2/connectors/redis_connector.py
+++ b/ralf/v2/connectors/redis_connector.py
@@ -29,7 +29,7 @@ def update(self, schema: Schema, record: Record):
     def delete(self, schema: Schema, key: str):
         self.conn.hdel(schema.get_name(), key)
 
-    def get_one(self, schema: Schema, key, dataclass) -> Union[Record, None]:
+    def get(self, schema: Schema, key, dataclass) -> Union[Record, None]:
         val = self.conn.hget(schema.get_name(), key)
         if val:
             return Record.deserialize(val, dataclass)
diff --git a/ralf/v2/connectors/sqlite3_connector.py b/ralf/v2/connectors/sqlite3_connector.py
index 2cdb4ef..4a4286a 100644
--- a/ralf/v2/connectors/sqlite3_connector.py
+++ b/ralf/v2/connectors/sqlite3_connector.py
@@ -25,7 +25,7 @@ def get_conn(self):
 class SQLConnector(Connector):
     def __init__(self, dbname: str):
         self.dbname = dbname
-
+
     def create_connection(self):
         # This function will return a new connection
         # if the connector is such that it cannot be shared
@@ -38,7 +38,7 @@ def add_table(self, schema: Schema):
         if hasattr(self, "conn"):
             conn = self.conn
         else:
-            conn = sqlite3.connect(self.dbname)
+            conn = sqlite3.connect(self.dbname, check_same_thread=False)
         table_name = schema.get_name()
         curr = conn.cursor()
         if schema.columns[schema.primary_key] not in sql_types:
@@ -68,7 +68,7 @@ def delete(self, schema: Schema, key: str):
         curr.execute(f"DELETE FROM {table_name} WHERE {schema.primary_key} = {key}")
         self.conn.commit()
 
-    def get_one(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
+    def get(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
         # conn = sqlite3.connect(self.dbname)
         curr = self.conn.cursor()
         table_name = schema.get_name()
@@ -95,4 +95,4 @@ def count(self, schema: Schema) -> int:
         return count
 
     def prepare(self):
-        self.conn = sqlite3.connect(self.dbname)
\ No newline at end of file
+        self.conn = sqlite3.connect(self.dbname, check_same_thread=False)
\ No newline at end of file
diff --git a/ralf/v2/examples/counter.py b/ralf/v2/examples/counter.py
index 0b821d4..424a1b2 100644
--- a/ralf/v2/examples/counter.py
+++ b/ralf/v2/examples/counter.py
@@ -8,6 +8,7 @@
 
 from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record
 from ralf.v2.operator import OperatorConfig, RayOperatorConfig
+from ralf.v2.utils import get_logger
 
 
 @dataclass
@@ -22,6 +23,7 @@ class SumValue:
     key: str
     value: int
 
+logger = get_logger()
 
 class FakeSource(BaseTransform):
     def __init__(self, total: int) -> None:
@@ -66,21 +68,22 @@ def on_event(self, record: Record) -> Record[SumValue]:
         )
 
 class UpdateDict(BaseTransform):
-    def __init__(self, table_state: TableState):
-        self.table_state = table_state
+    def __init__(self):
+        self.count = 0
 
     def on_event(self, record: Record) -> None:
-        print("single update table", self.table_state.connector.tables)
-        self.table_state.update(record)
+        self.update(record)
         return None
 
+    def on_stop(self, record: Record):
+        logger.msg(self.get_all())
+
 class BatchUpdate(BaseTransform):
-    def __init__(self, table_state:TableState, batch_size: int):
+    def __init__(self, batch_size: int):
         self.batch_size = batch_size
         self.count = 0
         self.records = []
-        self.table_state = table_state
-
+
     def on_event(self, record: Record) -> None:
         self.records.append(record)
         self.count += 1
@@ -89,11 +92,13 @@ def on_event(self, record: Record) -> None:
             self.count = 0
             for r in self.records:
print(f"batch update, processing {r}") - self.table_state.update(r) + self.update(r) self.records = [] - print("batch table", self.table_state.connector.tables) return None + + def on_stop(self, record: Record): + logger.msg(self.get_all()) if __name__ == "__main__": @@ -117,7 +122,8 @@ def on_event(self, record: Record) -> None: dict_schema = Schema("key", {"key": str, "value": int}) dict_schema_1 = Schema("key", {"key": str, "value": int}) - + dict_schema.name = "single" + dict_schema_1.name = "batch" dict_conn = DictConnector() dict_conn_1 = DictConnector() @@ -125,19 +131,19 @@ def on_event(self, record: Record) -> None: batch_table_state = TableState(dict_schema_1, dict_conn_1, dataclass) update_ff = sum_ff.transform( - UpdateDict(dict_table_state), + UpdateDict(), operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), + table_state=dict_table_state ) - batch_updater = BatchUpdate(batch_table_state, 3) - batch_update_ff = sum_ff.transform( - batch_updater, + BatchUpdate(3), operator_config=OperatorConfig( ray_config=RayOperatorConfig(num_replicas=1), ), + table_state=batch_table_state ) app.deploy() diff --git a/ralf/v2/manager.py b/ralf/v2/manager.py index 83c32c7..d6301f6 100644 --- a/ralf/v2/manager.py +++ b/ralf/v2/manager.py @@ -72,11 +72,11 @@ def wait(self): for handle in operator.pool.handles: refs.append(handle.wait_for_exit.remote()) while True: - _, not_done = ray.wait(refs, num_returns=len(refs), timeout=0.5) + _, not_done = ray.wait(refs, num_returns=len(refs), timeout=1) # print("Waiting for", not_done) if len(not_done) == 0: break - time.sleep(1) + time.sleep(0.1) class SimpyManager(RalfManager): diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index a95a098..f2a0deb 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -78,7 +78,7 @@ def __init__( ) logger.msg("Preparing transform object") - + self.frame.prepare() self.worker_thread = Thread(target=self._run_forever, daemon=True) self.worker_thread.start() @@ -92,7 +92,6 @@ def _run_forever(self): if hasattr(self.transform_object, "table_state"): self.transform_object.table_state.connector.prepare() self.transform_object.prepare() - db_path = f"{self.config.metrics_dir}/{str(self.frame.transform_object)}_{self.context['shard_idx']}.db" metrics_connection = event_metrics.MetricConnection( @@ -152,6 +151,7 @@ def time_and_count(metric_name, labels={}): # If this operator got StopIteration, it should exit elif next_event.is_stop_iteration(): + self.transform_object.on_stop(next_event) raise StopIteration() else: @@ -196,7 +196,7 @@ async def wait_for_exit(self): if not self.worker_thread.is_alive(): logger.msg("Worker thread exited!", frame=self.frame) break - + logger.msg("exiting...") class OperatorActorPool: """Contains a set number of Ray Actors.""" @@ -312,7 +312,9 @@ def enqueue_events(self, records: List[Record]): def dump_transform_state(self): return self.recordings - + + def get(self, key): + pass class SimpyOperator(RalfOperator): """Skip running transform, but record the event ordering""" @@ -379,6 +381,8 @@ def enqueue_events(self, records: List[Record]): for record in records: self.scheduler.push_event(record) + def get(self, key): + return self.transform_object.get(key) @dataclass class RayOperatorConfig: diff --git a/ralf/v2/scheduler.py b/ralf/v2/scheduler.py index e952467..b369baf 100644 --- a/ralf/v2/scheduler.py +++ b/ralf/v2/scheduler.py @@ -117,83 +117,6 @@ def pop_event(self) -> Record: def qsize(self) -> int: return len(self.queue) - -@total_ordering -class 
-class KeyCount:
-    def __init__(self, key, num_processed, record):
-        self.key = key
-        self.records = [record]
-        self.num_processed = num_processed
-
-    def process(self):
-        self.num_processed += 1
-        return self.records.pop()
-
-    def add_record(self, record):
-        self.records.append(record)
-
-    def __eq__(self, other):
-        return self.key == other.key and self.num_processed == other.num_processed
-
-    def __lt__(self, other):
-        return (
-            int(len(self.records) == 0) * 1000000000 + self.num_processed
-            < other.num_processed + int(len(other.records) == 0) * 1000000000
-        )
-
-    def __gt__(self, other):
-        return (
-            int(len(self.records) == 0) * 1000000000 + self.num_processed
-            > other.num_processed + int(len(other.records) == 0) * 1000000000
-        )
-
-    def __repr__(self):
-        return f"KeyCount(key : {self.key}, num_processed : {self.num_processed}, # of records : {len(self.records)}"
-
-
-class LeastUpdate(BaseScheduler):
-    def __init__(self) -> None:
-        self.seen_keys = dict()
-        self.queue: List[KeyCount] = []
-        self.waker: Optional[threading.Event] = None
-        self.num_unprocessed = 0
-
-    def push_event(self, record: Record):
-        if record.is_stop_iteration():
-            kc = KeyCount("stop", 0, record)
-            self.queue.insert(0, kc)
-            return
-        key = record.entry.key
-        if key not in self.seen_keys:
-            kc = KeyCount(key, 0, record)
-            self.seen_keys[key] = kc
-            heapq.heappush(self.queue, kc)
-        else:
-            kc = self.seen_keys[key]
-            kc.add_record(record)
-
-        heapq.heapify(self.queue)
-        # print(self.queue[0])
-        if len(self.queue[0].records) == 0:
-            print(
-                self.num_unprocessed,
-                sum([len(i.records) for i in self.queue]),
-                [[len(i.records), i.key, i.num_processed] for i in self.queue],
-            )
-        self.wake_waiter_if_needed()
-
-    def pop_event(self) -> Record:
-        if not self.queue or len(self.queue[0].records) == 0:
-            return Record.make_wait_event(self.new_waker())
-        least_updated = heapq.heappop(self.queue)
-        if len(least_updated.records) == 0:
-            return Record.make_wait_event(self.new_waker())
-        record = least_updated.process()
-        # self.num_unprocessed -= 1
-        heapq.heappush(self.queue, least_updated)
-        return record
-
-
 @total_ordering
 class KeyCount:
     def __init__(self, key, num_processed, record):
@@ -269,6 +192,8 @@ def pop_event(self) -> Record:
         heapq.heappush(self.queue, least_updated)
         return record
 
+    def qsize(self) -> int:
+        return len(self.queue)
 
 @dataclass
 class DummyEntry:
diff --git a/ralf/v2/table_state.py b/ralf/v2/table_state.py
index 65aa3a3..a639dff 100644
--- a/ralf/v2/table_state.py
+++ b/ralf/v2/table_state.py
@@ -1,5 +1,7 @@
+from collections import defaultdict
+from tokenize import String
 from typing import List
-import redis
+import time
 
 from ralf.v2.record import Record, Schema
 from ralf.v2.connector import Connector
@@ -10,40 +12,55 @@ class TableState:
     def __init__(self, schema: Schema, connector: Connector, dataclass):
         self.schema = schema
-        self.connector = connector.create_connection()
+        self.connector = connector
         self.connector.add_table(schema)
         self.dataclass = dataclass
-        self.num_updates: int = 0
-        self.num_deletes: int = 0
-        self.num_records: int = 0
+        self.times = defaultdict(float)
+        self.counts = defaultdict(int)
 
     def debug_state(self):
         self.num_records = self.connector.count(self.schema)
         return {
-            "num_updates": self.num_updates,
-            "num_deletes": self.num_deletes,
-            "num_records": self.num_records,
+            "num_updates": self.counts["update"],
+            "num_deletes": self.counts["delete"],
+            "num_point_query": self.counts["point_query"],
         }
 
     def update(self, record: Record):
-        self.schema.validate_record(record)
+        self.schema.validate_record(record)
+        t1 = time.time()
         self.connector.update(self.schema, record)
-        self.num_updates += 1
+        t2 = time.time()
+
+        self.recordQuery("update", t2-t1)
 
     def delete(self, key: str):
+        t1 = time.time()
         self.connector.delete(self.schema, key)
-        self.num_deletes += 1
+        t2 = time.time()
+
+        self.recordQuery("delete", t2-t1)
 
     def point_query(self, key) -> Record:
-        val = self.connector.get_one(self.schema, key, self.dataclass)
+        t1 = time.time()
+        val = self.connector.get(self.schema, key, self.dataclass)
+        t2 = time.time()
+
         if not val:
             raise KeyError(f"Key {key} not found.")
+        self.recordQuery("point_query", t2-t1)
         return val
 
     def bulk_query(self) -> List[Record]:
-        print(self.schema, self.dataclass)
         return self.connector.get_all(self.schema, self.dataclass)
 
     def get_schema(self) -> Schema:
-        return self.schema
\ No newline at end of file
+        return self.schema
+
+    def recordQuery(self, queryType: String, timeTaken):
+        self.times[queryType] += timeTaken
+        self.counts[queryType] += 1
+
+    def prepare(self):
+        self.connector.prepare()