diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 60a61a1ec..3232d54cb 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -367,64 +367,29 @@ pub async fn sync_role_update_with_ingestors(
     Ok(())
 }
 
-pub async fn fetch_daily_stats_from_ingestors(
-    stream_name: &str,
+pub fn fetch_daily_stats_from_ingestors(
     date: &str,
+    stream_meta_list: &[ObjectStoreFormat],
 ) -> Result<Stats, StreamError> {
-    let mut total_events_ingested: u64 = 0;
-    let mut total_ingestion_size: u64 = 0;
-    let mut total_storage_size: u64 = 0;
-
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        StreamError::Anyhow(err)
-    })?;
-    for ingestor in ingestor_infos.iter() {
-        let uri = Url::parse(&format!(
-            "{}{}/metrics",
-            &ingestor.domain_name,
-            base_path_without_preceding_slash()
-        ))
-        .map_err(|err| {
-            StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
-        })?;
-
-        let res = HTTP_CLIENT
-            .get(uri)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .send()
-            .await;
-
-        if let Ok(res) = res {
-            let text = res
-                .text()
-                .await
-                .map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
-            let lines: Vec<Result<String, std::io::Error>> =
-                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
-            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
-                .map_err(|err| {
-                    StreamError::Anyhow(anyhow::anyhow!(
-                        "Invalid URL in Ingestor Metadata: {}",
-                        err
-                    ))
-                })?
-                .samples;
-
-            let (events_ingested, ingestion_size, storage_size) =
-                Metrics::get_daily_stats_from_samples(sample, stream_name, date);
-            total_events_ingested += events_ingested;
-            total_ingestion_size += ingestion_size;
-            total_storage_size += storage_size;
+    // for the given date, get the stats from the ingestors
+    let mut events_ingested = 0;
+    let mut ingestion_size = 0;
+    let mut storage_size = 0;
+
+    for meta in stream_meta_list.iter() {
+        for manifest in meta.snapshot.manifest_list.iter() {
+            if manifest.time_lower_bound.date_naive().to_string() == date {
+                events_ingested += manifest.events_ingested;
+                ingestion_size += manifest.ingestion_size;
+                storage_size += manifest.storage_size;
+            }
         }
     }
 
     let stats = Stats {
-        events: total_events_ingested,
-        ingestion: total_ingestion_size,
-        storage: total_storage_size,
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
     };
     Ok(stats)
 }
@@ -474,17 +439,17 @@ pub async fn fetch_stats_from_ingestors(
         Utc::now(),
         IngestionStats::new(
             count,
-            format!("{} Bytes", ingestion_size),
+            ingestion_size,
             lifetime_count,
-            format!("{} Bytes", lifetime_ingestion_size),
+            lifetime_ingestion_size,
             deleted_count,
-            format!("{} Bytes", deleted_ingestion_size),
+            deleted_ingestion_size,
             "json",
         ),
         StorageStats::new(
-            format!("{} Bytes", storage_size),
-            format!("{} Bytes", lifetime_storage_size),
-            format!("{} Bytes", deleted_storage_size),
+            storage_size,
+            lifetime_storage_size,
+            deleted_storage_size,
             "parquet",
         ),
    );
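Reviewer note: the rewrite above drops the network round-trip to each ingestor's /metrics endpoint and instead sums daily stats straight out of the manifest list carried in each ingestor's stream.json, which is why the function can stop being async and lose the prometheus_parse path entirely. A minimal sketch of that aggregation, with simplified stand-in types (the real ObjectStoreFormat/snapshot types live in Parseable's storage module):

    use chrono::{DateTime, Utc};

    // Stand-in for the manifest entries found in ObjectStoreFormat::snapshot.
    struct ManifestItem {
        time_lower_bound: DateTime<Utc>,
        events_ingested: u64,
        ingestion_size: u64,
        storage_size: u64,
    }

    // `date` is expected in YYYY-MM-DD form, matching `date_naive().to_string()`.
    fn daily_totals(manifests: &[ManifestItem], date: &str) -> (u64, u64, u64) {
        let mut totals = (0u64, 0u64, 0u64);
        for m in manifests {
            if m.time_lower_bound.date_naive().to_string() == date {
                totals.0 += m.events_ingested;
                totals.1 += m.ingestion_size;
                totals.2 += m.storage_size;
            }
        }
        totals
    }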
diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs
index b41582d70..b88e95bb1 100644
--- a/src/handlers/http/cluster/utils.rs
+++ b/src/handlers/http/cluster/utils.rs
@@ -19,7 +19,6 @@
 use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
-use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use tracing::error;
 use url::Url;
@@ -81,22 +80,22 @@ impl ClusterInfo {
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct IngestionStats {
     pub count: u64,
-    pub size: String,
+    pub size: u64,
     pub format: String,
     pub lifetime_count: u64,
-    pub lifetime_size: String,
+    pub lifetime_size: u64,
     pub deleted_count: u64,
-    pub deleted_size: String,
+    pub deleted_size: u64,
 }
 
 impl IngestionStats {
     pub fn new(
         count: u64,
-        size: String,
+        size: u64,
         lifetime_count: u64,
-        lifetime_size: String,
+        lifetime_size: u64,
         deleted_count: u64,
-        deleted_size: String,
+        deleted_size: u64,
         format: &str,
     ) -> Self {
         Self {
@@ -113,14 +112,14 @@ impl IngestionStats {
 
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct StorageStats {
-    pub size: String,
+    pub size: u64,
     pub format: String,
-    pub lifetime_size: String,
-    pub deleted_size: String,
+    pub lifetime_size: u64,
+    pub deleted_size: u64,
 }
 
 impl StorageStats {
-    pub fn new(size: String, lifetime_size: String, deleted_size: String, format: &str) -> Self {
+    pub fn new(size: u64, lifetime_size: u64, deleted_size: u64, format: &str) -> Self {
         Self {
             size,
             format: format.to_string(),
@@ -143,36 +142,12 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
             .fold(IngestionStats::default(), |acc, x| IngestionStats {
                 count: acc.count + x.count,
-                size: format!(
-                    "{} Bytes",
-                    acc.size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                size: acc.size + x.size,
                 format: x.format.clone(),
                 lifetime_count: acc.lifetime_count + x.lifetime_count,
-                lifetime_size: format!(
-                    "{} Bytes",
-                    acc.lifetime_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.lifetime_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                lifetime_size: acc.lifetime_size + x.lifetime_size,
                 deleted_count: acc.deleted_count + x.deleted_count,
-                deleted_size: format!(
-                    "{} Bytes",
-                    acc.deleted_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.deleted_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                deleted_size: acc.deleted_size + x.deleted_size,
             });
 
     let cumulative_storage =
@@ -180,34 +155,10 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
             .iter()
             .map(|x| &x.storage)
             .fold(StorageStats::default(), |acc, x| StorageStats {
-                size: format!(
-                    "{} Bytes",
-                    acc.size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                size: acc.size + x.size,
                 format: x.format.clone(),
-                lifetime_size: format!(
-                    "{} Bytes",
-                    acc.lifetime_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.lifetime_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
-                deleted_size: format!(
-                    "{} Bytes",
-                    acc.deleted_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.deleted_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                lifetime_size: acc.lifetime_size + x.lifetime_size,
+                deleted_size: acc.deleted_size + x.deleted_size,
             });
 
     QueriedStats::new(
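Reviewer note: with sizes stored as u64, merge_quried_stats no longer splits and re-parses "N Bytes" strings; merging collapses to plain addition. That also removes a silent failure mode, since the old path fell back to 0 via unwrap_or_default() whenever a size string failed to parse. (Incidentally, merge_quried_stats still carries its long-standing "quried" typo; this PR touched every call site anyway, so a rename would be cheap.) A sketch of the simplified fold over a trimmed-down stats struct:

    #[derive(Debug, Default)]
    struct IngestionStats {
        count: u64,
        size: u64,
        lifetime_size: u64,
        deleted_size: u64,
    }

    // Merging queried stats is now a straightforward fold of u64 counters.
    fn merge(stats: &[IngestionStats]) -> IngestionStats {
        stats.iter().fold(IngestionStats::default(), |acc, x| IngestionStats {
            count: acc.count + x.count,
            size: acc.size + x.size,
            lifetime_size: acc.lifetime_size + x.lifetime_size,
            deleted_size: acc.deleted_size + x.deleted_size,
        })
    }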
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 107c706ed..e6b9dff10 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -262,17 +262,17 @@ pub async fn get_stats(
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
-            format!("{} Bytes", stats.current_stats.ingestion),
+            stats.current_stats.ingestion,
             stats.lifetime_stats.events,
-            format!("{} Bytes", stats.lifetime_stats.ingestion),
+            stats.lifetime_stats.ingestion,
             stats.deleted_stats.events,
-            format!("{} Bytes", stats.deleted_stats.ingestion),
+            stats.deleted_stats.ingestion,
             "json",
         );
         let storage_stats = StorageStats::new(
-            format!("{} Bytes", stats.current_stats.storage),
-            format!("{} Bytes", stats.lifetime_stats.storage),
-            format!("{} Bytes", stats.deleted_stats.storage),
+            stats.current_stats.storage,
+            stats.lifetime_stats.storage,
+            stats.deleted_stats.storage,
             "parquet",
         );
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index 740855bb7..76d282f76 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -26,6 +26,7 @@ use actix_web::{
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
+use relative_path::RelativePathBuf;
 use tokio::sync::Mutex;
 use tracing::{error, warn};
 
@@ -44,7 +45,7 @@ use crate::{
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
     stats::{self, Stats},
-    storage::StreamType,
+    storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
 };
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -151,7 +152,35 @@ pub async fn get_stats(
     if !date_value.is_empty() {
         let querier_stats = get_stats_date(&stream_name, date_value).await?;
-        let ingestor_stats = fetch_daily_stats_from_ingestors(&stream_name, date_value).await?;
+
+        // this function requires all the ingestor stream jsons
+        let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
+        let obs = PARSEABLE
+            .storage
+            .get_object_store()
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
+                }),
+            )
+            .await?;
+
+        let mut ingestor_stream_jsons = Vec::new();
+        for ob in obs {
+            let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
+                Ok(d) => d,
+                Err(e) => {
+                    error!("Failed to parse stream metadata: {:?}", e);
+                    continue;
+                }
+            };
+            ingestor_stream_jsons.push(stream_metadata);
+        }
+
+        let ingestor_stats =
+            fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
+
         let total_stats = Stats {
             events: querier_stats.events + ingestor_stats.events,
             ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
@@ -180,17 +209,17 @@ pub async fn get_stats(
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
-            format!("{} Bytes", stats.current_stats.ingestion),
+            stats.current_stats.ingestion,
             stats.lifetime_stats.events,
-            format!("{} Bytes", stats.lifetime_stats.ingestion),
+            stats.lifetime_stats.ingestion,
             stats.deleted_stats.events,
-            format!("{} Bytes", stats.deleted_stats.ingestion),
+            stats.deleted_stats.ingestion,
             "json",
         );
         let storage_stats = StorageStats::new(
-            format!("{} Bytes", stats.current_stats.storage),
-            format!("{} Bytes", stats.lifetime_stats.storage),
-            format!("{} Bytes", stats.deleted_stats.storage),
+            stats.current_stats.storage,
+            stats.lifetime_stats.storage,
+            stats.deleted_stats.storage,
             "parquet",
         );
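Reviewer note: the querier's get_stats now assembles ingestor metadata itself by listing objects under the stream root and keeping only the per-ingestor stream.json files; unparseable files are logged and skipped, so one corrupt ingestor stream.json can no longer fail the whole request. The filename predicate is small enough to test in isolation (sketch; the exact object naming is an assumption based on the prefixes used above):

    fn is_ingestor_stream_json(file_name: &str) -> bool {
        file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn matches_only_ingestor_metadata() {
            assert!(is_ingestor_stream_json(".ingestor.abc.stream.json"));
            assert!(!is_ingestor_stream_json(".stream.json"));
            assert!(!is_ingestor_stream_json(".ingestor.abc.manifest.json"));
        }
    }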
diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs
index 32619f0f0..16a885969 100644
--- a/src/handlers/http/users/dashboards.rs
+++ b/src/handlers/http/users/dashboards.rs
@@ -21,7 +21,7 @@ use crate::{
     parseable::PARSEABLE,
     storage::{object_storage::dashboard_path, ObjectStorageError},
     users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS},
-    utils::{get_hash, get_user_from_request},
+    utils::{actix::extract_session_key_from_req, get_hash, get_user_from_request},
 };
 use actix_web::{
     http::header::ContentType,
@@ -36,8 +36,9 @@
 use http::StatusCode;
 use serde_json::Error as SerdeError;
 
 pub async fn list(req: HttpRequest) -> Result<impl Responder, DashboardError> {
-    let user_id = get_user_from_request(&req)?;
-    let dashboards = DASHBOARDS.list_dashboards_by_user(&get_hash(&user_id));
+    let key =
+        extract_session_key_from_req(&req).map_err(|e| DashboardError::Custom(e.to_string()))?;
+    let dashboards = DASHBOARDS.list_dashboards(&key).await;
 
     Ok((web::Json(dashboards), StatusCode::OK))
 }
@@ -49,7 +50,10 @@ pub async fn get(
     let user_id = get_user_from_request(&req)?;
     let dashboard_id = dashboard_id.into_inner();
 
-    if let Some(dashboard) = DASHBOARDS.get_dashboard(&dashboard_id, &get_hash(&user_id)) {
+    if let Some(dashboard) = DASHBOARDS
+        .get_dashboard(&dashboard_id, &get_hash(&user_id))
+        .await
+    {
         return Ok((web::Json(dashboard), StatusCode::OK));
     }
 
@@ -77,7 +81,7 @@ pub async fn post(
             .as_str(),
         ));
     }
-    DASHBOARDS.update(&dashboard);
+    DASHBOARDS.update(&dashboard).await;
 
     let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
 
@@ -99,7 +103,11 @@ pub async fn update(
     user_id = get_hash(&user_id);
     let dashboard_id = dashboard_id.into_inner();
 
-    if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() {
+    if DASHBOARDS
+        .get_dashboard(&dashboard_id, &user_id)
+        .await
+        .is_none()
+    {
         return Err(DashboardError::Metadata("Dashboard does not exist"));
     }
     dashboard.dashboard_id = Some(dashboard_id.to_string());
@@ -110,7 +118,7 @@ pub async fn update(
             tile.tile_id = Some(get_hash(Utc::now().timestamp_micros().to_string().as_str()));
         }
     }
-    DASHBOARDS.update(&dashboard);
+    DASHBOARDS.update(&dashboard).await;
 
     let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
 
@@ -130,14 +138,18 @@ pub async fn delete(
     let mut user_id = get_user_from_request(&req)?;
     user_id = get_hash(&user_id);
     let dashboard_id = dashboard_id.into_inner();
-    if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() {
+    if DASHBOARDS
+        .get_dashboard(&dashboard_id, &user_id)
+        .await
+        .is_none()
+    {
         return Err(DashboardError::Metadata("Dashboard does not exist"));
     }
     let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
     let store = PARSEABLE.storage.get_object_store();
     store.delete_object(&path).await?;
 
-    DASHBOARDS.delete_dashboard(&dashboard_id);
+    DASHBOARDS.delete_dashboard(&dashboard_id).await;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -152,6 +164,8 @@ pub enum DashboardError {
     Metadata(&'static str),
     #[error("User does not exist")]
     UserDoesNotExist(#[from] RBACError),
+    #[error("Error: {0}")]
+    Custom(String),
 }
 
 impl actix_web::ResponseError for DashboardError {
@@ -161,6 +175,7 @@ impl actix_web::ResponseError for DashboardError {
             Self::Serde(_) => StatusCode::BAD_REQUEST,
             Self::Metadata(_) => StatusCode::BAD_REQUEST,
             Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
+            Self::Custom(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
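Reviewer note: list() now needs the full session key rather than a hashed user id, because authorization moves into DASHBOARDS.list_dashboards and is evaluated per dashboard; the same pattern repeats for filters in the next file. A failed key extraction surfaces as the new Custom variant, which maps to 500 — arguably harsher than the 401 one might expect for a missing or invalid session, so worth a follow-up. A trimmed, hypothetical mirror of that mapping (the real enum has more variants):

    use http::StatusCode;

    #[derive(Debug, thiserror::Error)]
    enum DashboardErrorSketch {
        #[error("Metadata: {0}")]
        Metadata(&'static str),
        #[error("Error: {0}")]
        Custom(String),
    }

    fn status(e: &DashboardErrorSketch) -> StatusCode {
        match e {
            DashboardErrorSketch::Metadata(_) => StatusCode::BAD_REQUEST,
            // Session-key extraction failures currently land here as 500;
            // UNAUTHORIZED may describe the client-side condition better.
            DashboardErrorSketch::Custom(_) => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }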
diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs
index 7ea3c8a09..378e80df3 100644
--- a/src/handlers/http/users/filters.rs
+++ b/src/handlers/http/users/filters.rs
@@ -21,7 +21,7 @@ use crate::{
     parseable::PARSEABLE,
     storage::{object_storage::filter_path, ObjectStorageError},
     users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS},
-    utils::{get_hash, get_user_from_request},
+    utils::{actix::extract_session_key_from_req, get_hash, get_user_from_request},
 };
 use actix_web::{
     http::header::ContentType,
@@ -34,8 +34,9 @@
 use http::StatusCode;
 use serde_json::Error as SerdeError;
 
 pub async fn list(req: HttpRequest) -> Result<impl Responder, FiltersError> {
-    let user_id = get_user_from_request(&req)?;
-    let filters = FILTERS.list_filters_by_user(&get_hash(&user_id));
+    let key =
+        extract_session_key_from_req(&req).map_err(|e| FiltersError::Custom(e.to_string()))?;
+    let filters = FILTERS.list_filters(&key).await;
 
     Ok((web::Json(filters), StatusCode::OK))
 }
@@ -46,7 +47,7 @@ pub async fn get(
     let user_id = get_user_from_request(&req)?;
     let filter_id = filter_id.into_inner();
 
-    if let Some(filter) = FILTERS.get_filter(&filter_id, &get_hash(&user_id)) {
+    if let Some(filter) = FILTERS.get_filter(&filter_id, &get_hash(&user_id)).await {
         return Ok((web::Json(filter), StatusCode::OK));
     }
 
@@ -63,7 +64,7 @@ pub async fn post(
     filter.filter_id = Some(filter_id.clone());
     filter.user_id = Some(user_id.clone());
     filter.version = Some(CURRENT_FILTER_VERSION.to_string());
-    FILTERS.update(&filter);
+    FILTERS.update(&filter).await;
 
     let path = filter_path(
         &user_id,
@@ -86,13 +87,13 @@ pub async fn update(
     let mut user_id = get_user_from_request(&req)?;
     user_id = get_hash(&user_id);
     let filter_id = filter_id.into_inner();
-    if FILTERS.get_filter(&filter_id, &user_id).is_none() {
+    if FILTERS.get_filter(&filter_id, &user_id).await.is_none() {
         return Err(FiltersError::Metadata("Filter does not exist"));
     }
     filter.filter_id = Some(filter_id.clone());
     filter.user_id = Some(user_id.clone());
     filter.version = Some(CURRENT_FILTER_VERSION.to_string());
-    FILTERS.update(&filter);
+    FILTERS.update(&filter).await;
 
     let path = filter_path(
         &user_id,
@@ -116,6 +117,7 @@ pub async fn delete(
     let filter_id = filter_id.into_inner();
     let filter = FILTERS
         .get_filter(&filter_id, &user_id)
+        .await
         .ok_or(FiltersError::Metadata("Filter does not exist"))?;
 
     let path = filter_path(
@@ -126,7 +128,7 @@ pub async fn delete(
     let store = PARSEABLE.storage.get_object_store();
     store.delete_object(&path).await?;
 
-    FILTERS.delete_filter(&filter_id);
+    FILTERS.delete_filter(&filter_id).await;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -141,6 +143,8 @@ pub enum FiltersError {
     Metadata(&'static str),
     #[error("User does not exist")]
     UserDoesNotExist(#[from] RBACError),
+    #[error("Error: {0}")]
+    Custom(String),
 }
 
 impl actix_web::ResponseError for FiltersError {
@@ -150,6 +154,7 @@ impl actix_web::ResponseError for FiltersError {
             Self::Serde(_) => StatusCode::BAD_REQUEST,
             Self::Metadata(_) => StatusCode::BAD_REQUEST,
             Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
+            Self::Custom(_) => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 96c55d43d..48b062e31 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -16,19 +16,27 @@
  *
  */
 
+use std::collections::HashMap;
+
 use actix_web::http::header::ContentType;
 use chrono::Utc;
 use http::StatusCode;
 use itertools::Itertools;
+use relative_path::RelativePathBuf;
 use serde::Serialize;
+use tracing::error;
 
 use crate::{
     alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
     correlation::{CorrelationError, CORRELATIONS},
-    handlers::http::logstream::{error::StreamError, get_stats_date},
+    handlers::http::{
+        cluster::fetch_daily_stats_from_ingestors,
+        logstream::{error::StreamError, get_stats_date},
+    },
     parseable::PARSEABLE,
     rbac::{map::SessionKey, role::Action, Users},
     stats::Stats,
+    storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
     users::{dashboards::DASHBOARDS, filters::FILTERS},
 };
 
@@ -67,14 +75,6 @@ pub struct HomeResponse {
 }
 
 pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
-    let user_id = if let Some(user_id) = Users.get_username_from_session(key) {
-        user_id
-    } else {
-        return Err(PrismHomeError::Anyhow(anyhow::Error::msg(
-            "User does not exist",
-        )));
-    };
-
     // get all stream titles
     let stream_titles: Vec<String> = PARSEABLE
         .storage
@@ -115,7 +115,8 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+    let mut stream_wise_ingestor_stream_json = HashMap::new();
+    for stream in stream_titles.clone() {
+        let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
+        let obs = PARSEABLE
+            .storage
+            .get_object_store()
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
+                }),
+            )
+            .await?;
+
+        let mut ingestor_stream_jsons = Vec::new();
+        for ob in obs {
+            let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
+                Ok(d) => d,
+                Err(e) => {
+                    error!("Failed to parse stream metadata: {:?}", e);
+                    continue;
+                }
+            };
+            ingestor_stream_jsons.push(stream_metadata);
         }
+        stream_wise_ingestor_stream_json.insert(stream, ingestor_stream_jsons);
+    }
 
-    stream_details.push(details);
+    for date in dates.into_iter() {
+        let dated_stats = stats_for_date(date, stream_wise_ingestor_stream_json.clone()).await?;
+        summary.stats_summary.events += dated_stats.events;
+        summary.stats_summary.ingestion += dated_stats.ingestion_size;
+        summary.stats_summary.storage += dated_stats.storage_size;
+
+        stream_details.push(dated_stats);
     }
 
     Ok(HomeResponse {
         stream_info: summary,
         stats_details: stream_details,
-        stream_titles,
+        stream_titles: stream_titles.clone(),
         alert_titles,
         correlation_titles,
         dashboard_titles,
@@ -196,6 +213,28 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+async fn stats_for_date(
+    date: String,
+    stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
+) -> Result<DatedStats, PrismHomeError> {
+    // collect stats for all the streams for the given date
+    let mut details = DatedStats {
+        date: date.clone(),
+        ..Default::default()
+    };
+
+    for (stream, meta) in stream_wise_meta {
+        let querier_stats = get_stats_date(&stream, &date).await?;
+        let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
+        // collect date-wise stats for all streams
+        details.events += querier_stats.events + ingestor_stats.events;
+        details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
+        details.storage_size += querier_stats.storage + ingestor_stats.storage;
+    }
+
+    Ok(details)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum PrismHomeError {
     #[error("Error: {0}")]
@@ -206,6 +245,8 @@ pub enum PrismHomeError {
     CorrelationError(#[from] CorrelationError),
     #[error("StreamError: {0}")]
     StreamError(#[from] StreamError),
+    #[error("ObjectStorageError: {0}")]
+    ObjectStorageError(#[from] ObjectStorageError),
 }
 
 impl actix_web::ResponseError for PrismHomeError {
@@ -215,6 +256,7 @@ impl actix_web::ResponseError for PrismHomeError {
             PrismHomeError::AlertError(e) => e.status_code(),
             PrismHomeError::CorrelationError(e) => e.status_code(),
             PrismHomeError::StreamError(e) => e.status_code(),
+            PrismHomeError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
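Reviewer note: generate_home_response now fetches every ingestor stream.json once, groups them per stream, and folds per-date stats over that map via stats_for_date. One design nit: stats_for_date takes the map by value, so the call site clones the whole HashMap once per date in the window. Since the function only reads the map, borrowing it would avoid dates-times-streams worth of cloning. A synchronous stand-in sketch of the borrowed shape (the signature change is a suggestion, not what this PR does, and DayStats is a hypothetical stand-in for the per-ingestor numbers):

    use std::collections::HashMap;

    #[derive(Debug, Default)]
    struct DatedStats {
        date: String,
        events: u64,
        ingestion_size: u64,
        storage_size: u64,
    }

    struct DayStats {
        events: u64,
        ingestion: u64,
        storage: u64,
    }

    // Borrows the per-stream metadata instead of consuming it, so the caller
    // can reuse one map across every date instead of cloning it per date.
    fn stats_for_date_ref(date: &str, stream_wise_meta: &HashMap<String, Vec<DayStats>>) -> DatedStats {
        let mut details = DatedStats {
            date: date.to_owned(),
            ..Default::default()
        };
        for metas in stream_wise_meta.values() {
            for m in metas {
                details.events += m.events;
                details.ingestion_size += m.ingestion;
                details.storage_size += m.storage;
            }
        }
        details
    }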
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index ebdd463ec..e9ffcb981 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -26,13 +26,16 @@ use serde::Serialize;
 
 use crate::{
     handlers::http::{
-        cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
+        cluster::{
+            fetch_stats_from_ingestors,
+            utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
+        },
         logstream::error::StreamError,
         query::update_schema_when_distributed,
     },
     parseable::{StreamNotFound, PARSEABLE},
     stats,
-    storage::{retention::Retention, StreamInfo},
+    storage::{retention::Retention, StreamInfo, StreamType},
     LOCK_EXPECT,
 };
 
@@ -93,24 +96,31 @@ async fn get_stats(stream_name: &str) -> Result<QueriedStats, StreamError> {
-    let mut ingestor_stats: Option<Vec<QueriedStats>> = None;
+    let ingestor_stats = if PARSEABLE
+        .get_stream(stream_name)
+        .is_ok_and(|stream| stream.get_stream_type() == StreamType::UserDefined)
+    {
+        Some(fetch_stats_from_ingestors(stream_name).await?)
+    } else {
+        None
+    };
 
     let time = Utc::now();
 
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
-            format!("{} {}", stats.current_stats.ingestion, "Bytes"),
+            stats.current_stats.ingestion,
             stats.lifetime_stats.events,
-            format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
+            stats.lifetime_stats.ingestion,
             stats.deleted_stats.events,
-            format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
+            stats.deleted_stats.ingestion,
             "json",
         );
         let storage_stats = StorageStats::new(
-            format!("{} {}", stats.current_stats.storage, "Bytes"),
-            format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
-            format!("{} {}", stats.deleted_stats.storage, "Bytes"),
+            stats.current_stats.storage,
+            stats.lifetime_stats.storage,
+            stats.deleted_stats.storage,
             "parquet",
         );
diff --git a/src/users/dashboards.rs b/src/users/dashboards.rs
index 39dd785d0..15f5979ef 100644
--- a/src/users/dashboards.rs
+++ b/src/users/dashboards.rs
@@ -16,15 +16,14 @@
  *
  */
 
-use std::sync::RwLock;
-
 use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
+use tokio::sync::RwLock;
 
 use crate::{
-    migration::to_bytes, parseable::PARSEABLE, storage::object_storage::dashboard_path,
-    utils::get_hash, LOCK_EXPECT,
+    alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
+    rbac::map::SessionKey, storage::object_storage::dashboard_path, utils::get_hash,
 };
 
 use super::TimeFilter;
@@ -172,27 +171,27 @@ impl Dashboards {
             }
         }
 
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+        let mut s = self.0.write().await;
         s.append(&mut this);
 
         Ok(())
     }
 
-    pub fn update(&self, dashboard: &Dashboard) {
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+    pub async fn update(&self, dashboard: &Dashboard) {
+        let mut s = self.0.write().await;
         s.retain(|d| d.dashboard_id != dashboard.dashboard_id);
         s.push(dashboard.clone());
     }
 
-    pub fn delete_dashboard(&self, dashboard_id: &str) {
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+    pub async fn delete_dashboard(&self, dashboard_id: &str) {
+        let mut s = self.0.write().await;
         s.retain(|d| d.dashboard_id != Some(dashboard_id.to_string()));
     }
 
-    pub fn get_dashboard(&self, dashboard_id: &str, user_id: &str) -> Option<Dashboard> {
+    pub async fn get_dashboard(&self, dashboard_id: &str, user_id: &str) -> Option<Dashboard> {
         self.0
             .read()
-            .expect(LOCK_EXPECT)
+            .await
             .iter()
             .find(|d| {
                 d.dashboard_id == Some(dashboard_id.to_string())
@@ -201,14 +200,28 @@ impl Dashboards {
             .cloned()
     }
 
-    pub fn list_dashboards_by_user(&self, user_id: &str) -> Vec<Dashboard> {
-        self.0
-            .read()
-            .expect(LOCK_EXPECT)
-            .iter()
-            .filter(|d| d.user_id == Some(user_id.to_string()))
-            .cloned()
-            .collect()
+    pub async fn list_dashboards(&self, key: &SessionKey) -> Vec<Dashboard> {
+        let read = self.0.read().await;
+
+        let mut dashboards = Vec::new();
+
+        for d in read.iter() {
+            let mut skip_dashboard = false;
+            for tile in d.tiles.iter() {
+                let query = &tile.query;
+                match user_auth_for_query(key, query).await {
+                    Ok(_) => {}
+                    Err(_) => {
+                        skip_dashboard = true;
+                        break;
+                    }
+                }
+            }
+            if !skip_dashboard {
+                dashboards.push(d.clone());
+            }
+        }
+        dashboards
     }
 }
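Reviewer note: the in-memory stores switch from std::sync::RwLock to tokio::sync::RwLock because list_dashboards (and list_filters below) now await user_auth_for_query while the read guard is alive. A std guard held across an .await makes the future non-Send — rejecting it on a work-stealing runtime — and risks deadlock even where it compiles; tokio's lock is built for exactly this. Minimal sketch of the pattern, with a generic async predicate standing in for the RBAC check:

    use std::future::Future;
    use tokio::sync::RwLock;

    struct Store(RwLock<Vec<String>>);

    impl Store {
        // Holds the read guard across the awaited permission check, which
        // tokio's RwLock permits and std's effectively does not.
        async fn list_allowed<F, Fut>(&self, allowed: F) -> Vec<String>
        where
            F: Fn(&str) -> Fut,
            Fut: Future<Output = bool>,
        {
            let read = self.0.read().await;
            let mut out = Vec::new();
            for item in read.iter() {
                if allowed(item).await {
                    out.push(item.clone());
                }
            }
            out
        }
    }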
diff --git a/src/users/filters.rs b/src/users/filters.rs
index 650be18a4..59a06dff0 100644
--- a/src/users/filters.rs
+++ b/src/users/filters.rs
@@ -19,12 +19,12 @@
 use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use std::sync::RwLock;
+use tokio::sync::RwLock;
 
 use super::TimeFilter;
 use crate::{
-    migration::to_bytes, parseable::PARSEABLE, storage::object_storage::filter_path,
-    utils::get_hash, LOCK_EXPECT,
+    alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
+    rbac::map::SessionKey, storage::object_storage::filter_path, utils::get_hash,
 };
 
 pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
@@ -124,27 +124,27 @@ impl Filters {
             }
         }
 
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+        let mut s = self.0.write().await;
         s.append(&mut this);
 
         Ok(())
     }
 
-    pub fn update(&self, filter: &Filter) {
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+    pub async fn update(&self, filter: &Filter) {
+        let mut s = self.0.write().await;
         s.retain(|f| f.filter_id != filter.filter_id);
         s.push(filter.clone());
     }
 
-    pub fn delete_filter(&self, filter_id: &str) {
-        let mut s = self.0.write().expect(LOCK_EXPECT);
+    pub async fn delete_filter(&self, filter_id: &str) {
+        let mut s = self.0.write().await;
         s.retain(|f| f.filter_id != Some(filter_id.to_string()));
     }
 
-    pub fn get_filter(&self, filter_id: &str, user_id: &str) -> Option<Filter> {
+    pub async fn get_filter(&self, filter_id: &str, user_id: &str) -> Option<Filter> {
         self.0
             .read()
-            .expect(LOCK_EXPECT)
+            .await
             .iter()
             .find(|f| {
                 f.filter_id == Some(filter_id.to_string()) && f.user_id == Some(user_id.to_string())
             })
@@ -152,14 +152,23 @@ impl Filters {
             .cloned()
     }
 
-    pub fn list_filters_by_user(&self, user_id: &str) -> Vec<Filter> {
-        self.0
-            .read()
-            .expect(LOCK_EXPECT)
-            .iter()
-            .filter(|f| f.user_id == Some(user_id.to_string()))
-            .cloned()
-            .collect()
+    pub async fn list_filters(&self, key: &SessionKey) -> Vec<Filter> {
+        let read = self.0.read().await;
+
+        let mut filters = Vec::new();
+
+        for f in read.iter() {
+            let query = if let Some(q) = &f.query.filter_query {
+                q
+            } else {
+                continue;
+            };
+
+            if (user_auth_for_query(key, query).await).is_ok() {
+                filters.push(f.clone())
+            }
+        }
+        filters
     }
 }
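Reviewer note: list_filters quietly drops any filter whose query.filter_query is None, since there is no SQL to authorize against — SQL-less filters therefore vanish from listings after this change, which is worth confirming as intended. The selection step, isolated (FilterQuery is a stand-in holding only the field the check uses):

    struct FilterQuery {
        filter_query: Option<String>,
    }

    // Yields only the filters that actually carry SQL to authorize against.
    fn queryable(filters: &[FilterQuery]) -> impl Iterator<Item = &str> {
        filters.iter().filter_map(|f| f.filter_query.as_deref())
    }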