diff --git a/quasardb/stats.py b/quasardb/stats.py
index a9e58c26..d47e6352 100644
--- a/quasardb/stats.py
+++ b/quasardb/stats.py
@@ -1,11 +1,13 @@
 import re
+from time import sleep
+
 import quasardb
 import logging
 from datetime import datetime
 
 logger = logging.getLogger("quasardb.stats")
 
-
+MAX_KEYS = 4 * 1024 * 1024  # 4 million max keys
 stats_prefix = "$qdb.statistics."
 user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$")
 total_pattern = re.compile(r"\$qdb.statistics.(.*)$")
@@ -50,49 +52,26 @@ def of_node(dconn):
     """
     start = datetime.now()
 
-    raw = {k: _get_stat(dconn, k) for k in dconn.prefix_get(stats_prefix, 10000)}
+    raw = {k: _get_stat(dconn, k) for k in _get_all_keys(dconn)}
     ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)}
 
     check_duration = datetime.now() - start
-    ret["cumulative"]["check.online"] = 1
-    ret["cumulative"]["check.duration_ms"] = int(check_duration.total_seconds() * 1000)
+    ret["cumulative"]["check.online"] = {
+        "value": 1,
+        "type": "accumulator",
+        "unit": "unit",
+    }
+    ret["cumulative"]["check.duration_ms"] = {
+        "value": int(check_duration.total_seconds() * 1000),
+        "type": "accumulator",
+        "unit": "milliseconds",
+    }
 
     return ret
 
 
-_stat_types = {
-    "node_id": ("constant", None),
-    "operating_system": ("constant", None),
-    "partitions_count": ("constant", "count"),
-    "cpu.system": ("counter", "ns"),
-    "cpu.user": ("counter", "ns"),
-    "cpu.idle": ("counter", "ns"),
-    "startup": ("constant", None),
-    "startup_time": ("constant", None),
-    "shutdown_time": ("constant", None),
-    "network.current_users_count": ("gauge", "count"),
-    "hardware_concurrency": ("gauge", "count"),
-    "check.online": ("gauge", "count"),
-    "check.duration_ms": ("constant", "ms"),
-    "requests.bytes_in": ("counter", "bytes"),
-    "requests.bytes_out": ("counter", "bytes"),
-    "requests.errors_count": ("counter", "count"),
-    "requests.successes_count": ("counter", "count"),
-    "requests.total_count": ("counter", "count"),
-    "async_pipelines.merge.bucket_count": ("counter", "count"),
-    "async_pipelines.merge.duration_us": ("counter", "us"),
-    "async_pipelines.write.successes_count": ("counter", "count"),
-    "async_pipelines.write.failures_count": ("counter", "count"),
-    "async_pipelines.write.time_us": ("counter", "us"),
-    "async_pipelines.merge.max_bucket_count": ("gauge", "count"),
-    "async_pipelines.merge.max_depth_count": ("gauge", "count"),
-    "async_pipelines.merge.requests_count": ("counter", "count"),
-    "evicted.count": ("counter", "count"),
-    "pageins.count": ("counter", "count"),
-}
-
 
 
 async_pipeline_bytes_pattern = re.compile(
     r"async_pipelines.pipe_[0-9]+.merge_map.bytes"
 )
@@ -101,39 +80,6 @@ def of_node(dconn):
 )
 
 
-def _stat_type(stat_id):
-    if stat_id in _stat_types:
-        return _stat_types[stat_id]
-    elif stat_id.endswith("total_ns"):
-        return ("counter", "ns")
-    elif stat_id.endswith("total_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("read_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("written_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("total_count"):
-        return ("counter", "count")
-    elif stat_id.startswith("network.sessions."):
-        return ("gauge", "count")
-    elif stat_id.startswith("memory."):
-        # memory statistics are all gauges i think, describes how much memory currently allocated where
-        return ("gauge", "bytes")
-    elif stat_id.startswith("persistence.") or stat_id.startswith("disk"):
-        # persistence are also all gauges, describes mostly how much is currently available/used on storage
-        return ("gauge", "bytes")
"bytes") - elif stat_id.startswith("license."): - return ("gauge", None) - elif stat_id.startswith("engine_"): - return ("constant", None) - elif async_pipeline_bytes_pattern.match(stat_id): - return ("gauge", "bytes") - elif async_pipeline_count_pattern.match(stat_id): - return ("gauge", "count") - else: - return None - - def stat_type(stat_id): """ Returns the statistic type by a stat id. Returns one of: @@ -144,35 +90,65 @@ def stat_type(stat_id): This is useful for determining which value should be reported in a dashboard. """ - return _stat_type(stat_id) + raise Exception("Deprecated Method.") + +def _get_all_keys(dconn, n=1024): + """ + Returns all keys from a single node. -def _calculate_delta_stat(stat_id, prev, cur): + Parameters: + dconn: quasardb.Node + Direct node connection to the node we wish to connect to. + + n: int + Number of keys to retrieve. + """ + xs = None + increase_rate = 8 + # keep getting keys while number of results exceeds the given "n" + while xs is None or len(xs) >= n: + if xs is not None: + n = n * increase_rate + if n >= MAX_KEYS: + raise Exception(f"ERROR: Cannot fetch more than {MAX_KEYS} keys.") + xs = dconn.prefix_get(stats_prefix, n) + + return xs + + +def _calculate_delta_stat(stat_id, prev_stat, curr_stat): logger.info( "calculating delta for stat_id = {}, prev = {}. cur = {}".format( - stat_id, prev, cur + stat_id, str(prev_stat["value"]), str(prev_stat["value"]) ) ) - - stat_type = _stat_type(stat_id) - if stat_type == "counter": - return cur - prev - elif stat_type == "gauge": - return cur + if curr_stat["type"] in ["counter", "accumulator"]: + return {**curr_stat, "value": curr_stat["value"] - prev_stat["value"]} + elif curr_stat["type"] == "gauge": + return {**curr_stat} else: return None def _calculate_delta_stats(prev_stats, cur_stats): + """ + Args: + prev_stats: previous dictionary of cumulative stats + cur_stats: current dictionary of cumulative stats + + Returns: + a new dictionary with same structure and updated values + """ ret = {} for stat_id in cur_stats.keys(): try: - prev_stat = cur_stats[stat_id] + prev_stat = prev_stats[stat_id] cur_stat = cur_stats[stat_id] - value = _calculate_delta_stat(stat_id, prev_stat, cur_stat) - if value is not None: - ret[stat_id] = value + stat_content = _calculate_delta_stat(stat_id, prev_stat, cur_stat) + if stat_content is not None: + ret[stat_id] = stat_content except KeyError: # Stat likely was not present yet in prev_stats @@ -187,7 +163,8 @@ def calculate_delta(prev, cur): """ ret = {} for node_id in cur.keys(): - ret[node_id] = _calculate_delta_stats( + ret[node_id] = {"by_uid": {}, "cumulative": {}} + ret[node_id]["cumulative"] = _calculate_delta_stats( prev[node_id]["cumulative"], cur[node_id]["cumulative"] ) @@ -239,7 +216,10 @@ def _cumulative(stats): matches = total_pattern.match(k) if is_cumulative_stat(k) and matches: metric = matches.groups()[0] + if metric.split(".")[-1] not in ["type", "unit"]: + metric += ".value" + metric_name, metric_att = metric.rsplit(".", 1) if not metric.startswith("serialized"): - xs[metric] = v + xs[metric_name] = {**xs.get(metric_name, {}), metric_att: v} return xs