-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
392 lines (317 loc) · 13 KB
/
main.py
File metadata and controls
392 lines (317 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
"""Script for validating ModelarDB's compression with different data sets."""
import sys
import math
import time
import signal
import tempfile
import subprocess
import collections
import numpy
from pyarrow import parquet
from pyarrow import flight
# Configuration.
MODELARDB_REPOSITORY = "https://github.com/ModelarData/ModelarDB-RS.git"
UTILITIES_REPOSITORY = "https://github.com/ModelarData/Utilities.git"
TABLE_NAME = "evaluate"
STDOUT = subprocess.PIPE
STDERR = subprocess.PIPE
# Helper Functions.
def extract_repository_name(url):
# The plus operator is used instead of an fstring as it was more readable.
return url[url.rfind("/") + 1 : url.rfind(".")] + "/"
def git_clone(url):
subprocess.run(["git", "clone", url], stdout=STDOUT, stderr=STDERR)
def cargo_build_release(modelardb_folder):
process = subprocess.run(
["cargo", "build", "--release"],
cwd=modelardb_folder,
stdout=STDOUT,
stderr=STDERR,
)
return process.returncode == 0
def start_modelardbd(modelardb_folder, data_folder):
process = subprocess.Popen(
["target/release/modelardbd", data_folder],
cwd=modelardb_folder,
stdout=STDOUT,
stderr=STDERR,
)
# Ensure process is fully started.
while not b"Starting Apache Arrow Flight on" in process.stdout.readline():
time.sleep(1)
return process
def errors_occurred(output_stream):
normalized = output_stream.lower()
return b"error" in normalized or b"panicked" in normalized
def print_stream(output_stream):
print()
print(output_stream.decode("utf-8"))
def ingest_test_data(utilities_loader, test_data, error_bound_str):
process = subprocess.run(
[
"python3",
utilities_loader,
"127.0.0.1:9999",
TABLE_NAME,
test_data,
error_bound_str,
],
stdout=STDOUT,
stderr=STDERR,
)
if errors_occurred(process.stderr):
print_stream(process.stderr)
return True
def retrieve_schema(flight_client):
flight_descriptor = flight.FlightDescriptor.for_path(TABLE_NAME)
schema_result = flight_client.get_schema(flight_descriptor)
return schema_result.schema
def retrieve_ingested_columns(flight_client, timestamp_column_name, field_column_name):
ticket = flight.Ticket(
(
f"SELECT {timestamp_column_name}, {field_column_name} "
f"FROM {TABLE_NAME} ORDER BY {timestamp_column_name}"
)
)
reader = flight_client.do_get(ticket)
return reader.read_all()
def compute_and_print_metrics(
test_data_timestamp_column,
test_data_field_column,
decompressed_columns,
error_bound,
):
# Arrays make iteration simpler and float32 match ModelarDB's precision.
test_data_timestamp_column = test_data_timestamp_column.to_numpy()
test_data_field_column = test_data_field_column.to_numpy().astype(numpy.float32)
decompressed_timestamp_column = decompressed_columns[0].to_numpy()
decompressed_field_column = decompressed_columns[1].to_numpy()
# Initialize variables for computing metrics.
equal_values = 0
sum_absolute_difference = 0.0
sum_absolute_test_data_values = 0.0
sum_actual_error_ratio_for_mape = 0.0
max_actual_error = 0.0
max_actual_error_test_data_value = 0.0
max_actual_error_decompressed_value = 0.0
# Initialize a Counter for the actual error of each decompressed value
# rounded to the nearest integer so a simple histogram can be printed.
ceiled_actual_error_counter = collections.Counter()
# Indices of the data points with a value that exceeds the error bound.
indices_of_values_above_error_bound = []
# Indices of the data points with a value that has an undefined error.
indices_of_values_with_undefined_error = []
# The length of each pair of timestamp and value columns should always be
# equal as this is required by both Apache Parquet files and Apache Arrow
# RecordBatches, however, it is checked just to be absolutely sure it is.
if len(test_data_timestamp_column) != len(decompressed_timestamp_column) or len(
test_data_field_column
) != len(decompressed_field_column):
print(
(
"ERROR: the length of the columns in the test data "
f"({len(test_data_timestamp_column)}) and length the decompressed "
f"columns ({len(decompressed_timestamp_column)}) are not equal."
)
)
return
# Compute metrics.
for index in range(0, len(test_data_timestamp_column)):
test_data_timestamp = test_data_timestamp_column[index]
test_data_value = test_data_field_column[index]
decompressed_timestamp = decompressed_timestamp_column[index]
decompressed_value = decompressed_field_column[index]
if test_data_timestamp != decompressed_timestamp:
print(
(
f"ERROR: at index {index}, the timestamp in the test data "
f"({test_data_timestamp}) and the decompressed timestamp "
f"({decompressed_timestamp}) are not equal."
)
)
return
if test_data_value == decompressed_value or (
math.isnan(test_data_value) and math.isnan(decompressed_value)
):
equal_values += 1
difference = 0.0
actual_error_ratio = 0.0
else:
difference = test_data_value - decompressed_value
actual_error_ratio = abs(difference / test_data_value)
actual_error = 100.0 * actual_error_ratio
sum_absolute_difference += abs(difference)
sum_absolute_test_data_values += abs(test_data_value)
sum_actual_error_ratio_for_mape += actual_error_ratio
if max_actual_error < actual_error:
max_actual_error = actual_error
max_actual_error_test_data_value = test_data_value
max_actual_error_decompressed_value = decompressed_value
# math.ceil() raises errors if it receives one of the special floats:
# -inf (OverflowError), inf (OverflowError), and NaN (ValueError).
try:
ceiled_actual_error_counter[math.ceil(actual_error)] += 1
except (OverflowError, ValueError):
ceiled_actual_error_counter["UNDEFINED"] += 1
indices_of_values_with_undefined_error.append(index)
if actual_error > error_bound:
indices_of_values_above_error_bound.append(index)
# Compute and print the final result.
print(f"- Total Number of Values: {len(decompressed_field_column)}")
print(f"- Without Error: {100 * (equal_values / len(decompressed_field_column))}%")
print(
(
"- Average Relative Error: "
f"{100 * (sum_absolute_difference / sum_absolute_test_data_values)}%"
)
)
print(
(
"- Mean Absolute Percentage Error: "
f"{100.0 * (sum_actual_error_ratio_for_mape / len(decompressed_field_column))}%"
)
)
print(
(
f"- Maximum Error: {max_actual_error}% due to {max_actual_error_test_data_value} "
f"(test data) and {max_actual_error_decompressed_value} (decompressed)"
)
)
print("- Error Ceil Histogram:", end="")
for ceiled_error in range(0, math.ceil(max_actual_error) + 1):
print(f" {ceiled_error}% {ceiled_actual_error_counter[ceiled_error]} ", end="")
if ceiled_actual_error_counter["UNDEFINED"] != 0:
print(f" Undefined {ceiled_actual_error_counter['UNDEFINED']}")
else:
print()
print_data_points_if_any(
"- Exceeded Error Bound (Timestamp, Test Data Value, Decompressed Value):",
indices_of_values_above_error_bound,
test_data_timestamp_column,
test_data_field_column,
decompressed_field_column,
)
print_data_points_if_any(
"- Undefined Actual Error (Timestamp, Test Data Value, Decompressed Value):",
indices_of_values_with_undefined_error,
test_data_timestamp_column,
test_data_field_column,
decompressed_field_column,
)
print()
def print_data_points_if_any(
header,
indices,
test_data_timestamp_column,
test_data_field_column,
decompressed_field_column,
):
if indices:
print(header)
for index in indices:
print(
(
f" {test_data_timestamp_column[index]}, "
f"{test_data_field_column[index]: .10f}, "
f"{decompressed_field_column[index]: .10f}"
)
)
def measure_data_folder_size_in_kib(data_folder):
du_output = subprocess.check_output(["du", "-k", "-d0", data_folder])
return int(du_output.split(b"\t")[0])
def send_sigint_to_process(process):
process.send_signal(signal.SIGINT)
# Ensure process is fully shutdown.
while process.poll() is None:
time.sleep(1)
process.wait()
stderr = process.stderr.read()
if errors_occurred(stderr):
print_stream(stderr)
return True
# Main Function.
if __name__ == "__main__":
# Ensure the necessary arguments are provided.
if len(sys.argv) < 2:
print(f"usage: {sys.argv[0]} parquet_file_or_folder relative_error_bound*")
sys.exit(1)
# The script assumes it runs on Linux.
if sys.platform != "linux":
print(f"ERROR: {sys.argv[0]} only supports Linux")
sys.exit(1)
# Clone repositories.
modelardb_folder = extract_repository_name(MODELARDB_REPOSITORY)
git_clone(MODELARDB_REPOSITORY)
utilities_folder = extract_repository_name(UTILITIES_REPOSITORY)
utilities_loader = f"{utilities_folder}Apache-Parquet-Loader/main.py"
git_clone(UTILITIES_REPOSITORY)
# Prepare new executable.
if not cargo_build_release(modelardb_folder):
raise ValueError("Failed to build ModelarDB in release mode.")
# Evaluate error bounds.
flight_client = flight.FlightClient("grpc://127.0.0.1:9999")
for maybe_error_bound in sys.argv[2:]:
# Prepare error bound.
error_bound = float(maybe_error_bound)
if error_bound < 0.0:
raise ValueError("Error bound must be a positive normal float.")
error_bound_str = maybe_error_bound
delimiter = (13 + len(error_bound_str)) * "="
print(delimiter)
print(f"Error Bound: {error_bound_str}")
print(delimiter)
# Prepare data folder.
temporary_directory = tempfile.TemporaryDirectory()
data_folder = temporary_directory.name
# Ingest the test data.
modelardbd = start_modelardbd(modelardb_folder, data_folder)
failed_ingest = ingest_test_data(utilities_loader, sys.argv[1], error_bound_str)
failed_sigint = send_sigint_to_process(modelardbd) # Flush.
if failed_ingest or failed_sigint:
raise ValueError("Failed to ingest test data.")
# Read the test data so metrics can be computed.
test_data = parquet.read_table(sys.argv[1])
# Retrieve each field column, compute metrics for it, and print them.
modelardbd = start_modelardbd(modelardb_folder, data_folder)
schema = retrieve_schema(flight_client)
timestamp_column_name = list(
filter(lambda nc: nc[1] == "timestamp[ms]", zip(schema.names, schema.types))
)[0][0]
test_data_timestamp_column = test_data.column(timestamp_column_name)
for column_name, column_type in zip(schema.names, schema.types):
if column_type == "float":
print(column_name)
field_column_name = column_name
try:
test_data_field_column = test_data.column(field_column_name)
except:
# Spaces in the name may have been replaced by underscores
# and upper case columns may have been converted to lower case.
field_column_name_lowercase_with_space = field_column_name.replace(
"_", " "
).lower()
test_data_field_column = test_data.column(
test_data.column_names.lower().index(
field_column_name_lowercase_with_space
)
)
decompressed_columns = retrieve_ingested_columns(
flight_client, timestamp_column_name, field_column_name
)
compute_and_print_metrics(
test_data_timestamp_column,
test_data_field_column,
decompressed_columns,
error_bound,
)
if send_sigint_to_process(modelardbd):
raise ValueError("Failed to measure the size of the data folder.")
size_of_data_folder = measure_data_folder_size_in_kib(data_folder)
print(
"Data Folder Size: {} KiB / {} MiB / {} GiB".format(
size_of_data_folder,
size_of_data_folder / 1024,
size_of_data_folder / 1024 / 1024,
)
)
flight_client.close()