Skip to content

Commit e1f5c52

Browse files
add indexer to cluster info and metrics
add indexer server info in cluster info api response add indexer server metrics in cluster metrics api response works only with enterprise build
1 parent 7adcf12 commit e1f5c52

File tree

3 files changed

+254
-112
lines changed

3 files changed

+254
-112
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 211 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
pub mod utils;
2020

21+
use futures::{future, stream, StreamExt};
2122
use std::collections::HashSet;
2223
use std::time::Duration;
2324

@@ -51,7 +52,7 @@ use crate::HTTP_CLIENT;
5152
use super::base_path_without_preceding_slash;
5253
use super::ingest::PostError;
5354
use super::logstream::error::StreamError;
54-
use super::modal::{IndexerMetadata, IngestorMetadata};
55+
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
5556
use super::rbac::RBACError;
5657
use super::role::RoleError;
5758

@@ -541,72 +542,118 @@ pub async fn send_retention_cleanup_request(
541542
}
542543

543544
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
544-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
545-
error!("Fatal: failed to get ingestor info: {:?}", err);
546-
StreamError::Anyhow(err)
547-
})?;
545+
// Get ingestor and indexer metadata concurrently
546+
let (ingestor_result, indexer_result) =
547+
future::join(get_ingestor_info(), get_indexer_info()).await;
548548

549-
let mut infos = vec![];
549+
// Handle ingestor metadata result
550+
let ingestor_metadata = ingestor_result
551+
.map_err(|err| {
552+
error!("Fatal: failed to get ingestor info: {:?}", err);
553+
PostError::Invalid(err)
554+
})
555+
.map_err(|err| StreamError::Anyhow(err.into()))?;
550556

551-
for ingestor in ingestor_infos {
552-
let uri = Url::parse(&format!(
553-
"{}{}/about",
554-
ingestor.domain_name,
555-
base_path_without_preceding_slash()
556-
))
557-
.expect("should always be a valid url");
557+
// Handle indexer metadata result
558+
let indexer_metadata = indexer_result
559+
.map_err(|err| {
560+
error!("Fatal: failed to get indexer info: {:?}", err);
561+
PostError::Invalid(err)
562+
})
563+
.map_err(|err| StreamError::Anyhow(err.into()))?;
564+
565+
// Fetch info for both node types concurrently
566+
let (ingestor_infos, indexer_infos) = future::join(
567+
fetch_servers_info(ingestor_metadata),
568+
fetch_servers_info(indexer_metadata),
569+
)
570+
.await;
571+
572+
// Combine results from both node types
573+
let mut infos = Vec::new();
574+
infos.extend(ingestor_infos?);
575+
infos.extend(indexer_infos?);
558576

559-
let resp = HTTP_CLIENT
560-
.get(uri)
561-
.header(header::AUTHORIZATION, ingestor.token.clone())
562-
.header(header::CONTENT_TYPE, "application/json")
563-
.send()
564-
.await;
577+
Ok(actix_web::HttpResponse::Ok().json(infos))
578+
}
565579

