Skip to content

Commit e922b07

Browse files
authored
Add query filter for read permissions (#429)
If a user has read privilege on a certain stream and tag, apply that tag filter to the DataFusion query node so that only the relevant fields are returned. Part of #250.
1 parent 3f4fab2 commit e922b07

File tree

8 files changed

+254
-27
lines changed

8 files changed

+254
-27
lines changed

server/src/handlers/http.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,8 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
216216
web::scope(&base_path())
217217
// POST "/query" ==> Get results of the SQL query passed in request body
218218
.service(
219-
web::resource("/query").route(
220-
web::post()
221-
.to(query::query)
222-
.authorize_for_stream(Action::Query),
223-
),
219+
web::resource("/query")
220+
.route(web::post().to(query::query).authorize(Action::Query)),
224221
)
225222
// POST "/ingest" ==> Post logs to given log stream based on header
226223
.service(

server/src/handlers/http/query.rs

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,33 @@
1616
*
1717
*/
1818

19+
use actix_web::error::ErrorUnauthorized;
1920
use actix_web::http::header::ContentType;
20-
use actix_web::{web, HttpRequest, Responder};
21+
use actix_web::web::Json;
22+
use actix_web::{FromRequest, HttpRequest, Responder};
23+
use actix_web_httpauth::extractors::basic::BasicAuth;
24+
use futures_util::Future;
2125
use http::StatusCode;
2226
use serde_json::Value;
27+
use std::pin::Pin;
2328
use std::time::Instant;
2429

2530
use crate::handlers::FILL_NULL_OPTION_KEY;
2631
use crate::metrics::QUERY_EXECUTE_TIME;
2732
use crate::option::CONFIG;
2833
use crate::query::error::{ExecuteError, ParseError};
2934
use crate::query::Query;
35+
use crate::rbac::role::{Action, Permission};
36+
use crate::rbac::Users;
3037
use crate::response::QueryResponse;
3138

32-
pub async fn query(
33-
_req: HttpRequest,
34-
json: web::Json<Value>,
35-
) -> Result<impl Responder, QueryError> {
39+
pub async fn query(query: Query) -> Result<impl Responder, QueryError> {
3640
let time = Instant::now();
37-
let json = json.into_inner();
38-
39-
let fill_null = json
40-
.as_object()
41-
.and_then(|map| map.get(FILL_NULL_OPTION_KEY))
42-
.and_then(|value| value.as_bool())
43-
.unwrap_or_default();
44-
45-
let query = Query::parse(json)?;
4641

4742
let storage = CONFIG.storage().get_object_store();
4843
let query_result = query.execute(storage).await;
4944
let query_result = query_result
50-
.map(|(records, fields)| QueryResponse::new(records, fields, fill_null))
45+
.map(|(records, fields)| QueryResponse::new(records, fields, query.fill_null))
5146
.map(|response| response.to_http())
5247
.map_err(|e| e.into());
5348

@@ -59,6 +54,67 @@ pub async fn query(
5954
query_result
6055
}
6156

57+
impl FromRequest for Query {
58+
type Error = actix_web::Error;
59+
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
60+
61+
fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
62+
let creds = BasicAuth::extract(req)
63+
.into_inner()
64+
.expect("expects basic auth");
65+
// Extract username and password from the request using basic auth extractor.
66+
let username = creds.user_id().trim().to_owned();
67+
let password = creds.password().unwrap_or("").trim().to_owned();
68+
let permissions = Users.get_permissions(username, password);
69+
let json = Json::<Value>::from_request(req, payload);
70+
71+
let fut = async move {
72+
let json = json.await?.into_inner();
73+
// maybe move this option to query param instead so that it can simply be extracted from request
74+
let fill_null = json
75+
.as_object()
76+
.and_then(|map| map.get(FILL_NULL_OPTION_KEY))
77+
.and_then(|value| value.as_bool())
78+
.unwrap_or_default();
79+
80+
let mut query = Query::parse(json).map_err(QueryError::Parse)?;
81+
query.fill_null = fill_null;
82+
83+
let mut authorized = false;
84+
let mut tags = Vec::new();
85+
86+
// During the permission check, determine whether the user can run a query on the stream.
87+
// also while iterating add any filter tags for this stream
88+
for permission in permissions {
89+
match permission {
90+
Permission::Stream(Action::All, _) => authorized = true,
91+
Permission::StreamWithTag(Action::Query, stream, tag)
92+
if stream == query.stream_name =>
93+
{
94+
authorized = true;
95+
if let Some(tag) = tag {
96+
tags.push(tag)
97+
}
98+
}
99+
_ => (),
100+
}
101+
}
102+
103+
if !authorized {
104+
return Err(ErrorUnauthorized("Not authorized"));
105+
}
106+
107+
if !tags.is_empty() {
108+
query.filter_tag = Some(tags)
109+
}
110+
111+
Ok(query)
112+
};
113+
114+
Box::pin(fut)
115+
}
116+
}
117+
62118
#[derive(Debug, thiserror::Error)]
63119
pub enum QueryError {
64120
#[error("Bad request: {0}")]

server/src/query.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
*
1717
*/
1818

19+
mod filter_optimizer;
20+
1921
use chrono::TimeZone;
2022
use chrono::{DateTime, Utc};
2123
use datafusion::arrow::datatypes::Schema;
2224
use datafusion::arrow::record_batch::RecordBatch;
25+
use datafusion::execution::context::SessionState;
2326
use datafusion::prelude::*;
2427
use itertools::Itertools;
2528
use serde_json::Value;
@@ -33,6 +36,7 @@ use crate::utils::TimePeriod;
3336
use crate::validator;
3437

3538
use self::error::{ExecuteError, ParseError};
39+
use self::filter_optimizer::FilterOptimizerRule;
3640

3741
type Key = &'static str;
3842
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -46,6 +50,8 @@ pub struct Query {
4650
pub schema: Arc<Schema>,
4751
pub start: DateTime<Utc>,
4852
pub end: DateTime<Utc>,
53+
pub filter_tag: Option<Vec<String>>,
54+
pub fill_null: bool,
4955
}
5056

5157
impl Query {
@@ -72,17 +78,30 @@ impl Query {
7278
.collect()
7379
}
7480

81+
// create session context for this query
82+
fn create_session_context(&self) -> SessionContext {
83+
let config = SessionConfig::default();
84+
let runtime = CONFIG.storage().get_datafusion_runtime();
85+
let mut state = SessionState::with_config_rt(config, runtime);
86+
87+
if let Some(tag) = &self.filter_tag {
88+
let filter = FilterOptimizerRule {
89+
column: crate::event::DEFAULT_TAGS_KEY.to_string(),
90+
literals: tag.clone(),
91+
};
92+
state = state.add_optimizer_rule(Arc::new(filter))
93+
}
94+
95+
SessionContext::with_state(state)
96+
}
97+
7598
/// Execute query on object storage(and if necessary on cache as well) with given stream information
7699
/// TODO: find a way to query all selected parquet files together in a single context.
77100
pub async fn execute(
78101
&self,
79102
storage: Arc<dyn ObjectStorage + Send>,
80103
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
81-
let ctx = SessionContext::with_config_rt(
82-
SessionConfig::default(),
83-
CONFIG.storage().get_datafusion_runtime(),
84-
);
85-
104+
let ctx = self.create_session_context();
86105
let prefixes = self.get_prefixes();
87106
let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) };
88107

server/src/query/filter_optimizer.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::{collections::HashMap, sync::Arc};
20+
21+
use datafusion::{
22+
common::{DFField, DFSchema},
23+
logical_expr::{Filter, LogicalPlan, Projection},
24+
optimizer::{optimize_children, OptimizerRule},
25+
prelude::{lit, or, Column, Expr},
26+
scalar::ScalarValue,
27+
};
28+
29+
/// Rewrites logical plan for source using projection and filter
30+
pub struct FilterOptimizerRule {
31+
pub column: String,
32+
pub literals: Vec<String>,
33+
}
34+
35+
impl OptimizerRule for FilterOptimizerRule {
36+
fn try_optimize(
37+
&self,
38+
plan: &datafusion::logical_expr::LogicalPlan,
39+
config: &dyn datafusion::optimizer::OptimizerConfig,
40+
) -> datafusion::error::Result<Option<datafusion::logical_expr::LogicalPlan>> {
41+
// if there are no patterns then the rule cannot be performed
42+
let Some(filter_expr) = self.expr() else { return Ok(None); };
43+
44+
if let LogicalPlan::TableScan(table) = plan {
45+
if table.projection.is_none()
46+
|| table
47+
.filters
48+
.iter()
49+
.any(|expr| self.contains_valid_tag_filter(expr))
50+
{
51+
return Ok(None);
52+
}
53+
54+
let mut table = table.clone();
55+
let schema = &table.source.schema();
56+
57+
if !table
58+
.projected_schema
59+
.has_column_with_unqualified_name(&self.column)
60+
{
61+
let tags_index = schema.index_of(&self.column)?;
62+
let tags_field = schema.field(tags_index);
63+
// modify source table projection to include tags
64+
let mut df_schema = table.projected_schema.fields().clone();
65+
df_schema.push(DFField::new(
66+
Some(table.table_name.clone()),
67+
tags_field.name(),
68+
tags_field.data_type().clone(),
69+
tags_field.is_nullable(),
70+
));
71+
72+
table.projected_schema =
73+
Arc::new(DFSchema::new_with_metadata(df_schema, HashMap::default())?);
74+
if let Some(projection) = &mut table.projection {
75+
projection.push(tags_index)
76+
}
77+
}
78+
79+
let projected_schema = table.projected_schema.clone();
80+
let filter = LogicalPlan::Filter(Filter::try_new(
81+
filter_expr,
82+
Arc::new(LogicalPlan::TableScan(table)),
83+
)?);
84+
let plan = LogicalPlan::Projection(Projection::new_from_schema(
85+
Arc::new(filter),
86+
projected_schema,
87+
));
88+
89+
return Ok(Some(plan));
90+
}
91+
92+
// If we didn't find anything then recurse as normal and build the result.
93+
optimize_children(self, plan, config)
94+
}
95+
96+
fn name(&self) -> &str {
97+
"parseable_read_filter"
98+
}
99+
}
100+
101+
impl FilterOptimizerRule {
102+
fn expr(&self) -> Option<Expr> {
103+
let mut patterns = self.literals.iter().map(|literal| {
104+
Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
105+
});
106+
107+
let Some(mut filter_expr) = patterns.next() else { return None };
108+
for expr in patterns {
109+
filter_expr = or(filter_expr, expr)
110+
}
111+
112+
Some(filter_expr)
113+
}
114+
115+
fn contains_valid_tag_filter(&self, expr: &Expr) -> bool {
116+
match expr {
117+
Expr::Like(like) => {
118+
let matches_column = match &*like.expr {
119+
Expr::Column(column) => column.name == self.column,
120+
_ => return false,
121+
};
122+
123+
let matches_pattern = match &*like.pattern {
124+
Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
125+
let literal = literal.trim_matches('%');
126+
self.literals.iter().any(|x| x == literal)
127+
}
128+
_ => false,
129+
};
130+
131+
matches_column && matches_pattern && !like.negated
132+
}
133+
_ => false,
134+
}
135+
}
136+
}

server/src/rbac.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use crate::rbac::map::{auth_map, mut_auth_map, mut_user_map, user_map};
2424
use crate::rbac::role::{model::DefaultPrivilege, Action};
2525
use crate::rbac::user::User;
2626

27+
use self::role::Permission;
28+
2729
// This type encapsulates both the user_map and auth_map
2830
// so other entities deal with only this type
2931
pub struct Users;
@@ -68,6 +70,11 @@ impl Users {
6870
user_map().contains_key(username)
6971
}
7072

73+
pub fn get_permissions(&self, username: String, password: String) -> Vec<Permission> {
74+
let key = (username, password);
75+
auth_map().get(&key).cloned().unwrap_or_default()
76+
}
77+
7178
pub fn authenticate(
7279
&self,
7380
username: String,

server/src/rbac/map.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ impl AuthMap {
107107
self.inner.retain(|(x, _), _| x != username)
108108
}
109109

110+
pub fn get(&self, key: &(String, String)) -> Option<&Vec<Permission>> {
111+
self.inner.get(key)
112+
}
113+
110114
// returns None if user is not in the map
111115
// Otherwise returns Some(is_authenticated)
112116
pub fn check_auth(
@@ -120,7 +124,8 @@ impl AuthMap {
120124
match *user_perm {
121125
// if any action is ALL then we authorize
122126
Permission::Unit(action) => action == required_action || action == Action::All,
123-
Permission::Stream(action, ref stream) => {
127+
Permission::Stream(action, ref stream)
128+
| Permission::StreamWithTag(action, ref stream, _) => {
124129
let ok_stream = if let Some(on_stream) = on_stream {
125130
stream == on_stream || stream == "*"
126131
} else {

server/src/rbac/role.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub enum Action {
4343
pub enum Permission {
4444
Unit(Action),
4545
Stream(Action, String),
46+
StreamWithTag(Action, String, Option<String>),
4647
}
4748

4849
// Currently Roles are tied to one stream
@@ -70,7 +71,11 @@ impl RoleBuilder {
7071
for action in self.actions {
7172
let perm = match action {
7273
Action::Ingest => Permission::Stream(action, self.stream.clone().unwrap()),
73-
Action::Query => Permission::Stream(action, self.stream.clone().unwrap()),
74+
Action::Query => Permission::StreamWithTag(
75+
action,
76+
self.stream.clone().unwrap(),
77+
self.tag.clone(),
78+
),
7479
Action::CreateStream => Permission::Unit(action),
7580
Action::DeleteStream => Permission::Unit(action),
7681
Action::ListStream => Permission::Unit(action),

0 commit comments

Comments
 (0)