diff --git a/src/event/mod.rs b/src/event/mod.rs
index c60f0d057..d1e91b893 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -77,7 +77,6 @@ impl Event {
             &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
-            self.stream_type,
         )?;
 
         update_stats(
@@ -101,7 +100,6 @@ impl Event {
             &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
-            self.stream_type,
         )?;
 
         Ok(())
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index c1cd46b6c..3c7e736db 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -41,6 +41,7 @@ use url::Url;
 use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
 
 use crate::handlers::http::ingest::ingest_internal_stream;
+use crate::metrics::collect_all_metrics;
 use crate::metrics::prom_utils::Metrics;
 use crate::parseable::PARSEABLE;
 use crate::rbac::role::model::DefaultPrivilege;
@@ -867,7 +868,6 @@ where
     let text = res.text().await.map_err(PostError::NetworkError)?;
     let lines: Vec<Result<String, std::io::Error>> =
         text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
     let sample = prometheus_parse::Scrape::parse(lines.into_iter())
         .map_err(|err| PostError::CustomError(err.to_string()))?
         .samples;
@@ -993,13 +993,16 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     Ok(all_metrics)
 }
 
-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
-    info!("Setting up schedular for cluster metrics ingestion");
+pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
+    info!("Setting up scheduler for cluster metrics ingestion");
     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
         .run(move || async {
             let result: Result<(), PostError> = async {
+                if let Err(err) = collect_all_metrics().await {
+                    error!("Error in capturing system metrics: {:#}", err);
+                }
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
                     if !metrics.is_empty() {
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index a742057f9..ddaa6af48 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -28,10 +28,9 @@ use serde_json::Value;
 
 use crate::event::error::EventError;
 use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
-use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
+use crate::event::format::{LogSource, LogSourceEntry};
 use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
 use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
 use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
@@ -126,26 +125,11 @@ pub async fn ingest(
 }
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
     let json: Value = serde_json::from_slice(&body)?;
-    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
     let mut p_custom_fields = HashMap::new();
     p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
     p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
-    // For internal streams, use old schema
-    format::json::Event::new(json)
-        .into_event(
-            stream_name,
-            size as u64,
-            &schema,
-            false,
-            None,
-            None,
-            SchemaVersion::V0,
-            StreamType::Internal,
-            &p_custom_fields,
-        )?
-        .process()?;
+    flatten_and_push_logs(json, &stream_name, &LogSource::Pmeta, &p_custom_fields).await?;
 
     Ok(())
 }
diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index e9751ee13..a942efd94 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -31,6 +31,7 @@ use tokio::sync::oneshot;
 use tokio::sync::OnceCell;
 
 use crate::handlers::http::modal::NodeType;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::{
     analytics,
     handlers::{
@@ -119,7 +120,7 @@ impl ParseableServer for IngestServer {
         thread::spawn(|| sync::handler(cancel_rx));
 
         tokio::spawn(airplane::server());
-
+        init_system_metrics_scheduler().await?;
         // Ingestors shouldn't have to deal with OpenId auth flow
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
         // Cancel sync jobs
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index b138a292c..41e9eca8e 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -20,12 +20,13 @@ use std::sync::Arc;
 use std::thread;
 
 use crate::handlers::airplane;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
+use crate::handlers::http::cluster::{self, init_cluster_metrics_scheduler};
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{base_path, prism_base_path};
 use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::handlers::http::{rbac, role};
 use crate::hottier::HotTierManager;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::rbac::role::Action;
 use crate::{analytics, migration, storage, sync};
 use actix_web::web::{resource, ServiceConfig};
@@ -34,7 +35,6 @@ use actix_web_prometheus::PrometheusMetrics;
 use async_trait::async_trait;
 use bytes::Bytes;
 use tokio::sync::{oneshot, OnceCell};
-use tracing::info;
 
 use crate::parseable::PARSEABLE;
 use crate::Server;
@@ -123,9 +123,9 @@ impl ParseableServer for QueryServer {
             analytics::init_analytics_scheduler()?;
         }
 
-        if init_cluster_metrics_schedular().is_ok() {
-            info!("Cluster metrics scheduler started successfully");
-        }
+        init_system_metrics_scheduler().await?;
+        init_cluster_metrics_scheduler().await?;
+
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 1674c3ee2..780b02884 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -30,6 +30,7 @@ use crate::handlers::http::users::dashboards;
 use crate::handlers::http::users::filters;
 use crate::hottier::HotTierManager;
 use crate::metrics;
+use crate::metrics::init_system_metrics_scheduler;
 use crate::migration;
 use crate::storage;
 use crate::sync;
@@ -134,6 +135,8 @@ impl ParseableServer for Server {
             analytics::init_analytics_scheduler()?;
         }
 
+        init_system_metrics_scheduler().await?;
+
         tokio::spawn(handlers::livetail::server());
         tokio::spawn(handlers::airplane::server());
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 1896bce0c..aa98798b1 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -18,15 +18,25 @@
 
 pub mod prom_utils;
 pub mod storage;
-
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use actix_web::HttpResponse;
+use clokwerk::{AsyncScheduler, Interval};
+use http::StatusCode;
+use serde::Serialize;
+use std::{path::Path, time::Duration};
+use sysinfo::{Disks, System};
+use tracing::{error, info};
+
+use crate::{handlers::http::metrics_path, parseable::PARSEABLE, stats::FullStats};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
 use once_cell::sync::Lazy;
-use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
+use prometheus::{
+    GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry,
+};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
+const SYSTEM_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
 pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
@@ -182,6 +192,42 @@ pub static ALERTS_STATES: Lazy<IntGaugeVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
+pub static TOTAL_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("total_disk", "Total Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static USED_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("used_disk", "Used Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static AVAILABLE_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("available_disk", "Available Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static MEMORY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("memory_usage", "Memory Usage").namespace(METRICS_NAMESPACE),
+        &["memory_usage"],
+    )
+    .expect("metric can be created")
+});
+pub static CPU: Lazy<GaugeVec> = Lazy::new(|| {
+    GaugeVec::new(
+        Opts::new("cpu_usage", "CPU Usage").namespace(METRICS_NAMESPACE),
+        &["cpu_usage"],
+    )
+    .expect("metric can be created")
+});
+
 fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(EVENTS_INGESTED.clone()))
@@ -231,6 +277,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(ALERTS_STATES.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(USED_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(AVAILABLE_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(MEMORY.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(CPU.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -290,12 +351,196 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
         .set(stats.lifetime_stats.storage as i64);
 }
 
-use actix_web::HttpResponse;
-
 pub async fn get() -> Result<impl Responder, MetricsError> {
     Ok(HttpResponse::Ok().body(format!("{:?}", build_metrics_handler())))
 }
 
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct DiskMetrics {
+    total: u64,
+    used: u64,
+    available: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct SystemMetrics {
+    memory: MemoryMetrics,
+    cpu: Vec<CpuMetrics>,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct MemoryMetrics {
+    total: u64,
+    used: u64,
+    total_swap: u64,
+    used_swap: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct CpuMetrics {
+    name: String,
+    usage: f64,
+}
+
+// Scheduler for collecting all system metrics
+pub async fn init_system_metrics_scheduler() -> Result<(), MetricsError> {
+    info!("Setting up scheduler for capturing system metrics");
+    let mut scheduler = AsyncScheduler::new();
+
+    scheduler
+        .every(SYSTEM_METRICS_INTERVAL_SECONDS)
+        .run(move || async {
+            if let Err(err) = collect_all_metrics().await {
+                error!("Error in capturing system metrics: {:#}", err);
+            }
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    });
+
+    Ok(())
+}
+
+// Function to collect memory, CPU and disk usage metrics
+pub async fn collect_all_metrics() -> Result<(), MetricsError> {
+    // Collect system metrics (CPU and memory)
+    collect_system_metrics().await?;
+
+    // Collect disk metrics for all volumes
+    collect_disk_metrics().await?;
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics
+async fn collect_disk_metrics() -> Result<(), MetricsError> {
+    // collect staging volume metrics
+    collect_volume_disk_usage("staging", PARSEABLE.options.staging_dir())?;
+    // Collect data volume metrics for local storage
+    if PARSEABLE.get_storage_mode_string() == "Local drive" {
+        collect_volume_disk_usage("data", Path::new(&PARSEABLE.storage().get_endpoint()))?;
+    }
+
+    // Collect hot tier volume metrics if configured
+    if let Some(hot_tier_dir) = PARSEABLE.hot_tier_dir() {
+        collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
+    }
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics for a specific volume
+fn collect_volume_disk_usage(label: &str, path: &Path) -> Result<(), MetricsError> {
+    let metrics = get_volume_disk_usage(path)?;
+
+    TOTAL_DISK
+        .with_label_values(&[label])
+        .set(metrics.total as i64);
+    USED_DISK
+        .with_label_values(&[label])
+        .set(metrics.used as i64);
+    AVAILABLE_DISK
+        .with_label_values(&[label])
+        .set(metrics.available as i64);
+
+    Ok(())
+}
+
+// Function to get disk usage for a specific volume
+fn get_volume_disk_usage(path: &Path) -> Result<DiskMetrics, MetricsError> {
+    let mut disks = Disks::new_with_refreshed_list();
+    disks.sort_by(|a, b| {
+        b.mount_point()
+            .to_str()
+            .unwrap_or("")
+            .len()
+            .cmp(&a.mount_point().to_str().unwrap_or("").len())
+    });
+
+    for disk in disks.iter() {
+        let mount_point = disk.mount_point().to_str().unwrap();
+
+        if path.starts_with(mount_point) {
+            return Ok(DiskMetrics {
+                total: disk.total_space(),
+                used: disk.total_space() - disk.available_space(),
+                available: disk.available_space(),
+            });
+        }
+    }
+
+    Err(MetricsError::Custom(
+        format!("No matching disk found for path: {:?}", path),
+        StatusCode::INTERNAL_SERVER_ERROR,
+    ))
+}
+
+// Function to collect CPU and memory usage metrics
+async fn collect_system_metrics() -> Result<(), MetricsError> {
+    let metrics = get_system_metrics()?;
+
+    // Set memory metrics
+    MEMORY
+        .with_label_values(&["total_memory"])
+        .set(metrics.memory.total as i64);
+    MEMORY
+        .with_label_values(&["used_memory"])
+        .set(metrics.memory.used as i64);
+    MEMORY
+        .with_label_values(&["total_swap"])
+        .set(metrics.memory.total_swap as i64);
+    MEMORY
+        .with_label_values(&["used_swap"])
+        .set(metrics.memory.used_swap as i64);
+
+    // Set CPU metrics
+    for cpu in metrics.cpu {
+        CPU.with_label_values(&[&cpu.name]).set(cpu.usage);
+    }
+
+    Ok(())
+}
+
+// Get system metrics
+fn get_system_metrics() -> Result<SystemMetrics, MetricsError> {
+    let mut sys = System::new_all();
+    sys.refresh_all();
+
+    // Collect memory metrics
+    let memory = MemoryMetrics {
+        total: sys.total_memory(),
+        used: sys.used_memory(),
+        total_swap: sys.total_swap(),
+        used_swap: sys.used_swap(),
+    };
+
+    // Collect CPU metrics
+    let mut cpu_metrics = Vec::new();
+
+    // Add global CPU usage
+    cpu_metrics.push(CpuMetrics {
+        name: "global".to_string(),
+        usage: sys.global_cpu_usage() as f64,
+    });
+
+    // Add individual CPU usage
+    for cpu in sys.cpus() {
+        cpu_metrics.push(CpuMetrics {
+            name: cpu.name().to_string(),
+            usage: cpu.cpu_usage() as f64,
+        });
+    }
+
+    Ok(SystemMetrics {
+        memory,
+        cpu: cpu_metrics,
+    })
+}
+
 pub mod error {
 
     use actix_web::http::header::ContentType;
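Note on `get_volume_disk_usage` in `src/metrics/mod.rs` above: the disks are sorted by mount-point length, longest first, so a path under a nested mount resolves to the most specific volume instead of falling through to `/`. A minimal, self-contained sketch of just that matching rule (the `best_mount` helper and the mount table are hypothetical, for illustration; the real code walks `sysinfo::Disks`):

```rust
use std::cmp::Reverse;
use std::path::Path;

/// Return the most specific mount point containing `path`, mirroring the
/// longest-mount-first ordering used by get_volume_disk_usage.
fn best_mount(path: &Path, mounts: &mut Vec<&'static str>) -> Option<&'static str> {
    // Longest mount point first, so "/data/hot" wins over "/data" and "/".
    mounts.sort_by_key(|m| Reverse(m.len()));
    // Path::starts_with compares whole components, so "/database" would not
    // match a "/data" mount.
    mounts.iter().copied().find(|m| path.starts_with(m))
}

fn main() {
    let mut mounts = vec!["/", "/data", "/data/hot"];
    assert_eq!(
        best_mount(Path::new("/data/hot/streams"), &mut mounts),
        Some("/data/hot")
    );
    assert_eq!(best_mount(Path::new("/var/log"), &mut mounts), Some("/"));
}
```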
diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs
index f9f559f5e..5fc4dca61 100644
--- a/src/metrics/prom_utils.rs
+++ b/src/metrics/prom_utils.rs
@@ -16,11 +16,11 @@
  *
  */
 
+use super::DiskMetrics;
+use super::MemoryMetrics;
 use crate::handlers::http::base_path_without_preceding_slash;
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::modal::Metadata;
-use crate::option::Mode;
-use crate::parseable::PARSEABLE;
 use crate::HTTP_CLIENT;
 use actix_web::http::header;
 use chrono::NaiveDateTime;
@@ -30,6 +30,7 @@ use prometheus_parse::Value as PromValue;
 use serde::Serialize;
 use serde_json::Error as JsonError;
 use serde_json::Value as JsonValue;
+use std::collections::HashMap;
 use tracing::error;
 use tracing::warn;
 use url::Url;
@@ -53,6 +54,11 @@ pub struct Metrics {
     event_time: NaiveDateTime,
     commit: String,
     staging: String,
+    parseable_data_disk_usage: DiskMetrics,
+    parseable_staging_disk_usage: DiskMetrics,
+    parseable_hot_tier_disk_usage: DiskMetrics,
+    parseable_memory_usage: MemoryMetrics,
+    parseable_cpu_usage: HashMap<String, f64>,
 }
 
 #[derive(Debug, Serialize, Default, Clone)]
@@ -61,38 +67,6 @@ struct StorageMetrics {
     data: f64,
 }
 
-impl Default for Metrics {
-    fn default() -> Self {
-        // for now it is only for ingestor
-        let url = PARSEABLE.options.get_url(Mode::Ingest);
-        let address = format!(
-            "http://{}:{}",
-            url.domain()
-                .unwrap_or(url.host_str().expect("should have a host")),
-            url.port().unwrap_or_default()
-        );
-        Metrics {
-            address,
-            node_type: "ingestor".to_string(),
-            parseable_events_ingested: 0.0,
-            parseable_events_ingested_size: 0.0,
-            parseable_staging_files: 0.0,
-            process_resident_memory_bytes: 0.0,
-            parseable_storage_size: StorageMetrics::default(),
-            parseable_lifetime_events_ingested: 0.0,
-            parseable_lifetime_events_ingested_size: 0.0,
-            parseable_deleted_events_ingested: 0.0,
-            parseable_deleted_events_ingested_size: 0.0,
-            parseable_deleted_storage_size: StorageMetrics::default(),
-            parseable_lifetime_storage_size: StorageMetrics::default(),
-            event_type: "cluster-metrics".to_string(),
-            event_time: Utc::now().naive_utc(),
-            commit: "".to_string(),
-            staging: "".to_string(),
-        }
-    }
-}
-
 impl Metrics {
     fn new(address: String, node_type: String) -> Self {
         Metrics {
@@ -113,11 +87,234 @@ impl Metrics {
             event_time: Utc::now().naive_utc(),
             commit: "".to_string(),
             staging: "".to_string(),
+            parseable_data_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_staging_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_hot_tier_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_memory_usage: MemoryMetrics {
+                total: 0,
+                used: 0,
+                total_swap: 0,
+                used_swap: 0,
+            },
+            parseable_cpu_usage: HashMap::new(),
         }
     }
 }
 
+#[derive(Debug)]
+enum MetricType {
+    SimpleGauge(String),
+    StorageSize(String),
+    DiskUsage(String),
+    MemoryUsage(String),
+    CpuUsage,
+}
+
+impl MetricType {
+    fn from_metric(metric: &str, labels: &HashMap<String, String>) -> Option<Self> {
+        match metric {
+            "parseable_events_ingested" => {
+                Some(Self::SimpleGauge("parseable_events_ingested".into()))
+            }
+            "parseable_events_ingested_size" => {
+                Some(Self::SimpleGauge("parseable_events_ingested_size".into()))
+            }
+            "parseable_lifetime_events_ingested" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested".into(),
+            )),
+            "parseable_lifetime_events_ingested_size" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested_size".into(),
+            )),
+            "parseable_events_deleted" => {
+                Some(Self::SimpleGauge("parseable_events_deleted".into()))
+            }
+            "parseable_events_deleted_size" => {
+                Some(Self::SimpleGauge("parseable_events_deleted_size".into()))
+            }
+            "parseable_staging_files" => Some(Self::SimpleGauge("parseable_staging_files".into())),
+            "process_resident_memory_bytes" => {
+                Some(Self::SimpleGauge("process_resident_memory_bytes".into()))
+            }
+            "parseable_storage_size" => labels.get("type").map(|t| Self::StorageSize(t.clone())),
+            "parseable_lifetime_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_deleted_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_total_disk" | "parseable_used_disk" | "parseable_available_disk" => {
+                labels.get("volume").map(|v| Self::DiskUsage(v.clone()))
+            }
+            "parseable_memory_usage" => labels
+                .get("memory_usage")
+                .map(|m| Self::MemoryUsage(m.clone())),
+            "parseable_cpu_usage" => Some(Self::CpuUsage),
+            _ => None,
+        }
+    }
+}
+
 impl Metrics {
+    pub async fn from_prometheus_samples<T: Metadata>(
+        samples: Vec<PromSample>,
+        metadata: &T,
+    ) -> Result<Self, PostError> {
+        let mut metrics = Metrics::new(
+            metadata.domain_name().to_string(),
+            metadata.node_type().to_string(),
+        );
+
+        Self::build_metrics_from_samples(samples, &mut metrics)?;
+
+        // Get additional metadata
+        let (commit_id, staging) =
+            Self::from_about_api_response(metadata)
+                .await
+                .map_err(|err| {
+                    error!("Fatal: failed to get ingestor info: {:?}", err);
+                    PostError::Invalid(err.into())
+                })?;
+
+        metrics.commit = commit_id;
+        metrics.staging = staging;
+
+        Ok(metrics)
+    }
+
+    fn build_metrics_from_samples(
+        samples: Vec<PromSample>,
+        metrics: &mut Metrics,
+    ) -> Result<(), PostError> {
+        for sample in samples {
+            let metric_type = MetricType::from_metric(&sample.metric, &sample.labels);
+
+            match (sample.value.clone(), metric_type) {
+                (PromValue::Gauge(val), Some(metric_type)) => {
+                    Self::process_gauge_metric(
+                        metrics,
+                        metric_type,
+                        val,
+                        &sample.metric,
+                        sample.clone(),
+                    );
+                }
+                _ => continue,
+            }
+        }
+        Ok(())
+    }
+
+    fn process_gauge_metric(
+        metrics: &mut Metrics,
+        metric_type: MetricType,
+        val: f64,
+        metric_name: &str,
+        sample: PromSample,
+    ) {
+        match metric_type {
+            MetricType::SimpleGauge(metric_name) => {
+                Self::process_simple_gauge(metrics, &metric_name, val)
+            }
+            MetricType::StorageSize(storage_type) => {
+                Self::process_storage_size(metrics, &storage_type, val, metric_name)
+            }
+            MetricType::DiskUsage(volume_type) => {
+                Self::process_disk_usage(metrics, &volume_type, val, metric_name)
+            }
+            MetricType::MemoryUsage(memory_type) => {
+                Self::process_memory_usage(metrics, &memory_type, val)
+            }
+            MetricType::CpuUsage => Self::process_cpu_usage(metrics, val, sample),
+        }
+    }
+
+    fn process_simple_gauge(metrics: &mut Metrics, metric_name: &str, val: f64) {
+        match metric_name {
+            "parseable_events_ingested" => metrics.parseable_events_ingested += val,
+            "parseable_events_ingested_size" => metrics.parseable_events_ingested_size += val,
+            "parseable_lifetime_events_ingested" => {
+                metrics.parseable_lifetime_events_ingested += val
+            }
+            "parseable_lifetime_events_ingested_size" => {
+                metrics.parseable_lifetime_events_ingested_size += val
+            }
+            "parseable_events_deleted" => metrics.parseable_deleted_events_ingested += val,
+            "parseable_events_deleted_size" => {
+                metrics.parseable_deleted_events_ingested_size += val
+            }
+            "parseable_staging_files" => metrics.parseable_staging_files += val,
+            "process_resident_memory_bytes" => metrics.process_resident_memory_bytes += val,
+            _ => {}
+        }
+    }
+
+    fn process_storage_size(
+        metrics: &mut Metrics,
+        storage_type: &str,
+        val: f64,
+        metric_name: &str,
+    ) {
+        let target = match metric_name {
+            "parseable_storage_size" => &mut metrics.parseable_storage_size,
+            "parseable_lifetime_events_storage_size" => {
+                &mut metrics.parseable_lifetime_storage_size
+            }
+            "parseable_deleted_events_storage_size" => &mut metrics.parseable_deleted_storage_size,
+            _ => return,
+        };
+
+        match storage_type {
+            "staging" => target.staging += val,
+            "data" => target.data += val,
+            _ => {}
+        }
+    }
+
+    fn process_disk_usage(metrics: &mut Metrics, volume_type: &str, val: f64, metric_name: &str) {
+        let disk_usage = match volume_type {
+            "data" => &mut metrics.parseable_data_disk_usage,
+            "staging" => &mut metrics.parseable_staging_disk_usage,
+            "hot_tier" => &mut metrics.parseable_hot_tier_disk_usage,
+            _ => return,
+        };
+
+        match metric_name {
+            "parseable_total_disk" => disk_usage.total = val as u64,
+            "parseable_used_disk" => disk_usage.used = val as u64,
+            "parseable_available_disk" => disk_usage.available = val as u64,
+            _ => {}
+        }
+    }
+
+    fn process_memory_usage(metrics: &mut Metrics, memory_type: &str, val: f64) {
+        match memory_type {
+            "total_memory" => metrics.parseable_memory_usage.total = val as u64,
+            "used_memory" => metrics.parseable_memory_usage.used = val as u64,
+            "total_swap" => metrics.parseable_memory_usage.total_swap = val as u64,
+            "used_swap" => metrics.parseable_memory_usage.used_swap = val as u64,
+            _ => {}
+        }
+    }
+
+    fn process_cpu_usage(metrics: &mut Metrics, val: f64, sample: PromSample) {
+        if let Some(cpu_name) = sample.labels.get("cpu_usage") {
+            metrics
+                .parseable_cpu_usage
+                .insert(cpu_name.to_string(), val);
+        }
+    }
+
     pub fn get_daily_stats_from_samples(
         samples: Vec<PromSample>,
         stream_name: &str,
@@ -159,72 +356,6 @@ impl Metrics {
         }
         (events_ingested, ingestion_size, storage_size)
     }
-    pub async fn from_prometheus_samples<T: Metadata>(
-        samples: Vec<PromSample>,
-        metadata: &T,
-    ) -> Result<Self, PostError> {
-        let mut prom_dress = Metrics::new(
-            metadata.domain_name().to_string(),
-            metadata.node_type().to_string(),
-        );
-        for sample in samples {
-            if let PromValue::Gauge(val) = sample.value {
-                match sample.metric.as_str() {
-                    "parseable_events_ingested" => prom_dress.parseable_events_ingested += val,
-                    "parseable_events_ingested_size" => {
-                        prom_dress.parseable_events_ingested_size += val
-                    }
-                    "parseable_lifetime_events_ingested" => {
-                        prom_dress.parseable_lifetime_events_ingested += val
-                    }
-                    "parseable_lifetime_events_ingested_size" => {
-                        prom_dress.parseable_lifetime_events_ingested_size += val
-                    }
-                    "parseable_events_deleted" => {
-                        prom_dress.parseable_deleted_events_ingested += val
-                    }
-                    "parseable_events_deleted_size" => {
-                        prom_dress.parseable_deleted_events_ingested_size += val
-                    }
-                    "parseable_staging_files" => prom_dress.parseable_staging_files += val,
-                    "process_resident_memory_bytes" => {
-                        prom_dress.process_resident_memory_bytes += val
-                    }
-                    "parseable_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "staging" {
-                            prom_dress.parseable_storage_size.staging += val;
-                        }
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_storage_size.data += val;
-                        }
-                    }
-                    "parseable_lifetime_events_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_lifetime_storage_size.data += val;
-                        }
-                    }
-                    "parseable_deleted_events_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_deleted_storage_size.data += val;
-                        }
-                    }
-                    _ => {}
-                }
-            }
-        }
-        let (commit_id, staging) =
-            Self::from_about_api_response(metadata)
-                .await
-                .map_err(|err| {
-                    error!("Fatal: failed to get server info: {:?}", err);
-                    PostError::Invalid(err.into())
-                })?;
-
-        prom_dress.commit = commit_id;
-        prom_dress.staging = staging;
-
-        Ok(prom_dress)
-    }
 
     pub async fn from_about_api_response<T: Metadata>(
         metadata: &T,
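Note on the `MetricType` refactor above: the old flat match inside `from_prometheus_samples` becomes a label-aware classification step, so a sample is routed by metric name plus its labels before any accumulator is touched. A trimmed-down, hypothetical re-statement of that dispatch, runnable in isolation (only two metric families kept, and `PartialEq` derived for testability):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum MetricType {
    SimpleGauge(String),
    DiskUsage(String),
}

// Mirrors MetricType::from_metric: plain gauges classify on name alone,
// disk gauges additionally require a "volume" label.
fn classify(metric: &str, labels: &HashMap<String, String>) -> Option<MetricType> {
    match metric {
        "parseable_staging_files" => Some(MetricType::SimpleGauge(metric.into())),
        "parseable_total_disk" | "parseable_used_disk" | "parseable_available_disk" => {
            labels.get("volume").map(|v| MetricType::DiskUsage(v.clone()))
        }
        _ => None,
    }
}

fn main() {
    let labels = HashMap::from([("volume".to_string(), "hot_tier".to_string())]);
    assert_eq!(
        classify("parseable_used_disk", &labels),
        Some(MetricType::DiskUsage("hot_tier".to_string()))
    );
    // Unknown metrics fall through to None and are skipped by the caller.
    assert_eq!(classify("some_other_metric", &labels), None);
}
```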
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index c67c60043..6f9930c8d 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -52,7 +52,6 @@ use crate::{
     },
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
-    option::Mode,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
@@ -130,33 +129,30 @@ impl Stream {
         record: &RecordBatch,
         parsed_timestamp: NaiveDateTime,
        custom_partition_values: &HashMap<String, String>,
-        stream_type: StreamType,
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
-        if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
-            let filename =
-                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
-            match guard.disk.get_mut(&filename) {
-                Some(writer) => {
-                    writer.write(record)?;
-                }
-                None => {
-                    // entry is not present thus we create it
-                    std::fs::create_dir_all(&self.data_path)?;
+        let filename =
+            self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
+        match guard.disk.get_mut(&filename) {
+            Some(writer) => {
+                writer.write(record)?;
+            }
+            None => {
+                // entry is not present thus we create it
+                std::fs::create_dir_all(&self.data_path)?;
 
-                    let range = TimeRange::granularity_range(
-                        parsed_timestamp.and_local_timezone(Utc).unwrap(),
-                        OBJECT_STORE_DATA_GRANULARITY,
-                    );
-                    let file_path = self.data_path.join(&filename);
-                    let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
-                        .expect("File and RecordBatch both are checked");
+                let range = TimeRange::granularity_range(
+                    parsed_timestamp.and_local_timezone(Utc).unwrap(),
+                    OBJECT_STORE_DATA_GRANULARITY,
+                );
+                let file_path = self.data_path.join(&filename);
+                let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
+                    .expect("File and RecordBatch both are checked");
 
-                    writer.write(record)?;
-                    guard.disk.insert(filename, writer);
-                }
-            };
-        }
+                writer.write(record)?;
+                guard.disk.insert(filename, writer);
+            }
+        };
 
         guard.mem.push(schema_key, record);
@@ -1048,15 +1044,7 @@ mod tests {
             ],
         )
         .unwrap();
-        staging
-            .push(
-                "abc",
-                &batch,
-                time,
-                &HashMap::new(),
-                StreamType::UserDefined,
-            )
-            .unwrap();
+        staging.push("abc", &batch, time, &HashMap::new()).unwrap();
         staging.flush(true);
     }
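Note: `init_system_metrics_scheduler` and `init_cluster_metrics_scheduler` now share the same clokwerk shape, register an async job, then poll `run_pending()` from a detached tokio task. A minimal standalone sketch of that pattern (assumes clokwerk 0.4 and tokio with the `full` feature; the one-second interval and print job are illustrative stand-ins for the real collection jobs):

```rust
use clokwerk::{AsyncScheduler, Interval};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut scheduler = AsyncScheduler::new();

    // Register the periodic job, as the PR does with collect_all_metrics().
    scheduler.every(Interval::Seconds(1)).run(|| async {
        // Stand-in for the real fallible job; errors are logged, not returned.
        println!("tick: collecting metrics");
    });

    // Drive the scheduler from a detached task that polls for due jobs.
    let driver = tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });

    // Let a few ticks fire, then shut the example down.
    tokio::time::sleep(Duration::from_secs(3)).await;
    driver.abort();
}
```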