diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index d4949b61b..b5d65be5d 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -57,9 +57,9 @@ pub async fn delete(stream_name: Path) -> Result) -> Result { let stream_name = stream_name.into_inner(); - // if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if !PARSEABLE.streams.contains(&stream_name) - && !PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - .unwrap_or(false) - { - return Err(StreamNotFound(stream_name.clone()).into()); - } // Delete from staging let stream_dir = PARSEABLE.get_stream(&stream_name)?; - if fs::remove_dir_all(&stream_dir.data_path).is_err() { + if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) { warn!( - "failed to delete local data for stream {}. Clean {} manually", + "failed to delete local data for stream {} with error {err}. Clean {} manually", stream_name, stream_dir.data_path.to_string_lossy() ) diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index ebffbfb43..fc2242cdc 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -68,9 +68,9 @@ pub async fn delete(stream_name: Path) -> Result prometheus::Resu let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); - EVENTS_INGESTED.remove_label_values(&event_labels)?; - EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; - STORAGE_SIZE.remove_label_values(&storage_size_labels)?; - EVENTS_DELETED.remove_label_values(&event_labels)?; - EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?; - DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; - LIFETIME_EVENTS_INGESTED.remove_label_values(&event_labels)?; - LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; - LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + remove_label_values(&EVENTS_INGESTED, &event_labels); + remove_label_values(&EVENTS_INGESTED_SIZE, &event_labels); + remove_label_values(&STORAGE_SIZE, &storage_size_labels); + remove_label_values(&EVENTS_DELETED, &event_labels); + remove_label_values(&EVENTS_DELETED_SIZE, &event_labels); + remove_label_values(&DELETED_EVENTS_STORAGE_SIZE, &storage_size_labels); + remove_label_values(&LIFETIME_EVENTS_INGESTED, &event_labels); + remove_label_values(&LIFETIME_EVENTS_INGESTED_SIZE, &event_labels); + remove_label_values(&LIFETIME_EVENTS_STORAGE_SIZE, &storage_size_labels); delete_with_label_prefix(&EVENTS_INGESTED_DATE, &event_labels); delete_with_label_prefix(&EVENTS_INGESTED_SIZE_DATE, &event_labels); @@ -188,15 +190,29 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu Ok(()) } +#[inline] +fn remove_label_values(lazy_static: &Lazy, event_labels: &[&str]) { + if let Err(e) = lazy_static.remove_label_values(event_labels) { + warn!("Unable to delete labels- {event_labels:?}\nwith error- {e}"); + } +} + fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { let families: Vec = metrics.collect().into_iter().collect(); for metric in families.iter().flat_map(|m| m.get_metric()) { - let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect(); - if !label.starts_with(prefix) { - continue; - } - if let Err(err) = metrics.remove_label_values(&label) { - warn!("Error = {err}"); + let label_map: HashMap<&str, &str> = metric + .get_label() + .iter() + .map(|l| (l.get_name(), l.get_value())) + .collect(); + + // Check if all prefix elements are present in label values + let all_prefixes_found = prefix.iter().all(|p| label_map.values().any(|v| v == p)); + + if all_prefixes_found { + if let Err(err) = metrics.remove(&label_map) { + warn!("Error removing metric with labels {:?}: {err}", label_map); + } } } }