From ac2457038be24d7243ecea8b2ff7d97cda2d7ce4 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 1 May 2025 14:04:42 +0530 Subject: [PATCH 1/3] bugfix Stats were not getting deleted during stream deletion --- src/handlers/http/logstream.rs | 4 +-- .../http/modal/ingest/ingestor_logstream.rs | 15 ++--------- .../http/modal/query/querier_logstream.rs | 4 +-- src/stats.rs | 25 ++++++++++++++----- 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index d4949b61b..b5d65be5d 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -57,9 +57,9 @@ pub async fn delete(stream_name: Path) -> Result) -> Result { let stream_name = stream_name.into_inner(); - // if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if !PARSEABLE.streams.contains(&stream_name) - && !PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - .unwrap_or(false) - { - return Err(StreamNotFound(stream_name.clone()).into()); - } // Delete from staging let stream_dir = PARSEABLE.get_stream(&stream_name)?; - if fs::remove_dir_all(&stream_dir.data_path).is_err() { + if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) { warn!( - "failed to delete local data for stream {}. Clean {} manually", + "failed to delete local data for stream {} with error {err}. Clean {} manually", stream_name, stream_dir.data_path.to_string_lossy() ) diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index ebffbfb43..fc2242cdc 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -68,9 +68,9 @@ pub async fn delete(stream_name: Path) -> Result prometheus::Resu fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { let families: Vec = metrics.collect().into_iter().collect(); for metric in families.iter().flat_map(|m| m.get_metric()) { - let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect(); - if !label.starts_with(prefix) { - continue; - } - if let Err(err) = metrics.remove_label_values(&label) { - warn!("Error = {err}"); + let label_map: HashMap<&str, &str> = metric + .get_label() + .iter() + .map(|l| (l.get_name(), l.get_value())) + .collect(); + + let mut should_continue = false; + prefix.iter().for_each(|p| { + // label map doesn't have the key present in prefix, hence continue + if !label_map.values().contains(p) { + should_continue = true; + } + }); + if !should_continue { + if let Err(err) = metrics.remove(&label_map) { + warn!("Error = {err}"); + } } } } From 79e542426d2f8d4131a5252ce4b78df476248f4d Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 2 May 2025 14:13:04 +0530 Subject: [PATCH 2/3] coderabbit suggestion --- src/stats.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index dfb3117ce..2d42af959 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::sync::Arc; -use itertools::Itertools; use prometheus::core::Collector; use prometheus::proto::MetricFamily; use prometheus::IntGaugeVec; @@ -199,16 +198,12 @@ fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { .map(|l| (l.get_name(), l.get_value())) .collect(); - let mut should_continue = false; - prefix.iter().for_each(|p| { - // label map doesn't have the key present in prefix, hence continue - if !label_map.values().contains(p) { - should_continue = true; - } - }); - if !should_continue { + // Check if all prefix elements are present in label values + let all_prefixes_found = prefix.iter().all(|p| label_map.values().any(|v| v == p)); + + if all_prefixes_found { if let Err(err) = metrics.remove(&label_map) { - warn!("Error = {err}"); + warn!("Error removing metric with labels {:?}: {err}", label_map); } } } From 9d1750cfabf7f7cd90fe5ca7440d7a6e08f4931e Mon Sep 17 00:00:00 2001 From: anant Date: Sat, 3 May 2025 16:06:13 +0530 Subject: [PATCH 3/3] stats removal order fix --- src/stats.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 2d42af959..7bacdfd0a 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; +use once_cell::sync::Lazy; use prometheus::core::Collector; use prometheus::proto::MetricFamily; use prometheus::IntGaugeVec; @@ -172,15 +173,15 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); - EVENTS_INGESTED.remove_label_values(&event_labels)?; - EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; - STORAGE_SIZE.remove_label_values(&storage_size_labels)?; - EVENTS_DELETED.remove_label_values(&event_labels)?; - EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?; - DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; - LIFETIME_EVENTS_INGESTED.remove_label_values(&event_labels)?; - LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; - LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + remove_label_values(&EVENTS_INGESTED, &event_labels); + remove_label_values(&EVENTS_INGESTED_SIZE, &event_labels); + remove_label_values(&STORAGE_SIZE, &storage_size_labels); + remove_label_values(&EVENTS_DELETED, &event_labels); + remove_label_values(&EVENTS_DELETED_SIZE, &event_labels); + remove_label_values(&DELETED_EVENTS_STORAGE_SIZE, &storage_size_labels); + remove_label_values(&LIFETIME_EVENTS_INGESTED, &event_labels); + remove_label_values(&LIFETIME_EVENTS_INGESTED_SIZE, &event_labels); + remove_label_values(&LIFETIME_EVENTS_STORAGE_SIZE, &storage_size_labels); delete_with_label_prefix(&EVENTS_INGESTED_DATE, &event_labels); delete_with_label_prefix(&EVENTS_INGESTED_SIZE_DATE, &event_labels); @@ -189,6 +190,13 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu Ok(()) } +#[inline] +fn remove_label_values(lazy_static: &Lazy, event_labels: &[&str]) { + if let Err(e) = lazy_static.remove_label_values(event_labels) { + warn!("Unable to delete labels- {event_labels:?}\nwith error- {e}"); + } +} + fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { let families: Vec = metrics.collect().into_iter().collect(); for metric in families.iter().flat_map(|m| m.get_metric()) {