Skip to content

Commit 70c0342

Browse files
remove node metadata from parseable struct, add as OnceCell
1 parent 264ccf3 commit 70c0342

File tree

6 files changed

+137
-204
lines changed

6 files changed

+137
-204
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -581,25 +581,16 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
581581
})
582582
.map_err(|err| StreamError::Anyhow(err.into()))?;
583583

584-
// Get self metadata
585-
let self_metadata = if let Some(metadata) = PARSEABLE.get_metadata() {
586-
vec![metadata]
587-
} else {
588-
vec![]
589-
};
590-
591584
// Fetch info for all nodes concurrently
592-
let (querier_infos, ingestor_infos, indexer_infos, self_info) = future::join4(
585+
let (querier_infos, ingestor_infos, indexer_infos) = future::join3(
593586
fetch_nodes_info(querier_metadata),
594587
fetch_nodes_info(ingestor_metadata),
595588
fetch_nodes_info(indexer_metadata),
596-
fetch_nodes_info(self_metadata),
597589
)
598590
.await;
599591

600592
// Combine results from all node types
601593
let mut infos = Vec::new();
602-
infos.extend(self_info?);
603594
infos.extend(querier_infos?);
604595
infos.extend(ingestor_infos?);
605596
infos.extend(indexer_infos?);
@@ -903,12 +894,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
903894
)
904895
.await;
905896

