Skip to content

Commit f55dee4

Browse files
author
Devdutt Shenoi
committed
feat: dedicated runtime for all queries
1 parent 090f9a6 commit f55dee4

File tree

4 files changed

+22
-11
lines changed

4 files changed

+22
-11
lines changed

src/handlers/airplane.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
3838
use crate::handlers::livetail::cross_origin_config;
3939
use crate::metrics::QUERY_EXECUTE_TIME;
4040
use crate::parseable::PARSEABLE;
41-
use crate::query::{TableScanVisitor, QUERY_SESSION};
41+
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
4242
use crate::utils::arrow::flight::{
4343
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4444
send_to_ingester,
@@ -216,12 +216,7 @@ impl FlightService for AirServiceImpl {
216216
})?;
217217
let time = Instant::now();
218218

219-
let time_partition = PARSEABLE
220-
.get_stream(&stream_name)
221-
.map_err(|err| Status::internal(err.to_string()))?
222-
.get_time_partition();
223-
let (records, _) = query
224-
.execute(time_partition.as_ref())
219+
let (records, _) = execute(query, &stream_name)
225220
.await
226221
.map_err(|err| Status::internal(err.to_string()))?;
227222

src/handlers/http/query.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4141
use crate::option::Mode;
4242
use crate::parseable::{StreamNotFound, PARSEABLE};
4343
use crate::query::error::ExecuteError;
44-
use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
44+
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
4545
use crate::query::{TableScanVisitor, QUERY_SESSION};
4646
use crate::rbac::Users;
4747
use crate::response::QueryResponse;
@@ -131,8 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
131131
return Ok(HttpResponse::Ok().json(response));
132132
}
133133

134-
let time_partition = PARSEABLE.get_stream(&table_name)?.get_time_partition();
135-
let (records, fields) = query.execute(time_partition.as_ref()).await?;
134+
let (records, fields) = execute(query, &table_name).await?;
136135

137136
let response = QueryResponse {
138137
records,

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
2828
use once_cell::sync::Lazy;
2929
pub use staging::StagingError;
3030
use streams::StreamRef;
31-
pub use streams::{StreamNotFound, Streams, Stream};
31+
pub use streams::{Stream, StreamNotFound, Streams};
3232
use tracing::error;
3333

3434
#[cfg(feature = "kafka")]

src/query/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use std::ops::Bound;
4242
use std::sync::Arc;
4343
use stream_schema_provider::collect_manifest_files;
4444
use sysinfo::System;
45+
use tokio::runtime::Runtime;
4546

4647
use self::error::ExecuteError;
4748
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -60,6 +61,21 @@ use crate::utils::time::TimeRange;
6061
pub static QUERY_SESSION: Lazy<SessionContext> =
6162
Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
6263

64+
/// Dedicated multi-threaded runtime to run
65+
pub static QUERY_RUNTIME: Lazy<Runtime> =
66+
Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
67+
68+
pub async fn execute(
69+
query: Query,
70+
stream_name: &str,
71+
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
72+
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
73+
QUERY_RUNTIME
74+
.spawn(async move { query.execute(time_partition.as_ref()).await })
75+
.await
76+
.expect("The Join should have been successful")
77+
}
78+
6379
// A query request by client
6480
#[derive(Debug)]
6581
pub struct Query {
@@ -150,6 +166,7 @@ impl Query {
150166
}
151167

152168
let results = df.collect().await?;
169+
153170
Ok((results, fields))
154171
}
155172

0 commit comments

Comments
 (0)