diff --git a/Cargo.lock b/Cargo.lock index 6340b6b17..ff97cce39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6" dependencies = [ "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -215,7 +215,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -602,7 +602,7 @@ checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -641,7 +641,7 @@ dependencies = [ "percent-encoding", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1224,7 +1224,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1237,7 +1237,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1506,7 +1506,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.107", ] [[package]] @@ -1523,7 +1523,7 @@ checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1727,7 +1727,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.107", ] [[package]] @@ -1896,9 +1896,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", "futures-sink", @@ -1906,9 +1906,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" @@ -1923,9 +1923,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-lite" @@ -1944,26 +1944,26 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.16", ] [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-timer" @@ -1973,9 +1973,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-channel", "futures-core", @@ -2824,7 +2824,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2985,6 +2985,7 @@ dependencies = [ "env_logger", "fs_extra", "futures", + "futures-util", "hex", "hostname", "http", @@ -3128,7 +3129,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3184,7 +3185,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "version_check", ] @@ -3207,9 +3208,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" dependencies = [ "unicode-ident", ] @@ -3302,9 +3303,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.23" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500" dependencies = [ "proc-macro2", ] @@ -3502,7 +3503,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.107", "unicode-ident", ] @@ -3672,7 +3673,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3814,7 +3815,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3857,7 +3858,7 @@ checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3905,7 +3906,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn", + "syn 1.0.107", ] [[package]] @@ -3948,6 +3949,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sysinfo" version = "0.28.4" @@ -4009,7 +4021,7 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4124,7 +4136,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4256,7 +4268,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4483,7 +4495,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -4517,7 +4529,7 @@ checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 8b821217f..3102c5628 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -79,6 +79,7 @@ pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" regex = "1.7.3" argon2 = "0.5.0" +futures-util = "0.3.28" [build-dependencies] static-files = "0.2" diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index cd9ddfca1..9e3b381cd 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -20,9 +20,8 @@ use std::fs::File; use std::io::BufReader; use actix_cors::Cors; -use actix_web::dev::{Service, ServiceRequest}; -use actix_web::error::ErrorBadRequest; -use actix_web::{middleware, web, App, HttpServer}; +use actix_web::dev::ServiceRequest; +use actix_web::{web, App, HttpMessage, HttpServer, Route}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; use actix_web_prometheus::PrometheusMetrics; @@ -31,11 +30,15 @@ use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; use crate::option::CONFIG; -use crate::rbac::user_map; +use crate::rbac::role::Action; +use crate::rbac::Users; + +use self::middleware::{Authorization, DisAllowRootUser}; mod health_check; mod ingest; mod logstream; +mod middleware; mod query; mod rbac; @@ -51,8 +54,8 @@ macro_rules! create_app { App::new() .wrap($prometheus.clone()) .configure(|cfg| configure_routes(cfg)) - .wrap(middleware::Logger::default()) - .wrap(middleware::Compress::default()) + .wrap(actix_web::middleware::Logger::default()) + .wrap(actix_web::middleware::Compress::default()) .wrap( Cors::default() .allow_any_header() @@ -66,16 +69,15 @@ async fn authenticate( req: ServiceRequest, credentials: BasicAuth, ) -> Result { - let username = credentials.user_id().trim(); + let username = credentials.user_id().trim().to_owned(); let password = credentials.password().unwrap().trim(); - if let Some(user) = user_map().read().unwrap().get(username) { - if user.verify(password) { - return Ok(req); - } + if Users.authenticate(&username, password) { + req.extensions_mut().insert(username); + Ok(req) + } else { + Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) } - - Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) } pub async fn run_http(prometheus: PrometheusMetrics) -> anyhow::Result<()> { @@ -135,69 +137,117 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .service( web::resource("") // PUT "/logstream/{logstream}" ==> Create log stream - .route(web::put().to(logstream::put_stream)) + .route( + web::put() + .to(logstream::put_stream) + .authorize_for_stream(Action::CreateStream), + ) // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route(web::post().to(ingest::post_event)) + .route( + web::post() + .to(ingest::post_event) + .authorize_for_stream(Action::Ingest), + ) // DELETE "/logstream/{logstream}" ==> Delete log stream - .route(web::delete().to(logstream::delete)) + .route( + web::delete() + .to(logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) .service( web::resource("/alert") // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route(web::put().to(logstream::put_alert)) + .route( + web::put() + .to(logstream::put_alert) + .authorize_for_stream(Action::PutAlert), + ) // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route(web::get().to(logstream::get_alert)), + .route( + web::get() + .to(logstream::get_alert) + .authorize_for_stream(Action::GetAlert), + ), ) .service( // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream - web::resource("/schema").route(web::get().to(logstream::schema)), + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), ) .service( // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream - web::resource("/stats").route(web::get().to(logstream::get_stats)), + web::resource("/stats").route( + web::get() + .to(logstream::get_stats) + .authorize_for_stream(Action::GetStats), + ), ) .service( web::resource("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream - .route(web::put().to(logstream::put_retention)) + .route( + web::put() + .to(logstream::put_retention) + .authorize_for_stream(Action::PutRetention), + ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream - .route(web::get().to(logstream::get_retention)), + .route( + web::get() + .to(logstream::get_retention) + .authorize_for_stream(Action::GetRetention), + ), ); // User API - let user_api = web::scope("/user").service( - web::resource("/{username}") - // POST /user/{username} => Create a new user - .route(web::put().to(rbac::put_user)) - // DELETE /user/{username} => Delete a user - .route(web::delete().to(rbac::delete_user)) - .wrap_fn(|req, srv| { - // The credentials set in the env vars (P_USERNAME & P_PASSWORD) are treated - // as root credentials. Any other user is not allowed to modify or delete - // the root user. Deny request if username is same as username - // from env variable P_USERNAME. - let username = req.match_info().get("username").unwrap_or(""); - let is_root = username == CONFIG.parseable.username; - let call = srv.call(req); - async move { - if is_root { - return Err(ErrorBadRequest("Cannot call this API for root admin user")); - } - call.await - } - }), - ); + let user_api = web::scope("/user") + .service( + web::resource("") + // GET /user => List all users + .route(web::get().to(rbac::list_users).authorize(Action::ListUser)), + ) + .service( + web::resource("/{username}") + // PUT /user/{username} => Create a new user + .route(web::put().to(rbac::put_user).authorize(Action::PutUser)) + // DELETE /user/{username} => Delete a user + .route( + web::delete() + .to(rbac::delete_user) + .authorize(Action::DeleteUser), + ), + ) + .service( + web::resource("/{username}/role") + // PUT /user/{username}/roles => Put roles for user + .route(web::put().to(rbac::put_role).authorize(Action::PutRoles)), + ) + // Deny request if username is same as the env variable P_USERNAME. + .wrap(DisAllowRootUser); cfg.service( // Base path "{url}/api/v1" web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body - .service(web::resource("/query").route(web::post().to(query::query))) + .service( + web::resource("/query").route( + web::post() + .to(query::query) + .authorize_for_stream(Action::Query), + ), + ) // POST "/ingest" ==> Post logs to given log stream based on header .service( web::resource("/ingest") - .route(web::post().to(ingest::ingest)) + .route( + web::post() + .to(ingest::ingest) + .authorize_for_stream(Action::Ingest), + ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command @@ -208,7 +258,8 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { web::scope("/logstream") .service( // GET "/logstream" ==> Get list of all Log Streams on the server - web::resource("").route(web::get().to(logstream::list)), + web::resource("") + .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) .service( // logstream API @@ -229,3 +280,24 @@ fn base_path() -> String { pub fn metrics_path() -> String { format!("{}/metrics", base_path()) } + +trait RouteExt { + fn authorize(self, action: Action) -> Self; + fn authorize_for_stream(self, action: Action) -> Self; +} + +impl RouteExt for Route { + fn authorize(self, action: Action) -> Self { + self.wrap(Authorization { + action, + stream: false, + }) + } + + fn authorize_for_stream(self, action: Action) -> Self { + self.wrap(Authorization { + action, + stream: true, + }) + } +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 8ef22e734..6edd9efe8 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -104,7 +104,7 @@ fn into_event_batch( #[derive(Debug, thiserror::Error)] pub enum PostError { - #[error("{0}")] + #[error("Stream {0} not found")] StreamNotFound(String), #[error("Could not deserialize into JSON object, {0}")] SerdeError(#[from] serde_json::Error), diff --git a/server/src/handlers/http/middleware.rs b/server/src/handlers/http/middleware.rs new file mode 100644 index 000000000..7c4194ff0 --- /dev/null +++ b/server/src/handlers/http/middleware.rs @@ -0,0 +1,153 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + +use std::future::{ready, Ready}; + +use actix_web::{ + dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, + error::{ErrorBadRequest, ErrorUnauthorized}, + Error, HttpMessage, +}; +use futures_util::future::LocalBoxFuture; + +use crate::{ + option::CONFIG, + rbac::{role::Action, Users}, +}; + +pub struct Authorization { + pub action: Action, + pub stream: bool, +} + +impl Transform for Authorization +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = AuthorizationMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(AuthorizationMiddleware { + action: self.action, + match_stream: self.stream, + service, + })) + } +} + +pub struct AuthorizationMiddleware { + action: Action, + match_stream: bool, + service: S, +} + +impl Service for AuthorizationMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let stream = if self.match_stream { + req.match_info().get("logstream") + } else { + None + }; + let extensions = req.extensions(); + let username = extensions + .get::() + .expect("authentication layer verified username"); + let is_auth = Users.check_permission(username, self.action, stream); + drop(extensions); + + let fut = self.service.call(req); + + Box::pin(async move { + if !is_auth { + return Err(ErrorUnauthorized("Not authorized")); + } + fut.await + }) + } +} + +// The credentials set in the env vars (P_USERNAME & P_PASSWORD) are treated +// as root credentials. Any other user is not allowed to modify or delete +// the root user. Deny request if username is same as username +// from env variable P_USERNAME. +pub struct DisAllowRootUser; + +impl Transform for DisAllowRootUser +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = DisallowRootUserMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(DisallowRootUserMiddleware { service })) + } +} + +pub struct DisallowRootUserMiddleware { + service: S, +} + +impl Service for DisallowRootUserMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let username = req.match_info().get("username").unwrap_or(""); + let is_root = username == CONFIG.parseable.username; + let fut = self.service.call(req); + + Box::pin(async move { + if is_root { + return Err(ErrorBadRequest("Cannot call this API for root admin user")); + } + fut.await + }) + } +} diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 3d1bd71a1..0d8eb4ed0 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -19,8 +19,9 @@ use crate::{ option::CONFIG, rbac::{ + role::model::DefaultPrivilege, user::{PassCode, User}, - user_map, + Users, }, storage::{self, ObjectStorageError, StorageMetadata}, validator::{self, error::UsernameValidationError}, @@ -32,6 +33,12 @@ use tokio::sync::Mutex; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); +// Handler for GET /api/v1/user +// returns list of all registerd users +pub async fn list_users() -> impl Responder { + web::Json(Users.list_users()) +} + // Handler for PUT /api/v1/user/{username} // Creates a new user by username if it does not exists // Otherwise make a call to reset password @@ -40,8 +47,7 @@ pub async fn put_user(username: web::Path) -> Result) -> Result) -> Result) -> Result Result { put_metadata(&metadata).await?; // update in mem table - user_map() - .write() - .unwrap() - .get_mut(&username) - .expect("checked that user exists in map") - .password_hash = hash; - + Users.change_password_hash(&username, &hash); Ok(password) } +// Put roles for given user +pub async fn put_role( + username: web::Path, + role: web::Json, +) -> Result { + let username = username.into_inner(); + let role = role.into_inner(); + let role: Vec = serde_json::from_value(role)?; + + let permissions; + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username == username) + { + user.role = role; + permissions = user.permissions() + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.put_permissions(&username, &permissions); + Ok(format!("Roles updated successfully for {}", username)) +} + async fn get_metadata() -> Result { let metadata = CONFIG .storage() @@ -129,6 +162,8 @@ pub enum RBACError { UserExists, #[error("User does not exist")] UserDoesNotExist, + #[error("{0}")] + SerdeError(#[from] serde_json::Error), #[error("Failed to connect to storage: {0}")] ObjectStorageError(#[from] ObjectStorageError), #[error("invalid Username: {0}")] @@ -140,6 +175,7 @@ impl actix_web::ResponseError for RBACError { match self { Self::UserExists => StatusCode::BAD_REQUEST, Self::UserDoesNotExist => StatusCode::NOT_FOUND, + Self::SerdeError(_) => StatusCode::BAD_REQUEST, Self::ValidationError(_) => StatusCode::BAD_REQUEST, Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, } diff --git a/server/src/rbac.rs b/server/src/rbac.rs index d53be66ec..d2fb21293 100644 --- a/server/src/rbac.rs +++ b/server/src/rbac.rs @@ -20,21 +20,132 @@ use std::sync::RwLock; use once_cell::sync::OnceCell; -use self::user::{get_admin_user, User, UserMap}; +use self::{ + role::{Action, Permission}, + user::{get_admin_user, verify, User, UserMap, UserPermMap}, +}; +pub mod role; pub mod user; -pub static USERS: OnceCell> = OnceCell::new(); +pub static USER_AUTHENTICATION_MAP: OnceCell> = OnceCell::new(); +pub static USER_AUTHORIZATION_MAP: OnceCell> = OnceCell::new(); -pub fn user_map() -> &'static RwLock { - USERS.get().expect("user map is set") +pub struct Users; + +impl Users { + pub fn put_user(&self, user: User) { + USER_AUTHORIZATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .insert(&user); + + USER_AUTHENTICATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .insert(user); + } + + pub fn list_users(&self) -> Vec { + USER_AUTHORIZATION_MAP + .get() + .expect("map is set") + .read() + .unwrap() + .keys() + .cloned() + .collect() + } + + pub fn delete_user(&self, username: &str) { + USER_AUTHORIZATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .remove(username); + + USER_AUTHENTICATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .remove(username); + } + + pub fn change_password_hash(&self, username: &str, hash: &String) { + if let Some(entry) = USER_AUTHENTICATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .get_mut(username) + { + entry.clone_from(hash) + }; + } + + pub fn put_permissions(&self, username: &str, roles: &Vec) { + if let Some(entry) = USER_AUTHORIZATION_MAP + .get() + .expect("map is set") + .write() + .unwrap() + .get_mut(username) + { + entry.clone_from(roles) + }; + } + + pub fn contains(&self, username: &str) -> bool { + USER_AUTHENTICATION_MAP + .get() + .expect("map is set") + .read() + .unwrap() + .contains_key(username) + } + + pub fn authenticate(&self, username: &str, password: &str) -> bool { + if let Some(hash) = USER_AUTHENTICATION_MAP + .get() + .expect("map is set") + .read() + .unwrap() + .get(username) + { + verify(hash, password) + } else { + false + } + } + + pub fn check_permission(&self, username: &str, action: Action, stream: Option<&str>) -> bool { + USER_AUTHORIZATION_MAP + .get() + .expect("map is set") + .read() + .unwrap() + .has_perm(username, action, stream) + } } pub fn set_user_map(users: Vec) { - let mut map = UserMap::default(); - for user in users { - map.insert(user) - } - map.insert(get_admin_user()); - USERS.set(RwLock::new(map)).expect("map is only set once") + let mut perm_map = UserPermMap::from(&users); + let mut user_map = UserMap::from(users); + let admin = get_admin_user(); + perm_map.insert(&admin); + user_map.insert(admin); + + USER_AUTHENTICATION_MAP + .set(RwLock::new(user_map)) + .expect("map is only set once"); + + USER_AUTHORIZATION_MAP + .set(RwLock::new(perm_map)) + .expect("map is only set once"); } diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs new file mode 100644 index 000000000..45e8a21c4 --- /dev/null +++ b/server/src/rbac/role.rs @@ -0,0 +1,180 @@ +/* +* Parseable Server (C) 2022 - 2023 Parseable, Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* +*/ + +// Represents actions that corresponds to an api +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub enum Action { + Ingest, + Query, + CreateStream, + ListStream, + GetSchema, + GetStats, + DeleteStream, + GetRetention, + PutRetention, + PutAlert, + GetAlert, + PutUser, + ListUser, + DeleteUser, + PutRoles, + All, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Permission { + Unit(Action), + Stream(Action, String), +} + +// Currently Roles are tied to one stream +#[derive(Debug, Default)] +pub struct RoleBuilder { + actions: Vec, + stream: Option, + tag: Option, +} + +// R x P +impl RoleBuilder { + pub fn with_stream(mut self, stream: String) -> Self { + self.stream = Some(stream); + self + } + + pub fn with_tag(mut self, tag: String) -> Self { + self.tag = Some(tag); + self + } + + pub fn build(self) -> Vec { + let mut perms = Vec::new(); + for action in self.actions { + let perm = match action { + Action::Ingest => Permission::Stream(action, self.stream.clone().unwrap()), + Action::Query => Permission::Stream(action, self.stream.clone().unwrap()), + Action::CreateStream => Permission::Unit(action), + Action::ListStream => Permission::Unit(action), + Action::GetSchema => Permission::Stream(action, self.stream.clone().unwrap()), + Action::GetStats => Permission::Stream(action, self.stream.clone().unwrap()), + Action::DeleteStream => Permission::Stream(action, self.stream.clone().unwrap()), + Action::GetRetention => Permission::Stream(action, self.stream.clone().unwrap()), + Action::PutRetention => Permission::Stream(action, self.stream.clone().unwrap()), + Action::PutAlert => Permission::Stream(action, self.stream.clone().unwrap()), + Action::GetAlert => Permission::Stream(action, self.stream.clone().unwrap()), + Action::PutUser => Permission::Unit(action), + Action::ListUser => Permission::Unit(action), + Action::PutRoles => Permission::Unit(action), + Action::DeleteUser => Permission::Unit(action), + Action::All => Permission::Stream(action, self.stream.clone().unwrap()), + }; + perms.push(perm); + } + perms + } +} + +// use facing model for /user/roles +// we can put same model in the backend +// user -> Vec +pub mod model { + use super::{Action, RoleBuilder}; + + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + #[serde(tag = "privilege", content = "resource", rename_all = "lowercase")] + pub enum DefaultPrivilege { + Admin, + Editor, + Writer { stream: String }, + Reader { stream: String, tag: String }, + } + + impl From<&DefaultPrivilege> for RoleBuilder { + fn from(value: &DefaultPrivilege) -> Self { + match value { + DefaultPrivilege::Admin => admin_perm_builder(), + DefaultPrivilege::Editor => editor_perm_builder(), + DefaultPrivilege::Writer { stream } => { + writer_perm_builder().with_stream(stream.to_owned()) + } + DefaultPrivilege::Reader { stream, tag } => reader_perm_builder() + .with_stream(stream.to_owned()) + .with_tag(tag.to_owned()), + } + } + } + + fn admin_perm_builder() -> RoleBuilder { + RoleBuilder { + actions: vec![Action::All], + stream: Some("*".to_string()), + tag: None, + } + } + + fn editor_perm_builder() -> RoleBuilder { + RoleBuilder { + actions: vec![ + Action::Ingest, + Action::Query, + Action::CreateStream, + Action::ListStream, + Action::GetSchema, + Action::GetStats, + Action::GetRetention, + Action::PutRetention, + Action::PutAlert, + Action::GetAlert, + ], + stream: Some("*".to_string()), + tag: None, + } + } + + fn writer_perm_builder() -> RoleBuilder { + RoleBuilder { + actions: vec![ + Action::Ingest, + Action::Query, + Action::GetSchema, + Action::GetStats, + Action::GetRetention, + Action::PutAlert, + Action::GetAlert, + ], + stream: None, + tag: None, + } + } + + fn reader_perm_builder() -> RoleBuilder { + RoleBuilder { + actions: vec![ + Action::Query, + Action::GetSchema, + Action::GetStats, + Action::GetRetention, + Action::GetAlert, + ], + stream: None, + tag: None, + } + } +} diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs index 26e70b4f3..199e26916 100644 --- a/server/src/rbac/user.rs +++ b/server/src/rbac/user.rs @@ -16,7 +16,7 @@ * */ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use argon2::{ password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, @@ -27,12 +27,13 @@ use rand::distributions::{Alphanumeric, DistString}; use crate::option::CONFIG; +use super::role::{model::DefaultPrivilege, Action, Permission, RoleBuilder}; + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct User { pub username: String, pub password_hash: String, - // fill this - pub roles: Vec<()>, + pub role: Vec, } impl User { @@ -43,28 +44,37 @@ impl User { Self { username, password_hash: hash, - roles: Vec::new(), + role: Vec::new(), }, password, ) } - // Take the password and compare with the hash stored internally (PHC format ==> - // $[$v=][$=(,=)*][$[$]]) - // ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md#specification - pub fn verify(&self, password: &str) -> bool { - let parsed_hash = PasswordHash::new(&self.password_hash).unwrap(); - Argon2::default() - .verify_password(password.as_bytes(), &parsed_hash) - .is_ok() - } - // generate a new password pub fn gen_new_password() -> PassCode { let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); let hash = gen_hash(&password); PassCode { password, hash } } + + pub fn permissions(&self) -> Vec { + let perms: HashSet = self + .role + .iter() + .flat_map(|role| RoleBuilder::from(role).build()) + .collect(); + perms.into_iter().collect() + } +} + +// Take the password and compare with the hash stored internally (PHC format ==> +// $[$v=][$=(,=)*][$[$]]) +// ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md#specification +pub fn verify(password_hash: &str, password: &str) -> bool { + let parsed_hash = PasswordHash::new(password_hash).unwrap(); + Argon2::default() + .verify_password(password.as_bytes(), &parsed_hash) + .is_ok() } // generate a one way hash for password to be stored in metadata file @@ -81,11 +91,21 @@ fn gen_hash(password: &str) -> String { } #[derive(Debug, Default, derive_more::Deref, derive_more::DerefMut)] -pub struct UserMap(HashMap); +pub struct UserMap(HashMap); impl UserMap { pub fn insert(&mut self, user: User) { - self.0.insert(user.username.clone(), user); + self.0.insert(user.username, user.password_hash); + } +} + +impl From> for UserMap { + fn from(users: Vec) -> Self { + let mut user_map = Self::default(); + for user in users { + user_map.insert(user); + } + user_map } } @@ -102,6 +122,53 @@ pub fn get_admin_user() -> User { User { username, password_hash: hashcode, - roles: Vec::new(), + role: vec![DefaultPrivilege::Admin], + } +} + +#[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)] +pub struct UserPermMap(HashMap>); + +impl UserPermMap { + pub fn insert(&mut self, user: &User) { + self.0.insert(user.username.clone(), user.permissions()); + } + + pub fn has_perm( + &self, + username: &str, + required_action: Action, + on_stream: Option<&str>, + ) -> bool { + if let Some(perms) = self.get(username) { + perms.iter().any(|user_perm| { + match *user_perm { + // if any action is ALL then we we authorize + Permission::Unit(action) => action == required_action || action == Action::All, + Permission::Stream(action, ref stream) => { + let ok_stream = if let Some(on_stream) = on_stream { + stream == on_stream || stream == "*" + } else { + // if no stream to match then stream check is not needed + true + }; + (action == required_action || action == Action::All) && ok_stream + } + } + }) + } else { + // NO permission set for this user + false + } + } +} + +impl From<&Vec> for UserPermMap { + fn from(users: &Vec) -> Self { + let mut map = Self::default(); + for user in users { + map.insert(user) + } + map } }