diff --git a/Cargo.lock b/Cargo.lock
index 52cc45e4a..e4f29241a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2283,6 +2283,12 @@ dependencies = [
"unicase",
]
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
[[package]]
name = "miniz_oxide"
version = "0.6.2"
@@ -2343,6 +2349,16 @@ dependencies = [
"libc",
]
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
[[package]]
name = "nom8"
version = "0.2.0"
@@ -2662,6 +2678,7 @@ dependencies = [
"itertools 0.10.5",
"log",
"maplit",
+ "nom",
"num_cpus",
"object_store",
"once_cell",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 08e83f136..e49f300a0 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -87,6 +87,7 @@ ulid = { version = "1.0", features = ["serde"] }
uptime_lib = "0.2.2"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
+nom = "7.1.3"
[build-dependencies]
diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs
index 97458223f..81868d093 100644
--- a/server/src/alerts/mod.rs
+++ b/server/src/alerts/mod.rs
@@ -26,6 +26,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use std::fmt;
+pub mod parser;
pub mod rule;
pub mod target;
diff --git a/server/src/alerts/parser.rs b/server/src/alerts/parser.rs
new file mode 100644
index 000000000..cd7704b0c
--- /dev/null
+++ b/server/src/alerts/parser.rs
@@ -0,0 +1,252 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use std::str::FromStr;
+
+use nom::{
+ branch::alt,
+ bytes::complete::{tag, take_until, take_while1},
+ character::complete::{char, multispace0, multispace1},
+ combinator::map,
+ sequence::{delimited, separated_pair},
+ IResult,
+};
+
+use super::rule::{
+ base::{
+ ops::{NumericOperator, StringOperator},
+ NumericRule, StringRule,
+ },
+ CompositeRule,
+};
+
+fn parse_numeric_op(input: &str) -> IResult<&str, NumericOperator> {
+ alt((
+ map(tag("<="), |_| NumericOperator::LessThanEquals),
+ map(tag(">="), |_| NumericOperator::GreaterThanEquals),
+ map(tag("!="), |_| NumericOperator::NotEqualTo),
+ map(tag("<"), |_| NumericOperator::LessThan),
+ map(tag(">"), |_| NumericOperator::GreaterThan),
+ map(tag("="), |_| NumericOperator::EqualTo),
+ ))(input)
+}
+
+fn parse_string_op(input: &str) -> IResult<&str, StringOperator> {
+ alt((
+ map(tag("!="), |_| StringOperator::NotExact),
+ map(tag("=%"), |_| StringOperator::Contains),
+ map(tag("!%"), |_| StringOperator::NotContains),
+ map(tag("="), |_| StringOperator::Exact),
+ map(tag("~"), |_| StringOperator::Regex),
+ ))(input)
+}
+
+fn parse_numeric_rule(input: &str) -> IResult<&str, CompositeRule> {
+ let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?;
+ let (remaining, op) = delimited(multispace0, parse_numeric_op, multispace0)(remaining)?;
+ let (remaining, value) = map(take_while1(|c: char| c.is_ascii_digit()), |x| {
+ str::parse(x).unwrap()
+ })(remaining)?;
+
+ Ok((
+ remaining,
+ CompositeRule::Numeric(NumericRule {
+ column: key,
+ operator: op,
+ value,
+ }),
+ ))
+}
+
+fn parse_string_rule(input: &str) -> IResult<&str, CompositeRule> {
+ let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?;
+ let (remaining, op) = delimited(multispace0, parse_string_op, multispace0)(remaining)?;
+ let (remaining, value) = map(
+ delimited(char('"'), take_until("\""), char('"')),
+ |x: &str| x.to_string(),
+ )(remaining)?;
+
+ Ok((
+ remaining,
+ CompositeRule::String(StringRule {
+ column: key,
+ operator: op,
+ value,
+ ignore_case: None,
+ }),
+ ))
+}
+
+fn parse_identifier(input: &str) -> IResult<&str, &str> {
+ take_while1(|c: char| c.is_alphanumeric() || c == '-' || c == '_')(input)
+}
+
+fn parse_unary_expr(input: &str) -> IResult<&str, CompositeRule> {
+ map(delimited(tag("!("), parse_expression, char(')')), |x| {
+ CompositeRule::Not(Box::new(x))
+ })(input)
+}
+
+fn parse_bracket_expr(input: &str) -> IResult<&str, CompositeRule> {
+ delimited(
+ char('('),
+ delimited(multispace0, parse_expression, multispace0),
+ char(')'),
+ )(input)
+}
+
+fn parse_and(input: &str) -> IResult<&str, CompositeRule> {
+ let (remaining, (lhs, rhs)) = separated_pair(
+ parse_atom,
+ delimited(multispace1, tag("and"), multispace1),
+ parse_term,
+ )(input)?;
+
+ Ok((remaining, CompositeRule::And(vec![lhs, rhs])))
+}
+
+fn parse_or(input: &str) -> IResult<&str, CompositeRule> {
+ let (remaining, (lhs, rhs)) = separated_pair(
+ parse_term,
+ delimited(multispace1, tag("or"), multispace1),
+ parse_expression,
+ )(input)?;
+
+ Ok((remaining, CompositeRule::Or(vec![lhs, rhs])))
+}
+
+fn parse_expression(input: &str) -> IResult<&str, CompositeRule> {
+ alt((parse_or, parse_term))(input)
+}
+fn parse_term(input: &str) -> IResult<&str, CompositeRule> {
+ alt((parse_and, parse_atom))(input)
+}
+fn parse_atom(input: &str) -> IResult<&str, CompositeRule> {
+ alt((
+ alt((parse_numeric_rule, parse_string_rule)),
+ parse_unary_expr,
+ parse_bracket_expr,
+ ))(input)
+}
+
+impl FromStr for CompositeRule {
+ type Err = Box;
+
+ fn from_str(s: &str) -> Result {
+ parse_expression(s)
+ .map(|(_, x)| x)
+ .map_err(|x| x.to_string().into())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::str::FromStr;
+
+ use crate::alerts::rule::{
+ base::{
+ ops::{NumericOperator, StringOperator},
+ NumericRule, StringRule,
+ },
+ CompositeRule,
+ };
+
+ #[test]
+ fn test_and_or_not() {
+ let input = r#"key=500 and key="value" or !(key=300)"#;
+ let rule = CompositeRule::from_str(input).unwrap();
+
+ let numeric1 = NumericRule {
+ column: "key".to_string(),
+ operator: NumericOperator::EqualTo,
+ value: serde_json::Number::from(500),
+ };
+
+ let string1 = StringRule {
+ column: "key".to_string(),
+ operator: StringOperator::Exact,
+ value: "value".to_string(),
+ ignore_case: None,
+ };
+
+ let numeric3 = NumericRule {
+ column: "key".to_string(),
+ operator: NumericOperator::EqualTo,
+ value: serde_json::Number::from(300),
+ };
+
+ assert_eq!(
+ rule,
+ CompositeRule::Or(vec![
+ CompositeRule::And(vec![
+ CompositeRule::Numeric(numeric1),
+ CompositeRule::String(string1)
+ ]),
+ CompositeRule::Not(Box::new(CompositeRule::Numeric(numeric3)))
+ ])
+ )
+ }
+
+ #[test]
+ fn test_complex() {
+ let input = r#"(verb =% "list" or verb =% "get") and (resource = "secret" and username !% "admin")"#;
+ let rule = CompositeRule::from_str(input).unwrap();
+
+ let verb_like_list = StringRule {
+ column: "verb".to_string(),
+ operator: StringOperator::Contains,
+ value: "list".to_string(),
+ ignore_case: None,
+ };
+
+ let verb_like_get = StringRule {
+ column: "verb".to_string(),
+ operator: StringOperator::Contains,
+ value: "get".to_string(),
+ ignore_case: None,
+ };
+
+ let resource_exact_secret = StringRule {
+ column: "resource".to_string(),
+ operator: StringOperator::Exact,
+ value: "secret".to_string(),
+ ignore_case: None,
+ };
+
+ let username_notcontains_admin = StringRule {
+ column: "username".to_string(),
+ operator: StringOperator::NotContains,
+ value: "admin".to_string(),
+ ignore_case: None,
+ };
+
+ assert_eq!(
+ rule,
+ CompositeRule::And(vec![
+ CompositeRule::Or(vec![
+ CompositeRule::String(verb_like_list),
+ CompositeRule::String(verb_like_get)
+ ]),
+ CompositeRule::And(vec![
+ CompositeRule::String(resource_exact_secret),
+ CompositeRule::String(username_notcontains_admin)
+ ]),
+ ])
+ )
+ }
+}
diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs
index cd8c614b5..24b222415 100644
--- a/server/src/alerts/rule.rs
+++ b/server/src/alerts/rule.rs
@@ -17,8 +17,18 @@
*/
use arrow_array::{cast::as_string_array, RecordBatch};
-use datafusion::arrow::datatypes::{DataType, Schema};
-use std::sync::atomic::{AtomicU32, Ordering};
+use datafusion::arrow::datatypes::Schema;
+use itertools::Itertools;
+use serde::{
+ de::{MapAccess, Visitor},
+ Deserialize, Deserializer,
+};
+use std::{
+ fmt,
+ marker::PhantomData,
+ str::FromStr,
+ sync::atomic::{AtomicU32, Ordering},
+};
use self::base::{
ops::{NumericOperator, StringOperator},
@@ -32,24 +42,39 @@ use super::AlertState;
#[serde(rename_all = "camelCase")]
pub enum Rule {
Column(ColumnRule),
+ #[serde(deserialize_with = "string_or_struct", serialize_with = "to_string")]
+ Composite(CompositeRule),
}
impl Rule {
pub fn resolves(&self, event: RecordBatch) -> Vec {
match self {
Rule::Column(rule) => rule.resolves(event),
+ Rule::Composite(rule) => rule
+ .resolves(event)
+ .iter()
+ .map(|x| {
+ if *x {
+ AlertState::SetToFiring
+ } else {
+ AlertState::Listening
+ }
+ })
+ .collect(),
}
}
pub fn valid_for_schema(&self, schema: &Schema) -> bool {
match self {
Rule::Column(rule) => rule.valid_for_schema(schema),
+ Rule::Composite(rule) => rule.valid_for_schema(schema),
}
}
pub fn trigger_reason(&self) -> String {
match self {
Rule::Column(rule) => rule.trigger_reason(),
+ Rule::Composite(rule) => format!("matched rule {}", rule),
}
}
}
@@ -73,29 +98,10 @@ impl ColumnRule {
match self {
Self::ConsecutiveNumeric(ConsecutiveNumericRule {
base_rule: rule, ..
- }) => match schema.column_with_name(&rule.column) {
- Some((_, column)) => matches!(
- column.data_type(),
- DataType::Int8
- | DataType::Int16
- | DataType::Int32
- | DataType::Int64
- | DataType::UInt8
- | DataType::UInt16
- | DataType::UInt32
- | DataType::UInt64
- | DataType::Float16
- | DataType::Float32
- | DataType::Float64
- ),
- None => false,
- },
+ }) => rule.valid_for_schema(schema),
Self::ConsecutiveString(ConsecutiveStringRule {
base_rule: rule, ..
- }) => match schema.column_with_name(&rule.column) {
- Some((_, column)) => matches!(column.data_type(), DataType::Utf8),
- None => false,
- },
+ }) => rule.valid_for_schema(schema),
}
}
@@ -217,6 +223,117 @@ fn one() -> u32 {
1
}
+#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "camelCase")]
+pub enum CompositeRule {
+ And(Vec),
+ Or(Vec),
+ Not(Box),
+ Numeric(NumericRule),
+ String(StringRule),
+}
+
+impl CompositeRule {
+ fn resolves(&self, event: RecordBatch) -> Vec {
+ let res = match self {
+ CompositeRule::And(rules) => {
+ // get individual evaluation for each subrule
+ let mut evaluations = rules
+ .iter()
+ .map(|x| x.resolves(event.clone()))
+ .collect_vec();
+ // They all must be of same length otherwise some columns was missing in evaluation
+ let is_same_len = evaluations.iter().map(|x| x.len()).all_equal();
+ // if there are more than one rule then we go through all evaluations and compare them side by side
+ if is_same_len && evaluations.len() > 1 {
+ (0..evaluations[0].len())
+ .map(|idx| evaluations.iter().all(|x| x[idx]))
+ .collect()
+ } else if is_same_len && evaluations.len() == 1 {
+ evaluations.pop().expect("length one")
+ } else {
+ vec![]
+ }
+ }
+ CompositeRule::Or(rules) => {
+ // get individual evaluation for each subrule
+ let evaluations: Vec> = rules
+ .iter()
+ .map(|x| x.resolves(event.clone()))
+ .collect_vec();
+ let mut evaluation_iterators = evaluations.iter().map(|x| x.iter()).collect_vec();
+ let mut res = vec![];
+
+ loop {
+ let mut continue_iteration = false;
+ let mut accumulator = false;
+ for iter in &mut evaluation_iterators {
+ if let Some(val) = iter.next() {
+ accumulator = accumulator || *val;
+ continue_iteration = true
+ }
+ }
+ if !continue_iteration {
+ break;
+ } else {
+ res.push(accumulator)
+ }
+ }
+
+ res
+ }
+ CompositeRule::Numeric(rule) => {
+ let Some(column) = event.column_by_name(&rule.column) else {
+ return Vec::new();
+ };
+ rule.resolves(column)
+ }
+ CompositeRule::String(rule) => {
+ let Some(column) = event.column_by_name(&rule.column) else {
+ return Vec::new();
+ };
+ rule.resolves(as_string_array(column))
+ }
+ CompositeRule::Not(rule) => {
+ let mut res = rule.resolves(event);
+ res.iter_mut().for_each(|x| *x = !*x);
+ res
+ }
+ };
+
+ res
+ }
+
+ fn valid_for_schema(&self, schema: &Schema) -> bool {
+ match self {
+ CompositeRule::And(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)),
+ CompositeRule::Or(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)),
+ CompositeRule::Not(rule) => rule.valid_for_schema(schema),
+ CompositeRule::Numeric(rule) => rule.valid_for_schema(schema),
+ CompositeRule::String(rule) => rule.valid_for_schema(schema),
+ }
+ }
+}
+
+impl fmt::Display for CompositeRule {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let v = match self {
+ CompositeRule::And(rules) => {
+ let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect();
+ format!("({})", rules_str.join(" and "))
+ }
+ CompositeRule::Or(rules) => {
+ let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect();
+ format!("({})", rules_str.join(" or "))
+ }
+ CompositeRule::Not(rule) => format!("!({})", rule),
+ CompositeRule::Numeric(numeric_rule) => numeric_rule.to_string(),
+ CompositeRule::String(string_rule) => string_rule.to_string(),
+ };
+ write!(f, "{}", v)
+ }
+}
+
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ConsecutiveRepeatState {
#[serde(default = "one")]
@@ -260,6 +377,57 @@ impl ConsecutiveRepeatState {
}
}
+fn string_or_struct<'de, T, D>(deserializer: D) -> Result
+where
+ T: Deserialize<'de> + FromStr>,
+ D: Deserializer<'de>,
+{
+ // This is a Visitor that forwards string types to T's `FromStr` impl and
+ // forwards map types to T's `Deserialize` impl. The `PhantomData` is to
+ // keep the compiler from complaining about T being an unused generic type
+ // parameter. We need T in order to know the Value type for the Visitor
+ // impl.
+ struct StringOrStruct(PhantomData T>);
+
+ impl<'de, T> Visitor<'de> for StringOrStruct
+ where
+ T: Deserialize<'de> + FromStr>,
+ {
+ type Value = T;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("string or map")
+ }
+
+ fn visit_str(self, value: &str) -> Result
+ where
+ E: serde::de::Error,
+ {
+ FromStr::from_str(value).map_err(|x| serde::de::Error::custom(x))
+ }
+
+ fn visit_map(self, map: M) -> Result
+ where
+ M: MapAccess<'de>,
+ {
+ // `MapAccessDeserializer` is a wrapper that turns a `MapAccess`
+ // into a `Deserializer`, allowing it to be used as the input to T's
+ // `Deserialize` implementation. T then deserializes itself using
+ // the entries from the map visitor.
+ Deserialize::deserialize(serde::de::value::MapAccessDeserializer::new(map))
+ }
+ }
+
+ deserializer.deserialize_any(StringOrStruct(PhantomData))
+}
+
+fn to_string(ty: &CompositeRule, serializer: S) -> Result
+where
+ S: serde::Serializer,
+{
+ serializer.serialize_str(&ty.to_string())
+}
+
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU32;
@@ -298,17 +466,20 @@ mod tests {
}
pub mod base {
+ use std::fmt::Display;
+
use arrow_array::{
cast::as_primitive_array,
types::{Float64Type, Int64Type, UInt64Type},
Array, ArrowPrimitiveType, PrimitiveArray, StringArray,
};
+ use arrow_schema::{DataType, Schema};
use itertools::Itertools;
use self::ops::{NumericOperator, StringOperator};
use regex::Regex;
- #[derive(Debug, serde::Serialize, serde::Deserialize)]
+ #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct NumericRule {
pub column: String,
@@ -360,9 +531,35 @@ pub mod base {
})
.collect()
}
+
+ pub fn valid_for_schema(&self, schema: &Schema) -> bool {
+ match schema.column_with_name(&self.column) {
+ Some((_, column)) => matches!(
+ column.data_type(),
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ | DataType::Float16
+ | DataType::Float32
+ | DataType::Float64
+ ),
+ None => false,
+ }
+ }
+ }
+
+ impl Display for NumericRule {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{} {} {}", self.column, self.operator, self.value)
+ }
}
- #[derive(Debug, serde::Serialize, serde::Deserialize)]
+ #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StringRule {
pub column: String,
@@ -417,12 +614,30 @@ pub mod base {
}
}
}
+
+ pub fn valid_for_schema(&self, schema: &Schema) -> bool {
+ match schema.column_with_name(&self.column) {
+ Some((_, column)) => matches!(column.data_type(), DataType::Utf8),
+ None => false,
+ }
+ }
+ }
+
+ impl Display for StringRule {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{} {} \"{}\"", self.column, self.operator, self.value)
+ }
}
pub mod ops {
- #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+ use std::fmt::Display;
+
+ #[derive(
+ Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize,
+ )]
#[serde(rename_all = "camelCase")]
pub enum NumericOperator {
+ #[default]
#[serde(alias = "=")]
EqualTo,
#[serde(alias = "!=")]
@@ -437,19 +652,33 @@ pub mod base {
LessThanEquals,
}
- impl Default for NumericOperator {
- fn default() -> Self {
- Self::EqualTo
+ impl Display for NumericOperator {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ NumericOperator::EqualTo => "=",
+ NumericOperator::NotEqualTo => "!=",
+ NumericOperator::GreaterThan => ">",
+ NumericOperator::GreaterThanEquals => ">=",
+ NumericOperator::LessThan => "<",
+ NumericOperator::LessThanEquals => "<=",
+ }
+ )
}
}
- #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+ #[derive(
+ Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize,
+ )]
#[serde(rename_all = "camelCase")]
pub enum StringOperator {
#[serde(alias = "=")]
Exact,
#[serde(alias = "!=")]
NotExact,
+ #[default]
#[serde(alias = "=%")]
Contains,
#[serde(alias = "!%")]
@@ -458,9 +687,19 @@ pub mod base {
Regex,
}
- impl Default for StringOperator {
- fn default() -> Self {
- Self::Contains
+ impl Display for StringOperator {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ StringOperator::Exact => "=",
+ StringOperator::NotExact => "!=",
+ StringOperator::Contains => "=%",
+ StringOperator::NotContains => "!%",
+ StringOperator::Regex => "~",
+ }
+ )
}
}
}
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 463a9d3f0..1a44a16c2 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -44,8 +44,8 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
return Err(AlertValidationError::NoTarget);
}
- match alert.rule {
- Rule::Column(ref column_rule) => match column_rule {
+ if let Rule::Column(ref column_rule) = alert.rule {
+ match column_rule {
ColumnRule::ConsecutiveNumeric(ConsecutiveNumericRule {
base_rule: NumericRule { ref column, .. },
ref state,
@@ -61,7 +61,7 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
return Err(AlertValidationError::InvalidRuleRepeat);
}
}
- },
+ }
}
}
Ok(())