Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- Settings: Stop flagging `gateway.recover_after_time` as a difference
when both `gateway.expected_nodes` and `gateway.expected_data_nodes` are
unset (`-1`).
- Admin: Added XMover - CrateDB shard analyzer and movement tool. Thanks, @WalBeh.

## 2025/08/19 v0.0.41
- I/O: Updated to `influxio-0.6.0`. Thanks, @ZillKhan.
Expand Down
Empty file.
10 changes: 10 additions & 0 deletions cratedb_toolkit/admin/xmover/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
XMover - CrateDB Shard Analyzer and Movement Tool

A tool for analyzing CrateDB shard distribution across nodes and availability zones,
and generating safe SQL commands for shard rebalancing.
"""

__version__ = "0.1.0"
__author__ = "CrateDB Tools"
__description__ = "CrateDB shard analyzer and movement tool"
Empty file.
1,129 changes: 1,129 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/shard.py

Large diffs are not rendered by default.

794 changes: 794 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/table.py

Large diffs are not rendered by default.

160 changes: 160 additions & 0 deletions cratedb_toolkit/admin/xmover/analysis/zone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Dict, List, Optional

from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table

from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer
from cratedb_toolkit.admin.xmover.model import ShardInfo
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient

console = Console()


class ZoneReport:
def __init__(self, client: CrateDBClient):
self.client = client
self.analyzer = ShardAnalyzer(self.client)

def shard_balance(self, tolerance: float, table: Optional[str] = None):
"""Check zone balance for shards"""
console.print(Panel.fit("[bold blue]Zone Balance Check[/bold blue]"))
console.print("[dim]Note: Analyzing all shards regardless of state for complete cluster view[/dim]")
console.print()

zone_stats = self.analyzer.check_zone_balance(table, tolerance)

if not zone_stats:
console.print("[yellow]No shards found for analysis[/yellow]")
return

# Calculate totals and targets
total_shards = sum(stats["TOTAL"] for stats in zone_stats.values())
zones = list(zone_stats.keys())
target_per_zone = total_shards // len(zones) if zones else 0
tolerance_range = (target_per_zone * (1 - tolerance / 100), target_per_zone * (1 + tolerance / 100))

balance_table = Table(title=f"Zone Balance Analysis (Target: {target_per_zone} ±{tolerance}%)", box=box.ROUNDED)
balance_table.add_column("Zone", style="cyan")
balance_table.add_column("Primary", justify="right", style="blue")
balance_table.add_column("Replica", justify="right", style="green")
balance_table.add_column("Total", justify="right", style="magenta")
balance_table.add_column("Status", style="bold")

for zone, stats in zone_stats.items():
total = stats["TOTAL"]

if tolerance_range[0] <= total <= tolerance_range[1]:
status = "[green]✓ Balanced[/green]"
elif total < tolerance_range[0]:
status = f"[yellow]⚠ Under ({total - target_per_zone:+})[/yellow]"
else:
status = f"[red]⚠ Over ({total - target_per_zone:+})[/red]"

balance_table.add_row(zone, str(stats["PRIMARY"]), str(stats["REPLICA"]), str(total), status)

console.print(balance_table)

def distribution_conflicts(self, shard_details: bool = False, table: Optional[str] = None):
"""Detailed analysis of zone distribution and potential conflicts"""
console.print(Panel.fit("[bold blue]Detailed Zone Analysis[/bold blue]"))
console.print("[dim]Comprehensive zone distribution analysis for CrateDB cluster[/dim]")
console.print()

# Get all shards for analysis
shards = self.client.get_shards_info(table_name=table, for_analysis=True)

if not shards:
console.print("[yellow]No shards found for analysis[/yellow]")
return

# Organize by table and shard
tables: Dict[str, Dict[int, List[ShardInfo]]] = {}
for shard in shards:
table_key = f"{shard.schema_name}.{shard.table_name}"
if table_key not in tables:
tables[table_key] = {}

shard_key = shard.shard_id
if shard_key not in tables[table_key]:
tables[table_key][shard_key] = []

tables[table_key][shard_key].append(shard)

# Analyze each table
zone_conflicts = 0
under_replicated = 0

for table_name, table_shards in tables.items():
console.print(f"\n[bold cyan]Table: {table_name}[/bold cyan]")

# Create analysis table
analysis_table = Table(title=f"Shard Distribution for {table_name}", box=box.ROUNDED)
analysis_table.add_column("Shard ID", justify="right", style="magenta")
analysis_table.add_column("Primary Zone", style="blue")
analysis_table.add_column("Replica Zones", style="green")
analysis_table.add_column("Total Copies", justify="right", style="cyan")
analysis_table.add_column("Status", style="bold")

for shard_id, shard_copies in sorted(table_shards.items()):
primary_zone = "Unknown"
replica_zones = set()
total_copies = len(shard_copies)
zones_with_copies = set()

for shard_copy in shard_copies:
zones_with_copies.add(shard_copy.zone)
if shard_copy.is_primary:
primary_zone = shard_copy.zone
else:
replica_zones.add(shard_copy.zone)

# Determine status
status_parts = []
if len(zones_with_copies) == 1:
zone_conflicts += 1
status_parts.append("[red]⚠ ZONE CONFLICT[/red]")

if total_copies < 2: # Assuming we want at least 1 replica
under_replicated += 1
status_parts.append("[yellow]⚠ Under-replicated[/yellow]")

if not status_parts:
status_parts.append("[green]✓ Good[/green]")

replica_zones_str = ", ".join(sorted(replica_zones)) if replica_zones else "None"

analysis_table.add_row(
str(shard_id), primary_zone, replica_zones_str, str(total_copies), " ".join(status_parts)
)

# Show individual shard details if requested
if shard_details:
for shard_copy in shard_copies:
health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
console.print(
f" {health_indicator} {shard_copy.shard_type} "
f"on {shard_copy.node_name} ({shard_copy.zone}) - "
f"{shard_copy.state}/{shard_copy.routing_state}"
)

console.print(analysis_table)

# Summary
console.print("\n[bold]Zone Analysis Summary:[/bold]")
console.print(f" • Tables analyzed: [cyan]{len(tables)}[/cyan]")
console.print(f" • Zone conflicts detected: [red]{zone_conflicts}[/red]")
console.print(f" • Under-replicated shards: [yellow]{under_replicated}[/yellow]")

if zone_conflicts > 0:
console.print(f"\n[red]⚠ Found {zone_conflicts} zone conflicts that need attention![/red]")
console.print("[dim]Zone conflicts occur when all copies of a shard are in the same zone.[/dim]")
console.print("[dim]This violates CrateDB's zone-awareness and creates availability risks.[/dim]")

if under_replicated > 0:
console.print(f"\n[yellow]⚠ Found {under_replicated} under-replicated shards.[/yellow]")
console.print("[dim]Consider increasing replication for better availability.[/dim]")

if zone_conflicts == 0 and under_replicated == 0:
console.print("\n[green]✓ No critical zone distribution issues detected![/green]")
118 changes: 118 additions & 0 deletions cratedb_toolkit/admin/xmover/attic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# ruff: noqa

# @main.command()
# @click.argument('node_name')
# @click.option('--min-free-space', default=100.0, help='Minimum free space required on target nodes in GB (default: 100)')
# @click.option('--dry-run/--execute', default=True, help='Show decommission plan without generating SQL commands (default: True)')
# @click.pass_context
# def decommission(ctx, node_name: str, min_free_space: float, dry_run: bool):
# """Plan decommissioning of a node by analyzing required shard moves
#
# NODE_NAME: Name of the node to decommission
# """
# client = ctx.obj['client']
# analyzer = ShardAnalyzer(client)
#
# mode_text = "PLANNING MODE" if dry_run else "EXECUTION MODE"
# console.print(Panel.fit(f"[bold blue]Node Decommission Analysis[/bold blue] - [bold {'green' if dry_run else 'red'}]{mode_text}[/bold {'green' if dry_run else 'red'}]"))
# console.print(f"[dim]Analyzing decommission plan for node: {node_name}[/dim]")
# console.print()
#
# # Generate decommission plan
# plan = analyzer.plan_node_decommission(node_name, min_free_space)
#
# if 'error' in plan:
# console.print(f"[red]Error: {plan['error']}[/red]")
# return
#
# # Display plan summary
# summary_table = Table(title=f"Decommission Plan for {node_name}", box=box.ROUNDED)
# summary_table.add_column("Metric", style="cyan")
# summary_table.add_column("Value", style="magenta")
#
# summary_table.add_row("Node", plan['node'])
# summary_table.add_row("Zone", plan['zone'])
# summary_table.add_row("Feasible", "[green]✓ Yes[/green]" if plan['feasible'] else "[red]✗ No[/red]")
# summary_table.add_row("Shards to Move", str(plan['shards_to_move']))
# summary_table.add_row("Moveable Shards", str(plan['moveable_shards']))
# summary_table.add_row("Total Data Size", format_size(plan['total_size_gb']))
# summary_table.add_row("Estimated Time", f"{plan['estimated_time_hours']:.1f} hours")
#
# console.print(summary_table)
# console.print()
#
# # Show warnings if any
# if plan['warnings']:
# console.print("[bold yellow]⚠ Warnings:[/bold yellow]")
# for warning in plan['warnings']:
# console.print(f" • [yellow]{warning}[/yellow]")
# console.print()
#
# # Show infeasible moves if any
# if plan['infeasible_moves']:
# console.print("[bold red]✗ Cannot Move:[/bold red]")
# infeasible_table = Table(box=box.ROUNDED)
# infeasible_table.add_column("Shard", style="cyan")
# infeasible_table.add_column("Size", style="magenta")
# infeasible_table.add_column("Reason", style="red")
#
# for move in plan['infeasible_moves']:
# infeasible_table.add_row(
# move['shard'],
# format_size(move['size_gb']),
# move['reason']
# )
# console.print(infeasible_table)
# console.print()
#
# # Show move recommendations
# if plan['recommendations']:
# move_table = Table(title="Required Shard Moves", box=box.ROUNDED)
# move_table.add_column("Table", style="cyan")
# move_table.add_column("Shard", justify="right", style="magenta")
# move_table.add_column("Type", style="blue")
# move_table.add_column("Size", style="green")
# move_table.add_column("From Zone", style="yellow")
# move_table.add_column("To Node", style="cyan")
# move_table.add_column("To Zone", style="yellow")
#
# for rec in plan['recommendations']:
# move_table.add_row(
# f"{rec.schema_name}.{rec.table_name}",
# str(rec.shard_id),
# rec.shard_type,
# format_size(rec.size_gb),
# rec.from_zone,
# rec.to_node,
# rec.to_zone
# )
#
# console.print(move_table)
# console.print()
#
# # Generate SQL commands if not in dry-run mode
# if not dry_run and plan['feasible']:
# console.print(Panel.fit("[bold green]Decommission SQL Commands[/bold green]"))
# console.print("[dim]# Execute these commands in order to prepare for node decommission[/dim]")
# console.print("[dim]# ALWAYS test in a non-production environment first![/dim]")
# console.print("[dim]# Monitor shard health after each move before proceeding[/dim]")
# console.print()
#
# for i, rec in enumerate(plan['recommendations'], 1):
# console.print(f"-- Move {i}: {rec.reason}")
# console.print(f"{rec.to_sql()}")
# console.print()
#
# console.print(f"-- After all moves complete, the node {node_name} can be safely removed")
# console.print(f"-- Total moves required: {len(plan['recommendations'])}")
# elif dry_run:
# console.print("[green]✓ Decommission plan ready. Use --execute to generate SQL commands.[/green]")
#
# # Final status
# if not plan['feasible']:
# console.print(f"[red]⚠ Node {node_name} cannot be safely decommissioned at this time.[/red]")
# console.print("[dim]Address the issues above before attempting decommission.[/dim]")
# elif plan['shards_to_move'] == 0:
# console.print(f"[green]✓ Node {node_name} is ready for immediate decommission (no shards to move).[/green]")
# else:
# console.print(f"[green]✓ Node {node_name} can be safely decommissioned after moving {len(plan['recommendations'])} shards.[/green]")
Loading
Loading