Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ sha2 = "0.10.8"

# Serialization and Data Formats
byteorder = "1.4.3"
erased-serde = "=0.3.16"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
serde_repr = "0.1.17"
Expand Down Expand Up @@ -141,6 +142,7 @@ futures-core = "0.3.31"
tempfile = "3.20.0"
lazy_static = "1.4.0"
prost = "0.13.1"
dashmap = "6.1.0"

[build-dependencies]
cargo_toml = "0.21"
Expand Down
12 changes: 12 additions & 0 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::{
alert_traits::AlertTrait,
target::{NotificationConfig, TARGETS},
},
metastore::metastore_traits::MetastoreObject,
query::resolve_stream_names,
storage::object_storage::alert_json_path,
};

/// Helper struct for basic alert fields during migration
Expand Down Expand Up @@ -527,3 +529,13 @@ impl AlertQueryResult {
pub struct NotificationStateRequest {
pub state: String,
}

impl MetastoreObject for AlertConfig {
fn get_object_id(&self) -> String {
self.id.to_string()
}

fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
}
}
3 changes: 2 additions & 1 deletion src/alerts/alert_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
alert_enums::NotificationState,
alert_structs::{Context, ThresholdConfig},
},
metastore::metastore_traits::MetastoreObject,
rbac::map::SessionKey,
};
use chrono::{DateTime, Utc};
Expand All @@ -47,7 +48,7 @@ pub trait MessageCreation {
}

#[async_trait]
pub trait AlertTrait: Debug + Send + Sync {
pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
async fn eval_alert(&self) -> Result<Option<String>, AlertError>;
async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>;
async fn update_notification_state(
Expand Down
31 changes: 25 additions & 6 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ use crate::{
target::{self, NotificationConfig},
},
handlers::http::query::create_streams_for_distributed,
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
query::resolve_stream_names,
rbac::map::SessionKey,
storage::object_storage::alert_json_path,
utils::user_auth_for_query,
};

Expand Down Expand Up @@ -65,6 +67,16 @@ pub struct ThresholdAlert {
pub last_triggered_at: Option<DateTime<Utc>>,
}

impl MetastoreObject for ThresholdAlert {
fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
}

fn get_object_id(&self) -> String {
self.id.to_string()
}
}

#[async_trait]
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
Expand Down Expand Up @@ -170,12 +182,14 @@ impl AlertTrait for ThresholdAlert {
&mut self,
new_notification_state: NotificationState,
) -> Result<(), AlertError> {
let store = PARSEABLE.storage.get_object_store();
// update state in memory
self.notification_state = new_notification_state;
// update on disk
store.put_alert(self.id, &self.to_alert_config()).await?;

// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.await?;
Ok(())
}

