From bd94e3ce1a063c56d0a29be00ed9e88a50cc78c4 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Wed, 29 Jan 2025 01:25:19 -0500
Subject: [PATCH 1/3] feat: add sysinfo metrics

collect CPU usage, memory usage of the server
collect disk usage of the volume - data, staging, hot-tier
add these metrics to Prometheus Metrics
export these metrics to cluster metrics API
add the metrics to pmeta stream
add the querier node's sysinfo metrics to pmeta and cluster metrics API
---
 src/handlers/http/cluster/mod.rs        |  30 +--
 src/handlers/http/ingest.rs             |  19 +-
 src/handlers/http/modal/query_server.rs |   2 +
 src/metrics/mod.rs                      | 255 +++++++++++++++++++++++-
 src/metrics/prom_utils.rs               | 243 ++++++++++++++++++++++
 5 files changed, 521 insertions(+), 28 deletions(-)

diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index c1cd46b6c..828178c30 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use actix_web::http::header::{self, HeaderMap};
-use actix_web::web::Path;
+use actix_web::web::{Json, Path};
 use actix_web::Responder;
 use bytes::Bytes;
 use chrono::Utc;
@@ -867,7 +867,6 @@ where
     let text = res.text().await.map_err(PostError::NetworkError)?;
     let lines: Vec<Result<String, std::io::Error>> =
         text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
     let sample = prometheus_parse::Scrape::parse(lines.into_iter())
         .map_err(|err| PostError::CustomError(err.to_string()))?
        .samples;
@@ -993,7 +992,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     Ok(all_metrics)
 }
 
-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
+pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
     info!("Setting up schedular for cluster metrics ingestion");
     let mut scheduler = AsyncScheduler::new();
     scheduler
@@ -1002,25 +1001,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
         .run(move || async {
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
-                        } else {
-                            error!("Failed to serialize cluster metrics");
-                        }
-                    }
+                    let json_value = serde_json::to_value(metrics)
+                        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
+
+                    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
+                        .await
+                        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
                 }
                 Ok(())
             }
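The scheduler wiring used here follows clokwerk's polling model: registering a job does not start anything by itself; some task has to keep polling `run_pending()`. A minimal, self-contained sketch of the same pattern — assuming clokwerk's `AsyncScheduler`/`TimeUnits` API and a tokio runtime; the job body is a placeholder for the `fetch_cluster_metrics()` + `ingest_internal_stream()` calls above, and the one-minute interval and ten-second poll merely mirror the values in this patch:

```rust
use std::time::Duration;

use clokwerk::{AsyncScheduler, TimeUnits};

#[tokio::main]
async fn main() {
    let mut scheduler = AsyncScheduler::new();

    // Register the job; the closure is run on every tick of the interval.
    scheduler.every(1.minutes()).run(|| async {
        // fetch_cluster_metrics() + ingest_internal_stream() would go here.
        println!("collecting cluster metrics");
    });

    // clokwerk spawns nothing itself: a dedicated task polls for due jobs.
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });

    // ... rest of the server would run here ...
}
```

The ten-second sleep only bounds how late a tick can fire; the actual cadence of the job comes from the `every(...)` interval.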
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 02bab9e97..257271665 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -18,10 +18,27 @@
 
 use std::collections::{HashMap, HashSet};
 
+use super::logstream::error::{CreateStreamError, StreamError};
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
+use super::users::dashboards::DashboardError;
+use super::users::filters::FiltersError;
+use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::{self, error::EventError};
+use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
+use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
+use crate::metadata::error::stream_info::MetadataError;
+use crate::metadata::{SchemaVersion, STREAM_INFO};
+use crate::option::{Mode, CONFIG};
+use crate::otel::logs::flatten_otel_logs;
+use crate::otel::metrics::flatten_otel_metrics;
+use crate::otel::traces::flatten_otel_traces;
+use crate::storage::{ObjectStorageError, StreamType};
+use crate::utils::header_parsing::ParseHeaderError;
+use crate::utils::json::convert_array_to_object;
+use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use serde_json::Value;

diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index b138a292c..8b684101d 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -117,6 +117,8 @@ impl ParseableServer for QueryServer {
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
 
+        metrics::init_system_metrics_scheduler().await?;
+        cluster::init_cluster_metrics_scheduler().await?;
         // all internal data structures populated now.
         // start the analytics scheduler if enabled
         if PARSEABLE.options.send_analytics {

diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 1896bce0c..063a676fb 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -18,15 +18,25 @@
 
 pub mod prom_utils;
 pub mod storage;
-
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use actix_web::HttpResponse;
+use clokwerk::{AsyncScheduler, Interval};
+use http::StatusCode;
+use serde::Serialize;
+use std::{path::Path, time::Duration};
+use sysinfo::{Disks, System};
+use tracing::{error, info};
+
+use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
 use once_cell::sync::Lazy;
-use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
+use prometheus::{
+    GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry,
+};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
+const SYSTEM_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
 pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
@@ -182,6 +192,42 @@ pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
+pub static TOTAL_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("total_disk", "Total Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static USED_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("used_disk", "Used Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static AVAILABLE_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("available_disk", "Available Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static MEMORY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("memory_usage", "Memory Usage").namespace(METRICS_NAMESPACE),
+        &["memory_usage"],
+    )
+    .expect("metric can be created")
+});
+pub static CPU: Lazy<GaugeVec> = Lazy::new(|| {
+    GaugeVec::new(
+        Opts::new("cpu_usage", "CPU Usage").namespace(METRICS_NAMESPACE),
+        &["cpu_usage"],
+    )
+    .expect("metric can be created")
+});
+
 fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(EVENTS_INGESTED.clone()))
         .expect("metric can be registered");
@@ -231,6 +277,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(ALERTS_STATES.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(USED_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(AVAILABLE_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(MEMORY.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(CPU.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -290,12 +351,196 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
         .set(stats.lifetime_stats.storage as i64);
 }
 
-use actix_web::HttpResponse;
-
 pub async fn get() -> Result<impl Responder, MetricsError> {
     Ok(HttpResponse::Ok().body(format!("{:?}", build_metrics_handler())))
 }
 
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct DiskMetrics {
+    total: u64,
+    used: u64,
+    available: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct SystemMetrics {
+    memory: MemoryMetrics,
+    cpu: Vec<CpuMetrics>,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct MemoryMetrics {
+    total: u64,
+    used: u64,
+    total_swap: u64,
+    used_swap: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct CpuMetrics {
+    name: String,
+    usage: f64,
+}
+
+// Scheduler for collecting all system metrics
+pub async fn init_system_metrics_scheduler() -> Result<(), MetricsError> {
+    info!("Setting up scheduler for capturing system metrics");
+    let mut scheduler = AsyncScheduler::new();
+
+    scheduler
+        .every(SYSTEM_METRICS_INTERVAL_SECONDS)
+        .run(move || async {
+            if let Err(err) = collect_all_metrics().await {
+                error!("Error in capturing system metrics: {:#}", err);
+            }
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    });
+
+    Ok(())
+}
+
+// Function to collect memory, CPU and disk usage metrics
+pub async fn collect_all_metrics() -> Result<(), MetricsError> {
+    // Collect system metrics (CPU and memory)
+    collect_system_metrics().await?;
+
+    // Collect disk metrics for all volumes
+    collect_disk_metrics().await?;
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics
+async fn collect_disk_metrics() -> Result<(), MetricsError> {
+    // collect staging volume metrics
+    collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
+    // Collect data volume metrics for local storage
+    if CONFIG.get_storage_mode_string() == "Local drive" {
+        collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
+    }
+
+    // Collect hot tier volume metrics if configured
+    if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
+        collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
+    }
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics for a specific volume
+fn collect_volume_disk_usage(label: &str, path: &Path) -> Result<(), MetricsError> {
+    let metrics = get_volume_disk_usage(path)?;
+
+    TOTAL_DISK
+        .with_label_values(&[label])
+        .set(metrics.total as i64);
+    USED_DISK
+        .with_label_values(&[label])
+        .set(metrics.used as i64);
+    AVAILABLE_DISK
+        .with_label_values(&[label])
+        .set(metrics.available as i64);
+
+    Ok(())
+}
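Because `total_disk`, `used_disk`, and `available_disk` all carry a `volume` label, each call to `collect_volume_disk_usage` updates one child series per volume rather than a single global gauge. A hypothetical test sketch of that behaviour — it assumes it lives in a `#[cfg(test)]` module inside `src/metrics/mod.rs` where the statics are in scope, and the byte values are made up:

```rust
#[cfg(test)]
mod label_tests {
    use super::{AVAILABLE_DISK, TOTAL_DISK, USED_DISK};

    #[test]
    fn disk_gauges_track_one_series_per_volume() {
        // Simulate what collect_volume_disk_usage() does for two volumes.
        TOTAL_DISK.with_label_values(&["staging"]).set(1_000);
        USED_DISK.with_label_values(&["staging"]).set(250);
        AVAILABLE_DISK.with_label_values(&["staging"]).set(750);
        TOTAL_DISK.with_label_values(&["hot_tier"]).set(5_000);

        // Each label value addresses an independent child gauge.
        assert_eq!(TOTAL_DISK.with_label_values(&["staging"]).get(), 1_000);
        assert_eq!(TOTAL_DISK.with_label_values(&["hot_tier"]).get(), 5_000);
    }
}
```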
+// Function to get disk usage for a specific volume
+fn get_volume_disk_usage(path: &Path) -> Result<DiskMetrics, MetricsError> {
+    let mut disks = Disks::new_with_refreshed_list();
+    disks.sort_by(|a, b| {
+        b.mount_point()
+            .to_str()
+            .unwrap_or("")
+            .len()
+            .cmp(&a.mount_point().to_str().unwrap_or("").len())
+    });
+
+    for disk in disks.iter() {
+        let mount_point = disk.mount_point().to_str().unwrap();
+
+        if path.starts_with(mount_point) {
+            return Ok(DiskMetrics {
+                total: disk.total_space(),
+                used: disk.total_space() - disk.available_space(),
+                available: disk.available_space(),
+            });
+        }
+    }
+
+    Err(MetricsError::Custom(
+        format!("No matching disk found for path: {:?}", path),
+        StatusCode::INTERNAL_SERVER_ERROR,
+    ))
+}
+
+// Function to collect CPU and memory usage metrics
+async fn collect_system_metrics() -> Result<(), MetricsError> {
+    let metrics = get_system_metrics()?;
+
+    // Set memory metrics
+    MEMORY
+        .with_label_values(&["total_memory"])
+        .set(metrics.memory.total as i64);
+    MEMORY
+        .with_label_values(&["used_memory"])
+        .set(metrics.memory.used as i64);
+    MEMORY
+        .with_label_values(&["total_swap"])
+        .set(metrics.memory.total_swap as i64);
+    MEMORY
+        .with_label_values(&["used_swap"])
+        .set(metrics.memory.used_swap as i64);
+
+    // Set CPU metrics
+    for cpu in metrics.cpu {
+        CPU.with_label_values(&[&cpu.name]).set(cpu.usage);
+    }
+
+    Ok(())
+}
+
+// Get system metrics
+fn get_system_metrics() -> Result<SystemMetrics, MetricsError> {
+    let mut sys = System::new_all();
+    sys.refresh_all();
+
+    // Collect memory metrics
+    let memory = MemoryMetrics {
+        total: sys.total_memory(),
+        used: sys.used_memory(),
+        total_swap: sys.total_swap(),
+        used_swap: sys.used_swap(),
+    };
+
+    // Collect CPU metrics
+    let mut cpu_metrics = Vec::new();
+
+    // Add global CPU usage
+    cpu_metrics.push(CpuMetrics {
+        name: "global".to_string(),
+        usage: sys.global_cpu_usage() as f64,
+    });
+
+    // Add individual CPU usage
+    for cpu in sys.cpus() {
+        cpu_metrics.push(CpuMetrics {
+            name: cpu.name().to_string(),
+            usage: cpu.cpu_usage() as f64,
+        });
+    }
+
+    Ok(SystemMetrics {
+        memory,
+        cpu: cpu_metrics,
+    })
+}
+
 pub mod error {
 
     use actix_web::http::header::ContentType;

diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs
index f9f559f5e..5aef018bf 100644
--- a/src/metrics/prom_utils.rs
+++ b/src/metrics/prom_utils.rs
@@ -16,6 +16,10 @@
  *
  */
 
+use std::collections::HashMap;
+use std::path::Path;
+
+use crate::about::current;
 use crate::handlers::http::base_path_without_preceding_slash;
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::modal::Metadata;
@@ -34,6 +38,11 @@ use tracing::error;
 use tracing::warn;
 use url::Url;
 
+use super::get_system_metrics;
+use super::get_volume_disk_usage;
+use super::DiskMetrics;
+use super::MemoryMetrics;
+
 #[derive(Debug, Serialize, Clone)]
 pub struct Metrics {
     address: String,
@@ -53,6 +62,11 @@ pub struct Metrics {
     event_time: NaiveDateTime,
     commit: String,
     staging: String,
+    parseable_data_disk_usage: DiskMetrics,
+    parseable_staging_disk_usage: DiskMetrics,
+    parseable_hot_tier_disk_usage: DiskMetrics,
+    parseable_memory_usage: MemoryMetrics,
+    parseable_cpu_usage: HashMap<String, f64>,
 }
 
 #[derive(Debug, Serialize, Default, Clone)]
@@ -113,11 +127,240 @@ impl Metrics {
             event_time: Utc::now().naive_utc(),
             commit: "".to_string(),
             staging: "".to_string(),
+            parseable_data_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_staging_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_hot_tier_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_memory_usage: MemoryMetrics {
+                total: 0,
+                used: 0,
+                total_swap: 0,
+                used_swap: 0,
+            },
+            parseable_cpu_usage: HashMap::new(),
         }
     }
 }
 
+#[derive(Debug)]
+enum MetricType {
+    SimpleGauge(String),
+    StorageSize(String),
+    DiskUsage(String),
+    MemoryUsage(String),
+    CpuUsage,
+}
+
+impl MetricType {
+    fn from_metric(metric: &str, labels: &HashMap<String, String>) -> Option<Self> {
+        match metric {
+            "parseable_events_ingested" => {
+                Some(Self::SimpleGauge("parseable_events_ingested".into()))
+            }
+            "parseable_events_ingested_size" => {
+                Some(Self::SimpleGauge("parseable_events_ingested_size".into()))
+            }
+            "parseable_lifetime_events_ingested" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested".into(),
+            )),
+            "parseable_lifetime_events_ingested_size" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested_size".into(),
+            )),
+            "parseable_events_deleted" => {
+                Some(Self::SimpleGauge("parseable_events_deleted".into()))
+            }
+            "parseable_events_deleted_size" => {
+                Some(Self::SimpleGauge("parseable_events_deleted_size".into()))
+            }
+            "parseable_staging_files" => Some(Self::SimpleGauge("parseable_staging_files".into())),
+            "process_resident_memory_bytes" => {
+                Some(Self::SimpleGauge("process_resident_memory_bytes".into()))
+            }
+            "parseable_storage_size" => labels.get("type").map(|t| Self::StorageSize(t.clone())),
+            "parseable_lifetime_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_deleted_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_total_disk" | "parseable_used_disk" | "parseable_available_disk" => {
+                labels.get("volume").map(|v| Self::DiskUsage(v.clone()))
+            }
+            "parseable_memory_usage" => labels
+                .get("memory_usage")
+                .map(|m| Self::MemoryUsage(m.clone())),
+            "parseable_cpu_usage" => Some(Self::CpuUsage),
+            _ => None,
+        }
+    }
+}
 impl Metrics {
+    pub async fn ingestor_prometheus_samples(
+        samples: Vec<PromSample>,
+        ingestor_metadata: &IngestorMetadata,
+    ) -> Result<Metrics, PostError> {
+        let mut metrics = Metrics::new(ingestor_metadata.domain_name.to_string());
+
+        Self::build_metrics_from_samples(samples, &mut metrics)?;
+
+        // Get additional metadata
+        let (commit_id, staging) = Self::from_about_api_response(ingestor_metadata.clone())
+            .await
+            .map_err(|err| {
+                error!("Fatal: failed to get ingestor info: {:?}", err);
+                PostError::Invalid(err.into())
+            })?;
+
+        metrics.commit = commit_id;
+        metrics.staging = staging;
+
+        Ok(metrics)
+    }
+
+    pub async fn querier_prometheus_metrics() -> Self {
+        let mut metrics = Metrics::new(get_url().to_string());
+
+        let system_metrics = get_system_metrics().expect("Failed to get system metrics");
+
+        metrics.parseable_memory_usage.total = system_metrics.memory.total;
+        metrics.parseable_memory_usage.used = system_metrics.memory.used;
+        metrics.parseable_memory_usage.total_swap = system_metrics.memory.total_swap;
+        metrics.parseable_memory_usage.used_swap = system_metrics.memory.used_swap;
+        for cpu_usage in system_metrics.cpu {
+            metrics
+                .parseable_cpu_usage
+                .insert(cpu_usage.name.clone(), cpu_usage.usage);
+        }
+
+        let staging_disk_usage = get_volume_disk_usage(CONFIG.staging_dir())
+            .expect("Failed to get staging volume disk usage");
+
+        metrics.parseable_staging_disk_usage.total = staging_disk_usage.total;
+        metrics.parseable_staging_disk_usage.used = staging_disk_usage.used;
+        metrics.parseable_staging_disk_usage.available = staging_disk_usage.available;
+
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            let data_disk_usage =
+                get_volume_disk_usage(Path::new(&CONFIG.storage().get_endpoint()))
+                    .expect("Failed to get data volume disk usage");
+
+            metrics.parseable_data_disk_usage.total = data_disk_usage.total;
+            metrics.parseable_data_disk_usage.used = data_disk_usage.used;
+            metrics.parseable_data_disk_usage.available = data_disk_usage.available;
+        }
+
+        if CONFIG.options.hot_tier_storage_path.is_some() {
+            let hot_tier_disk_usage =
+                get_volume_disk_usage(CONFIG.hot_tier_dir().as_ref().unwrap())
+                    .expect("Failed to get hot tier volume disk usage");
+
+            metrics.parseable_hot_tier_disk_usage.total = hot_tier_disk_usage.total;
+            metrics.parseable_hot_tier_disk_usage.used = hot_tier_disk_usage.used;
+            metrics.parseable_hot_tier_disk_usage.available = hot_tier_disk_usage.available;
+        }
+
+        metrics.commit = current().commit_hash;
+        metrics.staging = CONFIG.staging_dir().display().to_string();
+
+        metrics
+    }
+
+    fn build_metrics_from_samples(
+        samples: Vec<PromSample>,
+        metrics: &mut Metrics,
+    ) -> Result<(), PostError> {
+        for sample in samples {
+            let metric_type = MetricType::from_metric(&sample.metric, &sample.labels);
+
+            match (sample.value.clone(), metric_type) {
+                (PromValue::Gauge(val), Some(metric_type)) => {
+                    Self::process_gauge_metric(
+                        metrics,
+                        metric_type,
+                        val,
+                        &sample.metric,
+                        sample.clone(),
+                    );
+                }
+                _ => continue,
+            }
+        }
+        Ok(())
+    }
+
+    fn process_gauge_metric(
+        metrics: &mut Metrics,
+        metric_type: MetricType,
+        val: f64,
+        metric_name: &str,
+        sample: PromSample,
+    ) {
+        match metric_type {
+            MetricType::SimpleGauge(metric_name) => match metric_name.as_str() {
+                "parseable_events_ingested" => metrics.parseable_events_ingested += val,
+                "parseable_events_ingested_size" => metrics.parseable_events_ingested_size += val,
+                "parseable_lifetime_events_ingested" => {
+                    metrics.parseable_lifetime_events_ingested += val
+                }
+                "parseable_lifetime_events_ingested_size" => {
+                    metrics.parseable_lifetime_events_ingested_size += val
+                }
+                "parseable_events_deleted" => metrics.parseable_deleted_events_ingested += val,
+                "parseable_events_deleted_size" => {
+                    metrics.parseable_deleted_events_ingested_size += val
+                }
+                "parseable_staging_files" => metrics.parseable_staging_files += val,
+                "process_resident_memory_bytes" => metrics.process_resident_memory_bytes += val,
+                _ => {}
+            },
+            MetricType::StorageSize(storage_type) => match storage_type.as_str() {
+                "staging" => metrics.parseable_storage_size.staging += val,
+                "data" => metrics.parseable_storage_size.data += val,
+                _ => {}
+            },
+            MetricType::DiskUsage(volume_type) => {
+                let disk_usage = match volume_type.as_str() {
+                    "data" => &mut metrics.parseable_data_disk_usage,
+                    "staging" => &mut metrics.parseable_staging_disk_usage,
+                    "hot_tier" => &mut metrics.parseable_hot_tier_disk_usage,
+                    _ => return,
+                };
+
+                match metric_name {
+                    "parseable_total_disk" => disk_usage.total = val as u64,
+                    "parseable_used_disk" => disk_usage.used = val as u64,
+                    "parseable_available_disk" => disk_usage.available = val as u64,
+                    _ => {}
+                }
+            }
+            MetricType::MemoryUsage(memory_type) => match memory_type.as_str() {
+                "total_memory" => metrics.parseable_memory_usage.total = val as u64,
+                "used_memory" => metrics.parseable_memory_usage.used = val as u64,
+                "total_swap" => metrics.parseable_memory_usage.total_swap = val as u64,
+                "used_swap" => metrics.parseable_memory_usage.used_swap = val as u64,
+                _ => {}
+            },
+            MetricType::CpuUsage => {
+                if let Some(cpu_name) = sample.labels.get("cpu_usage") {
+                    metrics
+                        .parseable_cpu_usage
+                        .insert(cpu_name.to_string(), val);
+                }
+            }
+        }
+    }
+
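`MetricType::from_metric` separates classifying a Prometheus sample from applying its value, so `process_gauge_metric` stays a flat dispatch. A self-contained illustration of the classification step, using a trimmed, hypothetical stand-in for the private enum (only two of its arms kept):

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical stand-in for the enum in this patch, trimmed to two arms.
    #[derive(Debug)]
    enum MetricType {
        DiskUsage(String),
        CpuUsage,
    }

    fn from_metric(metric: &str, labels: &HashMap<String, String>) -> Option<MetricType> {
        match metric {
            // All three disk gauges share the `volume` label and route together.
            "parseable_total_disk" | "parseable_used_disk" | "parseable_available_disk" => {
                labels.get("volume").map(|v| MetricType::DiskUsage(v.clone()))
            }
            "parseable_cpu_usage" => Some(MetricType::CpuUsage),
            _ => None,
        }
    }

    let labels = HashMap::from([("volume".to_string(), "hot_tier".to_string())]);

    // A sample missing its expected label classifies as None and is skipped.
    assert!(from_metric("parseable_total_disk", &HashMap::new()).is_none());
    println!("{:?}", from_metric("parseable_total_disk", &labels)); // Some(DiskUsage("hot_tier"))
}
```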
     pub fn get_daily_stats_from_samples(
         samples: Vec<PromSample>,
         stream_name: &str,

From 062bc25fc2ba9293b3f374a4f6f6c719cd4eaf3f Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Wed, 29 Jan 2025 07:28:27 -0500
Subject: [PATCH 2/3] deepsource fix

---
 src/metrics/prom_utils.rs | 120 +++++++++++++++++++++++---------------
 1 file changed, 72 insertions(+), 48 deletions(-)

diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs
index 5aef018bf..f655c38f0 100644
--- a/src/metrics/prom_utils.rs
+++ b/src/metrics/prom_utils.rs
@@ -307,57 +307,81 @@ impl Metrics {
         sample: PromSample,
     ) {
         match metric_type {
-            MetricType::SimpleGauge(metric_name) => match metric_name.as_str() {
-                "parseable_events_ingested" => metrics.parseable_events_ingested += val,
-                "parseable_events_ingested_size" => metrics.parseable_events_ingested_size += val,
-                "parseable_lifetime_events_ingested" => {
-                    metrics.parseable_lifetime_events_ingested += val
-                }
-                "parseable_lifetime_events_ingested_size" => {
-                    metrics.parseable_lifetime_events_ingested_size += val
-                }
-                "parseable_events_deleted" => metrics.parseable_deleted_events_ingested += val,
-                "parseable_events_deleted_size" => {
-                    metrics.parseable_deleted_events_ingested_size += val
-                }
-                "parseable_staging_files" => metrics.parseable_staging_files += val,
-                "process_resident_memory_bytes" => metrics.process_resident_memory_bytes += val,
-                _ => {}
-            },
-            MetricType::StorageSize(storage_type) => match storage_type.as_str() {
-                "staging" => metrics.parseable_storage_size.staging += val,
-                "data" => metrics.parseable_storage_size.data += val,
-                _ => {}
-            },
+            MetricType::SimpleGauge(metric_name) => {
+                Self::process_simple_gauge(metrics, &metric_name, val)
+            }
+            MetricType::StorageSize(storage_type) => {
+                Self::process_storage_size(metrics, &storage_type, val)
+            }
             MetricType::DiskUsage(volume_type) => {
-                let disk_usage = match volume_type.as_str() {
-                    "data" => &mut metrics.parseable_data_disk_usage,
-                    "staging" => &mut metrics.parseable_staging_disk_usage,
-                    "hot_tier" => &mut metrics.parseable_hot_tier_disk_usage,
-                    _ => return,
-                };
-
-                match metric_name {
-                    "parseable_total_disk" => disk_usage.total = val as u64,
-                    "parseable_used_disk" => disk_usage.used = val as u64,
-                    "parseable_available_disk" => disk_usage.available = val as u64,
-                    _ => {}
-                }
+                Self::process_disk_usage(metrics, &volume_type, val, metric_name)
             }
-            MetricType::MemoryUsage(memory_type) => match memory_type.as_str() {
-                "total_memory" => metrics.parseable_memory_usage.total = val as u64,
-                "used_memory" => metrics.parseable_memory_usage.used = val as u64,
-                "total_swap" => metrics.parseable_memory_usage.total_swap = val as u64,
-                "used_swap" => metrics.parseable_memory_usage.used_swap = val as u64,
-                _ => {}
-            },
-            MetricType::CpuUsage => {
-                if let Some(cpu_name) = sample.labels.get("cpu_usage") {
-                    metrics
-                        .parseable_cpu_usage
-                        .insert(cpu_name.to_string(), val);
-                }
+            MetricType::MemoryUsage(memory_type) => {
+                Self::process_memory_usage(metrics, &memory_type, val)
+            }
+            MetricType::CpuUsage => Self::process_cpu_usage(metrics, val, sample),
+        }
+    }
+
+    fn process_simple_gauge(metrics: &mut Metrics, metric_name: &str, val: f64) {
+        match metric_name {
+            "parseable_events_ingested" => metrics.parseable_events_ingested += val,
+            "parseable_events_ingested_size" => metrics.parseable_events_ingested_size += val,
+            "parseable_lifetime_events_ingested" => {
+                metrics.parseable_lifetime_events_ingested += val
+            }
+            "parseable_lifetime_events_ingested_size" => {
+                metrics.parseable_lifetime_events_ingested_size += val
+            }
+            "parseable_events_deleted" => metrics.parseable_deleted_events_ingested += val,
+            "parseable_events_deleted_size" => {
+                metrics.parseable_deleted_events_ingested_size += val
+            }
+            "parseable_staging_files" => metrics.parseable_staging_files += val,
+            "process_resident_memory_bytes" => metrics.process_resident_memory_bytes += val,
+            _ => {}
+        }
+    }
+
+    fn process_storage_size(metrics: &mut Metrics, storage_type: &str, val: f64) {
+        match storage_type {
+            "staging" => metrics.parseable_storage_size.staging += val,
+            "data" => metrics.parseable_storage_size.data += val,
+            _ => {}
+        }
+    }
+
+    fn process_disk_usage(metrics: &mut Metrics, volume_type: &str, val: f64, metric_name: &str) {
+        let disk_usage = match volume_type {
+            "data" => &mut metrics.parseable_data_disk_usage,
+            "staging" => &mut metrics.parseable_staging_disk_usage,
+            "hot_tier" => &mut metrics.parseable_hot_tier_disk_usage,
+            _ => return,
+        };
+
+        match metric_name {
+            "parseable_total_disk" => disk_usage.total = val as u64,
+            "parseable_used_disk" => disk_usage.used = val as u64,
+            "parseable_available_disk" => disk_usage.available = val as u64,
+            _ => {}
+        }
+    }
+
+    fn process_memory_usage(metrics: &mut Metrics, memory_type: &str, val: f64) {
+        match memory_type {
+            "total_memory" => metrics.parseable_memory_usage.total = val as u64,
+            "used_memory" => metrics.parseable_memory_usage.used = val as u64,
+            "total_swap" => metrics.parseable_memory_usage.total_swap = val as u64,
+            "used_swap" => metrics.parseable_memory_usage.used_swap = val as u64,
+            _ => {}
+        }
+    }
+
+    fn process_cpu_usage(metrics: &mut Metrics, val: f64, sample: PromSample) {
+        if let Some(cpu_name) = sample.labels.get("cpu_usage") {
+            metrics
+                .parseable_cpu_usage
+                .insert(cpu_name.to_string(), val);
+        }
+    }

From 6d6703999981ca785c56cd4b2c654be95ec67472 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 22 Apr 2025 01:14:27 -0400
Subject: [PATCH 3/3] merge from main

---
 src/event/mod.rs                         |   2 -
 src/handlers/http/cluster/mod.rs         |  31 +++-
 src/handlers/http/ingest.rs              |  39 +----
 src/handlers/http/modal/ingest_server.rs |   3 +-
 src/handlers/http/modal/query_server.rs  |  12 +-
 src/handlers/http/modal/server.rs        |   3 +
 src/metrics/mod.rs                       |  10 +-
 src/metrics/prom_utils.rs                | 204 ++++------------------
 src/parseable/streams.rs                 |  54 +++---
 9 files changed, 97 insertions(+), 261 deletions(-)

diff --git a/src/event/mod.rs b/src/event/mod.rs
index b641643cb..46ca44081 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -76,7 +76,6 @@ impl Event {
             &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
-            self.stream_type,
         )?;
 
         update_stats(
@@ -100,7 +99,6 @@ impl Event {
             &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
-            self.stream_type,
         )?;
 
         Ok(())

diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 828178c30..3c7e736db 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use actix_web::http::header::{self, HeaderMap};
-use actix_web::web::{Json, Path};
+use actix_web::web::Path;
 use actix_web::Responder;
 use bytes::Bytes;
 use chrono::Utc;
@@ -41,6 +41,7 @@ use url::Url;
 use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
 
 use crate::handlers::http::ingest::ingest_internal_stream;
+use crate::metrics::collect_all_metrics;
 use crate::metrics::prom_utils::Metrics;
 use crate::parseable::PARSEABLE;
 use crate::rbac::role::model::DefaultPrivilege;
@@ -999,14 +1000,30 @@ pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
         .run(move || async {
             let result: Result<(), PostError> = async {
+                if let Err(err) = collect_all_metrics().await {
+                    error!("Error in capturing system metrics: {:#}", err);
+                }
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
-                    let json_value = serde_json::to_value(metrics)
-                        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
-
-                    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
-                        .await
-                        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
+                    if !metrics.is_empty() {
+                        info!("Cluster metrics fetched successfully from all ingestors");
+                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
+                            if matches!(
+                                ingest_internal_stream(
+                                    INTERNAL_STREAM_NAME.to_string(),
+                                    bytes::Bytes::from(metrics_bytes),
+                                )
+                                .await,
+                                Ok(())
+                            ) {
+                                info!("Cluster metrics successfully ingested into internal stream");
+                            } else {
+                                error!("Failed to ingest cluster metrics into internal stream");
+                            }
+                        } else {
+                            error!("Failed to serialize cluster metrics");
+                        }
+                    }
                 }
                 Ok(())
             }

diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 257271665..81756eba3 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -18,37 +18,19 @@
 
 use std::collections::{HashMap, HashSet};
 
-use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
-use super::users::dashboards::DashboardError;
-use super::users::filters::FiltersError;
-use crate::event::format::{self, EventFormat, LogSource};
-use crate::event::{self, error::EventError};
-use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
-use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::error::stream_info::MetadataError;
-use crate::metadata::{SchemaVersion, STREAM_INFO};
-use crate::option::{Mode, CONFIG};
-use crate::otel::logs::flatten_otel_logs;
-use crate::otel::metrics::flatten_otel_metrics;
-use crate::otel::traces::flatten_otel_traces;
-use crate::storage::{ObjectStorageError, StreamType};
-use crate::utils::header_parsing::ParseHeaderError;
-use crate::utils::json::convert_array_to_object;
-use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
+use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use serde_json::Value;
 
 use crate::event::error::EventError;
 use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
-use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
+use crate::event::format::{LogSource, LogSourceEntry};
 use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
 use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
 use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
@@ -138,26 +120,11 @@ pub async fn ingest(
 }
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
     let json: Value = serde_json::from_slice(&body)?;
-    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
     let mut p_custom_fields = HashMap::new();
     p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
     p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
-    // For internal streams, use old schema
-    format::json::Event::new(json)
-        .into_event(
-            stream_name,
-            size as u64,
-            &schema,
-            false,
-            None,
-            None,
-            SchemaVersion::V0,
-            StreamType::Internal,
-            &p_custom_fields,
-        )?
-        .process()?;
+    flatten_and_push_logs(json, &stream_name, &LogSource::Pmeta, &p_custom_fields).await?;
 
     Ok(())
 }

diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index e9751ee13..a942efd94 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -31,6 +31,7 @@ use tokio::sync::oneshot;
 use tokio::sync::OnceCell;
 
 use crate::handlers::http::modal::NodeType;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::{
     analytics,
     handlers::{
@@ -119,7 +120,7 @@ impl ParseableServer for IngestServer {
         thread::spawn(|| sync::handler(cancel_rx));
 
         tokio::spawn(airplane::server());
-
+        init_system_metrics_scheduler().await?;
         // Ingestors shouldn't have to deal with OpenId auth flow
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
         // Cancel sync jobs

diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 8b684101d..41e9eca8e 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -20,12 +20,13 @@ use std::sync::Arc;
 use std::thread;
 
 use crate::handlers::airplane;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
+use crate::handlers::http::cluster::{self, init_cluster_metrics_scheduler};
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{base_path, prism_base_path};
 use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::handlers::http::{rbac, role};
 use crate::hottier::HotTierManager;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::rbac::role::Action;
 use crate::{analytics, migration, storage, sync};
 use actix_web::web::{resource, ServiceConfig};
@@ -34,7 +35,6 @@ use actix_web_prometheus::PrometheusMetrics;
 use async_trait::async_trait;
 use bytes::Bytes;
 use tokio::sync::{oneshot, OnceCell};
-use tracing::info;
 
 use crate::parseable::PARSEABLE;
 use crate::Server;
@@ -117,17 +117,15 @@ impl ParseableServer for QueryServer {
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
 
-        metrics::init_system_metrics_scheduler().await?;
-        cluster::init_cluster_metrics_scheduler().await?;
         // all internal data structures populated now.
         // start the analytics scheduler if enabled
         if PARSEABLE.options.send_analytics {
             analytics::init_analytics_scheduler()?;
         }
 
-        if init_cluster_metrics_schedular().is_ok() {
-            info!("Cluster metrics scheduler started successfully");
-        }
+        init_system_metrics_scheduler().await?;
+        init_cluster_metrics_scheduler().await?;
+
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;

diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 1674c3ee2..780b02884 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -30,6 +30,7 @@ use crate::handlers::http::users::dashboards;
 use crate::handlers::http::users::filters;
 use crate::hottier::HotTierManager;
 use crate::metrics;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::migration;
 use crate::storage;
 use crate::sync;
@@ -134,6 +135,8 @@ impl ParseableServer for Server {
             analytics::init_analytics_scheduler()?;
         }
 
+        init_system_metrics_scheduler().await?;
+
         tokio::spawn(handlers::livetail::server());
         tokio::spawn(handlers::airplane::server());

diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 063a676fb..aa98798b1 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -26,7 +26,7 @@ use std::{path::Path, time::Duration};
 use sysinfo::{Disks, System};
 use tracing::{error, info};
 
-use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
+use crate::{handlers::http::metrics_path, parseable::PARSEABLE, stats::FullStats};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
@@ -419,14 +419,14 @@ pub async fn collect_all_metrics() -> Result<(), MetricsError> {
 // Function to collect disk usage metrics
 async fn collect_disk_metrics() -> Result<(), MetricsError> {
     // collect staging volume metrics
-    collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
+    collect_volume_disk_usage("staging", PARSEABLE.options.staging_dir())?;
     // Collect data volume metrics for local storage
-    if CONFIG.get_storage_mode_string() == "Local drive" {
-        collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
+    if PARSEABLE.get_storage_mode_string() == "Local drive" {
+        collect_volume_disk_usage("data", Path::new(&PARSEABLE.storage().get_endpoint()))?;
     }
 
     // Collect hot tier volume metrics if configured
-    if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
+    if let Some(hot_tier_dir) = PARSEABLE.hot_tier_dir() {
         collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
     }
 

diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs
index f655c38f0..5fc4dca61 100644
--- a/src/metrics/prom_utils.rs
+++ b/src/metrics/prom_utils.rs
@@ -16,15 +16,11 @@
  *
  */
 
-use std::collections::HashMap;
-use std::path::Path;
-
-use crate::about::current;
+use super::DiskMetrics;
+use super::MemoryMetrics;
 use crate::handlers::http::base_path_without_preceding_slash;
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::modal::Metadata;
-use crate::option::Mode;
-use crate::parseable::PARSEABLE;
 use crate::HTTP_CLIENT;
 use actix_web::http::header;
 use chrono::NaiveDateTime;
@@ -34,15 +30,11 @@ use prometheus_parse::Value as PromValue;
 use serde::Serialize;
 use serde_json::Error as JsonError;
 use serde_json::Value as JsonValue;
+use std::collections::HashMap;
 use tracing::error;
 use tracing::warn;
 use url::Url;
 
-use super::get_system_metrics;
-use super::get_volume_disk_usage;
-use super::DiskMetrics;
-use super::MemoryMetrics;
-
 #[derive(Debug, Serialize, Clone)]
 pub struct Metrics {
     address: String,
@@ -75,38 +67,6 @@ struct StorageMetrics {
     data: f64,
 }
 
-impl Default for Metrics {
-    fn default() -> Self {
-        // for now it is only for ingestor
-        let url = PARSEABLE.options.get_url(Mode::Ingest);
-        let address = format!(
-            "http://{}:{}",
-            url.domain()
-                .unwrap_or(url.host_str().expect("should have a host")),
-            url.port().unwrap_or_default()
-        );
-        Metrics {
-            address,
-            node_type: "ingestor".to_string(),
-            parseable_events_ingested: 0.0,
-            parseable_events_ingested_size: 0.0,
-            parseable_staging_files: 0.0,
-            process_resident_memory_bytes: 0.0,
-            parseable_storage_size: StorageMetrics::default(),
-            parseable_lifetime_events_ingested: 0.0,
-            parseable_lifetime_events_ingested_size: 0.0,
-            parseable_deleted_events_ingested: 0.0,
-            parseable_deleted_events_ingested_size: 0.0,
-            parseable_deleted_storage_size: StorageMetrics::default(),
-            parseable_lifetime_storage_size: StorageMetrics::default(),
-            event_type: "cluster-metrics".to_string(),
-            event_time: Utc::now().naive_utc(),
-            commit: "".to_string(),
-            staging: "".to_string(),
-        }
-    }
-}
-
 impl Metrics {
     fn new(address: String, node_type: String) -> Self {
         Metrics {
@@ -206,21 +166,25 @@ impl MetricType {
     }
 }
 impl Metrics {
-    pub async fn ingestor_prometheus_samples(
+    pub async fn from_prometheus_samples<T: Metadata + ?Sized>(
         samples: Vec<PromSample>,
-        ingestor_metadata: &IngestorMetadata,
+        metadata: &T,
     ) -> Result<Metrics, PostError> {
-        let mut metrics = Metrics::new(ingestor_metadata.domain_name.to_string());
+        let mut metrics = Metrics::new(
+            metadata.domain_name().to_string(),
+            metadata.node_type().to_string(),
+        );
 
         Self::build_metrics_from_samples(samples, &mut metrics)?;
 
         // Get additional metadata
-        let (commit_id, staging) = Self::from_about_api_response(ingestor_metadata.clone())
-            .await
-            .map_err(|err| {
-                error!("Fatal: failed to get ingestor info: {:?}", err);
-                PostError::Invalid(err.into())
-            })?;
+        let (commit_id, staging) =
+            Self::from_about_api_response(metadata)
+                .await
+                .map_err(|err| {
+                    error!("Fatal: failed to get ingestor info: {:?}", err);
+                    PostError::Invalid(err.into())
+                })?;
 
         metrics.commit = commit_id;
         metrics.staging = staging;
@@ -228,54 +192,6 @@ impl Metrics {
         Ok(metrics)
     }
 
-    pub async fn querier_prometheus_metrics() -> Self {
-        let mut metrics = Metrics::new(get_url().to_string());
-
-        let system_metrics = get_system_metrics().expect("Failed to get system metrics");
-
-        metrics.parseable_memory_usage.total = system_metrics.memory.total;
-        metrics.parseable_memory_usage.used = system_metrics.memory.used;
-        metrics.parseable_memory_usage.total_swap = system_metrics.memory.total_swap;
-        metrics.parseable_memory_usage.used_swap = system_metrics.memory.used_swap;
-        for cpu_usage in system_metrics.cpu {
-            metrics
-                .parseable_cpu_usage
-                .insert(cpu_usage.name.clone(), cpu_usage.usage);
-        }
-
-        let staging_disk_usage = get_volume_disk_usage(CONFIG.staging_dir())
-            .expect("Failed to get staging volume disk usage");
-
-        metrics.parseable_staging_disk_usage.total = staging_disk_usage.total;
-        metrics.parseable_staging_disk_usage.used = staging_disk_usage.used;
-        metrics.parseable_staging_disk_usage.available = staging_disk_usage.available;
-
-        if CONFIG.get_storage_mode_string() == "Local drive" {
-            let data_disk_usage =
-                get_volume_disk_usage(Path::new(&CONFIG.storage().get_endpoint()))
-                    .expect("Failed to get data volume disk usage");
-
-            metrics.parseable_data_disk_usage.total = data_disk_usage.total;
-            metrics.parseable_data_disk_usage.used = data_disk_usage.used;
-            metrics.parseable_data_disk_usage.available = data_disk_usage.available;
-        }
-
-        if CONFIG.options.hot_tier_storage_path.is_some() {
-            let hot_tier_disk_usage =
-                get_volume_disk_usage(CONFIG.hot_tier_dir().as_ref().unwrap())
-                    .expect("Failed to get hot tier volume disk usage");
-
-            metrics.parseable_hot_tier_disk_usage.total = hot_tier_disk_usage.total;
-            metrics.parseable_hot_tier_disk_usage.used = hot_tier_disk_usage.used;
-            metrics.parseable_hot_tier_disk_usage.available = hot_tier_disk_usage.available;
-        }
-
-        metrics.commit = current().commit_hash;
-        metrics.staging = CONFIG.staging_dir().display().to_string();
-
-        metrics
-    }
-
     fn build_metrics_from_samples(
         samples: Vec<PromSample>,
         metrics: &mut Metrics,
@@ -311,7 +227,7 @@ impl Metrics {
                 Self::process_simple_gauge(metrics, &metric_name, val)
             }
             MetricType::StorageSize(storage_type) => {
-                Self::process_storage_size(metrics, &storage_type, val)
+                Self::process_storage_size(metrics, &storage_type, val, metric_name)
             }
             MetricType::DiskUsage(volume_type) => {
                 Self::process_disk_usage(metrics, &volume_type, val, metric_name)
@@ -343,10 +259,24 @@ impl Metrics {
         }
     }
 
-    fn process_storage_size(metrics: &mut Metrics, storage_type: &str, val: f64) {
+    fn process_storage_size(
+        metrics: &mut Metrics,
+        storage_type: &str,
+        val: f64,
+        metric_name: &str,
+    ) {
+        let target = match metric_name {
+            "parseable_storage_size" => &mut metrics.parseable_storage_size,
+            "parseable_lifetime_events_storage_size" => {
+                &mut metrics.parseable_lifetime_storage_size
+            }
+            "parseable_deleted_events_storage_size" => &mut metrics.parseable_deleted_storage_size,
+            _ => return,
+        };
+
         match storage_type {
-            "staging" => metrics.parseable_storage_size.staging += val,
-            "data" => metrics.parseable_storage_size.data += val,
+            "staging" => target.staging += val,
+            "data" => target.data += val,
             _ => {}
         }
     }
@@ -426,72 +356,6 @@ impl Metrics {
         }
         (events_ingested, ingestion_size, storage_size)
     }
-    pub async fn from_prometheus_samples<T: Metadata + ?Sized>(
-        samples: Vec<PromSample>,
-        metadata: &T,
-    ) -> Result<Metrics, PostError> {
-        let mut prom_dress = Metrics::new(
-            metadata.domain_name().to_string(),
-            metadata.node_type().to_string(),
-        );
-        for sample in samples {
-            if let PromValue::Gauge(val) = sample.value {
-                match sample.metric.as_str() {
-                    "parseable_events_ingested" => prom_dress.parseable_events_ingested += val,
-                    "parseable_events_ingested_size" => {
-                        prom_dress.parseable_events_ingested_size += val
-                    }
-                    "parseable_lifetime_events_ingested" => {
-                        prom_dress.parseable_lifetime_events_ingested += val
-                    }
-                    "parseable_lifetime_events_ingested_size" => {
-                        prom_dress.parseable_lifetime_events_ingested_size += val
-                    }
-                    "parseable_events_deleted" => {
-                        prom_dress.parseable_deleted_events_ingested += val
-                    }
-                    "parseable_events_deleted_size" => {
-                        prom_dress.parseable_deleted_events_ingested_size += val
-                    }
-                    "parseable_staging_files" => prom_dress.parseable_staging_files += val,
-                    "process_resident_memory_bytes" => {
-                        prom_dress.process_resident_memory_bytes += val
-                    }
-                    "parseable_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "staging" {
-                            prom_dress.parseable_storage_size.staging += val;
-                        }
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_storage_size.data += val;
-                        }
-                    }
-                    "parseable_lifetime_events_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_lifetime_storage_size.data += val;
-                        }
-                    }
"parseable_deleted_events_storage_size" => { - if sample.labels.get("type").expect("type is present") == "data" { - prom_dress.parseable_deleted_storage_size.data += val; - } - } - _ => {} - } - } - } - let (commit_id, staging) = - Self::from_about_api_response(metadata) - .await - .map_err(|err| { - error!("Fatal: failed to get server info: {:?}", err); - PostError::Invalid(err.into()) - })?; - - prom_dress.commit = commit_id; - prom_dress.staging = staging; - - Ok(prom_dress) - } pub async fn from_about_api_response( metadata: &T, diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..6f9930c8d 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -52,7 +52,6 @@ use crate::{ }, metadata::{LogStreamMetadata, SchemaVersion}, metrics, - option::Mode, storage::{object_storage::to_bytes, retention::Retention, StreamType}, utils::time::{Minute, TimeRange}, LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, @@ -130,33 +129,30 @@ impl Stream { record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, - stream_type: StreamType, ) -> Result<(), StagingError> { let mut guard = self.writer.lock().unwrap(); - if self.options.mode != Mode::Query || stream_type == StreamType::Internal { - let filename = - self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); - match guard.disk.get_mut(&filename) { - Some(writer) => { - writer.write(record)?; - } - None => { - // entry is not present thus we create it - std::fs::create_dir_all(&self.data_path)?; + let filename = + self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); + match guard.disk.get_mut(&filename) { + Some(writer) => { + writer.write(record)?; + } + None => { + // entry is not present thus we create it + std::fs::create_dir_all(&self.data_path)?; - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); - let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) - .expect("File and RecordBatch both are checked"); + let range = TimeRange::granularity_range( + parsed_timestamp.and_local_timezone(Utc).unwrap(), + OBJECT_STORE_DATA_GRANULARITY, + ); + let file_path = self.data_path.join(&filename); + let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) + .expect("File and RecordBatch both are checked"); - writer.write(record)?; - guard.disk.insert(filename, writer); - } - }; - } + writer.write(record)?; + guard.disk.insert(filename, writer); + } + }; guard.mem.push(schema_key, record); @@ -1048,15 +1044,7 @@ mod tests { ], ) .unwrap(); - staging - .push( - "abc", - &batch, - time, - &HashMap::new(), - StreamType::UserDefined, - ) - .unwrap(); + staging.push("abc", &batch, time, &HashMap::new()).unwrap(); staging.flush(true); }