Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions memorystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ def _pick(labels: Dict[str, str], keys) -> Optional[str]:
return None


def _resolve_inst_key(rlabels: dict, project_id: str = "") -> str:
"""Build a globally unique instance key.

Redis standalone instance_id is a full GCP path (projects/…/instances/name)
which is already globally unique. Valkey and Redis Cluster return short names
via instance_id or cluster_id, so we prefix with project_id to avoid
collisions across projects.
"""
raw_id = (
rlabels.get("instance_id")
or rlabels.get("cluster_id")
or rlabels.get("resource_name")
or "unknown"
)
# Full GCP path (starts with "projects/") is already globally unique
if raw_id.startswith("projects/"):
return raw_id
return f"{project_id}/{raw_id}" if project_id else raw_id


def _point_value(point, default=0):
"""Extract a numeric value from a GCP monitoring point, handling both int64 and double types."""
try:
Expand Down Expand Up @@ -163,12 +183,7 @@ def _accumulate_commands(results, table, product_name: str, project_id: str):
mlabels = dict(ts.metric.labels)

# Identify instance/cluster id & node
inst_key = (
rlabels.get("instance_id")
or rlabels.get("cluster_id")
or rlabels.get("resource_name")
or "unknown"
)
inst_key = _resolve_inst_key(rlabels, project_id)
node_id = rlabels.get("node_id") or rlabels.get("shard_id") or "unknown"
entry = _ensure_node_entry(table, inst_key, node_id)

Expand Down Expand Up @@ -229,15 +244,10 @@ def _apply_processed_categories(table):
del entry["points"]


def _attach_memory_usage(results, table, key_name="BytesUsedForCache"):
def _attach_memory_usage(results, table, project_id="", key_name="BytesUsedForCache"):
for ts in results:
rlabels = dict(ts.resource.labels)
inst_key = (
rlabels.get("instance_id")
or rlabels.get("cluster_id")
or rlabels.get("resource_name")
or "unknown"
)
inst_key = _resolve_inst_key(rlabels, project_id)
node_id = rlabels.get("node_id") or rlabels.get("shard_id") or "unknown"
if inst_key not in table or node_id not in table[inst_key]:
_ensure_node_entry(table, inst_key, node_id)
Expand All @@ -252,17 +262,12 @@ def _attach_memory_usage(results, table, key_name="BytesUsedForCache"):
entry[key_name] = max(prev, maxv)


def _attach_capacity_scalar(results, table, key_name="MaxMemory"):
def _attach_capacity_scalar(results, table, project_id="", key_name="MaxMemory"):
"""Attach a capacity scalar (e.g., memory size); applies to all nodes within the instance/cluster."""
cap_by_inst = defaultdict(int)
for ts in results:
rlabels = dict(ts.resource.labels)
inst_key = (
rlabels.get("instance_id")
or rlabels.get("cluster_id")
or rlabels.get("resource_name")
or "unknown"
)
inst_key = _resolve_inst_key(rlabels, project_id)
v_max = 0
for point in ts.points:
v = int(_point_value(point))
Expand All @@ -277,7 +282,7 @@ def _attach_capacity_scalar(results, table, key_name="MaxMemory"):
nodes[node_id][key_name] = cap_by_inst[inst_key]


def _attach_node_role(results, table):
def _attach_node_role(results, table, project_id=""):
"""Set NodeRole using the dedicated replication/role metric.

The 'role' label on commands/calls is metadata — not its purpose to report
Expand All @@ -289,8 +294,8 @@ def _attach_node_role(results, table):
"""
for ts in results:
rlabels = dict(ts.resource.labels)
inst_key = rlabels.get("instance_id") or "unknown"
node_id = rlabels.get("node_id") or "unknown"
inst_key = _resolve_inst_key(rlabels, project_id)
node_id = rlabels.get("node_id") or rlabels.get("shard_id") or "unknown"
if inst_key not in table or node_id not in table[inst_key]:
continue

