@@ -842,22 +842,56 @@ def __init__(self, analyzer: ShardAnalyzer):
842
842
self .analyzer = analyzer
843
843
self .shard_time_deltas : dict [str , int ] = {}
844
844
self .shard_seq_deltas : dict [str , int ] = {}
845
+ self .shards : dict [str , ShardInfo ] = {}
846
+
847
+ def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
848
+ return f"{ shard .node_id } -{ shard .shard_id } "
849
+
850
+ def refresh_deltas (self , refresh_data : bool = True ):
851
+ if refresh_data :
852
+ self .analyzer ._refresh_data ()
845
853
846
- def refresh_deltas (self ):
847
- self .analyzer ._refresh_data ()
848
854
shards : list [ShardInfo ] = self .analyzer .shards
849
- shard_deltas : dict [str , int ] = {}
850
- shard_seq : dict [str , int ] = {}
855
+
856
+ refreshed_shards : list [str ] = []
857
+ self .shards = {}
851
858
for shard in shards :
859
+ shard_compound_id = self ._get_shard_compound_id (shard )
860
+ refreshed_shards .append (shard_compound_id )
861
+ self .shards [shard_compound_id ] = shard
862
+
852
863
hot_timestamp = datetime .datetime .fromtimestamp (float (shard .hot_timestamp ) / 1000 )
853
864
hot_delta = int ((datetime .datetime .now () - hot_timestamp ).total_seconds ()) if shard .hot_timestamp else None
854
- shard_compound_id = f"{ shard .node_id } -{ shard .shard_id } "
855
- shard_deltas [shard_compound_id ] = hot_delta
856
- shard_seq [shard_compound_id ] = shard .seq_stats_max_seq_no
865
+
866
+ self .shard_time_deltas [shard_compound_id ] = hot_delta
867
+ if not shard_compound_id in self .shard_seq_deltas :
868
+ self .shard_seq_deltas [shard_compound_id ] = shard .seq_stats_max_seq_no
869
+ else :
870
+ self .shard_seq_deltas [shard_compound_id ] = shard .seq_stats_max_seq_no - self .shard_seq_deltas [shard_compound_id ]
871
+
872
+ # Forget shards that are not reported
873
+ to_delete : set [str ] = set (self .shard_seq_deltas .keys ()).difference (refreshed_shards )
874
+ for id in to_delete :
875
+ del (self .shard_seq_deltas [id ])
876
+ del (self .shard_time_deltas [id ])
877
+
878
+ def filter_shards (self , max_seconds_old : int ) -> list [ShardInfo ]:
879
+ """
880
+ Filter shards not touched in the last `max_seconds_old` seconds
881
+ """
882
+ return [self .shards .get (k ) for k , v in self .shard_time_deltas .items () if v <= max_seconds_old ]
857
883
858
884
def monitor_shards (self ):
885
+ max_seconds_old = 300
886
+
887
+ self .refresh_deltas (refresh_data = False )
888
+ sleep (10 )
889
+ self .refresh_deltas ()
890
+
891
+ shards : list [ShardInfo ] = self .filter_shards (max_seconds_old )
892
+ sorted_shards : list [ShardInfo ] = sorted (shards , key = lambda s : self .shard_seq_deltas [self ._get_shard_compound_id (s )])
893
+
859
894
console .print (Panel .fit ("[bold blue]CrateDB Hot Shard Monitor[/bold blue]" ))
860
- shards : list [ShardInfo ] = self .analyzer .shards
861
895
862
896
# Cluster summary table
863
897
shards_table = Table (title = "Hot shards" , box = box .ROUNDED )
@@ -867,41 +901,27 @@ def monitor_shards(self):
867
901
shards_table .add_column ("Node" , style = "cyan" )
868
902
shards_table .add_column ("Primary" , style = "cyan" )
869
903
shards_table .add_column ("Size" , style = "magenta" )
870
- shards_table .add_column ("Max Seq_no" , style = "magenta" )
871
- # shards_table.add_column("Global Checkpoint", style="magenta")
872
- # shards_table.add_column("Local Checkpoint", style="magenta")
873
- # shards_table.add_column("Hot Timestamp", style="magenta")
874
- shards_table .add_column ("Hot Delta" , style = "magenta" )
875
-
876
- shard_deltas : dict [str , int ] = {}
877
- shard_seq : dict [str , int ] = {}
878
- for shard in shards :
879
- hot_timestamp = datetime .datetime .fromtimestamp (float (shard .hot_timestamp ) / 1000 )
880
- hot_delta = int ((datetime .datetime .now () - hot_timestamp ).total_seconds ()) if shard .hot_timestamp else None
881
- shard_compound_id = f"{ shard .node_id } -{ shard .shard_id } "
882
- shard_deltas [shard_compound_id ] = hot_delta
883
- shard_seq [shard_compound_id ] = shard .seq_stats_max_seq_no
884
-
885
- sorted_shards : list [ShardInfo ] = sorted (shards , key = lambda s : shard_deltas [f"{ s .node_id } -{ s .shard_id } " ])
904
+ shards_table .add_column ("Seq Delta" , style = "magenta" )
905
+ shards_table .add_column ("Seconds ago" , style = "magenta" )
886
906
887
907
for shard in sorted_shards :
888
- hot_timestamp = datetime .datetime .fromtimestamp (float (shard .hot_timestamp ) / 1000 )
889
- hot_delta = int ((datetime .datetime .now () - hot_timestamp ).total_seconds ()) if shard .hot_timestamp else None
908
+ shard_compound_id = self ._get_shard_compound_id (shard )
890
909
shards_table .add_row (
891
910
shard .schema_name ,
892
911
shard .table_name ,
893
912
str (shard .shard_id ),
894
913
shard .node_name ,
895
914
str (shard .is_primary ),
896
915
format_size (shard .size_gb ),
897
- str (shard .seq_stats_max_seq_no ),
898
- # str(shard.seq_stats_global_checkpoint),
899
- # str(shard.seq_stats_local_checkpoint),
900
- # str(hot_timestamp) if shard.hot_timestamp else "-",
901
- f"{ hot_delta } seconds ago" if hot_delta else "-" ,
916
+ str (self .shard_seq_deltas [shard_compound_id ]),
917
+ str (self .shard_time_deltas [shard_compound_id ]),
902
918
)
903
919
console .print (shards_table )
904
920
921
+ console .print (Panel .fit ("[bold blue]CrateDB Heat per Node[/bold blue]" ))
922
+
923
+
924
+
905
925
906
926
class ShardReporter :
907
927
def __init__ (self , analyzer : ShardAnalyzer ):
0 commit comments