diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 60a7e01c4..4ab04918e 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -216,11 +216,8 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service( - web::resource("/query").route( - web::post() - .to(query::query) - .authorize_for_stream(Action::Query), - ), + web::resource("/query") + .route(web::post().to(query::query).authorize(Action::Query)), ) // POST "/ingest" ==> Post logs to given log stream based on header .service( diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 056c29052..22a4c7fcc 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -16,10 +16,15 @@ * */ +use actix_web::error::ErrorUnauthorized; use actix_web::http::header::ContentType; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::web::Json; +use actix_web::{FromRequest, HttpRequest, Responder}; +use actix_web_httpauth::extractors::basic::BasicAuth; +use futures_util::Future; use http::StatusCode; use serde_json::Value; +use std::pin::Pin; use std::time::Instant; use crate::handlers::FILL_NULL_OPTION_KEY; @@ -27,27 +32,17 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::CONFIG; use crate::query::error::{ExecuteError, ParseError}; use crate::query::Query; +use crate::rbac::role::{Action, Permission}; +use crate::rbac::Users; use crate::response::QueryResponse; -pub async fn query( - _req: HttpRequest, - json: web::Json, -) -> Result { +pub async fn query(query: Query) -> Result { let time = Instant::now(); - let json = json.into_inner(); - - let fill_null = json - .as_object() - .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) - .and_then(|value| value.as_bool()) - .unwrap_or_default(); - - let query = Query::parse(json)?; let storage = CONFIG.storage().get_object_store(); let query_result = query.execute(storage).await; let query_result = query_result - .map(|(records, fields)| QueryResponse::new(records, fields, fill_null)) + .map(|(records, fields)| QueryResponse::new(records, fields, query.fill_null)) .map(|response| response.to_http()) .map_err(|e| e.into()); @@ -59,6 +54,67 @@ pub async fn query( query_result } +impl FromRequest for Query { + type Error = actix_web::Error; + type Future = Pin>>>; + + fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { + let creds = BasicAuth::extract(req) + .into_inner() + .expect("expects basic auth"); + // Extract username and password from the request using basic auth extractor. + let username = creds.user_id().trim().to_owned(); + let password = creds.password().unwrap_or("").trim().to_owned(); + let permissions = Users.get_permissions(username, password); + let json = Json::::from_request(req, payload); + + let fut = async move { + let json = json.await?.into_inner(); + // maybe move this option to query param instead so that it can simply be extracted from request + let fill_null = json + .as_object() + .and_then(|map| map.get(FILL_NULL_OPTION_KEY)) + .and_then(|value| value.as_bool()) + .unwrap_or_default(); + + let mut query = Query::parse(json).map_err(QueryError::Parse)?; + query.fill_null = fill_null; + + let mut authorized = false; + let mut tags = Vec::new(); + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions { + match permission { + Permission::Stream(Action::All, _) => authorized = true, + Permission::StreamWithTag(Action::Query, stream, tag) + if stream == query.stream_name => + { + authorized = true; + if let Some(tag) = tag { + tags.push(tag) + } + } + _ => (), + } + } + + if !authorized { + return Err(ErrorUnauthorized("Not authorized")); + } + + if !tags.is_empty() { + query.filter_tag = Some(tags) + } + + Ok(query) + }; + + Box::pin(fut) + } +} + #[derive(Debug, thiserror::Error)] pub enum QueryError { #[error("Bad request: {0}")] diff --git a/server/src/query.rs b/server/src/query.rs index 6498ce236..7943643ee 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -16,10 +16,13 @@ * */ +mod filter_optimizer; + use chrono::TimeZone; use chrono::{DateTime, Utc}; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::execution::context::SessionState; use datafusion::prelude::*; use itertools::Itertools; use serde_json::Value; @@ -33,6 +36,7 @@ use crate::utils::TimePeriod; use crate::validator; use self::error::{ExecuteError, ParseError}; +use self::filter_optimizer::FilterOptimizerRule; type Key = &'static str; fn get_value(value: &Value, key: Key) -> Result<&str, Key> { @@ -46,6 +50,8 @@ pub struct Query { pub schema: Arc, pub start: DateTime, pub end: DateTime, + pub filter_tag: Option>, + pub fill_null: bool, } impl Query { @@ -72,17 +78,30 @@ impl Query { .collect() } + // create session context for this query + fn create_session_context(&self) -> SessionContext { + let config = SessionConfig::default(); + let runtime = CONFIG.storage().get_datafusion_runtime(); + let mut state = SessionState::with_config_rt(config, runtime); + + if let Some(tag) = &self.filter_tag { + let filter = FilterOptimizerRule { + column: crate::event::DEFAULT_TAGS_KEY.to_string(), + literals: tag.clone(), + }; + state = state.add_optimizer_rule(Arc::new(filter)) + } + + SessionContext::with_state(state) + } + /// Execute query on object storage(and if necessary on cache as well) with given stream information /// TODO: find a way to query all selected parquet files together in a single context. pub async fn execute( &self, storage: Arc, ) -> Result<(Vec, Vec), ExecuteError> { - let ctx = SessionContext::with_config_rt( - SessionConfig::default(), - CONFIG.storage().get_datafusion_runtime(), - ); - + let ctx = self.create_session_context(); let prefixes = self.get_prefixes(); let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) }; diff --git a/server/src/query/filter_optimizer.rs b/server/src/query/filter_optimizer.rs new file mode 100644 index 000000000..be055de00 --- /dev/null +++ b/server/src/query/filter_optimizer.rs @@ -0,0 +1,136 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{collections::HashMap, sync::Arc}; + +use datafusion::{ + common::{DFField, DFSchema}, + logical_expr::{Filter, LogicalPlan, Projection}, + optimizer::{optimize_children, OptimizerRule}, + prelude::{lit, or, Column, Expr}, + scalar::ScalarValue, +}; + +/// Rewrites logical plan for source using projection and filter +pub struct FilterOptimizerRule { + pub column: String, + pub literals: Vec, +} + +impl OptimizerRule for FilterOptimizerRule { + fn try_optimize( + &self, + plan: &datafusion::logical_expr::LogicalPlan, + config: &dyn datafusion::optimizer::OptimizerConfig, + ) -> datafusion::error::Result> { + // if there are no patterns then the rule cannot be performed + let Some(filter_expr) = self.expr() else { return Ok(None); }; + + if let LogicalPlan::TableScan(table) = plan { + if table.projection.is_none() + || table + .filters + .iter() + .any(|expr| self.contains_valid_tag_filter(expr)) + { + return Ok(None); + } + + let mut table = table.clone(); + let schema = &table.source.schema(); + + if !table + .projected_schema + .has_column_with_unqualified_name(&self.column) + { + let tags_index = schema.index_of(&self.column)?; + let tags_field = schema.field(tags_index); + // modify source table projection to include tags + let mut df_schema = table.projected_schema.fields().clone(); + df_schema.push(DFField::new( + Some(table.table_name.clone()), + tags_field.name(), + tags_field.data_type().clone(), + tags_field.is_nullable(), + )); + + table.projected_schema = + Arc::new(DFSchema::new_with_metadata(df_schema, HashMap::default())?); + if let Some(projection) = &mut table.projection { + projection.push(tags_index) + } + } + + let projected_schema = table.projected_schema.clone(); + let filter = LogicalPlan::Filter(Filter::try_new( + filter_expr, + Arc::new(LogicalPlan::TableScan(table)), + )?); + let plan = LogicalPlan::Projection(Projection::new_from_schema( + Arc::new(filter), + projected_schema, + )); + + return Ok(Some(plan)); + } + + // If we didn't find anything then recurse as normal and build the result. + optimize_children(self, plan, config) + } + + fn name(&self) -> &str { + "parseable_read_filter" + } +} + +impl FilterOptimizerRule { + fn expr(&self) -> Option { + let mut patterns = self.literals.iter().map(|literal| { + Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal))) + }); + + let Some(mut filter_expr) = patterns.next() else { return None }; + for expr in patterns { + filter_expr = or(filter_expr, expr) + } + + Some(filter_expr) + } + + fn contains_valid_tag_filter(&self, expr: &Expr) -> bool { + match expr { + Expr::Like(like) => { + let matches_column = match &*like.expr { + Expr::Column(column) => column.name == self.column, + _ => return false, + }; + + let matches_pattern = match &*like.pattern { + Expr::Literal(ScalarValue::Utf8(Some(literal))) => { + let literal = literal.trim_matches('%'); + self.literals.iter().any(|x| x == literal) + } + _ => false, + }; + + matches_column && matches_pattern && !like.negated + } + _ => false, + } + } +} diff --git a/server/src/rbac.rs b/server/src/rbac.rs index a6001ce86..60517e4e2 100644 --- a/server/src/rbac.rs +++ b/server/src/rbac.rs @@ -24,6 +24,8 @@ use crate::rbac::map::{auth_map, mut_auth_map, mut_user_map, user_map}; use crate::rbac::role::{model::DefaultPrivilege, Action}; use crate::rbac::user::User; +use self::role::Permission; + // This type encapsulates both the user_map and auth_map // so other entities deal with only this type pub struct Users; @@ -68,6 +70,11 @@ impl Users { user_map().contains_key(username) } + pub fn get_permissions(&self, username: String, password: String) -> Vec { + let key = (username, password); + auth_map().get(&key).cloned().unwrap_or_default() + } + pub fn authenticate( &self, username: String, diff --git a/server/src/rbac/map.rs b/server/src/rbac/map.rs index 6d742fc0a..0c8e0263e 100644 --- a/server/src/rbac/map.rs +++ b/server/src/rbac/map.rs @@ -107,6 +107,10 @@ impl AuthMap { self.inner.retain(|(x, _), _| x != username) } + pub fn get(&self, key: &(String, String)) -> Option<&Vec> { + self.inner.get(key) + } + // returns None if user is not in the map // Otherwise returns Some(is_authenticated) pub fn check_auth( @@ -120,7 +124,8 @@ impl AuthMap { match *user_perm { // if any action is ALL then we we authorize Permission::Unit(action) => action == required_action || action == Action::All, - Permission::Stream(action, ref stream) => { + Permission::Stream(action, ref stream) + | Permission::StreamWithTag(action, ref stream, _) => { let ok_stream = if let Some(on_stream) = on_stream { stream == on_stream || stream == "*" } else { diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 50d5a0c62..62cb3320f 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -43,6 +43,7 @@ pub enum Action { pub enum Permission { Unit(Action), Stream(Action, String), + StreamWithTag(Action, String, Option), } // Currently Roles are tied to one stream @@ -70,7 +71,11 @@ impl RoleBuilder { for action in self.actions { let perm = match action { Action::Ingest => Permission::Stream(action, self.stream.clone().unwrap()), - Action::Query => Permission::Stream(action, self.stream.clone().unwrap()), + Action::Query => Permission::StreamWithTag( + action, + self.stream.clone().unwrap(), + self.tag.clone(), + ), Action::CreateStream => Permission::Unit(action), Action::DeleteStream => Permission::Unit(action), Action::ListStream => Permission::Unit(action), diff --git a/server/src/validator.rs b/server/src/validator.rs index 3f10fe236..463a9d3f0 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -192,6 +192,8 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result