Skip to content

Commit a37d368

Browse files
author
Devdutt Shenoi
committed
feat: update log source entry
1 parent 186a586 commit a37d368

File tree

3 files changed

+103
-73
lines changed

3 files changed

+103
-73
lines changed

src/event/format/known_schema.rs

Lines changed: 73 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
2020

2121
use once_cell::sync::Lazy;
2222
use regex::Regex;
23-
use serde::Deserialize;
23+
use serde::{Deserialize, Deserializer};
2424
use serde_json::{Map, Value};
2525
use tracing::{error, warn};
2626

@@ -35,13 +35,35 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
3535
#[error("Event is not in the expected text/JSON format for {0}")]
3636
pub struct Unacceptable(String);
3737

38+
/// Deserializes a string pattern into a compiled Regex
39+
/// NOTE: if the pattern doesn't compile we only log an error and skip it (deserializes to `None`)
40+
pub fn deserialize_regex<'de, D>(deserializer: D) -> Result<Option<Regex>, D::Error>
41+
where
42+
D: Deserializer<'de>,
43+
{
44+
let pattern = String::deserialize(deserializer)?;
45+
46+
let regex = Regex::new(&pattern)
47+
.inspect_err(|err| error!("Error compiling regex pattern: {err}; Pattern: {pattern}"))
48+
.ok();
49+
50+
Ok(regex)
51+
}
52+
53+
/// Configuration for a single pattern within a log format
54+
#[derive(Debug, Default, Deserialize)]
55+
struct Pattern {
56+
/// Regular expression pattern used to match and capture fields from log strings
57+
#[serde(deserialize_with = "deserialize_regex")]
58+
pattern: Option<Regex>,
59+
// Maps field names to regex capture groups
60+
fields: HashSet<String>,
61+
}
62+
3863
/// Defines a schema for extracting structured data from logs using regular expressions
3964
#[derive(Debug, Default)]
4065
pub struct SchemaDefinition {
41-
/// Regular expression pattern used to match and capture fields from log strings
42-
patterns: Vec<Regex>,
43-
// Maps field names to regex capture groups
44-
field_mappings: Vec<HashSet<String>>,
66+
patterns: Vec<Pattern>,
4567
}
4668

