Skip to content

Commit 17c3ab1

Browse files
committed
hotfix for alerts
Added new actions
1 parent 40876cd commit 17c3ab1

File tree

5 files changed

+38
-34
lines changed

5 files changed

+38
-34
lines changed

.github/workflows/coverage.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ name: Lint, Test and Coverage Report
1010
jobs:
1111
coverage:
1212
runs-on: ubuntu-latest
13+
env:
14+
CMAKE_FLAGS: "-DCMAKE_POLICY_VERSION_MINIMUM=3.5"
1315
steps:
1416
- uses: actions/checkout@v4
1517
- uses: dtolnay/rust-toolchain@stable
@@ -35,8 +37,6 @@ jobs:
3537
if: runner.os == 'Linux'
3638

3739
- name: Check with clippy
38-
env:
39-
CMAKE_FLAGS: "-DCMAKE_POLICY_VERSION_MINIMUM=3.5"
4040
run: cargo hack clippy --verbose --each-feature --no-dev-deps -- -D warnings
4141

4242
- name: Test default feature set

src/alerts/alerts_utils.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
102102
trace!("RUNNING EVAL TASK FOR- {alert:?}");
103103

104104
let query = prepare_query(alert).await?;
105-
let base_df = execute_base_query(&query, &alert.query).await?;
105+
let select_query = format!("SELECT * FROM {}", alert.stream);
106+
let base_df = execute_base_query(&query, &select_query).await?;
106107
let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?;
107108
let final_res = calculate_final_result(&alert.aggregate_config, &agg_results);
108109

@@ -118,7 +119,8 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
118119
};
119120

120121
let session_state = QUERY_SESSION.state();
121-
let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
122+
let select_query = format!("SELECT * FROM {}", alert.stream);
123+
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
122124

123125
let time_range = TimeRange::parse_human_time(start_time, end_time)
124126
.map_err(|err| AlertError::CustomError(err.to_string()))?;

