Skip to content

update user auth for list filters #1295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 1 addition & 51 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use arrow_array::{Float64Array, Int64Array, RecordBatch};
use datafusion::{
common::tree_node::TreeNode,
functions_aggregate::{
count::count,
expr_fn::avg,
Expand All @@ -31,63 +30,14 @@ use datafusion::{
use tracing::trace;

use crate::{
alerts::LogicalOperator,
parseable::PARSEABLE,
query::{TableScanVisitor, QUERY_SESSION},
rbac::{
map::SessionKey,
role::{Action, Permission},
Users,
},
utils::time::TimeRange,
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
};

use super::{
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError,
AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS,
};

async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = session_state.create_logical_plan(query).await?;

let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Ok(visitor)
}

pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Result<(), AlertError> {
let tables = get_tables_from_query(query).await?;
let permissions = Users.get_permissions(session_key);

for table_name in tables.into_inner().iter() {
let mut authorized = false;

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions.iter() {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, _)
if stream == table_name || stream == "*" =>
{
authorized = true;
}
_ => (),
}
}

if !authorized {
return Err(AlertError::Unauthorized);
}
}

Ok(())
}

/// accept the alert
///
/// alert contains aggregate_config
Expand Down
2 changes: 1 addition & 1 deletion src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/

use actix_web::http::header::ContentType;
use alerts_utils::user_auth_for_query;
use async_trait::async_trait;
use chrono::Utc;
use derive_more::derive::FromStr;
Expand All @@ -43,6 +42,7 @@ use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::ObjectStorageError;
use crate::sync::alert_runtime;
use crate::utils::user_auth_for_query;

use self::target::Target;

Expand Down
6 changes: 3 additions & 3 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
rbac::{map::SessionKey, Users},
storage::ObjectStorageError,
users::filters::FilterQuery,
utils::{get_hash, user_auth_for_query},
utils::{get_hash, user_auth_for_datasets},
};

pub static CORRELATIONS: Lazy<Correlations> = Lazy::new(Correlations::default);
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Correlations {
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() {
if user_auth_for_datasets(&permissions, tables).is_ok() {
user_correlations.push(correlation.clone());
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ impl CorrelationConfig {
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables)?;

// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::utils::arrow::flight::{
send_to_ingester,
};
use crate::utils::time::TimeRange;
use crate::utils::user_auth_for_query;
use crate::utils::user_auth_for_datasets;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -211,7 +211,7 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

user_auth_for_query(&permissions, &streams).map_err(|_| {
user_auth_for_datasets(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
Expand Down
6 changes: 2 additions & 4 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
parseable::PARSEABLE,
storage::object_storage::alert_json_path,
// sync::schedule_alert_task,
utils::actix::extract_session_key_from_req,
utils::{actix::extract_session_key_from_req, user_auth_for_query},
};
use actix_web::{
web::{self, Json, Path},
Expand All @@ -31,9 +31,7 @@ use actix_web::{
use bytes::Bytes;
use ulid::Ulid;

use crate::alerts::{
alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
};
use crate::alerts::{AlertConfig, AlertError, AlertRequest, AlertState, ALERTS};

// GET /alerts
/// User needs at least a read access to the stream(s) that is being referenced in an alert
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use itertools::Itertools;

use crate::rbac::Users;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
use crate::utils::{get_hash, get_user_from_request, user_auth_for_datasets};

use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS};

Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn get(
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_query(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables)?;

Ok(web::Json(correlation))
}
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
use crate::utils::user_auth_for_query;
use crate::utils::user_auth_for_datasets;

/// Query Request through http endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
Expand Down Expand Up @@ -99,7 +99,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

user_auth_for_query(&permissions, &tables)?;
user_auth_for_datasets(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)`` queries and use the counts API
Expand Down Expand Up @@ -162,7 +162,7 @@ pub async fn get_counts(
let permissions = Users.get_permissions(&creds);

// does user have access to table?
user_auth_for_query(&permissions, &[counts_request.stream.clone()])?;
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;

let records = counts_request.get_bin_density().await?;

Expand Down
7 changes: 5 additions & 2 deletions src/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use serde_json::Value;
use tokio::sync::RwLock;

use crate::{
alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
rbac::map::SessionKey, storage::object_storage::dashboard_path, utils::get_hash,
migration::to_bytes,
parseable::PARSEABLE,
rbac::map::SessionKey,
storage::object_storage::dashboard_path,
utils::{get_hash, user_auth_for_query},
};

use super::TimeFilter;
Expand Down
45 changes: 39 additions & 6 deletions src/users/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ use tokio::sync::RwLock;

use super::TimeFilter;
use crate::{
alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
rbac::map::SessionKey, storage::object_storage::filter_path, utils::get_hash,
migration::to_bytes,
parseable::PARSEABLE,
rbac::{map::SessionKey, Users},
storage::object_storage::filter_path,
utils::{get_hash, user_auth_for_datasets, user_auth_for_query},
};

pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
Expand All @@ -42,11 +45,29 @@ pub struct Filter {

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FilterQuery {
pub filter_type: String,
pub filter_type: FilterType,
pub filter_query: Option<String>,
pub filter_builder: Option<FilterBuilder>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FilterType {
Filter,
SQL,
Search,
}

impl FilterType {
pub fn to_str(&self) -> &str {
match self {
FilterType::Filter => "filter",
FilterType::SQL => "sql",
FilterType::Search => "search",
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FilterBuilder {
pub id: String,
Expand Down Expand Up @@ -163,9 +184,21 @@ impl Filters {
} else {
continue;
};

if (user_auth_for_query(key, query).await).is_ok() {
filters.push(f.clone())
let filter_type = &f.query.filter_type;

// if filter type is one of SQL or filter
// then check if the user has access to the dataset based on the query string
// if filter type is search then check if the user has access to the dataset based on the dataset name
if *filter_type == FilterType::SQL || *filter_type == FilterType::Filter {
if (user_auth_for_query(key, query).await).is_ok() {
filters.push(f.clone())
}
} else if *filter_type == FilterType::Search {
let dataset_name = &f.stream_name;
let permissions = Users.get_permissions(key);
if user_auth_for_datasets(&permissions, &[dataset_name.to_string()]).is_ok() {
filters.push(f.clone())
}
}
}
filters
Expand Down
27 changes: 26 additions & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ pub mod uid;
pub mod update;

use crate::handlers::http::rbac::RBACError;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::map::SessionKey;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use actix::extract_session_key_from_req;
use actix_web::HttpRequest;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
use datafusion::common::tree_node::TreeNode;
use regex::Regex;
use sha2::{Digest, Sha256};
use tracing::debug;
Expand Down Expand Up @@ -82,7 +85,29 @@ pub fn get_hash(key: &str) -> String {
result
}

pub fn user_auth_for_query(
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, actix_web::error::Error> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = session_state
.create_logical_plan(query)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("Query error: {}", e)))?;

let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Ok(visitor)
}

pub async fn user_auth_for_query(
session_key: &SessionKey,
query: &str,
) -> Result<(), actix_web::error::Error> {
let tables = get_tables_from_query(query).await?;
let permissions = Users.get_permissions(session_key);
let tables = tables.into_inner();
user_auth_for_datasets(&permissions, &tables)
}

pub fn user_auth_for_datasets(
permissions: &[Permission],
tables: &[String],
) -> Result<(), actix_web::error::Error> {
Expand Down
Loading