Skip to content

Commit 17c6edd

Browse files
fix: multi threaded query execution
add multi threading execution to query execute method allows datafusion to use multiple threads to perform parallel execution of plans improves query performance
1 parent 887a63f commit 17c6edd

File tree

4 files changed

+19
-11
lines changed

4 files changed

+19
-11
lines changed

src/handlers/airplane.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
215215
Status::permission_denied("User Does not have permission to access this")
216216
})?;
217217
let time = Instant::now();
218-
let (records, _) = query
219-
.execute(stream_name.clone())
220-
.await
221-
.map_err(|err| Status::internal(err.to_string()))?;
218+
219+
let stream_name_clone = stream_name.clone();
220+
let (records, _) =
221+
match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
222+
Ok(Ok((records, fields))) => (records, fields),
223+
Ok(Err(e)) => return Err(Status::internal(e.to_string())),
224+
Err(err) => return Err(Status::internal(err.to_string())),
225+
};
222226

223227
/*
224228
* INFO: No returning the schema with the data.

src/handlers/http/query.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,13 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
130130

131131
return Ok(HttpResponse::Ok().json(response));
132132
}
133-
let (records, fields) = query.execute(table_name.clone()).await?;
134-
133+
let table_name_clone = table_name.clone();
134+
let (records, fields) =
135+
match tokio::task::spawn_blocking(move || query.execute(table_name_clone)).await {
136+
Ok(Ok((records, fields))) => (records, fields),
137+
Ok(Err(e)) => return Err(QueryError::Execute(e)),
138+
Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
139+
};
135140
let response = QueryResponse {
136141
records,
137142
fields,

src/parseable/streams.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -513,10 +513,7 @@ impl Stream {
513513
let file_size = match file.metadata() {
514514
Ok(meta) => meta.len(),
515515
Err(err) => {
516-
warn!(
517-
"File ({}) not found; Error = {err}",
518-
file.display()
519-
);
516+
warn!("File ({}) not found; Error = {err}", file.display());
520517
continue;
521518
}
522519
};

src/query/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ impl Query {
9191
let mut config = SessionConfig::default()
9292
.with_parquet_pruning(true)
9393
.with_prefer_existing_sort(true)
94-
.with_round_robin_repartition(true);
94+
.with_round_robin_repartition(true)
95+
.with_batch_size(8192);
9596

9697
// For more details refer https://datafusion.apache.org/user-guide/configs.html
9798

@@ -135,6 +136,7 @@ impl Query {
135136
SessionContext::new_with_state(state)
136137
}
137138

139+
#[tokio::main(flavor = "multi_thread")]
138140
pub async fn execute(
139141
&self,
140142
stream_name: String,

0 commit comments

Comments
 (0)