diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index 82581ffff..737ba940a 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -705,6 +705,8 @@ pub enum AlertError {
     InvalidStateChange(String),
     #[error("{0}")]
     StreamNotFound(#[from] StreamNotFound),
+    #[error("{0}")]
+    Anyhow(#[from] anyhow::Error),
 }
 
 impl actix_web::ResponseError for AlertError {
@@ -719,6 +721,7 @@ impl actix_web::ResponseError for AlertError {
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
             Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
+            Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
 
@@ -731,7 +734,7 @@ impl actix_web::ResponseError for AlertError {
 
 impl Alerts {
     /// Loads alerts from disk, blocks
-    pub async fn load(&self) -> Result<(), AlertError> {
+    pub async fn load(&self) -> anyhow::Result<()> {
         let mut map = self.alerts.write().await;
         let store = PARSEABLE.storage.get_object_store();
 
@@ -743,7 +746,8 @@ impl Alerts {
                 alert.clone(),
                 inbox_rx,
                 outbox_tx,
-            )?;
+            )
+            .map_err(|e| anyhow::Error::msg(e.to_string()))?;
 
             self.update_task(alert.id, handle, outbox_rx, inbox_tx)
                 .await;
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 05d56e028..7062045f0 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -720,6 +720,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
             PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
         })?;
 
+        // add a check to see if the ingestor is live
+        if !check_liveness(&ingestor.domain_name).await {
+            warn!("Ingestor {} is not live", ingestor.domain_name);
+            continue;
+        }
+
         let res = HTTP_CLIENT
             .get(uri)
             .header(header::AUTHORIZATION, &ingestor.token)
diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs
index 2f1cbded7..6d7679e49 100644
--- a/src/handlers/http/modal/mod.rs
+++ b/src/handlers/http/modal/mod.rs
@@ -20,9 +20,11 @@ use std::{path::Path, sync::Arc};
 
 use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
 use actix_web_prometheus::PrometheusMetrics;
+use anyhow::Context;
 use async_trait::async_trait;
 use base64::{prelude::BASE64_STANDARD, Engine};
 use bytes::Bytes;
+use futures::future;
 use openid::Discovered;
 use relative_path::RelativePathBuf;
 use serde::{Deserialize, Serialize};
@@ -32,11 +34,14 @@ use tokio::sync::oneshot;
 use tracing::{error, info, warn};
 
 use crate::{
+    alerts::ALERTS,
     cli::Options,
+    correlation::CORRELATIONS,
     oidc::Claims,
     option::Mode,
     parseable::PARSEABLE,
     storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
+    users::{dashboards::DASHBOARDS, filters::FILTERS},
     utils::{get_indexer_id, get_ingestor_id},
 };
 
@@ -159,6 +164,41 @@ pub trait ParseableServer {
     }
 }
 
+pub async fn load_on_init() -> anyhow::Result<()> {
+    // Run all loading operations concurrently
+    let (correlations_result, filters_result, dashboards_result, alerts_result) = future::join4(
+        async {
+            CORRELATIONS
+                .load()
+                .await
+                .context("Failed to load correlations")
+        },
+        async { FILTERS.load().await.context("Failed to load filters") },
+        async { DASHBOARDS.load().await.context("Failed to load dashboards") },
+        async { ALERTS.load().await.context("Failed to load alerts") },
+    )
+    .await;
+
+    // Handle errors from each operation
+    if let Err(e) = correlations_result {
+        error!("{e}");
+    }
+
+    if let Err(err) = filters_result {
+        error!("{err}");
+    }
+
+    if let Err(err) = dashboards_result {
+        error!("{err}");
+    }
+
+    if let Err(err) = alerts_result {
error!("{err}"); + } + + Ok(()) +} + #[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)] pub struct IngestorMetadata { pub version: String, diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index e0f3c7e31..dde519d2e 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -18,8 +18,6 @@ use std::thread; -use crate::alerts::ALERTS; -use crate::correlation::CORRELATIONS; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; @@ -28,8 +26,6 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; -use crate::users::dashboards::DASHBOARDS; -use crate::users::filters::FILTERS; use crate::{analytics, migration, storage, sync}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; @@ -37,13 +33,13 @@ use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use bytes::Bytes; use tokio::sync::oneshot; -use tracing::{error, info}; +use tracing::info; use crate::parseable::PARSEABLE; use crate::Server; use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; -use super::{OpenIdClient, ParseableServer}; +use super::{load_on_init, OpenIdClient, ParseableServer}; pub struct QueryServer; @@ -90,7 +86,6 @@ impl ParseableServer for QueryServer { )); } - migration::run_file_migration(&PARSEABLE).await?; let parseable_json = PARSEABLE.validate_storage().await?; migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?; @@ -109,22 +104,8 @@ impl ParseableServer for QueryServer { //create internal stream at server start PARSEABLE.create_internal_stream_if_not_exists().await?; - - if let Err(e) = CORRELATIONS.load().await { - error!("{e}"); - } - if let Err(err) = FILTERS.load().await { - error!("{err}") - }; - - if let Err(err) = DASHBOARDS.load().await { - error!("{err}") - }; - - if let Err(err) = ALERTS.load().await { - error!("{err}") - }; - + // load on init + load_on_init().await?; // track all parquet files already in the data directory storage::retention::load_retention_from_global(); @@ -150,11 +131,11 @@ impl ParseableServer for QueryServer { let result = self .start(shutdown_rx, prometheus.clone(), PARSEABLE.options.openid()) - .await; + .await?; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - result + Ok(result) } } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d28b02b59..002c102b3 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -18,9 +18,7 @@ use std::thread; -use crate::alerts::ALERTS; use crate::analytics; -use crate::correlation::CORRELATIONS; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::alerts; @@ -35,8 +33,6 @@ use crate::metrics; use crate::migration; use crate::storage; use crate::sync; -use crate::users::dashboards::DASHBOARDS; -use crate::users::filters::FILTERS; use actix_web::web; use actix_web::web::resource; @@ -47,7 +43,6 @@ use actix_web_static_files::ResourceFiles; use async_trait::async_trait; use bytes::Bytes; use tokio::sync::oneshot; -use tracing::error; use crate::{ handlers::http::{ @@ -61,6 +56,7 @@ use crate::{ // use super::generate; use super::generate; +use super::load_on_init; use 
 use super::OpenIdClient;
 use super::ParseableServer;
 
@@ -103,7 +99,8 @@ impl ParseableServer for Server {
     }
 
     async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
-        migration::run_file_migration(&PARSEABLE).await?;
+        //TODO: removed file migration
+        //deprecated support for deployments < v1.0.0
         let parseable_json = PARSEABLE.validate_storage().await?;
         migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?;
 
@@ -120,20 +117,8 @@ impl ParseableServer for Server {
 
         migration::run_migration(&PARSEABLE).await?;
 
-        if let Err(e) = CORRELATIONS.load().await {
-            error!("{e}");
-        }
-        if let Err(err) = FILTERS.load().await {
-            error!("{err}")
-        };
-
-        if let Err(err) = DASHBOARDS.load().await {
-            error!("{err}")
-        };
-
-        if let Err(err) = ALERTS.load().await {
-            error!("{err}")
-        };
+        // load on init
+        load_on_init().await?;
 
         storage::retention::load_retention_from_global();
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index d101879cf..d11ae3f79 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -21,13 +21,14 @@ pub mod metadata_migration;
 mod schema_migration;
 mod stream_metadata_migration;
 
-use std::{collections::HashMap, fs::OpenOptions, sync::Arc};
+use std::{collections::HashMap, fs::OpenOptions};
 
 use arrow_schema::Schema;
 use bytes::Bytes;
 use relative_path::RelativePathBuf;
 use serde::Serialize;
 use serde_json::Value;
+use tracing::warn;
 
 use crate::{
     metadata::{load_daily_metrics, update_data_type_time_partition, LogStreamMetadata},
@@ -36,8 +37,7 @@ use crate::{
     parseable::{Parseable, PARSEABLE},
     storage::{
         object_storage::{parseable_json_path, schema_path, stream_json_path},
-        ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME,
-        PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
+        ObjectStorage, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME,
     },
 };
 
@@ -131,20 +131,51 @@ pub async fn run_metadata_migration(
     Ok(())
 }
 
-/// run the migration for all streams
+/// run the migration for all streams concurrently
 pub async fn run_migration(config: &Parseable) -> anyhow::Result<()> {
     let storage = config.storage.get_object_store();
-    for stream_name in storage.list_streams().await? {
-        let Some(metadata) = migration_stream(&stream_name, &*storage).await? else {
-            continue;
-        };
-        config
-            .get_or_create_stream(&stream_name)
-            .set_metadata(metadata)
-            .await;
-    }
-    Ok(())
+    // Get all stream names
+    let stream_names = storage.list_streams().await?;
+
+    // Create futures for each stream migration
+    let futures = stream_names.into_iter().map(|stream_name| {
+        let storage = storage.clone();
+        async move {
+            match migration_stream(&stream_name, &*storage).await {
+                Ok(Some(metadata)) => {
+                    // Apply the metadata update
+                    config
+                        .get_or_create_stream(&stream_name)
+                        .set_metadata(metadata)
+                        .await;
+                    Ok(())
+                }
+                Ok(None) => Ok(()),
+                Err(e) => {
+                    // Optionally log error but continue with other streams
+                    warn!("Error migrating stream {}: {:?}", stream_name, e);
+                    Err(e)
+                }
+            }
+        }
+    });
+
+    // Execute all migrations concurrently
+    let results = futures::future::join_all(futures).await;
+
+    // Check for errors
+    let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
+
+    if errors.is_empty() {
+        Ok(())
+    } else {
+        // Return the first error, or aggregate them if needed
+        Err(anyhow::anyhow!(
+            "Migration errors occurred: {} failures",
+            errors.len()
+        ))
+    }
 }
 
 async fn migration_stream(
@@ -406,83 +437,3 @@ pub fn put_staging_metadata(
     serde_json::to_writer(&mut file, metadata)?;
     Ok(())
 }
-
-pub async fn run_file_migration(config: &Parseable) -> anyhow::Result<()> {
-    let object_store = config.storage.get_object_store();
-
-    let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME);
-
-    // if this errors that means migrations is already done
-    if let Err(err) = object_store.get_object(&old_meta_file_path).await {
-        if matches!(err, ObjectStorageError::NoSuchKey(_)) {
-            return Ok(());
-        }
-        return Err(err.into());
-    }
-
-    run_meta_file_migration(&object_store, old_meta_file_path).await?;
-    run_stream_files_migration(&object_store).await?;
-
-    Ok(())
-}
-
-async fn run_meta_file_migration(
-    object_store: &Arc<dyn ObjectStorage>,
-    old_meta_file_path: RelativePathBuf,
-) -> anyhow::Result<()> {
-    // get the list of all meta files
-    let mut meta_files = object_store.get_ingestor_meta_file_paths().await?;
-    meta_files.push(old_meta_file_path);
-
-    for file in meta_files {
-        match object_store.get_object(&file).await {
-            Ok(bytes) => {
-                // we can unwrap here because we know the file exists
-                let new_path = RelativePathBuf::from_iter([
-                    PARSEABLE_ROOT_DIRECTORY,
-                    file.file_name().expect("should have a file name"),
-                ]);
-                object_store.put_object(&new_path, bytes).await?;
-                object_store.delete_object(&file).await?;
-            }
-            Err(err) => {
-                // if error is not a no such key error, something weird happened
-                // so return the error
-                if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
-                    return Err(err.into());
-                }
-            }
-        }
-    }
-
-    Ok(())
-}
-
-async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> {
-    let streams = object_store.list_old_streams().await?;
-
-    for stream in streams {
-        let paths = object_store.get_stream_file_paths(&stream).await?;
-
-        for path in paths {
-            match object_store.get_object(&path).await {
-                Ok(bytes) => {
-                    let new_path = RelativePathBuf::from_iter([
-                        stream.as_str(),
-                        STREAM_ROOT_DIRECTORY,
-                        path.file_name().expect("should have a file name"),
-                    ]);
-                    object_store.put_object(&new_path, bytes).await?;
-                    object_store.delete_object(&path).await?;
-                }
-                Err(err) => {
-                    if !matches!(err, ObjectStorageError::NoSuchKey(_)) {
-                        return Err(err.into());
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(())
-}
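
For context, the concurrent-loading pattern that load_on_init introduces can be exercised in isolation. Below is a minimal, self-contained sketch of the same idea, assuming the anyhow, futures, and tokio crates; the load_* functions here are hypothetical stand-ins for CORRELATIONS.load(), FILTERS.load(), DASHBOARDS.load(), and ALERTS.load().

use anyhow::Context;
use futures::future;

// Hypothetical stand-ins for the four component loaders.
async fn load_correlations() -> anyhow::Result<()> { Ok(()) }
async fn load_filters() -> anyhow::Result<()> { Ok(()) }
async fn load_dashboards() -> anyhow::Result<()> { Ok(()) }
async fn load_alerts() -> anyhow::Result<()> { Ok(()) }

async fn load_on_init_sketch() -> anyhow::Result<()> {
    // join4 drives all four futures concurrently and waits for every one
    // of them, so a failure in one loader cannot short-circuit the rest.
    let (correlations, filters, dashboards, alerts) = future::join4(
        async { load_correlations().await.context("Failed to load correlations") },
        async { load_filters().await.context("Failed to load filters") },
        async { load_dashboards().await.context("Failed to load dashboards") },
        async { load_alerts().await.context("Failed to load alerts") },
    )
    .await;

    // Log each failure instead of propagating it, mirroring load_on_init's
    // choice to keep the server booting when one component fails to load.
    for result in [correlations, filters, dashboards, alerts] {
        if let Err(e) = result {
            eprintln!("{e:#}"); // `:#` prints the whole context chain
        }
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    load_on_init_sketch().await
}

One consequence of this shape: load_on_init returns Ok(()) even when individual loaders fail, so the load_on_init().await? call sites in server.rs and query_server.rs will not abort startup on a failed component load.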
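
The rewritten run_migration applies the same philosophy per stream: it drives all stream migrations with join_all, collects the failures, and surfaces them as one aggregate error instead of stopping at the first. A minimal sketch of that aggregation pattern, with a hypothetical migrate_one standing in for migration_stream:

use futures::future::join_all;

// Hypothetical per-stream migration; stands in for migration_stream.
async fn migrate_one(stream_name: &str) -> anyhow::Result<()> {
    if stream_name.is_empty() {
        anyhow::bail!("empty stream name");
    }
    Ok(())
}

async fn run_all(stream_names: Vec<String>) -> anyhow::Result<()> {
    // join_all runs every future concurrently, never short-circuits, and
    // returns results in input order, so each stream gets its chance.
    let results = join_all(stream_names.iter().map(|name| migrate_one(name))).await;

    // Keep only the failures and report them as a single aggregate error,
    // as run_migration does with its "{} failures" message.
    let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(anyhow::anyhow!("{} of the migrations failed", errors.len()))
    }
}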