906-
let self_metadata = if let Some(metadata) = PARSEABLE.get_metadata() {
907-
vec![metadata]
908-
} else {
909-
vec![]
910-
};
911-
912897
// Handle querier metadata result
913898
let querier_metadata: Vec<NodeMetadata> = querier_result.map_err(|err| {
914899
error!("Fatal: failed to get querier info: {:?}", err);
@@ -925,8 +910,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
925910
PostError::Invalid(err)
926911
})?;
927912
// Fetch metrics from ingestors and indexers concurrently
928-
let (self_metrics, querier_metrics, ingestor_metrics, indexer_metrics) = future::join4(
929-
fetch_nodes_metrics(self_metadata),
913+
let (querier_metrics, ingestor_metrics, indexer_metrics) = future::join3(
930914
fetch_nodes_metrics(querier_metadata),
931915
fetch_nodes_metrics(ingestor_metadata),
932916
fetch_nodes_metrics(indexer_metadata),
@@ -936,12 +920,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
936920
// Combine all metrics
937921
let mut all_metrics = Vec::new();
938922

939-
// Add self metrics
940-
match self_metrics {
941-
Ok(metrics) => all_metrics.extend(metrics),
942-
Err(err) => return Err(err),
943-
}
944-
945923
// Add querier metrics
946924
match querier_metrics {
947925
Ok(metrics) => all_metrics.extend(metrics),

src/handlers/http/modal/ingest_server.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use bytes::Bytes;
2727
use relative_path::RelativePathBuf;
2828
use serde_json::Value;
2929
use tokio::sync::oneshot;
30+
use tokio::sync::OnceCell;
3031

31-
use crate::option::Mode;
32+
use crate::handlers::http::modal::NodeType;
3233
use crate::{
3334
analytics,
3435
handlers::{
@@ -46,13 +47,14 @@ use crate::{
4647
sync, Server,
4748
};
4849

50+
use super::IngestorMetadata;
4951
use super::{
5052
ingest::{ingestor_logstream, ingestor_rbac, ingestor_role},
5153
OpenIdClient, ParseableServer,
5254
};
5355

5456
pub const INGESTOR_EXPECT: &str = "Ingestor Metadata should be set in ingestor mode";
55-
57+
pub static INGESTOR_META: OnceCell<IngestorMetadata> = OnceCell::const_new();
5658
pub struct IngestServer;
5759

5860
#[async_trait]
@@ -98,6 +100,15 @@ impl ParseableServer for IngestServer {
98100
prometheus: &PrometheusMetrics,
99101
shutdown_rx: oneshot::Receiver<()>,
100102
) -> anyhow::Result<()> {
103+
// write the ingestor metadata to storage
104+
INGESTOR_META
105+
.get_or_init(|| async {
106+
IngestorMetadata::load_node_metadata(NodeType::Ingestor)
107+
.await
108+
.expect("Ingestor Metadata should be set in ingestor mode")
109+
})
110+
.await;
111+
101112
PARSEABLE.storage.register_store_metrics(prometheus);
102113

103114
migration::run_migration(&PARSEABLE).await?;
@@ -108,9 +119,6 @@ impl ParseableServer for IngestServer {
108119

109120
tokio::spawn(airplane::server());
110121

111-
// write the ingestor metadata to storage
112-
PARSEABLE.store_metadata(Mode::Ingest).await?;
113-
114122
// Ingestors shouldn't have to deal with OpenId auth flow
115123
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
116124
// Cancel sync jobs

src/handlers/http/modal/mod.rs

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -277,28 +277,92 @@ impl NodeMetadata {
277277
}
278278
}
279279

280-
/// Capture metadata information by either loading it from staging or starting fresh
281-
pub fn load(
282-
options: &Options,
283-
storage: &dyn ObjectStorageProvider,
284-
node_type: NodeType,
285-
) -> Arc<Self> {
286-
let staging_path = options.staging_dir();
280+
pub async fn load_node_metadata(node_type: NodeType) -> anyhow::Result<Self> {
281+
let staging_path = PARSEABLE.options.staging_dir();
287282
let node_type_str = node_type.as_str();
288283

289284
// Attempt to load metadata from staging
290-
if let Some(mut meta) = Self::load_from_staging(staging_path, node_type_str, options) {
291-
Self::update_metadata(&mut meta, options, node_type);
292-
meta.put_on_disk(staging_path)
293-
.expect("Couldn't write updated metadata to disk");
294-
return Arc::new(meta);
285+
if let Some(meta) = Self::load_from_staging(staging_path, node_type_str, &PARSEABLE.options)
286+
{
287+
return Self::process_and_store_metadata(meta, staging_path, node_type).await;
288+
}
289+
290+
// Attempt to load metadata from storage
291+
let storage_metas = Self::load_from_storage(node_type_str.to_string()).await;
292+
let url = PARSEABLE.options.get_url(node_type.to_mode());
293+
let port = url.port().unwrap_or(80).to_string();
294+
let url = url.to_string();
295+
296+
for storage_meta in storage_metas {
297+
if storage_meta.domain_name == url && storage_meta.port == port {
298+
return Self::process_and_store_metadata(storage_meta, staging_path, node_type)
299+
.await;
300+
}
295301
}
296302

297-
// If no metadata is found in staging, create a new one
298-
let meta = Self::create_new_metadata(options, storage, node_type);
303+
// If no metadata is found, create a new one
304+
let meta = Self::create_new_metadata(&PARSEABLE.options, &*PARSEABLE.storage, node_type);
305+
Self::store_new_metadata(meta, staging_path).await
306+
}
307+
308+
/// Process and store metadata
309+
async fn process_and_store_metadata(
310+
mut meta: Self,
311+
staging_path: &Path,
312+
node_type: NodeType,
313+
) -> anyhow::Result<Self> {
314+
Self::update_metadata(&mut meta, &PARSEABLE.options, node_type);
315+
meta.put_on_disk(staging_path)
316+
.expect("Couldn't write updated metadata to disk");
317+
318+
let path = meta.file_path();
319+
let resource = serde_json::to_vec(&meta)?.into();
320+
let store = PARSEABLE.storage.get_object_store();
321+
store.put_object(&path, resource).await?;
322+
323+
Ok(meta)
324+
}
325+
326+
/// Store new metadata
327+
async fn store_new_metadata(meta: Self, staging_path: &Path) -> anyhow::Result<Self> {
299328
meta.put_on_disk(staging_path)
300329
.expect("Couldn't write new metadata to disk");
301-
Arc::new(meta)
330+
331+
let path = meta.file_path();
332+
let resource = serde_json::to_vec(&meta)?.into();
333+
let store = PARSEABLE.storage.get_object_store();
334+
store.put_object(&path, resource).await?;
335+
336+
Ok(meta)
337+
}
338+
339+
async fn load_from_storage(node_type: String) -> Vec<NodeMetadata> {
340+
let path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
341+
let glob_storage = PARSEABLE.storage.get_object_store();
342+
let obs = glob_storage
343+
.get_objects(
344+
Some(&path),
345+
Box::new({
346+
let node_type = node_type.clone();
347+
move |file_name| file_name.contains(&node_type)
348+
}),
349+
)
350+
.await;
351+
352+
let mut metadata = vec![];
353+
if let Ok(obs) = obs {
354+
for object in obs {
355+
//convert to NodeMetadata
356+
match serde_json::from_slice::<NodeMetadata>(&object) {
357+
Ok(node_metadata) => metadata.push(node_metadata),
358+
Err(e) => error!("Failed to deserialize NodeMetadata: {:?}", e),
359+
}
360+
}
361+
} else {
362+
error!("Couldn't read from storage");
363+
}
364+
// Return the metadata
365+
metadata
302366
}
303367

304368
/// Load metadata from the staging directory
@@ -467,30 +531,6 @@ impl NodeMetadata {
467531
Ok(metadata)
468532
}
469533

470-
pub async fn migrate(&self) -> anyhow::Result<Option<Self>> {
471-
let imp = self.file_path();
472-
let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await {
473-
Ok(bytes) => bytes,
474-
Err(_) => {
475-
return Ok(None);
476-
}
477-
};
478-
479-
let mut resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?;
480-
resource.node_type.clone_from(&self.node_type);
481-
let bytes = Bytes::from(serde_json::to_vec(&resource)?);
482-
483-
resource.put_on_disk(PARSEABLE.options.staging_dir())?;
484-
485-
PARSEABLE
486-
.storage
487-
.get_object_store()
488-
.put_object(&imp, bytes)
489-
.await?;
490-
491-
Ok(Some(resource))
492-
}
493-
494534
/// Puts the node info into the staging.
495535
///
496536
/// This function takes the node info as a parameter and stores it in staging.

src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,17 @@ use actix_web::{web, Scope};
3232
use actix_web_prometheus::PrometheusMetrics;
3333
use async_trait::async_trait;
3434
use bytes::Bytes;
35-
use tokio::sync::oneshot;
35+
use tokio::sync::{oneshot, OnceCell};
3636
use tracing::info;
3737

3838
use crate::parseable::PARSEABLE;
3939
use crate::Server;
4040

4141
use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
42-
use super::{load_on_init, OpenIdClient, ParseableServer};
42+
use super::{load_on_init, NodeType, OpenIdClient, ParseableServer, QuerierMetadata};
4343

4444
pub struct QueryServer;
45-
45+
pub static QUERIER_META: OnceCell<QuerierMetadata> = OnceCell::const_new();
4646
#[async_trait]
4747
impl ParseableServer for QueryServer {
4848
// configure the api routes
@@ -99,7 +99,14 @@ impl ParseableServer for QueryServer {
9999
shutdown_rx: oneshot::Receiver<()>,
100100
) -> anyhow::Result<()> {
101101
PARSEABLE.storage.register_store_metrics(prometheus);
102-
102+
// write the ingestor metadata to storage
103+
QUERIER_META
104+
.get_or_init(|| async {
105+
QuerierMetadata::load_node_metadata(NodeType::Querier)
106+
.await
107+
.expect("Querier Metadata should be set in ingestor mode")
108+
})
109+
.await;
103110
migration::run_migration(&PARSEABLE).await?;
104111

105112
//create internal stream at server start

0 commit comments

Comments
 (0)