Skip to content

Commit bb3a670

Browse files
optimised single query, stats conversion
1 parent 4fedec5 commit bb3a670

File tree

1 file changed

+93
-72
lines changed

1 file changed

+93
-72
lines changed

src/storage/object_storage.rs

Lines changed: 93 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
889889
"Error calculating field stats for stream {}: {}",
890890
stream_name, err
891891
);
892+
} else {
893+
let stats_stream = PARSEABLE
894+
.get_stream(DATASET_STATS_STREAM_NAME)
895+
.expect("Dataset stats stream should exist");
896+
if let Err(err) = stats_stream.flush_and_convert(false, false).await {
897+
warn!(
898+
"Error flushing dataset stats stream {}: {}",
899+
DATASET_STATS_STREAM_NAME, err
900+
);
901+
}
892902
}
893903
}
894904
if let Err(e) = remove_file(path) {
@@ -965,7 +975,7 @@ async fn calculate_field_stats(
965975
.create_stream_if_not_exists(
966976
DATASET_STATS_STREAM_NAME,
967977
StreamType::Internal,
968-
Some(&"dataset_name".to_string()),
978+
Some(&"dataset_name".into()),
969979
vec![log_source_entry],
970980
)
971981
.await?;
@@ -1034,42 +1044,102 @@ async fn collect_all_field_stats(
10341044
.await
10351045
}
10361046

1037-
/// Calculates statistics for a single field in the stream.
1038-
/// Returns `None` if the count query returns 0.
1047+
/// This function is used to fetch distinct values and their counts for a field in the stream.
1048+
/// Returns a vector of `DistinctStat` containing distinct values and their counts.
1049+
/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
10391050
async fn calculate_single_field_stats(
10401051
ctx: SessionContext,
10411052
stream_name: &str,
10421053
field_name: &str,
10431054
) -> Option<FieldStat> {
1044-
// Use the escaped field name in the SQL query to avoid issues with special characters
1045-
let escaped_field_name = field_name.replace('"', "\"\"");
1046-
let escaped_stream_name = stream_name.replace('"', "\"\"");
1047-
let count_distinct_sql = format!(
1048-
"select COUNT(\"{escaped_field_name}\") as count, COUNT(DISTINCT \"{escaped_field_name}\") as distinct_count from \"{escaped_stream_name}\" ");
1049-
1050-
let df = ctx.sql(&count_distinct_sql).await.ok()?;
1051-
let batches = df.collect().await.ok()?;
1052-
let batch = batches.first()?;
1053-
if batch.num_rows() == 0 {
1054-
return None;
1055-
}
1056-
let count_array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1057-
let distinct_count_array = batch.column(1).as_any().downcast_ref::<Int64Array>()?;
1055+
let mut total_count = 0;
1056+
let mut distinct_count = 0;
1057+
let mut distinct_stats = Vec::new();
10581058

1059-
let count = count_array.value(0);
1060-
let distinct_count = distinct_count_array.value(0);
1061-
if distinct_count == 0 {
1062-
return None;
1059+
let combined_sql = get_stats_sql(stream_name, field_name);
1060+
if let Ok(df) = ctx.sql(&combined_sql).await {
1061+
let mut stream = match df.execute_stream().await {
1062+
Ok(stream) => stream,
1063+
Err(e) => {
1064+
warn!("Failed to execute distinct stats query: {e}");
1065+
return None; // Return empty if query fails
1066+
}
1067+
};
1068+
while let Some(batch_result) = stream.next().await {
1069+
let rb = match batch_result {
1070+
Ok(batch) => batch,
1071+
Err(e) => {
1072+
warn!("Failed to fetch batch in distinct stats query: {e}");
1073+
continue; // Skip this batch if there's an error
1074+
}
1075+
};
1076+
let total_count_array = rb.column(0).as_any().downcast_ref::<Int64Array>()?;
1077+
let distinct_count_array = rb.column(1).as_any().downcast_ref::<Int64Array>()?;
1078+
1079+
total_count = total_count_array.value(0);
1080+
distinct_count = distinct_count_array.value(0);
1081+
1082+
if distinct_count == 0 {
1083+
return None;
1084+
}
1085+
1086+
let field_value_array = rb.column(2).as_ref();
1087+
let value_count_array = rb.column(3).as_any().downcast_ref::<Int64Array>()?;
1088+
1089+
for i in 0..rb.num_rows() {
1090+
let value = format_arrow_value(field_value_array, i);
1091+
let count = value_count_array.value(i);
1092+
1093+
distinct_stats.push(DistinctStat {
1094+
distinct_value: value,
1095+
count,
1096+
});
1097+
}
1098+
}
10631099
}
1064-
let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await;
10651100
Some(FieldStat {
10661101
field_name: field_name.to_string(),
1067-
count,
1102+
count: total_count,
10681103
distinct_count,
10691104
distinct_stats,
10701105
})
10711106
}
10721107

1108+
fn get_stats_sql(stream_name: &str, field_name: &str) -> String {
1109+
let escaped_field_name = field_name.replace('"', "\"\"");
1110+
let escaped_stream_name = stream_name.replace('"', "\"\"");
1111+
1112+
format!(
1113+
r#"
1114+
WITH field_groups AS (
1115+
SELECT
1116+
"{escaped_field_name}" as field_value,
1117+
COUNT(*) as value_count
1118+
FROM "{escaped_stream_name}"
1119+
GROUP BY "{escaped_field_name}"
1120+
),
1121+
field_summary AS (
1122+
SELECT
1123+
field_value,
1124+
value_count,
1125+
SUM(value_count) OVER () as total_count,
1126+
COUNT(*) OVER () as distinct_count,
1127+
ROW_NUMBER() OVER (ORDER BY value_count DESC) as rn
1128+
FROM field_groups
1129+
)
1130+
SELECT
1131+
total_count,
1132+
distinct_count,
1133+
field_value,
1134+
value_count
1135+
FROM field_summary
1136+
WHERE rn <= {}
1137+
ORDER BY value_count DESC
1138+
"#,
1139+
PARSEABLE.options.max_field_statistics
1140+
)
1141+
}
1142+
10731143
macro_rules! try_downcast {
10741144
($ty:ty, $arr:expr, $body:expr) => {
10751145
if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() {
@@ -1183,55 +1253,6 @@ fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
11831253
}
11841254
}
11851255

1186-
/// This function is used to fetch distinct values and their counts for a field in the stream.
1187-
/// Returns a vector of `DistinctStat` containing distinct values and their counts.
1188-
/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
1189-
async fn query_distinct_stats(
1190-
ctx: &SessionContext,
1191-
stream_name: &str,
1192-
field_name: &str,
1193-
) -> Vec<DistinctStat> {
1194-
let escaped_field_name = field_name.replace('"', "\"\"");
1195-
let escaped_stream_name = stream_name.replace('"', "\"\"");
1196-
1197-
let sql = format!(
1198-
"select count(*) as distinct_count, \"{escaped_field_name}\" from \"{escaped_stream_name}\" group by \"{escaped_field_name}\" order by distinct_count desc limit {}",
1199-
PARSEABLE.options.max_field_statistics
1200-
);
1201-
let mut distinct_stats = Vec::new();
1202-
if let Ok(df) = ctx.sql(&sql).await {
1203-
let mut stream = match df.execute_stream().await {
1204-
Ok(stream) => stream,
1205-
Err(e) => {
1206-
warn!("Failed to execute distinct stats query: {e}");
1207-
return distinct_stats; // Return empty if query fails
1208-
}
1209-
};
1210-
while let Some(batch_result) = stream.next().await {
1211-
let rb = match batch_result {
1212-
Ok(batch) => batch,
1213-
Err(e) => {
1214-
warn!("Failed to fetch batch in distinct stats query: {e}");
1215-
continue; // Skip this batch if there's an error
1216-
}
1217-
};
1218-
let Some(counts) = rb.column(0).as_any().downcast_ref::<Int64Array>() else {
1219-
warn!("Unexpected type for count column in stats query");
1220-
continue;
1221-
};
1222-
let values = rb.column(1).as_ref();
1223-
for i in 0..rb.num_rows() {
1224-
let value = format_arrow_value(values, i);
1225-
distinct_stats.push(DistinctStat {
1226-
distinct_value: value,
1227-
count: counts.value(i),
1228-
});
1229-
}
1230-
}
1231-
}
1232-
distinct_stats
1233-
}
1234-
12351256
pub async fn commit_schema_to_storage(
12361257
stream_name: &str,
12371258
schema: Schema,

0 commit comments

Comments
 (0)