Skip to content

Commit 090f9a6

Browse files
author
Devdutt Shenoi
committed
refactor: query only expects time_partition
1 parent 868ada3 commit 090f9a6

File tree

5 files changed

+30
-26
lines changed

5 files changed

+30
-26
lines changed

src/alerts/alerts_utils.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tracing::trace;
3131

3232
use crate::{
3333
alerts::AggregateCondition,
34+
parseable::PARSEABLE,
3435
query::{TableScanVisitor, QUERY_SESSION},
3536
rbac::{
3637
map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
137138
AlertError::CustomError(format!("Table name not found in query- {}", original_query))
138139
})?;
139140

141+
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
140142
query
141-
.get_dataframe(stream_name)
143+
.get_dataframe(time_partition.as_ref())
142144
.await
143145
.map_err(|err| AlertError::CustomError(err.to_string()))
144146
}

src/alerts/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use ulid::Ulid;
3737
pub mod alerts_utils;
3838
pub mod target;
3939

40-
use crate::parseable::PARSEABLE;
40+
use crate::parseable::{StreamNotFound, PARSEABLE};
4141
use crate::query::{TableScanVisitor, QUERY_SESSION};
4242
use crate::rbac::map::SessionKey;
4343
use crate::storage;
@@ -514,17 +514,16 @@ impl AlertConfig {
514514

515515
// for now proceed in a similar fashion as we do in query
516516
// TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
517-
let stream_name = if let Some(stream_name) = query.first_table_name() {
518-
stream_name
519-
} else {
517+
let Some(stream_name) = query.first_table_name() else {
520518
return Err(AlertError::CustomError(format!(
521519
"Table name not found in query- {}",
522520
self.query
523521
)));
524522
};
525523

524+
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
526525
let base_df = query
527-
.get_dataframe(stream_name)
526+
.get_dataframe(time_partition.as_ref())
528527
.await
529528
.map_err(|err| AlertError::CustomError(err.to_string()))?;
530529

@@ -704,6 +703,8 @@ pub enum AlertError {
704703
CustomError(String),
705704
#[error("Invalid State Change: {0}")]
706705
InvalidStateChange(String),
706+
#[error("{0}")]
707+
StreamNotFound(#[from] StreamNotFound),
707708
}
708709

709710
impl actix_web::ResponseError for AlertError {
@@ -717,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
717718
Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
718719
Self::CustomError(_) => StatusCode::BAD_REQUEST,
719720
Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
721+
Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
720722
}
721723
}
722724

src/handlers/airplane.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,12 @@ impl FlightService for AirServiceImpl {
216216
})?;
217217
let time = Instant::now();
218218

219-
let stream = PARSEABLE
219+
let time_partition = PARSEABLE
220220
.get_stream(&stream_name)
221-
.map_err(|err| Status::internal(err.to_string()))?;
221+
.map_err(|err| Status::internal(err.to_string()))?
222+
.get_time_partition();
222223
let (records, _) = query
223-
.execute(&stream)
224+
.execute(time_partition.as_ref())
224225
.await
225226
.map_err(|err| Status::internal(err.to_string()))?;
226227

src/handlers/http/query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
131131
return Ok(HttpResponse::Ok().json(response));
132132
}
133133

134-
let stream = PARSEABLE.get_stream(&table_name)?;
135-
let (records, fields) = query.execute(&stream).await?;
134+
let time_partition = PARSEABLE.get_stream(&table_name)?.get_time_partition();
135+
let (records, fields) = query.execute(time_partition.as_ref()).await?;
136136

137137
let response = QueryResponse {
138138
records,
@@ -321,7 +321,7 @@ Description: {0}"#
321321
#[error("Error: {0}")]
322322
Anyhow(#[from] anyhow::Error),
323323
#[error("Error: {0}")]
324-
StreamNotFound(#[from] StreamNotFound)
324+
StreamNotFound(#[from] StreamNotFound),
325325
}
326326

327327
impl actix_web::ResponseError for QueryError {

src/query/mod.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::catalog::Snapshot as CatalogSnapshot;
5353
use crate::event;
5454
use crate::handlers::http::query::QueryError;
5555
use crate::option::Mode;
56-
use crate::parseable::{Stream, PARSEABLE};
56+
use crate::parseable::PARSEABLE;
5757
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY};
5858
use crate::utils::time::TimeRange;
5959

@@ -131,12 +131,10 @@ impl Query {
131131

132132
pub async fn execute(
133133
&self,
134-
stream: &Stream,
134+
time_partition: Option<&String>,
135135
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
136-
let time_partition = stream.get_time_partition();
137-
138136
let df = QUERY_SESSION
139-
.execute_logical_plan(self.final_logical_plan(&time_partition))
137+
.execute_logical_plan(self.final_logical_plan(time_partition))
140138
.await?;
141139

142140
let fields = df
@@ -155,18 +153,19 @@ impl Query {
155153
Ok((results, fields))
156154
}
157155

158-
pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
159-
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
160-
156+
pub async fn get_dataframe(
157+
&self,
158+
time_partition: Option<&String>,
159+
) -> Result<DataFrame, ExecuteError> {
161160
let df = QUERY_SESSION
162-
.execute_logical_plan(self.final_logical_plan(&time_partition))
161+
.execute_logical_plan(self.final_logical_plan(time_partition))
163162
.await?;
164163

165164
Ok(df)
166165
}
167166

168167
/// return logical plan with all time filters applied through
169-
fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
168+
fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
170169
// see https://github.com/apache/arrow-datafusion/pull/8400
171170
// this can be eliminated in later version of datafusion but with slight caveat
172171
// transform cannot modify stringified plans by itself
@@ -486,7 +485,7 @@ fn transform(
486485
plan: LogicalPlan,
487486
start_time: NaiveDateTime,
488487
end_time: NaiveDateTime,
489-
time_partition: &Option<String>,
488+
time_partition: Option<&String>,
490489
) -> Transformed<LogicalPlan> {
491490
plan.transform(&|plan| match plan {
492491
LogicalPlan::TableScan(table) => {
@@ -544,7 +543,7 @@ fn transform(
544543

545544
fn table_contains_any_time_filters(
546545
table: &datafusion::logical_expr::TableScan,
547-
time_partition: &Option<String>,
546+
time_partition: Option<&String>,
548547
) -> bool {
549548
table
550549
.filters
@@ -558,8 +557,8 @@ fn table_contains_any_time_filters(
558557
})
559558
.any(|expr| {
560559
matches!(&*expr.left, Expr::Column(Column { name, .. })
561-
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
562-
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
560+
if (time_partition.is_some_and(|field| field == name) ||
561+
(time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
563562
})
564563
}
565564

0 commit comments

Comments
 (0)