src/alerts/mod.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ pub struct AlertRequest {
390390
#[serde(default = "Severity::default")]
391391
pub severity: Severity,
392392
pub title: String,
393-
pub query: String,
393+
pub stream: String,
394394
pub alert_type: AlertType,
395395
pub aggregate_config: Aggregations,
396396
pub eval_type: EvalConfig,
@@ -404,7 +404,7 @@ impl From<AlertRequest> for AlertConfig {
404404
id: Ulid::new(),
405405
severity: val.severity,
406406
title: val.title,
407-
query: val.query,
407+
stream: val.stream,
408408
alert_type: val.alert_type,
409409
aggregate_config: val.aggregate_config,
410410
eval_type: val.eval_type,
@@ -422,7 +422,7 @@ pub struct AlertConfig {
422422
pub id: Ulid,
423423
pub severity: Severity,
424424
pub title: String,
425-
pub query: String,
425+
pub stream: String,
426426
pub alert_type: AlertType,
427427
pub aggregate_config: Aggregations,
428428
pub eval_type: EvalConfig,
@@ -435,7 +435,7 @@ pub struct AlertConfig {
435435
impl AlertConfig {
436436
pub fn modify(&mut self, alert: AlertRequest) {
437437
self.title = alert.title;
438-
self.query = alert.query;
438+
self.stream = alert.stream;
439439
self.alert_type = alert.alert_type;
440440
self.aggregate_config = alert.aggregate_config;
441441
self.eval_type = alert.eval_type;
@@ -480,28 +480,14 @@ impl AlertConfig {
480480
self.validate_configs()?;
481481

482482
let session_state = QUERY_SESSION.state();
483-
let raw_logical_plan = session_state.create_logical_plan(&self.query).await?;
483+
let select_query = format!("SELECT * FROM {}", self.stream);
484+
485+
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
484486

485487
// create a visitor to extract the table names present in query
486488
let mut visitor = TableScanVisitor::default();
487489
let _ = raw_logical_plan.visit(&mut visitor);
488490

489-
let table = visitor.into_inner().first().unwrap().to_owned();
490-
491-
let lowercase = self.query.split(&table).collect_vec()[0].to_lowercase();
492-
493-
if lowercase
494-
.strip_prefix(" ")
495-
.unwrap_or(&lowercase)
496-
.strip_suffix(" ")
497-
.unwrap_or(&lowercase)
498-
.ne("select * from")
499-
{
500-
return Err(AlertError::Metadata(
501-
"Query needs to be select * from <logstream>",
502-
));
503-
}
504-
505491
// TODO: Filter tags should be taken care of!!!
506492
let time_range = TimeRange::parse_human_time("1m", "now")
507493
.map_err(|err| AlertError::CustomError(err.to_string()))?;
@@ -517,7 +503,7 @@ impl AlertConfig {
517503
let Some(stream_name) = query.first_table_name() else {
518504
return Err(AlertError::CustomError(format!(
519505
"Table name not found in query- {}",
520-
self.query
506+
select_query
521507
)));
522508
};
523509

@@ -766,8 +752,8 @@ impl Alerts {
766752
let mut alerts: Vec<AlertConfig> = Vec::new();
767753
for (_, alert) in self.alerts.read().await.iter() {
768754
// filter based on whether the user can execute this query or not
769-
let query = &alert.query;
770-
if user_auth_for_query(&session, query).await.is_ok() {
755+
let query = format!("SELECT * from {}", &alert.stream);
756+
if user_auth_for_query(&session, &query).await.is_ok() {
771757
alerts.push(alert.to_owned());
772758
}
773759
}

src/handlers/http/alerts.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ pub async fn post(
5353
// validate the incoming alert query
5454
// does the user have access to these tables or not?
5555
let session_key = extract_session_key_from_req(&req)?;
56-
user_auth_for_query(&session_key, &alert.query).await?;
56+
let query = format!("SELECT * FROM {}", alert.stream);
57+
user_auth_for_query(&session_key, &query).await?;
5758

5859
// create scheduled tasks
5960
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
@@ -89,7 +90,8 @@ pub async fn get(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Responde
8990

9091
let alert = ALERTS.get_alert_by_id(alert_id).await?;
9192
// validate that the user has access to the tables mentioned
92-
user_auth_for_query(&session_key, &alert.query).await?;
93+
let query = format!("SELECT * FROM {}", alert.stream);
94+
user_auth_for_query(&session_key, &query).await?;
9395

9496
Ok(web::Json(alert))
9597
}
@@ -103,7 +105,8 @@ pub async fn delete(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Respo
103105
let alert = ALERTS.get_alert_by_id(alert_id).await?;
104106

105107
// validate that the user has access to the tables mentioned
106-
user_auth_for_query(&session_key, &alert.query).await?;
108+
let query = format!("SELECT * FROM {}", alert.stream);
109+
user_auth_for_query(&session_key, &query).await?;
107110

108111
let store = PARSEABLE.storage.get_object_store();
109112
let alert_path = alert_json_path(alert_id);
@@ -139,8 +142,9 @@ pub async fn modify(
139142

140143
// validate that the user has access to the tables mentioned
141144
// in the old as well as the modified alert
142-
user_auth_for_query(&session_key, &alert.query).await?;
143-
user_auth_for_query(&session_key, &alert_request.query).await?;
145+
let query = format!("SELECT * FROM {}", alert.stream);
146+
user_auth_for_query(&session_key, &query).await?;
147+
user_auth_for_query(&session_key, &alert_request.stream).await?;
144148

145149
alert.modify(alert_request);
146150
alert.validate().await?;
@@ -185,7 +189,8 @@ pub async fn update_state(
185189
let alert = ALERTS.get_alert_by_id(alert_id).await?;
186190

187191
// validate that the user has access to the tables mentioned
188-
user_auth_for_query(&session_key, &alert.query).await?;
192+
let query = format!("SELECT * FROM {}", alert.stream);
193+
user_auth_for_query(&session_key, &query).await?;
189194

190195
// get current state
191196
let current_state = ALERTS.get_state(alert_id).await?;

src/rbac/role.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ pub enum Action {
4848
ListRole,
4949
GetAbout,
5050
QueryLLM,
51+
AddLLM,
52+
DeleteLLM,
53+
GetLLM,
54+
ListLLM,
5155
ListCluster,
5256
ListClusterMetrics,
5357
Deleteingestor,
@@ -115,6 +119,10 @@ impl RoleBuilder {
115119
| Action::DeleteUser
116120
| Action::GetAbout
117121
| Action::QueryLLM
122+
| Action::AddLLM
123+
| Action::DeleteLLM
124+
| Action::GetLLM
125+
| Action::ListLLM
118126
| Action::PutRole
119127
| Action::GetRole
120128
| Action::DeleteRole
@@ -234,6 +242,7 @@ pub mod model {
234242
Action::GetAlert,
235243
Action::DeleteAlert,
236244
Action::QueryLLM,
245+
Action::GetLLM,
237246
Action::CreateFilter,
238247
Action::ListFilter,
239248
Action::GetFilter,
@@ -276,6 +285,7 @@ pub mod model {
276285
Action::DeleteDashboard,
277286
Action::Ingest,
278287
Action::QueryLLM,
288+
Action::GetLLM,
279289
Action::GetStreamInfo,
280290
Action::GetFilter,
281291
Action::ListFilter,
@@ -298,6 +308,7 @@ pub mod model {
298308
Action::GetSchema,
299309
Action::GetStats,
300310
Action::QueryLLM,
311+
Action::GetLLM,
301312
Action::ListFilter,
302313
Action::GetFilter,
303314
Action::CreateFilter,

0 commit comments

Comments
 (0)