diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 9312ba77d..9d151349b 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -71,6 +71,51 @@ pub struct Query { pub filter_tags: Option>, } +/// A function to execute the query and fetch QueryResponse +/// This won't look in the cache +/// TODO: Improve this function and make this a part of the query API +pub async fn get_records_and_fields( + query_request: &Query, + req: &HttpRequest, +) -> Result<(Option>, Option>), QueryError> { + let session_state = QUERY_SESSION.state(); + + // get the logical plan and extract the table name + let raw_logical_plan = session_state + .create_logical_plan(&query_request.query) + .await?; + + let time_range = + TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?; + // create a visitor to extract the table name + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + + let tables = visitor.into_inner(); + update_schema_when_distributed(&tables).await?; + let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?; + + let creds = extract_session_key_from_req(req)?; + let permissions = Users.get_permissions(&creds); + + let table_name = query + .first_table_name() + .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + + user_auth_for_datasets(&permissions, &tables)?; + + let (records, fields) = execute(query, &table_name, false).await?; + + let records = match records { + Either::Left(vec_rb) => vec_rb, + Either::Right(_) => { + return Err(QueryError::CustomError("Reject streaming response".into())) + } + }; + + Ok((Some(records), Some(fields))) +} + pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); let raw_logical_plan = match session_state diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f2e2685d6..010e9f594 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -131,3 +131,22 @@ pub fn user_auth_for_datasets( Ok(()) } + +/// A function to extract table names from a SQL string +pub async fn extract_tables(sql: &str) -> Option> { + let session_state = QUERY_SESSION.state(); + + // get the logical plan and extract the table name + let raw_logical_plan = match session_state.create_logical_plan(sql).await { + Ok(plan) => plan, + Err(_) => return None, + }; + + // create a visitor to extract the table name + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + + let tables = visitor.into_inner(); + + Some(tables) +}