From d42ffbb657e254e6aa9da6e47a6b947cee0c1dfa Mon Sep 17 00:00:00 2001 From: Alexis Date: Sat, 26 Apr 2025 01:15:40 -0500 Subject: [PATCH 01/22] refactoring statistics --- quasardb/stats.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index a9e58c26..c69d8253 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -1,11 +1,13 @@ import re +from time import sleep + import quasardb import logging from datetime import datetime logger = logging.getLogger("quasardb.stats") - +MAX_KEYS=4*1024*1024 # 4 million max keys stats_prefix = "$qdb.statistics." user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$") total_pattern = re.compile(r"\$qdb.statistics.(.*)$") @@ -50,7 +52,7 @@ def of_node(dconn): """ start = datetime.now() - raw = {k: _get_stat(dconn, k) for k in dconn.prefix_get(stats_prefix, 10000)} + raw = {k: _get_stat(dconn, k) for k in _get_all_keys(dconn)} ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)} @@ -147,6 +149,29 @@ def stat_type(stat_id): return _stat_type(stat_id) +def _get_all_keys(dconn, n=1024): + """ + Returns all keys from a single node. + + Parameters: + dconn: quasardb.Node + Direct node connection to the node we wish to connect to. + + n: int + Number of keys to retrieve. + """ + xs = dconn.prefix_get(stats_prefix, n) + increase_rate = 8 + # keep getting keys while number of results exceeds the given "n" + while len(xs) >= n: + n = n * increase_rate + if n >= MAX_KEYS: + raise Exception(f"ERROR: Cannot fetch more than {MAX_KEYS} keys.") + xs = dconn.prefix_get(stats_prefix, n) + + return xs + + def _calculate_delta_stat(stat_id, prev, cur): logger.info( "calculating delta for stat_id = {}, prev = {}. 
cur = {}".format( @@ -239,7 +264,10 @@ def _cumulative(stats): matches = total_pattern.match(k) if is_cumulative_stat(k) and matches: metric = matches.groups()[0] + if metric.split('.')[-1] not in ["type", "unit"]: + metric += ".value" + metric_name, metric_att = metric.rsplit('.', 1) if not metric.startswith("serialized"): - xs[metric] = v + xs[metric_name] = {**xs.get(metric_name, {}), metric_att:v} return xs From 625150f0c1a13df2836511a00942203dff26de0a Mon Sep 17 00:00:00 2001 From: Alexis Date: Tue, 29 Apr 2025 21:34:45 -0500 Subject: [PATCH 02/22] updated stats delta for current structure --- quasardb/stats.py | 126 ++++++++++++++-------------------------------- 1 file changed, 39 insertions(+), 87 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index c69d8253..d47e6352 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -7,7 +7,7 @@ logger = logging.getLogger("quasardb.stats") -MAX_KEYS=4*1024*1024 # 4 million max keys +MAX_KEYS = 4 * 1024 * 1024 # 4 million max keys stats_prefix = "$qdb.statistics." 
user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$") total_pattern = re.compile(r"\$qdb.statistics.(.*)$") @@ -58,43 +58,20 @@ def of_node(dconn): check_duration = datetime.now() - start - ret["cumulative"]["check.online"] = 1 - ret["cumulative"]["check.duration_ms"] = int(check_duration.total_seconds() * 1000) + ret["cumulative"]["check.online"] = { + "value": 1, + "type": "accumulator", + "unit": "unit", + } + ret["cumulative"]["check.duration_ms"] = { + "value": int(check_duration.total_seconds() * 1000), + "type": "accumulator", + "unit": "milliseconds", + } return ret -_stat_types = { - "node_id": ("constant", None), - "operating_system": ("constant", None), - "partitions_count": ("constant", "count"), - "cpu.system": ("counter", "ns"), - "cpu.user": ("counter", "ns"), - "cpu.idle": ("counter", "ns"), - "startup": ("constant", None), - "startup_time": ("constant", None), - "shutdown_time": ("constant", None), - "network.current_users_count": ("gauge", "count"), - "hardware_concurrency": ("gauge", "count"), - "check.online": ("gauge", "count"), - "check.duration_ms": ("constant", "ms"), - "requests.bytes_in": ("counter", "bytes"), - "requests.bytes_out": ("counter", "bytes"), - "requests.errors_count": ("counter", "count"), - "requests.successes_count": ("counter", "count"), - "requests.total_count": ("counter", "count"), - "async_pipelines.merge.bucket_count": ("counter", "count"), - "async_pipelines.merge.duration_us": ("counter", "us"), - "async_pipelines.write.successes_count": ("counter", "count"), - "async_pipelines.write.failures_count": ("counter", "count"), - "async_pipelines.write.time_us": ("counter", "us"), - "async_pipelines.merge.max_bucket_count": ("gauge", "count"), - "async_pipelines.merge.max_depth_count": ("gauge", "count"), - "async_pipelines.merge.requests_count": ("counter", "count"), - "evicted.count": ("counter", "count"), - "pageins.count": ("counter", "count"), -} - async_pipeline_bytes_pattern = re.compile( 
r"async_pipelines.pipe_[0-9]+.merge_map.bytes" ) @@ -103,39 +80,6 @@ def of_node(dconn): ) -def _stat_type(stat_id): - if stat_id in _stat_types: - return _stat_types[stat_id] - elif stat_id.endswith("total_ns"): - return ("counter", "ns") - elif stat_id.endswith("total_bytes"): - return ("counter", "bytes") - elif stat_id.endswith("read_bytes"): - return ("counter", "bytes") - elif stat_id.endswith("written_bytes"): - return ("counter", "bytes") - elif stat_id.endswith("total_count"): - return ("counter", "count") - elif stat_id.startswith("network.sessions."): - return ("gauge", "count") - elif stat_id.startswith("memory."): - # memory statistics are all gauges i think, describes how much memory currently allocated where - return ("gauge", "bytes") - elif stat_id.startswith("persistence.") or stat_id.startswith("disk"): - # persistence are also all gauges, describes mostly how much is currently available/used on storage - return ("gauge", "bytes") - elif stat_id.startswith("license."): - return ("gauge", None) - elif stat_id.startswith("engine_"): - return ("constant", None) - elif async_pipeline_bytes_pattern.match(stat_id): - return ("gauge", "bytes") - elif async_pipeline_count_pattern.match(stat_id): - return ("gauge", "count") - else: - return None - - def stat_type(stat_id): """ Returns the statistic type by a stat id. Returns one of: @@ -146,7 +90,7 @@ def stat_type(stat_id): This is useful for determining which value should be reported in a dashboard. """ - return _stat_type(stat_id) + raise Exception("Deprecated Method.") def _get_all_keys(dconn, n=1024): @@ -160,11 +104,12 @@ def _get_all_keys(dconn, n=1024): n: int Number of keys to retrieve. 
""" - xs = dconn.prefix_get(stats_prefix, n) + xs = None increase_rate = 8 # keep getting keys while number of results exceeds the given "n" - while len(xs) >= n: - n = n * increase_rate + while xs is None or len(xs) >= n: + if xs is not None: + n = n * increase_rate if n >= MAX_KEYS: raise Exception(f"ERROR: Cannot fetch more than {MAX_KEYS} keys.") xs = dconn.prefix_get(stats_prefix, n) @@ -172,32 +117,38 @@ def _get_all_keys(dconn, n=1024): return xs -def _calculate_delta_stat(stat_id, prev, cur): +def _calculate_delta_stat(stat_id, prev_stat, curr_stat): logger.info( "calculating delta for stat_id = {}, prev = {}. cur = {}".format( - stat_id, prev, cur + stat_id, str(prev_stat["value"]), str(prev_stat["value"]) ) ) - - stat_type = _stat_type(stat_id) - if stat_type == "counter": - return cur - prev - elif stat_type == "gauge": - return cur + if curr_stat["type"] in ["counter", "accumulator"]: + return {**curr_stat, "value": curr_stat["value"] - prev_stat["value"]} + elif curr_stat["type"] == "gauge": + return {**curr_stat} else: return None def _calculate_delta_stats(prev_stats, cur_stats): + """ + Args: + prev_stats: previous dictionary of cumulative stats + cur_stats: current dictionary of cumulative stats + + Returns: + a new dictionary with same structure and updated values + """ ret = {} for stat_id in cur_stats.keys(): try: - prev_stat = cur_stats[stat_id] + prev_stat = prev_stats[stat_id] cur_stat = cur_stats[stat_id] - value = _calculate_delta_stat(stat_id, prev_stat, cur_stat) - if value is not None: - ret[stat_id] = value + stat_content = _calculate_delta_stat(stat_id, prev_stat, cur_stat) + if stat_content is not None: + ret[stat_id] = stat_content except KeyError: # Stat likely was not present yet in prev_stats @@ -212,7 +163,8 @@ def calculate_delta(prev, cur): """ ret = {} for node_id in cur.keys(): - ret[node_id] = _calculate_delta_stats( + ret[node_id] = {"by_uid": {}, "cumulative": {}} + ret[node_id]["cumulative"] = _calculate_delta_stats( 
prev[node_id]["cumulative"], cur[node_id]["cumulative"] ) @@ -264,10 +216,10 @@ def _cumulative(stats): matches = total_pattern.match(k) if is_cumulative_stat(k) and matches: metric = matches.groups()[0] - if metric.split('.')[-1] not in ["type", "unit"]: + if metric.split(".")[-1] not in ["type", "unit"]: metric += ".value" - metric_name, metric_att = metric.rsplit('.', 1) + metric_name, metric_att = metric.rsplit(".", 1) if not metric.startswith("serialized"): - xs[metric_name] = {**xs.get(metric_name, {}), metric_att:v} + xs[metric_name] = {**xs.get(metric_name, {}), metric_att: v} return xs From dba8de783b318f04331e3cf9accd4480e0acece2 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Fri, 2 May 2025 12:51:29 +0700 Subject: [PATCH 03/22] Improve comments --- scripts/teamcity/10.build.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/teamcity/10.build.sh b/scripts/teamcity/10.build.sh index 94d56644..042ca998 100755 --- a/scripts/teamcity/10.build.sh +++ b/scripts/teamcity/10.build.sh @@ -9,8 +9,10 @@ git config --global --add safe.directory '*' # No more errors should occur after here set -e -u -x -# Now use a virtualenv to run the tests PYTHON="${PYTHON_CMD:-python3}" + +# Now use a virtualenv to run the tests. If the virtualenv already exists, we remove +# it to ensure a clean install. 
${PYTHON} -m venv --clear ${SCRIPT_DIR}/../../.env/ if [[ "$(uname)" == MINGW* ]] then From 3375546c9cc7b22c6a7e0db0358dacda83dd4bbb Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Fri, 2 May 2025 12:51:40 +0700 Subject: [PATCH 04/22] Support incremental rebuilds Reuses venv, reuses `build/` directory --- scripts/teamcity/20.test.sh | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/scripts/teamcity/20.test.sh b/scripts/teamcity/20.test.sh index 8779e95e..e42bb7a0 100644 --- a/scripts/teamcity/20.test.sh +++ b/scripts/teamcity/20.test.sh @@ -129,13 +129,20 @@ fi if [[ -d "build/" ]] then - echo "Removing build/" - rm -rf build/ + echo "Warning: build/ directory already exists, assuming incremental compilation, reusing build artifacts" fi -# Now use a virtualenv to run the tests +# Now use a virtualenv to run the tests. If the virtualenv already exists, we reuse it +# to avoid frequent rebuilds. + +if [[ -d "${SCRIPT_DIR}/../../.env/" ]] +then + echo "virtualenv already exists, skip creating new venv" +else + echo "Creating new virtualenv" + ${PYTHON} -m venv --clear ${SCRIPT_DIR}/../../.env/ +fi -${PYTHON} -m venv --clear ${SCRIPT_DIR}/../../.env/ if [[ "$(uname)" == MINGW* ]] then VENV_PYTHON="${SCRIPT_DIR}/../../.env/Scripts/python.exe" @@ -143,11 +150,10 @@ else VENV_PYTHON="${SCRIPT_DIR}/../../.env/bin/python" fi - ${VENV_PYTHON} -m pip install --upgrade -r dev-requirements.txt export QDB_TESTS_ENABLED=ON -${VENV_PYTHON} -m build -w +${VENV_PYTHON} setup.py bdist_wheel ${VENV_PYTHON} -m pip install --no-deps --force-reinstall dist/quasardb-*.whl From c936c7b3146fdf61b00c58aa20a017d9947932cc Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Fri, 2 May 2025 12:52:56 +0700 Subject: [PATCH 05/22] We can just keep using the `build` module --- scripts/teamcity/20.test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/teamcity/20.test.sh b/scripts/teamcity/20.test.sh index e42bb7a0..ea3512f8 100644 --- 
a/scripts/teamcity/20.test.sh +++ b/scripts/teamcity/20.test.sh @@ -153,7 +153,7 @@ fi ${VENV_PYTHON} -m pip install --upgrade -r dev-requirements.txt export QDB_TESTS_ENABLED=ON -${VENV_PYTHON} setup.py bdist_wheel +${VENV_PYTHON} -m build -w ${VENV_PYTHON} -m pip install --no-deps --force-reinstall dist/quasardb-*.whl From 5b7e2cb806fab31d87a03e3acd02427642c06b85 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Fri, 2 May 2025 17:51:04 +0700 Subject: [PATCH 06/22] [WiP] --- quasardb/stats.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index d47e6352..86325be7 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -52,7 +52,7 @@ def of_node(dconn): """ start = datetime.now() - raw = {k: _get_stat(dconn, k) for k in _get_all_keys(dconn)} + raw = {k: _get_stat(dconn, k) for k in _index_keys(dconn, _get_all_keys(dconn))} ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)} @@ -116,6 +116,17 @@ def _get_all_keys(dconn, n=1024): return xs +def _index_keys(dconn, ks): + """ + Takes all statistics keys that are retrieved, and "indexes" them in such a way + that we end up with a dict of all statistic keys, their type and their unit. 
+ """ + + for k in ks: + print(f"k: {k}") + + return ks + def _calculate_delta_stat(stat_id, prev_stat, curr_stat): logger.info( From bc00d244e349be768595f59d5ad38b6bed16ab35 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 10:18:45 +0700 Subject: [PATCH 07/22] Sync test setup to enable faster statistics --- scripts/tests/setup | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tests/setup b/scripts/tests/setup index 8fc3c537..685fd72b 160000 --- a/scripts/tests/setup +++ b/scripts/tests/setup @@ -1 +1 @@ -Subproject commit 8fc3c5377b627778b552bbc4a4bea8b867e24038 +Subproject commit 685fd72bcc2d59ca933cb161eceba1829cfcacad From 54cfd7319e403938cf381cc0f4c327a40800c7e8 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 11:00:53 +0700 Subject: [PATCH 08/22] Implement functions that enable for looking up stat type/unit --- quasardb/stats.py | 73 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/quasardb/stats.py b/quasardb/stats.py index 86325be7..bc2a6d74 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -3,7 +3,9 @@ import quasardb import logging +from collections import defaultdict from datetime import datetime +from enum import Enum logger = logging.getLogger("quasardb.stats") @@ -116,6 +118,77 @@ def _get_all_keys(dconn, n=1024): return xs +class Type(Enum): + ACCUMULATOR = 1 + GAUGE = 2 + LABEL = 3 + +class Unit(Enum): + NONE = 0 + COUNT = 1 + + # Size units + BYTES = 32 + + # Time/duration units + EPOCH = 64, + NANOSECONDS = 65 + MICROSECONDS = 66 + MILLISECONDS = 67 + SECONDS = 68 + + + +_type_string_to_enum = {'accumulator': Type.ACCUMULATOR, + 'gauge': Type.GAUGE, + 'label': Type.LABEL, + } + +_unit_string_to_enum = {'none': Unit.NONE, + 'count': Unit.COUNT, + 'bytes': Unit.BYTES, + 'epoch': Unit.EPOCH, + 'nanoseconds': Unit.NANOSECONDS, + 'microseconds': Unit.MICROSECONDS, + 'milliseconds': Unit.MILLISECONDS, + 'seconds': Unit.SECONDS, + } + +def 
_lookup_enum(dconn, k, m): + """ + Utility function to avoid code duplication: automatically looks up a key's value + from QuasarDB and looks it up in provided dict. + """ + + x = dconn.blob(k).get() + x = _clean_blob(x) + + if x not in m: + print("x: ", x) + print("type(x): ", type(x)) + print("k: ", k) + raise Exception(f"Unrecognized unit/type {x} from key {k}") + + return m[x] + +def _lookup_type(dconn, k): + """ + Looks up and parses/validates the metric type. + """ + assert k.endswith('.type') + + return _lookup_enum(dconn, k, _type_string_to_enum) + + +def _lookup_unit(dconn, k): + """ + Looks up and parses/validates the metric type. + """ + assert k.endswith('.unit') + + return _lookup_enum(dconn, k, _unit_string_to_enum) + + def _index_keys(dconn, ks): """ Takes all statistics keys that are retrieved, and "indexes" them in such a way From e8c3f94e420c6bfb249bc6d08bad7eda3c9491d9 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 11:02:04 +0700 Subject: [PATCH 09/22] Some additional comments on why this function exists --- quasardb/stats.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/quasardb/stats.py b/quasardb/stats.py index bc2a6d74..5985de88 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -256,6 +256,10 @@ def calculate_delta(prev, cur): def _clean_blob(x): + """ + Utility function that decodes a blob as an UTF-8 string, as the direct node C API + does not yet support 'string' types and as such all statistics are stored as blobs. 
+ """ x_ = x.decode("utf-8", "replace") # remove trailing zero-terminator From 2361e4afff4b9b610cb5bc608b5acefe171b9e1f Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 11:02:20 +0700 Subject: [PATCH 10/22] Make progress towards indexing and looking up stats --- quasardb/stats.py | 73 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 5985de88..d6ec7c97 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -54,21 +54,24 @@ def of_node(dconn): """ start = datetime.now() - raw = {k: _get_stat(dconn, k) for k in _index_keys(dconn, _get_all_keys(dconn))} - ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)} + ks = _get_all_keys(dconn) + idx = _index_keys(dconn, ks) + raw = {k: _get_stat(dconn, k) for k in ks} + + ret = {"by_uid": _by_uid(raw, idx), "cumulative": _cumulative(raw, idx)} check_duration = datetime.now() - start ret["cumulative"]["check.online"] = { "value": 1, - "type": "accumulator", - "unit": "unit", + "type": Type.ACCUMULATOR, + "unit": Unit.NONE, } ret["cumulative"]["check.duration_ms"] = { "value": int(check_duration.total_seconds() * 1000), - "type": "accumulator", - "unit": "milliseconds", + "type": Type.ACCUMULATOR, + "unit": Unit.MILLISECONDS, } return ret @@ -195,10 +198,59 @@ def _index_keys(dconn, ks): that we end up with a dict of all statistic keys, their type and their unit. """ + ### + # The keys generally look like this, for example: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.uid_1 + # $qdb.statistics.requests.out_bytes.uid_1.type + # $qdb.statistics.requests.out_bytes.uid_1.unit + # $qdb.statistics.requests.out_bytes.unit + # + # For this purpose, we simply get rid of the "uid" part, as the per-uid metrics are guaranteed + # to be of the exact same type as all the others. 
So after trimming those, the keys will look + # like this: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.unit + # $qdb.statistics.requests.out_bytes.unit + # + # And after deduplication like this: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.unit + # + # In which case we'll store `requests.out_bytes` as the statistic type, and look up the type + # and unit for those metrics and add a placeholder value. + + ret = defaultdict(lambda: {'value': None, 'type': None, 'unit': None}) + for k in ks: - print(f"k: {k}") + if "uid_" in k: + # Per-uid key, skipping these as described above + continue + + parts = k.rsplit(".", 1) - return ks + if parts[1] == 'type': + stat_id = parts[0] + + if ret[stat_id]['type'] == None: + # We haven't seen this particular statistic yet + ret[stat_id]['type'] = _lookup_type(dconn, k) + elif parts[1] == 'unit': + stat_id = parts[0] + + if ret[stat_id]['unit'] == None: + # We haven't seen this particular statistic yet + ret[stat_id]['unit'] = _lookup_unit(dconn, k) + + return ret def _calculate_delta_stat(stat_id, prev_stat, curr_stat): @@ -265,7 +317,6 @@ def _clean_blob(x): # remove trailing zero-terminator return "".join(c for c in x_ if ord(c) != 0) - def _get_stat(dconn, k): # Ugly, but works: try to retrieve as integer, if not an int, retrieve as # blob @@ -281,7 +332,7 @@ def _get_stat(dconn, k): return _clean_blob(dconn.blob(k).get()) -def _by_uid(stats): +def _by_uid(stats, idx): xs = {} for k, v in stats.items(): matches = user_pattern.match(k) @@ -297,7 +348,7 @@ def _by_uid(stats): return xs -def _cumulative(stats): +def _cumulative(stats, idx): xs = {} for k, v in stats.items(): From 351123ec2026158bfdf33f59105df7b772f11542 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 
11:02:33 +0700 Subject: [PATCH 11/22] Speed up tests, ensure all tests can be executed individually --- tests/test_stats.py | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/tests/test_stats.py b/tests/test_stats.py index 3364094d..e7216882 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -20,16 +20,37 @@ def _write_data(conn, table): ) -def _ensure_stats(conn, table): - global _has_stats +def _has_stats(conn): + xs = qdbst.by_node(conn) + + # As we always use a secure connection, we will wait for by-uid statistics to appear. + # + # We also don't expect a cluster to be running, so just ensure we have exactly 1 node. + ks = list(xs.keys()) + assert len(ks) == 1 + + node_id = ks[0] + node_stats = xs[node_id] + + assert 'by_uid' in node_stats + uid_stats = node_stats['by_uid'] + # The actual check happens here: we expect at least 1 per-uid statistic + return len(uid_stats.keys()) > 0 + +def _ensure_stats(conn, table): # This function is merely here to generate some activity on the cluster, # so that we're guaranteed to have some statistics - if _has_stats is False: + + max_polls = 10 + n = 0 + + while _has_stats(conn) is False: _write_data(conn, table) - # Statistics refresh interval is 5 seconds. 
- sleep(6) - _has_stats = True + sleep(1) + + n = n + 1 + assert n <= max_polls # A fairly conservative set of user-specific stats we always expect after doing our @@ -81,6 +102,7 @@ def _validate_node_stats(stats): def test_stats_by_node(qdbd_secure_connection, secure_table): _ensure_stats(qdbd_secure_connection, secure_table) + xs = qdbst.by_node(qdbd_secure_connection) assert len(xs) == 1 @@ -91,7 +113,7 @@ def test_stats_by_node(qdbd_secure_connection, secure_table): def test_stats_of_node(qdbd_settings, qdbd_secure_connection, secure_table): # First seed the table - # _ensure_stats(qdbd_secure_connection, secure_table) + _ensure_stats(qdbd_secure_connection, secure_table) # Now establish direct connection conn = quasardb.Node( @@ -106,8 +128,11 @@ def test_stats_of_node(qdbd_settings, qdbd_secure_connection, secure_table): def test_stats_regex(): + _ensure_stats(qdbd_secure_connection, secure_table) + # This is mostly to test a regression, where user-stats for users with multi-digit ids # were not picked up. + user_stats = [ "$qdb.statistics.foo.uid_1", "$qdb.statistics.foo.uid_21", From 79f52d2eddb76f683eb2b0b1fbb06288f6e48f53 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 11:55:06 +0700 Subject: [PATCH 12/22] It's working! --- quasardb/stats.py | 73 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index d6ec7c97..1db652d9 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -11,9 +11,11 @@ MAX_KEYS = 4 * 1024 * 1024 # 4 million max keys stats_prefix = "$qdb.statistics." 
+ +# Compile these regexes once for speed user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$") total_pattern = re.compile(r"\$qdb.statistics.(.*)$") - +user_clean_pattern = re.compile(r"\.uid_\d+") def is_user_stat(s): return user_pattern.match(s) is not None @@ -57,7 +59,7 @@ def of_node(dconn): ks = _get_all_keys(dconn) idx = _index_keys(dconn, ks) - raw = {k: _get_stat(dconn, k) for k in ks} + raw = {k: _get_stat_value(dconn, k) for k in ks} ret = {"by_uid": _by_uid(raw, idx), "cumulative": _cumulative(raw, idx)} @@ -230,25 +232,27 @@ def _index_keys(dconn, ks): ret = defaultdict(lambda: {'value': None, 'type': None, 'unit': None}) + for k in ks: - if "uid_" in k: - # Per-uid key, skipping these as described above - continue + # Remove any 'uid_[0-9]+' part from the string + k_ = user_clean_pattern.sub("", k) - parts = k.rsplit(".", 1) + matches = total_pattern.match(k_) - if parts[1] == 'type': - stat_id = parts[0] + parts = matches.groups()[0].rsplit(".", 1) + metric_id = parts[0] - if ret[stat_id]['type'] == None: + if len(parts) > 1 and parts[1] == 'type': + if ret[metric_id]['type'] == None: # We haven't seen this particular statistic yet - ret[stat_id]['type'] = _lookup_type(dconn, k) - elif parts[1] == 'unit': - stat_id = parts[0] - - if ret[stat_id]['unit'] == None: + ret[metric_id]['type'] = _lookup_type(dconn, k) + elif len(parts) > 1 and parts[1] == 'unit': + if ret[metric_id]['unit'] == None: # We haven't seen this particular statistic yet - ret[stat_id]['unit'] = _lookup_unit(dconn, k) + ret[metric_id]['unit'] = _lookup_unit(dconn, k) + else: + # It's a value, we look those up later + pass return ret @@ -317,9 +321,12 @@ def _clean_blob(x): # remove trailing zero-terminator return "".join(c for c in x_ if ord(c) != 0) -def _get_stat(dconn, k): +def _get_stat_value(dconn, k): # Ugly, but works: try to retrieve as integer, if not an int, retrieve as # blob + # + # XXX(leon): we could use the index we built to get a much stronger hint + # 
on what the type is. try: return dconn.integer(k).get() @@ -338,6 +345,18 @@ def _by_uid(stats, idx): matches = user_pattern.match(k) if is_user_stat(k) and matches: (metric, uid_str) = matches.groups() + + if metric.split(".")[-1] in ["type", "unit"]: + # We already indexed the type and unit in our idx, this is not interesting + continue + + if metric.startswith("serialized"): + # Internal stuff we don't care about nor cannot do anything with + continue + + if not metric in idx: + raise Exception(f"Metric not in internal index: {metric}") + uid = int(uid_str) if uid not in xs: xs[uid] = {} @@ -355,10 +374,22 @@ def _cumulative(stats, idx): matches = total_pattern.match(k) if is_cumulative_stat(k) and matches: metric = matches.groups()[0] - if metric.split(".")[-1] not in ["type", "unit"]: - metric += ".value" - metric_name, metric_att = metric.rsplit(".", 1) - if not metric.startswith("serialized"): - xs[metric_name] = {**xs.get(metric_name, {}), metric_att: v} + + if metric.split(".")[-1] in ["type", "unit"]: + # We already indexed the type and unit in our idx, this is not interesting + continue + + if metric.startswith("serialized"): + # Internal stuff we don't care about nor cannot do anything with + continue + + if not metric in idx: + raise Exception(f"Metric not in internal index: {metric}") + + x = idx[metric].copy() + x["value"] = v + xs[metric] = x return xs + +# async_pipelines.buffer.total_bytes From 913eb38d02e03f089df20559fbdba9c3f4efc422 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 11:59:09 +0700 Subject: [PATCH 13/22] Formatting --- quasardb/stats.py | 55 +++++++++++++++++++++++++-------------------- tests/test_stats.py | 5 +++-- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 1db652d9..b0f01a0b 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -17,6 +17,7 @@ total_pattern = re.compile(r"\$qdb.statistics.(.*)$") user_clean_pattern = re.compile(r"\.uid_\d+") + def 
is_user_stat(s): return user_pattern.match(s) is not None @@ -123,11 +124,13 @@ def _get_all_keys(dconn, n=1024): return xs + class Type(Enum): ACCUMULATOR = 1 GAUGE = 2 LABEL = 3 + class Unit(Enum): NONE = 0 COUNT = 1 @@ -136,28 +139,30 @@ class Unit(Enum): BYTES = 32 # Time/duration units - EPOCH = 64, + EPOCH = (64,) NANOSECONDS = 65 MICROSECONDS = 66 MILLISECONDS = 67 SECONDS = 68 +_type_string_to_enum = { + "accumulator": Type.ACCUMULATOR, + "gauge": Type.GAUGE, + "label": Type.LABEL, +} -_type_string_to_enum = {'accumulator': Type.ACCUMULATOR, - 'gauge': Type.GAUGE, - 'label': Type.LABEL, - } +_unit_string_to_enum = { + "none": Unit.NONE, + "count": Unit.COUNT, + "bytes": Unit.BYTES, + "epoch": Unit.EPOCH, + "nanoseconds": Unit.NANOSECONDS, + "microseconds": Unit.MICROSECONDS, + "milliseconds": Unit.MILLISECONDS, + "seconds": Unit.SECONDS, +} -_unit_string_to_enum = {'none': Unit.NONE, - 'count': Unit.COUNT, - 'bytes': Unit.BYTES, - 'epoch': Unit.EPOCH, - 'nanoseconds': Unit.NANOSECONDS, - 'microseconds': Unit.MICROSECONDS, - 'milliseconds': Unit.MILLISECONDS, - 'seconds': Unit.SECONDS, - } def _lookup_enum(dconn, k, m): """ @@ -176,11 +181,12 @@ def _lookup_enum(dconn, k, m): return m[x] + def _lookup_type(dconn, k): """ Looks up and parses/validates the metric type. """ - assert k.endswith('.type') + assert k.endswith(".type") return _lookup_enum(dconn, k, _type_string_to_enum) @@ -189,7 +195,7 @@ def _lookup_unit(dconn, k): """ Looks up and parses/validates the metric type. """ - assert k.endswith('.unit') + assert k.endswith(".unit") return _lookup_enum(dconn, k, _unit_string_to_enum) @@ -230,8 +236,7 @@ def _index_keys(dconn, ks): # In which case we'll store `requests.out_bytes` as the statistic type, and look up the type # and unit for those metrics and add a placeholder value. 
- ret = defaultdict(lambda: {'value': None, 'type': None, 'unit': None}) - + ret = defaultdict(lambda: {"value": None, "type": None, "unit": None}) for k in ks: # Remove any 'uid_[0-9]+' part from the string @@ -242,14 +247,14 @@ def _index_keys(dconn, ks): parts = matches.groups()[0].rsplit(".", 1) metric_id = parts[0] - if len(parts) > 1 and parts[1] == 'type': - if ret[metric_id]['type'] == None: + if len(parts) > 1 and parts[1] == "type": + if ret[metric_id]["type"] == None: # We haven't seen this particular statistic yet - ret[metric_id]['type'] = _lookup_type(dconn, k) - elif len(parts) > 1 and parts[1] == 'unit': - if ret[metric_id]['unit'] == None: + ret[metric_id]["type"] = _lookup_type(dconn, k) + elif len(parts) > 1 and parts[1] == "unit": + if ret[metric_id]["unit"] == None: # We haven't seen this particular statistic yet - ret[metric_id]['unit'] = _lookup_unit(dconn, k) + ret[metric_id]["unit"] = _lookup_unit(dconn, k) else: # It's a value, we look those up later pass @@ -321,6 +326,7 @@ def _clean_blob(x): # remove trailing zero-terminator return "".join(c for c in x_ if ord(c) != 0) + def _get_stat_value(dconn, k): # Ugly, but works: try to retrieve as integer, if not an int, retrieve as # blob @@ -392,4 +398,5 @@ def _cumulative(stats, idx): return xs + # async_pipelines.buffer.total_bytes diff --git a/tests/test_stats.py b/tests/test_stats.py index e7216882..99dd59fe 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -32,12 +32,13 @@ def _has_stats(conn): node_id = ks[0] node_stats = xs[node_id] - assert 'by_uid' in node_stats - uid_stats = node_stats['by_uid'] + assert "by_uid" in node_stats + uid_stats = node_stats["by_uid"] # The actual check happens here: we expect at least 1 per-uid statistic return len(uid_stats.keys()) > 0 + def _ensure_stats(conn, table): # This function is merely here to generate some activity on the cluster, # so that we're guaranteed to have some statistics From 10ff90567c68ad7bd393008114444fa391310370 Mon Sep 
17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 12:00:47 +0700 Subject: [PATCH 14/22] Remove delta calculation, unused code --- quasardb/stats.py | 54 ----------------------------------------------- 1 file changed, 54 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index b0f01a0b..54ea2096 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -262,60 +262,6 @@ def _index_keys(dconn, ks): return ret -def _calculate_delta_stat(stat_id, prev_stat, curr_stat): - logger.info( - "calculating delta for stat_id = {}, prev = {}. cur = {}".format( - stat_id, str(prev_stat["value"]), str(prev_stat["value"]) - ) - ) - if curr_stat["type"] in ["counter", "accumulator"]: - return {**curr_stat, "value": curr_stat["value"] - prev_stat["value"]} - elif curr_stat["type"] == "gauge": - return {**curr_stat} - else: - return None - - -def _calculate_delta_stats(prev_stats, cur_stats): - """ - Args: - prev_stats: previous dictionary of cumulative stats - cur_stats: current dictionary of cumulative stats - - Returns: - a new dictionary with same structure and updated values - """ - ret = {} - for stat_id in cur_stats.keys(): - try: - prev_stat = prev_stats[stat_id] - cur_stat = cur_stats[stat_id] - - stat_content = _calculate_delta_stat(stat_id, prev_stat, cur_stat) - if stat_content is not None: - ret[stat_id] = stat_content - - except KeyError: - # Stat likely was not present yet in prev_stats - pass - - return ret - - -def calculate_delta(prev, cur): - """ - Calculates the 'delta' between two successive statistic measurements. 
- """ - ret = {} - for node_id in cur.keys(): - ret[node_id] = {"by_uid": {}, "cumulative": {}} - ret[node_id]["cumulative"] = _calculate_delta_stats( - prev[node_id]["cumulative"], cur[node_id]["cumulative"] - ) - - return ret - - def _clean_blob(x): """ Utility function that decodes a blob as an UTF-8 string, as the direct node C API From 740e6625fc669141975498a16db6d7c6dd2cd55c Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 12:01:32 +0700 Subject: [PATCH 15/22] Remove debug --- quasardb/stats.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 54ea2096..f1be3260 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -174,9 +174,6 @@ def _lookup_enum(dconn, k, m): x = _clean_blob(x) if x not in m: - print("x: ", x) - print("type(x): ", type(x)) - print("k: ", k) raise Exception(f"Unrecognized unit/type {x} from key {k}") return m[x] From adee76cb2f2932ea74e1f7f6f12c1de607d8c7ec Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 16:13:23 +0700 Subject: [PATCH 16/22] Proper deprecation warning + tests --- quasardb/stats.py | 10 +++++++++- tests/test_stats.py | 4 ++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index f1be3260..37af6969 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -98,7 +98,15 @@ def stat_type(stat_id): This is useful for determining which value should be reported in a dashboard. """ - raise Exception("Deprecated Method.") + import warnings + + warnings.warn( + "The 'stat_type' method is deprecated and will be removed in a future release." 
+ "The stat type and unit are now part of the return value of invocations to the 'of_node' and 'by_node' methods.", + DeprecationWarning, + stacklevel=2) + + return None def _get_all_keys(dconn, n=1024): diff --git a/tests/test_stats.py b/tests/test_stats.py index 99dd59fe..fdfa2bed 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -153,3 +153,7 @@ def test_stats_regex(): for s in total_stats: assert qdbst.is_user_stat(s) is False assert qdbst.is_cumulative_stat(s) is True + +def test_stat_type_is_deprecated(): + with pytest.deprecated_call(): + qdbst.stat_type("foobar") From 47f4ade8c5dbc72df900c32255573c2e3ff6edbb Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 16:18:03 +0700 Subject: [PATCH 17/22] Formatting --- quasardb/stats.py | 3 ++- tests/test_stats.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 37af6969..8ed9f6bc 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -104,7 +104,8 @@ def stat_type(stat_id): "The 'stat_type' method is deprecated and will be removed in a future release." 
"The stat type and unit are now part of the return value of invocations to the 'of_node' and 'by_node' methods.", DeprecationWarning, - stacklevel=2) + stacklevel=2, + ) return None diff --git a/tests/test_stats.py b/tests/test_stats.py index fdfa2bed..98ce5822 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -154,6 +154,7 @@ def test_stats_regex(): assert qdbst.is_user_stat(s) is False assert qdbst.is_cumulative_stat(s) is True + def test_stat_type_is_deprecated(): with pytest.deprecated_call(): qdbst.stat_type("foobar") From 1e67d4ef28435eac8c659b6ec0a6956e3e6de4c2 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 16:22:45 +0700 Subject: [PATCH 18/22] Broken test --- tests/test_stats.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_stats.py b/tests/test_stats.py index 98ce5822..cf85111a 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -129,8 +129,6 @@ def test_stats_of_node(qdbd_settings, qdbd_secure_connection, secure_table): def test_stats_regex(): - _ensure_stats(qdbd_secure_connection, secure_table) - # This is mostly to test a regression, where user-stats for users with multi-digit ids # were not picked up. 
From 23b86b0763464831cc68fa31a229f3b597122137 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Tue, 6 May 2025 16:40:03 +0700 Subject: [PATCH 19/22] More validations + tests --- quasardb/stats.py | 9 +++++++-- tests/test_stats.py | 26 ++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 8ed9f6bc..6e218035 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -315,12 +315,17 @@ def _by_uid(stats, idx): if not metric in idx: raise Exception(f"Metric not in internal index: {metric}") + # Parse user id uid = int(uid_str) + + # Prepare our metric dict + x = idx[metric].copy() + x["value"] = v + if uid not in xs: xs[uid] = {} - if not metric.startswith("serialized"): - xs[uid][metric] = v + xs[uid][metric] = x return xs diff --git a/tests/test_stats.py b/tests/test_stats.py index cf85111a..0802acd1 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -79,6 +79,25 @@ def _ensure_stats(conn, table): ] +def _validate_stats_dict(xs): + """ + Validates a dict with keys / stat tuples association. + """ + for k, x in xs.items(): + print("k: ", k, ", x: ", x) + + # Each statistic is a dict of type/unit/value + assert isinstance(x, dict) + assert "value" in x + assert "type" in x + assert "unit" in x + + # Everything that's not a NONE unit (i.e. 
not a label) should be an int + if x["unit"] != qdbst.Unit.NONE: + + assert isinstance(x['value'], int) + + def _validate_node_stats(stats): assert "by_uid" in stats assert "cumulative" in stats @@ -90,16 +109,15 @@ def _validate_node_stats(stats): for expected in _expected_user_stats: assert expected in xs - for _, v in xs.items(): - # As far as I know, per-user statistics should *always* be - # integers - assert isinstance(v, int) + _validate_stats_dict(xs) # Test cumulative stats xs = stats["cumulative"] for expected in _expected_cumulative_stats: assert expected in xs + _validate_stats_dict(xs) + def test_stats_by_node(qdbd_secure_connection, secure_table): _ensure_stats(qdbd_secure_connection, secure_table) From bb84af1e30f5bcac9a6d7e6e67ea9080f99a699d Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Thu, 8 May 2025 10:46:02 +0700 Subject: [PATCH 20/22] This is not a tuple --- quasardb/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 6e218035..8c3c2358 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -148,7 +148,7 @@ class Unit(Enum): BYTES = 32 # Time/duration units - EPOCH = (64,) + EPOCH = 64 NANOSECONDS = 65 MICROSECONDS = 66 MILLISECONDS = 67 From 7614f4adf507c82c7526c48c6e94c753c6107762 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Thu, 8 May 2025 10:46:21 +0700 Subject: [PATCH 21/22] Unused import --- quasardb/stats.py | 1 - 1 file changed, 1 deletion(-) diff --git a/quasardb/stats.py b/quasardb/stats.py index 8c3c2358..c07dac53 100644 --- a/quasardb/stats.py +++ b/quasardb/stats.py @@ -1,5 +1,4 @@ import re -from time import sleep import quasardb import logging From 2e9783c8f7509be230e3553ec2a974e960971656 Mon Sep 17 00:00:00 2001 From: Leon Mergen Date: Thu, 8 May 2025 10:47:51 +0700 Subject: [PATCH 22/22] Formatting --- tests/test_stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_stats.py b/tests/test_stats.py index db798fc1..6657147c 
100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -95,7 +95,7 @@ def _validate_stats_dict(xs): # Everything that's not a NONE unit (i.e. not a label) should be an int if x["unit"] != qdbst.Unit.NONE: - assert isinstance(x['value'], int) + assert isinstance(x["value"], int) def _validate_node_stats(stats):