Expand Down Expand Up @@ -350,7 +355,7 @@ def collect_for_product(
)
except Exception:
mem_results = []
_attach_memory_usage(mem_results, table)
_attach_memory_usage(mem_results, table, project_id=project_id)
for inst_key, nodes in table.items():
for node_id, entry in nodes.items():
entry["InstanceType"] = instance_type_label
Expand All @@ -361,14 +366,16 @@ def collect_for_product(
mem_results = _list_ts(
client, project_name, metric_map["memory_usage"], interval
)
_attach_memory_usage(mem_results, table)
_attach_memory_usage(mem_results, table, project_id=project_id)
except Exception:
pass

# Capacity (MaxMemory) - instance/cluster level
try:
cap_results = _list_ts(client, project_name, metric_map["max_memory"], interval)
_attach_capacity_scalar(cap_results, table, key_name="MaxMemory")
_attach_capacity_scalar(
cap_results, table, project_id=project_id, key_name="MaxMemory"
)
except Exception:
pass

Expand All @@ -378,7 +385,7 @@ def collect_for_product(
role_results = _list_ts(
client, project_name, metric_map["replication_role"], interval
)
_attach_node_role(role_results, table)
_attach_node_role(role_results, table, project_id=project_id)
except Exception:
pass

Expand Down
88 changes: 88 additions & 0 deletions test_msstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
create_workbooks,
)

from memorystore import (
_resolve_inst_key,
_attach_memory_usage,
_attach_capacity_scalar,
)


class TestMSSStats(unittest.TestCase):
"""Test suite for msstats utility functions"""
Expand Down Expand Up @@ -469,5 +475,87 @@ def test_missing_service_account_file(self):
self.assertIsNone(result)


class TestMemorystore(unittest.TestCase):
    """Tests for the memorystore.py instance-key helpers."""

    def test_resolve_inst_key_redis_standalone_full_path(self):
        """A full GCP resource path from Redis standalone is used verbatim."""
        full_path = "projects/my-project/locations/us-central1/instances/my-redis"
        labels = {"instance_id": full_path, "node_id": "node-0"}
        self.assertEqual(_resolve_inst_key(labels, "my-project"), full_path)

    def test_resolve_inst_key_valkey_short_name_prefixed(self):
        """A short Valkey instance_id gets the project_id prefix."""
        labels = {"instance_id": "memorystore-valkey", "node_id": "abc123"}
        self.assertEqual(
            _resolve_inst_key(labels, "my-project"),
            "my-project/memorystore-valkey",
        )

    def test_resolve_inst_key_redis_cluster_short_name_prefixed(self):
        """A short Redis Cluster cluster_id gets the project_id prefix."""
        labels = {"cluster_id": "memorystore-redis-cluster", "shard_id": "xyz789"}
        self.assertEqual(
            _resolve_inst_key(labels, "my-project"),
            "my-project/memorystore-redis-cluster",
        )

    def test_resolve_inst_key_fallback_to_unknown(self):
        """Without identifier labels the key falls back to a project-prefixed 'unknown'."""
        self.assertEqual(
            _resolve_inst_key({"region": "us-central1"}, "my-project"),
            "my-project/unknown",
        )

    def test_resolve_inst_key_no_collision_across_projects(self):
        """The same short name under two projects yields two distinct keys."""
        labels = {"instance_id": "memorystore-valkey"}
        first = _resolve_inst_key(labels, "project-a")
        second = _resolve_inst_key(labels, "project-b")
        self.assertEqual(first, "project-a/memorystore-valkey")
        self.assertEqual(second, "project-b/memorystore-valkey")
        self.assertNotEqual(first, second)

    @staticmethod
    def _fake_ts(labels, value):
        """Build a mock time series carrying one int64 point and the given resource labels."""
        ts = MagicMock()
        ts.resource.labels = labels
        point = MagicMock()
        point.value.int64_value = value
        point.value.double_value = 0
        ts.points = [point]
        return ts

    def test_attach_memory_usage_uses_resolve_inst_key(self):
        """_attach_memory_usage keys new entries by the project-prefixed instance id."""
        table = {}
        ts = self._fake_ts(
            {"instance_id": "memorystore-valkey", "node_id": "abc123"}, 5000000
        )

        _attach_memory_usage([ts], table, project_id="my-project")

        self.assertIn("my-project/memorystore-valkey", table)
        # The unprefixed short name must NOT appear as a key.
        self.assertNotIn("memorystore-valkey", table)

    def test_attach_capacity_scalar_uses_resolve_inst_key(self):
        """_attach_capacity_scalar locates existing entries via the prefixed key."""
        table = {"my-project/memorystore-valkey": {"abc123": {"MaxMemory": 0}}}
        ts = self._fake_ts(
            {"instance_id": "memorystore-valkey", "node_id": "abc123"}, 10000000
        )

        _attach_capacity_scalar(
            [ts], table, project_id="my-project", key_name="MaxMemory"
        )

        self.assertEqual(
            table["my-project/memorystore-valkey"]["abc123"]["MaxMemory"], 10000000
        )


if __name__ == "__main__":
unittest.main()
Loading