diff --git a/CHANGES.md b/CHANGES.md index 3f59ef75..509c7742 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ - Settings: Stop flagging `gateway.recover_after_time` as a difference when both `gateway.expected_nodes` and `gateway.expected_data_nodes` are unset (`-1`). +- Admin: Added XMover - CrateDB shard analyzer and movement tool. Thanks, @WalBeh. ## 2025/08/19 v0.0.41 - I/O: Updated to `influxio-0.6.0`. Thanks, @ZillKhan. diff --git a/cratedb_toolkit/admin/__init__.py b/cratedb_toolkit/admin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/admin/xmover/__init__.py b/cratedb_toolkit/admin/xmover/__init__.py new file mode 100644 index 00000000..92e9ee84 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/__init__.py @@ -0,0 +1,10 @@ +""" +XMover - CrateDB Shard Analyzer and Movement Tool + +A tool for analyzing CrateDB shard distribution across nodes and availability zones, +and generating safe SQL commands for shard rebalancing. +""" + +__version__ = "0.1.0" +__author__ = "CrateDB Tools" +__description__ = "CrateDB shard analyzer and movement tool" diff --git a/cratedb_toolkit/admin/xmover/analysis/__init__.py b/cratedb_toolkit/admin/xmover/analysis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/admin/xmover/analysis/shard.py b/cratedb_toolkit/admin/xmover/analysis/shard.py new file mode 100644 index 00000000..f6f24b6b --- /dev/null +++ b/cratedb_toolkit/admin/xmover/analysis/shard.py @@ -0,0 +1,949 @@ +""" +Shard analysis and rebalancing logic for CrateDB +""" + +import logging +import math +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set, Tuple, Union + +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cratedb_toolkit.admin.xmover.model import ( + DistributionStats, + NodeInfo, + ShardInfo, + ShardRelocationConstraints, + ShardRelocationResponse, +) +from 
cratedb_toolkit.admin.xmover.util.database import CrateDBClient +from cratedb_toolkit.admin.xmover.util.format import format_percentage, format_size + +logger = logging.getLogger(__name__) + +console = Console() + + +class ShardAnalyzer: + """Analyzer for CrateDB shard distribution and rebalancing""" + + def __init__(self, client: CrateDBClient): + self.client = client + self.nodes: List[NodeInfo] = [] + self.shards: List[ShardInfo] = [] + + # Initialize session-based caches for performance. + self._zone_conflict_cache: Dict[Tuple[str, int, str], Union[str, None]] = {} + self._node_lookup_cache: Dict[str, Union[NodeInfo, None]] = {} + self._target_nodes_cache: Dict[Tuple[float, frozenset[Any], float, float], List[NodeInfo]] = {} + self._cache_hits = 0 + self._cache_misses = 0 + + self._refresh_data() + + def _refresh_data(self): + """Refresh node and shard data from the database""" + self.nodes = self.client.get_nodes_info() + # For analysis, get all shards regardless of state + self.shards = self.client.get_shards_info(for_analysis=True) + + def analyze_distribution(self, table_name: Optional[str] = None) -> DistributionStats: + """Analyze the current shard distribution""" + # Filter shards by table if specified + shards = self.shards + if table_name: + shards = [s for s in shards if s.table_name == table_name] + + if not shards: + return DistributionStats(0, 0.0, {}, {}, 100.0, 100.0) + + total_shards = len(shards) + total_size_gb = sum(s.size_gb for s in shards) + + # Count by zone and node + zone_counts: Dict[str, int] = defaultdict(int) + node_counts: Dict[str, int] = defaultdict(int) + + for shard in shards: + zone_counts[shard.zone] += 1 + node_counts[shard.node_name] += 1 + + # Calculate balance scores + zone_balance_score = self._calculate_balance_score(list(zone_counts.values())) + node_balance_score = self._calculate_balance_score(list(node_counts.values())) + + return DistributionStats( + total_shards=total_shards, + total_size_gb=total_size_gb, + 
zones=dict(zone_counts), + nodes=dict(node_counts), + zone_balance_score=zone_balance_score, + node_balance_score=node_balance_score, + ) + + def _calculate_balance_score(self, counts: List[int]) -> float: + """Calculate a balance score (0-100) for a distribution""" + if not counts or len(counts) <= 1: + return 100.0 + + mean_count = sum(counts) / len(counts) + if mean_count == 0: + return 100.0 + + # Calculate coefficient of variation + variance = sum((count - mean_count) ** 2 for count in counts) / len(counts) + std_dev = math.sqrt(variance) + cv = std_dev / mean_count + + # Convert to score (lower CV = higher score) + # CV of 0 = 100%, CV of 1 = ~37%, CV of 2 = ~14% + score = max(0, 100 * math.exp(-cv)) + return round(score, 1) + + def find_moveable_shards( + self, min_size_gb: float = 40.0, max_size_gb: float = 60.0, table_name: Optional[str] = None + ) -> List[ShardInfo]: + """Find shards that are candidates for moving based on size + + Only returns healthy shards that are safe to move. + Prioritizes shards from nodes with less available space. 
+ """ + # Get only healthy shards (STARTED + 100% recovered) for safe operations + healthy_shards = self.client.get_shards_info( + table_name=table_name, + min_size_gb=min_size_gb, + max_size_gb=max_size_gb, + for_analysis=False, # Only operational shards + ) + + # Create a mapping of node names to available space + node_space_map = {node.name: node.available_space_gb for node in self.nodes} + + # Sort by node available space (ascending, so low space nodes first), then by shard size + healthy_shards.sort(key=lambda s: (node_space_map.get(s.node_name, float("inf")), s.size_gb)) + return healthy_shards + + def check_zone_balance( + self, table_name: Optional[str] = None, tolerance_percent: float = 10.0 + ) -> Dict[str, Dict[str, int]]: + """Check if zones are balanced within tolerance""" + # Filter shards by table if specified + shards = self.shards + if table_name: + shards = [s for s in shards if s.table_name == table_name] + + # Count shards by zone and type + zone_stats: Dict[str, Dict] = defaultdict(lambda: {"PRIMARY": 0, "REPLICA": 0, "TOTAL": 0}) + + for shard in shards: + shard_type = shard.shard_type + zone_stats[shard.zone][shard_type] += 1 + zone_stats[shard.zone]["TOTAL"] += 1 + + return dict(zone_stats) + + def find_nodes_with_capacity( + self, + required_space_gb: float, + exclude_zones: Optional[Set[str]] = None, + exclude_nodes: Optional[Set[str]] = None, + min_free_space_gb: float = 100.0, + max_disk_usage_percent: float = 85.0, + ) -> List[NodeInfo]: + """Find nodes that have capacity for additional shards + + Args: + required_space_gb: Minimum space needed for the shard + exclude_zones: Zones to exclude from consideration + exclude_nodes: Specific nodes to exclude + min_free_space_gb: Additional buffer space required + max_disk_usage_percent: Maximum disk usage percentage allowed + """ + available_nodes = [] + + for node in self.nodes: + # Skip zones we want to exclude + if exclude_zones and node.zone in exclude_zones: + continue + + # Skip 
specific nodes we want to exclude + if exclude_nodes and node.name in exclude_nodes: + continue + + # Check disk usage threshold + if node.disk_usage_percent > max_disk_usage_percent: + continue + + # Check if node has enough free space + free_space_gb = node.available_space_gb + if free_space_gb >= (required_space_gb + min_free_space_gb): + available_nodes.append(node) + else: + continue + + # Sort by available space (most space first) - prioritize nodes with more free space + available_nodes.sort(key=lambda n: n.available_space_gb, reverse=True) + return available_nodes + + def generate_rebalancing_recommendations( + self, constraints: ShardRelocationConstraints + ) -> List[ShardRelocationResponse]: + """Generate recommendations for rebalancing shards + + Args: + prioritize_space: If True, prioritizes moving shards from nodes with less available space + regardless of zone balance. If False, prioritizes zone balancing first. + source_node: If specified, only generate recommendations for shards on this node + max_disk_usage_percent: Maximum disk usage percentage for target nodes + """ + recommendations: List[ShardRelocationResponse] = [] + + # Get moveable shards (only healthy ones for actual operations) + moveable_shards = self.find_moveable_shards(constraints.min_size, constraints.max_size, constraints.table_name) + + print( + f"Analyzing {len(moveable_shards)} candidate shards " + f"in size range {constraints.min_size}-{constraints.max_size}GB..." 
+ ) + + if not moveable_shards: + return recommendations + + # Analyze current zone balance + zone_stats = self.check_zone_balance(constraints.table_name, constraints.zone_tolerance) + + # Calculate target distribution + total_shards = sum(stats["TOTAL"] for stats in zone_stats.values()) + zones = list(zone_stats.keys()) + target_per_zone = total_shards // len(zones) if zones else 0 + + # Find zones that are over/under capacity + overloaded_zones = [] + underloaded_zones = [] + + for zone, stats in zone_stats.items(): + current_count = stats["TOTAL"] + threshold_high = target_per_zone * (1 + constraints.zone_tolerance / 100) + threshold_low = target_per_zone * (1 - constraints.zone_tolerance / 100) + + if current_count > threshold_high: + overloaded_zones.append(zone) + elif current_count < threshold_low: + underloaded_zones.append(zone) + + # Optimize processing: if filtering by source node, only process those shards + if constraints.source_node: + processing_shards = [s for s in moveable_shards if s.node_name == constraints.source_node] + print(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}") + else: + processing_shards = moveable_shards + + # Generate move recommendations + safe_recommendations = 0 # noqa: F841 + total_evaluated = 0 + + for i, shard in enumerate(processing_shards): + if shard is None: + logger.info(f"Shard not found: {i}") + continue + + if len(recommendations) >= constraints.max_recommendations: + logger.info(f"Found {len(recommendations)} recommendations for shard: {shard.shard_id}") + break + + # Show progress every 50 shards when processing many + if len(processing_shards) > 100 and i > 0 and i % 50 == 0: + print(".", end="", flush=True) + + total_evaluated += 1 + + # Skip based on priority mode + if not constraints.prioritize_space: + # Zone balancing mode: only move shards from overloaded zones + if shard.zone not in overloaded_zones: + continue + # In space priority mode, consider all shards regardless 
of zone balance + + # Find target nodes, excluding the source node and prioritizing by available space (with caching) + target_nodes = self._find_nodes_with_capacity_cached( + required_space_gb=shard.size_gb, + exclude_nodes={shard.node_name}, # Don't move to same node + min_free_space_gb=constraints.min_free_space, + max_disk_usage_percent=constraints.max_disk_usage, + ) + + # Quick pre-filter to avoid expensive safety validations + # Only check nodes in different zones (for zone balancing) + if not constraints.prioritize_space: + target_nodes = [node for node in target_nodes if node.zone != shard.zone] + + # Limit to top 3 candidates to reduce validation overhead + target_nodes = target_nodes[:3] + + # Filter target nodes to find safe candidates + safe_target_nodes = [] + for candidate_node in target_nodes: + # Create a temporary recommendation to test safety + temp_rec = ShardRelocationResponse( + table_name=shard.table_name, + schema_name=shard.schema_name, + shard_id=shard.shard_id, + from_node=shard.node_name, + to_node=candidate_node.name, + from_zone=shard.zone, + to_zone=candidate_node.zone, + shard_type=shard.shard_type, + size_gb=shard.size_gb, + reason="Safety validation", + ) + + # Check if this move would be safe + is_safe, safety_msg = self.validate_move_safety(temp_rec, constraints.max_disk_usage) + if is_safe: + safe_target_nodes.append(candidate_node) + + if not safe_target_nodes: + continue # No safe targets found, skip this shard + + target_node: NodeInfo + if constraints.prioritize_space: + # Space priority mode: choose node with most available space + target_node = safe_target_nodes[0] # Already sorted by available space (desc) + else: + # Zone balance mode: prefer underloaded zones, then available space + target_zones = set(underloaded_zones) - {shard.zone} + preferred_nodes = [n for n in safe_target_nodes if n.zone in target_zones] + other_nodes = [n for n in safe_target_nodes if n.zone not in target_zones] + + # Choose target node with 
intelligent priority: + # 1. If a node has significantly more space (2x) than zone-preferred nodes, prioritize space + # 2. Otherwise, prefer zone balancing first, then available space + + if preferred_nodes and other_nodes: + best_preferred = preferred_nodes[0] # Most space in preferred zones + best_other = other_nodes[0] # Most space in other zones + + # If the best "other" node has significantly more space (2x), choose it + if best_other.available_space_gb >= (best_preferred.available_space_gb * 2): + target_node = best_other + else: + target_node = best_preferred + elif preferred_nodes: + target_node = preferred_nodes[0] + elif other_nodes: + target_node = other_nodes[0] + else: + continue # No suitable target found + + # Determine the reason for the move + if constraints.prioritize_space: + if shard.zone == target_node.zone: + reason = f"Space optimization within {shard.zone}" + else: + reason = f"Space optimization: {shard.zone} -> {target_node.zone}" + else: + reason = f"Zone rebalancing: {shard.zone} -> {target_node.zone}" + if shard.zone == target_node.zone: + reason = f"Node balancing within {shard.zone}" + + recommendation = ShardRelocationResponse( + table_name=shard.table_name, + schema_name=shard.schema_name, + shard_id=shard.shard_id, + from_node=shard.node_name, + to_node=target_node.name, + from_zone=shard.zone, + to_zone=target_node.zone, + shard_type=shard.shard_type, + size_gb=shard.size_gb, + reason=reason, + ) + + recommendations.append(recommendation) + + if len(processing_shards) > 100: + print() # New line after progress dots + print(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)") + print(f"Performance: {self.get_cache_stats()}") + return recommendations + + def validate_move_safety( + self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0 + ) -> Tuple[bool, str]: + """Validate that a move recommendation is safe to execute""" + # Find target node (with caching) + 
target_node = self._get_node_cached(recommendation.to_node) + + if not target_node: + return False, f"Target node '{recommendation.to_node}' not found" + + # Check for zone conflicts (same shard already exists in target zone) - with caching + zone_conflict = self._check_zone_conflict_cached(recommendation) + if zone_conflict: + return False, zone_conflict + + # Check available space + required_space_gb = recommendation.size_gb + 50 # 50GB buffer + if target_node.available_space_gb < required_space_gb: + return ( + False, + f"Insufficient space on target node (need {required_space_gb:.1f}GB, " + f"have {target_node.available_space_gb:.1f}GB)", + ) + + # Check disk usage + if target_node.disk_usage_percent > max_disk_usage_percent: + return False, f"Target node disk usage too high ({target_node.disk_usage_percent:.1f}%)" + + return True, "Move appears safe" + + def _get_node_cached(self, node_name: str): + """Get node by name with caching""" + if node_name in self._node_lookup_cache: + self._cache_hits += 1 + return self._node_lookup_cache[node_name] + + # Find node (cache miss) + self._cache_misses += 1 + target_node = None + for node in self.nodes: + if node.name == node_name: + target_node = node + break + + self._node_lookup_cache[node_name] = target_node + return target_node + + def _check_zone_conflict_cached(self, recommendation: ShardRelocationResponse) -> Optional[str]: + """Check zone conflicts with caching""" + # Create cache key: table, shard, target zone + target_zone = self._get_node_zone(recommendation.to_node) + cache_key = (recommendation.table_name, recommendation.shard_id, target_zone) + + if cache_key in self._zone_conflict_cache: + self._cache_hits += 1 + return self._zone_conflict_cache[cache_key] + + # Cache miss - do expensive check + self._cache_misses += 1 + result = self._check_zone_conflict(recommendation) + self._zone_conflict_cache[cache_key] = result + return result + + def _get_node_zone(self, node_name: str) -> str: + """Get zone for 
a node name""" + node = self._get_node_cached(node_name) + return node.zone if node else "unknown" + + def get_cache_stats(self) -> str: + """Get cache performance statistics""" + total = self._cache_hits + self._cache_misses + if total == 0: + return "Cache stats: No operations yet" + + hit_rate = (self._cache_hits / total) * 100 + return f"Cache stats: {hit_rate:.1f}% hit rate ({self._cache_hits} hits, {self._cache_misses} misses)" + + def _find_nodes_with_capacity_cached( + self, required_space_gb: float, exclude_nodes: set, min_free_space_gb: float, max_disk_usage_percent: float + ) -> List[NodeInfo]: + """Find nodes with capacity using caching for repeated queries""" + # Create cache key based on parameters (rounded to avoid float precision issues) + cache_key = ( + round(required_space_gb, 1), + frozenset(exclude_nodes), + round(min_free_space_gb, 1), + round(max_disk_usage_percent, 1), + ) + + if cache_key in self._target_nodes_cache: + self._cache_hits += 1 + return self._target_nodes_cache[cache_key] + + # Cache miss - do expensive calculation + self._cache_misses += 1 + result = self.find_nodes_with_capacity( + required_space_gb=required_space_gb, + exclude_nodes=exclude_nodes, + min_free_space_gb=min_free_space_gb, + max_disk_usage_percent=max_disk_usage_percent, + ) + + self._target_nodes_cache[cache_key] = result + return result + + def _check_zone_conflict(self, recommendation: ShardRelocationResponse) -> Optional[str]: + """Check if moving this shard would create a zone conflict + + Performs comprehensive zone safety analysis: + - Checks if target node already has a copy of this shard + - Checks if target zone already has copies + - Analyzes zone allocation limits and CrateDB's zone awareness rules + - Ensures move doesn't violate zone-awareness principles + """ + try: + # Query to get all copies of this shard across nodes and zones + query = """ + SELECT + s.node['id'] as node_id, + s.node['name'] as node_name, + n.attributes['zone'] as zone, + 
s."primary" as is_primary, + s.routing_state, + s.state + FROM sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id + WHERE s.table_name = ? + AND s.schema_name = ? + AND s.id = ? + ORDER BY s."primary" DESC, zone, node_name + """ + + result = self.client.execute_query( + query, [recommendation.table_name, recommendation.schema_name, recommendation.shard_id] + ) + + if not result.get("rows"): + return ( + f"Cannot find shard {recommendation.shard_id} " + f"for table {recommendation.schema_name}.{recommendation.table_name}" + ) + + # Analyze current distribution + zones_with_copies = set() + nodes_with_copies = set() + current_location = None + healthy_copies = 0 + total_copies = 0 + target_node_id = None + + # Get target node ID for the recommendation + for node in self.nodes: + if node.name == recommendation.to_node: + target_node_id = node.id + break + + if not target_node_id: + return f"Target node {recommendation.to_node} not found in cluster" + + for row in result["rows"]: + node_id, node_name, zone, is_primary, routing_state, state = row + zone = zone or "unknown" + total_copies += 1 + + # Track the shard we're planning to move + if node_name == recommendation.from_node: + current_location = { + "zone": zone, + "is_primary": is_primary, + "routing_state": routing_state, + "state": state, + } + + # Track all copies for conflict detection + nodes_with_copies.add(node_id) + if routing_state == "STARTED" and state == "STARTED": + healthy_copies += 1 + zones_with_copies.add(zone) + + # Validate the shard we're trying to move exists and is healthy + if not current_location: + return f"Shard not found on source node {recommendation.from_node}" + + if current_location["routing_state"] != "STARTED": + return f"Source shard is not in STARTED state (current: {current_location['routing_state']})" + + # CRITICAL CHECK 1: Target node already has a copy of this shard + if target_node_id in nodes_with_copies: + return ( + f"Node conflict: Target node 
{recommendation.to_node} " + f"already has a copy of shard {recommendation.shard_id}" + ) + + # CRITICAL CHECK 2: Target zone already has a copy (zone allocation limits) + if recommendation.to_zone in zones_with_copies: + return f"Zone conflict: {recommendation.to_zone} already has a copy of shard {recommendation.shard_id}" + + # CRITICAL CHECK 3: Ensure we're not creating a single point of failure + if len(zones_with_copies) == 1 and current_location["zone"] in zones_with_copies: + # This is the only zone with this shard - moving it is good for zone distribution + pass + elif len(zones_with_copies) <= 1 and healthy_copies <= 1: + return ( + f"Safety concern: Only {healthy_copies} healthy copy(ies) exist. " + f"Moving might risk data availability." + ) + + # ADDITIONAL CHECK: Verify zone allocation constraints for this table + table_zone_query = """ + SELECT + n.attributes['zone'] as zone, + COUNT(*) as shard_count + FROM sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id + WHERE s.table_name = ? + AND s.schema_name = ? + AND s.id = ? + AND s.routing_state = 'STARTED' + GROUP BY n.attributes['zone'] + ORDER BY zone + """ + + zone_result = self.client.execute_query( + table_zone_query, [recommendation.table_name, recommendation.schema_name, recommendation.shard_id] + ) + + current_zone_counts = {} + for row in zone_result.get("rows", []): + zone_name, count = row + current_zone_counts[zone_name or "unknown"] = count + + # Check if adding to target zone would violate balance + target_zone_count = current_zone_counts.get(recommendation.to_zone, 0) + if target_zone_count > 0: + return ( + f"Zone allocation violation: {recommendation.to_zone} " + f"would have {target_zone_count + 1} copies after move." 
+ ) + + return None + + except Exception as e: + # If we can't check, err on the side of caution + return f"Cannot verify zone safety: {str(e)}" + + def get_cluster_overview(self) -> Dict[str, Any]: + """Get a comprehensive overview of the cluster""" + # Get cluster watermark settings + watermarks = self.client.get_cluster_watermarks() + + overview: Dict[str, Any] = { + "nodes": len(self.nodes), + "zones": len({node.zone for node in self.nodes}), + "total_shards": len(self.shards), + "primary_shards": len([s for s in self.shards if s.is_primary]), + "replica_shards": len([s for s in self.shards if not s.is_primary]), + "total_size_gb": sum(s.size_gb for s in self.shards), + "zone_distribution": defaultdict(int), + "node_health": [], + "watermarks": watermarks, + } + + # Zone distribution + for shard in self.shards: + overview["zone_distribution"][shard.zone] += 1 + overview["zone_distribution"] = dict(overview["zone_distribution"]) + + # Node health with watermark calculations + for node in self.nodes: + node_shards = [s for s in self.shards if s.node_name == node.name] + watermark_info = self._calculate_node_watermark_remaining(node, watermarks) + + overview["node_health"].append( + { + "name": node.name, + "zone": node.zone, + "shards": len(node_shards), + "size_gb": sum(s.size_gb for s in node_shards), + "disk_usage_percent": node.disk_usage_percent, + "heap_usage_percent": node.heap_usage_percent, + "available_space_gb": node.available_space_gb, + "remaining_to_low_watermark_gb": watermark_info["remaining_to_low_gb"], + "remaining_to_high_watermark_gb": watermark_info["remaining_to_high_gb"], + } + ) + + return overview + + def _calculate_node_watermark_remaining(self, node: "NodeInfo", watermarks: Dict[str, Any]) -> Dict[str, float]: + """Calculate remaining space until watermarks are reached""" + + # Parse watermark percentages + low_watermark = self._parse_watermark_percentage(watermarks.get("low", "85%")) + high_watermark = 
self._parse_watermark_percentage(watermarks.get("high", "90%")) + + # Calculate remaining space to each watermark + total_space_bytes = node.fs_total + current_used_bytes = node.fs_used + + # Space that would be used at each watermark + low_watermark_used_bytes = total_space_bytes * (low_watermark / 100.0) + high_watermark_used_bytes = total_space_bytes * (high_watermark / 100.0) + + # Remaining space until each watermark (negative if already exceeded) + remaining_to_low_gb = max(0, (low_watermark_used_bytes - current_used_bytes) / (1024**3)) + remaining_to_high_gb = max(0, (high_watermark_used_bytes - current_used_bytes) / (1024**3)) + + return {"remaining_to_low_gb": remaining_to_low_gb, "remaining_to_high_gb": remaining_to_high_gb} + + def _parse_watermark_percentage(self, watermark_value: str) -> float: + """Parse watermark percentage from string like '85%' or '0.85'""" + if isinstance(watermark_value, str): + if watermark_value.endswith("%"): + return float(watermark_value[:-1]) + else: + # Handle decimal format like '0.85' + decimal_value = float(watermark_value) + if decimal_value <= 1.0: + return decimal_value * 100 + return decimal_value + elif isinstance(watermark_value, (int, float)): + if watermark_value <= 1.0: + return watermark_value * 100 + return watermark_value + else: + # Default to common values if parsing fails + return 85.0 # Default low watermark + + def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.0) -> Dict[str, Any]: + """Plan the decommissioning of a node by analyzing required shard moves + + Args: + node_name: Name of the node to decommission + min_free_space_gb: Minimum free space required on target nodes + + Returns: + Dictionary with decommission plan and analysis + """ + # Find the node to decommission + target_node = None + for node in self.nodes: + if node.name == node_name: + target_node = node + break + + if not target_node: + return {"error": f"Node {node_name} not found in cluster", "feasible": 
False} + + # Get all shards on this node (only healthy ones for safety) + node_shards = [s for s in self.shards if s.node_name == node_name and s.routing_state == "STARTED"] + + if not node_shards: + return { + "node": node_name, + "zone": target_node.zone, + "feasible": True, + "shards_to_move": 0, + "total_size_gb": 0, + "recommendations": [], + "warnings": [], + "message": "Node has no healthy shards - safe to decommission", + } + + # Calculate space requirements + total_size_gb = sum(s.size_gb for s in node_shards) + + # Find potential target nodes for each shard + move_plan = [] + warnings = [] + infeasible_moves = [] + + for shard in node_shards: + # Find nodes that can accommodate this shard + potential_targets = self.find_nodes_with_capacity( + shard.size_gb, exclude_nodes={node_name}, min_free_space_gb=min_free_space_gb + ) + + if not potential_targets: + infeasible_moves.append( + { + "shard": f"{shard.schema_name}.{shard.table_name}[{shard.shard_id}]", + "size_gb": shard.size_gb, + "reason": "No nodes with sufficient capacity", + } + ) + continue + + # Check for zone conflicts + safe_targets = [] + for target in potential_targets: + # Create a temporary recommendation to test zone safety + temp_rec = ShardRelocationResponse( + table_name=shard.table_name, + schema_name=shard.schema_name, + shard_id=shard.shard_id, + from_node=node_name, + to_node=target.name, + from_zone=shard.zone, + to_zone=target.zone, + shard_type=shard.shard_type, + size_gb=shard.size_gb, + reason=f"Node decommission: {node_name}", + ) + + zone_conflict = self._check_zone_conflict(temp_rec) + if not zone_conflict: + safe_targets.append(target) + else: + warnings.append( + f"Zone conflict for {shard.schema_name}.{shard.table_name}[{shard.shard_id}]: {zone_conflict}" + ) + + if safe_targets: + # Choose the target with most available space + best_target = safe_targets[0] + move_plan.append( + ShardRelocationResponse( + table_name=shard.table_name, + schema_name=shard.schema_name, + 
shard_id=shard.shard_id, + from_node=node_name, + to_node=best_target.name, + from_zone=shard.zone, + to_zone=best_target.zone, + shard_type=shard.shard_type, + size_gb=shard.size_gb, + reason=f"Node decommission: {node_name}", + ) + ) + else: + infeasible_moves.append( + { + "shard": f"{shard.schema_name}.{shard.table_name}[{shard.shard_id}]", + "size_gb": shard.size_gb, + "reason": "Zone conflicts prevent safe move", + } + ) + + # Determine feasibility + feasible = len(infeasible_moves) == 0 + + # Add capacity warnings + if feasible: + # Check if remaining cluster capacity is sufficient after decommission + remaining_capacity = sum(n.available_space_gb for n in self.nodes if n.name != node_name) + if remaining_capacity < total_size_gb * 1.2: # 20% safety margin + warnings.append( + f"Low remaining capacity after decommission. " + f"Only {remaining_capacity:.1f}GB available for {total_size_gb:.1f}GB of data" + ) + + return { + "node": node_name, + "zone": target_node.zone, + "feasible": feasible, + "shards_to_move": len(node_shards), + "moveable_shards": len(move_plan), + "total_size_gb": total_size_gb, + "recommendations": move_plan, + "infeasible_moves": infeasible_moves, + "warnings": warnings, + "estimated_time_hours": len(move_plan) * 0.1, # Rough estimate: 6 minutes per move + "message": "Decommission plan generated" if feasible else "Decommission not currently feasible", + } + + +class ShardReporter: + def __init__(self, analyzer: ShardAnalyzer): + self.analyzer = analyzer + + def distribution(self, table: str = None): + """Analyze current shard distribution across nodes and zones""" + console.print(Panel.fit("[bold blue]CrateDB Cluster Analysis[/bold blue]")) + + # Get cluster overview (includes all shards for complete analysis) + overview: Dict[str, Any] = self.analyzer.get_cluster_overview() + + # Cluster summary table + summary_table = Table(title="Cluster Summary", box=box.ROUNDED) + summary_table.add_column("Metric", style="cyan") + 
summary_table.add_column("Value", style="magenta") + + summary_table.add_row("Nodes", str(overview["nodes"])) + summary_table.add_row("Availability Zones", str(overview["zones"])) + summary_table.add_row("Total Shards", str(overview["total_shards"])) + summary_table.add_row("Primary Shards", str(overview["primary_shards"])) + summary_table.add_row("Replica Shards", str(overview["replica_shards"])) + summary_table.add_row("Total Size", format_size(overview["total_size_gb"])) + + console.print(summary_table) + console.print() + + # Disk watermarks table + if overview.get("watermarks"): + watermarks_table = Table(title="Disk Allocation Watermarks", box=box.ROUNDED) + watermarks_table.add_column("Setting", style="cyan") + watermarks_table.add_column("Value", style="magenta") + + watermarks = overview["watermarks"] + watermarks_table.add_row("Low Watermark", str(watermarks.get("low", "Not set"))) + watermarks_table.add_row("High Watermark", str(watermarks.get("high", "Not set"))) + watermarks_table.add_row("Flood Stage", str(watermarks.get("flood_stage", "Not set"))) + watermarks_table.add_row( + "Enable for Single Node", str(watermarks.get("enable_for_single_data_node", "Not set")) + ) + + console.print(watermarks_table) + console.print() + + # Zone distribution table + zone_table = Table(title="Zone Distribution", box=box.ROUNDED) + zone_table.add_column("Zone", style="cyan") + zone_table.add_column("Shards", justify="right", style="magenta") + zone_table.add_column("Percentage", justify="right", style="green") + + total_shards = overview["total_shards"] + for zone, count in overview["zone_distribution"].items(): + percentage = (count / total_shards * 100) if total_shards > 0 else 0 + zone_table.add_row(zone, str(count), f"{percentage:.1f}%") + + console.print(zone_table) + console.print() + + # Node health table + node_table = Table(title="Node Health", box=box.ROUNDED) + node_table.add_column("Node", style="cyan") + node_table.add_column("Zone", style="blue") + 
node_table.add_column("Shards", justify="right", style="magenta") + node_table.add_column("Size", justify="right", style="green") + node_table.add_column("Disk Usage", justify="right") + node_table.add_column("Available Space", justify="right", style="green") + node_table.add_column("Until Low WM", justify="right", style="yellow") + node_table.add_column("Until High WM", justify="right", style="red") + + for node_info in overview["node_health"]: + # Format watermark remaining capacity + low_wm_remaining = ( + format_size(node_info["remaining_to_low_watermark_gb"]) + if node_info["remaining_to_low_watermark_gb"] > 0 + else "[red]Exceeded[/red]" + ) + high_wm_remaining = ( + format_size(node_info["remaining_to_high_watermark_gb"]) + if node_info["remaining_to_high_watermark_gb"] > 0 + else "[red]Exceeded[/red]" + ) + + node_table.add_row( + node_info["name"], + node_info["zone"], + str(node_info["shards"]), + format_size(node_info["size_gb"]), + format_percentage(node_info["disk_usage_percent"]), + format_size(node_info["available_space_gb"]), + low_wm_remaining, + high_wm_remaining, + ) + + console.print(node_table) + + # Table-specific analysis if requested + if table: + console.print() + console.print(Panel.fit(f"[bold blue]Analysis for table: {table}[/bold blue]")) + + stats = self.analyzer.analyze_distribution(table) + + table_summary = Table(title=f"Table {table} Distribution", box=box.ROUNDED) + table_summary.add_column("Metric", style="cyan") + table_summary.add_column("Value", style="magenta") + + table_summary.add_row("Total Shards", str(stats.total_shards)) + table_summary.add_row("Total Size", format_size(stats.total_size_gb)) + table_summary.add_row("Zone Balance Score", f"{stats.zone_balance_score:.1f}/100") + table_summary.add_row("Node Balance Score", f"{stats.node_balance_score:.1f}/100") + + console.print(table_summary) diff --git a/cratedb_toolkit/admin/xmover/analysis/zone.py b/cratedb_toolkit/admin/xmover/analysis/zone.py new file mode 100644 
index 00000000..718d88f0 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/analysis/zone.py @@ -0,0 +1,159 @@ +from typing import Dict, List, Optional + +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer +from cratedb_toolkit.admin.xmover.model import ShardInfo +from cratedb_toolkit.admin.xmover.util.database import CrateDBClient + +console = Console() + + +class ZoneReport: + def __init__(self, client: CrateDBClient): + self.client = client + self.analyzer = ShardAnalyzer(self.client) + + def shard_balance(self, tolerance: float, table: Optional[str] = None): + """Check zone balance for shards""" + console.print(Panel.fit("[bold blue]Zone Balance Check[/bold blue]")) + console.print("[dim]Note: Analyzing all shards regardless of state for complete cluster view[/dim]") + console.print() + + zone_stats = self.analyzer.check_zone_balance(table, tolerance) + + if not zone_stats: + console.print("[yellow]No shards found for analysis[/yellow]") + return + + # Calculate totals and targets + total_shards = sum(stats["TOTAL"] for stats in zone_stats.values()) + zones = list(zone_stats.keys()) + target_per_zone = total_shards // len(zones) if zones else 0 + tolerance_range = (target_per_zone * (1 - tolerance / 100), target_per_zone * (1 + tolerance / 100)) + + balance_table = Table(title=f"Zone Balance Analysis (Target: {target_per_zone} ±{tolerance}%)", box=box.ROUNDED) + balance_table.add_column("Zone", style="cyan") + balance_table.add_column("Primary", justify="right", style="blue") + balance_table.add_column("Replica", justify="right", style="green") + balance_table.add_column("Total", justify="right", style="magenta") + balance_table.add_column("Status", style="bold") + + for zone, stats in zone_stats.items(): + total = stats["TOTAL"] + + if tolerance_range[0] <= total <= tolerance_range[1]: + status = "[green]✓ Balanced[/green]" + 
elif total < tolerance_range[0]: + status = f"[yellow]⚠ Under ({total - target_per_zone:+})[/yellow]" + else: + status = f"[red]⚠ Over ({total - target_per_zone:+})[/red]" + + balance_table.add_row(zone, str(stats["PRIMARY"]), str(stats["REPLICA"]), str(total), status) + + console.print(balance_table) + + def distribution_conflicts(self, shard_details: bool = False, table: Optional[str] = None): + """Detailed analysis of zone distribution and potential conflicts""" + console.print(Panel.fit("[bold blue]Detailed Zone Analysis[/bold blue]")) + console.print("[dim]Comprehensive zone distribution analysis for CrateDB cluster[/dim]") + console.print() + + # Get all shards for analysis + shards = self.client.get_shards_info(table_name=table, for_analysis=True) + + if not shards: + console.print("[yellow]No shards found for analysis[/yellow]") + return + + # Organize by table and shard + tables: Dict[str, Dict[int, List[ShardInfo]]] = {} + for shard in shards: + table_key = f"{shard.schema_name}.{shard.table_name}" + if table_key not in tables: + tables[table_key] = {} + + shard_key = shard.shard_id + if shard_key not in tables[table_key]: + tables[table_key][shard_key] = [] + + tables[table_key][shard_key].append(shard) + + # Analyze each table + zone_conflicts = 0 + under_replicated = 0 + + for table_name, table_shards in tables.items(): + console.print(f"\n[bold cyan]Table: {table_name}[/bold cyan]") + + # Create analysis table + analysis_table = Table(title=f"Shard Distribution for {table_name}", box=box.ROUNDED) + analysis_table.add_column("Shard ID", justify="right", style="magenta") + analysis_table.add_column("Primary Zone", style="blue") + analysis_table.add_column("Replica Zones", style="green") + analysis_table.add_column("Total Copies", justify="right", style="cyan") + analysis_table.add_column("Status", style="bold") + + for shard_id, shard_copies in sorted(table_shards.items()): + primary_zone = "Unknown" + replica_zones = set() + total_copies = 
len(shard_copies)
+                zones_with_copies = set()
+
+                for shard_copy in shard_copies:
+                    zones_with_copies.add(shard_copy.zone)
+                    if shard_copy.is_primary:
+                        primary_zone = shard_copy.zone
+                    else:
+                        replica_zones.add(shard_copy.zone)
+
+                # Determine status
+                status_parts = []
+                if total_copies > 1 and len(zones_with_copies) == 1:  # a lone copy is under-replication, not a conflict
+                    zone_conflicts += 1
+                    status_parts.append("[red]⚠ ZONE CONFLICT[/red]")
+
+                if total_copies < 2:  # Assuming we want at least 1 replica
+                    under_replicated += 1
+                    status_parts.append("[yellow]⚠ Under-replicated[/yellow]")
+
+                if not status_parts:
+                    status_parts.append("[green]✓ Good[/green]")
+
+                replica_zones_str = ", ".join(sorted(replica_zones)) if replica_zones else "None"
+
+                analysis_table.add_row(
+                    str(shard_id), primary_zone, replica_zones_str, str(total_copies), " ".join(status_parts)
+                )
+
+                # Show individual shard details if requested
+                if shard_details:
+                    for shard_copy in shard_copies:
+                        health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
+                        console.print(
+                            f" {health_indicator} {shard_copy.shard_type} "
+                            f"on {shard_copy.node_name} ({shard_copy.zone}) - {shard_copy.routing_state}"
+                        )
+
+            console.print(analysis_table)
+
+        # Summary
+        console.print("\n[bold]Zone Analysis Summary:[/bold]")
+        console.print(f" • Tables analyzed: [cyan]{len(tables)}[/cyan]")
+        console.print(f" • Zone conflicts detected: [red]{zone_conflicts}[/red]")
+        console.print(f" • Under-replicated shards: [yellow]{under_replicated}[/yellow]")
+
+        if zone_conflicts > 0:
+            console.print(f"\n[red]⚠ Found {zone_conflicts} zone conflicts that need attention![/red]")
+            console.print("[dim]Zone conflicts occur when all copies of a shard are in the same zone.[/dim]")
+            console.print("[dim]This violates CrateDB's zone-awareness and creates availability risks.[/dim]")
+
+        if under_replicated > 0:
+            console.print(f"\n[yellow]⚠ Found {under_replicated} under-replicated shards.[/yellow]")
+            console.print("[dim]Consider increasing replication for better availability.[/dim]")
+ + if zone_conflicts == 0 and under_replicated == 0: + console.print("\n[green]✓ No critical zone distribution issues detected![/green]") diff --git a/cratedb_toolkit/admin/xmover/attic.py b/cratedb_toolkit/admin/xmover/attic.py new file mode 100644 index 00000000..3cbfc3ee --- /dev/null +++ b/cratedb_toolkit/admin/xmover/attic.py @@ -0,0 +1,118 @@ +# ruff: noqa + +# @main.command() +# @click.argument('node_name') +# @click.option('--min-free-space', default=100.0, help='Minimum free space required on target nodes in GB (default: 100)') +# @click.option('--dry-run/--execute', default=True, help='Show decommission plan without generating SQL commands (default: True)') +# @click.pass_context +# def decommission(ctx, node_name: str, min_free_space: float, dry_run: bool): +# """Plan decommissioning of a node by analyzing required shard moves +# +# NODE_NAME: Name of the node to decommission +# """ +# client = ctx.obj['client'] +# analyzer = ShardAnalyzer(client) +# +# mode_text = "PLANNING MODE" if dry_run else "EXECUTION MODE" +# console.print(Panel.fit(f"[bold blue]Node Decommission Analysis[/bold blue] - [bold {'green' if dry_run else 'red'}]{mode_text}[/bold {'green' if dry_run else 'red'}]")) +# console.print(f"[dim]Analyzing decommission plan for node: {node_name}[/dim]") +# console.print() +# +# # Generate decommission plan +# plan = analyzer.plan_node_decommission(node_name, min_free_space) +# +# if 'error' in plan: +# console.print(f"[red]Error: {plan['error']}[/red]") +# return +# +# # Display plan summary +# summary_table = Table(title=f"Decommission Plan for {node_name}", box=box.ROUNDED) +# summary_table.add_column("Metric", style="cyan") +# summary_table.add_column("Value", style="magenta") +# +# summary_table.add_row("Node", plan['node']) +# summary_table.add_row("Zone", plan['zone']) +# summary_table.add_row("Feasible", "[green]✓ Yes[/green]" if plan['feasible'] else "[red]✗ No[/red]") +# summary_table.add_row("Shards to Move", 
str(plan['shards_to_move'])) +# summary_table.add_row("Moveable Shards", str(plan['moveable_shards'])) +# summary_table.add_row("Total Data Size", format_size(plan['total_size_gb'])) +# summary_table.add_row("Estimated Time", f"{plan['estimated_time_hours']:.1f} hours") +# +# console.print(summary_table) +# console.print() +# +# # Show warnings if any +# if plan['warnings']: +# console.print("[bold yellow]⚠ Warnings:[/bold yellow]") +# for warning in plan['warnings']: +# console.print(f" • [yellow]{warning}[/yellow]") +# console.print() +# +# # Show infeasible moves if any +# if plan['infeasible_moves']: +# console.print("[bold red]✗ Cannot Move:[/bold red]") +# infeasible_table = Table(box=box.ROUNDED) +# infeasible_table.add_column("Shard", style="cyan") +# infeasible_table.add_column("Size", style="magenta") +# infeasible_table.add_column("Reason", style="red") +# +# for move in plan['infeasible_moves']: +# infeasible_table.add_row( +# move['shard'], +# format_size(move['size_gb']), +# move['reason'] +# ) +# console.print(infeasible_table) +# console.print() +# +# # Show move recommendations +# if plan['recommendations']: +# move_table = Table(title="Required Shard Moves", box=box.ROUNDED) +# move_table.add_column("Table", style="cyan") +# move_table.add_column("Shard", justify="right", style="magenta") +# move_table.add_column("Type", style="blue") +# move_table.add_column("Size", style="green") +# move_table.add_column("From Zone", style="yellow") +# move_table.add_column("To Node", style="cyan") +# move_table.add_column("To Zone", style="yellow") +# +# for rec in plan['recommendations']: +# move_table.add_row( +# f"{rec.schema_name}.{rec.table_name}", +# str(rec.shard_id), +# rec.shard_type, +# format_size(rec.size_gb), +# rec.from_zone, +# rec.to_node, +# rec.to_zone +# ) +# +# console.print(move_table) +# console.print() +# +# # Generate SQL commands if not in dry-run mode +# if not dry_run and plan['feasible']: +# console.print(Panel.fit("[bold 
green]Decommission SQL Commands[/bold green]")) +# console.print("[dim]# Execute these commands in order to prepare for node decommission[/dim]") +# console.print("[dim]# ALWAYS test in a non-production environment first![/dim]") +# console.print("[dim]# Monitor shard health after each move before proceeding[/dim]") +# console.print() +# +# for i, rec in enumerate(plan['recommendations'], 1): +# console.print(f"-- Move {i}: {rec.reason}") +# console.print(f"{rec.to_sql()}") +# console.print() +# +# console.print(f"-- After all moves complete, the node {node_name} can be safely removed") +# console.print(f"-- Total moves required: {len(plan['recommendations'])}") +# elif dry_run: +# console.print("[green]✓ Decommission plan ready. Use --execute to generate SQL commands.[/green]") +# +# # Final status +# if not plan['feasible']: +# console.print(f"[red]⚠ Node {node_name} cannot be safely decommissioned at this time.[/red]") +# console.print("[dim]Address the issues above before attempting decommission.[/dim]") +# elif plan['shards_to_move'] == 0: +# console.print(f"[green]✓ Node {node_name} is ready for immediate decommission (no shards to move).[/green]") +# else: +# console.print(f"[green]✓ Node {node_name} can be safely decommissioned after moving {len(plan['recommendations'])} shards.[/green]") diff --git a/cratedb_toolkit/admin/xmover/cli.py b/cratedb_toolkit/admin/xmover/cli.py new file mode 100644 index 00000000..339f9e7f --- /dev/null +++ b/cratedb_toolkit/admin/xmover/cli.py @@ -0,0 +1,283 @@ +""" +XMover - CrateDB Shard Analyzer and Movement Tool + +Command Line Interface. 
+""" + +import sys +from typing import Optional + +import click +from rich.console import Console + +from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer, ShardReporter +from cratedb_toolkit.admin.xmover.analysis.zone import ZoneReport +from cratedb_toolkit.admin.xmover.model import ( + ShardRelocationConstraints, + ShardRelocationRequest, + SizeCriteria, +) +from cratedb_toolkit.admin.xmover.operational.candidates import CandidateFinder +from cratedb_toolkit.admin.xmover.operational.monitor import RecoveryMonitor, RecoveryOptions +from cratedb_toolkit.admin.xmover.operational.recommend import ShardRelocationRecommender +from cratedb_toolkit.admin.xmover.util.database import CrateDBClient +from cratedb_toolkit.admin.xmover.util.error import explain_cratedb_error + +console = Console() + + +@click.group() +@click.version_option() +@click.pass_context +def main(ctx): + """XMover - CrateDB Shard Analyzer and Movement Tool + + A tool for analyzing CrateDB shard distribution across nodes and availability zones, + and generating safe SQL commands for shard rebalancing. 
+ """ + ctx.ensure_object(dict) + + # Test connection on startup + try: + client = CrateDBClient() + if not client.test_connection(): + console.print("[red]Error: Could not connect to CrateDB[/red]") + console.print("Please check your CRATE_CONNECTION_STRING in .env file") + sys.exit(1) + ctx.obj["client"] = client + except Exception as e: + console.print(f"[red]Error connecting to CrateDB: {e}[/red]") + sys.exit(1) + + +@main.command() +@click.option("--table", "-t", help="Analyze specific table only") +@click.pass_context +def analyze(ctx, table: Optional[str]): + """Analyze current shard distribution across nodes and zones""" + client = ctx.obj["client"] + analyzer = ShardAnalyzer(client) + reporter = ShardReporter(analyzer) + reporter.distribution(table=table) + + +@main.command() +@click.option("--min-size", default=40.0, help="Minimum shard size in GB (default: 40)") +@click.option("--max-size", default=60.0, help="Maximum shard size in GB (default: 60)") +@click.option("--limit", default=20, help="Maximum number of candidates to show (default: 20)") +@click.option("--table", "-t", help="Find candidates for specific table only") +@click.option("--node", help="Only show candidates from this specific source node (e.g., data-hot-4)") +@click.pass_context +def find_candidates(ctx, min_size: float, max_size: float, limit: int, table: Optional[str], node: Optional[str]): + """Find shard candidates for movement based on size criteria""" + client = ctx.obj["client"] + analyzer = ShardAnalyzer(client) + finder = CandidateFinder(analyzer) + finder.movement_candidates( + criteria=SizeCriteria( + min_size=min_size, + max_size=max_size, + table_name=table, + source_node=node, + ), + limit=limit, + ) + + +@main.command() +@click.option("--table", "-t", help="Generate recommendations for specific table only") +@click.option("--min-size", default=40.0, help="Minimum shard size in GB (default: 40)") +@click.option("--max-size", default=60.0, help="Maximum shard size in GB 
(default: 60)") +@click.option("--zone-tolerance", default=10.0, help="Zone balance tolerance percentage (default: 10)") +@click.option( + "--min-free-space", default=100.0, help="Minimum free space required on target nodes in GB (default: 100)" +) +@click.option("--max-moves", default=10, help="Maximum number of move recommendations (default: 10)") +@click.option("--max-disk-usage", default=90.0, help="Maximum disk usage percentage for target nodes (default: 90)") +@click.option("--validate/--no-validate", default=True, help="Validate move safety (default: True)") +@click.option( + "--prioritize-space/--prioritize-zones", + default=False, + help="Prioritize available space over zone balancing (default: False)", +) +@click.option( + "--dry-run/--execute", default=True, help="Show what would be done without generating SQL commands (default: True)" +) +@click.option( + "--auto-execute", + is_flag=True, + default=False, + help="DANGER: Automatically execute the SQL commands (requires --execute, asks for confirmation)", +) +@click.option("--node", help="Only recommend moves from this specific source node (e.g., data-hot-4)") +@click.pass_context +def recommend( + ctx, + table: Optional[str], + node: Optional[str], + min_size: float, + max_size: float, + zone_tolerance: float, + min_free_space: float, + max_moves: int, + max_disk_usage: float, + prioritize_space: bool, + validate: bool, + dry_run: bool, + auto_execute: bool, +): + """Generate shard movement recommendations for rebalancing""" + recommender = ShardRelocationRecommender(client=ctx.obj["client"]) + recommender.execute( + constraints=ShardRelocationConstraints( + table_name=table, + source_node=node, + min_size=min_size, + max_size=max_size, + zone_tolerance=zone_tolerance, + min_free_space=min_free_space, + max_recommendations=max_moves, + max_disk_usage=max_disk_usage, + prioritize_space=prioritize_space, + ), + auto_execute=auto_execute, + validate=validate, + dry_run=dry_run, + ) + + +@main.command() 
+@click.option("--connection-string", help="Override connection string from .env") +@click.pass_context +def test_connection(ctx, connection_string: Optional[str]): + """Test connection to CrateDB cluster""" + try: + if connection_string: + client = CrateDBClient(connection_string) + else: + client = CrateDBClient() + + if client.test_connection(): + console.print("[green]✓ Connection successful![/green]") + + # Get basic cluster info + nodes = client.get_nodes_info() + console.print(f"Connected to cluster with {len(nodes)} nodes:") + for node in nodes: + console.print(f" • {node.name} (zone: {node.zone})") + else: + console.print("[red]✗ Connection failed[/red]") + sys.exit(1) + + except Exception as e: + console.print(f"[red]✗ Connection error: {e}[/red]") + sys.exit(1) + + +@main.command() +@click.option("--table", "-t", help="Check balance for specific table only") +@click.option("--tolerance", default=10.0, help="Zone balance tolerance percentage (default: 10)") +@click.pass_context +def check_balance(ctx, table: Optional[str], tolerance: float): + """Check zone balance for shards""" + client = ctx.obj["client"] + report = ZoneReport(client=client) + report.shard_balance(tolerance=tolerance, table=table) + + +@main.command() +@click.option("--table", "-t", help="Analyze zones for specific table only") +@click.option("--show-shards/--no-show-shards", default=False, help="Show individual shard details (default: False)") +@click.pass_context +def zone_analysis(ctx, table: Optional[str], show_shards: bool): + """Detailed analysis of zone distribution and potential conflicts""" + client = ctx.obj["client"] + report = ZoneReport(client=client) + report.distribution_conflicts(shard_details=show_shards, table=table) + + +@main.command() +@click.argument("schema_table") +@click.argument("shard_id", type=int) +@click.argument("from_node") +@click.argument("to_node") +@click.option("--max-disk-usage", default=90.0, help="Maximum disk usage percentage for target node 
(default: 90)") +@click.pass_context +def validate_move(ctx, schema_table: str, shard_id: int, from_node: str, to_node: str, max_disk_usage: float): + """Validate a specific shard move before execution + + SCHEMA_TABLE: Schema and table name (format: schema.table) + SHARD_ID: Shard ID to move + FROM_NODE: Source node name + TO_NODE: Target node name + + Example: xmover validate-move CUROV.maddoxxS 4 data-hot-1 data-hot-3 + """ + recommender = ShardRelocationRecommender(client=ctx.obj["client"]) + recommender.validate( + request=ShardRelocationRequest( + schema_table=schema_table, + shard_id=shard_id, + from_node=from_node, + to_node=to_node, + max_disk_usage=max_disk_usage, + ) + ) + + +@main.command() +@click.argument("error_message", required=False) +@click.pass_context +def explain_error(ctx, error_message: Optional[str]): + """Explain CrateDB allocation error messages and provide solutions + + ERROR_MESSAGE: The CrateDB error message to analyze (optional - can be provided interactively) + + Example: xmover explain-error "NO(a copy of this shard is already allocated to this node)" + """ + explain_cratedb_error(error_message) + + +@main.command() +@click.option("--table", "-t", help="Monitor recovery for specific table only") +@click.option("--node", "-n", help="Monitor recovery on specific node only") +@click.option("--watch", "-w", is_flag=True, help="Continuously monitor (refresh every 10s)") +@click.option("--refresh-interval", default=10, help="Refresh interval for watch mode (seconds)") +@click.option( + "--recovery-type", type=click.Choice(["PEER", "DISK", "all"]), default="all", help="Filter by recovery type" +) +@click.option("--include-transitioning", is_flag=True, help="Include completed recoveries still in transitioning state") +@click.pass_context +def monitor_recovery( + ctx, table: str, node: str, watch: bool, refresh_interval: int, recovery_type: str, include_transitioning: bool +): + """Monitor active shard recovery operations on the cluster + + 
This command monitors ongoing shard recoveries by querying sys.allocations + and sys.shards tables. It shows recovery progress, type (PEER/DISK), and timing. + + By default, only shows actively progressing recoveries. Use --include-transitioning + to also see completed recoveries that haven't fully transitioned to STARTED state. + + Examples: + xmover monitor-recovery # Show active recoveries only + xmover monitor-recovery --include-transitioning # Show active + transitioning + xmover monitor-recovery --table myTable # Monitor specific table + xmover monitor-recovery --watch # Continuous monitoring + xmover monitor-recovery --recovery-type PEER # Only PEER recoveries + """ + recovery_monitor = RecoveryMonitor( + client=ctx.obj["client"], + options=RecoveryOptions( + table=table, + node=node, + refresh_interval=refresh_interval, + recovery_type=recovery_type, + include_transitioning=include_transitioning, + ), + ) + recovery_monitor.start(watch=watch, debug=ctx.obj.get("debug")) + + +if __name__ == "__main__": + main() diff --git a/cratedb_toolkit/admin/xmover/model.py b/cratedb_toolkit/admin/xmover/model.py new file mode 100644 index 00000000..34e43f77 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/model.py @@ -0,0 +1,186 @@ +import dataclasses +from dataclasses import dataclass +from typing import Dict, Optional + + +@dataclass +class NodeInfo: + """Information about a CrateDB node""" + + id: str + name: str + zone: str + heap_used: int + heap_max: int + fs_total: int + fs_used: int + fs_available: int + + @property + def heap_usage_percent(self) -> float: + return (self.heap_used / self.heap_max) * 100 if self.heap_max > 0 else 0 + + @property + def disk_usage_percent(self) -> float: + return (self.fs_used / self.fs_total) * 100 if self.fs_total > 0 else 0 + + @property + def available_space_gb(self) -> float: + return self.fs_available / (1024**3) + + +@dataclass +class ShardInfo: + """Information about a shard""" + + table_name: str + schema_name: str + 
shard_id: int + node_id: str + node_name: str + zone: str + is_primary: bool + size_bytes: int + size_gb: float + num_docs: int + state: str + routing_state: str + + @property + def shard_type(self) -> str: + return "PRIMARY" if self.is_primary else "REPLICA" + + +@dataclass +class RecoveryInfo: + """Information about an active shard recovery""" + + schema_name: str + table_name: str + shard_id: int + node_name: str + node_id: str + recovery_type: str # PEER, DISK, etc. + stage: str # INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE + files_percent: float + bytes_percent: float + total_time_ms: int + routing_state: str # INITIALIZING, RELOCATING, etc. + current_state: str # from allocations + is_primary: bool + size_bytes: int + source_node_name: Optional[str] = None # Source node for PEER recoveries + translog_size_bytes: int = 0 # Translog size in bytes + + @property + def overall_progress(self) -> float: + """Calculate overall progress percentage""" + return max(self.files_percent, self.bytes_percent) + + @property + def size_gb(self) -> float: + """Size in GB""" + return self.size_bytes / (1024**3) + + @property + def shard_type(self) -> str: + return "PRIMARY" if self.is_primary else "REPLICA" + + @property + def total_time_seconds(self) -> float: + """Total time in seconds""" + return self.total_time_ms / 1000.0 + + @property + def translog_size_gb(self) -> float: + """Translog size in GB""" + return self.translog_size_bytes / (1024**3) + + @property + def translog_percentage(self) -> float: + """Translog size as percentage of shard size""" + return (self.translog_size_bytes / self.size_bytes * 100) if self.size_bytes > 0 else 0 + + +@dataclass +class ShardRelocationRequest: + """Request for moving a shard""" + + schema_table: str + shard_id: int + from_node: str + to_node: str + max_disk_usage: float + + +@dataclass +class ShardRelocationResponse: + """Recommendation for moving a shard""" + + table_name: str + schema_name: str + shard_id: int + from_node: 
str + to_node: str + from_zone: str + to_zone: str + shard_type: str + size_gb: float + reason: str + + def to_sql(self) -> str: + """Generate the SQL command for this move""" + return ( + f'ALTER TABLE "{self.schema_name}"."{self.table_name}" ' + f"REROUTE MOVE SHARD {self.shard_id} " + f"FROM '{self.from_node}' TO '{self.to_node}';" + ) + + @property + def safety_score(self) -> float: + """Calculate a safety score for this move (0-1, higher is safer)""" + score = 1.0 + + # Penalize if moving to same zone (not ideal for zone distribution) + if self.from_zone == self.to_zone: + score -= 0.3 + + # Bonus for zone balancing moves + if "rebalancing" in self.reason.lower(): + score += 0.2 + + # Ensure score stays in valid range + return max(0.0, min(1.0, score)) + + +@dataclass +class DistributionStats: + """Statistics about shard distribution""" + + total_shards: int + total_size_gb: float + zones: Dict[str, int] + nodes: Dict[str, int] + zone_balance_score: float # 0-100, higher is better + node_balance_score: float # 0-100, higher is better + + +@dataclasses.dataclass +class SizeCriteria: + min_size: float = 40.0 + max_size: float = 60.0 + table_name: Optional[str] = None + source_node: Optional[str] = None + + +@dataclasses.dataclass +class ShardRelocationConstraints: + min_size: float = SizeCriteria().min_size + max_size: float = SizeCriteria().max_size + table_name: Optional[str] = None + source_node: Optional[str] = None + zone_tolerance: float = 10.0 + min_free_space: float = 100.0 + max_recommendations: int = 10 + max_disk_usage: float = 90.0 + prioritize_space: bool = False diff --git a/cratedb_toolkit/admin/xmover/operational/__init__.py b/cratedb_toolkit/admin/xmover/operational/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/admin/xmover/operational/candidates.py b/cratedb_toolkit/admin/xmover/operational/candidates.py new file mode 100644 index 00000000..dd7d4930 --- /dev/null +++ 
b/cratedb_toolkit/admin/xmover/operational/candidates.py @@ -0,0 +1,84 @@ +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer +from cratedb_toolkit.admin.xmover.model import SizeCriteria +from cratedb_toolkit.admin.xmover.util.format import format_size + +console = Console() + + +class CandidateFinder: + def __init__(self, analyzer: ShardAnalyzer): + self.analyzer = analyzer + + def movement_candidates(self, criteria: SizeCriteria, limit: int): + """ + Find shard candidates for movement based on size criteria + + Results are sorted by nodes with least available space first, + then by shard size (smallest first) for easier moves. + """ + + console.print( + Panel.fit(f"[bold blue]Finding Moveable Shards ({criteria.min_size}-{criteria.max_size}GB)[/bold blue]") + ) + + if criteria.source_node: + console.print(f"[dim]Filtering: Only showing candidates from source node '{criteria.source_node}'[/dim]") + + # Find moveable candidates (only healthy shards suitable for operations) + candidates = self.analyzer.find_moveable_shards(criteria.min_size, criteria.max_size, criteria.table_name) + + # Filter by node if specified + if criteria.source_node: + candidates = [c for c in candidates if c.node_name == criteria.source_node] + + if not candidates: + if criteria.source_node: + console.print( + f"[yellow]No moveable shards found on node '{criteria.source_node}' " + f"in the specified size range.[/yellow]" + ) + console.print("[dim]Tip: Try different size ranges or remove --node filter to see all candidates[/dim]") + else: + console.print("[yellow]No moveable shards found in the specified size range.[/yellow]") + return + + # Show limited results + shown_candidates = candidates[:limit] + + candidates_table = Table( + title=f"Moveable Shard Candidates (showing {len(shown_candidates)} of {len(candidates)})", box=box.ROUNDED + ) + 
candidates_table.add_column("Table", style="cyan") + candidates_table.add_column("Shard ID", justify="right", style="magenta") + candidates_table.add_column("Type", style="blue") + candidates_table.add_column("Node", style="green") + candidates_table.add_column("Zone", style="yellow") + candidates_table.add_column("Size", justify="right", style="red") + candidates_table.add_column("Node Free Space", justify="right", style="white") + candidates_table.add_column("Documents", justify="right", style="dim") + + # Create a mapping of node names to available space for display + node_space_map = {node.name: node.available_space_gb for node in self.analyzer.nodes} + + for shard in shown_candidates: + node_free_space = node_space_map.get(shard.node_name, 0) + candidates_table.add_row( + f"{shard.schema_name}.{shard.table_name}", + str(shard.shard_id), + shard.shard_type, + shard.node_name, + shard.zone, + format_size(shard.size_gb), + format_size(node_free_space), + f"{shard.num_docs:,}", + ) + + console.print(candidates_table) + + if len(candidates) > limit: + console.print(f"\n[dim]... 
and {len(candidates) - limit} more candidates[/dim]")
diff --git a/cratedb_toolkit/admin/xmover/operational/monitor.py b/cratedb_toolkit/admin/xmover/operational/monitor.py
new file mode 100644
index 00000000..d88a295f
--- /dev/null
+++ b/cratedb_toolkit/admin/xmover/operational/monitor.py
@@ -0,0 +1,384 @@
+import dataclasses
+import time
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+from rich.console import Console
+
+from cratedb_toolkit.admin.xmover.model import RecoveryInfo
+from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
+from cratedb_toolkit.admin.xmover.util.format import format_translog_info
+
+console = Console()
+
+
+@dataclasses.dataclass
+class RecoveryOptions:
+    table: Optional[str] = None
+    node: Optional[str] = None
+    refresh_interval: int = 10
+    include_transitioning: bool = False
+    recovery_type: Optional[str] = None
+
+
+class RecoveryMonitor:
+    """Monitor shard recovery operations"""
+
+    def __init__(self, client: CrateDBClient, options: RecoveryOptions):
+        self.client = client
+        self.options = options
+
+    def get_cluster_recovery_status(self) -> List[RecoveryInfo]:
+        """Get comprehensive recovery status with minimal cluster impact"""
+
+        # Get all recovering shards using the efficient combined query
+        recoveries = self.client.get_all_recovering_shards(
+            self.options.table, self.options.node, self.options.include_transitioning
+        )
+
+        # Apply recovery type filter; "all" (the CLI default) and None both mean no filtering
+        if self.options.recovery_type and self.options.recovery_type.upper() != "ALL":
+            recoveries = [r for r in recoveries if r.recovery_type.upper() == self.options.recovery_type.upper()]
+
+        return recoveries
+
+    def get_recovery_summary(self, recoveries: List[RecoveryInfo]) -> Dict[str, Any]:
+        """Generate a summary of recovery operations"""
+
+        if not recoveries:
+            return {"total_recoveries": 0, "by_type": {}, "by_stage": {}, "avg_progress": 0.0, "total_size_gb": 0.0}
+
+        # Group by recovery type
+        by_type = {}
+        by_stage = {}
+        total_progress = 0.0
+        total_size_gb = 
0.0 + + for recovery in recoveries: + # By type + if recovery.recovery_type not in by_type: + by_type[recovery.recovery_type] = {"count": 0, "total_size_gb": 0.0, "avg_progress": 0.0} + by_type[recovery.recovery_type]["count"] += 1 + by_type[recovery.recovery_type]["total_size_gb"] += recovery.size_gb + + # By stage + if recovery.stage not in by_stage: + by_stage[recovery.stage] = 0 + by_stage[recovery.stage] += 1 + + # Totals + total_progress += recovery.overall_progress + total_size_gb += recovery.size_gb + + # Calculate averages + for type_name, rec_type in by_type.items(): + if rec_type["count"] > 0: + type_recoveries = [r for r in recoveries if r.recovery_type == type_name] + if type_recoveries: + rec_type["avg_progress"] = sum(r.overall_progress for r in type_recoveries) / len(type_recoveries) + + return { + "total_recoveries": len(recoveries), + "by_type": by_type, + "by_stage": by_stage, + "avg_progress": total_progress / len(recoveries) if recoveries else 0.0, + "total_size_gb": total_size_gb, + } + + def format_recovery_display(self, recoveries: List[RecoveryInfo]) -> str: + """Format recovery information for display""" + + if not recoveries: + return "✅ No active shard recoveries found" + + # Group by recovery type + peer_recoveries = [r for r in recoveries if r.recovery_type == "PEER"] + disk_recoveries = [r for r in recoveries if r.recovery_type == "DISK"] + other_recoveries = [r for r in recoveries if r.recovery_type not in ["PEER", "DISK"]] + + output = [f"\n🔄 Active Shard Recoveries ({len(recoveries)} total)"] + output.append("=" * 80) + + if peer_recoveries: + output.append(f"\n📡 PEER Recoveries ({len(peer_recoveries)})") + output.append(self._format_recovery_table(peer_recoveries)) + + if disk_recoveries: + output.append(f"\n💾 DISK Recoveries ({len(disk_recoveries)})") + output.append(self._format_recovery_table(disk_recoveries)) + + if other_recoveries: + output.append(f"\n🔧 Other Recoveries ({len(other_recoveries)})") + 
output.append(self._format_recovery_table(other_recoveries)) + + # Add summary + summary = self.get_recovery_summary(recoveries) + output.append("\n📊 Summary:") + output.append(f" Total size: {summary['total_size_gb']:.1f} GB") + output.append(f" Average progress: {summary['avg_progress']:.1f}%") + + return "\n".join(output) + + def _format_recovery_table(self, recoveries: List[RecoveryInfo]) -> str: + """Format a table of recovery information""" + + if not recoveries: + return " No recoveries of this type" + + # Table headers + headers = ["Table", "Shard", "Node", "Type", "Stage", "Progress", "Size(GB)", "Time(s)"] + + # Calculate column widths + col_widths = [len(h) for h in headers] + + rows = [] + for recovery in recoveries: + row = [ + f"{recovery.schema_name}.{recovery.table_name}", + str(recovery.shard_id), + recovery.node_name, + recovery.shard_type, + recovery.stage, + f"{recovery.overall_progress:.1f}%", + f"{recovery.size_gb:.1f}", + f"{recovery.total_time_seconds:.1f}", + ] + rows.append(row) + + # Update column widths + for i, cell in enumerate(row): + col_widths[i] = max(col_widths[i], len(cell)) + + # Format table + output = [] + + # Header row + header_row = " " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths)) + output.append(header_row) + output.append(" " + "-" * (len(header_row) - 3)) + + # Data rows + for row in rows: + data_row = " " + " | ".join(cell.ljust(w) for cell, w in zip(row, col_widths)) + output.append(data_row) + + return "\n".join(output) + + def start(self, watch: bool, debug: bool = False): + try: + if watch: + console.print(f"🔄 Monitoring shard recoveries (refreshing every {self.options.refresh_interval}s)") + console.print("Press Ctrl+C to stop") + console.print() + + try: + # Show header once + console.print("📊 Recovery Progress Monitor") + console.print("=" * 80) + + # Track previous state for change detection + previous_recoveries: Dict[str, Dict[str, Any]] = {} + previous_timestamp = None + first_run = True + + 
while True: + # Get current recovery status + recoveries = self.get_cluster_recovery_status() + + # Display current time + current_time = datetime.now().strftime("%H:%M:%S") + + # Check for any changes + changes = [] + active_count = 0 + completed_count = 0 + + for recovery in recoveries: + recovery_key = ( + f"{recovery.schema_name}.{recovery.table_name}.{recovery.shard_id}.{recovery.node_name}" + ) + + # Create complete table name + if recovery.schema_name == "doc": + table_display = recovery.table_name + else: + table_display = f"{recovery.schema_name}.{recovery.table_name}" + + # Count active vs completed + if recovery.stage == "DONE" and recovery.overall_progress >= 100.0: + completed_count += 1 + else: + active_count += 1 + + # Check for changes since last update + if recovery_key in previous_recoveries: + prev = previous_recoveries[recovery_key] + if prev["progress"] != recovery.overall_progress: + diff = recovery.overall_progress - prev["progress"] + # Create node route display + node_route = "" + if recovery.recovery_type == "PEER" and recovery.source_node_name: + node_route = f" {recovery.source_node_name} → {recovery.node_name}" + elif recovery.recovery_type == "DISK": + node_route = f" disk → {recovery.node_name}" + + # Add translog info + translog_info = format_translog_info(recovery) + + if diff > 0: + changes.append( + f"[green]📈[/green] {table_display} S{recovery.shard_id} " + f"{recovery.overall_progress:.1f}% (+{diff:.1f}%) " + f"{recovery.size_gb:.1f}GB{translog_info}{node_route}" + ) + else: + changes.append( + f"[yellow]📉[/yellow] {table_display} S{recovery.shard_id} " + f"{recovery.overall_progress:.1f}% ({diff:.1f}%) " + f"{recovery.size_gb:.1f}GB{translog_info}{node_route}" + ) + elif prev["stage"] != recovery.stage: + # Create node route display + node_route = "" + if recovery.recovery_type == "PEER" and recovery.source_node_name: + node_route = f" {recovery.source_node_name} → {recovery.node_name}" + elif recovery.recovery_type == "DISK": 
+ node_route = f" disk → {recovery.node_name}" + + # Add translog info + translog_info = format_translog_info(recovery) + + changes.append( + f"[blue]🔄[/blue] {table_display} S{recovery.shard_id} " + f"{prev['stage']}→{recovery.stage} " + f"{recovery.size_gb:.1f}GB{translog_info}{node_route}" + ) + else: + # New recovery - show based on include_transitioning flag or first run + if ( + first_run + or self.options.include_transitioning + or (recovery.overall_progress < 100.0 or recovery.stage != "DONE") + ): + # Create node route display + node_route = "" + if recovery.recovery_type == "PEER" and recovery.source_node_name: + node_route = f" {recovery.source_node_name} → {recovery.node_name}" + elif recovery.recovery_type == "DISK": + node_route = f" disk → {recovery.node_name}" + + status_icon = "[cyan]🆕[/cyan]" if not first_run else "[blue]📋[/blue]" + # Add translog info + translog_info = format_translog_info(recovery) + + changes.append( + f"{status_icon} {table_display} S{recovery.shard_id} " + f"{recovery.stage} {recovery.overall_progress:.1f}% " + f"{recovery.size_gb:.1f}GB{translog_info}{node_route}" + ) + + # Store current state for next comparison + previous_recoveries[recovery_key] = { + "progress": recovery.overall_progress, + "stage": recovery.stage, + } + + # Always show a status line + if not recoveries: + console.print(f"{current_time} | [green]No recoveries - cluster stable[/green]") + previous_recoveries.clear() + else: + # Build status message + status = "" + if active_count > 0: + status = f"{active_count} active" + if completed_count > 0: + status += f", {completed_count} done" if status else f"{completed_count} done" + + # Show status line with changes or periodic update + if changes: + console.print(f"{current_time} | {status}") + for change in changes: + console.print(f" | {change}") + else: + # Show periodic status even without changes + if self.options.include_transitioning and completed_count > 0: + console.print(f"{current_time} | {status} 
(transitioning)") + elif active_count > 0: + console.print(f"{current_time} | {status} (no changes)") + + previous_timestamp = current_time # noqa: F841 + first_run = False + time.sleep(self.options.refresh_interval) + + except KeyboardInterrupt: + console.print("\n\n[yellow]⏹ Monitoring stopped by user[/yellow]") + + # Show final summary + final_recoveries = self.get_cluster_recovery_status() + + if final_recoveries: + console.print("\n📊 [bold]Final Recovery Summary:[/bold]") + summary = self.get_recovery_summary(final_recoveries) + + # Count active vs completed + active_count = len( + [r for r in final_recoveries if r.overall_progress < 100.0 or r.stage != "DONE"] + ) + completed_count = len(final_recoveries) - active_count + + console.print(f" Total recoveries: {summary['total_recoveries']}") + console.print(f" Active: {active_count}, Completed: {completed_count}") + console.print(f" Total size: {summary['total_size_gb']:.1f} GB") + console.print(f" Average progress: {summary['avg_progress']:.1f}%") + + if summary["by_type"]: + console.print(" By recovery type:") + for rec_type, stats in summary["by_type"].items(): + console.print( + f" {rec_type}: {stats['count']} recoveries, " + f"{stats['avg_progress']:.1f}% avg progress" + ) + else: + console.print("\n[green]✅ No active recoveries at exit[/green]") + + return + + else: + # Single status check + recoveries = self.get_cluster_recovery_status() + + display_output = self.format_recovery_display(recoveries) + console.print(display_output) + + if not recoveries: + if self.options.include_transitioning: + console.print("\n[green]✅ No recoveries found (active or transitioning)[/green]") + else: + console.print("\n[green]✅ No active recoveries found[/green]") + console.print( + "[dim]💡 Use --include-transitioning to see completed recoveries still transitioning[/dim]" + ) + else: + # Show summary + summary = self.get_recovery_summary(recoveries) + console.print("\n📊 [bold]Recovery Summary:[/bold]") + console.print(f" 
Total recoveries: {summary['total_recoveries']}") + console.print(f" Total size: {summary['total_size_gb']:.1f} GB") + console.print(f" Average progress: {summary['avg_progress']:.1f}%") + + # Show breakdown by type + if summary["by_type"]: + console.print("\n By recovery type:") + for rec_type, stats in summary["by_type"].items(): + console.print( + f" {rec_type}: {stats['count']} recoveries, " + f"{stats['avg_progress']:.1f}% avg progress" + ) + + console.print("\n[dim]💡 Use --watch flag for continuous monitoring[/dim]") + + except Exception as e: + console.print(f"[red]❌ Error monitoring recoveries: {e}[/red]") + if debug: + raise diff --git a/cratedb_toolkit/admin/xmover/operational/recommend.py b/cratedb_toolkit/admin/xmover/operational/recommend.py new file mode 100644 index 00000000..ab5156e6 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/operational/recommend.py @@ -0,0 +1,497 @@ +import time + +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer +from cratedb_toolkit.admin.xmover.model import ( + ShardRelocationConstraints, + ShardRelocationRequest, + ShardRelocationResponse, +) +from cratedb_toolkit.admin.xmover.operational.monitor import RecoveryMonitor, RecoveryOptions +from cratedb_toolkit.admin.xmover.util.database import CrateDBClient +from cratedb_toolkit.admin.xmover.util.format import format_size + +console = Console() + + +class ShardRelocationRecommender: + def __init__(self, client: CrateDBClient): + self.client = client + self.analyzer = ShardAnalyzer(self.client) + + def validate(self, request: ShardRelocationRequest): + # Parse schema and table + if "." 
not in request.schema_table: + console.print("[red]Error: Schema and table must be in format 'schema.table'[/red]") + return + + schema_name, table_name = request.schema_table.split(".", 1) + + console.print(Panel.fit("[bold blue]Validating Shard Move[/bold blue]")) + console.print( + f"[dim]Move: {schema_name}.{table_name}[{request.shard_id}] " + f"from {request.from_node} to {request.to_node}[/dim]" + ) + console.print() + + # Find the nodes + from_node_info = None + to_node_info = None + for node in self.analyzer.nodes: + if node.name == request.from_node: + from_node_info = node + if node.name == request.to_node: + to_node_info = node + + if not from_node_info: + console.print(f"[red]✗ Source node '{request.from_node}' not found in cluster[/red]") + return + + if not to_node_info: + console.print(f"[red]✗ Target node '{request.to_node}' not found in cluster[/red]") + return + + # Find the specific shard + target_shard = None + for shard in self.analyzer.shards: + if ( + shard.schema_name == schema_name + and shard.table_name == table_name + and shard.shard_id == request.shard_id + and shard.node_name == request.from_node + ): + target_shard = shard + break + + if not target_shard: + console.print(f"[red]✗ Shard {request.shard_id} not found on node {request.from_node}[/red]") + console.print("[dim]Use 'xmover find-candidates' to see available shards[/dim]") + return + + # Create a move recommendation for validation + recommendation = ShardRelocationResponse( + table_name=table_name, + schema_name=schema_name, + shard_id=request.shard_id, + from_node=request.from_node, + to_node=request.to_node, + from_zone=from_node_info.zone, + to_zone=to_node_info.zone, + shard_type=target_shard.shard_type, + size_gb=target_shard.size_gb, + reason="Manual validation", + ) + + # Display shard details + details_table = Table(title="Shard Details", box=box.ROUNDED) + details_table.add_column("Property", style="cyan") + details_table.add_column("Value", style="magenta") + + 
details_table.add_row("Table", f"{schema_name}.{table_name}") + details_table.add_row("Shard ID", str(request.shard_id)) + details_table.add_row("Type", target_shard.shard_type) + details_table.add_row("Size", format_size(target_shard.size_gb)) + details_table.add_row("Documents", f"{target_shard.num_docs:,}") + details_table.add_row("State", target_shard.state) + details_table.add_row("Routing State", target_shard.routing_state) + details_table.add_row("From Node", f"{request.from_node} ({from_node_info.zone})") + details_table.add_row("To Node", f"{request.to_node} ({to_node_info.zone})") + details_table.add_row("Zone Change", "Yes" if from_node_info.zone != to_node_info.zone else "No") + + console.print(details_table) + console.print() + + # Perform comprehensive validation + is_safe, safety_msg = self.analyzer.validate_move_safety( + recommendation, max_disk_usage_percent=request.max_disk_usage + ) + + if is_safe: + console.print("[green]✓ VALIDATION PASSED - Move appears safe[/green]") + console.print(f"[green]✓ {safety_msg}[/green]") + console.print() + + # Show the SQL command + console.print(Panel.fit("[bold green]Ready to Execute[/bold green]")) + console.print("[dim]# Copy and paste this command to execute the move[/dim]") + console.print() + console.print(f"{recommendation.to_sql()}") + console.print() + console.print("[dim]# Monitor shard health after execution[/dim]") + console.print( + "[dim]# Check with: SELECT * FROM sys.shards " + "WHERE table_name = '{table_name}' AND id = {shard_id};[/dim]" + ) + else: + console.print("[red]✗ VALIDATION FAILED - Move not safe[/red]") + console.print(f"[red]✗ {safety_msg}[/red]") + console.print() + + # Provide troubleshooting guidance + if "zone conflict" in safety_msg.lower(): + console.print("[yellow]💡 Troubleshooting Zone Conflicts:[/yellow]") + console.print(" • Check current shard distribution: xmover zone-analysis --show-shards") + console.print(" • Try moving to a different zone") + console.print(" • 
Verify cluster has proper zone-awareness configuration") + elif "node conflict" in safety_msg.lower(): + console.print("[yellow]💡 Troubleshooting Node Conflicts:[/yellow]") + console.print(" • The target node already has a copy of this shard") + console.print(" • Choose a different target node") + console.print(" • Check shard distribution: xmover analyze") + elif "space" in safety_msg.lower(): + console.print("[yellow]💡 Troubleshooting Space Issues:[/yellow]") + console.print(" • Free up space on the target node") + console.print(" • Choose a node with more available capacity") + console.print(" • Check node capacity: xmover analyze") + elif "usage" in safety_msg.lower(): + console.print("[yellow]💡 Troubleshooting High Disk Usage:[/yellow]") + console.print(" • Wait for target node disk usage to decrease") + console.print(" • Choose a node with lower disk usage") + console.print(" • Check cluster health: xmover analyze") + console.print(" • Consider using --max-disk-usage option for urgent moves") + + def execute( + self, + constraints: ShardRelocationConstraints, + auto_execute: bool, + validate: bool, + dry_run: bool, + ): + # Safety check for auto-execute + if auto_execute and dry_run: + console.print("[red]❌ Error: --auto-execute requires --execute flag[/red]") + console.print("[dim]Use: --execute --auto-execute[/dim]") + return + + mode_text = "DRY RUN - Analysis Only" if dry_run else "EXECUTION MODE" + console.print( + Panel.fit( + f"[bold blue]Generating Rebalancing Recommendations[/bold blue] - " + f"[bold {'green' if dry_run else 'red'}]{mode_text}[/bold {'green' if dry_run else 'red'}]" + ) + ) + console.print("[dim]Note: Only analyzing healthy shards (STARTED + 100% recovered) for safe operations[/dim]") + console.print("[dim]Zone conflict detection: Prevents moves that would violate CrateDB's zone awareness[/dim]") + if constraints.prioritize_space: + console.print("[dim]Mode: Prioritizing available space over zone balancing[/dim]") + else: + 
console.print("[dim]Mode: Prioritizing zone balancing over available space[/dim]") + + if constraints.source_node: + console.print(f"[dim]Filtering: Only showing moves from source node '{constraints.source_node}'[/dim]") + + console.print( + f"[dim]Safety thresholds: Max disk usage {constraints.max_disk_usage}%, " + f"Min free space {constraints.min_free_space}GB[/dim]" + ) + + if dry_run: + console.print("[green]Running in DRY RUN mode - no SQL commands will be generated[/green]") + else: + console.print("[red]EXECUTION MODE - SQL commands will be generated for actual moves[/red]") + console.print() + + recommendations = self.analyzer.generate_rebalancing_recommendations(constraints=constraints) + + if not recommendations: + if constraints.source_node: + console.print(f"[yellow]No safe recommendations found for node '{constraints.source_node}'[/yellow]") + console.print("[dim]This could be due to:[/dim]") + console.print("[dim] • Zone conflicts preventing safe moves[/dim]") + console.print( + f"[dim] • Target nodes exceeding {constraints.max_disk_usage}% disk usage threshold[/dim]" + ) + console.print( + f"[dim] • Insufficient free space on target nodes (need {constraints.min_free_space}GB)[/dim]" + ) + console.print(f"[dim] • No shards in size range {constraints.min_size}-{constraints.max_size}GB[/dim]") + console.print("[dim]Suggestions:[/dim]") + console.print("[dim] • Try: --max-disk-usage 95 (allow higher disk usage)[/dim]") + console.print("[dim] • Try: --min-free-space 50 (reduce space requirements)[/dim]") + console.print("[dim] • Try: different size ranges or remove --node filter[/dim]") + else: + console.print("[green]No rebalancing recommendations needed. 
Cluster appears well balanced![/green]") + return + + # Show recommendations table + rec_table = Table(title=f"Rebalancing Recommendations ({len(recommendations)} moves)", box=box.ROUNDED) + rec_table.add_column("Table", style="cyan") + rec_table.add_column("Shard", justify="right", style="magenta") + rec_table.add_column("Type", style="blue") + rec_table.add_column("From Node", style="red") + rec_table.add_column("To Node", style="green") + rec_table.add_column("Target Free Space", justify="right", style="cyan") + rec_table.add_column("Zone Change", style="yellow") + rec_table.add_column("Size", justify="right", style="white") + rec_table.add_column("Reason", style="dim") + if validate: + rec_table.add_column("Safety Check", style="bold") + + # Create a mapping of node names to available space for display + node_space_map = {node.name: node.available_space_gb for node in self.analyzer.nodes} + + for rec in recommendations: + zone_change = f"{rec.from_zone} → {rec.to_zone}" if rec.from_zone != rec.to_zone else rec.from_zone + target_free_space = node_space_map.get(rec.to_node, 0) + + row = [ + f"{rec.schema_name}.{rec.table_name}", + str(rec.shard_id), + rec.shard_type, + rec.from_node, + rec.to_node, + format_size(target_free_space), + zone_change, + format_size(rec.size_gb), + rec.reason, + ] + + if validate: + is_safe, safety_msg = self.analyzer.validate_move_safety( + rec, max_disk_usage_percent=constraints.max_disk_usage + ) + safety_status = "[green]✓ SAFE[/green]" if is_safe else f"[red]✗ {safety_msg}[/red]" + row.append(safety_status) + + rec_table.add_row(*row) + + console.print(rec_table) + console.print() + + # Generate SQL commands or show dry-run analysis + if dry_run: + console.print(Panel.fit("[bold yellow]Dry Run Analysis - No Commands Generated[/bold yellow]")) + console.print("[dim]# This is a dry run - showing what would be recommended[/dim]") + console.print("[dim]# Use --execute flag to generate actual SQL commands[/dim]") + console.print() + + 
safe_moves = 0 + zone_conflicts = 0 + space_issues = 0 + + for i, rec in enumerate(recommendations, 1): + if validate: + is_safe, safety_msg = self.analyzer.validate_move_safety( + rec, max_disk_usage_percent=constraints.max_disk_usage + ) + if not is_safe: + if "zone conflict" in safety_msg.lower(): + zone_conflicts += 1 + console.print(f"[yellow]⚠ Move {i}: WOULD BE SKIPPED - {safety_msg}[/yellow]") + elif "space" in safety_msg.lower(): + space_issues += 1 + console.print(f"[yellow]⚠ Move {i}: WOULD BE SKIPPED - {safety_msg}[/yellow]") + else: + console.print(f"[yellow]⚠ Move {i}: WOULD BE SKIPPED - {safety_msg}[/yellow]") + continue + safe_moves += 1 + + console.print(f"[green]✓ Move {i}: WOULD EXECUTE - {rec.reason}[/green]") + console.print(f"[dim] Target SQL: {rec.to_sql()}[/dim]") + + console.print() + console.print("[bold]Dry Run Summary:[/bold]") + console.print(f" • Safe moves that would execute: [green]{safe_moves}[/green]") + console.print(f" • Zone conflicts prevented: [yellow]{zone_conflicts}[/yellow]") + console.print(f" • Space-related issues: [yellow]{space_issues}[/yellow]") + if safe_moves > 0: + console.print( + f"\n[green]✓ Ready to execute {safe_moves} safe moves. " + f"Use --execute to generate SQL commands.[/green]" + ) + else: + console.print( + "\n[yellow]⚠ No safe moves identified. 
Review cluster balance or adjust parameters.[/yellow]" + ) + else: + console.print(Panel.fit("[bold green]Generated SQL Commands[/bold green]")) + console.print("[dim]# Copy and paste these commands to execute the moves[/dim]") + console.print("[dim]# ALWAYS test in a non-production environment first![/dim]") + console.print("[dim]# These commands only operate on healthy shards (STARTED + fully recovered)[/dim]") + console.print("[dim]# Commands use quoted identifiers for schema and table names[/dim]") + console.print() + + safe_moves = 0 + zone_conflicts = 0 + for i, rec in enumerate(recommendations, 1): + if validate: + is_safe, safety_msg = self.analyzer.validate_move_safety( + rec, max_disk_usage_percent=constraints.max_disk_usage + ) + if not is_safe: + if "Zone conflict" in safety_msg: + zone_conflicts += 1 + console.print(f"-- Move {i}: SKIPPED - {safety_msg}") + console.print( + "-- Tip: Try moving to a different zone or check existing shard distribution" + ) + else: + console.print(f"-- Move {i}: SKIPPED - {safety_msg}") + continue + safe_moves += 1 + + console.print(f"-- Move {i}: {rec.reason}") + console.print(f"{rec.to_sql()}") + console.print() + + # Auto-execution if requested + if auto_execute: + self._execute_recommendations_safely(recommendations, validate) + + if validate and safe_moves < len(recommendations): + if zone_conflicts > 0: + console.print(f"[yellow]Warning: {zone_conflicts} moves skipped due to zone conflicts[/yellow]") + console.print( + "[yellow]Tip: Use 'find-candidates' to see current shard distribution across zones[/yellow]" + ) + console.print( + f"[yellow]Warning: Only {safe_moves} of {len(recommendations)} moves passed safety validation[/yellow]" + ) + + def _execute_recommendations_safely(self, recommendations, validate: bool): + """Execute recommendations with extensive safety measures""" + + # Filter to only safe recommendations + safe_recommendations = [] + if validate: + for rec in recommendations: + is_safe, safety_msg = 
self.analyzer.validate_move_safety(rec, max_disk_usage_percent=95.0) + if is_safe: + safe_recommendations.append(rec) + else: + safe_recommendations = recommendations + + if not safe_recommendations: + console.print("[yellow]⚠ No safe recommendations to execute[/yellow]") + return + + console.print("\n[bold red]🚨 AUTO-EXECUTION MODE 🚨[/bold red]") + console.print(f"About to execute {len(safe_recommendations)} shard moves automatically:") + console.print() + + # Show what will be executed + for i, rec in enumerate(safe_recommendations, 1): + table_display = f"{rec.schema_name}.{rec.table_name}" if rec.schema_name != "doc" else rec.table_name + console.print( + f" {i}. {table_display} S{rec.shard_id} ({rec.size_gb:.1f}GB) {rec.from_node} → {rec.to_node}" + ) + + console.print() + console.print("[bold yellow]⚠ SAFETY WARNINGS:[/bold yellow]") + console.print(" • These commands will immediately start shard movements") + console.print(" • Each move will temporarily impact cluster performance") + console.print(" • Recovery time depends on shard size and network speed") + console.print(" • You should monitor progress with: xmover monitor-recovery --watch") + console.print() + + # Double confirmation + try: + response1 = input("Type 'EXECUTE' to proceed with automatic execution: ").strip() + if response1 != "EXECUTE": + console.print("[yellow]❌ Execution cancelled[/yellow]") + return + + response2 = input(f"Confirm: Execute {len(safe_recommendations)} shard moves? 
(yes/no): ").strip().lower() + if response2 not in ["yes", "y"]: + console.print("[yellow]❌ Execution cancelled[/yellow]") + return + + except KeyboardInterrupt: + console.print("\n[yellow]❌ Execution cancelled by user[/yellow]") + return + + console.print(f"\n🚀 [bold green]Executing {len(safe_recommendations)} shard moves...[/bold green]") + console.print() + + successful_moves = 0 + failed_moves = 0 + + for i, rec in enumerate(safe_recommendations, 1): + table_display = f"{rec.schema_name}.{rec.table_name}" if rec.schema_name != "doc" else rec.table_name + sql_command = rec.to_sql() + + console.print( + f"[{i}/{len(safe_recommendations)}] Executing: {table_display} S{rec.shard_id} ({rec.size_gb:.1f}GB)" + ) + console.print(f" {rec.from_node} → {rec.to_node}") + + try: + # Execute the SQL command + result = self.client.execute_query(sql_command) + + if result.get("rowcount", 0) >= 0: # Success indicator for ALTER statements + console.print(" [green]✅ SUCCESS[/green] - Move initiated") + successful_moves += 1 + + # Smart delay: check active recoveries before next move + if i < len(safe_recommendations): + self._wait_for_recovery_capacity(max_concurrent_recoveries=5) + else: + console.print(f" [red]❌ FAILED[/red] - Unexpected result: {result}") + failed_moves += 1 + + except Exception as e: + console.print(f" [red]❌ FAILED[/red] - Error: {e}") + failed_moves += 1 + + # Ask whether to continue after a failure + if i < len(safe_recommendations): + try: + continue_response = ( + input(f" Continue with remaining {len(safe_recommendations) - i} moves? 
(yes/no): ") + .strip() + .lower() + ) + if continue_response not in ["yes", "y"]: + console.print("[yellow]⏹ Execution stopped by user[/yellow]") + break + except KeyboardInterrupt: + console.print("\n[yellow]⏹ Execution stopped by user[/yellow]") + break + + console.print() + + # Final summary + console.print("📊 [bold]Execution Summary:[/bold]") + console.print(f" Successful moves: [green]{successful_moves}[/green]") + console.print(f" Failed moves: [red]{failed_moves}[/red]") + console.print(f" Total attempted: {successful_moves + failed_moves}") + + if successful_moves > 0: + console.print() + console.print("[green]✅ Shard moves initiated successfully![/green]") + console.print("[dim]💡 Monitor progress with:[/dim]") + console.print("[dim] xmover monitor-recovery --watch[/dim]") + console.print("[dim]💡 Check cluster status with:[/dim]") + console.print("[dim] xmover analyze[/dim]") + + if failed_moves > 0: + console.print() + console.print(f"[yellow]⚠ {failed_moves} moves failed - check cluster status and retry if needed[/yellow]") + + def _wait_for_recovery_capacity(self, max_concurrent_recoveries: int = 5): + """Wait until active recovery count is below threshold""" + + recovery_monitor = RecoveryMonitor(self.client, RecoveryOptions(include_transitioning=True)) + wait_time = 0 + + while True: + # Check active recoveries (including transitioning) + recoveries = recovery_monitor.get_cluster_recovery_status() + active_count = len([r for r in recoveries if r.overall_progress < 100.0 or r.stage != "DONE"]) + status = f"{active_count}/{max_concurrent_recoveries}" + if active_count < max_concurrent_recoveries: + if wait_time > 0: + console.print(f" [green]✓ Recovery capacity available ({status} active)[/green]") + break + if wait_time == 0: + console.print(f" [yellow]⏳ Waiting for recovery capacity... ({status} active)[/yellow]") + elif wait_time % 30 == 0: # Update every 30 seconds + console.print(f" [yellow]⏳ Still waiting... 
({status} active)[/yellow]") + + time.sleep(10) # Check every 10 seconds + wait_time += 10 diff --git a/cratedb_toolkit/admin/xmover/util/__init__.py b/cratedb_toolkit/admin/xmover/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/admin/xmover/util/database.py b/cratedb_toolkit/admin/xmover/util/database.py new file mode 100644 index 00000000..21950ab0 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/util/database.py @@ -0,0 +1,498 @@ +""" +Database connection and query functions for CrateDB +""" + +import logging +import os +from typing import Any, Dict, List, Optional, Union + +import requests +import urllib3 +from dotenv import load_dotenv + +from cratedb_toolkit.admin.xmover.model import NodeInfo, RecoveryInfo, ShardInfo + +logger = logging.getLogger(__name__) + + +class CrateDBClient: + """Client for connecting to CrateDB and executing queries""" + + def __init__(self, connection_string: Optional[str] = None): + load_dotenv() + + self.connection_string: str = ( + connection_string or os.getenv("CRATE_CONNECTION_STRING") or "http://localhost:4200" + ) + if not self.connection_string: + raise ValueError("CRATE_CONNECTION_STRING not found in environment or provided") + + self.username = os.getenv("CRATE_USERNAME") + self.password = os.getenv("CRATE_PASSWORD") + self.ssl_verify = os.getenv("CRATE_SSL_VERIFY", "true").lower() == "true" + + # Suppress SSL warnings when SSL verification is disabled + if not self.ssl_verify: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # Ensure connection string ends with _sql endpoint + if not self.connection_string.endswith("/_sql"): + self.connection_string = self.connection_string.rstrip("/") + "/_sql" + + def execute_query(self, query: str, parameters: Optional[List] = None) -> Dict[str, Any]: + """Execute a SQL query against CrateDB""" + payload: Dict[str, Any] = {"stmt": query} + + if parameters: + payload["args"] = parameters + + auth = None + if 
self.username and self.password: + auth = (self.username, self.password) + + try: + response = requests.post( + self.connection_string, json=payload, auth=auth, verify=self.ssl_verify, timeout=30 + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + raise Exception(f"Failed to execute query: {e}") from e + + def get_nodes_info(self) -> List[NodeInfo]: + """Get information about all nodes in the cluster""" + query = """ + SELECT + id, + name, + attributes['zone'] as zone, + heap['used'] as heap_used, + heap['max'] as heap_max, + fs['total']['size'] as fs_total, + fs['total']['used'] as fs_used, + fs['total']['available'] as fs_available + FROM sys.nodes + WHERE name IS NOT NULL + ORDER BY name + """ + + result = self.execute_query(query) + nodes = [] + + for row in result.get("rows", []): + nodes.append( + NodeInfo( + id=row[0], + name=row[1], + zone=row[2] or "unknown", + heap_used=row[3] or 0, + heap_max=row[4] or 0, + fs_total=row[5] or 0, + fs_used=row[6] or 0, + fs_available=row[7] or 0, + ) + ) + + return nodes + + def get_shards_info( + self, + table_name: Optional[str] = None, + min_size_gb: Optional[float] = None, + max_size_gb: Optional[float] = None, + for_analysis: bool = False, + ) -> List[ShardInfo]: + """Get information about shards, optionally filtered by table and size + + Args: + table_name: Filter by specific table + min_size_gb: Minimum shard size in GB + max_size_gb: Maximum shard size in GB + for_analysis: If True, includes all shards regardless of state (for cluster analysis) + If False, only includes healthy shards suitable for operations + """ + + where_conditions = [] + if not for_analysis: + # For operations, only include healthy shards + where_conditions.extend(["s.routing_state = 'STARTED'", "s.recovery['files']['percent'] = 100.0"]) + parameters: List[Union[str, int, Dict]] = [] + + if table_name: + where_conditions.append("s.table_name = ?") + parameters.append(table_name) + + 
if min_size_gb is not None: + where_conditions.append("s.size >= ?") + parameters.append(int(min_size_gb * 1024**3)) # Convert GB to bytes + + if max_size_gb is not None: + where_conditions.append("s.size <= ?") + parameters.append(int(max_size_gb * 1024**3)) # Convert GB to bytes + + where_clause = "" + if where_conditions: + where_clause = f"WHERE {' AND '.join(where_conditions)}" + + query = f""" + SELECT + s.table_name, + s.schema_name, + s.id as shard_id, + s.node['id'] as node_id, + s.node['name'] as node_name, + n.attributes['zone'] as zone, + s."primary" as is_primary, + s.size as size_bytes, + s.size / 1024.0^3 as size_gb, + s.num_docs, + s.state, + s.routing_state + FROM sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id + {where_clause} + ORDER BY s.table_name, s.schema_name, s.id, s."primary" DESC + """ # noqa: S608 + + result = self.execute_query(query, parameters) + shards = [] + + for row in result.get("rows", []): + shards.append( + ShardInfo( + table_name=row[0], + schema_name=row[1], + shard_id=row[2], + node_id=row[3], + node_name=row[4], + zone=row[5] or "unknown", + is_primary=row[6], + size_bytes=row[7] or 0, + size_gb=float(row[8] or 0), + num_docs=row[9] or 0, + state=row[10], + routing_state=row[11], + ) + ) + + return shards + + def get_shard_distribution_summary(self, for_analysis: bool = True) -> Dict[str, Any]: + """Get a summary of shard distribution across nodes and zones + + Args: + for_analysis: If True, includes all shards for complete cluster analysis + If False, only includes operational shards + """ + where_clause = "" + if not for_analysis: + where_clause = """ + WHERE s.routing_state = 'STARTED' + AND s.recovery['files']['percent'] = 100.0""" + + query = f""" + SELECT + n.attributes['zone'] as zone, + s.node['name'] as node_name, + CASE WHEN s."primary" = true THEN 'PRIMARY' ELSE 'REPLICA' END as shard_type, + COUNT(*) as shard_count, + SUM(s.size) / 1024.0^3 as total_size_gb, + AVG(s.size) / 1024.0^3 as avg_size_gb + FROM 
sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id{where_clause} + GROUP BY n.attributes['zone'], s.node['name'], s."primary" + ORDER BY zone, node_name, shard_type DESC + """ # noqa: S608 + + result = self.execute_query(query) + + summary: Dict[str, Any] = { + "by_zone": {}, + "by_node": {}, + "totals": {"primary": 0, "replica": 0, "total_size_gb": 0}, + } + + for row in result.get("rows", []): + zone = row[0] or "unknown" + node_name = row[1] + shard_type = row[2] + shard_count = row[3] + total_size_gb = float(row[4] or 0) + avg_size_gb = float(row[5] or 0) # noqa: F841 + + # By zone summary + if zone not in summary["by_zone"]: + summary["by_zone"][zone] = {"PRIMARY": 0, "REPLICA": 0, "total_size_gb": 0} + summary["by_zone"][zone][shard_type] += shard_count + summary["by_zone"][zone]["total_size_gb"] += total_size_gb + + # By node summary + if node_name not in summary["by_node"]: + summary["by_node"][node_name] = {"zone": zone, "PRIMARY": 0, "REPLICA": 0, "total_size_gb": 0} + summary["by_node"][node_name][shard_type] += shard_count + summary["by_node"][node_name]["total_size_gb"] += total_size_gb + + # Overall totals + if shard_type == "PRIMARY": + summary["totals"]["primary"] += shard_count + else: + summary["totals"]["replica"] += shard_count + summary["totals"]["total_size_gb"] += total_size_gb + + return summary + + def test_connection(self) -> bool: + """Test the connection to CrateDB""" + try: + result = self.execute_query("SELECT 1") + return result.get("rowcount", 0) >= 0 + except Exception: + return False + + def get_cluster_watermarks(self) -> Dict[str, Any]: + """Get cluster disk watermark settings""" + query = """ + SELECT settings['cluster']['routing']['allocation']['disk']['watermark'] + FROM sys.cluster + """ + + try: + result = self.execute_query(query) + if result.get("rows"): + watermarks = result["rows"][0][0] or {} + return { + "low": watermarks.get("low", "Not set"), + "high": watermarks.get("high", "Not set"), + "flood_stage": 
watermarks.get("flood_stage", "Not set"), + "enable_for_single_data_node": watermarks.get("enable_for_single_data_node", "Not set"), + } + return {} + except Exception: + return {} + + def get_active_recoveries( + self, table_name: Optional[str] = None, node_name: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Get shards that are currently in recovery states from sys.allocations""" + + where_conditions = ["current_state != 'STARTED'"] + parameters = [] + + if table_name: + where_conditions.append("table_name = ?") + parameters.append(table_name) + + if node_name: + where_conditions.append("node_id = (SELECT id FROM sys.nodes WHERE name = ?)") + parameters.append(node_name) + + where_clause = f"WHERE {' AND '.join(where_conditions)}" + + query = f""" + SELECT + table_name, + shard_id, + current_state, + explanation, + node_id + FROM sys.allocations + {where_clause} + ORDER BY current_state, table_name, shard_id + """ # noqa: S608 + + result = self.execute_query(query, parameters) + + allocations = [] + for row in result.get("rows", []): + allocations.append( + { + "schema_name": "doc", # Default schema since not available in sys.allocations + "table_name": row[0], + "shard_id": row[1], + "current_state": row[2], + "explanation": row[3], + "node_id": row[4], + } + ) + + return allocations + + def get_recovery_details(self, schema_name: str, table_name: str, shard_id: int) -> Optional[Dict[str, Any]]: + """Get detailed recovery information for a specific shard from sys.shards""" + + # Query for shards that are actively recovering (not completed) + query = """ + SELECT + s.table_name, + s.schema_name, + s.id as shard_id, + s.node['name'] as node_name, + s.node['id'] as node_id, + s.routing_state, + s.state, + s.recovery, + s.size, + s."primary", + s.translog_stats['size'] as translog_size + FROM sys.shards s + WHERE s.table_name = ? AND s.id = ? 
+ AND (s.state = 'RECOVERING' OR s.routing_state IN ('INITIALIZING', 'RELOCATING')) + ORDER BY s.schema_name + LIMIT 1 + """ + + result = self.execute_query(query, [table_name, shard_id]) + + if not result.get("rows"): + return None + + row = result["rows"][0] + return { + "table_name": row[0], + "schema_name": row[1], + "shard_id": row[2], + "node_name": row[3], + "node_id": row[4], + "routing_state": row[5], + "state": row[6], + "recovery": row[7], + "size": row[8], + "primary": row[9], + "translog_size": row[10] or 0, + } + + def get_all_recovering_shards( + self, table_name: Optional[str] = None, node_name: Optional[str] = None, include_transitioning: bool = False + ) -> List[RecoveryInfo]: + """Get comprehensive recovery information by combining sys.allocations and sys.shards data""" + + # Step 1: Get active recoveries from allocations (efficient) + active_allocations = self.get_active_recoveries(table_name, node_name) + + if not active_allocations: + return [] + + recoveries = [] + + # Step 2: Get detailed recovery info for each active recovery + for allocation in active_allocations: + recovery_detail = self.get_recovery_details( + allocation["schema_name"], # This will be 'doc' default + allocation["table_name"], + allocation["shard_id"], + ) + + if recovery_detail and recovery_detail.get("recovery"): + # Update allocation with actual schema from sys.shards + allocation["schema_name"] = recovery_detail["schema_name"] + recovery_info = self._parse_recovery_info(allocation, recovery_detail) + + # Filter out completed recoveries unless include_transitioning is True + if include_transitioning or not self._is_recovery_completed(recovery_info): + recoveries.append(recovery_info) + + # Sort by recovery type, then by progress + return sorted(recoveries, key=lambda r: (r.recovery_type, -r.overall_progress)) + + def _parse_recovery_info(self, allocation: Dict[str, Any], shard_detail: Dict[str, Any]) -> RecoveryInfo: + """Parse recovery information from allocation and 
shard data""" + + recovery = shard_detail.get("recovery", {}) + + # Extract recovery progress information + files_info = recovery.get("files", {}) + size_info = recovery.get("size", {}) + + files_percent = float(files_info.get("percent", 0.0)) + bytes_percent = float(size_info.get("percent", 0.0)) + + # Calculate actual progress based on recovered vs used + files_recovered = files_info.get("recovered", 0) + files_used = files_info.get("used", 1) # Avoid division by zero + size_recovered = size_info.get("recovered", 0) + size_used = size_info.get("used", 1) # Avoid division by zero + + # Use actual progress if different from reported percent + actual_files_percent = (files_recovered / files_used * 100.0) if files_used > 0 else files_percent + actual_size_percent = (size_recovered / size_used * 100.0) if size_used > 0 else bytes_percent + + # Use the more conservative (lower) progress value + final_files_percent = min(files_percent, actual_files_percent) + final_bytes_percent = min(bytes_percent, actual_size_percent) + + # Get source node for PEER recoveries + source_node = None + if recovery.get("type") == "PEER": + source_node = self._find_source_node_for_recovery( + shard_detail["schema_name"], + shard_detail["table_name"], + shard_detail["shard_id"], + shard_detail["node_id"], + ) + + return RecoveryInfo( + schema_name=shard_detail["schema_name"], + table_name=shard_detail["table_name"], + shard_id=shard_detail["shard_id"], + node_name=shard_detail["node_name"], + node_id=shard_detail["node_id"], + recovery_type=recovery.get("type", "UNKNOWN"), + stage=recovery.get("stage", "UNKNOWN"), + files_percent=final_files_percent, + bytes_percent=final_bytes_percent, + total_time_ms=recovery.get("total_time", 0), + routing_state=shard_detail["routing_state"], + current_state=allocation["current_state"], + is_primary=shard_detail["primary"], + size_bytes=shard_detail.get("size", 0), + source_node_name=source_node, + translog_size_bytes=shard_detail.get("translog_size", 0), 
+ ) + + def _find_source_node_for_recovery( + self, schema_name: str, table_name: str, shard_id: int, target_node_id: str + ) -> Optional[str]: + """Find source node for PEER recovery by looking for primary or other replicas""" + try: + # First try to find the primary shard of the same table/shard + query = """ + SELECT node['name'] as node_name + FROM sys.shards + WHERE schema_name = ? AND table_name = ? AND id = ? + AND state = 'STARTED' AND node['id'] != ? + AND "primary" = true + LIMIT 1 + """ + + result = self.execute_query(query, [schema_name, table_name, shard_id, target_node_id]) + + if result.get("rows"): + return result["rows"][0][0] + + # If no primary found, look for any started replica + query_replica = """ + SELECT node['name'] as node_name + FROM sys.shards + WHERE schema_name = ? AND table_name = ? AND id = ? + AND state = 'STARTED' AND node['id'] != ? + LIMIT 1 + """ + + result = self.execute_query(query_replica, [schema_name, table_name, shard_id, target_node_id]) + + if result.get("rows"): + return result["rows"][0][0] + + except Exception: + # If query fails, just return None + logger.warning("Failed to find source node for recovery", exc_info=True) + + return None + + def _is_recovery_completed(self, recovery_info: RecoveryInfo) -> bool: + """Check if a recovery is completed but still transitioning""" + return ( + recovery_info.stage == "DONE" + and recovery_info.files_percent >= 100.0 + and recovery_info.bytes_percent >= 100.0 + ) diff --git a/cratedb_toolkit/admin/xmover/util/error.py b/cratedb_toolkit/admin/xmover/util/error.py new file mode 100644 index 00000000..11dd5f39 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/util/error.py @@ -0,0 +1,133 @@ +from typing import List, Optional, cast + +from rich.console import Console +from rich.panel import Panel + +console = Console() + + +def explain_cratedb_error(error_message: Optional[str]): + console.print(Panel.fit("[bold blue]CrateDB Error Message Decoder[/bold blue]")) + 
console.print("[dim]Helps decode and troubleshoot CrateDB shard allocation errors[/dim]") + console.print() + + if not error_message: + console.print("Please paste the CrateDB error message (press Enter twice when done):") + lines: List[str] = [] + while True: + try: + line = input() + if line.strip() == "" and lines: + break + lines.append(line) + except (EOFError, KeyboardInterrupt): + break + error_message = "\n".join(lines) + + if not error_message.strip(): + console.print("[yellow]No error message provided[/yellow]") + return + + console.print("[dim]Analyzing error message...[/dim]") + console.print() + + # Common CrateDB allocation error patterns and solutions + error_patterns = [ + { + "pattern": "a copy of this shard is already allocated to this node", + "title": "Node Already Has Shard Copy", + "explanation": "The target node already contains a copy (primary or replica) of this shard.", + "solutions": [ + "Choose a different target node that doesn't have this shard", + "Use 'xmover zone-analysis --show-shards' to see current distribution", + "Verify the shard ID and table name are correct", + ], + "prevention": "Always check current shard locations before moving", + }, + { + "pattern": "there are too many copies of the shard allocated to nodes with attribute", + "title": "Zone Allocation Limit Exceeded", + "explanation": "CrateDB's zone awareness prevents too many copies in the same zone.", + "solutions": [ + "Move the shard to a different availability zone", + "Check zone balance with 'xmover check-balance'", + "Ensure target zone doesn't already have copies of this shard", + ], + "prevention": "Use 'xmover recommend' which respects zone constraints", + }, + { + "pattern": "not enough disk space", + "title": "Insufficient Disk Space", + "explanation": "The target node doesn't have enough free disk space for the shard.", + "solutions": [ + "Free up space on the target node", + "Choose a node with more available capacity", + "Check available space with 
'xmover analyze'", + ], + "prevention": "Use '--min-free-space' parameter in recommendations", + }, + { + "pattern": "shard recovery limit", + "title": "Recovery Limit Exceeded", + "explanation": "Too many shards are currently being moved/recovered simultaneously.", + "solutions": [ + "Wait for current recoveries to complete", + "Check recovery status in CrateDB admin UI", + "Reduce concurrent recoveries in cluster settings", + ], + "prevention": "Move shards gradually, monitor recovery progress", + }, + { + "pattern": "allocation is disabled", + "title": "Allocation Disabled", + "explanation": "Shard allocation is temporarily disabled in the cluster.", + "solutions": [ + "Re-enable allocation: PUT /_cluster/settings " + '{"persistent":{"cluster.routing.allocation.enable":"all"}}', + "Check if allocation was disabled for maintenance", + "Verify cluster health before re-enabling", + ], + "prevention": "Check allocation status before performing moves", + }, + ] + + # Find matching patterns + matches = [] + error_lower = error_message.lower() + + for pattern_info in error_patterns: + if cast(str, pattern_info["pattern"]).lower() in error_lower: + matches.append(pattern_info) + + if matches: + for i, match in enumerate(matches): + if i > 0: + console.print("\n" + "─" * 60 + "\n") + + console.print(f"[bold red]🚨 {match['title']}[/bold red]") + console.print(f"[yellow]📝 Explanation:[/yellow] {match['explanation']}") + console.print() + + console.print("[green]💡 Solutions:[/green]") + for j, solution in enumerate(match["solutions"], 1): + console.print(f" {j}. {solution}") + console.print() + + console.print(f"[blue]🛡️ Prevention:[/blue] {match['prevention']}") + else: + console.print("[yellow]⚠ No specific pattern match found[/yellow]") + console.print() + console.print("[bold]General Troubleshooting Steps:[/bold]") + console.print("1. Check current shard distribution: [cyan]xmover analyze[/cyan]") + console.print( + "2. 
Validate the specific move: [cyan]xmover validate-move schema.table shard_id from_node to_node[/cyan]" + ) + console.print("3. Check zone conflicts: [cyan]xmover zone-analysis --show-shards[/cyan]") + console.print("4. Verify node capacity: [cyan]xmover analyze[/cyan]") + console.print("5. Review CrateDB documentation on shard allocation") + + console.print() + console.print("[dim]💡 Tip: Use 'xmover validate-move' to check moves before execution[/dim]") + console.print( + "[dim]📚 For more help: https://crate.io/docs/crate/reference/en/latest/admin/system-information.html[/dim]" + ) diff --git a/cratedb_toolkit/admin/xmover/util/format.py b/cratedb_toolkit/admin/xmover/util/format.py new file mode 100644 index 00000000..82c8a3d0 --- /dev/null +++ b/cratedb_toolkit/admin/xmover/util/format.py @@ -0,0 +1,45 @@ +def format_size(size_gb: float) -> str: + """Format size in GB with appropriate precision""" + if size_gb >= 1000: + return f"{size_gb / 1000:.1f}TB" + elif size_gb >= 1: + return f"{size_gb:.1f}GB" + else: + return f"{size_gb * 1000:.0f}MB" + + +def format_percentage(value: float) -> str: + """Format percentage with color coding""" + color = "green" + if value > 80: + color = "red" + elif value > 70: + color = "yellow" + return f"[{color}]{value:.1f}%[/{color}]" + + +def format_translog_info(recovery_info) -> str: + """Format translog size information with color coding""" + tl_bytes = recovery_info.translog_size_bytes + + # Only show if significant (>10MB for production) + if tl_bytes < 10 * 1024 * 1024: # 10MB for production + return "" + + tl_gb = recovery_info.translog_size_gb + + # Color coding based on size + if tl_gb >= 5.0: + color = "red" + elif tl_gb >= 1.0: + color = "yellow" + else: + color = "green" + + # Format size + if tl_gb >= 1.0: + size_str = f"{tl_gb:.1f}GB" + else: + size_str = f"{tl_gb * 1000:.0f}MB" + + return f" [dim]([{color}]TL:{size_str}[/{color}])[/dim]" diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 
4e8e17c2..80e0b395 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -4,6 +4,7 @@ from cratedb_toolkit.util.cli import boot_click from .adapter.rockset.cli import cli as rockset_cli +from .admin.xmover.cli import main as admin_xmover_cli from .cfr.cli import cli as cfr_cli from .cluster.cli import cli as cloud_cli from .cmd.tail.cli import cli as tail_cli @@ -27,6 +28,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): return boot_click(ctx, verbose, debug) +cli.add_command(admin_xmover_cli, name="xmover") cli.add_command(info_cli, name="info") cli.add_command(cfr_cli, name="cfr") cli.add_command(cloud_cli, name="cluster") diff --git a/doc/admin/index.md b/doc/admin/index.md new file mode 100644 index 00000000..d36c00e1 --- /dev/null +++ b/doc/admin/index.md @@ -0,0 +1,7 @@ +# Administrative Utilities + +```{toctree} +:maxdepth: 1 + +xmover/index +``` diff --git a/doc/admin/xmover/handbook.md b/doc/admin/xmover/handbook.md new file mode 100644 index 00000000..cf9b4abe --- /dev/null +++ b/doc/admin/xmover/handbook.md @@ -0,0 +1,487 @@ +(xmover-handbook)= +# XMover Handbook + +## Installation + +Install using uv (recommended) or pip: +```bash +uv tool install cratedb-toolkit + +# Alternatively use `pip`. 
+# pip install --user cratedb-toolkit +``` + +Create an `.env` file with your CrateDB connection details: +```bash +CRATE_CONNECTION_STRING=https://your-cluster.cratedb.net:4200 +CRATE_USERNAME=your-username +CRATE_PASSWORD=your-password +CRATE_SSL_VERIFY=true +``` + +## Quick Start + +### Test Connection +```bash +xmover test-connection +``` + +### Analyze Cluster +```bash +# Complete cluster analysis +xmover analyze + +# Analyze specific table +xmover analyze --table my_table +``` + +### Find Movement Candidates +```bash +# Find shards that can be moved (40-60GB by default) +xmover find-candidates + +# Custom size range +xmover find-candidates --min-size 20 --max-size 100 +``` + +### Generate Recommendations +```bash +# Dry run (default) - shows what would be recommended +xmover recommend + +# Generate actual SQL commands +xmover recommend --execute + +# Prioritize space over zone balancing +xmover recommend --prioritize-space +``` + +### Zone Analysis +```bash +# Check zone balance +xmover check-balance + +# Detailed zone analysis with shard-level details +xmover zone-analysis --show-shards +``` + +### Advanced Troubleshooting +```bash +# Validate specific moves before execution +xmover validate-move SCHEMA.TABLE SHARD_ID FROM_NODE TO_NODE + +# Explain CrateDB error messages +xmover explain-error "your error message here" +``` + +## Commands Reference + +### `analyze` +Analyzes current shard distribution across nodes and zones. + +**Options:** +- `--table, -t`: Analyze specific table only + +**Example:** +```bash +xmover analyze --table events +``` + +### `find-candidates` +Finds shards suitable for movement based on size and health criteria. 
+ +**Options:** +- `--table, -t`: Find candidates in specific table only +- `--min-size`: Minimum shard size in GB (default: 40) +- `--max-size`: Maximum shard size in GB (default: 60) +- `--node`: Only show candidates from this specific source node (e.g., data-hot-4) + +**Examples:** +```bash +# Find candidates in size range for specific table +xmover find-candidates --min-size 20 --max-size 50 --table logs + +# Find candidates on a specific node +xmover find-candidates --min-size 30 --max-size 60 --node data-hot-4 +``` + +### `recommend` +Generates intelligent shard movement recommendations for cluster rebalancing. + +**Options:** +- `--table, -t`: Generate recommendations for specific table only +- `--min-size`: Minimum shard size in GB (default: 40) +- `--max-size`: Maximum shard size in GB (default: 60) +- `--zone-tolerance`: Zone balance tolerance percentage (default: 10) +- `--min-free-space`: Minimum free space required on target nodes in GB (default: 100) +- `--max-moves`: Maximum number of move recommendations (default: 10) +- `--max-disk-usage`: Maximum disk usage percentage for target nodes (default: 85) +- `--validate/--no-validate`: Validate move safety (default: True) +- `--prioritize-space/--prioritize-zones`: Prioritize available space over zone balancing (default: False) +- `--dry-run/--execute`: Show what would be done without generating SQL commands (default: True) +- `--node`: Only recommend moves from this specific source node (e.g., data-hot-4) + +**Examples:** +```bash +# Dry run with zone balancing priority +xmover recommend --prioritize-zones + +# Generate SQL for space optimization +xmover recommend --prioritize-space --execute + +# Focus on specific table with custom parameters +xmover recommend --table events --min-size 10 --max-size 30 --execute + +# Target space relief for a specific node +xmover recommend --prioritize-space --min-size 30 --max-size 60 --node data-hot-4 + +# Allow higher disk usage for urgent moves +xmover recommend 
--prioritize-space --max-disk-usage 90 +``` + +### `zone-analysis` +Provides detailed analysis of zone distribution and potential conflicts. + +**Options:** +- `--table, -t`: Analyze zones for specific table only +- `--show-shards/--no-show-shards`: Show individual shard details (default: False) + +**Example:** +```bash +xmover zone-analysis --show-shards --table critical_data +``` + +### `check-balance` +Checks zone balance for shards with configurable tolerance. + +**Options:** +- `--table, -t`: Check balance for specific table only +- `--tolerance`: Zone balance tolerance percentage (default: 10) + +**Example:** +```bash +xmover check-balance --tolerance 15 +``` + + + +### `validate-move` +Validates a specific shard move before execution to prevent errors. + +**Arguments:** +- `SCHEMA_TABLE`: Schema and table name (format: schema.table) +- `SHARD_ID`: Shard ID to move +- `FROM_NODE`: Source node name +- `TO_NODE`: Target node name + +**Examples:** +```bash +# Standard validation +xmover validate-move CUROV.maddoxxxS 4 data-hot-1 data-hot-3 + +# Allow higher disk usage for urgent moves +xmover validate-move CUROV.tendedero 4 data-hot-1 data-hot-3 --max-disk-usage 90 +``` + +### `explain-error` +Explains CrateDB allocation error messages and provides troubleshooting guidance. + +**Arguments:** +- `ERROR_MESSAGE`: The CrateDB error message to analyze (optional - can be provided interactively) + +**Examples:** +```bash +# Interactive mode +xmover explain-error + +# Direct analysis +xmover explain-error "NO(a copy of this shard is already allocated to this node)" +``` + +### `monitor-recovery` +Monitors active shard recovery operations on the cluster. 
+ +**Options:** +- `--table, -t`: Monitor recovery for specific table only +- `--node, -n`: Monitor recovery on specific node only +- `--watch, -w`: Continuously monitor (refresh every 10s) +- `--refresh-interval`: Refresh interval for watch mode in seconds (default: 10) +- `--recovery-type`: Filter by recovery type - PEER, DISK, or all (default: all) +- `--include-transitioning`: Include recently completed recoveries (DONE stage) + +**Examples:** +```bash +# Check current recovery status +xmover monitor-recovery + +# Monitor specific table recoveries +xmover monitor-recovery --table PartioffD + +# Continuous monitoring with custom refresh rate +xmover monitor-recovery --watch --refresh-interval 5 + +# Monitor only PEER recoveries on specific node +xmover monitor-recovery --node data-hot-1 --recovery-type PEER + +# Include completed recoveries still transitioning +xmover monitor-recovery --watch --include-transitioning +``` + +**Recovery Types:** +- **PEER**: Copying shard data from another node (replication/relocation) +- **DISK**: Rebuilding shard from local data (after restart/disk issues) + +### `test-connection` +Tests the connection to CrateDB and displays basic cluster information. + +## Operation Modes + +### Analysis vs Operational Views + +XMover provides two distinct views of your cluster: + +1. **Analysis View** (`analyze`, `zone-analysis`): Includes ALL shards regardless of state for complete cluster visibility +2. **Operational View** (`find-candidates`, `recommend`): Only includes healthy shards (STARTED + 100% recovered) for safe operations + +### Prioritization Modes + +When generating recommendations, you can choose between two prioritization strategies: + +1. **Zone Balancing Priority** (default): Focuses on achieving optimal zone distribution first, then considers available space +2. 
**Space Priority**: Prioritizes moving shards to nodes with more available space, regardless of zone balance + +### Safety Features + +- **Zone Conflict Detection**: Prevents moves that would place multiple copies of the same shard in the same zone +- **Capacity Validation**: Ensures target nodes have sufficient free space +- **Health Checks**: Only operates on healthy shards (STARTED routing state + 100% recovery) +- **SQL Quoting**: Properly quotes schema and table names in generated SQL commands + +## Example Workflows + +### Regular Cluster Maintenance + +1. Analyze current state: +```bash +xmover analyze +``` + +2. Check for zone imbalances: +```bash +xmover check-balance +``` + +3. Generate and review recommendations: +```bash +xmover recommend --dry-run +``` + +4. Execute safe moves: +```bash +xmover recommend --execute +``` + +### Targeted Node Relief + +When a specific node is running low on space: + +1. Check which node needs relief: +```bash +xmover analyze +``` + +2. Generate recommendations for that specific node: +```bash +xmover recommend --prioritize-space --node data-hot-4 --dry-run +``` + +3. Execute the moves: +```bash +xmover recommend --prioritize-space --node data-hot-4 --execute +``` + +### Monitoring Shard Recovery Operations + +After executing shard moves, monitor the recovery progress: + +1. Execute moves and monitor recovery: +```bash +# Execute moves +xmover recommend --node data-hot-1 --execute + +# Monitor the resulting recoveries +xmover monitor-recovery --watch +``` + +2. Monitor specific table or node recovery: +```bash +# Monitor specific table +xmover monitor-recovery --table shipmentFormFieldData --watch + +# Monitor specific node +xmover monitor-recovery --node data-hot-4 --watch + +# Monitor including completed recoveries +xmover monitor-recovery --watch --include-transitioning +``` + +3. 
Check recovery after node maintenance: +```bash +# After bringing a node back online +xmover monitor-recovery --node data-hot-3 --recovery-type DISK +``` + +### Manual Shard Movement + +1. Validate the move first: +```bash +xmover validate-move SCHEMA.TABLE SHARD_ID FROM_NODE TO_NODE +``` + +2. Generate safe recommendations: +```bash +xmover recommend --prioritize-space --execute +``` + +3. Monitor shard health after moves + +### Troubleshooting Zone Conflicts + +1. Identify conflicts: +```bash +xmover zone-analysis --show-shards +``` + +2. Generate targeted fixes: +```bash +xmover recommend --prioritize-zones --execute +``` + +## Configuration + +### Environment Variables + +- `CRATE_CONNECTION_STRING`: CrateDB HTTP endpoint (required) +- `CRATE_USERNAME`: Username for authentication (optional) +- `CRATE_PASSWORD`: Password for authentication (optional) +- `CRATE_SSL_VERIFY`: Enable SSL certificate verification (default: true) + +### Connection String Format + +```text +https://hostname:port +``` + +The tool automatically appends `/_sql` to the endpoint. + +## Safety Considerations + +⚠️ **Important Safety Notes:** + +1. **Always test in non-production environments first** +2. **Monitor shard health after each move before proceeding with additional moves** +3. **Ensure adequate cluster capacity before decommissioning nodes** +4. **Verify zone distribution after rebalancing operations** +5. **Keep backups current before performing large-scale moves** + +## Troubleshooting + +XMover provides comprehensive troubleshooting tools to help diagnose and resolve shard movement issues. 
+ +### Quick Diagnosis Commands + +```bash +# Validate a specific move before execution +xmover validate-move SCHEMA.TABLE SHARD_ID FROM_NODE TO_NODE + +# Explain CrateDB error messages +xmover explain-error "your error message here" + +# Check zone distribution for conflicts +xmover zone-analysis --show-shards + +# Verify overall cluster health +xmover analyze +``` + +### Common Issues and Solutions + +1. **Zone Conflicts** + ```text + Error: "NO(a copy of this shard is already allocated to this node)" + ``` + - **Cause**: Target node already has a copy of the shard + - **Solution**: Use `xmover zone-analysis --show-shards` to find alternative targets + - **Prevention**: Always use `xmover validate-move` before executing moves + +2. **Zone Allocation Limits** + ```text + Error: "too many copies of the shard allocated to nodes with attribute [zone]" + ``` + - **Cause**: CrateDB's zone awareness prevents too many copies in same zone + - **Solution**: Move shard to a different availability zone + - **Prevention**: Use `xmover recommend` which respects zone constraints + +3. **Insufficient Space** + ```text + Error: "not enough disk space" + ``` + - **Cause**: Target node lacks sufficient free space + - **Solution**: Choose node with more capacity or free up space + - **Check**: `xmover analyze` to see available space per node + +4. **High Disk Usage Blocking Moves** + ```text + Error: "Target node disk usage too high (85.3%)" + ``` + - **Cause**: Target node exceeds default 85% disk usage threshold + - **Solution**: Use `--max-disk-usage` to allow higher usage for urgent moves + - **Example**: `xmover recommend --max-disk-usage 90 --prioritize-space` + +5. 
**No Recommendations Generated** + - **Cause**: Cluster may already be well balanced + - **Solution**: Adjust size filters or check `xmover check-balance` + - **Try**: `--prioritize-space` mode for capacity-based moves + +### Error Message Decoder + +Use the built-in error decoder for complex CrateDB messages: + +```bash +# Interactive mode - paste your error message +xmover explain-error + +# Direct analysis +xmover explain-error "NO(a copy of this shard is already allocated to this node)" +``` + +### Configurable Safety Thresholds + +XMover uses configurable safety thresholds to prevent risky moves: + +**Disk Usage Threshold (default: 85%)** +```bash +# Allow moves to nodes with higher disk usage +xmover recommend --max-disk-usage 90 --prioritize-space + +# For urgent space relief +xmover validate-move --max-disk-usage 95 +``` + +**When to Adjust Thresholds:** +- **Emergency situations**: Increase to 90-95% for critical space relief +- **Conservative operations**: Decrease to 75-80% for safer moves +- **Staging environments**: Can be more aggressive (90%+) +- **Production**: Keep conservative (80-85%) + +### Advanced Troubleshooting + +For detailed troubleshooting procedures, see {ref}`xmover-troubleshooting` which covers: +- Step-by-step diagnostic procedures +- Emergency recovery procedures +- Best practices for safe operations +- Complete error reference guide + +### Debug Information + +All commands provide detailed safety validation messages and explanations for any issues detected. diff --git a/doc/admin/xmover/index.md b/doc/admin/xmover/index.md new file mode 100644 index 00000000..affa4825 --- /dev/null +++ b/doc/admin/xmover/index.md @@ -0,0 +1,29 @@ +# XMover + +:::{div} sd-text-muted +CrateDB Shard Analyzer and Movement Tool. +::: + +A comprehensive looking-glass utility for analyzing CrateDB shard +distribution across nodes and availability zones. It generates safe +SQL commands for shard rebalancing and node decommissioning. 
+ +## Features + +- **Cluster Analysis**: Complete overview of shard distribution across nodes and zones +- **Shard Movement Recommendations**: Intelligent suggestions for rebalancing with safety validation +- **Recovery Monitoring**: Track ongoing shard recovery operations with progress details +- **Zone Conflict Detection**: Prevents moves that would violate CrateDB's zone awareness +- **Node Decommissioning**: Plan safe node removal with automated shard relocation +- **Dry Run Mode**: Test recommendations without generating actual SQL commands +- **Safety Validation**: Comprehensive checks to ensure data availability during moves + +## Documentation + +```{toctree} +:maxdepth: 1 + +Handbook +Troubleshooting +Query Gallery +``` diff --git a/doc/admin/xmover/queries.md b/doc/admin/xmover/queries.md new file mode 100644 index 00000000..27bd89e6 --- /dev/null +++ b/doc/admin/xmover/queries.md @@ -0,0 +1,218 @@ +(xmover-queries)= +# XMover Query Gallery + +## Shard Distribution over Nodes + +```sql +select node['name'], sum(size) / 1024^3, count(id) from sys.shards group by 1 order by 1 asc; ++--------------+-----------------------------+-----------+ +| node['name'] | (sum(size) / 1.073741824E9) | count(id) | ++--------------+-----------------------------+-----------+ +| data-hot-0 | 1862.5866614403203 | 680 | +| data-hot-1 | 1866.0331328986213 | 684 | +| data-hot-2 | 1856.6581886671484 | 1043 | +| data-hot-3 | 1208.932889252901 | 477 | +| data-hot-4 | 1861.7727940855548 | 674 | +| data-hot-5 | 1863.4315695902333 | 744 | +| data-hot-6 | 1851.3522544233128 | 948 | +| NULL | 0.0 | 35 | ++--------------+-----------------------------+-----------+ +SELECT 8 rows in set (0.061 sec) +``` +## Shard Distribution PRIMARY/REPLICAS over nodes + +```sql + +select node['name'], primary, sum(size) / 1024^3, count(id) from sys.shards group by 1,2 order by 1 asc; ++--------------+---------+-----------------------------+-----------+ +| node['name'] | primary | (sum(size) / 
1.073741824E9) | count(id) | ++--------------+---------+-----------------------------+-----------+ +| data-hot-0 | TRUE | 1459.3267894154415 | 447 | +| data-hot-0 | FALSE | 403.25987202487886 | 233 | +| data-hot-1 | TRUE | 1209.6781993638724 | 374 | +| data-hot-1 | FALSE | 656.3549335347489 | 310 | +| data-hot-2 | TRUE | 1624.9012612393126 | 995 | +| data-hot-2 | FALSE | 231.5014410642907 | 48 | +| data-hot-3 | TRUE | 6.339549297466874 | 58 | +| data-hot-3 | FALSE | 1202.486775631085 | 419 | +| data-hot-4 | FALSE | 838.5498185381293 | 225 | +| data-hot-4 | TRUE | 1023.1511942362413 | 449 | +| data-hot-5 | FALSE | 1002.365406149067 | 422 | +| data-hot-5 | TRUE | 860.9174101138487 | 322 | +| data-hot-6 | FALSE | 1850.3959310995415 | 940 | +| data-hot-6 | TRUE | 0.9159421799704432 | 8 | +| NULL | FALSE | 0.0 | 35 | ++--------------+---------+-----------------------------+-----------+ + +``` + +## Nodes available Space +```sql +SELECT + name, + attributes['zone'] AS zone, + fs['total']['available'] / power(1024, 3) AS available_gb +FROM sys.nodes +ORDER BY name; +``` +```text ++------------+--------------------+-----------------------------------------------+ +| name | attributes['zone'] | (fs[1]['disks']['available'] / 1.073741824E9) | ++------------+--------------------+-----------------------------------------------+ +| data-hot-5 | us-west-2a | 142.3342628479004 | +| data-hot-0 | us-west-2a | 142.03089141845703 | +| data-hot-6 | us-west-2b | 159.68728256225586 | +| data-hot-3 | us-west-2b | 798.8147850036621 | +| data-hot-2 | us-west-2b | 156.79160690307617 | +| data-hot-1 | us-west-2c | 145.73613739013672 | +| data-hot-4 | us-west-2c | 148.39511108398438 | ++------------+--------------------+-----------------------------------------------+ +``` + +## List biggest SHARDS on a particular Nodes + +```sql +select node['name'], table_name, schema_name, id, sum(size) / 1024^3 from sys.shards + where node['name'] = 'data-hot-2' + AND routing_state = 'STARTED' + AND 
recovery['files']['percent'] = 0 + group by 1,2,3,4 order by 5 desc limit 8; ++--------------+-----------------------+-------------+----+-----------------------------+ +| node['name'] | table_name | schema_name | id | (sum(size) / 1.073741824E9) | ++--------------+-----------------------+-------------+----+-----------------------------+ +| data-hot-2 | bottleFieldData | curvo | 5 | 135.568662205711 | +| data-hot-2 | bottleFieldData | curvo | 8 | 134.813782049343 | +| data-hot-2 | bottleFieldData | curvo | 3 | 133.43549298401922 | +| data-hot-2 | bottleFieldData | curvo | 11 | 130.10448653809726 | +| data-hot-2 | turtleFieldData | curvo | 31 | 54.642812703736126 | +| data-hot-2 | turtleFieldData | curvo | 29 | 54.06101848650724 | +| data-hot-2 | turtleFieldData | curvo | 5 | 53.96749582327902 | +| data-hot-2 | turtleFieldData | curvo | 21 | 53.72262619435787 | ++--------------+-----------------------+-------------+----+-----------------------------+ +SELECT 8 rows in set (0.062 sec) +``` + +## Move REROUTE +```sql +ALTER TABLE curvo.bottlefielddata REROUTE MOVE SHARD 21 FROM 'data-hot-2' TO 'data-hot-3'; +``` +--- + +```sql + +WITH shard_summary AS ( + SELECT + node['name'] AS node_name, + table_name, + schema_name, + CASE + WHEN "primary" = true THEN 'PRIMARY' + ELSE 'REPLICA' + END AS shard_type, + COUNT(*) AS shard_count, + SUM(size) / 1024^3 AS total_size_gb + FROM sys.shards + WHERE table_name = 'orderffD' + AND routing_state = 'STARTED' + AND recovery['files']['percent'] = 0 + GROUP BY node['name'], table_name, schema_name, "primary" +) +SELECT + node_name, + table_name, + schema_name, + shard_type, + shard_count, + ROUND(total_size_gb, 2) AS total_size_gb, + ROUND(total_size_gb / shard_count, 2) AS avg_shard_size_gb +FROM shard_summary +ORDER BY node_name, shard_type DESC, total_size_gb DESC; +``` + +```sql +-- Comprehensive shard distribution showing both node and zone details +SELECT + n.attributes['zone'] AS zone, + s.node['name'] AS node_name, + 
s.table_name, + s.schema_name, + CASE + WHEN s."primary" = true THEN 'PRIMARY' + ELSE 'REPLICA' + END AS shard_type, + s.id AS shard_id, + s.size / 1024^3 AS shard_size_gb, + s.num_docs, + s.state +FROM sys.shards s +JOIN sys.nodes n ON s.node['id'] = n.id +WHERE s.table_name = 'your_table_name' -- Replace with your specific table name + AND s.routing_state = 'STARTED' + AND s.recovery['files']['percent'] = 0 +ORDER BY + n.attributes['zone'], + s.node['name'], + s."primary" DESC, -- Primary shards first + s.id; + +-- Summary by zone and shard type +SELECT + n.attributes['zone'] AS zone, + CASE + WHEN s."primary" = true THEN 'PRIMARY' + ELSE 'REPLICA' + END AS shard_type, + COUNT(*) AS shard_count, + COUNT(DISTINCT s.node['name']) AS nodes_with_shards, + ROUND(SUM(s.size) / 1024^3, 2) AS total_size_gb, + ROUND(AVG(s.size) / 1024^3, 3) AS avg_shard_size_gb, + SUM(s.num_docs) AS total_documents +FROM sys.shards s +JOIN sys.nodes n ON s.node['id'] = n.id +WHERE s.table_name = 'orderffD' -- Replace with your specific table name + AND s.routing_state = 'STARTED' + AND s.recovery['files']['percent'] = 0 +GROUP BY n.attributes['zone'], s."primary" +ORDER BY zone, shard_type DESC; + +``` + +## Relocation + +```sql +SELECT + table_name, + shard_id, + current_state, + explanation, + node_id + FROM sys.allocations + WHERE current_state != 'STARTED' and table_name = 'dispatchio' and shard_id = 19 + ORDER BY current_state, table_name, shard_id; + ++-----------------------+----------+---------------+-------------+------------------------+ +| table_name | shard_id | current_state | explanation | node_id | ++-----------------------+----------+---------------+-------------+------------------------+ +| dispatchio | 19 | RELOCATING | NULL | ZH6fBanGSjanGqeSh-sw0A | ++-----------------------+----------+---------------+-------------+------------------------+ +``` + +```sql +SELECT + COUNT(*) as recovering_shards + FROM sys.shards + WHERE state = 'RECOVERING' OR routing_state IN 
('INITIALIZING', 'RELOCATING'); + +``` + +```sql +SELECT + table_name, + shard_id, + current_state, + explanation, + node_id + FROM sys.allocations + WHERE current_state != 'STARTED' and table_name = 'dispatchio' and shard_id = 19 + ORDER BY current_state, table_name, shard_id; +``` diff --git a/doc/admin/xmover/troubleshooting.md b/doc/admin/xmover/troubleshooting.md new file mode 100644 index 00000000..1afa477a --- /dev/null +++ b/doc/admin/xmover/troubleshooting.md @@ -0,0 +1,427 @@ +(xmover-troubleshooting)= +# Troubleshooting CrateDB using XMover + +This guide helps you diagnose and resolve common issues when using XMover for CrateDB shard management. + +## Quick Diagnosis Commands + +Before troubleshooting, run these commands to understand your cluster state: + +```bash +# Check overall cluster health +xmover analyze + +# Check zone distribution for conflicts +xmover zone-analysis --show-shards + +# Validate a specific move before execution +xmover validate-move SCHEMA.TABLE SHARD_ID FROM_NODE TO_NODE + +# Explain CrateDB error messages +xmover explain-error "your error message here" +``` + +## Common Issues and Solutions + +### 1. 
Zone Conflicts + +#### Symptoms +- Error: `NO(a copy of this shard is already allocated to this node)` +- Error: `NO(there are too many copies of the shard allocated to nodes with attribute [zone])` +- Recommendations show zone conflicts in safety validation + +#### Root Causes +- Target node already has a copy of the shard (primary or replica) +- Target zone already has copies, violating CrateDB's zone awareness +- Incorrect understanding of current shard distribution + +#### Solutions + +**Step 1: Analyze Current Distribution** +```bash +# See exactly where shard copies are located +xmover zone-analysis --show-shards --table YOUR_TABLE + +# Check overall zone balance +xmover check-balance +``` + +**Step 2: Find Alternative Targets** +```bash +# Find nodes with available capacity in different zones +xmover analyze + +# Get movement candidates with size filters +xmover find-candidates --min-size 20 --max-size 30 +``` + +**Step 3: Validate Before Moving** +```bash +# Always validate moves before execution +xmover validate-move SCHEMA.TABLE SHARD_ID FROM_NODE TO_NODE +``` + +#### Prevention +- Always use `xmover recommend` instead of manual moves +- Enable dry-run mode by default: `xmover recommend --dry-run` +- Check zone distribution before planning moves + +### 2. 
Insufficient Space Issues + +#### Symptoms +- Error: `not enough disk space` +- Safety validation fails with space warnings +- High disk usage percentages in cluster analysis + +#### Root Causes +- Target node doesn't have enough free space for the shard +- High disk usage on target nodes (>85%) +- Insufficient buffer space for safe operations + +#### Solutions + +**Step 1: Check Available Space** +```bash +# Review node capacity and usage +xmover analyze + +# Look for nodes with more available space +xmover find-candidates --min-size 0 --max-size 100 +``` + +**Step 2: Adjust Parameters** +```bash +# Increase minimum free space requirement +xmover recommend --min-free-space 200 + +# Focus on smaller shards +xmover recommend --max-size 50 +``` + +**Step 3: Free Up Space** +- Delete old snapshots and unused data +- Move other shards away from constrained nodes +- Consider adding nodes to the cluster + +#### Prevention +- Monitor disk usage regularly with `xmover analyze` +- Set conservative `--min-free-space` values (default: 100GB) +- Plan capacity expansion before reaching 80% disk usage + +### 3. 
Node Performance Issues + +#### Symptoms +- Error: `shard recovery limit` +- High heap usage warnings +- Slow shard movement operations + +#### Root Causes +- Too many concurrent shard movements +- High heap usage on target nodes (>80%) +- Resource contention during moves + +#### Solutions + +**Step 1: Check Node Health** +```bash +# Review heap and disk usage +xmover analyze + +# Check for overloaded nodes +xmover check-balance +``` + +**Step 2: Reduce Concurrent Operations** +```bash +# Move fewer shards at once +xmover recommend --max-moves 3 + +# Wait between moves for recovery completion +# Monitor with CrateDB Admin UI +``` + +**Step 3: Target Less Loaded Nodes** +```bash +# Prioritize nodes with better resources +xmover recommend --prioritize-space +``` + +#### Prevention +- Move shards gradually (5-10 at a time) +- Monitor heap usage and wait for recovery completion +- Avoid moves during high-traffic periods + +### 4. Zone Imbalance Issues + +#### Symptoms +- `check-balance` shows zones marked as "Over" or "Under" +- Zone distribution is uneven +- Some zones have significantly more shards + +#### Root Causes +- Historical data distribution patterns +- Node additions/removals without rebalancing +- Tables created with poor initial distribution + +#### Solutions + +**Step 1: Assess Imbalance** +```bash +# Check current zone balance +xmover check-balance --tolerance 15 + +# Get detailed zone analysis +xmover zone-analysis +``` + +**Step 2: Generate Rebalancing Plan** +```bash +# Prioritize zone balancing +xmover recommend --prioritize-zones --dry-run + +# Review recommendations carefully +xmover recommend --prioritize-zones --max-moves 10 +``` + +**Step 3: Execute Gradually** +```bash +# Execute in small batches +xmover recommend --prioritize-zones --max-moves 5 --execute + +# Monitor progress and repeat +``` + +#### Prevention +- Run regular balance checks: `xmover check-balance` +- Use zone-aware table creation with proper shard allocation +- Plan 
rebalancing during maintenance windows + +### 5. Connection and Authentication Issues + +#### Symptoms +- "Connection failed" errors +- Authentication failures +- SSL/TLS errors + +#### Root Causes +- Incorrect connection string in `.env` +- Wrong credentials +- Network connectivity issues +- SSL certificate problems + +#### Solutions + +**Step 1: Verify Connection** +```bash +# Test basic connectivity +xmover test-connection +``` + +**Step 2: Check Configuration** +```bash +# Verify .env file contents +cat .env + +# Example correct format: +CRATE_CONNECTION_STRING=https://cluster.cratedb.net:4200 +CRATE_USERNAME=admin +CRATE_PASSWORD=your-password +CRATE_SSL_VERIFY=true +``` + +**Step 3: Test Network Access** +```bash +# Test HTTP connectivity +curl -u 'username:password' \ + -H 'Content-Type: application/json' \ + 'https://your-cluster:4200/_sql' \ + -d '{"stmt":"SELECT 1"}' +``` + +#### Prevention +- Use `.env.example` as a template +- Verify credentials with CrateDB admin +- Test connectivity from deployment environment + +## Error Message Decoder + +### CrateDB Allocation Errors + +Use `xmover explain-error` to decode complex CrateDB error messages: + +```bash +# Interactive mode +xmover explain-error + +# Direct analysis +xmover explain-error "your error message here" +``` + +### Common Error Patterns + +| Error Pattern | Meaning | Quick Fix | +|---------------|---------|-----------| +| `copy of this shard is already allocated` | Node already has shard | Choose different target node | +| `too many copies...with attribute [zone]` | Zone limit exceeded | Move to different zone | +| `not enough disk space` | Insufficient space | Free space or choose different node | +| `shard recovery limit` | Too many concurrent moves | Wait and retry with fewer moves | +| `allocation is disabled` | Cluster allocation disabled | Re-enable allocation settings | + +## Best Practices for Safe Operations + +### Pre-Move Checklist + +1. 
**Analyze cluster state**
   ```bash
   xmover analyze
   ```

2. **Check zone distribution**
   ```bash
   xmover zone-analysis
   ```

3. **Generate recommendations**
   ```bash
   xmover recommend --dry-run
   ```

4. **Validate specific moves**
   ```bash
   xmover validate-move
   ```

5. **Execute gradually**
   ```bash
   xmover recommend --max-moves 5 --execute
   ```

### During Operations

1. **Monitor shard health**
   - Check CrateDB Admin UI for recovery progress
   - Watch for failed or stuck shards
   - Verify routing state changes to STARTED

2. **Track resource usage**
   - Monitor disk and heap usage on target nodes
   - Watch for network saturation during moves
   - Check cluster performance metrics

3. **Maintain documentation**
   - Record moves performed and reasons
   - Note any issues encountered
   - Document lessons learned

### Post-Move Verification

1. **Verify shard health**
   ```sql
   SELECT table_name, id, "primary", node['name'], routing_state
   FROM sys.shards
   WHERE table_name = 'your_table' AND routing_state != 'STARTED';
   ```

2. **Check zone balance**
   ```bash
   xmover check-balance
   ```

3. **Monitor cluster performance**
   - Query response times
   - Resource utilization
   - Error rates

## Emergency Procedures

### Stuck Shard Recovery

If a shard gets stuck during movement:

1. **Check shard status**
   ```sql
   SELECT * FROM sys.shards WHERE routing_state != 'STARTED';
   ```

2. **Cancel problematic moves**
   ```sql
   -- Replace <shard-id> and <node-id> with the stuck shard's id and its node
   ALTER TABLE "schema"."table" REROUTE CANCEL SHARD <shard-id> ON '<node-id>';
   ```

3. **Retry allocation**
   ```sql
   ALTER TABLE "schema"."table" REROUTE RETRY FAILED;
   ```

### Cluster Health Issues

If moves cause cluster problems:

1. **Disable allocation temporarily**
   ```text
   PUT /_cluster/settings
   {
     "persistent": {
       "cluster.routing.allocation.enable": "primaries"
     }
   }
   ```

2. 
**Wait for stabilization** + - Monitor cluster health + - Check node resource usage + - Verify no failed shards + +3. **Re-enable allocation** + ```text + PUT /_cluster/settings + { + "persistent": { + "cluster.routing.allocation.enable": "all" + } + } + ``` + +## Getting Help + +### Built-in Help + +```bash +# Command help +xmover --help +xmover COMMAND --help + +# Error explanation +xmover explain-error + +# Move validation +xmover validate-move SCHEMA.TABLE SHARD_ID FROM TO +``` + +### Additional Resources + +- **CrateDB Documentation**: https://crate.io/docs/ +- **Shard Allocation Guide**: https://crate.io/docs/crate/reference/en/latest/admin/system-information.html +- **Cluster Settings**: https://crate.io/docs/crate/reference/en/latest/config/cluster.html + +### Reporting Issues + +When reporting issues, include: + +1. **XMover version and command used** +2. **Complete error message** +3. **Cluster information** (`xmover analyze` output) +4. **Zone analysis** (`xmover zone-analysis` output) +5. 
**CrateDB version and configuration** + +### Support Checklist + +Before contacting support: + +- [ ] Tried `xmover validate-move` for the specific operation +- [ ] Checked zone distribution with `xmover zone-analysis` +- [ ] Reviewed cluster health with `xmover analyze` +- [ ] Used `xmover explain-error` to decode error messages +- [ ] Verified connection and authentication with `xmover test-connection` +- [ ] Read through this troubleshooting guide +- [ ] Checked CrateDB documentation for allocation settings diff --git a/doc/index.md b/doc/index.md index 56b849ec..17c55514 100644 --- a/doc/index.md +++ b/doc/index.md @@ -30,6 +30,7 @@ changes :caption: Diagnostics :hidden: +admin/index Cluster Information Cluster Flight Recorder (CFR) ``` diff --git a/pyproject.toml b/pyproject.toml index 0ceb90b0..f6614eb8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ dependencies = [ "python-slugify<9", "pyyaml<7", "requests>=2.28,<3", + "rich<14", "sqlalchemy-cratedb>=0.41.0", "sqlparse<0.6", "tqdm<5", @@ -263,6 +264,7 @@ scripts.cratedb-retention = "cratedb_toolkit.retention.cli:cli" scripts.cratedb-toolkit = "cratedb_toolkit.cli:cli" scripts.ctk = "cratedb_toolkit.cli:cli" scripts.migr8 = "cratedb_toolkit.io.mongodb.cli:main" +scripts.xmover = "cratedb_toolkit.admin.xmover.cli:main" entry-points.pytest11.cratedb_service = "cratedb_toolkit.testing.pytest" [tool.setuptools.packages.find] @@ -322,11 +324,12 @@ lint.extend-ignore = [ "S108", ] -lint.per-file-ignores."cratedb_toolkit/retention/cli.py" = [ "T201" ] # Allow `print` -lint.per-file-ignores."cratedb_toolkit/sqlalchemy/__init__.py" = [ "F401" ] # Allow `module´ imported but unused +lint.per-file-ignores."cratedb_toolkit/admin/xmover/analysis/shard.py" = [ "T201" ] # Allow `print` +lint.per-file-ignores."cratedb_toolkit/retention/cli.py" = [ "T201" ] # Allow `print` +lint.per-file-ignores."cratedb_toolkit/sqlalchemy/__init__.py" = [ "F401" ] # Allow `module´ imported but unused 
lint.per-file-ignores."doc/conf.py" = [ "A001", "ERA001" ] -lint.per-file-ignores."examples/*" = [ "ERA001", "F401", "T201", "T203" ] # Allow `print` and `pprint` -lint.per-file-ignores."tests/*" = [ "S101" ] # Allow use of `assert`, and `print`. +lint.per-file-ignores."examples/*" = [ "ERA001", "F401", "T201", "T203" ] # Allow `print` and `pprint` +lint.per-file-ignores."tests/*" = [ "S101" ] # Allow use of `assert`, and `print`. lint.per-file-ignores."tests/adapter/test_rockset.py" = [ "E402" ] lint.per-file-ignores."tests/info/test_http.py" = [ "E402" ] diff --git a/tests/admin/__init__.py b/tests/admin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/admin/test_cli.py b/tests/admin/test_cli.py new file mode 100644 index 00000000..60e8d810 --- /dev/null +++ b/tests/admin/test_cli.py @@ -0,0 +1,67 @@ +import pytest +from click.testing import CliRunner + +from cratedb_toolkit.admin.xmover.cli import main as cli + + +@pytest.mark.parametrize( + "subcommand", + [ + "analyze", + "check-balance", + "explain-error", + "find-candidates", + "monitor-recovery", + "recommend", + "test-connection", + "zone-analysis", + ], +) +def test_xmover_all(cratedb, subcommand): + """ + CLI test: Invoke `xmover `. + """ + http_url = cratedb.get_http_url() + runner = CliRunner() + + result = runner.invoke( + cli, + args=subcommand, + env={"CRATE_CONNECTION_STRING": http_url}, + catch_exceptions=False, + ) + assert result.exit_code == 0 + + +def test_xmover_validate_move_success(cratedb): + """ + CLI test: Invoke `xmover validate-move`. + """ + http_url = cratedb.get_http_url() + runner = CliRunner() + + result = runner.invoke( + cli, + args=["validate-move", "doc.demo", "1", "42", "84"], + env={"CRATE_CONNECTION_STRING": http_url}, + catch_exceptions=False, + ) + assert result.exit_code == 0 + assert "Source node '42' not found in cluster" in result.output + + +def test_xmover_validate_move_failure(cratedb): + """ + CLI test: Invoke `xmover validate-move`. 
+ """ + http_url = cratedb.get_http_url() + runner = CliRunner() + + result = runner.invoke( + cli, + args=["validate-move"], + env={"CRATE_CONNECTION_STRING": http_url}, + catch_exceptions=False, + ) + assert result.exit_code == 2 + assert "Error: Missing argument 'SCHEMA_TABLE'." in result.output