Skip to content

Commit 1841a68

Browse files
process live ingestors in paralllel
1 parent 8af3a4c commit 1841a68

File tree

1 file changed

+18
-7
lines changed
  • src/handlers/http/cluster

1 file changed

+18
-7
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,35 @@ const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1
6767

6868
pub async fn for_each_live_ingestor<F, Fut, E>(api_fn: F) -> Result<(), E>
6969
where
70-
F: Fn(NodeMetadata) -> Fut + Clone,
71-
Fut: Future<Output = Result<(), E>>,
72-
E: From<anyhow::Error>,
70+
F: Fn(NodeMetadata) -> Fut + Clone + Send + Sync + 'static,
71+
Fut: Future<Output = Result<(), E>> + Send,
72+
E: From<anyhow::Error> + Send + Sync + 'static,
7373
{
7474
let ingestor_infos: Vec<NodeMetadata> =
7575
get_node_info(NodeType::Ingestor).await.map_err(|err| {
7676
error!("Fatal: failed to get ingestor info: {:?}", err);
7777
E::from(err)
7878
})?;
7979

80-
// Process each live ingestor
80+
let mut live_ingestors = Vec::new();
8181
for ingestor in ingestor_infos {
82-
if !utils::check_liveness(&ingestor.domain_name).await {
82+
if utils::check_liveness(&ingestor.domain_name).await {
83+
live_ingestors.push(ingestor);
84+
} else {
8385
warn!("Ingestor {} is not live", ingestor.domain_name);
84-
continue;
8586
}
87+
}
88+
89+
// Process all live ingestors in parallel
90+
let results = futures::future::join_all(live_ingestors.into_iter().map(|ingestor| {
91+
let api_fn = api_fn.clone();
92+
async move { api_fn(ingestor).await }
93+
}))
94+
.await;
8695

87-
api_fn(ingestor).await?;
96+
// collect results
97+
for result in results {
98+
result?;
8899
}
89100

90101
Ok(())

0 commit comments

Comments
 (0)