@@ -355,6 +355,7 @@ impl Stream {
         // if yes, then merge them and save
 
         if let Some(mut schema) = schema {
+            // calculate field stats for all user defined streams
             if self.get_stream_type() != StreamType::Internal {
                 if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
                     warn!(
@@ -519,6 +520,7 @@ impl Stream {
         let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
         for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
             writer.write(record)?;
+            // Collect record batches for finding statistics later
             record_batches.push(record.clone());
         }
         writer.close()?;
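Pushing `record.clone()` inside the write loop is inexpensive: cloning an Arrow `RecordBatch` only copies `Arc` pointers to the column buffers, not the data itself. A minimal sketch illustrating that, using DataFusion's arrow re-export (paths assumed):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("value", col)]).expect("valid batch");

    // A RecordBatch clone is shallow: the column buffers are shared via Arc,
    // so stashing batches for later statistics does not duplicate the data.
    let stashed = batch.clone();
    assert_eq!(stashed.num_rows(), batch.num_rows());
}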
@@ -780,6 +782,9 @@ impl Stream {
         Ok(())
     }
 
+    /// Calculates field statistics for the stream and pushes them to the internal stats dataset.
+    /// This function creates a new internal stream for stats if it doesn't exist.
+    /// It collects statistics for each field in the stream.
     async fn calculate_field_stats(
         &self,
         record_batches: Vec<RecordBatch>,
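The new doc comment describes running statistics queries over the batches that were just written. One way to make in-memory batches queryable with DataFusion SQL is to register them as a `MemTable`; the sketch below shows that pattern only as an illustration, with the function name, table name, and error handling as assumptions rather than the actual implementation:

use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;

// Sketch: expose collected batches as a SQL-queryable table so per-field
// statistics can be computed with plain SQL. Names here are illustrative.
fn register_batches_for_stats(
    ctx: &SessionContext,
    table_name: &str,
    schema: SchemaRef,
    record_batches: Vec<RecordBatch>,
) -> Result<(), DataFusionError> {
    let table = MemTable::try_new(schema, vec![record_batches])?;
    ctx.register_table(table_name, Arc::new(table))?;
    Ok(())
}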
@@ -821,6 +826,9 @@ impl Stream {
         Ok(())
     }
 
+    /// Collects statistics for all fields in the stream.
+    /// Returns a vector of `FieldStat` for each field with non-zero count.
+    /// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently.
     async fn collect_all_field_stats(
         &self,
         ctx: &SessionContext,
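A self-contained sketch of the bounded-concurrency pattern named in the doc comment: one future per field, at most `MAX_CONCURRENT_FIELD_STATS` in flight at a time, with `None` results dropped. The constant's value and the per-field function are placeholders, not the crate's actual definitions; `futures` and `tokio` are assumed as dependencies.

use futures::StreamExt;

// Placeholder value; the real constant lives in the crate.
const MAX_CONCURRENT_FIELD_STATS: usize = 10;

// Stand-in for calculate_single_field_stats: returns None for fields
// that yield no statistics.
async fn field_stat(field: String) -> Option<(String, i64)> {
    if field.is_empty() {
        None
    } else {
        Some((field, 42))
    }
}

#[tokio::main]
async fn main() {
    let fields = ["status", "", "latency"].map(String::from);

    // One future per field, bounded concurrency, Nones filtered out.
    let stats: Vec<(String, i64)> = futures::stream::iter(fields.into_iter().map(field_stat))
        .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
        .filter_map(std::future::ready)
        .collect()
        .await;

    println!("collected stats: {stats:?}");
}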
@@ -841,10 +849,13 @@ impl Stream {
 
         futures::stream::iter(field_futures)
             .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
-            .filter_map(|x| async { x })
+            .filter_map(std::future::ready)
             .collect::<Vec<_>>()
             .await
     }
+
+    /// Calculates statistics for a single field in the stream.
+    /// Returns `None` if the count query returns 0.
     async fn calculate_single_field_stats(
         ctx: SessionContext,
         stream_name: String,
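The change from `.filter_map(|x| async { x })` to `.filter_map(std::future::ready)` is behavior-preserving: `StreamExt::filter_map` expects a closure returning a future that resolves to an `Option`, and `std::future::ready(x)` is exactly such an already-resolved future, so the per-item `async` block is unnecessary. A small standalone check, assuming the `futures` and `tokio` crates:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    let items = futures::stream::iter(vec![Some(1), None, Some(3)]);

    // std::future::ready(x) resolves immediately to x, making it a drop-in
    // replacement for the closure |x| async { x } used previously.
    let kept: Vec<i32> = items.filter_map(std::future::ready).collect().await;

    assert_eq!(kept, vec![1, 3]);
}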
@@ -879,6 +890,9 @@ impl Stream {
         })
     }
 
+    /// Queries a single integer value from the DataFusion context.
+    /// Returns `None` if the query fails or returns no rows.
+    /// Used for fetching a field's record count and distinct count.
     async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
         let df = ctx.sql(sql).await.ok()?;
         let batches = df.collect().await.ok()?;
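Only the first lines of this helper are visible in the hunk. The sketch below shows how such a "single i64" lookup can be completed, under the assumptions that the SQL is an aggregate such as `COUNT(*)` (which DataFusion returns as `Int64`) and that any failure should collapse to `None`; it is not necessarily the function's exact body.

use datafusion::arrow::array::Int64Array;
use datafusion::prelude::SessionContext;

// Sketch of the pattern: run the SQL, take the first row of the first batch,
// downcast the single column to Int64Array, and return its value.
async fn query_single_i64_sketch(ctx: &SessionContext, sql: &str) -> Option<i64> {
    let df = ctx.sql(sql).await.ok()?;
    let batches = df.collect().await.ok()?;
    let batch = batches.first()?;
    if batch.num_rows() == 0 {
        return None;
    }
    let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
    Some(array.value(0))
}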
@@ -891,6 +905,8 @@ impl Stream {
         Some(array.value(0))
     }
 
+    /// Helper function to format an Arrow value at a given index into a string.
+    /// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
     fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
         if array.is_null(idx) {
             return "NULL".to_string();
@@ -921,6 +937,9 @@ impl Stream {
         }
     }
 
+    /// Fetches the distinct values and their counts for a field in the stream.
+    /// Returns a vector of `DistinctStat` containing the distinct values and their counts.
+    /// The query groups by the field, orders by count in descending order, and limits the results to `PARSEABLE.options.max_field_statistics`.
     async fn query_distinct_stats(
         ctx: &SessionContext,
         stream_name: &str,
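The doc comment pins down the shape of the SQL: group by the field, order by the count descending, and cap the result set at `PARSEABLE.options.max_field_statistics`. A hedged sketch of what such a query string could look like; identifier quoting and the column alias are assumptions, not the actual implementation.

// Illustrative only: builds a distinct-values query of the shape described
// above for a given stream, field, and limit.
fn distinct_stats_sql(stream_name: &str, field_name: &str, limit: usize) -> String {
    format!(
        "SELECT \"{field_name}\", COUNT(*) AS distinct_count \
         FROM \"{stream_name}\" \
         GROUP BY \"{field_name}\" \
         ORDER BY distinct_count DESC \
         LIMIT {limit}"
    )
}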