Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 63 additions & 83 deletions quasardb/stats.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import re
from time import sleep

import quasardb
import logging
from datetime import datetime

logger = logging.getLogger("quasardb.stats")


MAX_KEYS = 4 * 1024 * 1024 # 4 million max keys
stats_prefix = "$qdb.statistics."
user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$")
total_pattern = re.compile(r"\$qdb.statistics.(.*)$")
Expand Down Expand Up @@ -50,49 +52,26 @@ def of_node(dconn):
"""

start = datetime.now()
raw = {k: _get_stat(dconn, k) for k in dconn.prefix_get(stats_prefix, 10000)}
raw = {k: _get_stat(dconn, k) for k in _get_all_keys(dconn)}

ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)}

check_duration = datetime.now() - start

ret["cumulative"]["check.online"] = 1
ret["cumulative"]["check.duration_ms"] = int(check_duration.total_seconds() * 1000)
ret["cumulative"]["check.online"] = {
"value": 1,
"type": "accumulator",
"unit": "unit",
}
ret["cumulative"]["check.duration_ms"] = {
"value": int(check_duration.total_seconds() * 1000),
"type": "accumulator",
"unit": "milliseconds",
}

return ret


_stat_types = {
"node_id": ("constant", None),
"operating_system": ("constant", None),
"partitions_count": ("constant", "count"),
"cpu.system": ("counter", "ns"),
"cpu.user": ("counter", "ns"),
"cpu.idle": ("counter", "ns"),
"startup": ("constant", None),
"startup_time": ("constant", None),
"shutdown_time": ("constant", None),
"network.current_users_count": ("gauge", "count"),
"hardware_concurrency": ("gauge", "count"),
"check.online": ("gauge", "count"),
"check.duration_ms": ("constant", "ms"),
"requests.bytes_in": ("counter", "bytes"),
"requests.bytes_out": ("counter", "bytes"),
"requests.errors_count": ("counter", "count"),
"requests.successes_count": ("counter", "count"),
"requests.total_count": ("counter", "count"),
"async_pipelines.merge.bucket_count": ("counter", "count"),
"async_pipelines.merge.duration_us": ("counter", "us"),
"async_pipelines.write.successes_count": ("counter", "count"),
"async_pipelines.write.failures_count": ("counter", "count"),
"async_pipelines.write.time_us": ("counter", "us"),
"async_pipelines.merge.max_bucket_count": ("gauge", "count"),
"async_pipelines.merge.max_depth_count": ("gauge", "count"),
"async_pipelines.merge.requests_count": ("counter", "count"),
"evicted.count": ("counter", "count"),
"pageins.count": ("counter", "count"),
}

async_pipeline_bytes_pattern = re.compile(
r"async_pipelines.pipe_[0-9]+.merge_map.bytes"
)
Expand All @@ -101,39 +80,6 @@ def of_node(dconn):
)


def _stat_type(stat_id):
if stat_id in _stat_types:
return _stat_types[stat_id]
elif stat_id.endswith("total_ns"):
return ("counter", "ns")
elif stat_id.endswith("total_bytes"):
return ("counter", "bytes")
elif stat_id.endswith("read_bytes"):
return ("counter", "bytes")
elif stat_id.endswith("written_bytes"):
return ("counter", "bytes")
elif stat_id.endswith("total_count"):
return ("counter", "count")
elif stat_id.startswith("network.sessions."):
return ("gauge", "count")
elif stat_id.startswith("memory."):
# memory statistics are all gauges i think, describes how much memory currently allocated where
return ("gauge", "bytes")
elif stat_id.startswith("persistence.") or stat_id.startswith("disk"):
# persistence are also all gauges, describes mostly how much is currently available/used on storage
return ("gauge", "bytes")
elif stat_id.startswith("license."):
return ("gauge", None)
elif stat_id.startswith("engine_"):
return ("constant", None)
elif async_pipeline_bytes_pattern.match(stat_id):
return ("gauge", "bytes")
elif async_pipeline_count_pattern.match(stat_id):
return ("gauge", "count")
else:
return None


def stat_type(stat_id):
    """
    Deprecated: always raises.

    Statistic type and unit information is now carried inside each stat
    payload itself (its "type" and "unit" fields), so this lookup no longer
    exists. Kept only so stale callers fail loudly instead of silently.

    Parameters:
    stat_id: str
      Identifier of the statistic (ignored).

    Raises:
    NotImplementedError
      Always. NotImplementedError is a subclass of Exception, so existing
      ``except Exception`` handlers keep working.
    """
    raise NotImplementedError("Deprecated Method.")


def _get_all_keys(dconn, n=1024):
"""
Returns all keys from a single node.

def _calculate_delta_stat(stat_id, prev, cur):
Parameters:
dconn: quasardb.Node
Direct node connection to the node we wish to connect to.

n: int
Number of keys to retrieve.
"""
xs = None
increase_rate = 8
# keep getting keys while number of results exceeds the given "n"
while xs is None or len(xs) >= n:
if xs is not None:
n = n * increase_rate
if n >= MAX_KEYS:
raise Exception(f"ERROR: Cannot fetch more than {MAX_KEYS} keys.")
xs = dconn.prefix_get(stats_prefix, n)

return xs


def _calculate_delta_stat(stat_id, prev_stat, curr_stat):
logger.info(
"calculating delta for stat_id = {}, prev = {}. cur = {}".format(
stat_id, prev, cur
stat_id, str(prev_stat["value"]), str(prev_stat["value"])
)
)

stat_type = _stat_type(stat_id)
if stat_type == "counter":
return cur - prev
elif stat_type == "gauge":
return cur
if curr_stat["type"] in ["counter", "accumulator"]:
return {**curr_stat, "value": curr_stat["value"] - prev_stat["value"]}
elif curr_stat["type"] == "gauge":
return {**curr_stat}
else:
return None


def _calculate_delta_stats(prev_stats, cur_stats):
"""
Args:
prev_stats: previous dictionary of cumulative stats
cur_stats: current dictionary of cumulative stats

Returns:
a new dictionary with same structure and updated values
"""
ret = {}
for stat_id in cur_stats.keys():
try:
prev_stat = cur_stats[stat_id]
prev_stat = prev_stats[stat_id]
cur_stat = cur_stats[stat_id]

value = _calculate_delta_stat(stat_id, prev_stat, cur_stat)
if value is not None:
ret[stat_id] = value
stat_content = _calculate_delta_stat(stat_id, prev_stat, cur_stat)
if stat_content is not None:
ret[stat_id] = stat_content

except KeyError:
# Stat likely was not present yet in prev_stats
Expand All @@ -187,7 +163,8 @@ def calculate_delta(prev, cur):
"""
ret = {}
for node_id in cur.keys():
ret[node_id] = _calculate_delta_stats(
ret[node_id] = {"by_uid": {}, "cumulative": {}}
ret[node_id]["cumulative"] = _calculate_delta_stats(
prev[node_id]["cumulative"], cur[node_id]["cumulative"]
)

Expand Down Expand Up @@ -239,7 +216,10 @@ def _cumulative(stats):
matches = total_pattern.match(k)
if is_cumulative_stat(k) and matches:
metric = matches.groups()[0]
if metric.split(".")[-1] not in ["type", "unit"]:
metric += ".value"
metric_name, metric_att = metric.rsplit(".", 1)
if not metric.startswith("serialized"):
xs[metric] = v
xs[metric_name] = {**xs.get(metric_name, {}), metric_att: v}

return xs