From e1f5c5256a5a78d706cf36367702ae502b5b890d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 29 Mar 2025 14:09:15 -0400 Subject: [PATCH 1/3] add indexer to cluster info and metrics add indexer server info in cluster info api response add indexer server metrics in cluster metrics api response works only with enterprise build --- src/handlers/http/cluster/mod.rs | 305 +++++++++++++++++++++---------- src/handlers/http/modal/mod.rs | 24 +++ src/metrics/prom_utils.rs | 37 ++-- 3 files changed, 254 insertions(+), 112 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 7062045f0..fd1e4c29c 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -18,6 +18,7 @@ pub mod utils; +use futures::{future, stream, StreamExt}; use std::collections::HashSet; use std::time::Duration; @@ -51,7 +52,7 @@ use crate::HTTP_CLIENT; use super::base_path_without_preceding_slash; use super::ingest::PostError; use super::logstream::error::StreamError; -use super::modal::{IndexerMetadata, IngestorMetadata}; +use super::modal::{IndexerMetadata, IngestorMetadata, Metadata}; use super::rbac::RBACError; use super::role::RoleError; @@ -541,72 +542,118 @@ pub async fn send_retention_cleanup_request( } pub async fn get_cluster_info() -> Result { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { - error!("Fatal: failed to get ingestor info: {:?}", err); - StreamError::Anyhow(err) - })?; + // Get ingestor and indexer metadata concurrently + let (ingestor_result, indexer_result) = + future::join(get_ingestor_info(), get_indexer_info()).await; - let mut infos = vec![]; + // Handle ingestor metadata result + let ingestor_metadata = ingestor_result + .map_err(|err| { + error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err) + }) + .map_err(|err| StreamError::Anyhow(err.into()))?; - for ingestor in ingestor_infos { - let uri = Url::parse(&format!( - "{}{}/about", - ingestor.domain_name, - base_path_without_preceding_slash() - )) - .expect("should always be a valid url"); + // Handle indexer metadata result + let indexer_metadata = indexer_result + .map_err(|err| { + error!("Fatal: failed to get indexer info: {:?}", err); + PostError::Invalid(err) + }) + .map_err(|err| StreamError::Anyhow(err.into()))?; + + // Fetch info for both node types concurrently + let (ingestor_infos, indexer_infos) = future::join( + fetch_servers_info(ingestor_metadata), + fetch_servers_info(indexer_metadata), + ) + .await; + + // Combine results from both node types + let mut infos = Vec::new(); + infos.extend(ingestor_infos?); + infos.extend(indexer_infos?); - let resp = HTTP_CLIENT - .get(uri) - .header(header::AUTHORIZATION, ingestor.token.clone()) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await; + Ok(actix_web::HttpResponse::Ok().json(infos)) +} - let (reachable, staging_path, error, status) = if let Ok(resp) = resp { - let status = Some(resp.status().to_string()); +/// Fetches info for a single server (ingestor or indexer) +async fn fetch_server_info(server: &T) -> Result { + let uri = Url::parse(&format!( + "{}{}/about", + server.domain_name(), + base_path_without_preceding_slash() + )) + .expect("should always be a valid url"); - let resp_data = resp.bytes().await.map_err(|err| { - error!("Fatal: failed to parse ingestor info to bytes: {:?}", err); - StreamError::Network(err) - })?; + let resp = HTTP_CLIENT + .get(uri) + .header(header::AUTHORIZATION, server.token().to_owned()) + 
.header(header::CONTENT_TYPE, "application/json") + .send() + .await; - let sp = serde_json::from_slice::(&resp_data) - .map_err(|err| { - error!("Fatal: failed to parse ingestor info: {:?}", err); - StreamError::SerdeError(err) - })? - .get("staging") - .ok_or(StreamError::SerdeError(SerdeError::missing_field( - "staging", - )))? - .as_str() - .ok_or(StreamError::SerdeError(SerdeError::custom( - "staging path not a string/ not provided", - )))? - .to_string(); - - (true, sp, None, status) - } else { - ( - false, - "".to_owned(), - resp.as_ref().err().map(|e| e.to_string()), - resp.unwrap_err().status().map(|s| s.to_string()), - ) - }; - - infos.push(utils::ClusterInfo::new( - &ingestor.domain_name, - reachable, - staging_path, - PARSEABLE.storage.get_endpoint(), - error, - status, - )); + let (reachable, staging_path, error, status) = if let Ok(resp) = resp { + let status = Some(resp.status().to_string()); + + let resp_data = resp.bytes().await.map_err(|err| { + error!("Fatal: failed to parse server info to bytes: {:?}", err); + StreamError::Network(err) + })?; + + let sp = serde_json::from_slice::(&resp_data) + .map_err(|err| { + error!("Fatal: failed to parse server info: {:?}", err); + StreamError::SerdeError(err) + })? + .get("staging") + .ok_or(StreamError::SerdeError(SerdeError::missing_field( + "staging", + )))? + .as_str() + .ok_or(StreamError::SerdeError(SerdeError::custom( + "staging path not a string/ not provided", + )))? + .to_string(); + + (true, sp, None, status) + } else { + ( + false, + "".to_owned(), + resp.as_ref().err().map(|e| e.to_string()), + resp.unwrap_err().status().map(|s| s.to_string()), + ) + }; + + Ok(utils::ClusterInfo::new( + server.domain_name(), + reachable, + staging_path, + PARSEABLE.storage.get_endpoint(), + error, + status, + )) +} + +/// Fetches info for multiple servers in parallel +async fn fetch_servers_info( + servers: Vec, +) -> Result, StreamError> { + let servers_len = servers.len(); + let results = stream::iter(servers) + .map(|server| async move { fetch_server_info(&server).await }) + .buffer_unordered(servers_len) // No concurrency limit + .collect::>() + .await; + + // Collect results, propagating any errors + let mut infos = Vec::with_capacity(results.len()); + for result in results { + infos.push(result?); } - Ok(actix_web::HttpResponse::Ok().json(infos)) + Ok(infos) } pub async fn get_cluster_metrics() -> Result { @@ -702,38 +749,37 @@ pub async fn remove_ingestor(ingestor: Path) -> Result Result, PostError> { - let ingestor_metadata = get_ingestor_info().await.map_err(|err| { - error!("Fatal: failed to get ingestor info: {:?}", err); - PostError::Invalid(err) +/// Fetches metrics from a server (ingestor or indexer) +async fn fetch_server_metrics(server: &T) -> Result, PostError> +where + T: Metadata + Send + Sync + 'static, +{ + // Format the metrics URL + let uri = Url::parse(&format!( + "{}{}/metrics", + server.domain_name(), + base_path_without_preceding_slash() + )) + .map_err(|err| { + PostError::Invalid(anyhow::anyhow!("Invalid URL in server metadata: {}", err)) })?; - let mut dresses = vec![]; - - for ingestor in ingestor_metadata { - let uri = Url::parse(&format!( - "{}{}/metrics", - &ingestor.domain_name, - base_path_without_preceding_slash() - )) - .map_err(|err| { - PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) - })?; - - // add a check to see if the ingestor is live - if !check_liveness(&ingestor.domain_name).await { - warn!("Ingestor {} is not live", ingestor.domain_name); - continue; - } 
+ // Check if the server is live + if !check_liveness(server.domain_name()).await { + warn!("Server {} is not live", server.domain_name()); + return Ok(None); + } - let res = HTTP_CLIENT - .get(uri) - .header(header::AUTHORIZATION, &ingestor.token) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await; + // Fetch metrics + let res = HTTP_CLIENT + .get(uri) + .header(header::AUTHORIZATION, server.token()) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await; - if let Ok(res) = res { + match res { + Ok(res) => { let text = res.text().await.map_err(PostError::NetworkError)?; let lines: Vec> = text.lines().map(|line| Ok(line.to_owned())).collect_vec(); @@ -741,21 +787,92 @@ async fn fetch_cluster_metrics() -> Result, PostError> { let sample = prometheus_parse::Scrape::parse(lines.into_iter()) .map_err(|err| PostError::CustomError(err.to_string()))? .samples; - let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor) + + let metrics = Metrics::from_prometheus_samples(sample, server) .await .map_err(|err| { - error!("Fatal: failed to get ingestor metrics: {:?}", err); + error!("Fatal: failed to get server metrics: {:?}", err); PostError::Invalid(err.into()) })?; - dresses.push(ingestor_metrics); - } else { + + Ok(Some(metrics)) + } + Err(_) => { warn!( - "Failed to fetch metrics from ingestor: {}\n", - &ingestor.domain_name, + "Failed to fetch metrics from server: {}\n", + server.domain_name() ); + Ok(None) + } + } +} + +/// Fetches metrics from multiple servers in parallel +async fn fetch_servers_metrics(servers: Vec) -> Result, PostError> +where + T: Metadata + Send + Sync + 'static, +{ + let servers_len = servers.len(); + let results = stream::iter(servers) + .map(|server| async move { fetch_server_metrics(&server).await }) + .buffer_unordered(servers_len) // No concurrency limit + .collect::>() + .await; + + // Process results + let mut metrics = Vec::new(); + for result in results { + match result { + Ok(Some(server_metrics)) => metrics.push(server_metrics), + Ok(None) => {} // server was not live or metrics couldn't be fetched + Err(err) => return Err(err), } } - Ok(dresses) + + Ok(metrics) +} + +/// Main function to fetch all cluster metrics, parallelized and refactored +async fn fetch_cluster_metrics() -> Result, PostError> { + // Get ingestor and indexer metadata concurrently + let (ingestor_result, indexer_result) = + future::join(get_ingestor_info(), get_indexer_info()).await; + + // Handle ingestor metadata result + let ingestor_metadata = ingestor_result.map_err(|err| { + error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err) + })?; + + // Handle indexer metadata result + let indexer_metadata = indexer_result.map_err(|err| { + error!("Fatal: failed to get indexer info: {:?}", err); + PostError::Invalid(err) + })?; + + // Fetch metrics from ingestors and indexers concurrently + let (ingestor_metrics, indexer_metrics) = future::join( + fetch_servers_metrics(ingestor_metadata), + fetch_servers_metrics(indexer_metadata), + ) + .await; + + // Combine all metrics + let mut all_metrics = Vec::new(); + + // Add ingestor metrics + match ingestor_metrics { + Ok(metrics) => all_metrics.extend(metrics), + Err(err) => return Err(err), + } + + // Add indexer metrics + match indexer_metrics { + Ok(metrics) => all_metrics.extend(metrics), + Err(err) => return Err(err), + } + + Ok(all_metrics) } pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { diff --git a/src/handlers/http/modal/mod.rs 
b/src/handlers/http/modal/mod.rs index 6d7679e49..93dc63603 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -549,6 +549,30 @@ impl IndexerMetadata { } } +pub trait Metadata { + fn domain_name(&self) -> &str; + fn token(&self) -> &str; +} + +impl Metadata for IngestorMetadata { + fn domain_name(&self) -> &str { + &self.domain_name + } + + fn token(&self) -> &str { + &self.token + } +} + +impl Metadata for IndexerMetadata { + fn domain_name(&self) -> &str { + &self.domain_name + } + + fn token(&self) -> &str { + &self.token + } +} #[cfg(test)] mod test { use actix_web::body::MessageBody; diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index 244d43dbb..b10c14a89 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -18,7 +18,7 @@ use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; -use crate::handlers::http::modal::IngestorMetadata; +use crate::handlers::http::modal::Metadata; use crate::option::Mode; use crate::parseable::PARSEABLE; use crate::HTTP_CLIENT; @@ -156,11 +156,11 @@ impl Metrics { } (events_ingested, ingestion_size, storage_size) } - pub async fn from_prometheus_samples( + pub async fn from_prometheus_samples( samples: Vec, - ingestor_metadata: &IngestorMetadata, + metadata: &T, ) -> Result { - let mut prom_dress = Metrics::new(ingestor_metadata.domain_name.to_string()); + let mut prom_dress = Metrics::new(metadata.domain_name().to_string()); for sample in samples { if let PromValue::Gauge(val) = sample.value { match sample.metric.as_str() { @@ -206,12 +206,13 @@ impl Metrics { } } } - let (commit_id, staging) = Self::from_about_api_response(ingestor_metadata.clone()) - .await - .map_err(|err| { - error!("Fatal: failed to get ingestor info: {:?}", err); - PostError::Invalid(err.into()) - })?; + let (commit_id, staging) = + Self::from_about_api_response(metadata) + .await + .map_err(|err| { + error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err.into()) + })?; prom_dress.commit = commit_id; prom_dress.staging = staging; @@ -219,12 +220,12 @@ impl Metrics { Ok(prom_dress) } - pub async fn from_about_api_response( - ingestor_metadata: IngestorMetadata, + pub async fn from_about_api_response( + metadata: &T, ) -> Result<(String, String), PostError> { let uri = Url::parse(&format!( "{}{}/about", - &ingestor_metadata.domain_name, + &metadata.domain_name(), base_path_without_preceding_slash() )) .map_err(|err| { @@ -233,7 +234,7 @@ impl Metrics { let res = HTTP_CLIENT .get(uri) .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingestor_metadata.token) + .header(header::AUTHORIZATION, metadata.token()) .send() .await; if let Ok(res) = res { @@ -251,12 +252,12 @@ impl Metrics { Ok((commit_id.to_string(), staging.to_string())) } else { warn!( - "Failed to fetch about API response from ingestor: {}\n", - &ingestor_metadata.domain_name, + "Failed to fetch about API response from server: {}\n", + &metadata.domain_name(), ); Err(PostError::Invalid(anyhow::anyhow!( - "Failed to fetch about API response from ingestor: {}\n", - &ingestor_metadata.domain_name + "Failed to fetch about API response from server: {}\n", + &metadata.domain_name() ))) } } From 819dd983f97e43dd1badacebdaff765f18acf5e8 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 29 Mar 2025 14:18:05 -0400 Subject: [PATCH 2/3] update to server --- src/metrics/prom_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index b10c14a89..1ad85afe3 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -210,7 +210,7 @@ impl Metrics { Self::from_about_api_response(metadata) .await .map_err(|err| { - error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get server info: {:?}", err); PostError::Invalid(err.into()) })?; From e42b4f798f535b146b38130d6464c9b290744ff6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 30 Mar 2025 01:24:03 -0400 Subject: [PATCH 3/3] add node type to cluster info and metrics --- src/handlers/http/cluster/mod.rs | 79 +++++++++++++++--------------- src/handlers/http/cluster/utils.rs | 3 ++ src/handlers/http/modal/mod.rs | 7 +++ src/metrics/prom_utils.rs | 10 +++- 4 files changed, 57 insertions(+), 42 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index fd1e4c29c..0c178190b 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -564,8 +564,8 @@ pub async fn get_cluster_info() -> Result { // Fetch info for both node types concurrently let (ingestor_infos, indexer_infos) = future::join( - fetch_servers_info(ingestor_metadata), - fetch_servers_info(indexer_metadata), + fetch_nodes_info(ingestor_metadata), + fetch_nodes_info(indexer_metadata), ) .await; @@ -577,18 +577,18 @@ pub async fn get_cluster_info() -> Result { Ok(actix_web::HttpResponse::Ok().json(infos)) } -/// Fetches info for a single server (ingestor or indexer) -async fn fetch_server_info(server: &T) -> Result { +/// Fetches info for a single node (ingestor or indexer) +async fn fetch_node_info(node: &T) -> Result { let uri = Url::parse(&format!( "{}{}/about", - server.domain_name(), + node.domain_name(), base_path_without_preceding_slash() )) .expect("should always be a valid url"); let resp = HTTP_CLIENT .get(uri) - .header(header::AUTHORIZATION, server.token().to_owned()) + .header(header::AUTHORIZATION, node.token().to_owned()) .header(header::CONTENT_TYPE, "application/json") .send() .await; @@ -597,13 +597,13 @@ async fn fetch_server_info(server: &T) -> Result(&resp_data) .map_err(|err| { - error!("Fatal: failed to parse server info: {:?}", err); + error!("Fatal: failed to parse node info: {:?}", err); StreamError::SerdeError(err) })? 
.get("staging") @@ -627,23 +627,24 @@ async fn fetch_server_info(server: &T) -> Result( - servers: Vec, +/// Fetches info for multiple nodes in parallel +async fn fetch_nodes_info( + nodes: Vec, ) -> Result, StreamError> { - let servers_len = servers.len(); - let results = stream::iter(servers) - .map(|server| async move { fetch_server_info(&server).await }) - .buffer_unordered(servers_len) // No concurrency limit + let nodes_len = nodes.len(); + let results = stream::iter(nodes) + .map(|node| async move { fetch_node_info(&node).await }) + .buffer_unordered(nodes_len) // No concurrency limit .collect::>() .await; @@ -749,31 +750,29 @@ pub async fn remove_ingestor(ingestor: Path) -> Result(server: &T) -> Result, PostError> +/// Fetches metrics from a node (ingestor or indexer) +async fn fetch_node_metrics(node: &T) -> Result, PostError> where T: Metadata + Send + Sync + 'static, { // Format the metrics URL let uri = Url::parse(&format!( "{}{}/metrics", - server.domain_name(), + node.domain_name(), base_path_without_preceding_slash() )) - .map_err(|err| { - PostError::Invalid(anyhow::anyhow!("Invalid URL in server metadata: {}", err)) - })?; + .map_err(|err| PostError::Invalid(anyhow::anyhow!("Invalid URL in node metadata: {}", err)))?; - // Check if the server is live - if !check_liveness(server.domain_name()).await { - warn!("Server {} is not live", server.domain_name()); + // Check if the node is live + if !check_liveness(node.domain_name()).await { + warn!("node {} is not live", node.domain_name()); return Ok(None); } // Fetch metrics let res = HTTP_CLIENT .get(uri) - .header(header::AUTHORIZATION, server.token()) + .header(header::AUTHORIZATION, node.token()) .header(header::CONTENT_TYPE, "application/json") .send() .await; @@ -788,10 +787,10 @@ where .map_err(|err| PostError::CustomError(err.to_string()))? 
.samples; - let metrics = Metrics::from_prometheus_samples(sample, server) + let metrics = Metrics::from_prometheus_samples(sample, node) .await .map_err(|err| { - error!("Fatal: failed to get server metrics: {:?}", err); + error!("Fatal: failed to get node metrics: {:?}", err); PostError::Invalid(err.into()) })?; @@ -799,23 +798,23 @@ where } Err(_) => { warn!( - "Failed to fetch metrics from server: {}\n", - server.domain_name() + "Failed to fetch metrics from node: {}\n", + node.domain_name() ); Ok(None) } } } -/// Fetches metrics from multiple servers in parallel -async fn fetch_servers_metrics(servers: Vec) -> Result, PostError> +/// Fetches metrics from multiple nodes in parallel +async fn fetch_nodes_metrics(nodes: Vec) -> Result, PostError> where T: Metadata + Send + Sync + 'static, { - let servers_len = servers.len(); - let results = stream::iter(servers) - .map(|server| async move { fetch_server_metrics(&server).await }) - .buffer_unordered(servers_len) // No concurrency limit + let nodes_len = nodes.len(); + let results = stream::iter(nodes) + .map(|node| async move { fetch_node_metrics(&node).await }) + .buffer_unordered(nodes_len) // No concurrency limit .collect::>() .await; @@ -823,8 +822,8 @@ where let mut metrics = Vec::new(); for result in results { match result { - Ok(Some(server_metrics)) => metrics.push(server_metrics), - Ok(None) => {} // server was not live or metrics couldn't be fetched + Ok(Some(node_metrics)) => metrics.push(node_metrics), + Ok(None) => {} // node was not live or metrics couldn't be fetched Err(err) => return Err(err), } } @@ -852,8 +851,8 @@ async fn fetch_cluster_metrics() -> Result, PostError> { // Fetch metrics from ingestors and indexers concurrently let (ingestor_metrics, indexer_metrics) = future::join( - fetch_servers_metrics(ingestor_metadata), - fetch_servers_metrics(indexer_metadata), + fetch_nodes_metrics(ingestor_metadata), + fetch_nodes_metrics(indexer_metadata), ) .await; diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index b88e95bb1..54d451cb7 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -55,6 +55,7 @@ pub struct ClusterInfo { storage_path: String, error: Option, // error message if the ingestor is not reachable status: Option, // status message if the ingestor is reachable + node_type: String, } impl ClusterInfo { @@ -65,6 +66,7 @@ impl ClusterInfo { storage_path: String, error: Option, status: Option, + node_type: &str, ) -> Self { Self { domain_name: domain_name.to_string(), @@ -73,6 +75,7 @@ impl ClusterInfo { storage_path, error, status, + node_type: node_type.to_string(), } } } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 93dc63603..3a2f236c4 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -552,6 +552,7 @@ impl IndexerMetadata { pub trait Metadata { fn domain_name(&self) -> &str; fn token(&self) -> &str; + fn node_type(&self) -> &str; } impl Metadata for IngestorMetadata { @@ -562,6 +563,9 @@ impl Metadata for IngestorMetadata { fn token(&self) -> &str { &self.token } + fn node_type(&self) -> &str { + "ingestor" + } } impl Metadata for IndexerMetadata { @@ -572,6 +576,9 @@ impl Metadata for IndexerMetadata { fn token(&self) -> &str { &self.token } + fn node_type(&self) -> &str { + "indexer" + } } #[cfg(test)] mod test { diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index 1ad85afe3..f9f559f5e 100644 --- a/src/metrics/prom_utils.rs +++ 
b/src/metrics/prom_utils.rs @@ -37,6 +37,7 @@ use url::Url; #[derive(Debug, Serialize, Clone)] pub struct Metrics { address: String, + node_type: String, parseable_events_ingested: f64, // all streams parseable_events_ingested_size: f64, parseable_lifetime_events_ingested: f64, // all streams @@ -72,6 +73,7 @@ impl Default for Metrics { ); Metrics { address, + node_type: "ingestor".to_string(), parseable_events_ingested: 0.0, parseable_events_ingested_size: 0.0, parseable_staging_files: 0.0, @@ -92,9 +94,10 @@ impl Default for Metrics { } impl Metrics { - fn new(address: String) -> Self { + fn new(address: String, node_type: String) -> Self { Metrics { address, + node_type, parseable_events_ingested: 0.0, parseable_events_ingested_size: 0.0, parseable_staging_files: 0.0, @@ -160,7 +163,10 @@ impl Metrics { samples: Vec, metadata: &T, ) -> Result { - let mut prom_dress = Metrics::new(metadata.domain_name().to_string()); + let mut prom_dress = Metrics::new( + metadata.domain_name().to_string(), + metadata.node_type().to_string(), + ); for sample in samples { if let PromValue::Gauge(val) = sample.value { match sample.metric.as_str() {
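
---

The core idea across these three patches is the `Metadata` trait: both `IngestorMetadata` and `IndexerMetadata` expose `domain_name()`, `token()`, and (after PATCH 3/3) `node_type()`, so the cluster handlers can fan out `/about` and `/metrics` requests over either node set with one generic helper driven by `buffer_unordered`. Below is a minimal standalone sketch of that pattern. It is not the Parseable code: the trait signature and the `"ingestor"`/`"indexer"` node types mirror the patch, but the two structs are simplified stand-ins, the per-node work is a placeholder for the real HTTP calls, and the example assumes the `futures` and `tokio` crates are available.

```rust
// Sketch of the Metadata trait pattern from this patch series (not the real handlers).
use futures::{future, stream, StreamExt};

pub trait Metadata {
    fn domain_name(&self) -> &str;
    fn token(&self) -> &str;
    fn node_type(&self) -> &str;
}

// Simplified stand-ins for the real IngestorMetadata / IndexerMetadata structs.
pub struct IngestorMetadata { pub domain_name: String, pub token: String }
pub struct IndexerMetadata { pub domain_name: String, pub token: String }

impl Metadata for IngestorMetadata {
    fn domain_name(&self) -> &str { &self.domain_name }
    fn token(&self) -> &str { &self.token }
    fn node_type(&self) -> &str { "ingestor" }
}

impl Metadata for IndexerMetadata {
    fn domain_name(&self) -> &str { &self.domain_name }
    fn token(&self) -> &str { &self.token }
    fn node_type(&self) -> &str { "indexer" }
}

// Generic fan-out over one node set, mirroring fetch_nodes_info / fetch_nodes_metrics:
// build one future per node and drive them concurrently with buffer_unordered.
async fn describe_nodes<T: Metadata>(nodes: Vec<T>) -> Vec<String> {
    // Defensive: avoid a zero-sized buffer when the node list is empty.
    let concurrency = nodes.len().max(1);
    stream::iter(nodes)
        .map(|node| async move {
            // The real code issues an authenticated GET to {domain_name}/about
            // or {domain_name}/metrics here; this just summarizes the node.
            format!(
                "{} ({}) auth_configured={}",
                node.domain_name(),
                node.node_type(),
                !node.token().is_empty()
            )
        })
        .buffer_unordered(concurrency)
        .collect::<Vec<_>>()
        .await
}

#[tokio::main]
async fn main() {
    let ingestors = vec![IngestorMetadata {
        domain_name: "http://ingestor-0:8000/".into(),
        token: "Basic placeholder".into(),
    }];
    let indexers = vec![IndexerMetadata {
        domain_name: "http://indexer-0:8000/".into(),
        token: "Basic placeholder".into(),
    }];

    // Like get_cluster_info / fetch_cluster_metrics: query both node sets
    // concurrently, then merge the results into one response.
    let (ingestor_infos, indexer_infos) =
        future::join(describe_nodes(ingestors), describe_nodes(indexers)).await;
    for line in ingestor_infos.into_iter().chain(indexer_infos) {
        println!("{line}");
    }
}
```

One note on the design: because `describe_nodes` is generic over `T: Metadata`, each call handles a homogeneous `Vec<T>`, which is why the handlers in the patch join two calls (ingestors, indexers) and concatenate the results rather than mixing both node types in a single list.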