Skip to content

Commit 2f00ad5

Browse files
query param for streaming, default false
1 parent 6dacb94 commit 2f00ad5

File tree

1 file changed

+108
-52
lines changed

1 file changed

+108
-52
lines changed

src/handlers/http/query.rs

Lines changed: 108 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use futures::StreamExt;
2828
use futures_util::Future;
2929
use http::StatusCode;
3030
use serde::{Deserialize, Serialize};
31-
use serde_json::{json, Value};
31+
use serde_json::json;
3232
use std::collections::HashMap;
3333
use std::pin::Pin;
3434
use std::sync::Arc;
@@ -43,15 +43,14 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4343
use crate::option::Mode;
4444
use crate::parseable::{StreamNotFound, PARSEABLE};
4545
use crate::query::error::ExecuteError;
46-
use crate::query::{execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
46+
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
4747
use crate::query::{TableScanVisitor, QUERY_SESSION};
4848
use crate::rbac::Users;
4949
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
5050
use crate::storage::ObjectStorageError;
5151
use crate::utils::actix::extract_session_key_from_req;
5252
use crate::utils::time::{TimeParseError, TimeRange};
5353
use crate::utils::user_auth_for_datasets;
54-
use futures_core::Stream as CoreStream;
5554
/// Query Request through http endpoint.
5655
#[derive(Debug, Deserialize, Serialize, Clone)]
5756
#[serde(rename_all = "camelCase")]
@@ -64,6 +63,8 @@ pub struct Query {
6463
#[serde(skip)]
6564
pub fields: bool,
6665
#[serde(skip)]
66+
pub streaming: bool,
67+
#[serde(skip)]
6768
pub filter_tags: Option<Vec<String>>,
6869
}
6970

@@ -75,7 +76,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
7576
{
7677
Ok(raw_logical_plan) => raw_logical_plan,
7778
Err(_) => {
78-
//if logical plan creation fails, create streams and try again
7979
create_streams_for_querier().await;
8080
session_state
8181
.create_logical_plan(&query_request.query)
@@ -85,10 +85,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
8585
let time_range =
8686
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
8787

88-
// Create a visitor to extract the table names present in query
8988
let mut visitor = TableScanVisitor::default();
9089
let _ = raw_logical_plan.visit(&mut visitor);
91-
9290
let tables = visitor.into_inner();
9391
update_schema_when_distributed(&tables).await?;
9492
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
@@ -103,65 +101,118 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
103101
user_auth_for_datasets(&permissions, &tables)?;
104102

105103
let time = Instant::now();
106-
// Intercept `count(*)`` queries and use the counts API
104+
107105
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
108-
let counts_req = CountsRequest {
109-
stream: table_name.clone(),
110-
start_time: query_request.start_time.clone(),
111-
end_time: query_request.end_time.clone(),
112-
num_bins: 1,
113-
};
114-
let count_records = counts_req.get_bin_density().await?;
115-
// NOTE: this should not panic, since there is atleast one bin, always
116-
let count = count_records[0].count;
117-
let response = if query_request.fields {
118-
json!({
119-
"fields": [&column_name],
120-
"records": [json!({column_name: count})]
121-
})
122-
} else {
123-
Value::Array(vec![json!({column_name: count})])
124-
};
106+
return handle_count_query(&query_request, &table_name, column_name, time).await;
107+
}
125108

126-
let total_time = format!("{:?}", time.elapsed());
127-
let time = time.elapsed().as_secs_f64();
109+
if !query_request.streaming {
110+
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
111+
}
112+
113+
handle_streaming_query(query, &table_name, &query_request, time).await
114+
}
128115

129-
QUERY_EXECUTE_TIME
130-
.with_label_values(&[&table_name])
131-
.observe(time);
116+
async fn handle_count_query(
117+
query_request: &Query,
118+
table_name: &str,
119+
column_name: &str,
120+
time: Instant,
121+
) -> Result<HttpResponse, QueryError> {
122+
let counts_req = CountsRequest {
123+
stream: table_name.to_string(),
124+
start_time: query_request.start_time.clone(),
125+
end_time: query_request.end_time.clone(),
126+
num_bins: 1,
127+
};
128+
let count_records = counts_req.get_bin_density().await?;
129+
let count = count_records[0].count;
130+
let response = if query_request.fields {
131+
json!({
132+
"fields": [column_name],
133+
"records": [json!({column_name: count})]
134+
})
135+
} else {
136+
serde_json::Value::Array(vec![json!({column_name: count})])
137+
};
132138

133-
return Ok(HttpResponse::Ok()
134-
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
135-
.json(response));
139+
let total_time = format!("{:?}", time.elapsed());
140+
let time = time.elapsed().as_secs_f64();
141+
142+
QUERY_EXECUTE_TIME
143+
.with_label_values(&[table_name])
144+
.observe(time);
145+
146+
Ok(HttpResponse::Ok()
147+
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
148+
.json(response))
149+
}
150+
151+
async fn handle_non_streaming_query(
152+
query: LogicalQuery,
153+
table_name: &str,
154+
query_request: &Query,
155+
time: Instant,
156+
) -> Result<HttpResponse, QueryError> {
157+
let (records, fields) = execute(query, table_name).await?;
158+
let total_time = format!("{:?}", time.elapsed());
159+
let time = time.elapsed().as_secs_f64();
160+
161+
QUERY_EXECUTE_TIME
162+
.with_label_values(&[table_name])
163+
.observe(time);
164+
let response = QueryResponse {
165+
records,
166+
fields,
167+
fill_null: query_request.send_null,
168+
with_fields: query_request.fields,
136169
}
137-
let (records_stream, fields) = execute_stream(query, &table_name).await?;
170+
.to_http()?;
171+
Ok(HttpResponse::Ok()
172+
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
173+
.json(response))
174+
}
175+
176+
async fn handle_streaming_query(
177+
query: LogicalQuery,
178+
table_name: &str,
179+
query_request: &Query,
180+
time: Instant,
181+
) -> Result<HttpResponse, QueryError> {
182+
let (records_stream, fields) = execute_stream(query, table_name).await?;
138183
let fields = fields.clone();
139-
let stream = records_stream.map(move |batch_result| {
140-
match batch_result {
141-
Ok(batch) => {
142-
// convert record batch to JSON
143-
let response = QueryResponse {
144-
records: vec![batch],
145-
fields: fields.clone(),
146-
fill_null: query_request.send_null,
147-
with_fields: query_request.fields,
148-
}
149-
.to_http()
150-
.unwrap_or_else(|e| {
151-
error!("Failed to parse record batch into JSON: {}", e);
152-
json!({})
153-
});
154-
Ok(Bytes::from(format!("{}\n", response.to_string())))
184+
let total_time = format!("{:?}", time.elapsed());
185+
let time = time.elapsed().as_secs_f64();
186+
QUERY_EXECUTE_TIME
187+
.with_label_values(&[table_name])
188+
.observe(time);
189+
190+
let send_null = query_request.send_null;
191+
let with_fields = query_request.fields;
192+
193+
let stream = records_stream.map(move |batch_result| match batch_result {
194+
Ok(batch) => {
195+
let response = QueryResponse {
196+
records: vec![batch],
197+
fields: fields.clone(),
198+
fill_null: send_null,
199+
with_fields,
155200
}
156-
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
201+
.to_http()
202+
.unwrap_or_else(|e| {
203+
error!("Failed to parse record batch into JSON: {}", e);
204+
json!({})
205+
});
206+
Ok(Bytes::from(format!("{}\n", response)))
157207
}
208+
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
158209
});
159210

160-
let boxed_stream =
161-
Box::pin(stream) as Pin<Box<dyn CoreStream<Item = Result<Bytes, actix_web::Error>> + Send>>;
211+
let boxed_stream = Box::pin(stream);
162212

163213
Ok(HttpResponse::Ok()
164214
.content_type("application/json")
215+
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
165216
.streaming(boxed_stream))
166217
}
167218

@@ -234,6 +285,10 @@ impl FromRequest for Query {
234285
query.send_null = params.get("sendNull").cloned().unwrap_or(false);
235286
}
236287

288+
if !query.streaming {
289+
query.streaming = params.get("streaming").cloned().unwrap_or(false);
290+
}
291+
237292
Ok(query)
238293
};
239294

@@ -297,6 +352,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
297352
send_null: query.send_null,
298353
start_time: start_time.to_rfc3339(),
299354
end_time: end_time.to_rfc3339(),
355+
streaming: query.streaming,
300356
};
301357

302358
Some(q)

0 commit comments

Comments
 (0)