Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions server/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,40 +135,39 @@ pub struct Message {

impl Message {
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
if let Some(col) = column {
return get_field(&schema.fields, col).is_some();
}
true
pub fn valid(&self, schema: &Schema, column: &str) -> bool {
return get_field(&schema.fields, column).is_some();
}

pub fn extract_column_name(&self) -> Option<&str> {
pub fn extract_column_names(&self) -> Option<Vec<&str>> {
let re = Regex::new(r"\{(.*?)\}").unwrap();
let tokens: Vec<&str> = re
.captures_iter(self.message.as_str())
.map(|cap| cap.get(1).unwrap().as_str())
.collect();
// the message can have either no column name ({column_name} not present) or one column name
// return Some only if there is exactly one column name present
if tokens.len() == 1 {
return Some(tokens[0]);
// the message can have either no column name ({column_name} not present) or any number of {column_name} present
// return None if there is no column name present in the message
if tokens.is_empty() {
return None;
}
None
Some(tokens)
}

// returns the message with the column name replaced with the value of the column
// returns the message with the column name(s) replaced with the value of the column
fn get(&self, event: RecordBatch) -> String {
if let Some(column) = self.extract_column_name() {
if let Some(value) = event.column_by_name(column) {
let arr = cast(value, &DataType::Utf8).unwrap();
let value = as_string_array(&arr).value(0);
let mut replace_message = self.message.clone();
if let Some(columns) = self.extract_column_names() {
for column in columns {
if let Some(value) = event.column_by_name(column) {
let arr = cast(value, &DataType::Utf8).unwrap();
let value = as_string_array(&arr).value(0);

return self
.message
.replace(&format!("{{{column}}}"), value.to_string().as_str());
replace_message = replace_message
.replace(&format!("{{{column}}}"), value.to_string().as_str());
}
}
}
self.message.clone()
replace_message
}
}

Expand Down
24 changes: 13 additions & 11 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,19 @@ pub async fn put_alert(

let schema = STREAM_INFO.schema(&stream_name)?;
for alert in &alerts.alerts {
let column = alert.message.extract_column_name();
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
let col = column.unwrap_or("");
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
col.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
if let Some(columns) = alert.message.extract_column_names() {
for column in columns {
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
column.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
}
}
}
}

Expand Down