
Commit 85e13cc

Author: Devdutt Shenoi
Merge remote-tracking branch 'origin/main' into schema-detect
2 parents: f756639 + 6fe35a6

File tree: 8 files changed, +290 -52 lines


src/connectors/kafka/processor.rs

Lines changed: 7 additions & 3 deletions

@@ -16,20 +16,20 @@
  *
  */
 
-use std::sync::Arc;
-
 use async_trait::async_trait;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};
 
 use crate::{
     connectors::common::processor::Processor,
     event::{
         format::{json, EventFormat, LogSourceEntry},
-        Event as ParseableEvent,
+        Event as ParseableEvent, USER_AGENT_KEY,
     },
     parseable::PARSEABLE,
     storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
             }
         }
 
+        let mut p_custom_fields = HashMap::new();
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
+
         let p_event = json::Event::new(Value::Array(json_vec)).into_event(
             stream_name.to_string(),
             total_payload_size,
@@ -85,6 +88,7 @@
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            &p_custom_fields,
         )?;
 
         Ok(p_event)
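The processor hardcodes the user agent for Kafka-sourced events, since a non-HTTP connector has no request headers to read. A hypothetical helper, not part of this commit, could generalize the map built inline above to any connector name:

// Hypothetical helper, not in this commit: builds the same custom-fields
// map the ParseableSinkProcessor constructs inline, for any connector name.
use std::collections::HashMap;

use crate::event::USER_AGENT_KEY;

fn connector_custom_fields(connector: &str) -> HashMap<String, String> {
    let mut p_custom_fields = HashMap::new();
    p_custom_fields.insert(USER_AGENT_KEY.to_string(), connector.to_string());
    p_custom_fields
}

Here connector_custom_fields("kafka") would reproduce the map passed to into_event above.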

src/event/format/json.rs

Lines changed: 2 additions & 0 deletions

@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
         )?;
 
         Ok(super::Event {

src/event/format/mod.rs

Lines changed: 7 additions & 19 deletions

@@ -33,7 +33,7 @@ use serde_json::Value;
 use crate::{
     metadata::SchemaVersion,
     storage::StreamType,
-    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+    utils::arrow::{add_parseable_fields, get_field},
 };
 
 use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -149,9 +149,10 @@ pub trait EventFormat: Sized {
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) = self.to_data(
+        let (data, schema, is_first) = self.to_data(
             storage_schema,
             time_partition,
             schema_version,
@@ -165,16 +166,6 @@
             ));
         };
 
-        // add the p_timestamp field to the event schema to the 0th index
-        schema.insert(
-            0,
-            Arc::new(Field::new(
-                DEFAULT_TIMESTAMP_KEY,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            )),
-        );
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -183,13 +174,9 @@
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
 
-        let mut rb = Self::decode(data, new_schema.clone())?;
-        rb = replace_columns(
-            rb.schema(),
-            &rb,
-            &[0],
-            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
-        );
+        let rb = Self::decode(data, new_schema.clone())?;
+
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
 
         Ok((rb, is_first))
     }
@@ -227,6 +214,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<Event, AnyError>;
 }
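The new add_parseable_fields helper subsumes both the manual p_timestamp schema insertion and the replace_columns call it deletes, but its body lives in src/utils/arrow and is not part of this diff. A minimal sketch of what such a helper could do, assuming arrow-rs APIs and that the timestamp plus custom string columns are prepended before the event's own columns:

// Sketch only -- the real add_parseable_fields in src/utils/arrow may
// differ (e.g. it may sort the custom-field columns for determinism).
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};

fn add_parseable_fields(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    let num_rows = rb.num_rows();

    // p_timestamp goes first, mirroring the old index-0 replace_columns path.
    let mut fields: Vec<Field> = vec![Field::new(
        "p_timestamp",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    )];
    let mut columns: Vec<ArrayRef> = vec![Arc::new(TimestampMillisecondArray::from(
        vec![p_timestamp.timestamp_millis(); num_rows],
    ))];

    // One Utf8 column per custom field, the value repeated for every row.
    for (key, value) in p_custom_fields {
        fields.push(Field::new(key.as_str(), DataType::Utf8, true));
        columns.push(Arc::new(StringArray::from(vec![value.as_str(); num_rows])));
    }

    // Then the original event columns, unchanged.
    for (field, column) in rb.schema().fields().iter().zip(rb.columns()) {
        fields.push(field.as_ref().clone());
        columns.push(column.clone());
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}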

src/event/mod.rs

Lines changed: 3 additions & 0 deletions

@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+pub const USER_AGENT_KEY: &str = "p_user_agent";
+pub const SOURCE_IP_KEY: &str = "p_src_ip";
+pub const FORMAT_KEY: &str = "p_format";
 
 #[derive(Clone)]
 pub struct Event {
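With the reserved p_* column names now defined in one place, a hypothetical guard (not in this commit) could use them to reject user-supplied columns that collide with Parseable's own:

// Hypothetical guard in the same module, so the constants are in scope.
fn is_reserved_column(name: &str) -> bool {
    matches!(
        name,
        DEFAULT_TIMESTAMP_KEY | USER_AGENT_KEY | SOURCE_IP_KEY | FORMAT_KEY
    )
}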

src/handlers/http/audit.rs

Lines changed: 2 additions & 1 deletion

@@ -23,6 +23,7 @@ use actix_web::{
     middleware::Next,
 };
 use actix_web_httpauth::extractors::basic::BasicAuth;
+use http::header::USER_AGENT;
 use ulid::Ulid;
 
 use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
         )
         .with_user_agent(
             req.headers()
-                .get("User-Agent")
+                .get(USER_AGENT)
                 .and_then(|a| a.to_str().ok())
                 .unwrap_or_default(),
         )
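Swapping the "User-Agent" string literal for the typed http::header::USER_AGENT constant rules out typos at compile time; the constant carries the canonical lowercase header name:

use http::header::USER_AGENT;

// HeaderName constants hold the canonical lowercase form.
assert_eq!(USER_AGENT.as_str(), "user-agent");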

src/handlers/http/ingest.rs

Lines changed: 62 additions & 17 deletions

@@ -42,7 +42,7 @@ use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::flatten_and_push_logs;
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
@@ -92,6 +92,7 @@ pub async fn ingest(
     };
 
     let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);
+    let p_custom_fields = get_custom_fields_from_header(req);
 
     PARSEABLE
         .create_stream_if_not_exists(
@@ -101,7 +102,7 @@
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -122,6 +123,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         None,
         SchemaVersion::V0,
         StreamType::Internal,
+        &HashMap::new(),
     )?
     .process()?;
 
@@ -163,8 +165,9 @@ pub async fn handle_otel_logs_ingestion(
         vec![log_source_entry],
     )
     .await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -186,6 +189,7 @@ pub async fn handle_otel_metrics_ingestion(
     if log_source != LogSource::OtelMetrics {
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
+
     let stream_name = stream_name.to_str().unwrap().to_owned();
     let log_source_entry = LogSourceEntry::new(
         log_source.clone(),
@@ -202,7 +206,9 @@
     )
     .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -242,7 +248,9 @@
     )
     .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -299,7 +307,8 @@ pub async fn post_event(
         _ => {}
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
    Ok(HttpResponse::Ok().finish())
 }
@@ -452,7 +461,13 @@ mod tests {
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -480,7 +495,13 @@
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -512,7 +533,7 @@
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -544,7 +565,7 @@
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
            .is_err());
     }
 
@@ -562,7 +583,7 @@
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -603,7 +624,13 @@
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -651,7 +678,13 @@
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -700,7 +733,7 @@
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -749,7 +782,7 @@
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .is_err());
     }
 
@@ -789,7 +822,13 @@
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -872,7 +911,13 @@
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V1,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
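Every HTTP handler now derives its custom-fields map via get_custom_fields_from_header, which this diff imports from ingest_utils but does not show. A plausible sketch, assuming actix-web 4 and a hypothetical x-p-log-source header for the format field (the real header name and helper body are not in this commit):

// Sketch only -- the real helper lives in super::modal::utils::ingest_utils.
use std::collections::HashMap;

use actix_web::HttpRequest;
use http::header::USER_AGENT;

use crate::event::{FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY};

pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let mut p_custom_fields = HashMap::new();

    // p_user_agent from the standard User-Agent header.
    if let Some(agent) = req.headers().get(USER_AGENT).and_then(|a| a.to_str().ok()) {
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), agent.to_string());
    }

    // p_src_ip from the connection's peer address (honors X-Forwarded-For
    // when actix is configured behind a proxy).
    let conn = req.connection_info().clone();
    if let Some(src_ip) = conn.realip_remote_addr() {
        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), src_ip.to_string());
    }

    // p_format from a hypothetical x-p-log-source header.
    if let Some(format) = req
        .headers()
        .get("x-p-log-source")
        .and_then(|f| f.to_str().ok())
    {
        p_custom_fields.insert(FORMAT_KEY.to_string(), format.to_string());
    }

    p_custom_fields
}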
