diff --git a/Apache-Arrow-Flight-Tester/common.py b/Apache-Arrow-Flight-Tester/common.py deleted file mode 100644 index 3608dbc..0000000 --- a/Apache-Arrow-Flight-Tester/common.py +++ /dev/null @@ -1,199 +0,0 @@ -import pyarrow -import pprint - -from typing import Literal -from protobuf import protocol_pb2 -from pyarrow import flight, Schema -from pyarrow._flight import FlightInfo, ActionType, Result, Ticket - - -class ModelarDBFlightClient: - """Common functionality for interacting with server and manager ModelarDB instances using Apache Arrow Flight.""" - - def __init__(self, location: str): - self.flight_client = flight.FlightClient(location) - - def list_flights(self) -> list[FlightInfo]: - """Wrapper around the list_flights method of the FlightClient class.""" - response = self.flight_client.list_flights() - - return list(response) - - def get_schema(self, table_name: str) -> Schema: - """Wrapper around the get_schema method of the FlightClient class.""" - upload_descriptor = flight.FlightDescriptor.for_path(table_name) - response = self.flight_client.get_schema(upload_descriptor) - - return response.schema - - def do_get(self, ticket: Ticket) -> None: - """Wrapper around the do_get method of the FlightClient class.""" - response = self.flight_client.do_get(ticket) - - for batch in response: - pprint.pprint(batch.data.to_pydict()) - - def do_action(self, action_type: str, action_body: bytes) -> list[Result]: - """Wrapper around the do_action method of the FlightClient class.""" - action = flight.Action(action_type, action_body) - response = self.flight_client.do_action(action) - - return list(response) - - def list_actions(self) -> list[ActionType]: - """Wrapper around the list_actions method of the FlightClient class.""" - response = self.flight_client.list_actions() - - return list(response) - - def list_table_names(self) -> list[str]: - """Return the names of the tables in the server or manager.""" - flights = self.list_flights() - return [table_name.decode("utf-8") for table_name in flights[0].descriptor.path] - - def create_table(self, table_name: str, columns: list[tuple[str, str]], time_series_table=False) -> None: - """ - Create a table in the server or manager with the given name and columns. Each pair in columns should have the - format (column_name, column_type). - """ - create_table = ( - "CREATE TIME SERIES TABLE" if time_series_table else "CREATE TABLE" - ) - sql = f"{create_table} {table_name}({', '.join([f'{column[0]} {column[1]}' for column in columns])})" - - self.do_get(Ticket(sql)) - - def create_test_tables(self) -> None: - """ - Create a normal table and a time series table using the flight client, print the current tables to ensure the - created tables are included, and print the schema to ensure the tables are created correctly. - """ - print("Creating test tables...") - - self.create_table( - "test_table_1", - [("timestamp", "TIMESTAMP"), ("values", "REAL"), ("metadata", "REAL")], - ) - self.create_table( - "test_time_series_table_1", - [ - ("location", "TAG"), - ("install_year", "TAG"), - ("model", "TAG"), - ("timestamp", "TIMESTAMP"), - ("power_output", "FIELD"), - ("wind_speed", "FIELD"), - ("temperature", "FIELD(5%)"), - ], - time_series_table=True, - ) - - print("\nCurrent tables:") - for table_name in self.list_table_names(): - print(f"{table_name}:") - print(f"{self.get_schema(table_name)}\n") - - def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: - """Create a normal table using the table name and schema.""" - normal_table_metadata = protocol_pb2.TableMetadata.NormalTableMetadata() - - normal_table_metadata.name = table_name - normal_table_metadata.schema = schema.serialize().to_pybytes() - - table_metadata = protocol_pb2.TableMetadata() - table_metadata.normal_table.CopyFrom(normal_table_metadata) - - self.do_action("CreateTable", table_metadata.SerializeToString()) - - def create_time_series_table_from_metadata(self, table_name: str, schema: pyarrow.Schema, error_bounds: list[ - protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound], generated_columns: list[bytes]) -> None: - """Create a time series table using the table name, schema, error bounds, and generated columns.""" - time_series_table_metadata = protocol_pb2.TableMetadata.TimeSeriesTableMetadata() - - time_series_table_metadata.name = table_name - time_series_table_metadata.schema = schema.serialize().to_pybytes() - time_series_table_metadata.error_bounds.extend(error_bounds) - time_series_table_metadata.generated_column_expressions.extend(generated_columns) - - table_metadata = protocol_pb2.TableMetadata() - table_metadata.time_series_table.CopyFrom(time_series_table_metadata) - - self.do_action("CreateTable", table_metadata.SerializeToString()) - - def create_test_tables_from_metadata(self): - """ - Create a normal table and a time series table using the CreateTable action, print the current tables to ensure - the created tables are included, and print the schema to ensure the tables are created correctly. - """ - print("Creating test tables from metadata...") - - normal_table_schema = pyarrow.schema([ - ("timestamp", pyarrow.timestamp("us")), - ("values", pyarrow.float32()), - ("metadata", pyarrow.utf8()) - ]) - - self.create_normal_table_from_metadata("test_table_1", normal_table_schema) - - time_series_table_schema = get_time_series_table_schema() - - lossless = protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type.LOSSLESS - error_bounds = [protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound(value=0, type=lossless) - for _ in range(len(time_series_table_schema))] - - generated_column_expressions = [b'' for _ in range(len(time_series_table_schema))] - - self.create_time_series_table_from_metadata("test_time_series_table_1", time_series_table_schema, - error_bounds, generated_column_expressions) - - print("\nCurrent tables:") - for table_name in self.list_table_names(): - print(f"{table_name}:") - print(f"{self.get_schema(table_name)}\n") - - def drop_table(self, table_name: str) -> None: - """Drop the table with the given name from the server or manager.""" - self.do_get(Ticket(f"DROP TABLE {table_name}")) - - def truncate_table(self, table_name: str) -> None: - """Truncate the table with the given name in the server or manager.""" - self.do_get(Ticket(f"TRUNCATE TABLE {table_name}")) - - def clean_up_tables(self, tables: list[str], operation: Literal["drop", "truncate"]) -> None: - """ - Clean up the given tables by either dropping them or truncating them. If no tables are given, all tables - are dropped or truncated. - """ - if len(tables) == 0: - tables = self.list_table_names() - - print(f"Cleaning up {', '.join(tables)} using {operation}...") - - for table_name in tables: - ( - self.drop_table(table_name) - if operation == "drop" - else self.truncate_table(table_name) - ) - - def vacuum(self, table_names: list[str]) -> None: - """Vacuum the given tables in the server or manager.""" - self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) - - def node_type(self) -> str: - """Return the type of the node.""" - node_type = self.do_action("NodeType", b"") - return node_type[0].body.to_pybytes().decode("utf-8") - - -def get_time_series_table_schema() -> pyarrow.Schema: - """Return a schema for a time series table with one timestamp column, three tag columns, and three field columns.""" - return pyarrow.schema([ - ("location", pyarrow.utf8()), - ("install_year", pyarrow.utf8()), - ("model", pyarrow.utf8()), - ("timestamp", pyarrow.timestamp("us")), - ("power_output", pyarrow.float32()), - ("wind_speed", pyarrow.float32()), - ("temperature", pyarrow.float32()), - ]) diff --git a/Apache-Arrow-Flight-Tester/manager.py b/Apache-Arrow-Flight-Tester/manager.py deleted file mode 100644 index ce69227..0000000 --- a/Apache-Arrow-Flight-Tester/manager.py +++ /dev/null @@ -1,61 +0,0 @@ -from pyarrow import flight -from pyarrow._flight import Result - -from common import ModelarDBFlightClient -from protobuf import protocol_pb2 -from server import ModelarDBServerFlightClient - - -class ModelarDBManagerFlightClient(ModelarDBFlightClient): - """Functionality for interacting with a ModelarDB manager using Apache Arrow Flight.""" - - def register_node(self, node_url: str, - node_mode: protocol_pb2.NodeMetadata.ServerMode) -> protocol_pb2.ManagerMetadata: - """Register a node with the given URL and mode in the manager.""" - node_metadata = protocol_pb2.NodeMetadata() - node_metadata.url = node_url - node_metadata.server_mode = node_mode - - response = self.do_action("RegisterNode", node_metadata.SerializeToString()) - - manager_metadata = protocol_pb2.ManagerMetadata() - manager_metadata.ParseFromString(response[0].body.to_pybytes()) - - return manager_metadata - - def remove_node(self, node_url: str) -> list[Result]: - """Remove the node with the given URL from the manager.""" - node_metadata = protocol_pb2.NodeMetadata() - node_metadata.url = node_url - - return self.do_action("RemoveNode", node_metadata.SerializeToString()) - - def query(self, query: str) -> None: - """ - Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that - at least one cloud node is already registered with the manager. - """ - print("Retrieving cloud node that can execute the query...") - query_descriptor = flight.FlightDescriptor.for_command(query) - flight_info = self.flight_client.get_flight_info(query_descriptor) - - endpoint = flight_info.endpoints[0] - cloud_node_url = endpoint.locations[0] - - print(f"Executing query on {cloud_node_url}...") - cloud_client = ModelarDBServerFlightClient(cloud_node_url) - cloud_client.do_get(endpoint.ticket) - - -if __name__ == "__main__": - manager_client = ModelarDBManagerFlightClient("grpc://127.0.0.1:9998") - print(f"Node type: {manager_client.node_type()}\n") - - manager_client.create_test_tables_from_metadata() - - print(manager_client.register_node("grpc://127.0.0.1:9999", protocol_pb2.NodeMetadata.ServerMode.EDGE)) - print(manager_client.remove_node("grpc://127.0.0.1:9999")) - - manager_client.query("SELECT * FROM test_time_series_table_1 LIMIT 5") - - manager_client.clean_up_tables([], "drop") diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol.proto b/Apache-Arrow-Flight-Tester/protobuf/protocol.proto index 85242ac..d26b19e 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol.proto +++ b/Apache-Arrow-Flight-Tester/protobuf/protocol.proto @@ -17,47 +17,6 @@ syntax = "proto3"; package modelardb.flight.protocol; -// Metadata for the ModelarDB cluster manager, including its unique key and storage configuration. -message ManagerMetadata { - // Key used to uniquely identify the cluster manager. - string key = 1; - - // Storage configuration used to connect to an S3 object store. - message S3Configuration { - string endpoint = 1; - string bucket_name = 2; - string access_key_id = 3; - string secret_access_key = 4; - } - - // Storage configuration used to connect to an Azure Blob Storage object store. - message AzureConfiguration { - string account_name = 1; - string access_key = 2; - string container_name = 3; - } - - // Storage configuration used by the cluster manager. - oneof storage_configuration { - S3Configuration s3_configuration = 2; - AzureConfiguration azure_configuration = 3; - } -} - -// Metadata for a node in the ModelarDB cluster, including its URL and server mode. -message NodeMetadata { - enum ServerMode { - CLOUD = 0; - EDGE = 1; - } - - // gRPC URL of the node. - string url = 1; - - // Mode indicating whether the node is a cloud or edge server. - ServerMode server_mode = 2; -} - // Metadata for a normal table or a time series table. message TableMetadata { // Metadata for a normal table, including its name and schema. diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py index 9d868a0..8ba9068 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py +++ b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py @@ -24,37 +24,27 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eprotocol.proto\x12\x19modelardb.flight.protocol\"\xb1\x03\n\x0fManagerMetadata\x12\x0b\n\x03key\x18\x01 \x01(\t\x12V\n\x10s3_configuration\x18\x02 \x01(\x0b\x32:.modelardb.flight.protocol.ManagerMetadata.S3ConfigurationH\x00\x12\\\n\x13\x61zure_configuration\x18\x03 \x01(\x0b\x32=.modelardb.flight.protocol.ManagerMetadata.AzureConfigurationH\x00\x1aj\n\x0fS3Configuration\x12\x10\n\x08\x65ndpoint\x18\x01 \x01(\t\x12\x13\n\x0b\x62ucket_name\x18\x02 \x01(\t\x12\x15\n\raccess_key_id\x18\x03 \x01(\t\x12\x19\n\x11secret_access_key\x18\x04 \x01(\t\x1aV\n\x12\x41zureConfiguration\x12\x14\n\x0c\x61\x63\x63ount_name\x18\x01 \x01(\t\x12\x12\n\naccess_key\x18\x02 \x01(\t\x12\x16\n\x0e\x63ontainer_name\x18\x03 \x01(\tB\x17\n\x15storage_configuration\"\x87\x01\n\x0cNodeMetadata\x12\x0b\n\x03url\x18\x01 \x01(\t\x12G\n\x0bserver_mode\x18\x02 \x01(\x0e\x32\x32.modelardb.flight.protocol.NodeMetadata.ServerMode\"!\n\nServerMode\x12\t\n\x05\x43LOUD\x10\x00\x12\x08\n\x04\x45\x44GE\x10\x01\"\xfe\x04\n\rTableMetadata\x12T\n\x0cnormal_table\x18\x01 \x01(\x0b\x32<.modelardb.flight.protocol.TableMetadata.NormalTableMetadataH\x00\x12]\n\x11time_series_table\x18\x02 \x01(\x0b\x32@.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadataH\x00\x1a\x33\n\x13NormalTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x1a\xf0\x02\n\x17TimeSeriesTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x12\x61\n\x0c\x65rror_bounds\x18\x03 \x03(\x0b\x32K.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound\x12$\n\x1cgenerated_column_expressions\x18\x04 \x03(\x0c\x1a\xad\x01\n\nErrorBound\x12^\n\x04type\x18\x01 \x01(\x0e\x32P.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type\x12\r\n\x05value\x18\x02 \x01(\x02\"0\n\x04Type\x12\x0c\n\x08\x41\x42SOLUTE\x10\x00\x12\x0c\n\x08RELATIVE\x10\x01\x12\x0c\n\x08LOSSLESS\x10\x02\x42\x10\n\x0etable_metadata\"\xfa\x02\n\rConfiguration\x12-\n%multivariate_reserved_memory_in_bytes\x18\x01 \x01(\x04\x12-\n%uncompressed_reserved_memory_in_bytes\x18\x02 \x01(\x04\x12+\n#compressed_reserved_memory_in_bytes\x18\x03 \x01(\x04\x12)\n\x1ctransfer_batch_size_in_bytes\x18\x04 \x01(\x04H\x00\x88\x01\x01\x12%\n\x18transfer_time_in_seconds\x18\x05 \x01(\x04H\x01\x88\x01\x01\x12\x19\n\x11ingestion_threads\x18\x06 \x01(\r\x12\x1b\n\x13\x63ompression_threads\x18\x07 \x01(\r\x12\x16\n\x0ewriter_threads\x18\x08 \x01(\rB\x1f\n\x1d_transfer_batch_size_in_bytesB\x1b\n\x19_transfer_time_in_seconds\"\xcf\x02\n\x13UpdateConfiguration\x12G\n\x07setting\x18\x01 \x01(\x0e\x32\x36.modelardb.flight.protocol.UpdateConfiguration.Setting\x12\x16\n\tnew_value\x18\x02 \x01(\x04H\x00\x88\x01\x01\"\xc8\x01\n\x07Setting\x12)\n%MULTIVARIATE_RESERVED_MEMORY_IN_BYTES\x10\x00\x12)\n%UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x01\x12\'\n#COMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x02\x12 \n\x1cTRANSFER_BATCH_SIZE_IN_BYTES\x10\x03\x12\x1c\n\x18TRANSFER_TIME_IN_SECONDS\x10\x04\x42\x0c\n\n_new_valueb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eprotocol.proto\x12\x19modelardb.flight.protocol\"\xfe\x04\n\rTableMetadata\x12T\n\x0cnormal_table\x18\x01 \x01(\x0b\x32<.modelardb.flight.protocol.TableMetadata.NormalTableMetadataH\x00\x12]\n\x11time_series_table\x18\x02 \x01(\x0b\x32@.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadataH\x00\x1a\x33\n\x13NormalTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x1a\xf0\x02\n\x17TimeSeriesTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x12\x61\n\x0c\x65rror_bounds\x18\x03 \x03(\x0b\x32K.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound\x12$\n\x1cgenerated_column_expressions\x18\x04 \x03(\x0c\x1a\xad\x01\n\nErrorBound\x12^\n\x04type\x18\x01 \x01(\x0e\x32P.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type\x12\r\n\x05value\x18\x02 \x01(\x02\"0\n\x04Type\x12\x0c\n\x08\x41\x42SOLUTE\x10\x00\x12\x0c\n\x08RELATIVE\x10\x01\x12\x0c\n\x08LOSSLESS\x10\x02\x42\x10\n\x0etable_metadata\"\xfa\x02\n\rConfiguration\x12-\n%multivariate_reserved_memory_in_bytes\x18\x01 \x01(\x04\x12-\n%uncompressed_reserved_memory_in_bytes\x18\x02 \x01(\x04\x12+\n#compressed_reserved_memory_in_bytes\x18\x03 \x01(\x04\x12)\n\x1ctransfer_batch_size_in_bytes\x18\x04 \x01(\x04H\x00\x88\x01\x01\x12%\n\x18transfer_time_in_seconds\x18\x05 \x01(\x04H\x01\x88\x01\x01\x12\x19\n\x11ingestion_threads\x18\x06 \x01(\r\x12\x1b\n\x13\x63ompression_threads\x18\x07 \x01(\r\x12\x16\n\x0ewriter_threads\x18\x08 \x01(\rB\x1f\n\x1d_transfer_batch_size_in_bytesB\x1b\n\x19_transfer_time_in_seconds\"\xcf\x02\n\x13UpdateConfiguration\x12G\n\x07setting\x18\x01 \x01(\x0e\x32\x36.modelardb.flight.protocol.UpdateConfiguration.Setting\x12\x16\n\tnew_value\x18\x02 \x01(\x04H\x00\x88\x01\x01\"\xc8\x01\n\x07Setting\x12)\n%MULTIVARIATE_RESERVED_MEMORY_IN_BYTES\x10\x00\x12)\n%UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x01\x12\'\n#COMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x02\x12 \n\x1cTRANSFER_BATCH_SIZE_IN_BYTES\x10\x03\x12\x1c\n\x18TRANSFER_TIME_IN_SECONDS\x10\x04\x42\x0c\n\n_new_valueb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protocol_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_MANAGERMETADATA']._serialized_start=46 - _globals['_MANAGERMETADATA']._serialized_end=479 - _globals['_MANAGERMETADATA_S3CONFIGURATION']._serialized_start=260 - _globals['_MANAGERMETADATA_S3CONFIGURATION']._serialized_end=366 - _globals['_MANAGERMETADATA_AZURECONFIGURATION']._serialized_start=368 - _globals['_MANAGERMETADATA_AZURECONFIGURATION']._serialized_end=454 - _globals['_NODEMETADATA']._serialized_start=482 - _globals['_NODEMETADATA']._serialized_end=617 - _globals['_NODEMETADATA_SERVERMODE']._serialized_start=584 - _globals['_NODEMETADATA_SERVERMODE']._serialized_end=617 - _globals['_TABLEMETADATA']._serialized_start=620 - _globals['_TABLEMETADATA']._serialized_end=1258 - _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_start=818 - _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_end=869 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_start=872 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_end=1240 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_start=1067 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_end=1240 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_start=1192 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_end=1240 - _globals['_CONFIGURATION']._serialized_start=1261 - _globals['_CONFIGURATION']._serialized_end=1639 - _globals['_UPDATECONFIGURATION']._serialized_start=1642 - _globals['_UPDATECONFIGURATION']._serialized_end=1977 - _globals['_UPDATECONFIGURATION_SETTING']._serialized_start=1763 - _globals['_UPDATECONFIGURATION_SETTING']._serialized_end=1963 + _globals['_TABLEMETADATA']._serialized_start=46 + _globals['_TABLEMETADATA']._serialized_end=684 + _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_start=244 + _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_end=295 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_start=298 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_end=666 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_start=493 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_end=666 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_start=618 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_end=666 + _globals['_CONFIGURATION']._serialized_start=687 + _globals['_CONFIGURATION']._serialized_end=1065 + _globals['_UPDATECONFIGURATION']._serialized_start=1068 + _globals['_UPDATECONFIGURATION']._serialized_end=1403 + _globals['_UPDATECONFIGURATION_SETTING']._serialized_start=1189 + _globals['_UPDATECONFIGURATION_SETTING']._serialized_end=1389 # @@protoc_insertion_point(module_scope) diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi index e5b4de4..01f2fc5 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi +++ b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi @@ -7,50 +7,6 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor -class ManagerMetadata(_message.Message): - __slots__ = ("key", "s3_configuration", "azure_configuration") - class S3Configuration(_message.Message): - __slots__ = ("endpoint", "bucket_name", "access_key_id", "secret_access_key") - ENDPOINT_FIELD_NUMBER: _ClassVar[int] - BUCKET_NAME_FIELD_NUMBER: _ClassVar[int] - ACCESS_KEY_ID_FIELD_NUMBER: _ClassVar[int] - SECRET_ACCESS_KEY_FIELD_NUMBER: _ClassVar[int] - endpoint: str - bucket_name: str - access_key_id: str - secret_access_key: str - def __init__(self, endpoint: _Optional[str] = ..., bucket_name: _Optional[str] = ..., access_key_id: _Optional[str] = ..., secret_access_key: _Optional[str] = ...) -> None: ... - class AzureConfiguration(_message.Message): - __slots__ = ("account_name", "access_key", "container_name") - ACCOUNT_NAME_FIELD_NUMBER: _ClassVar[int] - ACCESS_KEY_FIELD_NUMBER: _ClassVar[int] - CONTAINER_NAME_FIELD_NUMBER: _ClassVar[int] - account_name: str - access_key: str - container_name: str - def __init__(self, account_name: _Optional[str] = ..., access_key: _Optional[str] = ..., container_name: _Optional[str] = ...) -> None: ... - KEY_FIELD_NUMBER: _ClassVar[int] - S3_CONFIGURATION_FIELD_NUMBER: _ClassVar[int] - AZURE_CONFIGURATION_FIELD_NUMBER: _ClassVar[int] - key: str - s3_configuration: ManagerMetadata.S3Configuration - azure_configuration: ManagerMetadata.AzureConfiguration - def __init__(self, key: _Optional[str] = ..., s3_configuration: _Optional[_Union[ManagerMetadata.S3Configuration, _Mapping]] = ..., azure_configuration: _Optional[_Union[ManagerMetadata.AzureConfiguration, _Mapping]] = ...) -> None: ... - -class NodeMetadata(_message.Message): - __slots__ = ("url", "server_mode") - class ServerMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = () - CLOUD: _ClassVar[NodeMetadata.ServerMode] - EDGE: _ClassVar[NodeMetadata.ServerMode] - CLOUD: NodeMetadata.ServerMode - EDGE: NodeMetadata.ServerMode - URL_FIELD_NUMBER: _ClassVar[int] - SERVER_MODE_FIELD_NUMBER: _ClassVar[int] - url: str - server_mode: NodeMetadata.ServerMode - def __init__(self, url: _Optional[str] = ..., server_mode: _Optional[_Union[NodeMetadata.ServerMode, str]] = ...) -> None: ... - class TableMetadata(_message.Message): __slots__ = ("normal_table", "time_series_table") class NormalTableMetadata(_message.Message): diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index b7c7cfb..035520e 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -1,24 +1,86 @@ -import time -from random import randrange - import pyarrow from pyarrow import flight from pyarrow._flight import Result, Ticket -from common import ModelarDBFlightClient, get_time_series_table_schema +import util +from wrapper import FlightClientWrapper from protobuf import protocol_pb2 -class ModelarDBServerFlightClient(ModelarDBFlightClient): +class ModelarDBServerFlightClient(FlightClientWrapper): """Functionality for interacting with a ModelarDB server using Apache Arrow Flight.""" - def do_put(self, table_name: str, record_batch: pyarrow.RecordBatch) -> None: - """Insert the data in the given record batch into the table with the given table name.""" - upload_descriptor = flight.FlightDescriptor.for_path(table_name) - writer, _ = self.flight_client.do_put(upload_descriptor, record_batch.schema) + def list_table_names(self) -> list[str]: + """Return the names of the tables in the server.""" + flights = self.list_flights() + return [table_name.decode("utf-8") for table_name in flights[0].descriptor.path] + + def workload_balanced_query(self, query: str) -> None: + """ + Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that + the cluster has at least one cloud node. + """ + print("Retrieving cloud node that can execute the query...") + query_descriptor = flight.FlightDescriptor.for_command(query) + flight_info = self.flight_client.get_flight_info(query_descriptor) + + endpoint = flight_info.endpoints[0] + cloud_node_url = endpoint.locations[0] + + print(f"Executing query on {cloud_node_url}...") + cloud_client = ModelarDBServerFlightClient(cloud_node_url) + cloud_client.do_get(endpoint.ticket) + + def create_table(self, table_name: str, columns: list[tuple[str, str]], time_series_table=False) -> None: + """ + Create a table in the server with the given name and columns. Each pair in columns should have the + format (column_name, column_type). + """ + create_table = ( + "CREATE TIME SERIES TABLE" if time_series_table else "CREATE TABLE" + ) + sql = f"{create_table} {table_name}({', '.join([f'{column[0]} {column[1]}' for column in columns])})" + + self.do_get(Ticket(sql)) + + def drop_tables(self, table_names: list[str]) -> None: + """Drop the given tables in the server.""" + self.do_get(Ticket(f"DROP TABLE {', '.join(table_names)}")) + + def truncate_tables(self, table_names: list[str]) -> None: + """Truncate the given tables in the server.""" + self.do_get(Ticket(f"TRUNCATE {', '.join(table_names)}")) + + def vacuum_tables(self, table_names: list[str]) -> None: + """Vacuum the given tables in the server.""" + self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) + + def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: + """Create a normal table using the table name and schema.""" + normal_table_metadata = protocol_pb2.TableMetadata.NormalTableMetadata() + + normal_table_metadata.name = table_name + normal_table_metadata.schema = schema.serialize().to_pybytes() - writer.write(record_batch) - writer.close() + table_metadata = protocol_pb2.TableMetadata() + table_metadata.normal_table.CopyFrom(normal_table_metadata) + + self.do_action("CreateTable", table_metadata.SerializeToString()) + + def create_time_series_table_from_metadata(self, table_name: str, schema: pyarrow.Schema, error_bounds: list[ + protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound], generated_columns: list[bytes]) -> None: + """Create a time series table using the table name, schema, error bounds, and generated columns.""" + time_series_table_metadata = protocol_pb2.TableMetadata.TimeSeriesTableMetadata() + + time_series_table_metadata.name = table_name + time_series_table_metadata.schema = schema.serialize().to_pybytes() + time_series_table_metadata.error_bounds.extend(error_bounds) + time_series_table_metadata.generated_column_expressions.extend(generated_columns) + + table_metadata = protocol_pb2.TableMetadata() + table_metadata.time_series_table.CopyFrom(time_series_table_metadata) + + self.do_action("CreateTable", table_metadata.SerializeToString()) def get_configuration(self) -> protocol_pb2.Configuration: """Get the current configuration of the server.""" @@ -38,62 +100,22 @@ def update_configuration(self, setting: protocol_pb2.UpdateConfiguration.Setting return self.do_action("UpdateConfiguration", update_configuration.SerializeToString()) - def ingest_into_server_and_query_table(self, table_name: str, num_rows: int) -> None: - """ - Ingest num_rows rows into the table, flush the memory of the server, and query the first five rows of the table. - """ - record_batch = create_record_batch(num_rows) - - print(f"Ingesting data into {table_name}...\n") - self.do_put(table_name, record_batch) - - print("Flushing memory of the edge...\n") - self.do_action("FlushMemory", b"") - - print(f"First five rows of {table_name}:") - self.do_get(Ticket(f"SELECT * FROM {table_name} LIMIT 5")) - - -def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: - """ - Create a record batch with num_rows rows of randomly generated data for a table with one timestamp column, - three tag columns, and three field columns. - """ - schema = get_time_series_table_schema() - - location = ["aalborg" if i % 2 == 0 else "nibe" for i in range(num_rows)] - install_year = ["2021" if i % 2 == 0 else "2022" for i in range(num_rows)] - model = ["w72" if i % 2 == 0 else "w73" for i in range(num_rows)] - - timestamp = [round(time.time() * 1000000) + (i * 1000000) for i in range(num_rows)] - power_output = [float(randrange(0, 30)) for _ in range(num_rows)] - wind_speed = [float(randrange(50, 100)) for _ in range(num_rows)] - temperature = [float(randrange(0, 40)) for _ in range(num_rows)] - - return pyarrow.RecordBatch.from_arrays( - [ - location, - install_year, - model, - timestamp, - power_output, - wind_speed, - temperature, - ], - schema=schema, - ) + def node_type(self) -> str: + """Return the type of the node.""" + node_type = self.do_action("NodeType", b"") + return node_type[0].body.to_pybytes().decode("utf-8") if __name__ == "__main__": server_client = ModelarDBServerFlightClient("grpc://127.0.0.1:9999") print(f"Node type: {server_client.node_type()}\n") - server_client.create_test_tables() - server_client.ingest_into_server_and_query_table("test_time_series_table_1", 10000) + util.create_test_tables(server_client) + util.ingest_into_server_and_query_table(server_client, "test_time_series_table_1", 10000) print("\nCurrent configuration:") server_client.update_configuration(protocol_pb2.UpdateConfiguration.Setting.COMPRESSED_RESERVED_MEMORY_IN_BYTES, 10000000) print(server_client.get_configuration()) - server_client.clean_up_tables([], "drop") + util.clean_up_tables(server_client, [], "drop") diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py new file mode 100644 index 0000000..758562a --- /dev/null +++ b/Apache-Arrow-Flight-Tester/util.py @@ -0,0 +1,149 @@ +import time +from random import randrange +from typing import Literal + +import pyarrow +from protobuf import protocol_pb2 +from pyarrow._flight import Ticket + +from server import ModelarDBServerFlightClient + + +def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: + """ + Create a record batch with num_rows rows of randomly generated data for a table with one timestamp column, + three tag columns, and three field columns. + """ + schema = get_time_series_table_schema() + + location = ["aalborg" if i % 2 == 0 else "nibe" for i in range(num_rows)] + install_year = ["2021" if i % 2 == 0 else "2022" for i in range(num_rows)] + model = ["w72" if i % 2 == 0 else "w73" for i in range(num_rows)] + + timestamp = [round(time.time() * 1000000) + (i * 1000000) for i in range(num_rows)] + power_output = [float(randrange(0, 30)) for _ in range(num_rows)] + wind_speed = [float(randrange(50, 100)) for _ in range(num_rows)] + temperature = [float(randrange(0, 40)) for _ in range(num_rows)] + + return pyarrow.RecordBatch.from_arrays( + [ + location, + install_year, + model, + timestamp, + power_output, + wind_speed, + temperature, + ], + schema=schema, + ) + + +def get_time_series_table_schema() -> pyarrow.Schema: + """Return a schema for a time series table with one timestamp column, three tag columns, and three field columns.""" + return pyarrow.schema([ + ("location", pyarrow.utf8()), + ("install_year", pyarrow.utf8()), + ("model", pyarrow.utf8()), + ("timestamp", pyarrow.timestamp("us")), + ("power_output", pyarrow.float32()), + ("wind_speed", pyarrow.float32()), + ("temperature", pyarrow.float32()), + ]) + + +def create_test_tables(server_client: ModelarDBServerFlightClient) -> None: + """ + Create a normal table and a time series table using the flight client, print the current tables to ensure the + created tables are included, and print the schema to ensure the tables are created correctly. + """ + print("Creating test tables...") + + server_client.create_table( + "test_table_1", + [("timestamp", "TIMESTAMP"), ("values", "REAL"), ("metadata", "REAL")], + ) + server_client.create_table( + "test_time_series_table_1", + [ + ("location", "TAG"), + ("install_year", "TAG"), + ("model", "TAG"), + ("timestamp", "TIMESTAMP"), + ("power_output", "FIELD"), + ("wind_speed", "FIELD"), + ("temperature", "FIELD(5%)"), + ], + time_series_table=True, + ) + + print("\nCurrent tables:") + for table_name in server_client.list_table_names(): + print(f"{table_name}:") + print(f"{server_client.get_schema(table_name)}\n") + + +def create_test_tables_from_metadata(server_client: ModelarDBServerFlightClient) -> None: + """ + Create a normal table and a time series table using the CreateTable action, print the current tables to ensure + the created tables are included, and print the schema to ensure the tables are created correctly. + """ + print("Creating test tables from metadata...") + + normal_table_schema = pyarrow.schema([ + ("timestamp", pyarrow.timestamp("us")), + ("values", pyarrow.float32()), + ("metadata", pyarrow.utf8()) + ]) + + server_client.create_normal_table_from_metadata("test_table_1", normal_table_schema) + + time_series_table_schema = get_time_series_table_schema() + + lossless = protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type.LOSSLESS + error_bounds = [protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound(value=0, type=lossless) + for _ in range(len(time_series_table_schema))] + + generated_column_expressions = [b'' for _ in range(len(time_series_table_schema))] + + server_client.create_time_series_table_from_metadata("test_time_series_table_1", time_series_table_schema, + error_bounds, generated_column_expressions) + + print("\nCurrent tables:") + for table_name in server_client.list_table_names(): + print(f"{table_name}:") + print(f"{server_client.get_schema(table_name)}\n") + + +def ingest_into_server_and_query_table(server_client: ModelarDBServerFlightClient, table_name: str, + num_rows: int) -> None: + """ + Ingest num_rows rows into the table, flush the memory of the server, and query the first five rows of the table. + """ + record_batch = create_record_batch(num_rows) + + print(f"Ingesting data into {table_name}...\n") + server_client.do_put(table_name, record_batch) + + print("Flushing memory of the node...\n") + server_client.do_action("FlushMemory", b"") + + print(f"First five rows of {table_name}:") + server_client.do_get(Ticket(f"SELECT * FROM {table_name} LIMIT 5")) + + +def clean_up_tables(server_client: ModelarDBServerFlightClient, tables: list[str], + operation: Literal["drop", "truncate"]) -> None: + """ + Clean up the given tables by either dropping them or truncating them. If no tables are given, all tables + are dropped or truncated. + """ + if len(tables) == 0: + tables = server_client.list_table_names() + + print(f"Cleaning up {', '.join(tables)} using {operation}...") + + if operation == "drop": + server_client.drop_tables(tables) + else: + server_client.truncate_tables(tables) diff --git a/Apache-Arrow-Flight-Tester/wrapper.py b/Apache-Arrow-Flight-Tester/wrapper.py new file mode 100644 index 0000000..c8f2142 --- /dev/null +++ b/Apache-Arrow-Flight-Tester/wrapper.py @@ -0,0 +1,53 @@ +import pyarrow +import pprint + +from pyarrow import flight, Schema +from pyarrow._flight import FlightInfo, ActionType, Result, Ticket + + +class FlightClientWrapper: + """Wrapper around the FlightClient class to simplify interaction with an Apache Arrow Flight server.""" + + def __init__(self, location: str): + self.flight_client = flight.FlightClient(location) + + def list_flights(self) -> list[FlightInfo]: + """Wrapper around the list_flights method of the FlightClient class.""" + response = self.flight_client.list_flights() + + return list(response) + + def get_schema(self, table_name: str) -> Schema: + """Wrapper around the get_schema method of the FlightClient class.""" + upload_descriptor = flight.FlightDescriptor.for_path(table_name) + response = self.flight_client.get_schema(upload_descriptor) + + return response.schema + + def do_get(self, ticket: Ticket) -> None: + """Wrapper around the do_get method of the FlightClient class.""" + response = self.flight_client.do_get(ticket) + + for batch in response: + pprint.pprint(batch.data.to_pydict()) + + def do_put(self, table_name: str, record_batch: pyarrow.RecordBatch) -> None: + """Wrapper around the do_put method of the FlightClient class.""" + upload_descriptor = flight.FlightDescriptor.for_path(table_name) + writer, _ = self.flight_client.do_put(upload_descriptor, record_batch.schema) + + writer.write(record_batch) + writer.close() + + def do_action(self, action_type: str, action_body: bytes) -> list[Result]: + """Wrapper around the do_action method of the FlightClient class.""" + action = flight.Action(action_type, action_body) + response = self.flight_client.do_action(action) + + return list(response) + + def list_actions(self) -> list[ActionType]: + """Wrapper around the list_actions method of the FlightClient class.""" + response = self.flight_client.list_actions() + + return list(response) diff --git a/README.md b/README.md index 65a7941..2854452 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,6 @@ # Utilities Utilities for simplifying development and testing developed in this repository: -- [Apache Arrow Flight manager testing](Apache-Arrow-Flight-Tester/manager.py) is a script written in - [Python 3](https://www.python.org/) to test the different endpoints of the ModelarDB manager Apache Arrow Flight API. - - [Apache Arrow Flight server testing](Apache-Arrow-Flight-Tester/server.py) is a script written in [Python 3](https://www.python.org/) to test the different endpoints of the ModelarDB server Apache Arrow Flight API.