diff --git a/cratedb_toolkit/admin/xmover/analysis/shard.py b/cratedb_toolkit/admin/xmover/analysis/shard.py
index f6f24b6b..96bf6351 100644
--- a/cratedb_toolkit/admin/xmover/analysis/shard.py
+++ b/cratedb_toolkit/admin/xmover/analysis/shard.py
@@ -13,6 +13,8 @@ from rich.table import Table
 
 from cratedb_toolkit.admin.xmover.model import (
+    ActiveShardActivity,
+    ActiveShardSnapshot,
     DistributionStats,
     NodeInfo,
     ShardInfo,
@@ -36,7 +38,7 @@ def __init__(self, client: CrateDBClient):
         self.shards: List[ShardInfo] = []
 
         # Initialize session-based caches for performance.
-        self._zone_conflict_cache: Dict[Tuple[str, int, str], Union[str, None]] = {}
+        self._zone_conflict_cache: Dict[Tuple[str, str, int, str], Union[str, None]] = {}
         self._node_lookup_cache: Dict[str, Union[NodeInfo, None]] = {}
         self._target_nodes_cache: Dict[Tuple[float, frozenset[Any], float, float], List[NodeInfo]] = {}
         self._cache_hits = 0
@@ -181,8 +183,6 @@ def find_nodes_with_capacity(
             free_space_gb = node.available_space_gb
             if free_space_gb >= (required_space_gb + min_free_space_gb):
                 available_nodes.append(node)
-            else:
-                continue
 
         # Sort by available space (most space first) - prioritize nodes with more free space
         available_nodes.sort(key=lambda n: n.available_space_gb, reverse=True)
@@ -204,7 +204,7 @@ def generate_rebalancing_recommendations(
         # Get moveable shards (only healthy ones for actual operations)
         moveable_shards = self.find_moveable_shards(constraints.min_size, constraints.max_size, constraints.table_name)
-        print(
+        logger.info(
             f"Analyzing {len(moveable_shards)} candidate shards "
             f"in size range {constraints.min_size}-{constraints.max_size}GB..."
         )
@@ -237,12 +237,11 @@ def generate_rebalancing_recommendations(
         # Optimize processing: if filtering by source node, only process those shards
         if constraints.source_node:
             processing_shards = [s for s in moveable_shards if s.node_name == constraints.source_node]
-            print(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
+            logger.info(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
         else:
             processing_shards = moveable_shards
 
         # Generate move recommendations
-        safe_recommendations = 0  # noqa: F841
         total_evaluated = 0
 
         for i, shard in enumerate(processing_shards):
@@ -366,12 +365,12 @@ def generate_rebalancing_recommendations(
         if len(processing_shards) > 100:
             print()  # New line after progress dots
 
-        print(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
-        print(f"Performance: {self.get_cache_stats()}")
+        logger.info(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
+        logger.info(f"Performance: {self.get_cache_stats()}")
 
         return recommendations
 
     def validate_move_safety(
-        self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0
+        self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0, buffer_gb: float = 50.0
     ) -> Tuple[bool, str]:
         """Validate that a move recommendation is safe to execute"""
         # Find target node (with caching)
@@ -386,7 +385,7 @@ def validate_move_safety(
             return False, zone_conflict
 
         # Check available space
-        required_space_gb = recommendation.size_gb + 50  # 50GB buffer
+        required_space_gb = recommendation.size_gb + buffer_gb
         if target_node.available_space_gb < required_space_gb:
             return (
                 False,
@@ -421,7 +420,7 @@ def _check_zone_conflict_cached(self, recommendation: ShardRelocationResponse) -
         """Check zone conflicts with caching"""
         # Create cache key: table, shard, target zone
         target_zone = self._get_node_zone(recommendation.to_node)
-        cache_key = (recommendation.table_name, recommendation.shard_id, target_zone)
+        cache_key = (recommendation.schema_name, recommendation.table_name, recommendation.shard_id, target_zone)
 
         if cache_key in self._zone_conflict_cache:
             self._cache_hits += 1
@@ -811,11 +810,14 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
         # Determine feasibility
         feasible = len(infeasible_moves) == 0
 
+        # Safety margin for cluster capacity after decommission
+        capacity_safety_margin = 1.2  # 20% buffer
+
         # Add capacity warnings
         if feasible:
-            # Check if remaining cluster capacity is sufficient after decommission
+            # Check if the remaining cluster capacity is sufficient after decommission
            remaining_capacity = sum(n.available_space_gb for n in self.nodes if n.name != node_name)
-            if remaining_capacity < total_size_gb * 1.2:  # 20% safety margin
+            if remaining_capacity < total_size_gb * capacity_safety_margin:
                 warnings.append(
                     f"Low remaining capacity after decommission. "
                     f"Only {remaining_capacity:.1f}GB available for {total_size_gb:.1f}GB of data"
@@ -831,7 +833,7 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
             "recommendations": move_plan,
             "infeasible_moves": infeasible_moves,
             "warnings": warnings,
-            "estimated_time_hours": len(move_plan) * 0.1,  # Rough estimate: 6 minutes per move
+            "estimated_time_hours": len(move_plan) * 0.1,  # Rough estimate: 0.1 hours (6 minutes) per move
             "message": "Decommission plan generated" if feasible else "Decommission not currently feasible",
         }
 
@@ -947,3 +949,181 @@ def distribution(self, table: str = None):
             table_summary.add_row("Node Balance Score", f"{stats.node_balance_score:.1f}/100")
 
         console.print(table_summary)
+
+
+class ActiveShardMonitor:
+    """Monitor active shard checkpoint progression over time"""
+
+    def __init__(self, client: CrateDBClient):
+        self.client = client
+
+    def compare_snapshots(
+        self,
+        snapshot1: List[ActiveShardSnapshot],
+        snapshot2: List[ActiveShardSnapshot],
+        min_activity_threshold: int = 0,
+    ) -> List["ActiveShardActivity"]:
+        """Compare two snapshots and return activity data for shards present in both
+
+        Args:
+            snapshot1: First snapshot (baseline)
+            snapshot2: Second snapshot (comparison)
+            min_activity_threshold: Minimum checkpoint delta to consider active (default: 0)
+        """
+
+        # Create lookup dict for snapshot1
+        snapshot1_dict = {snap.shard_identifier: snap for snap in snapshot1}
+
+        activities = []
+
+        for snap2 in snapshot2:
+            snap1 = snapshot1_dict.get(snap2.shard_identifier)
+            if snap1:
+                # Calculate local checkpoint delta
+                local_checkpoint_delta = snap2.local_checkpoint - snap1.local_checkpoint
+                time_diff = snap2.timestamp - snap1.timestamp
+
+                # Filter based on actual activity between snapshots
+                if local_checkpoint_delta >= min_activity_threshold:
+                    activity = ActiveShardActivity(
+                        schema_name=snap2.schema_name,
+                        table_name=snap2.table_name,
+                        shard_id=snap2.shard_id,
+                        node_name=snap2.node_name,
+                        is_primary=snap2.is_primary,
+                        partition_ident=snap2.partition_ident,
+                        local_checkpoint_delta=local_checkpoint_delta,
+                        snapshot1=snap1,
+                        snapshot2=snap2,
+                        time_diff_seconds=time_diff,
+                    )
+                    activities.append(activity)
+
+        # Sort by activity (highest checkpoint delta first)
+        activities.sort(key=lambda x: x.local_checkpoint_delta, reverse=True)
+
+        return activities
+
+    def format_activity_display(
+        self, activities: List["ActiveShardActivity"], show_count: int = 10, watch_mode: bool = False
+    ) -> str:
+        """Format activity data for console display"""
+        if not activities:
+            return "✅ No active shards with significant checkpoint progression found"
+
+        # Limit to requested count
+        activities = activities[:show_count]
+
+        # Calculate observation period for context
+        if activities:
+            observation_period = activities[0].time_diff_seconds
+            output = [
+                f"\n🔥 Most Active Shards ({len(activities)} shown, {observation_period:.0f}s observation period)"
+            ]
+        else:
+            output = [f"\n🔥 Most Active Shards ({len(activities)} shown, sorted by checkpoint activity)"]
+
+        output.append("")
+
+        # Add activity rate context
+        if activities:
+            total_activity = sum(a.local_checkpoint_delta for a in activities)
+            avg_rate = sum(a.activity_rate for a in activities) / len(activities)
+            output.append(
+                f"[dim]Total checkpoint activity: {total_activity:,} changes, Average rate: {avg_rate:.1f}/sec[/dim]"
+            )
+            output.append("")
+
+        # Create table headers
+        headers = ["Rank", "Schema.Table", "Shard", "Partition", "Node", "Type", "Checkpoint Δ", "Rate/sec", "Trend"]
+
+        # Calculate column widths
+        col_widths = [len(h) for h in headers]
+
+        # Prepare rows
+        rows = []
+        for i, activity in enumerate(activities, 1):
+            # Format values
+            rank = str(i)
+            table_id = activity.table_identifier
+            shard_id = str(activity.shard_id)
+            partition = (
+                activity.partition_ident[:14] + "..."
+                if len(activity.partition_ident) > 14
+                else activity.partition_ident or "-"
+            )
+            node = activity.node_name
+            shard_type = "P" if activity.is_primary else "R"
+            checkpoint_delta = f"{activity.local_checkpoint_delta:,}"
+            rate = f"{activity.activity_rate:.1f}" if activity.activity_rate >= 0.1 else "<0.1"
+
+            # Calculate activity trend indicator
+            if activity.activity_rate >= 100:
+                trend = "🔥 HOT"
+            elif activity.activity_rate >= 50:
+                trend = "📈 HIGH"
+            elif activity.activity_rate >= 10:
+                trend = "📊 MED"
+            else:
+                trend = "📉 LOW"
+
+            row = [rank, table_id, shard_id, partition, node, shard_type, checkpoint_delta, rate, trend]
+            rows.append(row)
+
+            # Update column widths
+            for j, cell in enumerate(row):
+                col_widths[j] = max(col_widths[j], len(cell))
+
+        # Format table
+        header_row = "  " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths))
+        output.append(header_row)
+        output.append("  " + "-" * (len(header_row) - 3))
+
+        # Data rows
+        for row in rows:
+            data_row = "  " + " | ".join(cell.ljust(w) for cell, w in zip(row, col_widths))
+            output.append(data_row)
+
+        # Only show legend and insights in non-watch mode
+        if not watch_mode:
+            output.append("")
+            output.append("Legend:")
+            output.append("  • Checkpoint Δ: Write operations during observation period")
+            output.append("  • Rate/sec: Checkpoint changes per second")
+            output.append("  • Partition: partition_ident (truncated if >14 chars, '-' if none)")
+            output.append("  • Type: P=Primary, R=Replica")
+            output.append("  • Trend: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)")
+
+            # Add insights about activity patterns
+            if activities:
+                output.append("")
+                output.append("Insights:")
+
+                # Count by trend
+                hot_count = len([a for a in activities if a.activity_rate >= 100])
+                high_count = len([a for a in activities if 50 <= a.activity_rate < 100])
+                med_count = len([a for a in activities if 10 <= a.activity_rate < 50])
+                low_count = len([a for a in activities if a.activity_rate < 10])
+
+                if hot_count > 0:
+                    output.append(f"  • {hot_count} HOT shards (≥100 changes/sec) - consider load balancing")
+                if high_count > 0:
+                    output.append(f"  • {high_count} HIGH activity shards - monitor capacity")
+                if med_count > 0:
+                    output.append(f"  • {med_count} MEDIUM activity shards - normal operation")
+                if low_count > 0:
+                    output.append(f"  • {low_count} LOW activity shards - occasional writes")
+
+                # Identify patterns
+                primary_activities = [a for a in activities if a.is_primary]
+                if len(primary_activities) == len(activities):
+                    output.append("  • All active shards are PRIMARY - normal write pattern")
+                elif len(primary_activities) < len(activities) * 0.5:
+                    output.append("  • Many REPLICA shards active - possible recovery/replication activity")
+
+                # Node concentration
+                nodes = {a.node_name for a in activities}
+                if len(nodes) <= 2:
+                    output.append(f"  • Activity concentrated on {len(nodes)} node(s) - consider redistribution")
+
+        return "\n".join(output)
diff --git a/cratedb_toolkit/admin/xmover/analysis/table.py b/cratedb_toolkit/admin/xmover/analysis/table.py
index ef6dbdf3..b3322cd9 100644
--- a/cratedb_toolkit/admin/xmover/analysis/table.py
+++ b/cratedb_toolkit/admin/xmover/analysis/table.py
@@ -14,6 +14,7 @@
 from rich.console import Console
 from rich.table import Table
 
+from cratedb_toolkit.admin.xmover.model import NodeInfo
 from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
 
 logger = logging.getLogger(__name__)
@@ -97,6 +98,9 @@ def find_table_by_name(self, table_name: str) -> Optional[str]:
 
         try:
             choice = input("\nSelect table (enter number): ").strip()
+            if not choice:
+                rprint("[yellow]No selection made[/yellow]")
+                return None
             idx = int(choice) - 1
             if 0 <= idx < len(rows):
                 schema, table = rows[idx]
@@ -292,14 +296,9 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
             zone_distribution = {}
             for node_name, node_data in table_dist.node_distributions.items():
                 # Try to get zone info for each node
-                node_info = next((n for n in all_nodes_info if n.name == node_name), None)
-                if (
-                    node_info
-                    and hasattr(node_info, "attributes")
-                    and node_info.attributes
-                    and "zone" in node_info.attributes
-                ):
-                    zone = node_info.attributes["zone"]
+                node_info: Optional[NodeInfo] = next((n for n in all_nodes_info if n.name == node_name), None)
+                if node_info and node_info.zone:
+                    zone = node_info.zone
                     if zone not in zone_distribution:
                         zone_distribution[zone] = {"nodes": 0, "shards": 0, "size": 0}
                     zone_distribution[zone]["nodes"] += 1
@@ -576,7 +575,7 @@ def detect_node_coverage_issues(self, table: TableDistribution) -> Optional[Dist
         nodes_with_shards = set(table.node_distributions.keys())
         nodes_without_shards = all_nodes - nodes_with_shards
 
-        # Only flag as anomaly if we have missing nodes and the table is significant
+        # Only flag as an anomaly if we have missing nodes and the table is significant
         if not nodes_without_shards or table.total_primary_size_gb < 10.0:
             return None
diff --git a/cratedb_toolkit/admin/xmover/analysis/zone.py b/cratedb_toolkit/admin/xmover/analysis/zone.py
index 718d88f0..07e67803 100644
--- a/cratedb_toolkit/admin/xmover/analysis/zone.py
+++ b/cratedb_toolkit/admin/xmover/analysis/zone.py
@@ -135,7 +135,8 @@ def distribution_conflicts(self, shard_details: bool = False, table: Optional[st
                     health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
                     console.print(
                         f"  {health_indicator} {shard_copy.shard_type} "
-                        f"on {shard_copy.node_name} ({shard_copy.zone}) - {shard_copy.routing_state}"
+                        f"on {shard_copy.node_name} ({shard_copy.zone}) - "
+                        f"{shard_copy.state}/{shard_copy.routing_state}"
                     )
 
         console.print(analysis_table)
diff --git a/cratedb_toolkit/admin/xmover/cli.py b/cratedb_toolkit/admin/xmover/cli.py
index e5e6e834..e979986e 100644
--- a/cratedb_toolkit/admin/xmover/cli.py
+++ b/cratedb_toolkit/admin/xmover/cli.py
@@ -4,13 +4,14 @@
 Command Line Interface.
 """
 
-import sys
+import time
 from typing import Optional
 
 import click
 from rich.console import Console
+from rich.panel import Panel
 
-from cratedb_toolkit.admin.xmover.analysis.shard import ShardAnalyzer, ShardReporter
+from cratedb_toolkit.admin.xmover.analysis.shard import ActiveShardMonitor, ShardAnalyzer, ShardReporter
 from cratedb_toolkit.admin.xmover.analysis.table import DistributionAnalyzer
 from cratedb_toolkit.admin.xmover.analysis.zone import ZoneReport
 from cratedb_toolkit.admin.xmover.model import (
@@ -44,11 +45,11 @@ def main(ctx):
         if not client.test_connection():
             console.print("[red]Error: Could not connect to CrateDB[/red]")
             console.print("Please check your CRATE_CONNECTION_STRING in .env file")
-            sys.exit(1)
+            raise click.Abort()
         ctx.obj["client"] = client
     except Exception as e:
         console.print(f"[red]Error connecting to CrateDB: {e}[/red]")
-        sys.exit(1)
+        raise click.Abort() from e
 
 
 @main.command()
@@ -168,11 +169,11 @@ def test_connection(ctx, connection_string: Optional[str]):
                 console.print(f"  • {node.name} (zone: {node.zone})")
         else:
             console.print("[red]✗ Connection failed[/red]")
-            sys.exit(1)
+            raise click.Abort()
 
     except Exception as e:
         console.print(f"[red]✗ Connection error: {e}[/red]")
-        sys.exit(1)
+        raise click.Abort() from e
 
 
 @main.command()
@@ -249,6 +250,199 @@ def shard_distribution(ctx, top_tables: int, table: Optional[str]):
         console.print(f"[dim]{traceback.format_exc()}[/dim]")
 
 
+@main.command()
+@click.option("--count", default=10, help="Number of most active shards to show (default: 10)")
+@click.option("--interval", default=30, help="Observation interval in seconds (default: 30)")
+@click.option(
+    "--min-checkpoint-delta",
+    default=1000,
+    help="Minimum checkpoint progression between snapshots to show shard (default: 1000)",
+)
+@click.option("--table", "-t", help="Monitor specific table only")
+@click.option("--node", "-n", help="Monitor specific node only")
+@click.option("--watch", "-w", is_flag=True, help="Continuously monitor (refresh every interval)")
+@click.option("--exclude-system", is_flag=True, help="Exclude system tables (gc.*, information_schema.*)")
+@click.option("--min-rate", type=float, help="Minimum activity rate (changes/sec) to show")
+@click.option("--show-replicas/--hide-replicas", default=True, help="Show replica shards (default: True)")
+@click.pass_context
+def active_shards(
+    ctx,
+    count: int,
+    interval: int,
+    min_checkpoint_delta: int,
+    table: Optional[str],
+    node: Optional[str],
+    watch: bool,
+    exclude_system: bool,
+    min_rate: Optional[float],
+    show_replicas: bool,
+):
+    """Monitor most active shards by checkpoint progression
+
+    This command takes two snapshots of ALL started shards separated by the
+    observation interval, then shows the shards with the highest checkpoint
+    progression (activity) between the snapshots.
+
+    Unlike other commands, this tracks ALL shards and filters based on actual
+    activity between snapshots, not current state. This captures shards that
+    become active during the observation period.
+
+    Useful for identifying which shards are receiving the most write activity
+    in your cluster and understanding write patterns.
+
+    Examples:
+        xmover active-shards --count 20 --interval 60     # Top 20 over 60 seconds
+        xmover active-shards --watch --interval 30        # Continuous monitoring
+        xmover active-shards --table my_table --watch     # Monitor specific table
+        xmover active-shards --node data-hot-1 --count 5  # Top 5 on specific node
+        xmover active-shards --min-checkpoint-delta 500   # Lower activity threshold
+        xmover active-shards --exclude-system --min-rate 50  # Skip system tables, min 50/sec
+        xmover active-shards --hide-replicas --count 20   # Only primary shards
+    """
+    client = ctx.obj["client"]
+    monitor = ActiveShardMonitor(client)
+
+    def get_filtered_snapshot():
+        """Get snapshot with optional filtering"""
+        snapshots = client.get_active_shards_snapshot(min_checkpoint_delta=min_checkpoint_delta)
+
+        # Apply table filter if specified
+        if table:
+            snapshots = [s for s in snapshots if s.table_name == table or f"{s.schema_name}.{s.table_name}" == table]
+
+        # Apply node filter if specified
+        if node:
+            snapshots = [s for s in snapshots if s.node_name == node]
+
+        # Exclude system tables if requested
+        if exclude_system:
+            snapshots = [
+                s
+                for s in snapshots
+                if not (
+                    s.schema_name == "gc"
+                    or s.schema_name == "information_schema"
+                    or s.schema_name == "sys"
+                    or s.table_name.endswith("_events")
+                    or s.table_name.endswith("_log")
+                )
+            ]
+
+        return snapshots
+
+    def run_single_analysis():
+        """Run a single analysis cycle"""
+        if not watch:
+            console.print(Panel.fit("[bold blue]Active Shards Monitor[/bold blue]"))
+
+        # Show configuration - simplified for watch mode
+        if watch:
+            config_parts = [f"{interval}s interval", f"threshold: {min_checkpoint_delta:,}", f"top {count}"]
+            if table:
+                config_parts.append(f"table: {table}")
+            if node:
+                config_parts.append(f"node: {node}")
+            console.print(f"[dim]{' | '.join(config_parts)}[/dim]")
+        else:
+            config_info = [
+                f"Observation interval: {interval}s",
+                f"Min checkpoint delta: {min_checkpoint_delta:,}",
+                f"Show count: {count}",
+            ]
+            if table:
+                config_info.append(f"Table filter: {table}")
+            if node:
+                config_info.append(f"Node filter: {node}")
+            if exclude_system:
+                config_info.append("Excluding system tables")
+            if min_rate:
+                config_info.append(f"Min rate: {min_rate}/sec")
+            if not show_replicas:
+                config_info.append("Primary shards only")
+
+            console.print("[dim]" + " | ".join(config_info) + "[/dim]")
+            console.print()
+
+        # Take first snapshot
+        if not watch:
+            console.print("📷 Taking first snapshot...")
+        snapshot1 = get_filtered_snapshot()
+
+        if not snapshot1:
+            console.print("[yellow]No started shards found matching criteria[/yellow]")
+            return
+
+        if not watch:
+            console.print(f"   Tracking {len(snapshot1)} started shards for activity")
+            console.print(f"⏱️ Waiting {interval} seconds for activity...")
+
+        # Wait for observation interval
+        if watch:
+            # Simplified countdown for watch mode
+            for remaining in range(interval, 0, -1):
+                if remaining % 5 == 0 or remaining <= 3:  # Show fewer updates
+                    console.print(f"[dim]⏱️ {remaining}s...[/dim]", end="\r")
+                time.sleep(1)
+            console.print(" " * 15, end="\r")  # Clear countdown
+        else:
+            time.sleep(interval)
+
+        # Take second snapshot
+        if not watch:
+            console.print("📷 Taking second snapshot...")
+        snapshot2 = get_filtered_snapshot()
+
+        if not snapshot2:
+            console.print("[yellow]No started shards found in second snapshot[/yellow]")
+            return
+
+        if not watch:
+            console.print(f"   Tracking {len(snapshot2)} started shards for activity")
+
+        # Compare snapshots and show results
+        activities = monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=min_checkpoint_delta)
+
+        # Apply additional filters
+        if not show_replicas:
+            activities = [a for a in activities if a.is_primary]
+
+        if min_rate:
+            activities = [a for a in activities if a.activity_rate >= min_rate]
+
+        if not activities:
+            console.print(
+                f"[green]✅ No shards exceeded activity threshold ({min_checkpoint_delta:,} checkpoint changes)[/green]"
+            )
+            if min_rate:
+                console.print(f"[dim]Also filtered by minimum rate: {min_rate}/sec[/dim]")
+        else:
+            if not watch:
+                overlap_count = len({s.shard_identifier for s in snapshot1} & {s.shard_identifier for s in snapshot2})
+                console.print(f"[dim]Analyzed {overlap_count} shards present in both snapshots[/dim]")
+            console.print(monitor.format_activity_display(activities, show_count=count, watch_mode=watch))
+
+    try:
+        if watch:
+            console.print("[dim]Press Ctrl+C to stop monitoring[/dim]")
+            console.print()
+
+            while True:
+                run_single_analysis()
+                if watch:
+                    console.print(f"\n[dim]━━━ Next update in {interval}s ━━━[/dim]\n")
+                    time.sleep(interval)
+        else:
+            run_single_analysis()
+
+    except KeyboardInterrupt:
+        console.print("\n[yellow]Monitoring stopped by user[/yellow]")
+    except Exception as e:
+        console.print(f"[red]Error during active shards monitoring: {e}[/red]")
+        import traceback
+
+        console.print(f"[dim]{traceback.format_exc()}[/dim]")
+
+
 @main.command()
 @click.option("--table", "-t", help="Analyze zones for specific table only")
 @click.option("--show-shards/--no-show-shards", default=False, help="Show individual shard details (default: False)")
@@ -330,13 +524,14 @@ def monitor_recovery(
         xmover monitor-recovery --watch                    # Continuous monitoring
         xmover monitor-recovery --recovery-type PEER       # Only PEER recoveries
     """
+    effective_recovery_type = None if recovery_type == "all" else recovery_type
     recovery_monitor = RecoveryMonitor(
         client=ctx.obj["client"],
         options=RecoveryOptions(
             table=table,
             node=node,
             refresh_interval=refresh_interval,
-            recovery_type=recovery_type,
+            recovery_type=effective_recovery_type,
             include_transitioning=include_transitioning,
         ),
     )
diff --git a/cratedb_toolkit/admin/xmover/model.py b/cratedb_toolkit/admin/xmover/model.py
index 34e43f77..7f962c3c 100644
--- a/cratedb_toolkit/admin/xmover/model.py
+++ b/cratedb_toolkit/admin/xmover/model.py
@@ -1,4 +1,3 @@
-import dataclasses
 from dataclasses import dataclass
 from typing import Dict, Optional
 
@@ -149,6 +148,12 @@ def safety_score(self) -> float:
         if "rebalancing" in self.reason.lower():
             score += 0.2
 
+        # Consider shard size - smaller shards are safer to move
+        if self.size_gb < 10:
+            score += 0.1
+        elif self.size_gb > 100:
+            score -= 0.2
+
         # Ensure score stays in valid range
         return max(0.0, min(1.0, score))
 
@@ -165,7 +170,7 @@ class DistributionStats:
     node_balance_score: float  # 0-100, higher is better
 
 
-@dataclasses.dataclass
+@dataclass
 class SizeCriteria:
     min_size: float = 40.0
     max_size: float = 60.0
@@ -173,7 +178,7 @@ class SizeCriteria:
     source_node: Optional[str] = None
 
 
-@dataclasses.dataclass
+@dataclass
 class ShardRelocationConstraints:
     min_size: float = SizeCriteria().min_size
     max_size: float = SizeCriteria().max_size
@@ -184,3 +189,67 @@ class ShardRelocationConstraints:
     max_recommendations: int = 10
     max_disk_usage: float = 90.0
     prioritize_space: bool = False
+
+
+@dataclass
+class ActiveShardSnapshot:
+    """Snapshot of active shard checkpoint data for tracking activity"""
+
+    schema_name: str
+    table_name: str
+    shard_id: int
+    node_name: str
+    is_primary: bool
+    partition_ident: str
+    local_checkpoint: int
+    global_checkpoint: int
+    translog_uncommitted_bytes: int
+    timestamp: float  # Unix timestamp when snapshot was taken
+
+    @property
+    def checkpoint_delta(self) -> int:
+        """Current checkpoint delta (local - global)"""
+        return self.local_checkpoint - self.global_checkpoint
+
+    @property
+    def translog_uncommitted_mb(self) -> float:
+        """Translog uncommitted size in MB"""
+        return self.translog_uncommitted_bytes / (1024 * 1024)
+
+    @property
+    def shard_identifier(self) -> str:
+        """Unique identifier for this shard including partition"""
+        shard_type = "P" if self.is_primary else "R"
+        partition = f":{self.partition_ident}" if self.partition_ident else ""
+        return f"{self.schema_name}.{self.table_name}:{self.shard_id}:{self.node_name}:{shard_type}{partition}"
+
+
+@dataclass
+class ActiveShardActivity:
+    """Activity comparison between two snapshots of the same shard"""
+
+    schema_name: str
+    table_name: str
+    shard_id: int
+    node_name: str
+    is_primary: bool
+    partition_ident: str
+    local_checkpoint_delta: int  # Change in local checkpoint between snapshots
+    snapshot1: ActiveShardSnapshot
+    snapshot2: ActiveShardSnapshot
+    time_diff_seconds: float
+
+    @property
+    def activity_rate(self) -> float:
+        """Activity rate as checkpoint changes per second"""
+        if self.time_diff_seconds > 0:
+            return self.local_checkpoint_delta / self.time_diff_seconds
+        return 0.0
+
+    @property
+    def shard_type(self) -> str:
+        return "PRIMARY" if self.is_primary else "REPLICA"
+
+    @property
+    def table_identifier(self) -> str:
+        return f"{self.schema_name}.{self.table_name}"
diff --git a/cratedb_toolkit/admin/xmover/operational/candidates.py b/cratedb_toolkit/admin/xmover/operational/candidates.py
index dd7d4930..9841624b 100644
--- a/cratedb_toolkit/admin/xmover/operational/candidates.py
+++ b/cratedb_toolkit/admin/xmover/operational/candidates.py
@@ -14,7 +14,7 @@ class CandidateFinder:
     def __init__(self, analyzer: ShardAnalyzer):
         self.analyzer = analyzer
 
-    def movement_candidates(self, criteria: SizeCriteria, limit: int):
+    def movement_candidates(self, criteria: SizeCriteria, limit: int) -> int:
         """
         Find shard candidates for movement based on size criteria
 
@@ -23,7 +23,7 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
         """
 
         console.print(
-            Panel.fit(f"[bold blue]Finding Moveable Shards ({criteria.min_size}-{criteria.max_size}GB)[/bold blue]")
+            Panel.fit(f"[bold blue]Finding Movable Shards ({criteria.min_size}-{criteria.max_size}GB)[/bold blue]")
         )
 
         if criteria.source_node:
@@ -45,7 +45,7 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
                 console.print("[dim]Tip: Try different size ranges or remove --node filter to see all candidates[/dim]")
             else:
                 console.print("[yellow]No moveable shards found in the specified size range.[/yellow]")
-            return
+            return 0
 
         # Show limited results
         shown_candidates = candidates[:limit]
@@ -82,3 +82,5 @@ def movement_candidates(self, criteria: SizeCriteria, limit: int):
 
         if len(candidates) > limit:
             console.print(f"\n[dim]... and {len(candidates) - limit} more candidates[/dim]")
+
+        return len(candidates)
diff --git a/cratedb_toolkit/admin/xmover/operational/monitor.py b/cratedb_toolkit/admin/xmover/operational/monitor.py
index d88a295f..206bacbf 100644
--- a/cratedb_toolkit/admin/xmover/operational/monitor.py
+++ b/cratedb_toolkit/admin/xmover/operational/monitor.py
@@ -24,9 +24,9 @@ class RecoveryOptions:
 class RecoveryMonitor:
     """Monitor shard recovery operations"""
 
-    def __init__(self, client: CrateDBClient, options: RecoveryOptions):
+    def __init__(self, client: CrateDBClient, options: Optional[RecoveryOptions] = None):
         self.client = client
-        self.options = options
+        self.options = options or RecoveryOptions()
 
     def get_cluster_recovery_status(self) -> List[RecoveryInfo]:
         """Get comprehensive recovery status with minimal cluster impact"""
@@ -37,7 +37,7 @@ def get_cluster_recovery_status(self) -> List[RecoveryInfo]:
         )
 
         # Apply recovery type filter
-        if self.options.recovery_type is not None:
+        if self.options.recovery_type is not None and self.options.recovery_type.lower() != "all":
             recoveries = [r for r in recoveries if r.recovery_type.upper() == self.options.recovery_type.upper()]
 
         return recoveries
@@ -178,7 +178,6 @@ def start(self, watch: bool, debug: bool = False):
 
         # Track previous state for change detection
         previous_recoveries: Dict[str, Dict[str, Any]] = {}
-        previous_timestamp = None
         first_run = True
 
         while True:
@@ -307,7 +306,6 @@ def start(self, watch: bool, debug: bool = False):
                 elif active_count > 0:
                     console.print(f"{current_time} | {status} (no changes)")
 
-            previous_timestamp = current_time  # noqa: F841
             first_run = False
 
             time.sleep(self.options.refresh_interval)
diff --git a/cratedb_toolkit/admin/xmover/operational/recommend.py b/cratedb_toolkit/admin/xmover/operational/recommend.py
index ab5156e6..f7f9e3ea 100644
--- a/cratedb_toolkit/admin/xmover/operational/recommend.py
+++ b/cratedb_toolkit/admin/xmover/operational/recommend.py
@@ -123,8 +123,8 @@ def validate(self, request: ShardRelocationRequest):
             console.print()
             console.print("[dim]# Monitor shard health after execution[/dim]")
             console.print(
-                "[dim]# Check with: SELECT * FROM sys.shards "
-                "WHERE table_name = '{table_name}' AND id = {shard_id};[/dim]"
+                "[dim]# Check with: SELECT * FROM sys.shards "  # noqa: S608
+                f"WHERE table_name = '{table_name}' AND id = {request.shard_id};[/dim]"
             )
         else:
             console.print("[red]✗ VALIDATION FAILED - Move not safe[/red]")
@@ -323,7 +323,7 @@ def execute(
                     rec, max_disk_usage_percent=constraints.max_disk_usage
                 )
                 if not is_safe:
-                    if "Zone conflict" in safety_msg:
+                    if "zone conflict" in safety_msg.lower():
                         zone_conflicts += 1
                     console.print(f"-- Move {i}: SKIPPED - {safety_msg}")
                     console.print(
@@ -340,7 +340,7 @@ def execute(
 
         # Auto-execution if requested
         if auto_execute:
-            self._execute_recommendations_safely(recommendations, validate)
+            self._execute_recommendations_safely(constraints, recommendations, validate)
 
         if validate and safe_moves < len(recommendations):
             if zone_conflicts > 0:
@@ -352,14 +352,16 @@ def execute(
                 f"[yellow]Warning: Only {safe_moves} of {len(recommendations)} moves passed safety validation[/yellow]"
             )
 
-    def _execute_recommendations_safely(self, recommendations, validate: bool):
+    def _execute_recommendations_safely(self, constraints, recommendations, validate: bool):
         """Execute recommendations with extensive safety measures"""
 
         # Filter to only safe recommendations
         safe_recommendations = []
         if validate:
             for rec in recommendations:
-                is_safe, safety_msg = self.analyzer.validate_move_safety(rec, max_disk_usage_percent=95.0)
+                is_safe, safety_msg = self.analyzer.validate_move_safety(
+                    rec, max_disk_usage_percent=constraints.max_disk_usage
+                )
                 if is_safe:
                     safe_recommendations.append(rec)
                 else:
@@ -423,7 +425,8 @@ def _execute_recommendations_safely(self, recommendations, validate: bool):
                 # Execute the SQL command
                 result = self.client.execute_query(sql_command)
 
-                if result.get("rowcount", 0) >= 0:  # Success indicator for ALTER statements
+                # ALTER TABLE REROUTE commands don't return rowcount, check for no error instead.
+                if "error" not in result:
                     console.print("   [green]✅ SUCCESS[/green] - Move initiated")
                     successful_moves += 1
@@ -482,7 +485,8 @@ def _wait_for_recovery_capacity(self, max_concurrent_recoveries: int = 5):
         while True:
             # Check active recoveries (including transitioning)
             recoveries = recovery_monitor.get_cluster_recovery_status()
-            active_count = len([r for r in recoveries if r.overall_progress < 100.0 or r.stage != "DONE"])
+            # Count recoveries that are actively running (not completed)
+            active_count = len([r for r in recoveries if r.overall_progress < 100.0])
 
             status = f"{active_count}/{max_concurrent_recoveries}"
             if active_count < max_concurrent_recoveries:
                 if wait_time > 0:
diff --git a/cratedb_toolkit/admin/xmover/util/database.py b/cratedb_toolkit/admin/xmover/util/database.py
index 21950ab0..e487ed8f 100644
--- a/cratedb_toolkit/admin/xmover/util/database.py
+++ b/cratedb_toolkit/admin/xmover/util/database.py
@@ -10,7 +10,7 @@
 import urllib3
 from dotenv import load_dotenv
 
-from cratedb_toolkit.admin.xmover.model import NodeInfo, RecoveryInfo, ShardInfo
+from cratedb_toolkit.admin.xmover.model import ActiveShardSnapshot, NodeInfo, RecoveryInfo, ShardInfo
 
 logger = logging.getLogger(__name__)
 
@@ -39,6 +39,8 @@ def __init__(self, connection_string: Optional[str] = None):
         if not self.connection_string.endswith("/_sql"):
             self.connection_string = self.connection_string.rstrip("/") + "/_sql"
 
+        self.session = requests.Session()
+
     def execute_query(self, query: str, parameters: Optional[List] = None) -> Dict[str, Any]:
         """Execute a SQL query against CrateDB"""
         payload: Dict[str, Any] = {"stmt": query}
@@ -51,11 +53,18 @@ def execute_query(self, query: str, parameters: Optional[List] = None) -> Dict[s
             auth = (self.username, self.password)
 
         try:
-            response = requests.post(
+            response = self.session.post(
                 self.connection_string, json=payload, auth=auth, verify=self.ssl_verify, timeout=30
             )
             response.raise_for_status()
-            return response.json()
+            data = response.json()
+            # CrateDB may include an "error" field even with 200 OK
+            if isinstance(data, dict) and "error" in data and data["error"]:
+                # Best-effort message extraction
+                err = data["error"]
+                msg = err.get("message") if isinstance(err, dict) else str(err)
+                raise Exception(f"CrateDB error: {msg}")
+            return data
         except requests.exceptions.RequestException as e:
             raise Exception(f"Failed to execute query: {e}") from e
 
@@ -335,13 +344,13 @@ def get_recovery_details(self, schema_name: str, table_name: str, shard_id: int)
             s."primary",
             s.translog_stats['size'] as translog_size
         FROM sys.shards s
-        WHERE s.table_name = ? AND s.id = ?
+        WHERE s.schema_name = ? AND s.table_name = ? AND s.id = ?
         AND (s.state = 'RECOVERING' OR s.routing_state IN ('INITIALIZING', 'RELOCATING'))
         ORDER BY s.schema_name
         LIMIT 1
         """
 
-        result = self.execute_query(query, [table_name, shard_id])
+        result = self.execute_query(query, [schema_name, table_name, shard_id])
 
         if not result.get("rows"):
             return None
@@ -496,3 +505,60 @@ def _is_recovery_completed(self, recovery_info: RecoveryInfo) -> bool:
             and recovery_info.files_percent >= 100.0
             and recovery_info.bytes_percent >= 100.0
         )
+
+    def get_active_shards_snapshot(self, min_checkpoint_delta: int = 1000) -> List[ActiveShardSnapshot]:
+        """Get a snapshot of all started shards for activity monitoring
+
+        Note: This captures ALL started shards regardless of current activity level.
+        The min_checkpoint_delta parameter is kept for backwards compatibility but
+        filtering is now done during snapshot comparison to catch shards that
+        become active between observations.
+
+        Args:
+            min_checkpoint_delta: Kept for compatibility - filtering now done in comparison
+
+        Returns:
+            List of ActiveShardSnapshot objects for all started shards
+        """
+        import time
+
+        query = """
+        SELECT sh.schema_name, \
+               sh.table_name, \
+               sh.id AS shard_id, \
+               sh."primary", \
+               node['name'] as node_name, \
+               sh.partition_ident, \
+               sh.translog_stats['uncommitted_size'] AS translog_uncommitted_bytes, \
+               sh.seq_no_stats['local_checkpoint'] AS local_checkpoint, \
+               sh.seq_no_stats['global_checkpoint'] AS global_checkpoint
+        FROM sys.shards AS sh
+        WHERE sh.state = 'STARTED'
+        ORDER BY sh.schema_name, sh.table_name, sh.id, sh.node['name'] \
+        """
+
+        try:
+            result = self.execute_query(query)
+            snapshots = []
+            current_time = time.time()
+
+            for row in result.get("rows", []):
+                snapshot = ActiveShardSnapshot(
+                    schema_name=row[0],
+                    table_name=row[1],
+                    shard_id=row[2],
+                    is_primary=row[3],
+                    node_name=row[4],
+                    partition_ident=row[5] or "",
+                    translog_uncommitted_bytes=row[6] or 0,
+                    local_checkpoint=row[7] or 0,
+                    global_checkpoint=row[8] or 0,
+                    timestamp=current_time,
+                )
+                snapshots.append(snapshot)
+
+            return snapshots
+
+        except Exception as e:
+            logger.error(f"Error getting active shards snapshot: {e}")
+            return []
diff --git a/cratedb_toolkit/admin/xmover/util/error.py b/cratedb_toolkit/admin/xmover/util/error.py
index 11dd5f39..22494098 100644
--- a/cratedb_toolkit/admin/xmover/util/error.py
+++ b/cratedb_toolkit/admin/xmover/util/error.py
@@ -1,12 +1,21 @@
-from typing import List, Optional, cast
+from typing import List, Optional
 
-from rich.console import Console
+from rich import get_console
 from rich.panel import Panel
 
-console = Console()
+console = get_console()
 
 
 def explain_cratedb_error(error_message: Optional[str]):
+    """
+    Decode and troubleshoot common CrateDB shard allocation errors.
+
+    Parameters
+    ----------
+    error_message:
+        Raw CrateDB error message. If None, the user is prompted to paste the
+        message interactively (finish with two blank lines).
+    """
     console.print(Panel.fit("[bold blue]CrateDB Error Message Decoder[/bold blue]"))
     console.print("[dim]Helps decode and troubleshoot CrateDB shard allocation errors[/dim]")
     console.print()
@@ -24,7 +35,7 @@ def explain_cratedb_error(error_message: Optional[str]):
                 break
             error_message = "\n".join(lines)
 
-    if not error_message.strip():
+    if not (error_message or "").strip():
         console.print("[yellow]No error message provided[/yellow]")
         return
 
@@ -96,7 +107,7 @@ def explain_cratedb_error(error_message: Optional[str]):
     error_lower = error_message.lower()
 
     for pattern_info in error_patterns:
-        if cast(str, pattern_info["pattern"]).lower() in error_lower:
+        if pattern_info["pattern"].lower() in error_lower:  # type: ignore[attr-defined]
             matches.append(pattern_info)
 
     if matches:
diff --git a/cratedb_toolkit/admin/xmover/util/format.py b/cratedb_toolkit/admin/xmover/util/format.py
index 82c8a3d0..7bced419 100644
--- a/cratedb_toolkit/admin/xmover/util/format.py
+++ b/cratedb_toolkit/admin/xmover/util/format.py
@@ -1,11 +1,14 @@
+TL_MIN_BYTES = 10 * 1024 * 1024  # 10MiB threshold for visibility
+
+
 def format_size(size_gb: float) -> str:
     """Format size in GB with appropriate precision"""
-    if size_gb >= 1000:
-        return f"{size_gb / 1000:.1f}TB"
+    if size_gb >= 1024:
+        return f"{size_gb / 1024:.1f}TB"
     elif size_gb >= 1:
         return f"{size_gb:.1f}GB"
     else:
-        return f"{size_gb * 1000:.0f}MB"
+        return f"{size_gb * 1024:.0f}MB"
 
 
 def format_percentage(value: float) -> str:
@@ -22,8 +25,8 @@ def format_translog_info(recovery_info) -> str:
     """Format translog size information with color coding"""
     tl_bytes = recovery_info.translog_size_bytes
 
-    # Only show if significant (>10MB for production)
-    if tl_bytes < 10 * 1024 * 1024:  # 10MB for production
+    # Only show if significant (>10MB for production), ignore others.
+    if tl_bytes < TL_MIN_BYTES:
         return ""
 
     tl_gb = recovery_info.translog_size_gb
@@ -36,10 +39,5 @@ def format_translog_info(recovery_info) -> str:
     else:
         color = "green"
 
-    # Format size
-    if tl_gb >= 1.0:
-        size_str = f"{tl_gb:.1f}GB"
-    else:
-        size_str = f"{tl_gb * 1000:.0f}MB"
-
+    size_str = format_size(tl_gb)
     return f" [dim]([{color}]TL:{size_str}[/{color}])[/dim]"
diff --git a/doc/admin/xmover/handbook.md b/doc/admin/xmover/handbook.md
index 05a3c57a..a2049fa2 100644
--- a/doc/admin/xmover/handbook.md
+++ b/doc/admin/xmover/handbook.md
@@ -57,7 +57,7 @@ xmover recommend --prioritize-space
 ```
 
 ### Shard Distribution Analysis
-This view is dedicating a specific focus on large tables.
+This view focuses on large tables.
 ```bash
 # Analyze distribution anomalies for top 10 largest tables
 xmover shard-distribution
 ```
@@ -128,11 +128,12 @@ Generates intelligent shard movement recommendations for cluster rebalancing.
 - `--zone-tolerance`: Zone balance tolerance percentage (default: 10)
 - `--min-free-space`: Minimum free space required on target nodes in GB (default: 100)
 - `--max-moves`: Maximum number of move recommendations (default: 10)
-- `--max-disk-usage`: Maximum disk usage percentage for target nodes (default: 85)
+- `--max-disk-usage`: Maximum disk usage percentage for target nodes (default: 90)
 - `--validate/--no-validate`: Validate move safety (default: True)
 - `--prioritize-space/--prioritize-zones`: Prioritize available space over zone balancing (default: False)
 - `--dry-run/--execute`: Show what would be done without generating SQL commands (default: True)
 - `--node`: Only recommend moves from this specific source node (e.g., data-hot-4)
+- `--auto-execute`: Automatically execute the SQL commands (requires `--execute`, asks for confirmation) (default: False)
 
 **Examples:**
 ```bash
@@ -244,6 +245,132 @@ xmover monitor-recovery --watch --include-transitioning
 - **PEER**: Copying shard data from another node (replication/relocation)
 - **DISK**: Rebuilding shard from local data (after restart/disk issues)
 
+
+### `active-shards`
+Monitor the most active shards by tracking checkpoint progression over time.
+This command helps identify which shards are receiving the most write activity
+by measuring local checkpoint progression between two snapshots.
+
+**Options:**
+- `--count`: Number of most active shards to show (default: 10)
+- `--interval`: Observation interval in seconds (default: 30)
+- `--min-checkpoint-delta`: Minimum checkpoint progression between snapshots to show shard (default: 1000)
+- `--table, -t`: Monitor specific table only
+- `--node, -n`: Monitor specific node only
+- `--watch, -w`: Continuously monitor (refresh every interval)
+- `--exclude-system`: Exclude system tables (gc.*, information_schema.*, *_events, *_log)
+- `--min-rate`: Minimum activity rate (changes/sec) to show
+- `--show-replicas/--hide-replicas`: Show replica shards (default: True)
+
+**How it works:**
+1. **Takes a snapshot of ALL started shards** (not just currently active ones)
+2. **Waits for the observation interval** (configurable, default: 30 seconds)
+3. **Takes a second snapshot** of all started shards
+4. **Compares snapshots** to find shards with checkpoint progression ≥ threshold
+5. **Shows ranked results** with activity trends and insights
+
+**Enhanced output features:**
+- **Checkpoint visibility**: Shows actual `local_checkpoint` values (CP Start → CP End → Delta)
+- **Partition awareness**: Separate tracking for partitioned tables (different partition_ident values)
+- **Activity trends**: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)
+- **Smart insights**: Identifies concentration patterns and load distribution (non-watch mode)
+- **Flexible filtering**: Exclude system tables, set minimum rates, hide replicas
+- **Context information**: Total activity, average rates, observation period
+- **Clean watch mode**: Streamlined output without legend/insights for continuous monitoring
+
+This approach captures shards that become active during the observation period, providing a complete view of cluster write patterns and identifying hot spots. The enhanced filtering helps focus on business-critical activity patterns.
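+
+The same measurement can also be scripted against the underlying API. A minimal
+sketch, assuming `CRATE_CONNECTION_STRING` is set in the environment (the same
+variable the CLI reads):
+
+```python
+import time
+
+from cratedb_toolkit.admin.xmover.analysis.shard import ActiveShardMonitor
+from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
+
+client = CrateDBClient()
+monitor = ActiveShardMonitor(client)
+
+# Two snapshots of all started shards, separated by the observation interval.
+snapshot1 = client.get_active_shards_snapshot()
+time.sleep(30)
+snapshot2 = client.get_active_shards_snapshot()
+
+# Shards are matched by identifier (schema, table, shard, node, P/R, partition);
+# the delta is the local checkpoint progression between the two snapshots.
+activities = monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1000)
+print(monitor.format_activity_display(activities, show_count=10))
+```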
+
+**Sample output (single run):**
+```
+🔥 Most Active Shards (3 shown, 30s observation period)
+Total checkpoint activity: 190,314 changes, Average rate: 2,109.0/sec
+  Rank | Schema.Table          | Shard | Partition      | Node       | Type | Checkpoint Δ | Rate/sec | Trend
+  ---------------------------------------------------------------------------------------------------------
+  1    | gc.scheduled_jobs_log | 0     | -              | data-hot-8 | P    | 113,744      | 3,791.5  | 🔥 HOT
+  2    | TURVO.events          | 0     | 04732dpl6osj8d | data-hot-0 | P    | 45,837       | 1,527.9  | 🔥 HOT
+  3    | doc.user_actions      | 1     | 04732dpk70rj6d | data-hot-2 | P    | 30,733       | 1,024.4  | 🔥 HOT
+Legend:
+  • Checkpoint Δ: Write operations during observation period
+  • Partition: partition_ident (truncated if >14 chars, '-' if none)
+Insights:
+  • 3 HOT shards (≥100 changes/sec) - consider load balancing
+  • All active shards are PRIMARY - normal write pattern
+```
+
+**Sample output (watch mode - cleaner):**
+```
+30s interval | threshold: 1,000 | top 5
+🔥 Most Active Shards (3 shown, 30s observation period)
+Total checkpoint activity: 190,314 changes, Average rate: 2,109.0/sec
+  Rank | Schema.Table          | Shard | Partition      | Node       | Type | Checkpoint Δ | Rate/sec | Trend
+  ---------------------------------------------------------------------------------------------------------
+  1    | gc.scheduled_jobs_log | 0     | -              | data-hot-8 | P    | 113,744      | 3,791.5  | 🔥 HOT
+  2    | TURVO.events          | 0     | 04732dpl6osj8d | data-hot-0 | P    | 45,837       | 1,527.9  | 🔥 HOT
+  3    | doc.user_actions      | 1     | 04732dpk70rj6d | data-hot-2 | P    | 30,733       | 1,024.4  | 🔥 HOT
+━━━ Next update in 30s ━━━
+```
+
+#### Examples
+```bash
+# Show top 10 most active shards over 30 seconds
+xmover active-shards
+
+# Top 20 shards with 60-second observation period
+xmover active-shards --count 20 --interval 60
+
+# Continuous monitoring with 30-second intervals
+xmover active-shards --watch --interval 30
+
+# Monitor specific table activity
+xmover active-shards --table my_table --watch
+
+# Monitor specific node with custom threshold
+xmover active-shards --node data-hot-1 --min-checkpoint-delta 500
+
+# Exclude system tables and event logs for business data focus
+xmover active-shards --exclude-system --count 20
+
+# Only show high-activity shards (≥50 changes/sec)
+xmover active-shards --min-rate 50 --count 15
+
+# Focus on primary shards only
+xmover active-shards --hide-replicas --count 20
+```
+
+#### Monitoring Active Shards and Write Patterns
+
+Identify which shards are receiving the most write activity:
+
+1. Quick snapshot of most active shards:
+```bash
+# Show top 10 most active shards over 30 seconds
+xmover active-shards
+
+# Longer observation period for more accurate results
+xmover active-shards --count 15 --interval 60
+```
+
+2. Continuous monitoring for real-time insights:
+```bash
+# Continuous monitoring with 30-second intervals
+xmover active-shards --watch --interval 30
+
+# Monitor specific table for focused analysis
+xmover active-shards --table critical_table --watch
+```
+
+3. Integration with rebalancing workflow:
+```bash
+# Identify hot shards first
+xmover active-shards --count 20 --interval 60
+
+# Move hot shards away from overloaded nodes
+xmover recommend --table hot_table --prioritize-space --execute
+
+# Monitor the impact
+xmover active-shards --table hot_table --watch
+```
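+
+The CLI filters (`--hide-replicas`, `--min-rate`) map directly onto the
+activity objects returned by `compare_snapshots()`; a sketch, assuming the
+`activities` list from the scripting example above:
+
+```python
+# Primary shards only, with at least 50 checkpoint changes per second.
+hot_primaries = [a for a in activities if a.is_primary and a.activity_rate >= 50]
+for a in hot_primaries:
+    print(f"{a.table_identifier} shard {a.shard_id} on {a.node_name}: {a.activity_rate:.1f}/sec")
+```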
+
 ### `test-connection`
 Tests the connection to CrateDB and displays basic cluster information.
diff --git a/doc/admin/xmover/index.md b/doc/admin/xmover/index.md
index 99fd4404..d1aead79 100644
--- a/doc/admin/xmover/index.md
+++ b/doc/admin/xmover/index.md
@@ -11,7 +11,7 @@ SQL commands for shard rebalancing and node decommissioning.
 ## Features
 
 - **Cluster Analysis**: Complete overview of shard distribution across nodes and zones
-- **Shard Distribution Analysis**: Detect and rank distribution anomalies across largest tables
+- **Shard Distribution Analysis**: Detect and rank distribution anomalies across the largest tables
 - **Shard Movement Recommendations**: Intelligent suggestions for rebalancing with safety validation
 - **Recovery Monitoring**: Track ongoing shard recovery operations with progress details
 - **Zone Conflict Detection**: Prevents moves that would violate CrateDB's zone awareness
diff --git a/doc/admin/xmover/queries.md b/doc/admin/xmover/queries.md
index 9844d8f6..17af71c9 100644
--- a/doc/admin/xmover/queries.md
+++ b/doc/admin/xmover/queries.md
@@ -69,7 +69,7 @@ ORDER BY name;
 +------------+--------------------+-----------------------------------------------+
 ```
 
-## List biggest SHARDS on a particular Nodes
+## List biggest shards on a particular node
 ```sql
 select node['name'], table_name, schema_name, id, sum(size) / 1024^3
 from sys.shards
@@ -219,7 +219,7 @@ SELECT
 ## "BIGDUDES"
 Focuses on your **biggest storage consumers** and shows how their shards are distributed across nodes.
 
-´´´sql
+```sql
 WITH largest_tables AS (
     SELECT
         schema_name,
diff --git a/pyproject.toml b/pyproject.toml
index f6614eb8..6770d234 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -331,6 +331,7 @@ lint.per-file-ignores."doc/conf.py" = [ "A001", "ERA001" ]
 lint.per-file-ignores."examples/*" = [ "ERA001", "F401", "T201", "T203" ]  # Allow `print` and `pprint`
 lint.per-file-ignores."tests/*" = [ "S101" ]  # Allow use of `assert`, and `print`.
 lint.per-file-ignores."tests/adapter/test_rockset.py" = [ "E402" ]
+lint.per-file-ignores."tests/admin/*" = [ "T201" ]  # Allow use of `print`.
lint.per-file-ignores."tests/info/test_http.py" = [ "E402" ] [tool.pytest.ini_options] diff --git a/tests/admin/test_active_shard_monitor.py b/tests/admin/test_active_shard_monitor.py new file mode 100644 index 00000000..55268b15 --- /dev/null +++ b/tests/admin/test_active_shard_monitor.py @@ -0,0 +1,472 @@ +""" +Tests for ActiveShardMonitor functionality +""" + +import time +from unittest.mock import Mock, patch + +from cratedb_toolkit.admin.xmover.analysis.shard import ActiveShardMonitor +from cratedb_toolkit.admin.xmover.model import ActiveShardActivity, ActiveShardSnapshot +from cratedb_toolkit.admin.xmover.util.database import CrateDBClient + + +class TestActiveShardSnapshot: + """Test ActiveShardSnapshot dataclass""" + + def test_checkpoint_delta(self): + """Test checkpoint delta calculation""" + snapshot = ActiveShardSnapshot( + schema_name="test_schema", + table_name="test_table", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint=1500, + global_checkpoint=500, + translog_uncommitted_bytes=10485760, # 10MB + timestamp=time.time(), + ) + + assert snapshot.checkpoint_delta == 1000 + assert snapshot.translog_uncommitted_mb == 10.0 + assert snapshot.shard_identifier == "test_schema.test_table:1:node1:P" + + +class TestActiveShardActivity: + """Test ActiveShardActivity dataclass""" + + def test_activity_calculations(self): + """Test activity rate and property calculations""" + snapshot1 = ActiveShardSnapshot( + schema_name="test_schema", + table_name="test_table", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint=1000, + global_checkpoint=500, + translog_uncommitted_bytes=5242880, # 5MB + timestamp=100.0, + ) + + snapshot2 = ActiveShardSnapshot( + schema_name="test_schema", + table_name="test_table", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint=1500, + global_checkpoint=500, + translog_uncommitted_bytes=10485760, # 10MB + timestamp=130.0, # 30 seconds later + ) + + activity = ActiveShardActivity( + schema_name="test_schema", + table_name="test_table", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint_delta=500, + snapshot1=snapshot1, + snapshot2=snapshot2, + time_diff_seconds=30.0, + ) + + assert activity.activity_rate == 500 / 30.0 # ~16.67 changes/sec + assert activity.shard_type == "PRIMARY" + assert activity.table_identifier == "test_schema.test_table" + + +class TestCrateDBClientActiveShards: + """Test CrateDB client active shards functionality""" + + @patch.object(CrateDBClient, "execute_query") + def test_get_active_shards_snapshot_success(self, mock_execute): + """Test successful snapshot retrieval""" + mock_execute.return_value = { + "rows": [ + ["schema1", "table1", 1, True, "node1", "", 10485760, 1500, 500], + ["schema1", "table2", 2, False, "node2", "part1", 20971520, 2000, 800], + ] + } + + client = CrateDBClient("http://test") + snapshots = client.get_active_shards_snapshot(min_checkpoint_delta=1000) + + assert len(snapshots) == 2 + + # Check first snapshot + snap1 = snapshots[0] + assert snap1.schema_name == "schema1" + assert snap1.table_name == "table1" + assert snap1.shard_id == 1 + assert snap1.is_primary is True + assert snap1.node_name == "node1" + assert snap1.local_checkpoint == 1500 + assert snap1.global_checkpoint == 500 + assert snap1.checkpoint_delta == 1000 + assert snap1.translog_uncommitted_mb == 10.0 + + # Check second snapshot + snap2 = snapshots[1] + assert 
snap2.schema_name == "schema1" + assert snap2.table_name == "table2" + assert snap2.shard_id == 2 + assert snap2.is_primary is False + assert snap2.node_name == "node2" + assert snap2.partition_ident == "part1" + assert snap2.checkpoint_delta == 1200 + assert snap2.translog_uncommitted_mb == 20.0 + + # Verify query was called without checkpoint delta filter (new behavior) + mock_execute.assert_called_once() + args = mock_execute.call_args[0] + # No longer passes min_checkpoint_delta parameter + assert len(args) == 1 # Only the query, no parameters + + @patch.object(CrateDBClient, "execute_query") + def test_get_active_shards_snapshot_empty(self, mock_execute): + """Test snapshot retrieval with no results""" + mock_execute.return_value = {"rows": []} + + client = CrateDBClient("http://test") + snapshots = client.get_active_shards_snapshot(min_checkpoint_delta=1000) + + assert snapshots == [] + + @patch.object(CrateDBClient, "execute_query") + def test_get_active_shards_snapshot_error(self, mock_execute): + """Test snapshot retrieval with database error""" + mock_execute.side_effect = Exception("Database connection failed") + + client = CrateDBClient("http://test") + snapshots = client.get_active_shards_snapshot(min_checkpoint_delta=1000) + + assert snapshots == [] + + +class TestActiveShardMonitor: + """Test ActiveShardMonitor class""" + + def setup_method(self): + """Set up test fixtures""" + self.mock_client = Mock(spec=CrateDBClient) + self.monitor = ActiveShardMonitor(self.mock_client) + + def create_test_snapshot( + self, + schema: str, + table: str, + shard_id: int, + node: str, + is_primary: bool, + local_checkpoint: int, + timestamp: float, + ): + """Helper to create test snapshots""" + return ActiveShardSnapshot( + schema_name=schema, + table_name=table, + shard_id=shard_id, + node_name=node, + is_primary=is_primary, + partition_ident="", + local_checkpoint=local_checkpoint, + global_checkpoint=500, # Fixed for simplicity + translog_uncommitted_bytes=10485760, # 10MB + timestamp=timestamp, + ) + + def test_compare_snapshots_with_activity(self): + """Test comparing snapshots with active shards""" + # Create first snapshot + snapshot1 = [ + self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0), + self.create_test_snapshot("schema1", "table2", 1, "node2", False, 2000, 100.0), + self.create_test_snapshot("schema1", "table3", 1, "node1", True, 3000, 100.0), + ] + + # Create second snapshot (30 seconds later with activity) + snapshot2 = [ + self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1500, 130.0), # +500 + self.create_test_snapshot("schema1", "table2", 1, "node2", False, 2200, 130.0), # +200 + self.create_test_snapshot("schema1", "table3", 1, "node1", True, 3000, 130.0), # No change + self.create_test_snapshot("schema1", "table4", 1, "node3", True, 1000, 130.0), # New shard + ] + + activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1) + + # Should have 2 activities (table3 had no change, table4 is new) + assert len(activities) == 2 + + # Check activities are sorted by checkpoint delta (highest first) + assert activities[0].local_checkpoint_delta == 500 # table1 + assert activities[0].schema_name == "schema1" + assert activities[0].table_name == "table1" + + assert activities[1].local_checkpoint_delta == 200 # table2 + assert activities[1].schema_name == "schema1" + assert activities[1].table_name == "table2" + + # Check activity rate calculation + assert activities[0].activity_rate == 500 / 30.0 # 
~16.67/sec + assert activities[1].activity_rate == 200 / 30.0 # ~6.67/sec + + def test_compare_snapshots_no_activity(self): + """Test comparing snapshots with no activity""" + # Create identical snapshots + snapshot1 = [ + self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0), + ] + + snapshot2 = [ + self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 130.0), # No change + ] + + activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1) + + assert activities == [] + + def test_compare_snapshots_no_overlap(self): + """Test comparing snapshots with no overlapping shards""" + snapshot1 = [ + self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0), + ] + + snapshot2 = [ + self.create_test_snapshot("schema1", "table2", 1, "node2", True, 1500, 130.0), # Different shard + ] + + activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1) + + assert activities == [] + + def test_format_activity_display_with_activities(self): + """Test formatting activity display with data""" + # Create test activities + snapshot1 = self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0) + snapshot2 = self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1500, 130.0) + + activity = ActiveShardActivity( + schema_name="schema1", + table_name="table1", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint_delta=500, + snapshot1=snapshot1, + snapshot2=snapshot2, + time_diff_seconds=30.0, + ) + + display = self.monitor.format_activity_display([activity], show_count=10, watch_mode=False) + + # Check that output contains expected elements + assert "Most Active Shards" in display + assert "schema1.table1" in display + assert "500" in display # checkpoint delta + assert "16.7" in display # activity rate + assert "P" in display # primary indicator + assert "Legend:" in display + assert "Trend:" in display # new trend column explanation + assert "Partition:" in display # new partition column explanation + + def test_format_activity_display_empty(self): + """Test formatting activity display with no data""" + display = self.monitor.format_activity_display([], show_count=10, watch_mode=False) + + assert "No active shards with significant checkpoint progression found" in display + + def test_format_activity_display_count_limit(self): + """Test that display respects show_count limit""" + # Create multiple activities + activities = [] + for i in range(15): + snapshot1 = self.create_test_snapshot("schema1", f"table{i}", 1, "node1", True, 1000, 100.0) + snapshot2 = self.create_test_snapshot("schema1", f"table{i}", 1, "node1", True, 1000 + (i + 1) * 100, 130.0) + + activity = ActiveShardActivity( + schema_name="schema1", + table_name=f"table{i}", + shard_id=1, + node_name="node1", + is_primary=True, + partition_ident="", + local_checkpoint_delta=(i + 1) * 100, + snapshot1=snapshot1, + snapshot2=snapshot2, + time_diff_seconds=30.0, + ) + activities.append(activity) + + # Sort activities by checkpoint delta (highest first) - same as compare_snapshots does + activities.sort(key=lambda x: x.local_checkpoint_delta, reverse=True) + + # Should only show top 5 + display = self.monitor.format_activity_display(activities, show_count=5, watch_mode=False) + + # Count number of table entries in display + table_count = display.count("schema1.table") + assert table_count == 5 # Should only show 5 entries + + # Should show highest activity first 
+
+    def test_compare_snapshots_with_activity_threshold(self):
+        """Test filtering activities by minimum threshold"""
+        # Create snapshots with various activity levels
+        snapshot1 = [
+            self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0),  # Will have +2000 delta
+            self.create_test_snapshot("schema1", "table2", 1, "node2", False, 2000, 100.0),  # Will have +500 delta
+            self.create_test_snapshot("schema1", "table3", 1, "node1", True, 3000, 100.0),  # Will have +100 delta
+        ]
+
+        snapshot2 = [
+            self.create_test_snapshot("schema1", "table1", 1, "node1", True, 3000, 130.0),  # +2000 delta
+            self.create_test_snapshot("schema1", "table2", 1, "node2", False, 2500, 130.0),  # +500 delta
+            self.create_test_snapshot("schema1", "table3", 1, "node1", True, 3100, 130.0),  # +100 delta
+        ]
+
+        # Test with threshold of 1000 - should only show table1 (2000 delta)
+        activities_high_threshold = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1000)
+        assert len(activities_high_threshold) == 1
+        assert activities_high_threshold[0].table_name == "table1"
+        assert activities_high_threshold[0].local_checkpoint_delta == 2000
+
+        # Test with threshold of 200 - should show table1 and table2
+        activities_medium_threshold = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=200)
+        assert len(activities_medium_threshold) == 2
+        assert activities_medium_threshold[0].local_checkpoint_delta == 2000  # table1 first (highest)
+        assert activities_medium_threshold[1].local_checkpoint_delta == 500  # table2 second
+
+        # Test with threshold of 0 - should show all three
+        activities_low_threshold = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=0)
+        assert len(activities_low_threshold) == 3
+        assert activities_low_threshold[0].local_checkpoint_delta == 2000  # Sorted by activity
+        assert activities_low_threshold[1].local_checkpoint_delta == 500
+        assert activities_low_threshold[2].local_checkpoint_delta == 100
+
+    def test_primary_replica_separation(self):
+        """Test that primary and replica shards are tracked separately"""
+        # Create snapshots with same table/shard but different primary/replica
+        snapshot1 = [
+            # Primary shard
+            self.create_test_snapshot("gc", "scheduled_jobs_log", 0, "data-hot-8", True, 15876, 100.0),
+            # Replica shard (same table/shard/node but different type)
+            self.create_test_snapshot("gc", "scheduled_jobs_log", 0, "data-hot-8", False, 129434, 100.0),
+        ]
+
+        snapshot2 = [
+            # Primary shard progresses normally
+            self.create_test_snapshot("gc", "scheduled_jobs_log", 0, "data-hot-8", True, 16000, 130.0),  # +124 delta
+            # Replica shard progresses normally
+            self.create_test_snapshot("gc", "scheduled_jobs_log", 0, "data-hot-8", False, 129500, 130.0),  # +66 delta
+        ]
+
+        activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1)
+
+        # Should have 2 separate activities (primary and replica tracked separately)
+        assert len(activities) == 2
+
+        # Find primary and replica activities
+        primary_activity = next(a for a in activities if a.is_primary)
+        replica_activity = next(a for a in activities if not a.is_primary)
+
+        # Verify deltas are calculated correctly for each type
+        assert primary_activity.local_checkpoint_delta == 124  # 16000 - 15876
+        assert replica_activity.local_checkpoint_delta == 66  # 129500 - 129434
+
+        # Verify they have different shard identifiers
+        assert primary_activity.snapshot1.shard_identifier != replica_activity.snapshot1.shard_identifier
+        assert "data-hot-8:P" in primary_activity.snapshot1.shard_identifier
+        assert "data-hot-8:R" in replica_activity.snapshot1.shard_identifier
+
+        # This test prevents the bug where we mixed primary CP End with replica CP Start
+        # which created fake deltas like 129434 - 15876 = 113558
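+        # A minimal sketch of the keying that prevents this class of bug; we
+        # assume compare_snapshots matches snapshots on an identity tuple that
+        # includes is_primary (and partition_ident, see the next test). The
+        # exact key layout is an implementation detail:
+        def identity(snap):
+            return (snap.schema_name, snap.table_name, snap.shard_id,
+                    snap.node_name, snap.is_primary, snap.partition_ident)
+
+        assert identity(snapshot1[0]) != identity(snapshot1[1])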
+
+    def test_partition_separation(self):
+        """Test that partitions within the same table/shard are tracked separately"""
+        # Create snapshots with same table/shard but different partitions
+        snapshot1 = [
+            # Partition 1
+            self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 32684, 100.0),
+            # Partition 2 (same table/shard/node/type but different partition)
+            self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 54289, 100.0),
+        ]
+
+        # Modify partition_ident for the snapshots to simulate different partitions
+        snapshot1[0].partition_ident = "04732dpl6osj8d1g60o30c1g"
+        snapshot1[1].partition_ident = "04732dpl6os3adpm60o30c1g"
+
+        snapshot2 = [
+            # Partition 1 progresses
+            self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 32800, 130.0),
+            # +116 delta
+            # Partition 2 progresses
+            self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 54400, 130.0),
+            # +111 delta
+        ]
+
+        # Set partition_ident for second snapshot
+        snapshot2[0].partition_ident = "04732dpl6osj8d1g60o30c1g"
+        snapshot2[1].partition_ident = "04732dpl6os3adpm60o30c1g"
+
+        activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1)
+
+        # Should have 2 separate activities (partitions tracked separately)
+        assert len(activities) == 2
+
+        # Verify deltas are calculated correctly for each partition
+        partition1_activity = next(a for a in activities if "04732dpl6osj8d1g60o30c1g" in a.snapshot1.shard_identifier)
+        partition2_activity = next(a for a in activities if "04732dpl6os3adpm60o30c1g" in a.snapshot1.shard_identifier)
+
+        assert partition1_activity.local_checkpoint_delta == 116  # 32800 - 32684
+        assert partition2_activity.local_checkpoint_delta == 111  # 54400 - 54289
+
+        # Verify they have different shard identifiers due to partition
+        assert partition1_activity.snapshot1.shard_identifier != partition2_activity.snapshot1.shard_identifier
+        assert ":04732dpl6osj8d1g60o30c1g" in partition1_activity.snapshot1.shard_identifier
+        assert ":04732dpl6os3adpm60o30c1g" in partition2_activity.snapshot1.shard_identifier
+
+        # This test prevents mixing partitions which would create fake activity measurements
+
+    def test_format_activity_display_watch_mode(self):
+        """Test that watch mode excludes legend and insights"""
+        snapshot1 = self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0)
+        snapshot2 = self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1500, 130.0)
+
+        activity = ActiveShardActivity(
+            schema_name="schema1",
+            table_name="table1",
+            shard_id=1,
+            node_name="node1",
+            is_primary=True,
+            partition_ident="",
+            local_checkpoint_delta=500,
+            snapshot1=snapshot1,
+            snapshot2=snapshot2,
+            time_diff_seconds=30.0,
+        )
+
+        # Test non-watch mode (should include legend and insights)
+        normal_display = self.monitor.format_activity_display([activity], show_count=10, watch_mode=False)
+        assert "Legend:" in normal_display
+        assert "Insights:" in normal_display
+        assert "Checkpoint Δ:" in normal_display
+        # Test watch mode (should exclude legend and insights)
+        watch_display = self.monitor.format_activity_display([activity], show_count=10, watch_mode=True)
+        assert "Legend:" not in watch_display
+        assert "Insights:" not in watch_display
+        assert "Checkpoint Δ" in watch_display  # Core data should still be present
+
+        # But should still contain the core data
+        assert "Most Active Shards" in watch_display
+        assert "schema1.table1" in watch_display
+        assert "500" in watch_display  # checkpoint delta
diff --git a/tests/admin/test_distribution_analyzer.py b/tests/admin/test_distribution_analyzer.py
new file mode 100644
index 00000000..92b4f580
--- /dev/null
+++ b/tests/admin/test_distribution_analyzer.py
@@ -0,0 +1,338 @@
+"""
+Tests for distribution analyzer functionality
+"""
+
+from unittest.mock import Mock, patch
+
+from cratedb_toolkit.admin.xmover.analysis.table import DistributionAnalyzer, DistributionAnomaly, TableDistribution
+from cratedb_toolkit.admin.xmover.model import NodeInfo
+from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
+
+
+class TestDistributionAnalyzer:
+    def setup_method(self):
+        """Set up test fixtures"""
+        self.mock_client = Mock(spec=CrateDBClient)
+        self.analyzer = DistributionAnalyzer(self.mock_client)
+
+    def test_coefficient_of_variation_calculation(self):
+        """Test CV calculation with different scenarios"""
+
+        # Normal case
+        values = [10, 12, 8, 14, 6]
+        cv = self.analyzer.calculate_coefficient_of_variation(values)
+        assert cv > 0
+
+        # All equal values (should return 0)
+        equal_values = [10, 10, 10, 10]
+        cv_equal = self.analyzer.calculate_coefficient_of_variation(equal_values)
+        assert cv_equal == 0.0
+
+        # Empty list
+        empty_values = []
+        cv_empty = self.analyzer.calculate_coefficient_of_variation(empty_values)
+        assert cv_empty == 0.0
+
+        # Single value
+        single_value = [10]
+        cv_single = self.analyzer.calculate_coefficient_of_variation(single_value)
+        assert cv_single == 0.0
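+        # Hedged reference for the CV definition this test presumably relies
+        # on (population standard deviation over the mean). Whether the
+        # analyzer uses pstdev or the sample stdev is an assumption, so we
+        # only sanity-check our reference value rather than assert equality:
+        import statistics
+        reference_cv = statistics.pstdev(values) / statistics.mean(values)
+        assert reference_cv > 0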
+
+    def test_get_largest_tables_distribution(self):
+        """Test fetching table distribution data"""
+
+        # Mock query results
+        mock_results = {
+            "rows": [
+                # schema, table, node, primary_shards, replica_shards, total_shards, total_size, primary_size, replica_size, docs  # noqa: E501, ERA001
+                ["doc", "large_table", "node1", 5, 2, 7, 100.5, 80.2, 20.3, 1000000],
+                ["doc", "large_table", "node2", 4, 3, 7, 95.1, 75.8, 19.3, 950000],
+                ["doc", "large_table", "node3", 6, 1, 7, 110.2, 85.9, 24.3, 1100000],
+                ["custom", "another_table", "node1", 3, 2, 5, 50.1, 40.2, 9.9, 500000],
+                ["custom", "another_table", "node2", 2, 3, 5, 45.8, 35.1, 10.7, 480000],
+            ]
+        }
+
+        self.mock_client.execute_query.return_value = mock_results
+
+        distributions = self.analyzer.get_largest_tables_distribution(top_n=10)
+
+        # Verify query was called with correct parameters
+        self.mock_client.execute_query.assert_called_once()
+        call_args = self.mock_client.execute_query.call_args
+        assert call_args[0][1] == [10]  # top_n parameter
+
+        # Verify we got the expected number of tables
+        assert len(distributions) == 2
+
+        # Verify table data structure
+        large_table = next(d for d in distributions if d.table_name == "large_table")
+        assert large_table.schema_name == "doc"
+        assert large_table.full_table_name == "large_table"  # Should omit 'doc' schema
+        assert len(large_table.node_distributions) == 3
+
+        another_table = next(d for d in distributions if d.table_name == "another_table")
+        assert another_table.schema_name == "custom"
+        assert another_table.full_table_name == "custom.another_table"
+        assert len(another_table.node_distributions) == 2
+
+        # Verify sorting by primary size (descending)
+        assert distributions[0].total_primary_size_gb >= distributions[1].total_primary_size_gb
+
+    def test_detect_shard_count_imbalance(self):
+        """Test shard count imbalance detection"""
+
+        # Create test table with imbalanced shard distribution
+        imbalanced_table = TableDistribution(
+            schema_name="doc",
+            table_name="imbalanced_table",
+            total_primary_size_gb=500.0,
+            node_distributions={
+                "node1": {"total_shards": 10, "primary_shards": 5, "replica_shards": 5},
+                "node2": {"total_shards": 15, "primary_shards": 8, "replica_shards": 7},
+                "node3": {"total_shards": 5, "primary_shards": 2, "replica_shards": 3},
+            },
+        )
+
+        anomaly = self.analyzer.detect_shard_count_imbalance(imbalanced_table)
+
+        assert anomaly is not None
+        assert anomaly.anomaly_type == "Shard Count Imbalance"
+        assert anomaly.combined_score > 0
+        assert len(anomaly.recommendations) > 0
+
+        # Create balanced table (should not detect anomaly)
+        balanced_table = TableDistribution(
+            schema_name="doc",
+            table_name="balanced_table",
+            total_primary_size_gb=100.0,
+            node_distributions={
+                "node1": {"total_shards": 8, "primary_shards": 4, "replica_shards": 4},
+                "node2": {"total_shards": 8, "primary_shards": 4, "replica_shards": 4},
+                "node3": {"total_shards": 8, "primary_shards": 4, "replica_shards": 4},
+            },
+        )
+
+        no_anomaly = self.analyzer.detect_shard_count_imbalance(balanced_table)
+        assert no_anomaly is None
+
+    def test_detect_storage_imbalance(self):
+        """Test storage imbalance detection"""
+
+        # Create test table with storage imbalance
+        storage_imbalanced_table = TableDistribution(
+            schema_name="doc",
+            table_name="storage_imbalanced",
+            total_primary_size_gb=300.0,
+            node_distributions={
+                "node1": {"total_size_gb": 150.0, "primary_size_gb": 100.0, "replica_size_gb": 50.0},
+                "node2": {"total_size_gb": 50.0, "primary_size_gb": 30.0, "replica_size_gb": 20.0},
+                "node3": {"total_size_gb": 100.0, "primary_size_gb": 70.0, "replica_size_gb": 30.0},
+            },
+        )
+
+        anomaly = self.analyzer.detect_storage_imbalance(storage_imbalanced_table)
+
+        assert anomaly is not None
+        assert anomaly.anomaly_type == "Storage Imbalance"
+        assert anomaly.combined_score > 0
+
+        # Small table (should be ignored)
+        small_table = TableDistribution(
+            schema_name="doc",
+            table_name="small_table",
+            total_primary_size_gb=0.1,
+            node_distributions={
+                "node1": {"total_size_gb": 0.5, "primary_size_gb": 0.05, "replica_size_gb": 0.05},
+                "node2": {"total_size_gb": 0.1, "primary_size_gb": 0.03, "replica_size_gb": 0.02},
+            },
+        )
+
+        no_anomaly = self.analyzer.detect_storage_imbalance(small_table)
+        assert no_anomaly is None
+
+    def test_detect_node_coverage_issues(self):
+        """Test node coverage issue detection"""
+
+        # Mock nodes_info to simulate cluster with 4 nodes
+        mock_nodes = [
+            NodeInfo(
+                id="node1",
+                name="node1",
+                zone=None,
+                heap_used=None,
+                heap_max=None,
+                fs_total=None,
+                fs_used=None,
+                fs_available=None,
+            ),
+            NodeInfo(
+                id="node2",
+                name="node2",
+                zone=None,
+                heap_used=None,
+                heap_max=None,
+                fs_total=None,
+                fs_used=None,
+                fs_available=None,
+            ),
+            NodeInfo(
+                id="node3",
+                name="node3",
+                zone=None,
+                heap_used=None,
+                heap_max=None,
+                fs_total=None,
+                fs_used=None,
+                fs_available=None,
+            ),
+            NodeInfo(
+                id="node4",
+                name="node4",
+                zone=None,
+                heap_used=None,
+                heap_max=None,
+                fs_total=None,
+                fs_used=None,
+                fs_available=None,
+            ),
+        ]
+        self.mock_client.get_nodes_info.return_value = mock_nodes
+
+        # Table with limited coverage (only on 2 out of 4 nodes)
+        limited_coverage_table = TableDistribution(
+            schema_name="doc",
+            table_name="limited_coverage",
+            total_primary_size_gb=100.0,  # Significant size
+            node_distributions={
+                "node1": {"total_shards": 10, "primary_shards": 5, "replica_shards": 5},
+                "node2": {"total_shards": 10, "primary_shards": 5, "replica_shards": 5},
+                # node3 and node4 missing
+            },
+        )
+
+        anomaly = self.analyzer.detect_node_coverage_issues(limited_coverage_table)
+
+        assert anomaly is not None
+        assert anomaly.anomaly_type == "Node Coverage Issue"
+        assert "node3" in anomaly.details["nodes_without_shards"]
+        assert "node4" in anomaly.details["nodes_without_shards"]
+        assert len(anomaly.recommendations) > 0
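+        # A more compact way to build the same stubs (purely a readability
+        # sketch; it assumes NodeInfo takes exactly the keyword arguments
+        # used above):
+        compact_nodes = [
+            NodeInfo(
+                id=f"node{i}", name=f"node{i}", zone=None, heap_used=None,
+                heap_max=None, fs_total=None, fs_used=None, fs_available=None,
+            )
+            for i in range(1, 5)
+        ]
+        assert [n.name for n in compact_nodes] == [n.name for n in mock_nodes]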
+
+    def test_detect_document_imbalance(self):
+        """Test document imbalance detection"""
+
+        # Table with document imbalance
+        doc_imbalanced_table = TableDistribution(
+            schema_name="doc",
+            table_name="doc_imbalanced",
+            total_primary_size_gb=200.0,
+            node_distributions={
+                "node1": {"total_documents": 1000000},  # 1M docs
+                "node2": {"total_documents": 500000},  # 500K docs
+                "node3": {"total_documents": 100000},  # 100K docs (10x below node1)
+            },
+        )
+
+        anomaly = self.analyzer.detect_document_imbalance(doc_imbalanced_table)
+
+        assert anomaly is not None
+        assert anomaly.anomaly_type == "Document Imbalance"
+        assert "data skew" in anomaly.recommendations[0].lower()
+
+        # Table with very few documents (should be ignored)
+        low_doc_table = TableDistribution(
+            schema_name="doc",
+            table_name="low_docs",
+            total_primary_size_gb=100.0,
+            node_distributions={
+                "node1": {"total_documents": 1000},
+                "node2": {"total_documents": 500},
+            },
+        )
+
+        no_anomaly = self.analyzer.detect_document_imbalance(low_doc_table)
+        assert no_anomaly is None
+
+    def test_analyze_distribution_integration(self):
+        """Test the full analysis workflow"""
+
+        # Mock the get_largest_tables_distribution method
+        mock_table = TableDistribution(
+            schema_name="doc",
+            table_name="test_table",
+            total_primary_size_gb=500.0,
+            node_distributions={
+                "node1": {
+                    "total_shards": 15,
+                    "primary_shards": 8,
+                    "replica_shards": 7,
+                    "total_size_gb": 200.0,
+                    "primary_size_gb": 120.0,
+                    "replica_size_gb": 80.0,
+                    "total_documents": 2000000,
+                },
+                "node2": {
+                    "total_shards": 8,
+                    "primary_shards": 4,
+                    "replica_shards": 4,
+                    "total_size_gb": 100.0,
+                    "primary_size_gb": 60.0,
+                    "replica_size_gb": 40.0,
+                    "total_documents": 1000000,
+                },
+                "node3": {
+                    "total_shards": 5,
+                    "primary_shards": 3,
+                    "replica_shards": 2,
+                    "total_size_gb": 50.0,
+                    "primary_size_gb": 30.0,
+                    "replica_size_gb": 20.0,
+                    "total_documents": 500000,
+                },
+            },
+        )
+
+        with patch.object(self.analyzer, "get_largest_tables_distribution", return_value=[mock_table]):
+            anomalies, tables_analyzed = self.analyzer.analyze_distribution(top_tables=10)
+
+        # Should detect multiple types of anomalies
+        assert len(anomalies) > 0
+        assert tables_analyzed == 1  # We provided 1 mock table
+
+        # Anomalies should be sorted by combined score (descending)
+        if len(anomalies) > 1:
+            for i in range(len(anomalies) - 1):
+                assert anomalies[i].combined_score >= anomalies[i + 1].combined_score
+
+        # Each anomaly should have required fields
+        for anomaly in anomalies:
+            assert anomaly.table is not None
+            assert anomaly.anomaly_type is not None
+            assert anomaly.combined_score >= 0
+            assert isinstance(anomaly.recommendations, list)
+
+    def test_format_distribution_report_no_anomalies(self):
+        """Test report formatting when no anomalies found"""
+
+        # This should not raise an exception
+        with patch("builtins.print"):  # Mock print to avoid console output during tests
+            self.analyzer.format_distribution_report([], 5)
+
+    def test_format_distribution_report_with_anomalies(self):
+        """Test report formatting with anomalies"""
+
+        mock_anomaly = DistributionAnomaly(
+            table=TableDistribution("doc", "test_table", 100.0, {}),
+            anomaly_type="Test Anomaly",
+            severity_score=7.5,
+            impact_score=8.0,
+            combined_score=60.0,
+            description="Test description",
+            details={},
+            recommendations=["Test recommendation"],
+        )
+
+        # This should not raise an exception
+        with patch("builtins.print"):  # Mock print to avoid console output during tests
+            self.analyzer.format_distribution_report([mock_anomaly], 3)
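+        # Note the fixture arithmetic: 7.5 * 8.0 == 60.0, which suggests
+        # combined_score is the product of severity and impact. That is an
+        # assumption about the analyzer's scoring formula; here we only pin
+        # down the fixture's own internal consistency:
+        assert mock_anomaly.severity_score * mock_anomaly.impact_score == mock_anomaly.combined_score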
diff --git a/tests/admin/test_recovery_monitor.py b/tests/admin/test_recovery_monitor.py
new file mode 100644
index 00000000..6041baeb
--- /dev/null
+++ b/tests/admin/test_recovery_monitor.py
@@ -0,0 +1,298 @@
+"""
+Test script for XMover recovery monitoring functionality
+
+This script tests the recovery monitoring features by creating mock recovery scenarios
+and verifying the output formatting and data parsing.
+"""
+
+import sys
+from typing import Any, Dict
+from unittest.mock import Mock
+
+from cratedb_toolkit.admin.xmover.model import RecoveryInfo
+from cratedb_toolkit.admin.xmover.operational.monitor import RecoveryMonitor, RecoveryOptions
+from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
+from cratedb_toolkit.model import DatabaseAddress
+
+
+def create_mock_allocation(
+    schema_name: str, table_name: str, shard_id: int, current_state: str, node_id: str
+) -> Dict[str, Any]:
+    """Create a mock allocation response"""
+    return {
+        "schema_name": schema_name,
+        "table_name": table_name,
+        "shard_id": shard_id,
+        "current_state": current_state,
+        "node_id": node_id,
+        "explanation": None,
+    }
+
+
+def create_mock_shard_detail(
+    schema_name: str,
+    table_name: str,
+    shard_id: int,
+    node_name: str,
+    node_id: str,
+    recovery_type: str,
+    stage: str,
+    files_percent: float,
+    bytes_percent: float,
+    total_time: int,
+    size: int,
+    is_primary: bool,
+) -> Dict[str, Any]:
+    """Create a mock shard detail response"""
+    return {
+        "schema_name": schema_name,
+        "table_name": table_name,
+        "shard_id": shard_id,
+        "node_name": node_name,
+        "node_id": node_id,
+        "routing_state": "RELOCATING",
+        "state": "RECOVERING",
+        "recovery": {
+            "type": recovery_type,
+            "stage": stage,
+            "files": {"percent": files_percent},
+            "size": {"percent": bytes_percent},
+            "total_time": total_time,
+        },
+        "size": size,
+        "primary": is_primary,
+    }
+
+
+def test_recovery_info_parsing():
+    """Test RecoveryInfo dataclass and its properties"""
+    print("Testing RecoveryInfo parsing...")
+
+    recovery = RecoveryInfo(
+        schema_name="CURVO",
+        table_name="PartioffD",
+        shard_id=19,
+        node_name="data-hot-1",
+        node_id="ZH6fBanGSjanGqeSh-sw0A",
+        recovery_type="PEER",
+        stage="DONE",
+        files_percent=100.0,
+        bytes_percent=100.0,
+        total_time_ms=1555907,
+        routing_state="RELOCATING",
+        current_state="RELOCATING",
+        is_primary=False,
+        size_bytes=56565284209,
+    )
+
+    # Test properties
+    assert recovery.overall_progress == 100.0, f"Expected 100.0, got {recovery.overall_progress}"
+    assert abs(recovery.size_gb - 52.681) < 0.01, f"Expected ~52.681, got {recovery.size_gb:.3f}"
+    assert recovery.shard_type == "REPLICA", f"Expected REPLICA, got {recovery.shard_type}"
+    assert recovery.total_time_seconds == 1555.907, f"Expected 1555.907, got {recovery.total_time_seconds}"
+
+    print("✅ RecoveryInfo parsing tests passed")
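+
+
+# Worked arithmetic behind the property assertions above, assuming the usual
+# binary-GB conversion and millisecond-to-second scaling (the model's exact
+# rounding is an implementation detail):
+#   size_gb            = 56565284209 / 1024**3 ~= 52.681
+#   total_time_seconds = 1555907 / 1000.0       = 1555.907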
+
+
+def test_database_client_parsing(cratedb):
+    """Test database client recovery parsing logic"""
+    print("Testing database client recovery parsing...")
+
+    # Create a real client instance to test the parsing method
+    client = CrateDBClient.__new__(CrateDBClient)  # Create without calling __init__
+    client.username = None
+    client.password = None
+    client.connection_string = DatabaseAddress.from_string(cratedb.database.dburi).httpuri
+    client.ssl_verify = False
+
+    # Create test data
+    allocation = create_mock_allocation("CURVO", "PartioffD", 19, "RELOCATING", "node1")
+    shard_detail = create_mock_shard_detail(
+        "CURVO", "PartioffD", 19, "data-hot-1", "node1", "PEER", "DONE", 100.0, 100.0, 1555907, 56565284209, False
+    )
+
+    # Test the parsing method directly
+    recovery_info = client._parse_recovery_info(allocation, shard_detail)
+
+    assert recovery_info.recovery_type == "PEER"
+    assert recovery_info.stage == "DONE"
+    assert recovery_info.overall_progress == 0.0
+
+    print("✅ Database client parsing tests passed")
+
+
+def test_recovery_monitor_formatting():
+    """Test recovery monitor display formatting"""
+    print("Testing recovery monitor formatting...")
+
+    # Create mock client
+    mock_client = Mock(spec=CrateDBClient)
+    monitor = RecoveryMonitor(mock_client)
+
+    # Create test recovery data
+    recoveries = [
+        RecoveryInfo(
+            schema_name="CURVO",
+            table_name="PartioffD",
+            shard_id=19,
+            node_name="data-hot-1",
+            node_id="node1",
+            recovery_type="PEER",
+            stage="DONE",
+            files_percent=100.0,
+            bytes_percent=100.0,
+            total_time_ms=1555907,
+            routing_state="RELOCATING",
+            current_state="RELOCATING",
+            is_primary=False,
+            size_bytes=56565284209,
+        ),
+        RecoveryInfo(
+            schema_name="CURVO",
+            table_name="orderTracking",
+            shard_id=7,
+            node_name="data-hot-2",
+            node_id="node2",
+            recovery_type="DISK",
+            stage="INDEX",
+            files_percent=75.5,
+            bytes_percent=67.8,
+            total_time_ms=890234,
+            routing_state="INITIALIZING",
+            current_state="INITIALIZING",
+            is_primary=True,
+            size_bytes=25120456789,
+        ),
+    ]
+
+    # Test summary generation
+    summary = monitor.get_recovery_summary(recoveries)
+
+    assert summary["total_recoveries"] == 2
+    assert "PEER" in summary["by_type"]
+    assert "DISK" in summary["by_type"]
+    assert summary["by_type"]["PEER"]["count"] == 1
+    assert summary["by_type"]["DISK"]["count"] == 1
+
+    # Test display formatting
+    display_output = monitor.format_recovery_display(recoveries)
+
+    assert "Active Shard Recoveries (2 total)" in display_output
+    assert "PEER Recoveries (1)" in display_output
+    assert "DISK Recoveries (1)" in display_output
+    assert "PartioffD" in display_output
+    assert "orderTracking" in display_output
+
+    print("✅ Recovery monitor formatting tests passed")
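+
+
+# The assertions above pin down only part of get_recovery_summary's payload;
+# the shape they imply is roughly the following (an assumption, other keys
+# may well exist alongside "count"):
+#   {"total_recoveries": 2,
+#    "by_type": {"PEER": {"count": 1, ...}, "DISK": {"count": 1, ...}}}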
+
+
+def test_empty_recovery_handling():
+    """Test handling of no active recoveries"""
+    print("Testing empty recovery handling...")
+
+    mock_client = Mock(spec=CrateDBClient)
+    monitor = RecoveryMonitor(mock_client)
+
+    # Test empty list
+    empty_recoveries = []
+
+    summary = monitor.get_recovery_summary(empty_recoveries)
+    assert summary["total_recoveries"] == 0
+    assert summary["by_type"] == {}
+
+    display_output = monitor.format_recovery_display(empty_recoveries)
+    assert "No active shard recoveries found" in display_output
+
+    print("✅ Empty recovery handling tests passed")
+
+
+def test_recovery_type_filtering():
+    """Test filtering by recovery type"""
+    print("Testing recovery type filtering...")
+
+    mock_client = Mock(spec=CrateDBClient)
+
+    # Mock the get_all_recovering_shards method
+    mock_recoveries = [
+        RecoveryInfo(
+            schema_name="test",
+            table_name="table1",
+            shard_id=1,
+            node_name="node1",
+            node_id="n1",
+            recovery_type="PEER",
+            stage="DONE",
+            files_percent=100.0,
+            bytes_percent=100.0,
+            total_time_ms=1000,
+            routing_state="RELOCATING",
+            current_state="RELOCATING",
+            is_primary=True,
+            size_bytes=1000000,
+        ),
+        RecoveryInfo(
+            schema_name="test",
+            table_name="table2",
+            shard_id=2,
+            node_name="node2",
+            node_id="n2",
+            recovery_type="DISK",
+            stage="INDEX",
+            files_percent=50.0,
+            bytes_percent=45.0,
+            total_time_ms=2000,
+            routing_state="INITIALIZING",
+            current_state="INITIALIZING",
+            is_primary=False,
+            size_bytes=2000000,
+        ),
+    ]
+
+    mock_client.get_all_recovering_shards.return_value = mock_recoveries
+
+    # Test filtering
+    monitor = RecoveryMonitor(mock_client, options=RecoveryOptions(recovery_type="PEER"))
+    peer_only = monitor.get_cluster_recovery_status()
+    assert len(peer_only) == 1
+    assert peer_only[0].recovery_type == "PEER"
+
+    monitor = RecoveryMonitor(mock_client, options=RecoveryOptions(recovery_type="DISK"))
+    disk_only = monitor.get_cluster_recovery_status()
+    assert len(disk_only) == 1
+    assert disk_only[0].recovery_type == "DISK"
+
+    monitor = RecoveryMonitor(mock_client, options=RecoveryOptions(recovery_type="all"))
+    all_recoveries = monitor.get_cluster_recovery_status()
+    assert len(all_recoveries) == 2
+
+    print("✅ Recovery type filtering tests passed")
+
+
+def main():
+    """Run all tests"""
+    print("🧪 Running XMover Recovery Monitor Tests")
+    print("=" * 50)
+
+    try:
+        test_recovery_info_parsing()
+        # test_database_client_parsing() requires the pytest `cratedb` fixture,
+        # so it is exercised via pytest rather than from this ad-hoc runner.
+        test_recovery_monitor_formatting()
+        test_empty_recovery_handling()
+        test_recovery_type_filtering()
+
+        print("\n🎉 All tests passed successfully!")
+        print("\n📋 Test Summary:")
+        print("   ✅ RecoveryInfo data class and properties")
+        print("   ✅ Database client parsing logic (via pytest)")
+        print("   ✅ Recovery monitor display formatting")
+        print("   ✅ Empty recovery state handling")
+        print("   ✅ Recovery type filtering")
+
+        print("\n🚀 Recovery monitoring feature is ready for use!")
+
+    except Exception as e:
+        print(f"\n❌ Test failed: {e}")
+        import traceback
+
+        traceback.print_exc()
+        sys.exit(1)