Skip to content

Commit 256ff3d

Browse files
WalBehamotl
authored andcommitted
Admin/XMover: Add module for active shard monitoring
1 parent 5068671 commit 256ff3d

File tree

9 files changed

+1687
-2
lines changed

9 files changed

+1687
-2
lines changed

cratedb_toolkit/admin/xmover/analysis/shard.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from rich.table import Table
1414

1515
from cratedb_toolkit.admin.xmover.model import (
16+
ActiveShardActivity,
17+
ActiveShardSnapshot,
1618
DistributionStats,
1719
NodeInfo,
1820
ShardInfo,
@@ -947,3 +949,181 @@ def distribution(self, table: str = None):
947949
table_summary.add_row("Node Balance Score", f"{stats.node_balance_score:.1f}/100")
948950

949951
console.print(table_summary)
952+
953+
954+
class ActiveShardMonitor:
955+
"""Monitor active shard checkpoint progression over time"""
956+
957+
def __init__(self, client: CrateDBClient):
958+
self.client = client
959+
960+
def compare_snapshots(
961+
self,
962+
snapshot1: List[ActiveShardSnapshot],
963+
snapshot2: List[ActiveShardSnapshot],
964+
min_activity_threshold: int = 0,
965+
) -> List["ActiveShardActivity"]:
966+
"""Compare two snapshots and return activity data for shards present in both
967+
968+
Args:
969+
snapshot1: First snapshot (baseline)
970+
snapshot2: Second snapshot (comparison)
971+
min_activity_threshold: Minimum checkpoint delta to consider active (default: 0)
972+
"""
973+
974+
# Create lookup dict for snapshot1
975+
snapshot1_dict = {snap.shard_identifier: snap for snap in snapshot1}
976+
977+
activities = []
978+
979+
for snap2 in snapshot2:
980+
snap1 = snapshot1_dict.get(snap2.shard_identifier)
981+
if snap1:
982+
# Calculate local checkpoint delta
983+
local_checkpoint_delta = snap2.local_checkpoint - snap1.local_checkpoint
984+
time_diff = snap2.timestamp - snap1.timestamp
985+
986+
# Filter based on actual activity between snapshots
987+
if local_checkpoint_delta >= min_activity_threshold:
988+
activity = ActiveShardActivity(
989+
schema_name=snap2.schema_name,
990+
table_name=snap2.table_name,
991+
shard_id=snap2.shard_id,
992+
node_name=snap2.node_name,
993+
is_primary=snap2.is_primary,
994+
partition_ident=snap2.partition_ident,
995+
local_checkpoint_delta=local_checkpoint_delta,
996+
snapshot1=snap1,
997+
snapshot2=snap2,
998+
time_diff_seconds=time_diff,
999+
)
1000+
activities.append(activity)
1001+
1002+
# Sort by activity (highest checkpoint delta first)
1003+
activities.sort(key=lambda x: x.local_checkpoint_delta, reverse=True)
1004+
1005+
return activities
1006+
1007+
def format_activity_display(
1008+
self, activities: List["ActiveShardActivity"], show_count: int = 10, watch_mode: bool = False
1009+
) -> str:
1010+
"""Format activity data for console display"""
1011+
if not activities:
1012+
return "✅ No active shards with significant checkpoint progression found"
1013+
1014+
# Limit to requested count
1015+
activities = activities[:show_count]
1016+
1017+
# Calculate observation period for context
1018+
if activities:
1019+
observation_period = activities[0].time_diff_seconds
1020+
output = [
1021+
f"\n🔥 Most Active Shards ({len(activities)} shown, {observation_period:.0f}s observation period)"
1022+
]
1023+
else:
1024+
output = [f"\n🔥 Most Active Shards ({len(activities)} shown, sorted by checkpoint activity)"]
1025+
1026+
output.append("")
1027+
1028+
# Add activity rate context
1029+
if activities:
1030+
total_activity = sum(a.local_checkpoint_delta for a in activities)
1031+
avg_rate = sum(a.activity_rate for a in activities) / len(activities)
1032+
output.append(
1033+
f"[dim]Total checkpoint activity: {total_activity:,} changes, Average rate: {avg_rate:.1f}/sec[/dim]"
1034+
)
1035+
output.append("")
1036+
1037+
# Create table headers
1038+
headers = ["Rank", "Schema.Table", "Shard", "Partition", "Node", "Type", "Checkpoint Δ", "Rate/sec", "Trend"]
1039+
1040+
# Calculate column widths
1041+
col_widths = [len(h) for h in headers]
1042+
1043+
# Prepare rows
1044+
rows = []
1045+
for i, activity in enumerate(activities, 1):
1046+
# Format values
1047+
rank = str(i)
1048+
table_id = activity.table_identifier
1049+
shard_id = str(activity.shard_id)
1050+
partition = (
1051+
activity.partition_ident[:14] + "..."
1052+
if len(activity.partition_ident) > 14
1053+
else activity.partition_ident or "-"
1054+
)
1055+
node = activity.node_name
1056+
shard_type = "P" if activity.is_primary else "R"
1057+
checkpoint_delta = f"{activity.local_checkpoint_delta:,}"
1058+
rate = f"{activity.activity_rate:.1f}" if activity.activity_rate >= 0.1 else "<0.1"
1059+
1060+
# Calculate activity trend indicator
1061+
if activity.activity_rate >= 100:
1062+
trend = "🔥 HOT"
1063+
elif activity.activity_rate >= 50:
1064+
trend = "📈 HIGH"
1065+
elif activity.activity_rate >= 10:
1066+
trend = "📊 MED"
1067+
else:
1068+
trend = "📉 LOW"
1069+
1070+
row = [rank, table_id, shard_id, partition, node, shard_type, checkpoint_delta, rate, trend]
1071+
rows.append(row)
1072+
1073+
# Update column widths
1074+
for j, cell in enumerate(row):
1075+
col_widths[j] = max(col_widths[j], len(cell))
1076+
1077+
# Format table
1078+
header_row = " " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths))
1079+
output.append(header_row)
1080+
output.append(" " + "-" * (len(header_row) - 3))
1081+
1082+
# Data rows
1083+
for row in rows:
1084+
data_row = " " + " | ".join(cell.ljust(w) for cell, w in zip(row, col_widths))
1085+
output.append(data_row)
1086+
1087+
# Only show legend and insights in non-watch mode
1088+
if not watch_mode:
1089+
output.append("")
1090+
output.append("Legend:")
1091+
output.append(" • Checkpoint Δ: Write operations during observation period")
1092+
output.append(" • Rate/sec: Checkpoint changes per second")
1093+
output.append(" • Partition: partition_ident (truncated if >14 chars, '-' if none)")
1094+
output.append(" • Type: P=Primary, R=Replica")
1095+
output.append(" • Trend: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)")
1096+
1097+
# Add insights about activity patterns
1098+
if activities:
1099+
output.append("")
1100+
output.append("Insights:")
1101+
1102+
# Count by trend
1103+
hot_count = len([a for a in activities if a.activity_rate >= 100])
1104+
high_count = len([a for a in activities if 50 <= a.activity_rate < 100])
1105+
med_count = len([a for a in activities if 10 <= a.activity_rate < 50])
1106+
low_count = len([a for a in activities if a.activity_rate < 10])
1107+
1108+
if hot_count > 0:
1109+
output.append(f" • {hot_count} HOT shards (≥100 changes/sec) - consider load balancing")
1110+
if high_count > 0:
1111+
output.append(f" • {high_count} HIGH activity shards - monitor capacity")
1112+
if med_count > 0:
1113+
output.append(f" • {med_count} MEDIUM activity shards - normal operation")
1114+
if low_count > 0:
1115+
output.append(f" • {low_count} LOW activity shards - occasional writes")
1116+
1117+
# Identify patterns
1118+
primary_activities = [a for a in activities if a.is_primary]
1119+
if len(primary_activities) == len(activities):
1120+
output.append(" • All active shards are PRIMARY - normal write pattern")
1121+
elif len(primary_activities) < len(activities) * 0.5:
1122+
output.append(" • Many REPLICA shards active - possible recovery/replication activity")
1123+
1124+
# Node concentration
1125+
nodes = {a.node_name for a in activities}
1126+
if len(nodes) <= 2:
1127+
output.append(f" • Activity concentrated on {len(nodes)} node(s) - consider redistribution")
1128+
1129+
return "\n".join(output)

0 commit comments

Comments
 (0)