566-
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
567-
let status = Some(resp.status().to_string());
580+
/// Fetches info for a single server (ingestor or indexer)
581+
async fn fetch_server_info<T: Metadata>(server: &T) -> Result<utils::ClusterInfo, StreamError> {
582+
let uri = Url::parse(&format!(
583+
"{}{}/about",
584+
server.domain_name(),
585+
base_path_without_preceding_slash()
586+
))
587+
.expect("should always be a valid url");
568588

569-
let resp_data = resp.bytes().await.map_err(|err| {
570-
error!("Fatal: failed to parse ingestor info to bytes: {:?}", err);
571-
StreamError::Network(err)
572-
})?;
589+
let resp = HTTP_CLIENT
590+
.get(uri)
591+
.header(header::AUTHORIZATION, server.token().to_owned())
592+
.header(header::CONTENT_TYPE, "application/json")
593+
.send()
594+
.await;
573595

574-
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
575-
.map_err(|err| {
576-
error!("Fatal: failed to parse ingestor info: {:?}", err);
577-
StreamError::SerdeError(err)
578-
})?
579-
.get("staging")
580-
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
581-
"staging",
582-
)))?
583-
.as_str()
584-
.ok_or(StreamError::SerdeError(SerdeError::custom(
585-
"staging path not a string/ not provided",
586-
)))?
587-
.to_string();
588-
589-
(true, sp, None, status)
590-
} else {
591-
(
592-
false,
593-
"".to_owned(),
594-
resp.as_ref().err().map(|e| e.to_string()),
595-
resp.unwrap_err().status().map(|s| s.to_string()),
596-
)
597-
};
598-
599-
infos.push(utils::ClusterInfo::new(
600-
&ingestor.domain_name,
601-
reachable,
602-
staging_path,
603-
PARSEABLE.storage.get_endpoint(),
604-
error,
605-
status,
606-
));
596+
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
597+
let status = Some(resp.status().to_string());
598+
599+
let resp_data = resp.bytes().await.map_err(|err| {
600+
error!("Fatal: failed to parse server info to bytes: {:?}", err);
601+
StreamError::Network(err)
602+
})?;
603+
604+
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
605+
.map_err(|err| {
606+
error!("Fatal: failed to parse server info: {:?}", err);
607+
StreamError::SerdeError(err)
608+
})?
609+
.get("staging")
610+
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
611+
"staging",
612+
)))?
613+
.as_str()
614+
.ok_or(StreamError::SerdeError(SerdeError::custom(
615+
"staging path not a string/ not provided",
616+
)))?
617+
.to_string();
618+
619+
(true, sp, None, status)
620+
} else {
621+
(
622+
false,
623+
"".to_owned(),
624+
resp.as_ref().err().map(|e| e.to_string()),
625+
resp.unwrap_err().status().map(|s| s.to_string()),
626+
)
627+
};
628+
629+
Ok(utils::ClusterInfo::new(
630+
server.domain_name(),
631+
reachable,
632+
staging_path,
633+
PARSEABLE.storage.get_endpoint(),
634+
error,
635+
status,
636+
))
637+
}
638+
639+
/// Fetches info for multiple servers in parallel
640+
async fn fetch_servers_info<T: Metadata>(
641+
servers: Vec<T>,
642+
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
643+
let servers_len = servers.len();
644+
let results = stream::iter(servers)
645+
.map(|server| async move { fetch_server_info(&server).await })
646+
.buffer_unordered(servers_len) // No concurrency limit
647+
.collect::<Vec<_>>()
648+
.await;
649+
650+
// Collect results, propagating any errors
651+
let mut infos = Vec::with_capacity(results.len());
652+
for result in results {
653+
infos.push(result?);
607654
}
608655

609-
Ok(actix_web::HttpResponse::Ok().json(infos))
656+
Ok(infos)
610657
}
611658

612659
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
@@ -702,60 +749,130 @@ pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, P
702749
Ok((msg, StatusCode::OK))
703750
}
704751

