diff --git a/Cargo.lock b/Cargo.lock index d66dbc4cc..6340b6b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,6 +341,17 @@ dependencies = [ "backtrace", ] +[[package]] +name = "argon2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95c2fcf79ad1932ac6269a738109997a83c227c09b75842ae564dc8ede6a861c" +dependencies = [ + "base64ct", + "blake2", + "password-hash", +] + [[package]] name = "arrayref" version = "0.3.6" @@ -955,6 +966,12 @@ dependencies = [ "simd-abstraction", ] +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bitflags" version = "1.3.2" @@ -2946,6 +2963,7 @@ dependencies = [ "actix-web-prometheus", "actix-web-static-files", "anyhow", + "argon2", "arrow-array", "arrow-ipc", "arrow-json", @@ -3007,6 +3025,17 @@ dependencies = [ "zip", ] +[[package]] +name = "password-hash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" +dependencies = [ + "base64ct", + "rand_core", + "subtle", +] + [[package]] name = "paste" version = "1.0.11" diff --git a/server/Cargo.toml b/server/Cargo.toml index 53d11d046..8b821217f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -78,6 +78,7 @@ pyroscope = { version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" regex = "1.7.3" +argon2 = "0.5.0" [build-dependencies] static-files = "0.2" diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 982d80e08..ae8238684 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -20,7 +20,8 @@ use std::fs::File; use std::io::BufReader; use actix_cors::Cors; -use actix_web::dev::ServiceRequest; +use actix_web::dev::{Service, ServiceRequest}; +use actix_web::error::ErrorBadRequest; use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; @@ -30,11 +31,13 @@ use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; use crate::option::CONFIG; +use crate::rbac::get_user_map; mod health_check; mod ingest; mod logstream; mod query; +mod rbac; include!(concat!(env!("OUT_DIR"), "/generated.rs")); @@ -63,10 +66,13 @@ async fn validator( req: ServiceRequest, credentials: BasicAuth, ) -> Result { - if credentials.user_id().trim() == CONFIG.parseable.username - && credentials.password().unwrap().trim() == CONFIG.parseable.password - { - return Ok(req); + let username = credentials.user_id().trim(); + let password = credentials.password().unwrap().trim(); + + if let Some(user) = get_user_map().read().unwrap().get(username) { + if user.verify(password) { + return Ok(req); + } } Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) @@ -157,6 +163,25 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route(web::get().to(logstream::get_retention)), ); + let user_api = web::scope("/user").service( + web::resource("/{username}") + // POST /user/{username} => Create a new user + .route(web::put().to(rbac::put_user)) + // DELETE /user/{username} => Delete a user + .route(web::delete().to(rbac::delete_user)) + .wrap_fn(|req, srv| { + // deny 
request if username is same as username from config + let username = req.match_info().get("username").unwrap_or(""); + let is_root = username == CONFIG.parseable.username; + let call = srv.call(req); + async move { + if is_root { + return Err(ErrorBadRequest("Cannot call this API for root admin user")); + } + call.await + } + }), + ); cfg.service( // Base path "{url}/api/v1" @@ -184,6 +209,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { logstream_api, ), ) + .service(user_api) .wrap(HttpAuthentication::basic(validator)), ) // GET "/" ==> Serve the static frontend directory diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs new file mode 100644 index 000000000..c24f86fa3 --- /dev/null +++ b/server/src/handlers/http/rbac.rs @@ -0,0 +1,153 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use crate::{ + option::CONFIG, + rbac::{ + get_user_map, + user::{PassCode, User}, + }, + storage::{self, ObjectStorageError, StorageMetadata}, + validator::{self, error::UsernameValidationError}, +}; +use actix_web::{http::header::ContentType, web, Responder}; +use http::StatusCode; +use tokio::sync::Mutex; + +// async aware lock for updating storage metadata and user map atomicically +static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); + +// Handler for PUT /api/v1/user/{username} +// Creates a new user by username if it does not exists +// Otherwise make a call to reset password +// returns password generated for this user +pub async fn put_user(username: web::Path) -> Result { + let username = username.into_inner(); + validator::verify_username(&username)?; + let _ = UPDATE_LOCK.lock().await; + let user_exists = get_user_map().read().unwrap().contains_key(&username); + if user_exists { + reset_password(username).await + } else { + let mut metadata = get_metadata().await?; + if metadata.users.iter().any(|user| user.username == username) { + // should be unreachable given state is always consistent + return Err(RBACError::UserExists); + } + + let (user, password) = User::create_new(username); + metadata.users.push(user.clone()); + put_metadata(&metadata).await?; + // set this user to user map + get_user_map().write().unwrap().insert(user); + + Ok(password) + } +} + +// Handler for DELETE /api/v1/user/delete/{username} +pub async fn delete_user(username: web::Path) -> Result { + let username = username.into_inner(); + let _ = UPDATE_LOCK.lock().await; + // fail this request if the user does not exists + if !get_user_map().read().unwrap().contains_key(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.users.retain(|user| user.username != username); + put_metadata(&metadata).await?; + // update in mem table + get_user_map().write().unwrap().remove(&username); + Ok(format!("deleted user: {}", username)) 
+} + +// Reset password for given username +// returns new password generated for this user +pub async fn reset_password(username: String) -> Result { + // get new password for this user + let PassCode { password, hash } = User::gen_new_password(); + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata).await?; + + // update in mem table + get_user_map() + .write() + .unwrap() + .get_mut(&username) + .expect("checked that user exists in map") + .password_hash = hash; + + Ok(password) +} + +async fn get_metadata() -> Result { + let metadata = CONFIG + .storage() + .get_object_store() + .get_metadata() + .await? + .expect("metadata is initialized"); + Ok(metadata) +} + +async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + storage::put_remote_metadata(metadata).await?; + storage::put_staging_metadata(metadata)?; + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum RBACError { + #[error("User exists already")] + UserExists, + #[error("User does not exist")] + UserDoesNotExist, + #[error("Failed to connect to storage: {0}")] + ObjectStorageError(#[from] ObjectStorageError), + #[error("invalid Username: {0}")] + ValidationError(#[from] UsernameValidationError), +} + +impl actix_web::ResponseError for RBACError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::UserExists => StatusCode::BAD_REQUEST, + Self::UserDoesNotExist => StatusCode::NOT_FOUND, + Self::ValidationError(_) => StatusCode::BAD_REQUEST, + Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 1c2f0321a..51ac95ac3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -41,6 +41,7 @@ mod metrics; mod migration; mod option; mod query; +mod rbac; mod response; mod stats; mod storage; @@ -60,9 +61,11 @@ async fn main() -> anyhow::Result<()> { CONFIG.validate(); let storage = CONFIG.storage().get_object_store(); CONFIG.validate_staging()?; + migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; + rbac::set_user_map(metadata.users.clone()); metadata.set_global(); - banner::print(&CONFIG, storage::StorageMetadata::global()).await; let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); diff --git a/server/src/migration.rs b/server/src/migration.rs index 5bde8c889..649371836 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -17,14 +17,49 @@ * */ +mod metadata_migration; mod schema_migration; mod stream_metadata_migration; +use std::fs::OpenOptions; + use bytes::Bytes; use relative_path::RelativePathBuf; use serde::Serialize; -use crate::{option::Config, storage::ObjectStorage}; +use crate::{ + option::Config, + storage::{ObjectStorage, ObjectStorageError}, +}; + +pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { + let object_store = config.storage().get_object_store(); + let storage_metadata = 
get_storage_metadata(&*object_store).await?; + let staging_metadata = get_staging_metadata(config)?; + + fn get_version(metadata: &serde_json::Value) -> Option<&str> { + metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()) + } + + if let Some(storage_metadata) = storage_metadata { + if get_version(&storage_metadata) == Some("v1") { + let metadata = metadata_migration::v1_v2(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + } + + if let Some(staging_metadata) = staging_metadata { + if get_version(&staging_metadata) == Some("v1") { + let metadata = metadata_migration::v1_v2(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + } + + Ok(()) +} pub async fn run_migration(config: &Config) -> anyhow::Result<()> { let storage = config.storage().get_object_store(); @@ -85,3 +120,54 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { .map(|any| any.into()) .expect("serialize cannot fail") } + +pub fn get_staging_metadata(config: &Config) -> anyhow::Result> { + let path = config.staging_dir().join(".parseable.json"); + let bytes = match std::fs::read(path) { + Ok(bytes) => bytes, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => return Ok(None), + _ => return Err(err.into()), + }, + }; + let meta: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + Ok(Some(meta)) +} + +async fn get_storage_metadata( + storage: &dyn ObjectStorage, +) -> anyhow::Result> { + let path = RelativePathBuf::from_iter([".parseable.json"]); + match storage.get_object(&path).await { + Ok(bytes) => Ok(Some( + serde_json::from_slice(&bytes).expect("parseable config is valid json"), + )), + Err(err) => { + if matches!(err, ObjectStorageError::NoSuchKey(_)) { + Ok(None) + } else { + Err(err.into()) + } + } + } +} + +pub async fn put_remote_metadata( + storage: &dyn ObjectStorage, + metadata: &serde_json::Value, +) -> anyhow::Result<()> { + let path = RelativePathBuf::from_iter([".parseable.json"]); + let metadata = serde_json::to_vec(metadata)?.into(); + Ok(storage.put_object(&path, metadata).await?) +} + +pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> { + let path = config.staging_dir().join(".parseable.json"); + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; + serde_json::to_writer(&mut file, metadata)?; + Ok(()) +} diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs new file mode 100644 index 000000000..b949ab4d2 --- /dev/null +++ b/server/src/migration/metadata_migration.rs @@ -0,0 +1,31 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use std::vec; + +use serde_json::Value; + +pub fn v1_v2(mut storage_metadata: serde_json::Value) -> Value { + let metadata = storage_metadata.as_object_mut().unwrap(); + *metadata.get_mut("version").unwrap() = Value::String("v2".to_string()); + metadata.remove("user"); + metadata.remove("stream"); + metadata.insert("users".to_string(), Value::Array(vec![])); + metadata.insert("streams".to_string(), Value::Array(vec![])); + storage_metadata +} diff --git a/server/src/rbac.rs b/server/src/rbac.rs new file mode 100644 index 000000000..04b6d14a1 --- /dev/null +++ b/server/src/rbac.rs @@ -0,0 +1,40 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::sync::RwLock; + +use once_cell::sync::OnceCell; + +use self::user::{get_admin_user, User, UserMap}; + +pub mod user; + +pub static USERS: OnceCell> = OnceCell::new(); + +pub fn get_user_map() -> &'static RwLock { + USERS.get().expect("user map is set") +} + +pub fn set_user_map(users: Vec) { + let mut map = UserMap::default(); + for user in users { + map.insert(user) + } + map.insert(get_admin_user()); + USERS.set(RwLock::new(map)).expect("map is only set once") +} diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs new file mode 100644 index 000000000..e98098ad0 --- /dev/null +++ b/server/src/rbac/user.rs @@ -0,0 +1,105 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::HashMap; + +use argon2::{ + password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, + Argon2, PasswordHash, PasswordVerifier, +}; + +use rand::distributions::{Alphanumeric, DistString}; + +use crate::option::CONFIG; + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct User { + pub username: String, + pub password_hash: String, + // fill this + pub roles: Vec<()>, +} + +impl User { + // create a new User and return self with password generated for said user. 
+ pub fn create_new(username: String) -> (Self, String) { + let PassCode { password, hash } = Self::gen_new_password(); + ( + Self { + username, + password_hash: hash, + roles: Vec::new(), + }, + password, + ) + } + + // Verification works because the PasswordHash is in PHC format + // $[$v=][$=(,=)*][$[$]] + // ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md#specification + pub fn verify(&self, password: &str) -> bool { + let parsed_hash = PasswordHash::new(&self.password_hash).unwrap(); + Argon2::default() + .verify_password(password.as_bytes(), &parsed_hash) + .is_ok() + } + + // gen new password + pub fn gen_new_password() -> PassCode { + let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + let hash = gen_hash(&password); + PassCode { password, hash } + } +} + +fn gen_hash(password: &str) -> String { + let salt = SaltString::generate(&mut OsRng); + let argon2 = Argon2::default(); + let hashcode = argon2 + .hash_password(password.as_bytes(), &salt) + .expect("can hash random alphanumeric") + .to_string(); + + hashcode +} + +#[derive(Debug, Default, derive_more::Deref, derive_more::DerefMut)] +pub struct UserMap(HashMap); + +impl UserMap { + pub fn insert(&mut self, user: User) { + self.0.insert(user.username.clone(), user); + } +} + +pub struct PassCode { + pub password: String, + pub hash: String, +} + +pub fn get_admin_user() -> User { + let username = CONFIG.parseable.username.clone(); + let password = CONFIG.parseable.password.clone(); + let hashcode = gen_hash(&password); + + User { + username, + password_hash: hashcode, + roles: Vec::new(), + } +} diff --git a/server/src/storage.rs b/server/src/storage.rs index 46f80b979..561e3722a 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,13 +16,11 @@ * */ -use crate::option::CONFIG; use crate::stats::Stats; use chrono::Local; use std::fmt::Debug; -use std::fs::create_dir_all; mod localfs; mod object_storage; @@ -34,10 +32,11 @@ mod store_metadata; pub use localfs::{FSConfig, LocalFS}; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::{S3Config, S3}; -pub use store_metadata::StorageMetadata; +pub use store_metadata::{ + put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, +}; pub use self::staging::StorageDir; -use self::store_metadata::{put_staging_metadata, EnvChange}; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. 
@@ -123,55 +122,6 @@ impl ObjectStoreFormat { } } -pub async fn resolve_parseable_metadata() -> Result { - let staging_metadata = store_metadata::get_staging_metadata()?; - let storage = CONFIG.storage().get_object_store(); - let remote_metadata = storage.get_metadata().await?; - - let check = store_metadata::check_metadata_conflict(staging_metadata, remote_metadata); - - const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage"; - let res: Result = match check { - EnvChange::None(metadata) => Ok(metadata), - EnvChange::StagingMismatch => Err(MISMATCH), - EnvChange::StorageMismatch => Err(MISMATCH), - EnvChange::NewRemote => { - Err("Could not start the server because metadata not found in storage") - } - EnvChange::NewStaging(mut metadata) => { - create_dir_all(CONFIG.staging_dir())?; - metadata.staging = CONFIG.staging_dir().canonicalize()?; - create_remote_metadata(&metadata).await?; - put_staging_metadata(&metadata)?; - - Ok(metadata) - } - EnvChange::CreateBoth => { - create_dir_all(CONFIG.staging_dir())?; - let metadata = StorageMetadata::new(); - create_remote_metadata(&metadata).await?; - put_staging_metadata(&metadata)?; - - Ok(metadata) - } - }; - - res.map_err(|err| { - let err = format!( - "{}. {}", - err, - "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable" - ); - let err: Box = err.into(); - ObjectStorageError::UnhandledError(err) - }) -} - -async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { - let client = CONFIG.storage().get_object_store(); - client.put_metadata(metadata).await -} - #[derive(serde::Serialize)] pub struct LogStream { pub name: String, diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 498861407..562a4c209 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -17,19 +17,28 @@ */ use std::{ - fs::{self, OpenOptions}, + fs::{self, create_dir_all, OpenOptions}, path::PathBuf, }; use once_cell::sync::OnceCell; use std::io; -use crate::{option::CONFIG, utils::uid}; +use crate::{option::CONFIG, rbac::user::User, storage::ObjectStorageError, utils::uid}; use super::object_storage::PARSEABLE_METADATA_FILE_NAME; -pub static STORAGE_METADATA: OnceCell = OnceCell::new(); +// Expose some static variables for internal usage +pub static STORAGE_METADATA: OnceCell = OnceCell::new(); +// For use in global static +#[derive(Debug, PartialEq, Eq)] +pub struct StaticStorageMetadata { + pub mode: String, + pub deployment_id: uid::Uid, +} + +// Type for serialization and deserialization #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct StorageMetadata { pub version: String, @@ -38,69 +47,126 @@ pub struct StorageMetadata { pub storage: String, #[serde(default = "crate::utils::uid::gen")] pub deployment_id: uid::Uid, - pub user: Vec, - pub stream: Vec, -} - -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct User { - username: String, - password: String, - role: String, + pub users: Vec, + pub streams: Vec, } impl StorageMetadata { pub fn new() -> Self { Self { - version: "v1".to_string(), + version: "v2".to_string(), mode: CONFIG.storage_name.to_owned(), staging: CONFIG.staging_dir().canonicalize().unwrap(), storage: CONFIG.storage().get_endpoint(), deployment_id: uid::gen(), - user: Vec::new(), - stream: Vec::new(), + users: Vec::new(), + streams: Vec::new(), } } - 
pub fn global() -> &'static Self { + pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() .expect("gloabal static is initialized") } pub fn set_global(self) { - STORAGE_METADATA.set(self).expect("only set once") + let metadata = StaticStorageMetadata { + mode: self.mode, + deployment_id: self.deployment_id, + }; + + STORAGE_METADATA.set(metadata).expect("only set once") } } -pub fn check_metadata_conflict( - staging_metadata: Option, - remote_metadata: Option, -) -> EnvChange { - match (staging_metadata, remote_metadata) { - (Some(staging), Some(remote)) if staging.mode == remote.mode => { - if staging.storage != remote.storage { - EnvChange::StorageMismatch - } else if staging.staging != remote.staging { - EnvChange::StagingMismatch +// always returns remote metadata as it is source of truth +// overwrites staging metadata while updating storage info +pub async fn resolve_parseable_metadata() -> Result { + let staging_metadata = get_staging_metadata()?; + let storage = CONFIG.storage().get_object_store(); + let remote_metadata = storage.get_metadata().await?; + + let check = match (staging_metadata, remote_metadata) { + (Some(staging), Some(remote)) => { + if staging.deployment_id == remote.deployment_id { + EnvChange::None(remote) } else { - EnvChange::None(staging) + EnvChange::DeploymentMismatch } } - (Some(staging), Some(remote)) if staging.mode != remote.mode => EnvChange::StorageMismatch, - (None, None) => EnvChange::CreateBoth, (None, Some(remote)) => EnvChange::NewStaging(remote), (Some(_), None) => EnvChange::NewRemote, - _ => unreachable!(), + (None, None) => EnvChange::CreateBoth, + }; + + // flags for if metadata needs to be synced + let mut overwrite_staging = false; + let mut overwrite_remote = false; + + const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage"; + let res = match check { + EnvChange::None(metadata) => { + // overwrite staging anyways so that it matches remote in case of any divergence + overwrite_staging = true; + Ok(metadata) + } + EnvChange::DeploymentMismatch => Err(MISMATCH), + EnvChange::NewRemote => { + Err("Could not start the server because metadata not found in storage") + } + EnvChange::NewStaging(mut metadata) => { + create_dir_all(CONFIG.staging_dir())?; + metadata.staging = CONFIG.staging_dir().canonicalize()?; + // this flag is set to true so that metadata is copied to staging + overwrite_staging = true; + // overwrite remote because staging dir has changed. + overwrite_remote = true; + Ok(metadata) + } + EnvChange::CreateBoth => { + create_dir_all(CONFIG.staging_dir())?; + let metadata = StorageMetadata::new(); + // new metadata needs to be set on both staging and remote + overwrite_remote = true; + overwrite_staging = true; + Ok(metadata) + } + }; + + let metadata = res.map_err(|err| { + let err = format!( + "{}. {}", + err, + "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable" + ); + let err: Box = err.into(); + ObjectStorageError::UnhandledError(err) + })?; + + if overwrite_staging { + put_staging_metadata(&metadata)?; } + + if overwrite_remote { + put_remote_metadata(&metadata).await?; + } + + Ok(metadata) } + +// variant contain remote metadata #[derive(Debug, Clone, PartialEq, Eq)] pub enum EnvChange { + /// No change in env i.e both staging and remote have same id None(StorageMetadata), - StagingMismatch, - StorageMismatch, + /// Mismatch in deployment id. 
Cannot use this staging for this remote + DeploymentMismatch, + /// Metadata not found in storage. Treated as possible misconfiguration on user side. NewRemote, + /// If a new staging is found then we just copy remote metadata to this staging. NewStaging(StorageMetadata), + /// Fresh remote and staging, hence create a new metadata file on both CreateBoth, } @@ -119,9 +185,18 @@ pub fn get_staging_metadata() -> io::Result> { Ok(Some(meta)) } +pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + let client = CONFIG.storage().get_object_store(); + client.put_metadata(metadata).await +} + pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); - let mut file = OpenOptions::new().create_new(true).write(true).open(path)?; + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; serde_json::to_writer(&mut file, meta)?; Ok(()) } diff --git a/server/src/validator.rs b/server/src/validator.rs index 2b31fa53f..956facfa5 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -23,7 +23,9 @@ use crate::metadata::STREAM_INFO; use crate::query::Query; use chrono::{DateTime, Utc}; -use self::error::{AlertValidationError, QueryValidationError, StreamNameValidationError}; +use self::error::{ + AlertValidationError, QueryValidationError, StreamNameValidationError, UsernameValidationError, +}; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ @@ -112,6 +114,22 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> { Ok(()) } +pub fn verify_username(username: &str) -> Result<(), UsernameValidationError> { + // Check if the username meets the required criteria + if username.len() < 3 || username.len() > 64 { + return Err(UsernameValidationError::InvalidLength); + } + // Username should contain only alphanumeric characters or underscores + if !username + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { + return Err(UsernameValidationError::SpecialChar); + } + + Ok(()) +} + pub fn query(query: &str, start_time: &str, end_time: &str) -> Result { if query.is_empty() { return Err(QueryValidationError::EmptyQuery); @@ -233,4 +251,14 @@ pub mod error { #[error("SQL keyword cannot be used as stream name")] SQLKeyword(String), } + + #[derive(Debug, thiserror::Error)] + pub enum UsernameValidationError { + #[error("Username length should be between 3 and 64 chars")] + InvalidLength, + #[error( + "Username contains invalid characters. Only lowercase aplhanumeric and _ is allowed" + )] + SpecialChar, + } }
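
The password handling introduced in server/src/rbac/user.rs reduces to a single hash/verify round trip: `gen_hash` produces a PHC-format Argon2 string (algorithm, parameters and salt are all embedded in it), and `User::verify` re-parses that string and checks the supplied password against it. A minimal, self-contained sketch of that round trip, using the same argon2 0.5 calls as the patch (the literal password and the example hash shown in the comment are illustrative only):

use argon2::{
    password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
    Argon2, PasswordHash, PasswordVerifier,
};

fn main() {
    // hash with a fresh random salt, exactly like `gen_hash` in the patch
    let password = "generated-password"; // illustrative value
    let salt = SaltString::generate(&mut OsRng);
    let hash = Argon2::default()
        .hash_password(password.as_bytes(), &salt)
        .expect("hashing an in-memory password should not fail")
        .to_string();
    // `hash` is a PHC string, e.g. "$argon2id$v=19$m=19456,t=2,p=1$<salt>$<digest>",
    // so the algorithm, its parameters and the salt all travel inside parseable.json

    // later, verify against the stored string, like `User::verify`
    let parsed = PasswordHash::new(&hash).expect("stored hash is valid PHC");
    assert!(Argon2::default()
        .verify_password(password.as_bytes(), &parsed)
        .is_ok());
    assert!(Argon2::default()
        .verify_password(b"wrong-password", &parsed)
        .is_err());
}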
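
The v1 → v2 metadata migration is easiest to review as a before/after of .parseable.json. The sketch below restates `metadata_migration::v1_v2` against a made-up v1 document (field values are invented for illustration). Note that old `user` entries are dropped rather than converted, so after the upgrade only the admin credentials from the server config are usable until new users are created via PUT /api/v1/user/{username}:

use serde_json::{json, Value};

// same transformation as server/src/migration/metadata_migration.rs
fn v1_v2(mut storage_metadata: Value) -> Value {
    let metadata = storage_metadata.as_object_mut().unwrap();
    *metadata.get_mut("version").unwrap() = Value::String("v2".to_string());
    metadata.remove("user");
    metadata.remove("stream");
    metadata.insert("users".to_string(), Value::Array(vec![]));
    metadata.insert("streams".to_string(), Value::Array(vec![]));
    storage_metadata
}

fn main() {
    // a made-up v1 document; real files carry more fields (mode, staging, storage, ...)
    let v1 = json!({
        "version": "v1",
        "user": [{"username": "admin", "password": "admin", "role": "admin"}],
        "stream": []
    });
    let v2 = v1_v2(v1);
    assert_eq!(v2["version"], "v2");
    assert_eq!(v2["users"], json!([]));   // v1 users are not carried over
    assert_eq!(v2["streams"], json!([]));
    assert!(v2.get("user").is_none());
}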