Skip to content

Commit 7b7a60c

Browse files
Merge branch 'main' into event
2 parents 683881f + 671abae commit 7b7a60c

File tree

5 files changed

+61
-42
lines changed

5 files changed

+61
-42
lines changed

src/handlers/http/ingest.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,21 @@ use arrow_array::RecordBatch;
2424
use bytes::Bytes;
2525
use chrono::Utc;
2626
use http::StatusCode;
27-
use opentelemetry_proto::tonic::logs::v1::LogsData;
28-
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
29-
use opentelemetry_proto::tonic::trace::v1::TracesData;
3027
use serde_json::Value;
3128

3229
use crate::event::error::EventError;
3330
use crate::event::format::{self, EventFormat, LogSource};
3431
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3532
use crate::metadata::SchemaVersion;
3633
use crate::option::Mode;
37-
use crate::otel::logs::flatten_otel_logs;
38-
use crate::otel::metrics::flatten_otel_metrics;
39-
use crate::otel::traces::flatten_otel_traces;
4034
use crate::parseable::{StreamNotFound, PARSEABLE};
4135
use crate::storage::{ObjectStorageError, StreamType};
4236
use crate::utils::header_parsing::ParseHeaderError;
4337
use crate::utils::json::flatten::JsonFlattenError;
4438
use crate::{event, LOCK_EXPECT};
4539

4640
use super::logstream::error::{CreateStreamError, StreamError};
47-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
41+
use super::modal::utils::ingest_utils::flatten_and_push_logs;
4842
use super::users::dashboards::DashboardError;
4943
use super::users::filters::FiltersError;
5044

@@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7064
.get(LOG_SOURCE_KEY)
7165
.and_then(|h| h.to_str().ok())
7266
.map_or(LogSource::default(), LogSource::from);
67+
68+
if matches!(
69+
log_source,
70+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
71+
) {
72+
return Err(PostError::OtelNotSupported);
73+
}
74+
7375
flatten_and_push_logs(json, &stream_name, &log_source).await?;
7476

7577
Ok(HttpResponse::Ok().finish())
@@ -133,11 +135,7 @@ pub async fn handle_otel_logs_ingestion(
133135
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
134136
.await?;
135137

136-
//custom flattening required for otel logs
137-
let logs: LogsData = serde_json::from_value(json)?;
138-
for record in flatten_otel_logs(&logs) {
139-
push_logs(&stream_name, record, &log_source).await?;
140-
}
138+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
141139

142140
Ok(HttpResponse::Ok().finish())
143141
}
@@ -168,11 +166,7 @@ pub async fn handle_otel_metrics_ingestion(
168166
)
169167
.await?;
170168

171-
//custom flattening required for otel metrics
172-
let metrics: MetricsData = serde_json::from_value(json)?;
173-
for record in flatten_otel_metrics(metrics) {
174-
push_logs(&stream_name, record, &log_source).await?;
175-
}
169+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
176170

