
perf: don't construct a tokio runtime for each query #1226

Merged: 5 commits, Mar 8, 2025
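For context: `Query::execute` was previously annotated `#[tokio::main(flavor = "multi_thread")]`, which desugars to building a brand-new multi-threaded runtime and blocking on it, so every query paid for runtime construction and teardown, and callers had to push that blocking call onto `spawn_blocking` threads. This PR replaces the per-call runtime with a single lazily-constructed `QUERY_RUNTIME` shared by all queries. A minimal before/after sketch, assuming only `tokio` and `once_cell`; the `run_query_*` names and plan bodies are illustrative, not from this codebase:

```rust
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// Before: roughly what #[tokio::main] expands to. Each call spins up a
// fresh worker-thread pool and tears it down when the query finishes.
fn run_query_per_call() {
    let rt = Runtime::new().expect("runtime");
    rt.block_on(async {
        // ... drive the DataFusion plan to completion ...
    });
}

// After: one process-wide runtime, constructed on first use, reused by
// every query for the lifetime of the server.
static QUERY_RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));

async fn run_query_shared() {
    QUERY_RUNTIME
        .spawn(async {
            // ... drive the DataFusion plan to completion ...
        })
        .await
        .expect("join fails only on panic or cancellation");
}
```

Runtime construction is not free (a worker thread per core plus a blocking pool), so amortizing it across all queries is the point of the change.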
src/alerts/alerts_utils.rs (3 additions & 1 deletion)
@@ -31,6 +31,7 @@ use tracing::trace;

 use crate::{
     alerts::AggregateCondition,
+    parseable::PARSEABLE,
     query::{TableScanVisitor, QUERY_SESSION},
     rbac::{
         map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
         AlertError::CustomError(format!("Table name not found in query- {}", original_query))
     })?;

+    let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
     query
-        .get_dataframe(stream_name)
+        .get_dataframe(time_partition.as_ref())
         .await
         .map_err(|err| AlertError::CustomError(err.to_string()))
 }
src/alerts/mod.rs (7 additions & 5 deletions)
@@ -37,7 +37,7 @@ use ulid::Ulid;
 pub mod alerts_utils;
 pub mod target;

-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::map::SessionKey;
 use crate::storage;
@@ -514,17 +514,16 @@ impl AlertConfig {

         // for now proceed in a similar fashion as we do in query
         // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-        let stream_name = if let Some(stream_name) = query.first_table_name() {
-            stream_name
-        } else {
+        let Some(stream_name) = query.first_table_name() else {
             return Err(AlertError::CustomError(format!(
                 "Table name not found in query- {}",
                 self.query
             )));
         };

+        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
         let base_df = query
-            .get_dataframe(stream_name)
+            .get_dataframe(time_partition.as_ref())
             .await
             .map_err(|err| AlertError::CustomError(err.to_string()))?;

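Aside: the hunk above also replaces an `if let ... else` assignment with a `let ... else` binding (stable since Rust 1.65), where the happy path binds directly and the `else` arm must diverge. A minimal standalone sketch:

```rust
fn first_table(tables: &[String]) -> Result<&String, String> {
    // `let ... else`: bind on success; the else block must diverge.
    let Some(name) = tables.first() else {
        return Err("Table name not found in query".to_owned());
    };
    Ok(name)
}
```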
@@ -704,6 +703,8 @@ pub enum AlertError {
     CustomError(String),
     #[error("Invalid State Change: {0}")]
     InvalidStateChange(String),
+    #[error("{0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }

 impl actix_web::ResponseError for AlertError {
@@ -717,6 +718,7 @@
             Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
         }
     }

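On the new error variant: thiserror's `#[from]` derives the `From` impl that lets `PARSEABLE.get_stream(&stream_name)?` bubble a missing stream straight into `AlertError`, and the `ResponseError` arm maps it to 404 rather than a generic 400/500. A self-contained sketch of the pattern; the `StreamNotFound` body here is a stand-in, the real type lives in `src/parseable/streams`:

```rust
use actix_web::http::StatusCode;
use thiserror::Error;

// Stand-in for the type re-exported from src/parseable/streams.
#[derive(Debug, Error)]
#[error("stream {0} not found")]
pub struct StreamNotFound(pub String);

#[derive(Debug, Error)]
pub enum AlertError {
    // #[from] generates `impl From<StreamNotFound> for AlertError`,
    // which is what makes `?` on get_stream() compile in the handlers.
    #[error("{0}")]
    StreamNotFound(#[from] StreamNotFound),
}

fn status_code(err: &AlertError) -> StatusCode {
    match err {
        // A missing stream is addressable by the client: 404, not 500.
        AlertError::StreamNotFound(_) => StatusCode::NOT_FOUND,
    }
}
```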
src/handlers/airplane.rs (4 additions & 8 deletions)
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::PARSEABLE;
-use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
 use crate::utils::arrow::flight::{
     append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
     send_to_ingester,
@@ -216,13 +216,9 @@
         })?;
         let time = Instant::now();

-        let stream_name_clone = stream_name.clone();
-        let (records, _) =
-            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
-                Ok(Ok((records, fields))) => (records, fields),
-                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
-                Err(err) => return Err(Status::internal(err.to_string())),
-            };
+        let (records, _) = execute(query, &stream_name)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;

         /*
         * INFO: No returning the schema with the data.
src/handlers/http/query.rs (6 additions & 15 deletions)
@@ -19,7 +19,6 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
-use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -40,9 +39,9 @@ use crate::handlers::http::fetch_schema;
 use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::error::ExecuteError;
-use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
+use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -131,7 +130,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {

         return Ok(HttpResponse::Ok().json(response));
     }
-    let (records, fields) = execute_query(query, table_name.clone()).await?;
+
+    let (records, fields) = execute(query, &table_name).await?;

     let response = QueryResponse {
         records,
@@ -150,17 +150,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
     Ok(response)
 }

-async fn execute_query(
-    query: LogicalQuery,
-    stream_name: String,
-) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
-    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
-        Ok(Ok(result)) => Ok(result),
-        Ok(Err(e)) => Err(QueryError::Execute(e)),
-        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-    }
-}
-
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
@@ -330,6 +319,8 @@ Description: {0}"#
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Error: {0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }

 impl actix_web::ResponseError for QueryError {
src/parseable/mod.rs (1 addition & 1 deletion)
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;

 #[cfg(feature = "kafka")]
src/query/mod.rs (32 additions & 14 deletions)
@@ -42,6 +42,7 @@ use std::ops::Bound;
 use std::sync::Arc;
 use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;
+use tokio::runtime::Runtime;

 use self::error::ExecuteError;
 use self::stream_schema_provider::GlobalSchemaProvider;
@@ -60,6 +61,24 @@ use crate::utils::time::TimeRange;
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

+/// Dedicated multi-threaded runtime to run all queries on
+pub static QUERY_RUNTIME: Lazy<Runtime> =
+    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
+
+/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
+/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
+pub async fn execute(
+    query: Query,
+    stream_name: &str,
+) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
+    QUERY_RUNTIME
+        .spawn(async move { query.execute(time_partition.as_ref()).await })
+        .await
+        .expect("The Join should have been successful")
+}
+
 // A query request by client
 #[derive(Debug)]
 pub struct Query {
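Two details in the new `execute` above: the `PARSEABLE.get_stream(...)` lookup runs on the caller's runtime before the hop, and `Runtime::spawn` returns a `JoinHandle` whose error case (`JoinError`) fires only on panic or cancellation, hence the `expect` while the inner `Result<_, ExecuteError>` passes through untouched. A hedged call-site sketch using the items added above; the stream names are hypothetical:

```rust
// Two queries submitted concurrently now share QUERY_RUNTIME's worker
// threads instead of each driving a private runtime off a
// spawn_blocking thread.
async fn run_two(q1: Query, q2: Query) -> Result<(), ExecuteError> {
    let (r1, r2) = tokio::join!(execute(q1, "frontend_logs"), execute(q2, "backend_logs"));
    let (_records_1, _fields_1) = r1?;
    let (_records_2, _fields_2) = r2?;
    Ok(())
}
```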
@@ -129,15 +148,12 @@ impl Query {
         SessionContext::new_with_state(state)
     }

-    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
-        stream_name: String,
+        time_partition: Option<&String>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;

         let fields = df
@@ -153,21 +169,23 @@
         }

         let results = df.collect().await?;
+
         Ok((results, fields))
     }

-    pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
+    pub async fn get_dataframe(
+        &self,
+        time_partition: Option<&String>,
+    ) -> Result<DataFrame, ExecuteError> {
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;

         Ok(df)
     }

     /// return logical plan with all time filters applied through
-    fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
+    fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
         // see https://github.com/apache/arrow-datafusion/pull/8400
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
@@ -487,7 +505,7 @@ fn transform(
     plan: LogicalPlan,
     start_time: NaiveDateTime,
     end_time: NaiveDateTime,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> Transformed<LogicalPlan> {
     plan.transform(&|plan| match plan {
         LogicalPlan::TableScan(table) => {
@@ -545,7 +563,7 @@

 fn table_contains_any_time_filters(
     table: &datafusion::logical_expr::TableScan,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> bool {
     table
         .filters
@@ -559,8 +577,8 @@
         })
         .any(|expr| {
             matches!(&*expr.left, Expr::Column(Column { name, .. })
-                if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
-                    (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
+                if (time_partition.is_some_and(|field| field == name) ||
+                    (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
         })
 }
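Finally, the signature change rippling through this file, `&Option<String>` to `Option<&String>`, is the idiomatic direction: owners lend with `.as_ref()` (exactly what the new call sites do), `None` needs no owned wrapper, and `Option::is_some_and` collapses the old `is_some() && ...as_ref().unwrap()` pairing. A standalone illustration, assuming `"p_timestamp"` as the value of `event::DEFAULT_TIMESTAMP_KEY`:

```rust
// Assumed value of event::DEFAULT_TIMESTAMP_KEY for this sketch.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

fn is_time_column(time_partition: Option<&String>, name: &str) -> bool {
    // is_some_and replaces the old `is_some() && name == ...unwrap()` pair.
    time_partition.is_some_and(|field| field == name)
        || (time_partition.is_none() && name == DEFAULT_TIMESTAMP_KEY)
}

fn main() {
    let partition: Option<String> = Some("event_time".to_owned());
    // `&Option<String>` becomes `Option<&String>` with one `.as_ref()`.
    assert!(is_time_column(partition.as_ref(), "event_time"));
    assert!(!is_time_column(partition.as_ref(), "p_timestamp"));
    assert!(is_time_column(None, "p_timestamp"));
}
```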