4769
impl SchemaDefinition {
@@ -55,36 +77,40 @@ impl SchemaDefinition {
5577
/// * `extract_log` - Optional field name containing the raw log text
5678
///
5779
/// # Returns
58-
/// * `true` - If all expected fields are already present in the object OR if extraction was successful
59-
/// * `false` - If extraction failed or no pattern was available and fields were missing
80+
/// * `Some` - If all expected fields are already present in the object OR if extraction was successful
81+
/// Contains the fields present in the matched capture group
82+
/// * `None` - If extraction failed or no pattern was available and fields were missing
6083
pub fn check_or_extract(
6184
&self,
6285
obj: &mut Map<String, Value>,
6386
extract_log: Option<&str>,
64-
) -> bool {
65-
if self
66-
.field_mappings
87+
) -> Option<HashSet<String>> {
88+
if let Some(pattern) = self
89+
.patterns
6790
.iter()
68-
.any(|fields| fields.iter().all(|field| obj.contains_key(field)))
91+
.find(|pattern| pattern.fields.iter().all(|field| obj.contains_key(field)))
6992
{
70-
return true;
93+
return Some(pattern.fields.clone());
7194
}
7295

7396
let Some(event) = extract_log
7497
.and_then(|field| obj.get(field))
7598
.and_then(|s| s.as_str())
7699
else {
77-
return false;
100+
return None;
78101
};
79102

80-
for pattern in self.patterns.iter() {
103+
for format in self.patterns.iter() {
104+
let Some(pattern) = format.pattern.as_ref() else {
105+
continue;
106+
};
81107
let Some(captures) = pattern.captures(event) else {
82108
continue;
83109
};
84110
let mut extracted_fields = Map::new();
85111

86112
// With named capture groups, you can iterate over the field names
87-
for field_name in self.field_mappings.iter().flatten() {
113+
for field_name in format.fields.iter() {
88114
if let Some(value) = captures.name(field_name) {
89115
extracted_fields.insert(
90116
field_name.to_owned(),
@@ -95,10 +121,10 @@ impl SchemaDefinition {
95121

96122
obj.extend(extracted_fields);
97123

98-
return true;
124+
return Some(format.fields.clone());
99125
}
100126

101-
false
127+
None
102128
}
103129
}
104130

@@ -109,13 +135,6 @@ struct Format {
109135
regex: Vec<Pattern>,
110136
}
111137

112-
/// Configuration for a single pattern within a log format
113-
#[derive(Debug, Deserialize)]
114-
struct Pattern {
115-
pattern: Option<String>,
116-
fields: HashSet<String>,
117-
}
118-
119138
/// Manages a collection of schema definitions for various log formats
120139
#[derive(Debug)]
121140
pub struct EventProcessor {
@@ -140,18 +159,7 @@ impl EventProcessor {
140159
.entry(format.name.clone())
141160
.or_insert_with(SchemaDefinition::default);
142161

143-
schema.field_mappings.push(regex.fields.clone());
144-
// Compile the regex pattern if present
145-
// NOTE: we only warn if the pattern doesn't compile
146-
if let Some(pattern) = regex.pattern.and_then(|pattern| {
147-
Regex::new(&pattern)
148-
.inspect_err(|err| {
149-
error!("Error compiling regex pattern: {err}; Pattern: {pattern}")
150-
})
151-
.ok()
152-
}) {
153-
schema.patterns.push(pattern);
154-
}
162+
schema.patterns.push(regex);
155163
}
156164
}
157165

@@ -173,32 +181,37 @@ impl EventProcessor {
173181
json: &mut Value,
174182
log_source: &str,
175183
extract_log: Option<&str>,
176-
) -> Result<(), Unacceptable> {
184+
) -> Result<HashSet<String>, Unacceptable> {
177185
let Some(schema) = self.schema_definitions.get(log_source) else {
178186
warn!("Unknown log format: {log_source}");
179-
return Ok(());
187+
return Ok(Default::default());
180188
};
181189

190+
let mut fields = HashSet::new();
182191
match json {
183192
Value::Array(list) => {
184193
for event in list {
185194
let Value::Object(event) = event else {
186195
continue;
187196
};
188-
if !schema.check_or_extract(event, extract_log) {
197+
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
198+
fields.extend(known_fields);
199+
} else {
189200
return Err(Unacceptable(log_source.to_owned()));
190201
}
191202
}
192203
}
193204
Value::Object(event) => {
194-
if !schema.check_or_extract(event, extract_log) {
205+
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
206+
return Ok(known_fields);
207+
} else {
195208
return Err(Unacceptable(log_source.to_owned()));
196209
}
197210
}
198211
_ => unreachable!("We don't accept events of the form: {json}"),
199212
}
200213

201-
Ok(())
214+
Ok(fields)
202215
}
203216
}
204217

@@ -244,7 +257,7 @@ mod tests {
244257

245258
// Use check_or_extract instead of extract
246259
let result = schema.check_or_extract(&mut obj, Some(log_field));
247-
assert!(result, "Failed to extract fields from valid log");
260+
assert!(result.is_some(), "Failed to extract fields from valid log");
248261

249262
// Verify extracted fields were added to the object
250263
assert_eq!(obj.get("ip").unwrap().as_str().unwrap(), "192.168.1.1");
@@ -275,7 +288,7 @@ mod tests {
275288

276289
// Use check_or_extract instead of extract
277290
let result = schema.check_or_extract(&mut obj, Some(log_field));
278-
assert!(result, "Failed to extract fields from valid log");
291+
assert!(result.is_some(), "Failed to extract fields from valid log");
279292

280293
// Verify extracted fields were added to the object
281294
assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR");
@@ -308,7 +321,10 @@ mod tests {
308321

309322
// check_or_extract should return Some without modifying anything
310323
let result = schema.check_or_extract(&mut obj, None);
311-
assert!(result, "Should return true when fields already exist");
324+
assert!(
325+
result.is_some(),
326+
"Should return true when fields already exist"
327+
);
312328

313329
// Verify the original values weren't changed
314330
assert_eq!(
@@ -332,7 +348,10 @@ mod tests {
332348

333349
// check_or_extract should return None
334350
let result = schema.check_or_extract(&mut obj, Some(log_field));
335-
assert!(!result, "Should not extract fields from invalid log format");
351+
assert!(
352+
result.is_none(),
353+
"Should not extract fields from invalid log format"
354+
);
336355

337356
// Verify no fields were added
338357
assert!(!obj.contains_key("ip"));
@@ -343,11 +362,10 @@ mod tests {
343362
fn test_no_pattern_missing_fields() {
344363
// Create a schema definition with no pattern
345364
let schema = SchemaDefinition {
346-
patterns: vec![],
347-
field_mappings: vec![HashSet::from_iter([
348-
"field1".to_string(),
349-
"field2".to_string(),
350-
])],
365+
patterns: vec![Pattern {
366+
pattern: None,
367+
fields: HashSet::from_iter(["field1".to_string(), "field2".to_string()]),
368+
}],
351369
};
352370

353371
// Create an object missing the required fields
@@ -360,7 +378,7 @@ mod tests {
360378
// check_or_extract should return None
361379
let result = schema.check_or_extract(&mut obj, Some("log"));
362380
assert!(
363-
!result,
381+
result.is_none(),
364382
"Should return false when no pattern and missing fields"
365383
);
366384
}
@@ -467,7 +485,10 @@ mod tests {
467485

468486
// check_or_extract should return None
469487
let result = schema.check_or_extract(&mut obj, Some("raw_log"));
470-
assert!(!result, "Should return false when log field is missing");
488+
assert!(
489+
result.is_none(),
490+
"Should return false when log field is missing"
491+
);
471492

472493
// Verify no fields were added
473494
assert!(!obj.contains_key("level"));

src/handlers/http/ingest.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use serde_json::Value;
2828

2929
use crate::event;
3030
use crate::event::error::EventError;
31-
use crate::event::format::known_schema::Unacceptable;
31+
use crate::event::format::known_schema::{Unacceptable, KNOWN_SCHEMA_LIST};
3232
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3333
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3434
use crate::metadata::SchemaVersion;
@@ -49,7 +49,7 @@ use super::users::filters::FiltersError;
4949
// Handler for POST /api/v1/ingest
5050
// ingests events by extracting stream name from header
5151
// creates if stream does not exist
52-
pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError> {
52+
pub async fn ingest(req: HttpRequest, Json(mut json): Json<Value>) -> Result<HttpResponse, PostError> {
5353
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
5454
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
5555
};
@@ -78,7 +78,18 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7878
return Err(PostError::OtelNotSupported);
7979
}
8080

81-
let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
81+
let fields = match &log_source {
82+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
83+
return Err(PostError::OtelNotSupported)
84+
}
85+
LogSource::Custom(src) => {
86+
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
87+
}
88+
_ => HashSet::new(),
89+
};
90+
91+
let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);
92+
8293
PARSEABLE
8394
.create_stream_if_not_exists(
8495
&stream_name,
@@ -87,7 +98,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
8798
)
8899
.await?;
89100

90-
flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
101+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
91102

92103
Ok(HttpResponse::Ok().finish())
93104
}
@@ -150,7 +161,7 @@ pub async fn handle_otel_logs_ingestion(
150161
)
151162
.await?;
152163

153-
flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
164+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
154165

155166
Ok(HttpResponse::Ok().finish())
156167
}
@@ -188,7 +199,7 @@ pub async fn handle_otel_metrics_ingestion(
188199
)
189200
.await?;
190201

191-
flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
202+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
192203

193204
Ok(HttpResponse::Ok().finish())
194205
}
@@ -228,7 +239,7 @@ pub async fn handle_otel_traces_ingestion(
228239
)
229240
.await?;
230241

231-
flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
242+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
232243

233244
Ok(HttpResponse::Ok().finish())
234245
}
@@ -239,7 +250,7 @@ pub async fn handle_otel_traces_ingestion(
239250
pub async fn post_event(
240251
req: HttpRequest,
241252
stream_name: Path<String>,
242-
Json(json): Json<Value>,
253+
Json(mut json): Json<Value>,
243254
) -> Result<HttpResponse, PostError> {
244255
let stream_name = stream_name.into_inner();
245256

@@ -275,14 +286,17 @@ pub async fn post_event(
275286
.get(EXTRACT_LOG_KEY)
276287
.and_then(|h| h.to_str().ok());
277288

278-
if matches!(
279-
log_source,
280-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
281-
) {
282-
return Err(PostError::OtelNotSupported);
289+
match &log_source {
290+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
291+
return Err(PostError::OtelNotSupported)
292+
}
293+
LogSource::Custom(src) => {
294+
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
295+
}
296+
_ => {}
283297
}
284298

285-
flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
299+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
286300

287301
Ok(HttpResponse::Ok().finish())
288302
}

0 commit comments

Comments
 (0)