Skip to content

Commit 509d8d6

Browse files
author
Devdutt Shenoi
authored
Merge branch 'main' into static-schema
2 parents 5149831 + bd07a47 commit 509d8d6

File tree

3 files changed

+48
-28
lines changed

3 files changed

+48
-28
lines changed

src/handlers/http/ingest.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,21 @@ use arrow_array::RecordBatch;
2424
use bytes::Bytes;
2525
use chrono::Utc;
2626
use http::StatusCode;
27-
use opentelemetry_proto::tonic::logs::v1::LogsData;
28-
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
29-
use opentelemetry_proto::tonic::trace::v1::TracesData;
3027
use serde_json::Value;
3128

3229
use crate::event::error::EventError;
3330
use crate::event::format::{self, EventFormat, LogSource};
3431
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3532
use crate::metadata::SchemaVersion;
3633
use crate::option::Mode;
37-
use crate::otel::logs::flatten_otel_logs;
38-
use crate::otel::metrics::flatten_otel_metrics;
39-
use crate::otel::traces::flatten_otel_traces;
4034
use crate::parseable::{StreamNotFound, PARSEABLE};
4135
use crate::storage::{ObjectStorageError, StreamType};
4236
use crate::utils::header_parsing::ParseHeaderError;
4337
use crate::utils::json::flatten::JsonFlattenError;
4438
use crate::{event, LOCK_EXPECT};
4539

4640
use super::logstream::error::{CreateStreamError, StreamError};
47-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
41+
use super::modal::utils::ingest_utils::flatten_and_push_logs;
4842
use super::users::dashboards::DashboardError;
4943
use super::users::filters::FiltersError;
5044

@@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7064
.get(LOG_SOURCE_KEY)
7165
.and_then(|h| h.to_str().ok())
7266
.map_or(LogSource::default(), LogSource::from);
67+
68+
if matches!(
69+
log_source,
70+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
71+
) {
72+
return Err(PostError::OtelNotSupported);
73+
}
74+
7375
flatten_and_push_logs(json, &stream_name, &log_source).await?;
7476

7577
Ok(HttpResponse::Ok().finish())
@@ -133,11 +135,7 @@ pub async fn handle_otel_logs_ingestion(
133135
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
134136
.await?;
135137

136-
//custom flattening required for otel logs
137-
let logs: LogsData = serde_json::from_value(json)?;
138-
for record in flatten_otel_logs(&logs) {
139-
push_logs(&stream_name, record, &log_source).await?;
140-
}
138+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
141139

142140
Ok(HttpResponse::Ok().finish())
143141
}
@@ -168,11 +166,7 @@ pub async fn handle_otel_metrics_ingestion(
168166
)
169167
.await?;
170168

171-
//custom flattening required for otel metrics
172-
let metrics: MetricsData = serde_json::from_value(json)?;
173-
for record in flatten_otel_metrics(metrics) {
174-
push_logs(&stream_name, record, &log_source).await?;
175-
}
169+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
176170

177171
Ok(HttpResponse::Ok().finish())
178172
}
@@ -200,11 +194,7 @@ pub async fn handle_otel_traces_ingestion(
200194
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
201195
.await?;
202196

203-
//custom flattening required for otel traces
204-
let traces: TracesData = serde_json::from_value(json)?;
205-
for record in flatten_otel_traces(&traces) {
206-
push_logs(&stream_name, record, &log_source).await?;
207-
}
197+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
208198

209199
Ok(HttpResponse::Ok().finish())
210200
}
@@ -245,6 +235,14 @@ pub async fn post_event(
245235
.get(LOG_SOURCE_KEY)
246236
.and_then(|h| h.to_str().ok())
247237
.map_or(LogSource::default(), LogSource::from);
238+
239+
if matches!(
240+
log_source,
241+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
242+
) {
243+
return Err(PostError::OtelNotSupported);
244+
}
245+
248246
flatten_and_push_logs(json, &stream_name, &log_source).await?;
249247

250248
Ok(HttpResponse::Ok().finish())

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
use arrow_schema::Field;
2020
use chrono::{DateTime, NaiveDateTime, Utc};
2121
use itertools::Itertools;
22+
use opentelemetry_proto::tonic::{
23+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
24+
};
2225
use serde_json::Value;
2326
use std::{collections::HashMap, sync::Arc};
2427

@@ -32,6 +35,7 @@ use crate::{
3235
kinesis::{flatten_kinesis_logs, Message},
3336
},
3437
metadata::SchemaVersion,
38+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
3539
parseable::{StreamNotFound, PARSEABLE},
3640
storage::StreamType,
3741
utils::json::{convert_array_to_object, flatten::convert_to_array},
@@ -45,14 +49,32 @@ pub async fn flatten_and_push_logs(
4549
) -> Result<(), PostError> {
4650
match log_source {
4751
LogSource::Kinesis => {
52+
//custom flattening required for Amazon Kinesis
4853
let message: Message = serde_json::from_value(json)?;
49-
let json = flatten_kinesis_logs(message);
50-
for record in json {
54+
for record in flatten_kinesis_logs(message) {
5155
push_logs(stream_name, record, &LogSource::default()).await?;
5256
}
5357
}
54-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
55-
return Err(PostError::OtelNotSupported);
58+
LogSource::OtelLogs => {
59+
//custom flattening required for otel logs
60+
let logs: LogsData = serde_json::from_value(json)?;
61+
for record in flatten_otel_logs(&logs) {
62+
push_logs(stream_name, record, log_source).await?;
63+
}
64+
}
65+
LogSource::OtelTraces => {
66+
//custom flattening required for otel traces
67+
let traces: TracesData = serde_json::from_value(json)?;
68+
for record in flatten_otel_traces(&traces) {
69+
push_logs(stream_name, record, log_source).await?;
70+
}
71+
}
72+
LogSource::OtelMetrics => {
73+
//custom flattening required for otel metrics
74+
let metrics: MetricsData = serde_json::from_value(json)?;
75+
for record in flatten_otel_metrics(metrics) {
76+
push_logs(stream_name, record, log_source).await?;
77+
}
5678
}
5779
_ => {
5880
push_logs(stream_name, json, log_source).await?;
@@ -61,7 +83,7 @@ pub async fn flatten_and_push_logs(
6183
Ok(())
6284
}
6385

64-
pub async fn push_logs(
86+
async fn push_logs(
6587
stream_name: &str,
6688
json: Value,
6789
log_source: &LogSource,

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
pub mod about;
20-
mod alerts;
20+
pub mod alerts;
2121
pub mod analytics;
2222
pub mod audit;
2323
pub mod banner;

0 commit comments

Comments
 (0)