Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 194 additions & 14 deletions cratedb_toolkit/admin/xmover/analysis/shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from rich.table import Table

from cratedb_toolkit.admin.xmover.model import (
ActiveShardActivity,
ActiveShardSnapshot,
DistributionStats,
NodeInfo,
ShardInfo,
Expand All @@ -36,7 +38,7 @@ def __init__(self, client: CrateDBClient):
self.shards: List[ShardInfo] = []

# Initialize session-based caches for performance.
self._zone_conflict_cache: Dict[Tuple[str, int, str], Union[str, None]] = {}
self._zone_conflict_cache: Dict[Tuple[str, str, int, str], Union[str, None]] = {}
self._node_lookup_cache: Dict[str, Union[NodeInfo, None]] = {}
self._target_nodes_cache: Dict[Tuple[float, frozenset[Any], float, float], List[NodeInfo]] = {}
self._cache_hits = 0
Expand Down Expand Up @@ -181,8 +183,6 @@ def find_nodes_with_capacity(
free_space_gb = node.available_space_gb
if free_space_gb >= (required_space_gb + min_free_space_gb):
available_nodes.append(node)
else:
continue

# Sort by available space (most space first) - prioritize nodes with more free space
available_nodes.sort(key=lambda n: n.available_space_gb, reverse=True)
Expand All @@ -204,7 +204,7 @@ def generate_rebalancing_recommendations(
# Get moveable shards (only healthy ones for actual operations)
moveable_shards = self.find_moveable_shards(constraints.min_size, constraints.max_size, constraints.table_name)

print(
logger.info(
f"Analyzing {len(moveable_shards)} candidate shards "
f"in size range {constraints.min_size}-{constraints.max_size}GB..."
)
Expand Down Expand Up @@ -237,12 +237,11 @@ def generate_rebalancing_recommendations(
# Optimize processing: if filtering by source node, only process those shards
if constraints.source_node:
processing_shards = [s for s in moveable_shards if s.node_name == constraints.source_node]
print(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
logger.info(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
else:
processing_shards = moveable_shards

# Generate move recommendations
safe_recommendations = 0 # noqa: F841
total_evaluated = 0

for i, shard in enumerate(processing_shards):
Expand Down Expand Up @@ -366,12 +365,12 @@ def generate_rebalancing_recommendations(

if len(processing_shards) > 100:
print() # New line after progress dots
print(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
print(f"Performance: {self.get_cache_stats()}")
logger.info(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
logger.info(f"Performance: {self.get_cache_stats()}")
return recommendations

def validate_move_safety(
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0, buffer_gb: float = 50.0
) -> Tuple[bool, str]:
"""Validate that a move recommendation is safe to execute"""
# Find target node (with caching)
Expand All @@ -386,7 +385,7 @@ def validate_move_safety(
return False, zone_conflict

# Check available space
required_space_gb = recommendation.size_gb + 50 # 50GB buffer
required_space_gb = recommendation.size_gb + buffer_gb
if target_node.available_space_gb < required_space_gb:
return (
False,
Expand Down Expand Up @@ -421,7 +420,7 @@ def _check_zone_conflict_cached(self, recommendation: ShardRelocationResponse) -
"""Check zone conflicts with caching"""
# Create cache key: table, shard, target zone
target_zone = self._get_node_zone(recommendation.to_node)
cache_key = (recommendation.table_name, recommendation.shard_id, target_zone)
cache_key = (recommendation.schema_name, recommendation.table_name, recommendation.shard_id, target_zone)

if cache_key in self._zone_conflict_cache:
self._cache_hits += 1
Expand Down Expand Up @@ -811,11 +810,14 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
# Determine feasibility
feasible = len(infeasible_moves) == 0

# Safety margin for cluster capacity after decommission
capacity_safety_margin = 1.2  # 20% buffer

# Add capacity warnings
if feasible:
# Check if remaining cluster capacity is sufficient after decommission
# Check if the remaining cluster capacity is sufficient after decommission
remaining_capacity = sum(n.available_space_gb for n in self.nodes if n.name != node_name)
if remaining_capacity < total_size_gb * 1.2: # 20% safety margin
if remaining_capacity < total_size_gb * capacity_safety_margin:
warnings.append(
f"Low remaining capacity after decommission. "
f"Only {remaining_capacity:.1f}GB available for {total_size_gb:.1f}GB of data"
Expand All @@ -831,7 +833,7 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
"recommendations": move_plan,
"infeasible_moves": infeasible_moves,
"warnings": warnings,
"estimated_time_hours": len(move_plan) * 0.1, # Rough estimate: 6 minutes per move
"estimated_time_hours": len(move_plan) * 0.1, # Rough estimate: 0.1 hours (6 minutes) per move
"message": "Decommission plan generated" if feasible else "Decommission not currently feasible",
}

Expand Down Expand Up @@ -947,3 +949,181 @@ def distribution(self, table: str = None):
table_summary.add_row("Node Balance Score", f"{stats.node_balance_score:.1f}/100")

console.print(table_summary)


class ActiveShardMonitor:
    """Monitor active shard checkpoint progression over time.

    Compares two point-in-time snapshots of shard checkpoints and derives
    per-shard write activity from the local-checkpoint delta between them.
    """

    def __init__(self, client: "CrateDBClient"):
        # Database client used by callers to take the shard snapshots being compared.
        self.client = client

    def compare_snapshots(
        self,
        snapshot1: List["ActiveShardSnapshot"],
        snapshot2: List["ActiveShardSnapshot"],
        min_activity_threshold: int = 0,
    ) -> List["ActiveShardActivity"]:
        """Compare two snapshots and return activity data for shards present in both.

        Shards that appear in only one of the snapshots are ignored.

        Args:
            snapshot1: First snapshot (baseline)
            snapshot2: Second snapshot (comparison)
            min_activity_threshold: Minimum checkpoint delta to consider active (default: 0)

        Returns:
            Activities sorted by local checkpoint delta, highest first.
        """
        # Index the baseline snapshot by shard identity for O(1) matching.
        snapshot1_dict = {snap.shard_identifier: snap for snap in snapshot1}

        activities = []
        for snap2 in snapshot2:
            snap1 = snapshot1_dict.get(snap2.shard_identifier)
            if snap1 is None:
                # Shard was not present in the baseline snapshot.
                continue

            # Checkpoint progression and elapsed time between the two observations.
            local_checkpoint_delta = snap2.local_checkpoint - snap1.local_checkpoint
            time_diff = snap2.timestamp - snap1.timestamp

            # Filter based on actual activity between snapshots.
            if local_checkpoint_delta >= min_activity_threshold:
                activities.append(
                    ActiveShardActivity(
                        schema_name=snap2.schema_name,
                        table_name=snap2.table_name,
                        shard_id=snap2.shard_id,
                        node_name=snap2.node_name,
                        is_primary=snap2.is_primary,
                        partition_ident=snap2.partition_ident,
                        local_checkpoint_delta=local_checkpoint_delta,
                        snapshot1=snap1,
                        snapshot2=snap2,
                        time_diff_seconds=time_diff,
                    )
                )

        # Sort by activity (highest checkpoint delta first).
        activities.sort(key=lambda x: x.local_checkpoint_delta, reverse=True)

        return activities

    def format_activity_display(
        self, activities: List["ActiveShardActivity"], show_count: int = 10, watch_mode: bool = False
    ) -> str:
        """Format activity data for console display.

        Args:
            activities: Activities to render (as produced by `compare_snapshots`)
            show_count: Maximum number of rows to display
            watch_mode: When True, omit the legend and insights sections

        Returns:
            A multi-line string ready for console output.
        """
        if not activities:
            return "✅ No active shards with significant checkpoint progression found"

        # Limit to requested count (may become empty when show_count == 0).
        activities = activities[:show_count]

        # Calculate observation period for context
        if activities:
            observation_period = activities[0].time_diff_seconds
            output = [
                f"\n🔥 Most Active Shards ({len(activities)} shown, {observation_period:.0f}s observation period)"
            ]
        else:
            output = [f"\n🔥 Most Active Shards ({len(activities)} shown, sorted by checkpoint activity)"]

        output.append("")

        # Add activity rate context
        if activities:
            total_activity = sum(a.local_checkpoint_delta for a in activities)
            avg_rate = sum(a.activity_rate for a in activities) / len(activities)
            output.append(
                f"[dim]Total checkpoint activity: {total_activity:,} changes, Average rate: {avg_rate:.1f}/sec[/dim]"
            )
            output.append("")

        # Create table headers
        headers = ["Rank", "Schema.Table", "Shard", "Partition", "Node", "Type", "Checkpoint Δ", "Rate/sec", "Trend"]

        # Calculate column widths
        col_widths = [len(h) for h in headers]

        # Prepare rows
        rows = []
        for i, activity in enumerate(activities, 1):
            # Format values
            rank = str(i)
            table_id = activity.table_identifier
            shard_id = str(activity.shard_id)
            # Guard against a missing partition identifier: partition_ident is
            # None/empty for non-partitioned tables (the original `or "-"`
            # fallback implies this), and len(None) would raise TypeError.
            partition_ident = activity.partition_ident or ""
            partition = partition_ident[:14] + "..." if len(partition_ident) > 14 else partition_ident or "-"
            node = activity.node_name
            shard_type = "P" if activity.is_primary else "R"
            checkpoint_delta = f"{activity.local_checkpoint_delta:,}"
            rate = f"{activity.activity_rate:.1f}" if activity.activity_rate >= 0.1 else "<0.1"

            # Calculate activity trend indicator
            if activity.activity_rate >= 100:
                trend = "🔥 HOT"
            elif activity.activity_rate >= 50:
                trend = "📈 HIGH"
            elif activity.activity_rate >= 10:
                trend = "📊 MED"
            else:
                trend = "📉 LOW"

            row = [rank, table_id, shard_id, partition, node, shard_type, checkpoint_delta, rate, trend]
            rows.append(row)

            # Update column widths
            for j, cell in enumerate(row):
                col_widths[j] = max(col_widths[j], len(cell))

        # Format table
        header_row = " " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths))
        output.append(header_row)
        output.append(" " + "-" * (len(header_row) - 3))

        # Data rows
        for row in rows:
            data_row = " " + " | ".join(cell.ljust(w) for cell, w in zip(row, col_widths))
            output.append(data_row)

        # Only show legend and insights in non-watch mode
        if not watch_mode:
            output.append("")
            output.append("Legend:")
            output.append(" • Checkpoint Δ: Write operations during observation period")
            output.append(" • Rate/sec: Checkpoint changes per second")
            output.append(" • Partition: partition_ident (truncated if >14 chars, '-' if none)")
            output.append(" • Type: P=Primary, R=Replica")
            output.append(" • Trend: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)")

            # Add insights about activity patterns
            if activities:
                output.append("")
                output.append("Insights:")

                # Count by trend
                hot_count = len([a for a in activities if a.activity_rate >= 100])
                high_count = len([a for a in activities if 50 <= a.activity_rate < 100])
                med_count = len([a for a in activities if 10 <= a.activity_rate < 50])
                low_count = len([a for a in activities if a.activity_rate < 10])

                if hot_count > 0:
                    output.append(f" • {hot_count} HOT shards (≥100 changes/sec) - consider load balancing")
                if high_count > 0:
                    output.append(f" • {high_count} HIGH activity shards - monitor capacity")
                if med_count > 0:
                    output.append(f" • {med_count} MEDIUM activity shards - normal operation")
                if low_count > 0:
                    output.append(f" • {low_count} LOW activity shards - occasional writes")

                # Identify patterns
                primary_activities = [a for a in activities if a.is_primary]
                if len(primary_activities) == len(activities):
                    output.append(" • All active shards are PRIMARY - normal write pattern")
                elif len(primary_activities) < len(activities) * 0.5:
                    output.append(" • Many REPLICA shards active - possible recovery/replication activity")

                # Node concentration
                nodes = {a.node_name for a in activities}
                if len(nodes) <= 2:
                    output.append(f" • Activity concentrated on {len(nodes)} node(s) - consider redistribution")

        return "\n".join(output)
17 changes: 8 additions & 9 deletions cratedb_toolkit/admin/xmover/analysis/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from rich.console import Console
from rich.table import Table

from cratedb_toolkit.admin.xmover.model import NodeInfo
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -97,6 +98,9 @@ def find_table_by_name(self, table_name: str) -> Optional[str]:

try:
choice = input("\nSelect table (enter number): ").strip()
if not choice:
rprint("[yellow]No selection made[/yellow]")
return None
idx = int(choice) - 1
if 0 <= idx < len(rows):
schema, table = rows[idx]
Expand Down Expand Up @@ -292,14 +296,9 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
zone_distribution = {}
for node_name, node_data in table_dist.node_distributions.items():
# Try to get zone info for each node
node_info = next((n for n in all_nodes_info if n.name == node_name), None)
if (
node_info
and hasattr(node_info, "attributes")
and node_info.attributes
and "zone" in node_info.attributes
):
zone = node_info.attributes["zone"]
node_info: Optional[NodeInfo] = next((n for n in all_nodes_info if n.name == node_name), None)
if node_info and node_info.zone:
zone = node_info.zone
if zone not in zone_distribution:
zone_distribution[zone] = {"nodes": 0, "shards": 0, "size": 0}
zone_distribution[zone]["nodes"] += 1
Expand Down Expand Up @@ -576,7 +575,7 @@ def detect_node_coverage_issues(self, table: TableDistribution) -> Optional[Dist
nodes_with_shards = set(table.node_distributions.keys())
nodes_without_shards = all_nodes - nodes_with_shards

# Only flag as anomaly if we have missing nodes and the table is significant
# Only flag as an anomaly if we have missing nodes and the table is significant
if not nodes_without_shards or table.total_primary_size_gb < 10.0:
return None

Expand Down
3 changes: 2 additions & 1 deletion cratedb_toolkit/admin/xmover/analysis/zone.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ def distribution_conflicts(self, shard_details: bool = False, table: Optional[st
health_indicator = "✓" if shard_copy.routing_state == "STARTED" else "⚠"
console.print(
f" {health_indicator} {shard_copy.shard_type} "
f"on {shard_copy.node_name} ({shard_copy.zone}) - {shard_copy.routing_state}"
f"on {shard_copy.node_name} ({shard_copy.zone}) - "
f"{shard_copy.state}/{shard_copy.routing_state}"
)

console.print(analysis_table)
Expand Down
Loading
Loading