Skip to content

add indexer to cluster info and metrics #1281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 212 additions & 96 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

pub mod utils;

use futures::{future, stream, StreamExt};
use std::collections::HashSet;
use std::time::Duration;

Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::HTTP_CLIENT;
use super::base_path_without_preceding_slash;
use super::ingest::PostError;
use super::logstream::error::StreamError;
use super::modal::{IndexerMetadata, IngestorMetadata};
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
use super::rbac::RBACError;
use super::role::RoleError;

Expand Down Expand Up @@ -541,72 +542,119 @@ pub async fn send_retention_cleanup_request(
}

pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
})?;
// Get ingestor and indexer metadata concurrently
let (ingestor_result, indexer_result) =
future::join(get_ingestor_info(), get_indexer_info()).await;

let mut infos = vec![];
// Handle ingestor metadata result
let ingestor_metadata = ingestor_result
.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
})
.map_err(|err| StreamError::Anyhow(err.into()))?;

for ingestor in ingestor_infos {
let uri = Url::parse(&format!(
"{}{}/about",
ingestor.domain_name,
base_path_without_preceding_slash()
))
.expect("should always be a valid url");
// Handle indexer metadata result
let indexer_metadata = indexer_result
.map_err(|err| {
error!("Fatal: failed to get indexer info: {:?}", err);
PostError::Invalid(err)
})
.map_err(|err| StreamError::Anyhow(err.into()))?;

// Fetch info for both node types concurrently
let (ingestor_infos, indexer_infos) = future::join(
fetch_nodes_info(ingestor_metadata),
fetch_nodes_info(indexer_metadata),
)
.await;

// Combine results from both node types
let mut infos = Vec::new();
infos.extend(ingestor_infos?);
infos.extend(indexer_infos?);

let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, ingestor.token.clone())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;
Ok(actix_web::HttpResponse::Ok().json(infos))
}

let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
let status = Some(resp.status().to_string());
/// Fetches info for a single node (ingestor or indexer)
async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, StreamError> {
let uri = Url::parse(&format!(
"{}{}/about",
node.domain_name(),
base_path_without_preceding_slash()
))
.expect("should always be a valid url");

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse ingestor info to bytes: {:?}", err);
StreamError::Network(err)
})?;
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token().to_owned())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
error!("Fatal: failed to parse ingestor info: {:?}", err);
StreamError::SerdeError(err)
})?
.get("staging")
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
"staging",
)))?
.as_str()
.ok_or(StreamError::SerdeError(SerdeError::custom(
"staging path not a string/ not provided",
)))?
.to_string();

(true, sp, None, status)
} else {
(
false,
"".to_owned(),
resp.as_ref().err().map(|e| e.to_string()),
resp.unwrap_err().status().map(|s| s.to_string()),
)
};

infos.push(utils::ClusterInfo::new(
&ingestor.domain_name,
reachable,
staging_path,
PARSEABLE.storage.get_endpoint(),
error,
status,
));
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
let status = Some(resp.status().to_string());

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse node info to bytes: {:?}", err);
StreamError::Network(err)
})?;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
error!("Fatal: failed to parse node info: {:?}", err);
StreamError::SerdeError(err)
})?
.get("staging")
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
"staging",
)))?
.as_str()
.ok_or(StreamError::SerdeError(SerdeError::custom(
"staging path not a string/ not provided",
)))?
.to_string();

(true, sp, None, status)
} else {
(
false,
"".to_owned(),
resp.as_ref().err().map(|e| e.to_string()),
resp.unwrap_err().status().map(|s| s.to_string()),
)
};

Ok(utils::ClusterInfo::new(
node.domain_name(),
reachable,
staging_path,
PARSEABLE.storage.get_endpoint(),
error,
status,
node.node_type(),
))
}

/// Fetches info for multiple nodes in parallel
async fn fetch_nodes_info<T: Metadata>(
nodes: Vec<T>,
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
let nodes_len = nodes.len();
let results = stream::iter(nodes)
.map(|node| async move { fetch_node_info(&node).await })
.buffer_unordered(nodes_len) // No concurrency limit
.collect::<Vec<_>>()
.await;

// Collect results, propagating any errors
let mut infos = Vec::with_capacity(results.len());
for result in results {
infos.push(result?);
}

Ok(actix_web::HttpResponse::Ok().json(infos))
Ok(infos)
}

pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
Expand Down Expand Up @@ -702,60 +750,128 @@ pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, P
Ok((msg, StatusCode::OK))
}

async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
})?;

let mut dresses = vec![];

for ingestor in ingestor_metadata {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

// add a check to see if the ingestor is live
if !check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
/// Fetches metrics from a node (ingestor or indexer)
async fn fetch_node_metrics<T>(node: &T) -> Result<Option<Metrics>, PostError>
where
T: Metadata + Send + Sync + 'static,
{
// Format the metrics URL
let uri = Url::parse(&format!(
"{}{}/metrics",
node.domain_name(),
base_path_without_preceding_slash()
))
.map_err(|err| PostError::Invalid(anyhow::anyhow!("Invalid URL in node metadata: {}", err)))?;

// Check if the node is live
if !check_liveness(node.domain_name()).await {
warn!("node {} is not live", node.domain_name());
return Ok(None);
}

let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;
// Fetch metrics
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
match res {
Ok(res) => {
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.samples;
let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)

let metrics = Metrics::from_prometheus_samples(sample, node)
.await
.map_err(|err| {
error!("Fatal: failed to get ingestor metrics: {:?}", err);
error!("Fatal: failed to get node metrics: {:?}", err);
PostError::Invalid(err.into())
})?;
dresses.push(ingestor_metrics);
} else {

Ok(Some(metrics))
}
Err(_) => {
warn!(
"Failed to fetch metrics from ingestor: {}\n",
&ingestor.domain_name,
"Failed to fetch metrics from node: {}\n",
node.domain_name()
);
Ok(None)
}
}
}

/// Fetches metrics from multiple nodes in parallel
async fn fetch_nodes_metrics<T>(nodes: Vec<T>) -> Result<Vec<Metrics>, PostError>
where
T: Metadata + Send + Sync + 'static,
{
let nodes_len = nodes.len();
let results = stream::iter(nodes)
.map(|node| async move { fetch_node_metrics(&node).await })
.buffer_unordered(nodes_len) // No concurrency limit
.collect::<Vec<_>>()
.await;

// Process results
let mut metrics = Vec::new();
for result in results {
match result {
Ok(Some(node_metrics)) => metrics.push(node_metrics),
Ok(None) => {} // node was not live or metrics couldn't be fetched
Err(err) => return Err(err),
}
}
Ok(dresses)

Ok(metrics)
}

/// Main function to fetch all cluster metrics, parallelized and refactored
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
// Get ingestor and indexer metadata concurrently
let (ingestor_result, indexer_result) =
future::join(get_ingestor_info(), get_indexer_info()).await;

// Handle ingestor metadata result
let ingestor_metadata = ingestor_result.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
})?;

// Handle indexer metadata result
let indexer_metadata = indexer_result.map_err(|err| {
error!("Fatal: failed to get indexer info: {:?}", err);
PostError::Invalid(err)
})?;

// Fetch metrics from ingestors and indexers concurrently
let (ingestor_metrics, indexer_metrics) = future::join(
fetch_nodes_metrics(ingestor_metadata),
fetch_nodes_metrics(indexer_metadata),
)
.await;

// Combine all metrics
let mut all_metrics = Vec::new();

// Add ingestor metrics
match ingestor_metrics {
Ok(metrics) => all_metrics.extend(metrics),
Err(err) => return Err(err),
}

// Add indexer metrics
match indexer_metrics {
Ok(metrics) => all_metrics.extend(metrics),
Err(err) => return Err(err),
}

Ok(all_metrics)
}

pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct ClusterInfo {
storage_path: String,
error: Option<String>, // error message if the ingestor is not reachable
status: Option<String>, // status message if the ingestor is reachable
node_type: String,
}

impl ClusterInfo {
Expand All @@ -65,6 +66,7 @@ impl ClusterInfo {
storage_path: String,
error: Option<String>,
status: Option<String>,
node_type: &str,
) -> Self {
Self {
domain_name: domain_name.to_string(),
Expand All @@ -73,6 +75,7 @@ impl ClusterInfo {
storage_path,
error,
status,
node_type: node_type.to_string(),
}
}
}
Expand Down
Loading
Loading