From a51da0de974ca945290666bae4f3501e3469a54f Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:25:11 +0100 Subject: [PATCH 01/12] Remove mention of manager from readme --- README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/README.md b/README.md index 65a7941..2854452 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,6 @@ # Utilities Utilities for simplifying development and testing developed in this repository: -- [Apache Arrow Flight manager testing](Apache-Arrow-Flight-Tester/manager.py) is a script written in - [Python 3](https://www.python.org/) to test the different endpoints of the ModelarDB manager Apache Arrow Flight API. - - [Apache Arrow Flight server testing](Apache-Arrow-Flight-Tester/server.py) is a script written in [Python 3](https://www.python.org/) to test the different endpoints of the ModelarDB server Apache Arrow Flight API. From c8dbeb34787046f735a2d4fc53740ad2eb6c63d4 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:26:47 +0100 Subject: [PATCH 02/12] Update proto file with the latest changes --- .../protobuf/protocol.proto | 41 ----------------- .../protobuf/protocol_pb2.py | 44 +++++++------------ .../protobuf/protocol_pb2.pyi | 44 ------------------- 3 files changed, 17 insertions(+), 112 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol.proto b/Apache-Arrow-Flight-Tester/protobuf/protocol.proto index 85242ac..d26b19e 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol.proto +++ b/Apache-Arrow-Flight-Tester/protobuf/protocol.proto @@ -17,47 +17,6 @@ syntax = "proto3"; package modelardb.flight.protocol; -// Metadata for the ModelarDB cluster manager, including its unique key and storage configuration. -message ManagerMetadata { - // Key used to uniquely identify the cluster manager. - string key = 1; - - // Storage configuration used to connect to an S3 object store. 
- message S3Configuration { - string endpoint = 1; - string bucket_name = 2; - string access_key_id = 3; - string secret_access_key = 4; - } - - // Storage configuration used to connect to an Azure Blob Storage object store. - message AzureConfiguration { - string account_name = 1; - string access_key = 2; - string container_name = 3; - } - - // Storage configuration used by the cluster manager. - oneof storage_configuration { - S3Configuration s3_configuration = 2; - AzureConfiguration azure_configuration = 3; - } -} - -// Metadata for a node in the ModelarDB cluster, including its URL and server mode. -message NodeMetadata { - enum ServerMode { - CLOUD = 0; - EDGE = 1; - } - - // gRPC URL of the node. - string url = 1; - - // Mode indicating whether the node is a cloud or edge server. - ServerMode server_mode = 2; -} - // Metadata for a normal table or a time series table. message TableMetadata { // Metadata for a normal table, including its name and schema. diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py index 9d868a0..8ba9068 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py +++ b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.py @@ -24,37 +24,27 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eprotocol.proto\x12\x19modelardb.flight.protocol\"\xb1\x03\n\x0fManagerMetadata\x12\x0b\n\x03key\x18\x01 \x01(\t\x12V\n\x10s3_configuration\x18\x02 \x01(\x0b\x32:.modelardb.flight.protocol.ManagerMetadata.S3ConfigurationH\x00\x12\\\n\x13\x61zure_configuration\x18\x03 \x01(\x0b\x32=.modelardb.flight.protocol.ManagerMetadata.AzureConfigurationH\x00\x1aj\n\x0fS3Configuration\x12\x10\n\x08\x65ndpoint\x18\x01 \x01(\t\x12\x13\n\x0b\x62ucket_name\x18\x02 \x01(\t\x12\x15\n\raccess_key_id\x18\x03 \x01(\t\x12\x19\n\x11secret_access_key\x18\x04 \x01(\t\x1aV\n\x12\x41zureConfiguration\x12\x14\n\x0c\x61\x63\x63ount_name\x18\x01 \x01(\t\x12\x12\n\naccess_key\x18\x02 
\x01(\t\x12\x16\n\x0e\x63ontainer_name\x18\x03 \x01(\tB\x17\n\x15storage_configuration\"\x87\x01\n\x0cNodeMetadata\x12\x0b\n\x03url\x18\x01 \x01(\t\x12G\n\x0bserver_mode\x18\x02 \x01(\x0e\x32\x32.modelardb.flight.protocol.NodeMetadata.ServerMode\"!\n\nServerMode\x12\t\n\x05\x43LOUD\x10\x00\x12\x08\n\x04\x45\x44GE\x10\x01\"\xfe\x04\n\rTableMetadata\x12T\n\x0cnormal_table\x18\x01 \x01(\x0b\x32<.modelardb.flight.protocol.TableMetadata.NormalTableMetadataH\x00\x12]\n\x11time_series_table\x18\x02 \x01(\x0b\x32@.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadataH\x00\x1a\x33\n\x13NormalTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x1a\xf0\x02\n\x17TimeSeriesTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x12\x61\n\x0c\x65rror_bounds\x18\x03 \x03(\x0b\x32K.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound\x12$\n\x1cgenerated_column_expressions\x18\x04 \x03(\x0c\x1a\xad\x01\n\nErrorBound\x12^\n\x04type\x18\x01 \x01(\x0e\x32P.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type\x12\r\n\x05value\x18\x02 \x01(\x02\"0\n\x04Type\x12\x0c\n\x08\x41\x42SOLUTE\x10\x00\x12\x0c\n\x08RELATIVE\x10\x01\x12\x0c\n\x08LOSSLESS\x10\x02\x42\x10\n\x0etable_metadata\"\xfa\x02\n\rConfiguration\x12-\n%multivariate_reserved_memory_in_bytes\x18\x01 \x01(\x04\x12-\n%uncompressed_reserved_memory_in_bytes\x18\x02 \x01(\x04\x12+\n#compressed_reserved_memory_in_bytes\x18\x03 \x01(\x04\x12)\n\x1ctransfer_batch_size_in_bytes\x18\x04 \x01(\x04H\x00\x88\x01\x01\x12%\n\x18transfer_time_in_seconds\x18\x05 \x01(\x04H\x01\x88\x01\x01\x12\x19\n\x11ingestion_threads\x18\x06 \x01(\r\x12\x1b\n\x13\x63ompression_threads\x18\x07 \x01(\r\x12\x16\n\x0ewriter_threads\x18\x08 \x01(\rB\x1f\n\x1d_transfer_batch_size_in_bytesB\x1b\n\x19_transfer_time_in_seconds\"\xcf\x02\n\x13UpdateConfiguration\x12G\n\x07setting\x18\x01 
\x01(\x0e\x32\x36.modelardb.flight.protocol.UpdateConfiguration.Setting\x12\x16\n\tnew_value\x18\x02 \x01(\x04H\x00\x88\x01\x01\"\xc8\x01\n\x07Setting\x12)\n%MULTIVARIATE_RESERVED_MEMORY_IN_BYTES\x10\x00\x12)\n%UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x01\x12\'\n#COMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x02\x12 \n\x1cTRANSFER_BATCH_SIZE_IN_BYTES\x10\x03\x12\x1c\n\x18TRANSFER_TIME_IN_SECONDS\x10\x04\x42\x0c\n\n_new_valueb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eprotocol.proto\x12\x19modelardb.flight.protocol\"\xfe\x04\n\rTableMetadata\x12T\n\x0cnormal_table\x18\x01 \x01(\x0b\x32<.modelardb.flight.protocol.TableMetadata.NormalTableMetadataH\x00\x12]\n\x11time_series_table\x18\x02 \x01(\x0b\x32@.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadataH\x00\x1a\x33\n\x13NormalTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x1a\xf0\x02\n\x17TimeSeriesTableMetadata\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\x0c\x12\x61\n\x0c\x65rror_bounds\x18\x03 \x03(\x0b\x32K.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound\x12$\n\x1cgenerated_column_expressions\x18\x04 \x03(\x0c\x1a\xad\x01\n\nErrorBound\x12^\n\x04type\x18\x01 \x01(\x0e\x32P.modelardb.flight.protocol.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type\x12\r\n\x05value\x18\x02 \x01(\x02\"0\n\x04Type\x12\x0c\n\x08\x41\x42SOLUTE\x10\x00\x12\x0c\n\x08RELATIVE\x10\x01\x12\x0c\n\x08LOSSLESS\x10\x02\x42\x10\n\x0etable_metadata\"\xfa\x02\n\rConfiguration\x12-\n%multivariate_reserved_memory_in_bytes\x18\x01 \x01(\x04\x12-\n%uncompressed_reserved_memory_in_bytes\x18\x02 \x01(\x04\x12+\n#compressed_reserved_memory_in_bytes\x18\x03 \x01(\x04\x12)\n\x1ctransfer_batch_size_in_bytes\x18\x04 \x01(\x04H\x00\x88\x01\x01\x12%\n\x18transfer_time_in_seconds\x18\x05 \x01(\x04H\x01\x88\x01\x01\x12\x19\n\x11ingestion_threads\x18\x06 \x01(\r\x12\x1b\n\x13\x63ompression_threads\x18\x07 
\x01(\r\x12\x16\n\x0ewriter_threads\x18\x08 \x01(\rB\x1f\n\x1d_transfer_batch_size_in_bytesB\x1b\n\x19_transfer_time_in_seconds\"\xcf\x02\n\x13UpdateConfiguration\x12G\n\x07setting\x18\x01 \x01(\x0e\x32\x36.modelardb.flight.protocol.UpdateConfiguration.Setting\x12\x16\n\tnew_value\x18\x02 \x01(\x04H\x00\x88\x01\x01\"\xc8\x01\n\x07Setting\x12)\n%MULTIVARIATE_RESERVED_MEMORY_IN_BYTES\x10\x00\x12)\n%UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x01\x12\'\n#COMPRESSED_RESERVED_MEMORY_IN_BYTES\x10\x02\x12 \n\x1cTRANSFER_BATCH_SIZE_IN_BYTES\x10\x03\x12\x1c\n\x18TRANSFER_TIME_IN_SECONDS\x10\x04\x42\x0c\n\n_new_valueb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protocol_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_MANAGERMETADATA']._serialized_start=46 - _globals['_MANAGERMETADATA']._serialized_end=479 - _globals['_MANAGERMETADATA_S3CONFIGURATION']._serialized_start=260 - _globals['_MANAGERMETADATA_S3CONFIGURATION']._serialized_end=366 - _globals['_MANAGERMETADATA_AZURECONFIGURATION']._serialized_start=368 - _globals['_MANAGERMETADATA_AZURECONFIGURATION']._serialized_end=454 - _globals['_NODEMETADATA']._serialized_start=482 - _globals['_NODEMETADATA']._serialized_end=617 - _globals['_NODEMETADATA_SERVERMODE']._serialized_start=584 - _globals['_NODEMETADATA_SERVERMODE']._serialized_end=617 - _globals['_TABLEMETADATA']._serialized_start=620 - _globals['_TABLEMETADATA']._serialized_end=1258 - _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_start=818 - _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_end=869 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_start=872 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_end=1240 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_start=1067 - 
_globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_end=1240 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_start=1192 - _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_end=1240 - _globals['_CONFIGURATION']._serialized_start=1261 - _globals['_CONFIGURATION']._serialized_end=1639 - _globals['_UPDATECONFIGURATION']._serialized_start=1642 - _globals['_UPDATECONFIGURATION']._serialized_end=1977 - _globals['_UPDATECONFIGURATION_SETTING']._serialized_start=1763 - _globals['_UPDATECONFIGURATION_SETTING']._serialized_end=1963 + _globals['_TABLEMETADATA']._serialized_start=46 + _globals['_TABLEMETADATA']._serialized_end=684 + _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_start=244 + _globals['_TABLEMETADATA_NORMALTABLEMETADATA']._serialized_end=295 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_start=298 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA']._serialized_end=666 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_start=493 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND']._serialized_end=666 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_start=618 + _globals['_TABLEMETADATA_TIMESERIESTABLEMETADATA_ERRORBOUND_TYPE']._serialized_end=666 + _globals['_CONFIGURATION']._serialized_start=687 + _globals['_CONFIGURATION']._serialized_end=1065 + _globals['_UPDATECONFIGURATION']._serialized_start=1068 + _globals['_UPDATECONFIGURATION']._serialized_end=1403 + _globals['_UPDATECONFIGURATION_SETTING']._serialized_start=1189 + _globals['_UPDATECONFIGURATION_SETTING']._serialized_end=1389 # @@protoc_insertion_point(module_scope) diff --git a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi index e5b4de4..01f2fc5 100644 --- a/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi +++ 
b/Apache-Arrow-Flight-Tester/protobuf/protocol_pb2.pyi @@ -7,50 +7,6 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor -class ManagerMetadata(_message.Message): - __slots__ = ("key", "s3_configuration", "azure_configuration") - class S3Configuration(_message.Message): - __slots__ = ("endpoint", "bucket_name", "access_key_id", "secret_access_key") - ENDPOINT_FIELD_NUMBER: _ClassVar[int] - BUCKET_NAME_FIELD_NUMBER: _ClassVar[int] - ACCESS_KEY_ID_FIELD_NUMBER: _ClassVar[int] - SECRET_ACCESS_KEY_FIELD_NUMBER: _ClassVar[int] - endpoint: str - bucket_name: str - access_key_id: str - secret_access_key: str - def __init__(self, endpoint: _Optional[str] = ..., bucket_name: _Optional[str] = ..., access_key_id: _Optional[str] = ..., secret_access_key: _Optional[str] = ...) -> None: ... - class AzureConfiguration(_message.Message): - __slots__ = ("account_name", "access_key", "container_name") - ACCOUNT_NAME_FIELD_NUMBER: _ClassVar[int] - ACCESS_KEY_FIELD_NUMBER: _ClassVar[int] - CONTAINER_NAME_FIELD_NUMBER: _ClassVar[int] - account_name: str - access_key: str - container_name: str - def __init__(self, account_name: _Optional[str] = ..., access_key: _Optional[str] = ..., container_name: _Optional[str] = ...) -> None: ... - KEY_FIELD_NUMBER: _ClassVar[int] - S3_CONFIGURATION_FIELD_NUMBER: _ClassVar[int] - AZURE_CONFIGURATION_FIELD_NUMBER: _ClassVar[int] - key: str - s3_configuration: ManagerMetadata.S3Configuration - azure_configuration: ManagerMetadata.AzureConfiguration - def __init__(self, key: _Optional[str] = ..., s3_configuration: _Optional[_Union[ManagerMetadata.S3Configuration, _Mapping]] = ..., azure_configuration: _Optional[_Union[ManagerMetadata.AzureConfiguration, _Mapping]] = ...) -> None: ... 
- -class NodeMetadata(_message.Message): - __slots__ = ("url", "server_mode") - class ServerMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = () - CLOUD: _ClassVar[NodeMetadata.ServerMode] - EDGE: _ClassVar[NodeMetadata.ServerMode] - CLOUD: NodeMetadata.ServerMode - EDGE: NodeMetadata.ServerMode - URL_FIELD_NUMBER: _ClassVar[int] - SERVER_MODE_FIELD_NUMBER: _ClassVar[int] - url: str - server_mode: NodeMetadata.ServerMode - def __init__(self, url: _Optional[str] = ..., server_mode: _Optional[_Union[NodeMetadata.ServerMode, str]] = ...) -> None: ... - class TableMetadata(_message.Message): __slots__ = ("normal_table", "time_series_table") class NormalTableMetadata(_message.Message): From cbd452167a2aa0df6ca6cd2f077fbf2ff56ee0cf Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:28:11 +0100 Subject: [PATCH 03/12] Move method to do workload balanced query to server class --- Apache-Arrow-Flight-Tester/server.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index b7c7cfb..3e2897f 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -53,6 +53,22 @@ def ingest_into_server_and_query_table(self, table_name: str, num_rows: int) -> print(f"First five rows of {table_name}:") self.do_get(Ticket(f"SELECT * FROM {table_name} LIMIT 5")) + def workload_balanced_query(self, query: str) -> None: + """ + Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that + at least one cloud node is already registered with the manager. 
+ """ + print("Retrieving cloud node that can execute the query...") + query_descriptor = flight.FlightDescriptor.for_command(query) + flight_info = self.flight_client.get_flight_info(query_descriptor) + + endpoint = flight_info.endpoints[0] + cloud_node_url = endpoint.locations[0] + + print(f"Executing query on {cloud_node_url}...") + cloud_client = ModelarDBServerFlightClient(cloud_node_url) + cloud_client.do_get(endpoint.ticket) + def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: """ From 37a20c98ff6282fa12585acc0aec04e2ac7da3c0 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:35:15 +0100 Subject: [PATCH 04/12] Remove class for testing manager --- Apache-Arrow-Flight-Tester/manager.py | 61 --------------------------- 1 file changed, 61 deletions(-) delete mode 100644 Apache-Arrow-Flight-Tester/manager.py diff --git a/Apache-Arrow-Flight-Tester/manager.py b/Apache-Arrow-Flight-Tester/manager.py deleted file mode 100644 index ce69227..0000000 --- a/Apache-Arrow-Flight-Tester/manager.py +++ /dev/null @@ -1,61 +0,0 @@ -from pyarrow import flight -from pyarrow._flight import Result - -from common import ModelarDBFlightClient -from protobuf import protocol_pb2 -from server import ModelarDBServerFlightClient - - -class ModelarDBManagerFlightClient(ModelarDBFlightClient): - """Functionality for interacting with a ModelarDB manager using Apache Arrow Flight.""" - - def register_node(self, node_url: str, - node_mode: protocol_pb2.NodeMetadata.ServerMode) -> protocol_pb2.ManagerMetadata: - """Register a node with the given URL and mode in the manager.""" - node_metadata = protocol_pb2.NodeMetadata() - node_metadata.url = node_url - node_metadata.server_mode = node_mode - - response = self.do_action("RegisterNode", node_metadata.SerializeToString()) - - manager_metadata = protocol_pb2.ManagerMetadata() - manager_metadata.ParseFromString(response[0].body.to_pybytes()) - - return manager_metadata - - 
def remove_node(self, node_url: str) -> list[Result]: - """Remove the node with the given URL from the manager.""" - node_metadata = protocol_pb2.NodeMetadata() - node_metadata.url = node_url - - return self.do_action("RemoveNode", node_metadata.SerializeToString()) - - def query(self, query: str) -> None: - """ - Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that - at least one cloud node is already registered with the manager. - """ - print("Retrieving cloud node that can execute the query...") - query_descriptor = flight.FlightDescriptor.for_command(query) - flight_info = self.flight_client.get_flight_info(query_descriptor) - - endpoint = flight_info.endpoints[0] - cloud_node_url = endpoint.locations[0] - - print(f"Executing query on {cloud_node_url}...") - cloud_client = ModelarDBServerFlightClient(cloud_node_url) - cloud_client.do_get(endpoint.ticket) - - -if __name__ == "__main__": - manager_client = ModelarDBManagerFlightClient("grpc://127.0.0.1:9998") - print(f"Node type: {manager_client.node_type()}\n") - - manager_client.create_test_tables_from_metadata() - - print(manager_client.register_node("grpc://127.0.0.1:9999", protocol_pb2.NodeMetadata.ServerMode.EDGE)) - print(manager_client.remove_node("grpc://127.0.0.1:9999")) - - manager_client.query("SELECT * FROM test_time_series_table_1 LIMIT 5") - - manager_client.clean_up_tables([], "drop") From cb40b66a5ab8250b0b3ebcba11a367cad81bae0c Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:49:23 +0100 Subject: [PATCH 05/12] Move util functions to util file --- Apache-Arrow-Flight-Tester/common.py | 14 +-------- Apache-Arrow-Flight-Tester/server.py | 36 ++------------------- Apache-Arrow-Flight-Tester/util.py | 47 ++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 47 deletions(-) create mode 100644 Apache-Arrow-Flight-Tester/util.py diff --git 
a/Apache-Arrow-Flight-Tester/common.py b/Apache-Arrow-Flight-Tester/common.py index 3608dbc..d9cdc2d 100644 --- a/Apache-Arrow-Flight-Tester/common.py +++ b/Apache-Arrow-Flight-Tester/common.py @@ -5,6 +5,7 @@ from protobuf import protocol_pb2 from pyarrow import flight, Schema from pyarrow._flight import FlightInfo, ActionType, Result, Ticket +from util import get_time_series_table_schema class ModelarDBFlightClient: @@ -184,16 +185,3 @@ def node_type(self) -> str: """Return the type of the node.""" node_type = self.do_action("NodeType", b"") return node_type[0].body.to_pybytes().decode("utf-8") - - -def get_time_series_table_schema() -> pyarrow.Schema: - """Return a schema for a time series table with one timestamp column, three tag columns, and three field columns.""" - return pyarrow.schema([ - ("location", pyarrow.utf8()), - ("install_year", pyarrow.utf8()), - ("model", pyarrow.utf8()), - ("timestamp", pyarrow.timestamp("us")), - ("power_output", pyarrow.float32()), - ("wind_speed", pyarrow.float32()), - ("temperature", pyarrow.float32()), - ]) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index 3e2897f..82a0b26 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -1,11 +1,9 @@ -import time -from random import randrange - import pyarrow from pyarrow import flight from pyarrow._flight import Result, Ticket -from common import ModelarDBFlightClient, get_time_series_table_schema +from common import ModelarDBFlightClient +from util import create_record_batch from protobuf import protocol_pb2 @@ -70,36 +68,6 @@ def workload_balanced_query(self, query: str) -> None: cloud_client.do_get(endpoint.ticket) -def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: - """ - Create a record batch with num_rows rows of randomly generated data for a table with one timestamp column, - three tag columns, and three field columns. 
- """ - schema = get_time_series_table_schema() - - location = ["aalborg" if i % 2 == 0 else "nibe" for i in range(num_rows)] - install_year = ["2021" if i % 2 == 0 else "2022" for i in range(num_rows)] - model = ["w72" if i % 2 == 0 else "w73" for i in range(num_rows)] - - timestamp = [round(time.time() * 1000000) + (i * 1000000) for i in range(num_rows)] - power_output = [float(randrange(0, 30)) for _ in range(num_rows)] - wind_speed = [float(randrange(50, 100)) for _ in range(num_rows)] - temperature = [float(randrange(0, 40)) for _ in range(num_rows)] - - return pyarrow.RecordBatch.from_arrays( - [ - location, - install_year, - model, - timestamp, - power_output, - wind_speed, - temperature, - ], - schema=schema, - ) - - if __name__ == "__main__": server_client = ModelarDBServerFlightClient("grpc://127.0.0.1:9999") print(f"Node type: {server_client.node_type()}\n") diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py new file mode 100644 index 0000000..7e7105c --- /dev/null +++ b/Apache-Arrow-Flight-Tester/util.py @@ -0,0 +1,47 @@ +import time +from random import randrange + +import pyarrow + + +def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: + """ + Create a record batch with num_rows rows of randomly generated data for a table with one timestamp column, + three tag columns, and three field columns. 
+ """ + schema = get_time_series_table_schema() + + location = ["aalborg" if i % 2 == 0 else "nibe" for i in range(num_rows)] + install_year = ["2021" if i % 2 == 0 else "2022" for i in range(num_rows)] + model = ["w72" if i % 2 == 0 else "w73" for i in range(num_rows)] + + timestamp = [round(time.time() * 1000000) + (i * 1000000) for i in range(num_rows)] + power_output = [float(randrange(0, 30)) for _ in range(num_rows)] + wind_speed = [float(randrange(50, 100)) for _ in range(num_rows)] + temperature = [float(randrange(0, 40)) for _ in range(num_rows)] + + return pyarrow.RecordBatch.from_arrays( + [ + location, + install_year, + model, + timestamp, + power_output, + wind_speed, + temperature, + ], + schema=schema, + ) + + +def get_time_series_table_schema() -> pyarrow.Schema: + """Return a schema for a time series table with one timestamp column, three tag columns, and three field columns.""" + return pyarrow.schema([ + ("location", pyarrow.utf8()), + ("install_year", pyarrow.utf8()), + ("model", pyarrow.utf8()), + ("timestamp", pyarrow.timestamp("us")), + ("power_output", pyarrow.float32()), + ("wind_speed", pyarrow.float32()), + ("temperature", pyarrow.float32()), + ]) From d2e18b95d2566c39ea958f067b0fbba71a1172b5 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 21:57:05 +0100 Subject: [PATCH 06/12] Move more util functions to util file --- Apache-Arrow-Flight-Tester/common.py | 80 --------------------- Apache-Arrow-Flight-Tester/server.py | 25 ++----- Apache-Arrow-Flight-Tester/util.py | 104 +++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 100 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/common.py b/Apache-Arrow-Flight-Tester/common.py index d9cdc2d..6a55958 100644 --- a/Apache-Arrow-Flight-Tester/common.py +++ b/Apache-Arrow-Flight-Tester/common.py @@ -1,11 +1,9 @@ import pyarrow import pprint -from typing import Literal from protobuf import protocol_pb2 from pyarrow import 
flight, Schema from pyarrow._flight import FlightInfo, ActionType, Result, Ticket -from util import get_time_series_table_schema class ModelarDBFlightClient: @@ -64,36 +62,6 @@ def create_table(self, table_name: str, columns: list[tuple[str, str]], time_ser self.do_get(Ticket(sql)) - def create_test_tables(self) -> None: - """ - Create a normal table and a time series table using the flight client, print the current tables to ensure the - created tables are included, and print the schema to ensure the tables are created correctly. - """ - print("Creating test tables...") - - self.create_table( - "test_table_1", - [("timestamp", "TIMESTAMP"), ("values", "REAL"), ("metadata", "REAL")], - ) - self.create_table( - "test_time_series_table_1", - [ - ("location", "TAG"), - ("install_year", "TAG"), - ("model", "TAG"), - ("timestamp", "TIMESTAMP"), - ("power_output", "FIELD"), - ("wind_speed", "FIELD"), - ("temperature", "FIELD(5%)"), - ], - time_series_table=True, - ) - - print("\nCurrent tables:") - for table_name in self.list_table_names(): - print(f"{table_name}:") - print(f"{self.get_schema(table_name)}\n") - def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: """Create a normal table using the table name and schema.""" normal_table_metadata = protocol_pb2.TableMetadata.NormalTableMetadata() @@ -121,37 +89,6 @@ def create_time_series_table_from_metadata(self, table_name: str, schema: pyarro self.do_action("CreateTable", table_metadata.SerializeToString()) - def create_test_tables_from_metadata(self): - """ - Create a normal table and a time series table using the CreateTable action, print the current tables to ensure - the created tables are included, and print the schema to ensure the tables are created correctly. 
- """ - print("Creating test tables from metadata...") - - normal_table_schema = pyarrow.schema([ - ("timestamp", pyarrow.timestamp("us")), - ("values", pyarrow.float32()), - ("metadata", pyarrow.utf8()) - ]) - - self.create_normal_table_from_metadata("test_table_1", normal_table_schema) - - time_series_table_schema = get_time_series_table_schema() - - lossless = protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type.LOSSLESS - error_bounds = [protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound(value=0, type=lossless) - for _ in range(len(time_series_table_schema))] - - generated_column_expressions = [b'' for _ in range(len(time_series_table_schema))] - - self.create_time_series_table_from_metadata("test_time_series_table_1", time_series_table_schema, - error_bounds, generated_column_expressions) - - print("\nCurrent tables:") - for table_name in self.list_table_names(): - print(f"{table_name}:") - print(f"{self.get_schema(table_name)}\n") - def drop_table(self, table_name: str) -> None: """Drop the table with the given name from the server or manager.""" self.do_get(Ticket(f"DROP TABLE {table_name}")) @@ -160,23 +97,6 @@ def truncate_table(self, table_name: str) -> None: """Truncate the table with the given name in the server or manager.""" self.do_get(Ticket(f"TRUNCATE TABLE {table_name}")) - def clean_up_tables(self, tables: list[str], operation: Literal["drop", "truncate"]) -> None: - """ - Clean up the given tables by either dropping them or truncating them. If no tables are given, all tables - are dropped or truncated. 
- """ - if len(tables) == 0: - tables = self.list_table_names() - - print(f"Cleaning up {', '.join(tables)} using {operation}...") - - for table_name in tables: - ( - self.drop_table(table_name) - if operation == "drop" - else self.truncate_table(table_name) - ) - def vacuum(self, table_names: list[str]) -> None: """Vacuum the given tables in the server or manager.""" self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index 82a0b26..55bbf1e 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -1,10 +1,10 @@ import pyarrow from pyarrow import flight -from pyarrow._flight import Result, Ticket +from pyarrow._flight import Result from common import ModelarDBFlightClient -from util import create_record_batch from protobuf import protocol_pb2 +from util import ingest_into_server_and_query_table, create_test_tables, clean_up_tables class ModelarDBServerFlightClient(ModelarDBFlightClient): @@ -36,21 +36,6 @@ def update_configuration(self, setting: protocol_pb2.UpdateConfiguration.Setting return self.do_action("UpdateConfiguration", update_configuration.SerializeToString()) - def ingest_into_server_and_query_table(self, table_name: str, num_rows: int) -> None: - """ - Ingest num_rows rows into the table, flush the memory of the server, and query the first five rows of the table. - """ - record_batch = create_record_batch(num_rows) - - print(f"Ingesting data into {table_name}...\n") - self.do_put(table_name, record_batch) - - print("Flushing memory of the edge...\n") - self.do_action("FlushMemory", b"") - - print(f"First five rows of {table_name}:") - self.do_get(Ticket(f"SELECT * FROM {table_name} LIMIT 5")) - def workload_balanced_query(self, query: str) -> None: """ Retrieve a cloud node that can execute the given query and execute the query on the node. 
It is assumed that @@ -72,12 +57,12 @@ def workload_balanced_query(self, query: str) -> None: server_client = ModelarDBServerFlightClient("grpc://127.0.0.1:9999") print(f"Node type: {server_client.node_type()}\n") - server_client.create_test_tables() - server_client.ingest_into_server_and_query_table("test_time_series_table_1", 10000) + create_test_tables(server_client) + ingest_into_server_and_query_table(server_client, "test_time_series_table_1", 10000) print("\nCurrent configuration:") server_client.update_configuration(protocol_pb2.UpdateConfiguration.Setting.COMPRESSED_RESERVED_MEMORY_IN_BYTES, 10000000) print(server_client.get_configuration()) - server_client.clean_up_tables([], "drop") + clean_up_tables(server_client, [], "drop") diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py index 7e7105c..5c2bbd9 100644 --- a/Apache-Arrow-Flight-Tester/util.py +++ b/Apache-Arrow-Flight-Tester/util.py @@ -1,7 +1,12 @@ import time from random import randrange +from typing import Literal import pyarrow +from protobuf import protocol_pb2 +from pyarrow._flight import Ticket + +from server import ModelarDBServerFlightClient def create_record_batch(num_rows: int) -> pyarrow.RecordBatch: @@ -45,3 +50,102 @@ def get_time_series_table_schema() -> pyarrow.Schema: ("wind_speed", pyarrow.float32()), ("temperature", pyarrow.float32()), ]) + + +def create_test_tables(server_client: ModelarDBServerFlightClient) -> None: + """ + Create a normal table and a time series table using the flight client, print the current tables to ensure the + created tables are included, and print the schema to ensure the tables are created correctly. 
+ """ + print("Creating test tables...") + + server_client.create_table( + "test_table_1", + [("timestamp", "TIMESTAMP"), ("values", "REAL"), ("metadata", "REAL")], + ) + server_client.create_table( + "test_time_series_table_1", + [ + ("location", "TAG"), + ("install_year", "TAG"), + ("model", "TAG"), + ("timestamp", "TIMESTAMP"), + ("power_output", "FIELD"), + ("wind_speed", "FIELD"), + ("temperature", "FIELD(5%)"), + ], + time_series_table=True, + ) + + print("\nCurrent tables:") + for table_name in server_client.list_table_names(): + print(f"{table_name}:") + print(f"{server_client.get_schema(table_name)}\n") + + +def create_test_tables_from_metadata(server_client: ModelarDBServerFlightClient): + """ + Create a normal table and a time series table using the CreateTable action, print the current tables to ensure + the created tables are included, and print the schema to ensure the tables are created correctly. + """ + print("Creating test tables from metadata...") + + normal_table_schema = pyarrow.schema([ + ("timestamp", pyarrow.timestamp("us")), + ("values", pyarrow.float32()), + ("metadata", pyarrow.utf8()) + ]) + + server_client.create_normal_table_from_metadata("test_table_1", normal_table_schema) + + time_series_table_schema = get_time_series_table_schema() + + lossless = protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound.Type.LOSSLESS + error_bounds = [protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound(value=0, type=lossless) + for _ in range(len(time_series_table_schema))] + + generated_column_expressions = [b'' for _ in range(len(time_series_table_schema))] + + server_client.create_time_series_table_from_metadata("test_time_series_table_1", time_series_table_schema, + error_bounds, generated_column_expressions) + + print("\nCurrent tables:") + for table_name in server_client.list_table_names(): + print(f"{table_name}:") + print(f"{server_client.get_schema(table_name)}\n") + + +def ingest_into_server_and_query_table(server_client: 
ModelarDBServerFlightClient, table_name: str, + num_rows: int) -> None: + """ + Ingest num_rows rows into the table, flush the memory of the server, and query the first five rows of the table. + """ + record_batch = create_record_batch(num_rows) + + print(f"Ingesting data into {table_name}...\n") + server_client.do_put(table_name, record_batch) + + print("Flushing memory of the edge...\n") + server_client.do_action("FlushMemory", b"") + + print(f"First five rows of {table_name}:") + server_client.do_get(Ticket(f"SELECT * FROM {table_name} LIMIT 5")) + + +def clean_up_tables(server_client: ModelarDBServerFlightClient, tables: list[str], + operation: Literal["drop", "truncate"]) -> None: + """ + Clean up the given tables by either dropping them or truncating them. If no tables are given, all tables + are dropped or truncated. + """ + if len(tables) == 0: + tables = server_client.list_table_names() + + print(f"Cleaning up {', '.join(tables)} using {operation}...") + + for table_name in tables: + ( + server_client.drop_table(table_name) + if operation == "drop" + else server_client.truncate_table(table_name) + ) From af9076d10c0163d1368a72c88132540a44160ffa Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 22:02:16 +0100 Subject: [PATCH 07/12] Make ModelarDBFlightClient into a clean wrapper for FlightClient base endpoints --- Apache-Arrow-Flight-Tester/common.py | 107 -------------------------- Apache-Arrow-Flight-Tester/wrapper.py | 53 +++++++++++++ 2 files changed, 53 insertions(+), 107 deletions(-) delete mode 100644 Apache-Arrow-Flight-Tester/common.py create mode 100644 Apache-Arrow-Flight-Tester/wrapper.py diff --git a/Apache-Arrow-Flight-Tester/common.py b/Apache-Arrow-Flight-Tester/common.py deleted file mode 100644 index 6a55958..0000000 --- a/Apache-Arrow-Flight-Tester/common.py +++ /dev/null @@ -1,107 +0,0 @@ -import pyarrow -import pprint - -from protobuf import protocol_pb2 -from pyarrow import 
flight, Schema -from pyarrow._flight import FlightInfo, ActionType, Result, Ticket - - -class ModelarDBFlightClient: - """Common functionality for interacting with server and manager ModelarDB instances using Apache Arrow Flight.""" - - def __init__(self, location: str): - self.flight_client = flight.FlightClient(location) - - def list_flights(self) -> list[FlightInfo]: - """Wrapper around the list_flights method of the FlightClient class.""" - response = self.flight_client.list_flights() - - return list(response) - - def get_schema(self, table_name: str) -> Schema: - """Wrapper around the get_schema method of the FlightClient class.""" - upload_descriptor = flight.FlightDescriptor.for_path(table_name) - response = self.flight_client.get_schema(upload_descriptor) - - return response.schema - - def do_get(self, ticket: Ticket) -> None: - """Wrapper around the do_get method of the FlightClient class.""" - response = self.flight_client.do_get(ticket) - - for batch in response: - pprint.pprint(batch.data.to_pydict()) - - def do_action(self, action_type: str, action_body: bytes) -> list[Result]: - """Wrapper around the do_action method of the FlightClient class.""" - action = flight.Action(action_type, action_body) - response = self.flight_client.do_action(action) - - return list(response) - - def list_actions(self) -> list[ActionType]: - """Wrapper around the list_actions method of the FlightClient class.""" - response = self.flight_client.list_actions() - - return list(response) - - def list_table_names(self) -> list[str]: - """Return the names of the tables in the server or manager.""" - flights = self.list_flights() - return [table_name.decode("utf-8") for table_name in flights[0].descriptor.path] - - def create_table(self, table_name: str, columns: list[tuple[str, str]], time_series_table=False) -> None: - """ - Create a table in the server or manager with the given name and columns. Each pair in columns should have the - format (column_name, column_type). 
- """ - create_table = ( - "CREATE TIME SERIES TABLE" if time_series_table else "CREATE TABLE" - ) - sql = f"{create_table} {table_name}({', '.join([f'{column[0]} {column[1]}' for column in columns])})" - - self.do_get(Ticket(sql)) - - def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: - """Create a normal table using the table name and schema.""" - normal_table_metadata = protocol_pb2.TableMetadata.NormalTableMetadata() - - normal_table_metadata.name = table_name - normal_table_metadata.schema = schema.serialize().to_pybytes() - - table_metadata = protocol_pb2.TableMetadata() - table_metadata.normal_table.CopyFrom(normal_table_metadata) - - self.do_action("CreateTable", table_metadata.SerializeToString()) - - def create_time_series_table_from_metadata(self, table_name: str, schema: pyarrow.Schema, error_bounds: list[ - protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound], generated_columns: list[bytes]) -> None: - """Create a time series table using the table name, schema, error bounds, and generated columns.""" - time_series_table_metadata = protocol_pb2.TableMetadata.TimeSeriesTableMetadata() - - time_series_table_metadata.name = table_name - time_series_table_metadata.schema = schema.serialize().to_pybytes() - time_series_table_metadata.error_bounds.extend(error_bounds) - time_series_table_metadata.generated_column_expressions.extend(generated_columns) - - table_metadata = protocol_pb2.TableMetadata() - table_metadata.time_series_table.CopyFrom(time_series_table_metadata) - - self.do_action("CreateTable", table_metadata.SerializeToString()) - - def drop_table(self, table_name: str) -> None: - """Drop the table with the given name from the server or manager.""" - self.do_get(Ticket(f"DROP TABLE {table_name}")) - - def truncate_table(self, table_name: str) -> None: - """Truncate the table with the given name in the server or manager.""" - self.do_get(Ticket(f"TRUNCATE TABLE {table_name}")) - - def vacuum(self, 
table_names: list[str]) -> None: - """Vacuum the given tables in the server or manager.""" - self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) - - def node_type(self) -> str: - """Return the type of the node.""" - node_type = self.do_action("NodeType", b"") - return node_type[0].body.to_pybytes().decode("utf-8") diff --git a/Apache-Arrow-Flight-Tester/wrapper.py b/Apache-Arrow-Flight-Tester/wrapper.py new file mode 100644 index 0000000..c8f2142 --- /dev/null +++ b/Apache-Arrow-Flight-Tester/wrapper.py @@ -0,0 +1,53 @@ +import pyarrow +import pprint + +from pyarrow import flight, Schema +from pyarrow._flight import FlightInfo, ActionType, Result, Ticket + + +class FlightClientWrapper: + """Wrapper around the FlightClient class to simplify interaction with an Apache Arrow Flight server.""" + + def __init__(self, location: str): + self.flight_client = flight.FlightClient(location) + + def list_flights(self) -> list[FlightInfo]: + """Wrapper around the list_flights method of the FlightClient class.""" + response = self.flight_client.list_flights() + + return list(response) + + def get_schema(self, table_name: str) -> Schema: + """Wrapper around the get_schema method of the FlightClient class.""" + upload_descriptor = flight.FlightDescriptor.for_path(table_name) + response = self.flight_client.get_schema(upload_descriptor) + + return response.schema + + def do_get(self, ticket: Ticket) -> None: + """Wrapper around the do_get method of the FlightClient class.""" + response = self.flight_client.do_get(ticket) + + for batch in response: + pprint.pprint(batch.data.to_pydict()) + + def do_put(self, table_name: str, record_batch: pyarrow.RecordBatch) -> None: + """Wrapper around the do_put method of the FlightClient class.""" + upload_descriptor = flight.FlightDescriptor.for_path(table_name) + writer, _ = self.flight_client.do_put(upload_descriptor, record_batch.schema) + + writer.write(record_batch) + writer.close() + + def do_action(self, action_type: str, 
action_body: bytes) -> list[Result]: + """Wrapper around the do_action method of the FlightClient class.""" + action = flight.Action(action_type, action_body) + response = self.flight_client.do_action(action) + + return list(response) + + def list_actions(self) -> list[ActionType]: + """Wrapper around the list_actions method of the FlightClient class.""" + response = self.flight_client.list_actions() + + return list(response) From 7ad2c9c039ba1dfd0382c36ade6ebfa38a558b3a Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 22:04:40 +0100 Subject: [PATCH 08/12] Make ModelarDBServerFlightClient only have top-level ModelarDB functionality --- Apache-Arrow-Flight-Tester/server.py | 101 ++++++++++++++++++++------- Apache-Arrow-Flight-Tester/util.py | 2 +- 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index 55bbf1e..e8e11ef 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -1,22 +1,86 @@ import pyarrow from pyarrow import flight -from pyarrow._flight import Result +from pyarrow._flight import Result, Ticket -from common import ModelarDBFlightClient +from wrapper import FlightClientWrapper from protobuf import protocol_pb2 from util import ingest_into_server_and_query_table, create_test_tables, clean_up_tables -class ModelarDBServerFlightClient(ModelarDBFlightClient): +class ModelarDBServerFlightClient(FlightClientWrapper): """Functionality for interacting with a ModelarDB server using Apache Arrow Flight.""" - def do_put(self, table_name: str, record_batch: pyarrow.RecordBatch) -> None: - """Insert the data in the given record batch into the table with the given table name.""" - upload_descriptor = flight.FlightDescriptor.for_path(table_name) - writer, _ = self.flight_client.do_put(upload_descriptor, record_batch.schema) + def list_table_names(self) -> list[str]: + """Return 
the names of the tables in the server or manager.""" + flights = self.list_flights() + return [table_name.decode("utf-8") for table_name in flights[0].descriptor.path] - writer.write(record_batch) - writer.close() + def workload_balanced_query(self, query: str) -> None: + """ + Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that + at least one cloud node is already registered with the manager. + """ + print("Retrieving cloud node that can execute the query...") + query_descriptor = flight.FlightDescriptor.for_command(query) + flight_info = self.flight_client.get_flight_info(query_descriptor) + + endpoint = flight_info.endpoints[0] + cloud_node_url = endpoint.locations[0] + + print(f"Executing query on {cloud_node_url}...") + cloud_client = ModelarDBServerFlightClient(cloud_node_url) + cloud_client.do_get(endpoint.ticket) + + def create_table(self, table_name: str, columns: list[tuple[str, str]], time_series_table=False) -> None: + """ + Create a table in the server or manager with the given name and columns. Each pair in columns should have the + format (column_name, column_type). 
+ """ + create_table = ( + "CREATE TIME SERIES TABLE" if time_series_table else "CREATE TABLE" + ) + sql = f"{create_table} {table_name}({', '.join([f'{column[0]} {column[1]}' for column in columns])})" + + self.do_get(Ticket(sql)) + + def drop_table(self, table_name: str) -> None: + """Drop the table with the given name from the server or manager.""" + self.do_get(Ticket(f"DROP TABLE {table_name}")) + + def truncate(self, table_name: str) -> None: + """Truncate the table with the given name in the server or manager.""" + self.do_get(Ticket(f"TRUNCATE {table_name}")) + + def vacuum(self, table_names: list[str]) -> None: + """Vacuum the given tables in the server or manager.""" + self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) + + def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: + """Create a normal table using the table name and schema.""" + normal_table_metadata = protocol_pb2.TableMetadata.NormalTableMetadata() + + normal_table_metadata.name = table_name + normal_table_metadata.schema = schema.serialize().to_pybytes() + + table_metadata = protocol_pb2.TableMetadata() + table_metadata.normal_table.CopyFrom(normal_table_metadata) + + self.do_action("CreateTable", table_metadata.SerializeToString()) + + def create_time_series_table_from_metadata(self, table_name: str, schema: pyarrow.Schema, error_bounds: list[ + protocol_pb2.TableMetadata.TimeSeriesTableMetadata.ErrorBound], generated_columns: list[bytes]) -> None: + """Create a time series table using the table name, schema, error bounds, and generated columns.""" + time_series_table_metadata = protocol_pb2.TableMetadata.TimeSeriesTableMetadata() + + time_series_table_metadata.name = table_name + time_series_table_metadata.schema = schema.serialize().to_pybytes() + time_series_table_metadata.error_bounds.extend(error_bounds) + time_series_table_metadata.generated_column_expressions.extend(generated_columns) + + table_metadata = protocol_pb2.TableMetadata() + 
table_metadata.time_series_table.CopyFrom(time_series_table_metadata) + + self.do_action("CreateTable", table_metadata.SerializeToString()) def get_configuration(self) -> protocol_pb2.Configuration: """Get the current configuration of the server.""" @@ -36,21 +100,10 @@ def update_configuration(self, setting: protocol_pb2.UpdateConfiguration.Setting return self.do_action("UpdateConfiguration", update_configuration.SerializeToString()) - def workload_balanced_query(self, query: str) -> None: - """ - Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that - at least one cloud node is already registered with the manager. - """ - print("Retrieving cloud node that can execute the query...") - query_descriptor = flight.FlightDescriptor.for_command(query) - flight_info = self.flight_client.get_flight_info(query_descriptor) - - endpoint = flight_info.endpoints[0] - cloud_node_url = endpoint.locations[0] - - print(f"Executing query on {cloud_node_url}...") - cloud_client = ModelarDBServerFlightClient(cloud_node_url) - cloud_client.do_get(endpoint.ticket) + def node_type(self) -> str: + """Return the type of the node.""" + node_type = self.do_action("NodeType", b"") + return node_type[0].body.to_pybytes().decode("utf-8") if __name__ == "__main__": diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py index 5c2bbd9..fcda010 100644 --- a/Apache-Arrow-Flight-Tester/util.py +++ b/Apache-Arrow-Flight-Tester/util.py @@ -147,5 +147,5 @@ def clean_up_tables(server_client: ModelarDBServerFlightClient, tables: list[str ( server_client.drop_table(table_name) if operation == "drop" - else server_client.truncate_table(table_name) + else server_client.truncate(table_name) ) From 18818a49946c18f9c322b25daf458e0f0ce929c9 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 20 Nov 2025 22:07:40 +0100 Subject: [PATCH 09/12] Remove all mentions of the manager --- 
Apache-Arrow-Flight-Tester/server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index e8e11ef..d4161f0 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -11,14 +11,14 @@ class ModelarDBServerFlightClient(FlightClientWrapper): """Functionality for interacting with a ModelarDB server using Apache Arrow Flight.""" def list_table_names(self) -> list[str]: - """Return the names of the tables in the server or manager.""" + """Return the names of the tables in the server.""" flights = self.list_flights() return [table_name.decode("utf-8") for table_name in flights[0].descriptor.path] def workload_balanced_query(self, query: str) -> None: """ Retrieve a cloud node that can execute the given query and execute the query on the node. It is assumed that - at least one cloud node is already registered with the manager. + the cluster has at least one cloud node. """ print("Retrieving cloud node that can execute the query...") query_descriptor = flight.FlightDescriptor.for_command(query) @@ -33,7 +33,7 @@ def workload_balanced_query(self, query: str) -> None: def create_table(self, table_name: str, columns: list[tuple[str, str]], time_series_table=False) -> None: """ - Create a table in the server or manager with the given name and columns. Each pair in columns should have the + Create a table in the server with the given name and columns. Each pair in columns should have the format (column_name, column_type). 
""" create_table = ( @@ -44,15 +44,15 @@ def create_table(self, table_name: str, columns: list[tuple[str, str]], time_ser self.do_get(Ticket(sql)) def drop_table(self, table_name: str) -> None: - """Drop the table with the given name from the server or manager.""" + """Drop the table with the given name from the server.""" self.do_get(Ticket(f"DROP TABLE {table_name}")) def truncate(self, table_name: str) -> None: - """Truncate the table with the given name in the server or manager.""" + """Truncate the table with the given name in the server.""" self.do_get(Ticket(f"TRUNCATE {table_name}")) def vacuum(self, table_names: list[str]) -> None: - """Vacuum the given tables in the server or manager.""" + """Vacuum the given tables in the server.""" self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) def create_normal_table_from_metadata(self, table_name: str, schema: pyarrow.Schema) -> None: From bee729f7a36ccc7090cb15ab11c6cb623a43b6d3 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Thu, 27 Nov 2025 20:26:29 +0100 Subject: [PATCH 10/12] Fix circular dependency issue --- Apache-Arrow-Flight-Tester/server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py index d4161f0..d555ca9 100644 --- a/Apache-Arrow-Flight-Tester/server.py +++ b/Apache-Arrow-Flight-Tester/server.py @@ -2,9 +2,9 @@ from pyarrow import flight from pyarrow._flight import Result, Ticket +import util from wrapper import FlightClientWrapper from protobuf import protocol_pb2 -from util import ingest_into_server_and_query_table, create_test_tables, clean_up_tables class ModelarDBServerFlightClient(FlightClientWrapper): @@ -110,12 +110,12 @@ def node_type(self) -> str: server_client = ModelarDBServerFlightClient("grpc://127.0.0.1:9999") print(f"Node type: {server_client.node_type()}\n") - create_test_tables(server_client) - 
ingest_into_server_and_query_table(server_client, "test_time_series_table_1", 10000)
+    util.create_test_tables(server_client)
+    util.ingest_into_server_and_query_table(server_client, "test_time_series_table_1", 10000)
 
     print("\nCurrent configuration:")
     server_client.update_configuration(protocol_pb2.UpdateConfiguration.Setting.COMPRESSED_RESERVED_MEMORY_IN_BYTES,
                                        10000000)
     print(server_client.get_configuration())
 
-    clean_up_tables(server_client, [], "drop")
+    util.clean_up_tables(server_client, [], "drop")

From ea27a78bab444963e8dc117567618961205c4f3c Mon Sep 17 00:00:00 2001
From: CGodiksen <36046286+CGodiksen@users.noreply.github.com>
Date: Fri, 28 Nov 2025 10:23:01 +0100
Subject: [PATCH 11/12] Refactor method to drop and truncate to work on
 multiple tables

---
 Apache-Arrow-Flight-Tester/server.py | 14 +++++++------
 Apache-Arrow-Flight-Tester/util.py   | 10 ++++------
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/Apache-Arrow-Flight-Tester/server.py b/Apache-Arrow-Flight-Tester/server.py
index d555ca9..035520e 100644
--- a/Apache-Arrow-Flight-Tester/server.py
+++ b/Apache-Arrow-Flight-Tester/server.py
@@ -43,15 +43,15 @@ def create_table(self, table_name: str, columns: list[tuple[str, str]], time_ser
 
         self.do_get(Ticket(sql))
 
-    def drop_table(self, table_name: str) -> None:
-        """Drop the table with the given name from the server."""
-        self.do_get(Ticket(f"DROP TABLE {table_name}"))
+    def drop_tables(self, table_names: list[str]) -> None:
+        """Drop the given tables in the server."""
+        self.do_get(Ticket(f"DROP TABLE {', '.join(table_names)}"))
 
-    def truncate(self, table_name: str) -> None:
-        """Truncate the table with the given name in the server."""
-        self.do_get(Ticket(f"TRUNCATE {table_name}"))
+    def truncate_tables(self, table_names: list[str]) -> None:
+        """Truncate the given tables in the server."""
+        self.do_get(Ticket(f"TRUNCATE {', '.join(table_names)}"))
 
-    def vacuum(self, table_names: list[str]) -> None:
+    def vacuum_tables(self, 
table_names: list[str]) -> None: """Vacuum the given tables in the server.""" self.do_get(Ticket(f"VACUUM {', '.join(table_names)}")) diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py index fcda010..c3c907d 100644 --- a/Apache-Arrow-Flight-Tester/util.py +++ b/Apache-Arrow-Flight-Tester/util.py @@ -143,9 +143,7 @@ def clean_up_tables(server_client: ModelarDBServerFlightClient, tables: list[str print(f"Cleaning up {', '.join(tables)} using {operation}...") - for table_name in tables: - ( - server_client.drop_table(table_name) - if operation == "drop" - else server_client.truncate(table_name) - ) + if operation == "drop": + server_client.drop_tables(tables) + else: + server_client.truncate_tables(tables) From 078da328068a4a4292c87b5399d43d00ec29d547 Mon Sep 17 00:00:00 2001 From: CGodiksen <36046286+CGodiksen@users.noreply.github.com> Date: Fri, 28 Nov 2025 10:39:07 +0100 Subject: [PATCH 12/12] Add missing type and fix log message --- Apache-Arrow-Flight-Tester/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Apache-Arrow-Flight-Tester/util.py b/Apache-Arrow-Flight-Tester/util.py index c3c907d..758562a 100644 --- a/Apache-Arrow-Flight-Tester/util.py +++ b/Apache-Arrow-Flight-Tester/util.py @@ -83,7 +83,7 @@ def create_test_tables(server_client: ModelarDBServerFlightClient) -> None: print(f"{server_client.get_schema(table_name)}\n") -def create_test_tables_from_metadata(server_client: ModelarDBServerFlightClient): +def create_test_tables_from_metadata(server_client: ModelarDBServerFlightClient) -> None: """ Create a normal table and a time series table using the CreateTable action, print the current tables to ensure the created tables are included, and print the schema to ensure the tables are created correctly. 
@@ -125,7 +125,7 @@ def ingest_into_server_and_query_table(server_client: ModelarDBServerFlightClien print(f"Ingesting data into {table_name}...\n") server_client.do_put(table_name, record_batch) - print("Flushing memory of the edge...\n") + print("Flushing memory of the node...\n") server_client.do_action("FlushMemory", b"") print(f"First five rows of {table_name}:")