
Commit ef00f83

refactor
1 parent 45725a9 commit ef00f83

File tree: 2 files changed, +47 −24 lines


src/cli.rs

Lines changed: 9 additions & 0 deletions
@@ -388,6 +388,15 @@ pub struct Options {
         help = "Maximum level of flattening allowed for events"
     )]
     pub event_flatten_level: usize,
+
+    // maximum limit to store the statistics for a field
+    #[arg(
+        long,
+        env = "P_MAX_FIELD_STATISTICS",
+        default_value = "50",
+        help = "Maximum number of field statistics to store"
+    )]
+    pub max_field_statistics: usize,
 }
 
 #[derive(Parser, Debug)]
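For context, a minimal standalone sketch of how an option declared this way resolves at runtime, assuming clap with its derive and env features enabled (the struct below is a trimmed stand-in for Parseable's real Options, not the full definition):

use clap::Parser;

#[derive(Parser, Debug)]
struct Options {
    // Mirrors the field added above; resolution order is the
    // --max-field-statistics flag, then the P_MAX_FIELD_STATISTICS
    // environment variable, then the default of 50.
    #[arg(
        long,
        env = "P_MAX_FIELD_STATISTICS",
        default_value = "50",
        help = "Maximum number of field statistics to store"
    )]
    max_field_statistics: usize,
}

fn main() {
    let opts = Options::parse();
    println!("max_field_statistics = {}", opts.max_field_statistics);
}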

src/parseable/streams.rs

Lines changed: 38 additions & 24 deletions
@@ -16,24 +16,13 @@
  *
  *
  */
-
-use std::{
-    collections::{HashMap, HashSet},
-    fs::{remove_file, write, File, OpenOptions},
-    num::NonZeroU32,
-    path::{Path, PathBuf},
-    process,
-    sync::{Arc, Mutex, RwLock},
-    time::{Instant, SystemTime, UNIX_EPOCH},
-};
-
-use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray};
+use arrow_array::{Array, Date32Array, Float64Array, Int64Array, NullArray, StringArray};
 use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray};
 use arrow_schema::{Field, Fields, Schema};
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use datafusion::{datasource::MemTable, prelude::SessionContext};
 use derive_more::{Deref, DerefMut};
-use futures::stream::{FuturesUnordered, StreamExt};
+use futures_util::StreamExt;
 use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
@@ -45,6 +34,15 @@ use parquet::{
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
 use serde::Serialize;
+use std::{
+    collections::{HashMap, HashSet},
+    fs::{remove_file, write, File, OpenOptions},
+    num::NonZeroU32,
+    path::{Path, PathBuf},
+    process,
+    sync::{Arc, Mutex, RwLock},
+    time::{Instant, SystemTime, UNIX_EPOCH},
+};
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};

@@ -76,6 +74,8 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };
 
+const MAX_CONCURRENT_FIELD_STATS: usize = 10;
+
 #[derive(Serialize, Debug)]
 struct DistinctStat {
     distinct_value: String,
@@ -355,7 +355,7 @@ impl Stream {
         // if yes, then merge them and save
 
         if let Some(mut schema) = schema {
-            if !&self.stream_name.contains(INTERNAL_STREAM_NAME) {
+            if self.get_stream_type() != StreamType::Internal {
                 if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
                     warn!(
                         "Error calculating field stats for stream {}: {}",
@@ -826,19 +826,25 @@ impl Stream {
         ctx: &SessionContext,
         schema: &Arc<Schema>,
     ) -> Vec<FieldStat> {
-        let field_futures = schema.fields().iter().map(|field| {
+        // Collect field names into an owned Vec<String> to avoid lifetime issues
+        let field_names: Vec<String> = schema
+            .fields()
+            .iter()
+            .map(|field| field.name().clone())
+            .collect();
+
+        let field_futures = field_names.into_iter().map(|field_name| {
             let ctx = ctx.clone();
             let stream_name = self.stream_name.clone();
-            let field_name = field.name().clone();
             async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await }
         });
 
-        FuturesUnordered::from_iter(field_futures)
+        futures::stream::iter(field_futures)
+            .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
             .filter_map(|x| async { x })
             .collect::<Vec<_>>()
             .await
     }
-
     async fn calculate_single_field_stats(
         ctx: SessionContext,
         stream_name: String,
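The core of this hunk is swapping an unbounded FuturesUnordered for buffer_unordered, which caps the number of per-field statistics queries running at once at MAX_CONCURRENT_FIELD_STATS. A self-contained sketch of the pattern, with a sleep standing in for the DataFusion query (field names and timing are illustrative):

use futures::stream::{self, StreamExt};
use std::time::Duration;

const MAX_CONCURRENT_FIELD_STATS: usize = 10;

#[tokio::main]
async fn main() {
    let field_names: Vec<String> = (0..100).map(|i| format!("field_{i}")).collect();

    let stats: Vec<String> = stream::iter(field_names.into_iter().map(|name| async move {
        // Stand-in for Self::calculate_single_field_stats, which
        // returns None when stats cannot be computed for a field.
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some(name)
    }))
    // At most 10 of these futures are polled concurrently;
    // FuturesUnordered::from_iter would have driven all 100 at once.
    .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
    // Drop fields whose stats computation returned None.
    .filter_map(|x| async { x })
    .collect()
    .await;

    assert_eq!(stats.len(), 100);
}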
@@ -876,11 +882,12 @@ impl Stream {
     async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
         let df = ctx.sql(sql).await.ok()?;
         let batches = df.collect().await.ok()?;
-        let array = batches
-            .first()?
-            .column(0)
-            .as_any()
-            .downcast_ref::<arrow_array::Int64Array>()?;
+        let batch = batches.first()?;
+        if batch.num_rows() == 0 {
+            return None;
+        }
+        let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
+
         Some(array.value(0))
     }
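The added num_rows check closes a real gap: a query can return a first batch that exists but holds zero rows, and calling value(0) on an empty Int64Array would panic. A minimal arrow-rs illustration of the same guard, with the array standing in for batch.column(0):

use arrow_array::{Array, Int64Array};

fn first_value(array: &Int64Array) -> Option<i64> {
    // Same idea as the new guard in query_single_i64: bail out
    // before indexing, since value(0) panics on an empty array.
    if array.is_empty() {
        return None;
    }
    Some(array.value(0))
}

fn main() {
    assert_eq!(first_value(&Int64Array::from(Vec::<i64>::new())), None);
    assert_eq!(first_value(&Int64Array::from(vec![42])), Some(42));
}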

@@ -899,11 +906,17 @@ impl Stream {
             DateTime::from_timestamp_millis(timestamp)
                 .map(|dt| dt.to_string())
                 .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
+        } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
+            return arr.value(idx).to_string();
         } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
             arr.value(idx).to_string()
         } else if array.as_any().downcast_ref::<NullArray>().is_some() {
             "NULL".to_string()
         } else {
+            warn!(
+                "Unsupported array type for statistics: {:?}",
+                array.data_type()
+            );
             "UNSUPPORTED".to_string()
         }
     }
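Worth noting for the new Date32Array arm: Arrow's Date32 stores days since the Unix epoch as an i32, so value(idx).to_string() yields the raw day count rather than a formatted date. A sketch of that behavior, with a hypothetical chrono conversion shown for contrast (not part of the commit):

use arrow_array::Date32Array;
use chrono::NaiveDate;

fn main() {
    // 2024-01-01 is 19723 days after 1970-01-01.
    let arr = Date32Array::from(vec![19723]);

    // What the commit's arm renders: the raw day count.
    assert_eq!(arr.value(0).to_string(), "19723");

    // Hypothetical alternative that renders a calendar date;
    // 719_163 is chrono's day number for 1970-01-01.
    let date = NaiveDate::from_num_days_from_ce_opt(arr.value(0) + 719_163).unwrap();
    assert_eq!(date.to_string(), "2024-01-01");
}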
@@ -914,7 +927,8 @@ impl Stream {
         field_name: &str,
     ) -> Vec<DistinctStat> {
         let sql = format!(
-            "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50"
+            "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}",
+            PARSEABLE.options.max_field_statistics
         );
         let mut distinct_stats = Vec::new();
         if let Ok(df) = ctx.sql(&sql).await {
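One small Rust detail in the new query string: it mixes inline-captured identifiers ({field_name}, {stream_name}) with a positional {} that format! fills from the trailing argument, here the configured limit. A standalone check of the resulting SQL, with illustrative names standing in for real stream data:

fn main() {
    let field_name = "status";
    let stream_name = "app_logs";
    // Stand-in for PARSEABLE.options.max_field_statistics.
    let max_field_statistics = 50;

    let sql = format!(
        "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}",
        max_field_statistics
    );

    assert!(sql.ends_with("limit 50"));
}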
