2 changes: 1 addition & 1 deletion .gitignore
@@ -53,7 +53,7 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/

ralf/v2/benchmark/results/*.txt
# Translations
*.mo
*.pot
65 changes: 60 additions & 5 deletions ralf/v2/api.py
@@ -67,20 +67,47 @@ def on_event(self, record: Record) -> Union[None, Record, Iterable[Record]]:
"""
raise NotImplementedError("To be implemented by subclass.")

def prepare(self):
def on_stop(self, record: Record) -> Union[None, Record, Iterable[Record]]:
"""
Function called when a Stop Iteration event is received. Executes this on_stop method.

A StopIteration exception is raised after, and the transform process will terminate.
A StopIteration special record will still be propogated to downstream feature frames.

This function should be expected to be called once.
"""
pass
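
# For example, a transform might inspect its final state when the stream
# ends (a hypothetical sketch; `FinalReader` is illustrative, not part of
# this API):
#
# class FinalReader(BaseTransform):
#     def on_stop(self, record: Record):
#         self.get_all()  # e.g. read final state before shutdown
#         return record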

def __repr__(self):
return self.__class__.__name__

def get(self, key):
"""Get current feature value for key. Returns null by default.
def getFF(self):
"""Get the feature frame transform is being applied to.
"""
return self.feature_frame

def prepare(self):
pass

def get(self, key):
"""Get current feature value for key.

:param key: key to lookup feature value
:type key: str
"""
return None
return self.getFF().get(key)

def update(self, record: Record):
"""Update the feature value for the record's key."""
self.getFF().update(record)

def delete(self, key: str):
"""Delete the feature value for a key."""
self.getFF().delete(key)

def get_all(self):
"""Get all current feature values."""
return self.getFF().get_all()

def get_schema(self):
"""Get the schema of the underlying table state."""
return self.getFF().get_schema()
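
# A minimal usage sketch (hypothetical; `DedupTransform` is illustrative and
# not part of this API): transforms can read and write their feature frame's
# state through the helpers above.
#
# class DedupTransform(BaseTransform):
#     def on_event(self, record: Record):
#         if self.get(record.entry.key) is not None:
#             return None  # drop keys we have already seen
#         self.update(record)  # remember this key
#         return record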

class FeatureFrame:
"""Encapsulate a feature transformation and its related policies configuration."""
@@ -95,8 +122,16 @@ def __init__(
self.transform_object = transform_object
self.scheduler = scheduler
self.table_state = table_state

# Give the transform and scheduler access to this feature frame's resources
self.transform_object.feature_frame = self
self.scheduler.feature_frame = self

self.config = operator_config
self.children: List["FeatureFrame"] = []

self.children: List["FeatureFrame"] = []
self.parent: "FeatureFrame" = None

logger.msg(
"Created FeatureFrame",
transform=transform_object,
@@ -117,11 +152,31 @@ def transform(
"""Apply a transformation to this feature frame, with scheduler."""
frame = FeatureFrame(transform_object, scheduler, table_state, operator_config)
self.children.append(frame)
frame.parent = self
return frame

def __repr__(self) -> str:
return f"FeatureFrame({repr(self.transform_object)})"

# State-access helpers that delegate to the underlying table state.
def get(self, key: str):
return self.table_state.point_query(key)

def update(self, record: Record):
self.table_state.update(record)

def delete(self, key: str):
self.table_state.delete(key)

def get_all(self):
return self.table_state.bulk_query()

def get_schema(self):
return self.table_state.get_schema()

def prepare(self):
if self.table_state:
self.table_state.prepare()
# TODO: interface to query the feature frame

class RalfApplication:
"""An end to end feature processing pipeline in Ralf."""
199 changes: 199 additions & 0 deletions ralf/v2/benchmark/benchmark.py
@@ -0,0 +1,199 @@
import time
from collections import defaultdict
from dataclasses import dataclass, make_dataclass
from typing import List
import os
import redis

from ralf.v2.connectors.dict_connector import DictConnector
from ralf.v2.connectors.redis_connector import RedisConnector
from ralf.v2.connectors.sqlite3_connector import SQLConnector
from ralf.v2.record import Schema
from ralf.v2.table_state import TableState

from ralf.v2 import BaseTransform, RalfApplication, RalfConfig, Record
from ralf.v2.operator import OperatorConfig, RayOperatorConfig
from ralf.v2.utils import get_logger

logger = get_logger()

@dataclass
class SourceValue:
key: str
value: int
timestamp: float

@dataclass
class LargeSourceValue:
key: str
value: str
timestamp: float

LONG_TEXT_SIZE = 10 ** 6  # multiplier for each large record's string payload

# Number of records we're processing
TEST_SIZES = [5, 1000, 1000000]
deploy_mode = "ray"
sizes_str = "_".join([str(s) for s in TEST_SIZES])
result_path = f"benchmark/results/size_{sizes_str}_{time.time()}.txt"

latencies = defaultdict(dict)
throughputs = defaultdict(dict)

# Factories that create the connector for each supported connector mode
existing_connectors = {
"dict": DictConnector,
"redis": lambda: RedisConnector(host='127.0.0.1', port='6379', password=''),
"sqlite": lambda: SQLConnector(dbname="key.db")
}

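# Source that emits `total` sequentially numbered SourceValue records, keyed
# by count modulo num_keys.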
class FakeSource(BaseTransform):
def __init__(self, total: int) -> None:
self.count = 0
self.total = total
self.num_keys = max(TEST_SIZES)  # not sharding for now

def on_event(self, _: Record) -> List[Record[SourceValue]]:

if self.count >= self.total:
print("completed iteration")
raise StopIteration()

self.count += 1
key = str(self.count % self.num_keys)

# return a list of records (each wrapping a dataclass entry)
return [
Record(
entry=SourceValue(
key=key,
value=self.count,
timestamp=time.time(),
),
shard_key=key, # key to shard/query
)
]

# Same as FakeSource, but each record carries a large string payload.
class LargeFakeSource(BaseTransform):
def __init__(self, total: int) -> None:
self.count = 0
self.total = total
self.num_keys = max(TEST_SIZES)  # not sharding for now

def on_event(self, _: Record) -> List[Record[LargeSourceValue]]:

if self.count >= self.total:
print("completed iteration")
raise StopIteration()

self.count += 1
key = str(self.count % self.num_keys)

# return a list of records (each wrapping a dataclass entry)
return [
Record(
entry=LargeSourceValue(
key=key,
value=key * LONG_TEXT_SIZE,
timestamp=time.time(),
),
shard_key=key, # key to shard/query
)
]

# Transform that updates, reads, then deletes each incoming record in the table state
class UpdateReadDelete(BaseTransform):
def __init__(self, connector_type):
self.connector_type = connector_type

def on_event(self, record: Record) -> Record:
self.getFF().table_state.update(record)
self.getFF().table_state.point_query(record.entry.key)
self.getFF().table_state.delete(record.entry.key)

if int(record.entry.key) % 50000 == 0:
logger.msg(f"Processed record: {record.entry.key}")
log_and_get_metrics(self)
return record

def on_stop(self, record: Record) -> Record:
latencies, throughputs = log_and_get_metrics(self)

results = "-" * 50 + f"\nCONNECTOR_MODE: {self.connector_type}\nLATENCIES (ms per action):\n"
for k, v in latencies[self.connector_type].items():
results += f"{k}: {v}\n"
results += "THROUGHPUTS (number of requests per second):\n"
for k, v in throughputs[self.connector_type].items():
results += f"{k}: {v}\n"
record_benchmark_results(results, result_path)

return record

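# Compute average latency and throughput for each operation from the
# cumulative per-operation `times` and `counts` tracked by the table state,
# log them, and record them in the module-level latencies/throughputs dicts.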
def log_and_get_metrics(transform):
table_state = transform.getFF().table_state
update_latency = table_state.times["update"] / table_state.counts["update"]
read_latency = table_state.times["point_query"] / table_state.counts["point_query"]
delete_latency = table_state.times["delete"] / table_state.counts["delete"]

update_throughput = table_state.counts["update"] / table_state.times["update"]
read_throughput = table_state.counts["point_query"] / table_state.times["point_query"]
delete_throughput = table_state.counts["delete"] / table_state.times["delete"]

logger.msg(f"AVERAGE UPDATE LATENCY: {update_latency * 1000} ms per update")
logger.msg(f"AVERAGE READ LATENCY: {read_latency * 1000} ms per read")
logger.msg(f"AVERAGE DELETE LATENCY: {delete_latency * 1000} ms per delete")

logger.msg(f"AVERAGE UPDATE THROUGHPUT: {update_throughput} updates per second")
logger.msg(f"AVERAGE READ THROUGHPUT: {read_throughput} reads per second")
logger.msg(f"AVERAGE DELETE THROUGHPUT: {delete_throughput} deletes per second")

latencies[transform.connector_type]["update"] = update_latency * 1000
latencies[transform.connector_type]["read"] = read_latency * 1000
latencies[transform.connector_type]["delete"] = delete_latency * 1000

throughputs[transform.connector_type]["update"] = update_throughput
throughputs[transform.connector_type]["read"] = read_throughput
throughputs[transform.connector_type]["delete"] = delete_throughput

return latencies, throughputs

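# Append a block of result text to the shared results file.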
def record_benchmark_results(results, path):
with open(path, "a") as f:
f.write(results)

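# Reset persistent state between runs: remove the SQLite database file and
# flush the default Redis database.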
def flush_testing_env():
if os.path.exists("key.db"):
os.remove("key.db")
r = redis.Redis()
r.flushdb()

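# For each test size and each connector backend, build a two-stage pipeline
# (LargeFakeSource -> UpdateReadDelete) and benchmark update/read/delete.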
if __name__ == "__main__":
for test_size in TEST_SIZES:
record_benchmark_results(f"TIME: {time.localtime()}\nRECORD SIZE: {LONG_TEXT_SIZE}\nDEPLOY MODE: {deploy_mode}\nTEST SIZE: {test_size}\n\n", result_path)
for connector_mode in existing_connectors:
flush_testing_env()
app = RalfApplication(RalfConfig(deploy_mode=deploy_mode))

source_ff = app.source(
LargeFakeSource(test_size),
operator_config=OperatorConfig(
ray_config=RayOperatorConfig(num_replicas=1),
),
)

schema = Schema("key", {"key": str, "value": str, "timestamp": float})
conn = existing_connectors[connector_mode]()
table_state = TableState(schema, conn, LargeSourceValue)

updateReadDelete_ff = source_ff.transform(
UpdateReadDelete(connector_mode),
operator_config=OperatorConfig(
ray_config=RayOperatorConfig(num_replicas=1),
),
table_state=table_state,
)

app.deploy()
app.wait()
record_benchmark_results("\n\n", result_path)
2 changes: 1 addition & 1 deletion ralf/v2/connector.py
@@ -22,7 +22,7 @@ def delete(self, schema: Schema, key: str):
pass

@abstractmethod
def get_one(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
def get(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
pass

@abstractmethod
2 changes: 1 addition & 1 deletion ralf/v2/connectors/dict_connector.py
@@ -27,7 +27,7 @@ def delete(self, schema: Schema, key: str):
if key in records:
records.pop(key, None)

def get_one(self, schema: Schema, key, _) -> Union[Record, None]:
def get(self, schema: Schema, key, _) -> Union[Record, None]:
records = self.get_records(schema)
if key in records:
return records[key]
2 changes: 1 addition & 1 deletion ralf/v2/connectors/redis_connector.py
@@ -29,7 +29,7 @@ def update(self, schema: Schema, record: Record):
def delete(self, schema: Schema, key: str):
self.conn.hdel(schema.get_name(), key)

def get_one(self, schema: Schema, key, dataclass) -> Union[Record, None]:
def get(self, schema: Schema, key, dataclass) -> Union[Record, None]:
val = self.conn.hget(schema.get_name(), key)
if val:
return Record.deserialize(val, dataclass)
8 changes: 4 additions & 4 deletions ralf/v2/connectors/sqlite3_connector.py
@@ -25,7 +25,7 @@ def get_conn(self):
class SQLConnector(Connector):
def __init__(self, dbname: str):
self.dbname = dbname

def create_connection(self):
# This function will return a new connection
# if the connector is such that it cannot be shared
@@ -38,7 +38,7 @@ def add_table(self, schema: Schema):
if hasattr(self, "conn"):
conn = self.conn
else:
conn = sqlite3.connect(self.dbname)
conn = sqlite3.connect(self.dbname, check_same_thread=False)
table_name = schema.get_name()
curr = conn.cursor()
if schema.columns[schema.primary_key] not in sql_types:
@@ -68,7 +68,7 @@ def delete(self, schema: Schema, key: str):
curr.execute(f"DELETE FROM {table_name} WHERE {schema.primary_key} = {key}")
self.conn.commit()

def get_one(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
def get(self, schema: Schema, key: str, dataclass) -> Union[Record, None]:
# conn = sqlite3.connect(self.dbname)
curr = self.conn.cursor()
table_name = schema.get_name()
@@ -95,4 +95,4 @@ def count(self, schema: Schema) -> int:
return count

def prepare(self):
self.conn = sqlite3.connect(self.dbname)
self.conn = sqlite3.connect(self.dbname, check_same_thread=False)