Skip to content

Commit 921effd

Browse files
committed
Add support for multiple column names to be passed in the message
field of an alert Fixes #564
1 parent 68a9615 commit 921effd

File tree

2 files changed

+32
-31
lines changed

2 files changed

+32
-31
lines changed

server/src/alerts/mod.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,40 +135,39 @@ pub struct Message {
135135

136136
impl Message {
137137
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
138-
pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
139-
if let Some(col) = column {
140-
return get_field(&schema.fields, col).is_some();
141-
}
142-
true
138+
pub fn valid(&self, schema: &Schema, column: &str) -> bool {
139+
return get_field(&schema.fields, column).is_some();
143140
}
144141

145-
pub fn extract_column_name(&self) -> Option<&str> {
142+
pub fn extract_column_names(&self) -> Option<Vec<&str>> {
146143
let re = Regex::new(r"\{(.*?)\}").unwrap();
147144
let tokens: Vec<&str> = re
148145
.captures_iter(self.message.as_str())
149146
.map(|cap| cap.get(1).unwrap().as_str())
150147
.collect();
151-
// the message can have either no column name ({column_name} not present) or one column name
152-
// return Some only if there is exactly one column name present
153-
if tokens.len() == 1 {
154-
return Some(tokens[0]);
148+
// the message can have either no column name ({column_name} not present) or any number of {column_name} present
149+
// return None if there is no column name present in the message
150+
if tokens.is_empty() {
151+
return None;
155152
}
156-
None
153+
Some(tokens)
157154
}
158155

159-
// returns the message with the column name replaced with the value of the column
156+
// returns the message with the column name(s) replaced with the value of the column
160157
fn get(&self, event: RecordBatch) -> String {
161-
if let Some(column) = self.extract_column_name() {
162-
if let Some(value) = event.column_by_name(column) {
163-
let arr = cast(value, &DataType::Utf8).unwrap();
164-
let value = as_string_array(&arr).value(0);
158+
let mut replace_message = self.message.clone();
159+
if let Some(columns) = self.extract_column_names() {
160+
for column in columns {
161+
if let Some(value) = event.column_by_name(column) {
162+
let arr = cast(value, &DataType::Utf8).unwrap();
163+
let value = as_string_array(&arr).value(0);
165164

166-
return self
167-
.message
168-
.replace(&format!("{{{column}}}"), value.to_string().as_str());
165+
replace_message = replace_message
166+
.replace(&format!("{{{column}}}"), value.to_string().as_str());
167+
}
169168
}
170169
}
171-
self.message.clone()
170+
replace_message
172171
}
173172
}
174173

server/src/handlers/http/logstream.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,19 @@ pub async fn put_alert(
154154

155155
let schema = STREAM_INFO.schema(&stream_name)?;
156156
for alert in &alerts.alerts {
157-
let column = alert.message.extract_column_name();
158-
let is_valid = alert.message.valid(&schema, column);
159-
if !is_valid {
160-
let col = column.unwrap_or("");
161-
return Err(StreamError::InvalidAlertMessage(
162-
alert.name.to_owned(),
163-
col.to_string(),
164-
));
165-
}
166-
if !alert.rule.valid_for_schema(&schema) {
167-
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
157+
if let Some(columns) = alert.message.extract_column_names() {
158+
for column in columns {
159+
let is_valid = alert.message.valid(&schema, column);
160+
if !is_valid {
161+
return Err(StreamError::InvalidAlertMessage(
162+
alert.name.to_owned(),
163+
column.to_string(),
164+
));
165+
}
166+
if !alert.rule.valid_for_schema(&schema) {
167+
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
168+
}
169+
}
168170
}
169171
}
170172

0 commit comments

Comments
 (0)