@@ -847,53 +847,52 @@ def __init__(self, analyzer: ShardAnalyzer):
847
847
self .seq_deltas : dict [str , int ]
848
848
self .size_deltas : dict [str , float ]
849
849
850
- def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
851
- return f"{ shard .node_id } -{ shard .shard_id } "
852
-
853
- def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
854
- seq_result : dict [str , int ] = {}
855
- size_result : dict [str , float ] = {}
856
-
857
- for shard in updated_shards :
858
- shard_compound_id = self ._get_shard_compound_id (shard )
859
-
860
- if shard_compound_id not in reference_shards :
861
- seq_result [shard_compound_id ] = 0
862
- size_result [shard_compound_id ] = shard .size_gb
863
- else :
864
- refreshed_number = shard .seq_stats_max_seq_no
865
- reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
866
-
867
- if refreshed_number < reference :
868
- refreshed_number += 2 ** 63 - 1
850
+ self .table_filter : str | None = None
851
+ self .sort_by : str = 'heat'
869
852
870
- seq_result [shard_compound_id ] = refreshed_number - reference
871
- size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
872
-
873
- self .seq_deltas = seq_result
874
- self .size_deltas = size_result
875
-
876
- def refresh_data (self ):
877
- self .analyzer ._refresh_data ()
878
- updated_shards : list [ShardInfo ] = self .analyzer .shards
879
- self .calculate_heat_deltas (self .reference_shards , updated_shards )
880
- self .latest_shards = sorted (updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )], reverse = True )
853
+ def monitor_shards (self , table_filter : str | None , interval_in_seconds : int = 5 , repeat : int = 10 , n_shards : int = 40 , sort_by : str = 'heat' ):
854
+ self .table_filter = table_filter
855
+ self .sort_by = sort_by
881
856
882
- def monitor_shards (self , interval_in_seconds : int = 10 , n_shards : int = 25 ):
883
857
self .reference_shards = {self ._get_shard_compound_id (shard ): shard for shard in self .analyzer .shards }
884
858
self .refresh_data ()
885
859
886
860
console .print (Panel .fit (f"[bold blue]The { n_shards } Hottest Shards[/bold blue]" ))
887
- shards_table = self .display_shards_table_header ()
888
861
889
- with Live (self .generate_table (self .latest_shards [:n_shards ], self .seq_deltas ), refresh_per_second = 4 , console = console ) as live :
862
+ go_live = False
863
+ if go_live :
864
+ with Live (self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas ), refresh_per_second = 4 , console = console ) as live_shards :
865
+ while True :
866
+ sleep (interval_in_seconds )
867
+ self .refresh_data ()
868
+ live_shards .update (self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas ))
869
+ else :
870
+ iterations = 0
890
871
while True :
891
872
sleep (interval_in_seconds )
892
873
self .refresh_data ()
893
- # self.display_shards_table_rows(shards_table, self.latest_shards, self.deltas)
894
- live .update (self .generate_table (self .latest_shards [:n_shards ], self .seq_deltas ))
874
+ shards_table = self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas )
875
+ console .print (shards_table )
876
+ nodes_table = self .generate_nodes_table (self ._get_nodes_heat_info (self .reference_shards , self .seq_deltas ))
877
+ console .print (nodes_table )
878
+
879
+ iterations += 1
880
+ if 0 < repeat <= iterations :
881
+ break
882
+
883
+ def generate_nodes_table (self , heat_nodes_info : dict [str , int ]):
884
+ table = Table (title = "Shard heat by node" , box = box .ROUNDED )
885
+ table .add_column ("Node name" , style = "cyan" )
886
+ table .add_column ("Heat" , style = "magenta" )
895
887
896
- def generate_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
888
+ sorted_items = sorted (heat_nodes_info .items (), key = lambda kv : (kv [1 ], kv [0 ]), reverse = True )
889
+
890
+ for k , v in sorted_items :
891
+ table .add_row (k , str (v ))
892
+
893
+ return table
894
+
895
+ def generate_shards_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
897
896
t = self .display_shards_table_header ()
898
897
self .display_shards_table_rows (t , sorted_shards , deltas )
899
898
return t
@@ -909,6 +908,8 @@ def display_shards_table_header(self):
909
908
shards_table .add_column ("Size" , style = "magenta" )
910
909
shards_table .add_column ("Size Delta" , style = "magenta" )
911
910
shards_table .add_column ("Seq Delta" , style = "magenta" )
911
+ shards_table .add_column ("DEBUG original Seq no." , style = "magenta" )
912
+ shards_table .add_column ("DEBUG Seq no." , style = "magenta" )
912
913
return shards_table
913
914
914
915
def display_shards_table_rows (self , shards_table : Table , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
@@ -927,9 +928,68 @@ def display_shards_table_rows(self, shards_table: Table, sorted_shards: list[Sha
927
928
format_size (shard .size_gb ),
928
929
format_size (self .size_deltas [shard_compound_id ]),
929
930
str (seq_delta ),
931
+ str (self .reference_shards [shard_compound_id ].seq_stats_max_seq_no ),
932
+ str (shard .seq_stats_max_seq_no )
930
933
)
931
934
console .print (shards_table )
932
935
936
+ def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
937
+ if self .sort_by == 'node' :
938
+ return f"{ shard .node_name } -{ shard .table_name } -{ shard .shard_id } "
939
+ else :
940
+ return f"{ shard .table_name } -{ shard .shard_id } -{ shard .node_name } "
941
+
942
+ def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
943
+ seq_result : dict [str , int ] = {}
944
+ size_result : dict [str , float ] = {}
945
+
946
+ for shard in updated_shards :
947
+ shard_compound_id = self ._get_shard_compound_id (shard )
948
+
949
+ if shard_compound_id not in reference_shards :
950
+ seq_result [shard_compound_id ] = 0
951
+ size_result [shard_compound_id ] = 0
952
+ reference_shards [shard_compound_id ] = shard
953
+ else :
954
+ refreshed_number = shard .seq_stats_max_seq_no
955
+ reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
956
+
957
+ if refreshed_number < reference :
958
+ refreshed_number += 2 ** 63 - 1
959
+
960
+ seq_result [shard_compound_id ] = refreshed_number - reference
961
+ size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
962
+
963
+ self .seq_deltas = seq_result
964
+ self .size_deltas = size_result
965
+
966
+ def refresh_data (self ):
967
+ self .analyzer ._refresh_data ()
968
+ updated_shards : list [ShardInfo ] = [s for s in self .analyzer .shards if not self .table_filter or self .table_filter == s .table_name ]
969
+ self .calculate_heat_deltas (self .reference_shards , updated_shards )
970
+ if self .sort_by == 'heat' :
971
+ self .latest_shards = sorted (updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )],
972
+ reverse = True )
973
+ else :
974
+ self .latest_shards = sorted (updated_shards , key = lambda s : self ._get_shard_compound_id (s ))
975
+
976
+
977
+ def _get_top_shards (self , sorted_shards : list [ShardInfo ], n_shards : int ) -> list [ShardInfo ]:
978
+ if n_shards < 1 :
979
+ return sorted_shards [:n_shards ]
980
+ else :
981
+ return sorted_shards
982
+
983
+ def _get_nodes_heat_info (self , shards : dict [str , ShardInfo ], seq_deltas : dict [str , int ]) -> dict [str , int ]:
984
+ nodes : dict [str , int ] = {}
985
+ for k , v in seq_deltas .items ():
986
+ node_name = shards .get (k ).node_name
987
+ if node_name not in nodes :
988
+ nodes [node_name ] = v
989
+ else :
990
+ nodes [node_name ] += v
991
+ return nodes
992
+
933
993
934
994
class ShardReporter :
935
995
def __init__ (self , analyzer : ShardAnalyzer ):
0 commit comments