Expand All @@ -184,7 +198,6 @@ impl AlertTrait for ThresholdAlert {
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError> {
let store = PARSEABLE.storage.get_object_store();
if self.state.eq(&AlertState::Disabled) {
warn!(
"Alert- {} is currently Disabled. Updating state to {new_state}.",
Expand All @@ -199,7 +212,10 @@ impl AlertTrait for ThresholdAlert {
}

// update on disk
store.put_alert(self.id, &self.to_alert_config()).await?;
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.await?;
// The task should have already been removed from the list of running tasks
return Ok(());
}
Expand Down Expand Up @@ -232,7 +248,10 @@ impl AlertTrait for ThresholdAlert {
}

// update on disk
store.put_alert(self.id, &self.to_alert_config()).await?;
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.await?;

if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) {
trace!("trigger notif on-\n{}", self.state);
Expand Down
32 changes: 11 additions & 21 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait};
use crate::alerts::alert_types::ThresholdAlert;
use crate::alerts::target::{NotificationConfig, TARGETS};
use crate::handlers::http::fetch_schema;
use crate::metastore::MetastoreError;
// use crate::handlers::http::query::create_streams_for_distributed;
// use crate::option::Mode;
use crate::parseable::{PARSEABLE, StreamNotFound};
use crate::query::{QUERY_SESSION, resolve_stream_names};
use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::{ALERTS_ROOT_DIRECTORY, ObjectStorageError};
use crate::storage::ObjectStorageError;
use crate::sync::alert_runtime;
use crate::utils::user_auth_for_query;

Expand Down Expand Up @@ -103,10 +104,7 @@ pub fn create_default_alerts_manager() -> Alerts {

impl AlertConfig {
/// Migration function to convert v1 alerts to v2 structure
pub async fn migrate_from_v1(
alert_json: &JsonValue,
store: &dyn crate::storage::ObjectStorage,
) -> Result<AlertConfig, AlertError> {
pub async fn migrate_from_v1(alert_json: &JsonValue) -> Result<AlertConfig, AlertError> {
let basic_fields = Self::parse_basic_fields(alert_json)?;
let alert_info = format!("Alert '{}' (ID: {})", basic_fields.title, basic_fields.id);

Expand Down Expand Up @@ -138,7 +136,7 @@ impl AlertConfig {
};

// Save the migrated alert back to storage
store.put_alert(basic_fields.id, &migrated_alert).await?;
PARSEABLE.metastore.put_alert(&migrated_alert).await?;

Ok(migrated_alert)
}
Expand Down Expand Up @@ -950,6 +948,8 @@ pub enum AlertError {
Unimplemented(String),
#[error("{0}")]
ValidationFailure(String),
#[error(transparent)]
MetastoreError(#[from] MetastoreError),
}

impl actix_web::ResponseError for AlertError {
Expand Down Expand Up @@ -977,6 +977,7 @@ impl actix_web::ResponseError for AlertError {
Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST,
Self::MetastoreError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -991,19 +992,10 @@ impl actix_web::ResponseError for AlertError {
impl AlertManagerTrait for Alerts {
/// Loads alerts from disk, blocks
async fn load(&self) -> anyhow::Result<()> {
let mut map = self.alerts.write().await;
let store = PARSEABLE.storage.get_object_store();

// Get alerts path and read raw bytes for migration handling
let relative_path = relative_path::RelativePathBuf::from(ALERTS_ROOT_DIRECTORY);
let raw_objects = PARSEABLE.metastore.get_alerts().await?;

let raw_objects = store
.get_objects(
Some(&relative_path),
Box::new(|file_name| file_name.ends_with(".json")),
)
.await
.unwrap_or_default();
let mut map = self.alerts.write().await;

for raw_bytes in raw_objects {
// First, try to parse as JSON Value to check version
Expand All @@ -1022,7 +1014,7 @@ impl AlertManagerTrait for Alerts {
|| json_value.get("stream").is_some()
{
// This is a v1 alert that needs migration
match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
match AlertConfig::migrate_from_v1(&json_value).await {
Ok(migrated) => migrated,
Err(e) => {
error!("Failed to migrate v1 alert: {e}");
Expand All @@ -1042,7 +1034,7 @@ impl AlertManagerTrait for Alerts {
} else {
// No version field, assume v1 and migrate
warn!("Found alert without version field, assuming v1 and migrating");
match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
match AlertConfig::migrate_from_v1(&json_value).await {
Ok(migrated) => migrated,
Err(e) => {
error!("Failed to migrate alert without version: {e}");
Expand Down Expand Up @@ -1253,8 +1245,6 @@ impl AlertManagerTrait for Alerts {
alert_id: Ulid,
new_notification_state: NotificationState,
) -> Result<(), AlertError> {
// let store = PARSEABLE.storage.get_object_store();

// read and modify alert
let mut write_access = self.alerts.write().await;
let mut alert: Box<dyn AlertTrait> = if let Some(alert) = write_access.get(&alert_id) {
Expand Down
30 changes: 16 additions & 14 deletions src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{

use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
use chrono::Utc;
use http::{HeaderMap, HeaderValue, header::AUTHORIZATION};
use itertools::Itertools;
Expand All @@ -38,6 +37,7 @@ use url::Url;

use crate::{
alerts::{AlertError, AlertState, Context, alert_traits::CallableTarget},
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
storage::object_storage::target_json_path,
};
Expand All @@ -56,25 +56,19 @@ pub struct TargetConfigs {
impl TargetConfigs {
/// Loads alerts from disk, blocks
pub async fn load(&self) -> anyhow::Result<()> {
let targets = PARSEABLE.metastore.get_targets().await?;
let mut map = self.target_configs.write().await;
let store = PARSEABLE.storage.get_object_store();

for alert in store.get_targets().await.unwrap_or_default() {
map.insert(alert.id, alert);
for target in targets {
map.insert(target.id, target);
}

Ok(())
}

pub async fn update(&self, target: Target) -> Result<(), AlertError> {
PARSEABLE.metastore.put_target(&target).await?;
let mut map = self.target_configs.write().await;
map.insert(target.id, target.clone());

let path = target_json_path(&target.id);

let store = PARSEABLE.storage.get_object_store();
let target_bytes = serde_json::to_vec(&target)?;
store.put_object(&path, Bytes::from(target_bytes)).await?;
Ok(())
}

Expand Down Expand Up @@ -121,9 +115,7 @@ impl TargetConfigs {
.await
.remove(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?;
let path = target_json_path(&target.id);
let store = PARSEABLE.storage.get_object_store();
store.delete_object(&path).await?;
PARSEABLE.metastore.delete_target(&target).await?;
Ok(target)
}
}
Expand Down Expand Up @@ -340,6 +332,16 @@ impl Target {
}
}

impl MetastoreObject for Target {
fn get_object_path(&self) -> String {
target_json_path(&self.id).to_string()
}

fn get_object_id(&self) -> String {
self.id.to_string()
}
}

fn call_target(target: TargetType, context: Context) {
trace!("Calling target with context- {context:?}");
tokio::spawn(async move { target.call(&context).await });
Expand Down
12 changes: 12 additions & 0 deletions src/catalog/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::collections::HashMap;
use itertools::Itertools;
use parquet::{file::reader::FileReader, format::SortingColumn};

use crate::metastore::metastore_traits::MetastoreObject;

use super::column::Column;

#[derive(
Expand Down Expand Up @@ -88,6 +90,16 @@ impl Manifest {
}
}

impl MetastoreObject for Manifest {
fn get_object_path(&self) -> String {
unimplemented!()
}

fn get_object_id(&self) -> String {
unimplemented!()
}
}

pub fn create_from_parquet_file(
object_store_path: String,
fs_file_path: &std::path::Path,
Expand Down
Loading
Loading