Skip to content

Commit ce82b16

Browse files
committed
Helper functions for query
1 parent 232e031 commit ce82b16

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

src/handlers/http/query.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,52 @@ pub struct Query {
7171
pub filter_tags: Option<Vec<String>>,
7272
}
7373

74+
/// A function to execute the query and fetch QueryResponse
75+
/// This won't look in the cache
76+
/// TODO: Improve this function and make this a part of the query API
77+
pub async fn get_records_and_fields(
78+
query_request: &Query,
79+
req: &HttpRequest
80+
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
81+
let session_state = QUERY_SESSION.state();
82+
83+
// get the logical plan and extract the table name
84+
let raw_logical_plan = session_state
85+
.create_logical_plan(&query_request.query)
86+
.await?;
87+
88+
let time_range = TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
89+
// create a visitor to extract the table name
90+
let mut visitor = TableScanVisitor::default();
91+
let _ = raw_logical_plan.visit(&mut visitor);
92+
93+
let tables = visitor.into_inner();
94+
update_schema_when_distributed(&tables).await?;
95+
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
96+
97+
let creds = extract_session_key_from_req(&req)?;
98+
let permissions = Users.get_permissions(&creds);
99+
100+
let table_name = query
101+
.first_table_name()
102+
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
103+
104+
user_auth_for_datasets(&permissions, &tables)?;
105+
106+
let (records, fields) = execute(query, &table_name, false).await?;
107+
108+
let records = match records {
109+
Either::Left(vec_rb) => {
110+
vec_rb
111+
},
112+
Either::Right(_) => {
113+
return Err(QueryError::CustomError("Reject streaming response".into()))
114+
},
115+
};
116+
117+
Ok((Some(records), Some(fields)))
118+
}
119+
74120
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
75121
let session_state = QUERY_SESSION.state();
76122
let raw_logical_plan = match session_state

src/utils/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub mod time;
2525
pub mod uid;
2626
pub mod update;
2727

28+
use crate::handlers::http::query::QueryError;
2829
use crate::handlers::http::rbac::RBACError;
2930
use crate::query::{TableScanVisitor, QUERY_SESSION};
3031
use crate::rbac::map::SessionKey;
@@ -131,3 +132,24 @@ pub fn user_auth_for_datasets(
131132

132133
Ok(())
133134
}
135+
136+
/// A function to extract table names from a SQL string
137+
pub async fn extract_tables(sql: &str) -> Result<Option<Vec<String>>, QueryError> {
138+
let session_state = QUERY_SESSION.state();
139+
140+
// get the logical plan and extract the table name
141+
let raw_logical_plan = match session_state
142+
.create_logical_plan(sql)
143+
.await {
144+
Ok(plan) => plan,
145+
Err(_) => return Ok(None),
146+
};
147+
148+
// create a visitor to extract the table name
149+
let mut visitor = TableScanVisitor::default();
150+
let _ = raw_logical_plan.visit(&mut visitor);
151+
152+
let tables = visitor.into_inner();
153+
154+
Ok(Some(tables))
155+
}

0 commit comments

Comments
 (0)