Skip to content

Commit ae92749

Browse files
optimise streaming
1 parent 4731d40 commit ae92749

File tree

1 file changed

+29
-35
lines changed

1 file changed

+29
-35
lines changed

src/handlers/http/query.rs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::handlers::http::fetch_schema;
2121
use actix_web::http::header::ContentType;
2222
use actix_web::web::{self, Json};
2323
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
24+
use arrow_array::RecordBatch;
2425
use bytes::Bytes;
2526
use chrono::{DateTime, Utc};
2627
use datafusion::common::tree_node::TreeNode;
@@ -248,7 +249,6 @@ async fn handle_streaming_query(
248249
}
249250
Either::Right(stream) => stream,
250251
};
251-
let fields = fields.clone();
252252
let total_time = format!("{:?}", time.elapsed());
253253
let time = time.elapsed().as_secs_f64();
254254
QUERY_EXECUTE_TIME
@@ -266,22 +266,10 @@ async fn handle_streaming_query(
266266
.to_string();
267267

268268
// stream the records without fields
269-
let records_stream = records_stream.map(move |batch_result| match batch_result {
270-
Ok(batch) => {
271-
let response = QueryResponse {
272-
records: vec![batch],
273-
fields: Vec::new(),
274-
fill_null: send_null,
275-
with_fields: false,
276-
}
277-
.to_json()
278-
.unwrap_or_else(|e| {
279-
error!("Failed to parse record batch into JSON: {}", e);
280-
json!({})
281-
});
282-
Ok(Bytes::from(format!("{}\n", response)))
283-
}
284-
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
269+
let mut batch_processor = create_batch_processor(send_null);
270+
let records_stream = records_stream.map(move |batch_result| {
271+
let batch_result = batch_result.map_err(QueryError::from);
272+
batch_processor(batch_result)
285273
});
286274

287275
// Combine the initial fields chunk with the records stream
@@ -292,24 +280,9 @@ async fn handle_streaming_query(
292280
Box::pin(fields_chunk.chain(records_stream))
293281
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
294282
} else {
295-
let stream = records_stream.map(move |batch_result| match batch_result {
296-
Ok(batch) => {
297-
let response = QueryResponse {
298-
records: vec![batch],
299-
fields: fields.clone(),
300-
fill_null: send_null,
301-
with_fields,
302-
}
303-
.to_json()
304-
.unwrap_or_else(|e| {
305-
error!("Failed to parse record batch into JSON: {}", e);
306-
json!({})
307-
});
308-
Ok(Bytes::from(format!("{}\n", response)))
309-
}
310-
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
311-
});
312-
283+
let mut batch_processor = create_batch_processor(send_null);
284+
let stream = records_stream
285+
.map(move |batch_result| batch_processor(batch_result.map_err(QueryError::from)));
313286
Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
314287
};
315288

@@ -319,6 +292,27 @@ async fn handle_streaming_query(
319292
.streaming(stream))
320293
}
321294

295+
fn create_batch_processor(
296+
send_null: bool,
297+
) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
298+
move |batch_result| match batch_result {
299+
Ok(batch) => {
300+
let response = QueryResponse {
301+
records: vec![batch],
302+
fields: Vec::new(),
303+
fill_null: send_null,
304+
with_fields: false,
305+
}
306+
.to_json()
307+
.map_err(|e| {
308+
error!("Failed to parse record batch into JSON: {}", e);
309+
actix_web::error::ErrorInternalServerError(e)
310+
})?;
311+
Ok(Bytes::from(format!("{}\n", response)))
312+
}
313+
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
314+
}
315+
}
322316
pub async fn get_counts(
323317
req: HttpRequest,
324318
counts_request: Json<CountsRequest>,

0 commit comments

Comments
 (0)