177171
Ok(HttpResponse::Ok().finish())
178172
}
@@ -200,11 +194,7 @@ pub async fn handle_otel_traces_ingestion(
200194
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
201195
.await?;
202196

203-
//custom flattening required for otel traces
204-
let traces: TracesData = serde_json::from_value(json)?;
205-
for record in flatten_otel_traces(&traces) {
206-
push_logs(&stream_name, record, &log_source).await?;
207-
}
197+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
208198

209199
Ok(HttpResponse::Ok().finish())
210200
}
@@ -245,6 +235,14 @@ pub async fn post_event(
245235
.get(LOG_SOURCE_KEY)
246236
.and_then(|h| h.to_str().ok())
247237
.map_or(LogSource::default(), LogSource::from);
238+
239+
if matches!(
240+
log_source,
241+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
242+
) {
243+
return Err(PostError::OtelNotSupported);
244+
}
245+
248246
flatten_and_push_logs(json, &stream_name, &log_source).await?;
249247

250248
Ok(HttpResponse::Ok().finish())

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
use arrow_schema::Field;
2020
use chrono::{DateTime, NaiveDateTime, Utc};
2121
use itertools::Itertools;
22+
use opentelemetry_proto::tonic::{
23+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
24+
};
2225
use serde_json::Value;
2326
use std::{collections::HashMap, sync::Arc};
2427

@@ -32,6 +35,7 @@ use crate::{
3235
kinesis::{flatten_kinesis_logs, Message},
3336
},
3437
metadata::SchemaVersion,
38+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
3539
parseable::{StreamNotFound, PARSEABLE},
3640
storage::StreamType,
3741
utils::json::{convert_array_to_object, flatten::convert_to_array},
@@ -45,14 +49,32 @@ pub async fn flatten_and_push_logs(
4549
) -> Result<(), PostError> {
4650
match log_source {
4751
LogSource::Kinesis => {
52+
//custom flattening required for Amazon Kinesis
4853
let message: Message = serde_json::from_value(json)?;
49-
let json = flatten_kinesis_logs(message);
50-
for record in json {
54+
for record in flatten_kinesis_logs(message) {
5155
push_logs(stream_name, record, &LogSource::default()).await?;
5256
}
5357
}
54-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
55-
return Err(PostError::OtelNotSupported);
58+
LogSource::OtelLogs => {
59+
//custom flattening required for otel logs
60+
let logs: LogsData = serde_json::from_value(json)?;
61+
for record in flatten_otel_logs(&logs) {
62+
push_logs(stream_name, record, log_source).await?;
63+
}
64+
}
65+
LogSource::OtelTraces => {
66+
//custom flattening required for otel traces
67+
let traces: TracesData = serde_json::from_value(json)?;
68+
for record in flatten_otel_traces(&traces) {
69+
push_logs(stream_name, record, log_source).await?;
70+
}
71+
}
72+
LogSource::OtelMetrics => {
73+
//custom flattening required for otel metrics
74+
let metrics: MetricsData = serde_json::from_value(json)?;
75+
for record in flatten_otel_metrics(metrics) {
76+
push_logs(stream_name, record, log_source).await?;
77+
}
5678
}
5779
_ => {
5880
push_logs(stream_name, json, log_source).await?;
@@ -61,7 +83,7 @@ pub async fn flatten_and_push_logs(
6183
Ok(())
6284
}
6385

64-
pub async fn push_logs(
86+
async fn push_logs(
6587
stream_name: &str,
6688
json: Value,
6789
log_source: &LogSource,

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
pub mod about;
20-
mod alerts;
20+
pub mod alerts;
2121
pub mod analytics;
2222
pub mod audit;
2323
pub mod banner;

src/otel/traces.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,15 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
293293
span_record_json.extend(flatten_status(status));
294294
}
295295

296-
for span_json in &mut span_records_json {
297-
for (key, value) in &span_record_json {
298-
span_json.insert(key.clone(), value.clone());
296+
// if span_record.events is null, code should still flatten other elements in the span record - this is handled in the if block
297+
// else block handles the flattening the span record that includes events and links records in each span record
298+
if span_records_json.is_empty() {
299+
span_records_json = vec![span_record_json];
300+
} else {
301+
for span_json in &mut span_records_json {
302+
for (key, value) in &span_record_json {
303+
span_json.insert(key.clone(), value.clone());
304+
}
299305
}
300306
}
301307

src/utils/json/flatten.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -335,18 +335,11 @@ pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
335335

336336
// Converts a Vector of values into a `Value::Array`, as long as all of them are objects
337337
pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError> {
338-
let mut result = Vec::new();
339-
for item in flattened {
340-
let mut map = Map::new();
341-
let Some(item) = item.as_object() else {
342-
return Err(JsonFlattenError::ExpectedObjectInArray);
343-
};
344-
for (key, value) in item {
345-
map.insert(key.clone(), value.clone());
346-
}
347-
result.push(Value::Object(map));
338+
if flattened.iter().any(|item| !item.is_object()) {
339+
return Err(JsonFlattenError::ExpectedObjectInArray);
348340
}
349-
Ok(Value::Array(result))
341+
342+
Ok(Value::Array(flattened))
350343
}
351344

352345
#[cfg(test)]

0 commit comments

Comments
 (0)