Skip to content

optimise server start #1277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,8 @@ pub enum AlertError {
InvalidStateChange(String),
#[error("{0}")]
StreamNotFound(#[from] StreamNotFound),
#[error("{0}")]
Anyhow(#[from] anyhow::Error),
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -719,6 +721,7 @@ impl actix_web::ResponseError for AlertError {
Self::CustomError(_) => StatusCode::BAD_REQUEST,
Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -731,7 +734,7 @@ impl actix_web::ResponseError for AlertError {

impl Alerts {
/// Loads alerts from disk, blocks
pub async fn load(&self) -> Result<(), AlertError> {
pub async fn load(&self) -> anyhow::Result<()> {
let mut map = self.alerts.write().await;
let store = PARSEABLE.storage.get_object_store();

Expand All @@ -743,7 +746,8 @@ impl Alerts {
alert.clone(),
inbox_rx,
outbox_tx,
)?;
)
.map_err(|e| anyhow::Error::msg(e.to_string()))?;

self.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

// add a check to see if the ingestor is live
if !check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}

let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
Expand Down
40 changes: 40 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use std::{path::Path, sync::Arc};

use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use anyhow::Context;
use async_trait::async_trait;
use base64::{prelude::BASE64_STANDARD, Engine};
use bytes::Bytes;
use futures::future;
use openid::Discovered;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
Expand All @@ -32,11 +34,14 @@ use tokio::sync::oneshot;
use tracing::{error, info, warn};

use crate::{
alerts::ALERTS,
cli::Options,
correlation::CORRELATIONS,
oidc::Claims,
option::Mode,
parseable::PARSEABLE,
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
utils::{get_indexer_id, get_ingestor_id},
};

Expand Down Expand Up @@ -159,6 +164,41 @@ pub trait ParseableServer {
}
}

pub async fn load_on_init() -> anyhow::Result<()> {
// Run all loading operations concurrently
let (correlations_result, filters_result, dashboards_result, alerts_result) = future::join4(
async {
CORRELATIONS
.load()
.await
.context("Failed to load correlations")
},
async { FILTERS.load().await.context("Failed to load filters") },
async { DASHBOARDS.load().await.context("Failed to load dashboards") },
async { ALERTS.load().await.context("Failed to load alerts") },
)
.await;

// Handle errors from each operation
if let Err(e) = correlations_result {
error!("{e}");
}

if let Err(err) = filters_result {
error!("{err}");
}

if let Err(err) = dashboards_result {
error!("{err}");
}

if let Err(err) = alerts_result {
error!("{err}");
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
pub struct IngestorMetadata {
pub version: String,
Expand Down
31 changes: 6 additions & 25 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

use std::thread;

use crate::alerts::ALERTS;
use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
Expand All @@ -28,22 +26,20 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::handlers::http::{rbac, role};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use crate::{analytics, migration, storage, sync};
use actix_web::web::{resource, ServiceConfig};
use actix_web::{web, Scope};
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::oneshot;
use tracing::{error, info};
use tracing::info;

use crate::parseable::PARSEABLE;
use crate::Server;

use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
use super::{OpenIdClient, ParseableServer};
use super::{load_on_init, OpenIdClient, ParseableServer};

pub struct QueryServer;

Expand Down Expand Up @@ -90,7 +86,6 @@ impl ParseableServer for QueryServer {
));
}

migration::run_file_migration(&PARSEABLE).await?;
let parseable_json = PARSEABLE.validate_storage().await?;
migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?;

Expand All @@ -109,22 +104,8 @@ impl ParseableServer for QueryServer {

//create internal stream at server start
PARSEABLE.create_internal_stream_if_not_exists().await?;

if let Err(e) = CORRELATIONS.load().await {
error!("{e}");
}
if let Err(err) = FILTERS.load().await {
error!("{err}")
};

if let Err(err) = DASHBOARDS.load().await {
error!("{err}")
};

if let Err(err) = ALERTS.load().await {
error!("{err}")
};

// load on init
load_on_init().await?;
// track all parquet files already in the data directory
storage::retention::load_retention_from_global();

Expand All @@ -150,11 +131,11 @@ impl ParseableServer for QueryServer {

let result = self
.start(shutdown_rx, prometheus.clone(), PARSEABLE.options.openid())
.await;
.await?;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

result
Ok(result)
}
}

Expand Down
25 changes: 5 additions & 20 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

use std::thread;

use crate::alerts::ALERTS;
use crate::analytics;
use crate::correlation::CORRELATIONS;
use crate::handlers;
use crate::handlers::http::about;
use crate::handlers::http::alerts;
Expand All @@ -35,8 +33,6 @@ use crate::metrics;
use crate::migration;
use crate::storage;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;

use actix_web::web;
use actix_web::web::resource;
Expand All @@ -47,7 +43,6 @@ use actix_web_static_files::ResourceFiles;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::oneshot;
use tracing::error;

use crate::{
handlers::http::{
Expand All @@ -61,6 +56,7 @@ use crate::{

// use super::generate;
use super::generate;
use super::load_on_init;
use super::OpenIdClient;
use super::ParseableServer;

Expand Down Expand Up @@ -103,7 +99,8 @@ impl ParseableServer for Server {
}

async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
migration::run_file_migration(&PARSEABLE).await?;
//TODO: removed file migration
//deprecated support for deployments < v1.0.0
let parseable_json = PARSEABLE.validate_storage().await?;
migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?;

Expand All @@ -120,20 +117,8 @@ impl ParseableServer for Server {

migration::run_migration(&PARSEABLE).await?;

if let Err(e) = CORRELATIONS.load().await {
error!("{e}");
}
if let Err(err) = FILTERS.load().await {
error!("{err}")
};

if let Err(err) = DASHBOARDS.load().await {
error!("{err}")
};

if let Err(err) = ALERTS.load().await {
error!("{err}")
};
// load on init
load_on_init().await?;

storage::retention::load_retention_from_global();

Expand Down
Loading
Loading