705-
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
706-
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
707-
error!("Fatal: failed to get ingestor info: {:?}", err);
708-
PostError::Invalid(err)
752+
/// Fetches metrics from a server (ingestor or indexer)
753+
async fn fetch_server_metrics<T>(server: &T) -> Result<Option<Metrics>, PostError>
754+
where
755+
T: Metadata + Send + Sync + 'static,
756+
{
757+
// Format the metrics URL
758+
let uri = Url::parse(&format!(
759+
"{}{}/metrics",
760+
server.domain_name(),
761+
base_path_without_preceding_slash()
762+
))
763+
.map_err(|err| {
764+
PostError::Invalid(anyhow::anyhow!("Invalid URL in server metadata: {}", err))
709765
})?;
710766

711-
let mut dresses = vec![];
712-
713-
for ingestor in ingestor_metadata {
714-
let uri = Url::parse(&format!(
715-
"{}{}/metrics",
716-
&ingestor.domain_name,
717-
base_path_without_preceding_slash()
718-
))
719-
.map_err(|err| {
720-
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
721-
})?;
722-
723-
// add a check to see if the ingestor is live
724-
if !check_liveness(&ingestor.domain_name).await {
725-
warn!("Ingestor {} is not live", ingestor.domain_name);
726-
continue;
727-
}
767+
// Check if the server is live
768+
if !check_liveness(server.domain_name()).await {
769+
warn!("Server {} is not live", server.domain_name());
770+
return Ok(None);
771+
}
728772

729-
let res = HTTP_CLIENT
730-
.get(uri)
731-
.header(header::AUTHORIZATION, &ingestor.token)
732-
.header(header::CONTENT_TYPE, "application/json")
733-
.send()
734-
.await;
773+
// Fetch metrics
774+
let res = HTTP_CLIENT
775+
.get(uri)
776+
.header(header::AUTHORIZATION, server.token())
777+
.header(header::CONTENT_TYPE, "application/json")
778+
.send()
779+
.await;
735780

736-
if let Ok(res) = res {
781+
match res {
782+
Ok(res) => {
737783
let text = res.text().await.map_err(PostError::NetworkError)?;
738784
let lines: Vec<Result<String, std::io::Error>> =
739785
text.lines().map(|line| Ok(line.to_owned())).collect_vec();
740786

741787
let sample = prometheus_parse::Scrape::parse(lines.into_iter())
742788
.map_err(|err| PostError::CustomError(err.to_string()))?
743789
.samples;
744-
let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
790+
791+
let metrics = Metrics::from_prometheus_samples(sample, server)
745792
.await
746793
.map_err(|err| {
747-
error!("Fatal: failed to get ingestor metrics: {:?}", err);
794+
error!("Fatal: failed to get server metrics: {:?}", err);
748795
PostError::Invalid(err.into())
749796
})?;
750-
dresses.push(ingestor_metrics);
751-
} else {
797+
798+
Ok(Some(metrics))
799+
}
800+
Err(_) => {
752801
warn!(
753-
"Failed to fetch metrics from ingestor: {}\n",
754-
&ingestor.domain_name,
802+
"Failed to fetch metrics from server: {}\n",
803+
server.domain_name()
755804
);
805+
Ok(None)
806+
}
807+
}
808+
}
809+
810+
/// Fetches metrics from multiple servers in parallel
811+
async fn fetch_servers_metrics<T>(servers: Vec<T>) -> Result<Vec<Metrics>, PostError>
812+
where
813+
T: Metadata + Send + Sync + 'static,
814+
{
815+
let servers_len = servers.len();
816+
let results = stream::iter(servers)
817+
.map(|server| async move { fetch_server_metrics(&server).await })
818+
.buffer_unordered(servers_len) // No concurrency limit
819+
.collect::<Vec<_>>()
820+
.await;
821+
822+
// Process results
823+
let mut metrics = Vec::new();
824+
for result in results {
825+
match result {
826+
Ok(Some(server_metrics)) => metrics.push(server_metrics),
827+
Ok(None) => {} // server was not live or metrics couldn't be fetched
828+
Err(err) => return Err(err),
756829
}
757830
}
758-
Ok(dresses)
831+
832+
Ok(metrics)
833+
}
834+
835+
/// Main function to fetch all cluster metrics, parallelized and refactored
836+
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
837+
// Get ingestor and indexer metadata concurrently
838+
let (ingestor_result, indexer_result) =
839+
future::join(get_ingestor_info(), get_indexer_info()).await;
840+
841+
// Handle ingestor metadata result
842+
let ingestor_metadata = ingestor_result.map_err(|err| {
843+
error!("Fatal: failed to get ingestor info: {:?}", err);
844+
PostError::Invalid(err)
845+
})?;
846+
847+
// Handle indexer metadata result
848+
let indexer_metadata = indexer_result.map_err(|err| {
849+
error!("Fatal: failed to get indexer info: {:?}", err);
850+
PostError::Invalid(err)
851+
})?;
852+
853+
// Fetch metrics from ingestors and indexers concurrently
854+
let (ingestor_metrics, indexer_metrics) = future::join(
855+
fetch_servers_metrics(ingestor_metadata),
856+
fetch_servers_metrics(indexer_metadata),
857+
)
858+
.await;
859+
860+
// Combine all metrics
861+
let mut all_metrics = Vec::new();
862+
863+
// Add ingestor metrics
864+
match ingestor_metrics {
865+
Ok(metrics) => all_metrics.extend(metrics),
866+
Err(err) => return Err(err),
867+
}
868+
869+
// Add indexer metrics
870+
match indexer_metrics {
871+
Ok(metrics) => all_metrics.extend(metrics),
872+
Err(err) => return Err(err),
873+
}
874+
875+
Ok(all_metrics)
759876
}
760877

761878
pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

src/handlers/http/modal/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,30 @@ impl IndexerMetadata {
549549
}
550550
}
551551

552+
pub trait Metadata {
553+
fn domain_name(&self) -> &str;
554+
fn token(&self) -> &str;
555+
}
556+
557+
impl Metadata for IngestorMetadata {
558+
fn domain_name(&self) -> &str {
559+
&self.domain_name
560+
}
561+
562+
fn token(&self) -> &str {
563+
&self.token
564+
}
565+
}
566+
567+
impl Metadata for IndexerMetadata {
568+
fn domain_name(&self) -> &str {
569+
&self.domain_name
570+
}
571+
572+
fn token(&self) -> &str {
573+
&self.token
574+
}
575+
}
552576
#[cfg(test)]
553577
mod test {
554578
use actix_web::body::MessageBody;

0 commit comments

Comments
 (0)