
Commit 924ef5b

Devdutt Shenoi committed
Merge remote-tracking branch 'origin/main'
2 parents 6bb7afc + d147f48 · commit 924ef5b

23 files changed · +993 −335 lines

Cargo.lock

Lines changed: 116 additions & 101 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 10 additions & 10 deletions
@@ -9,16 +9,16 @@ build = "build.rs"
 
 [dependencies]
 # Arrow and DataFusion ecosystem
-arrow = "53.0.0"
-arrow-array = "53.0.0"
-arrow-flight = { version = "53.0.0", features = ["tls"] }
-arrow-ipc = { version = "53.0.0", features = ["zstd"] }
-arrow-json = "53.0.0"
-arrow-schema = { version = "53.0.0", features = ["serde"] }
-arrow-select = "53.0.0"
-datafusion = "44.0.0"
+arrow = "54.0.0"
+arrow-array = "54.0.0"
+arrow-flight = { version = "54.0.0", features = ["tls"] }
+arrow-ipc = { version = "54.0.0", features = ["zstd"] }
+arrow-json = "54.0.0"
+arrow-schema = { version = "54.0.0", features = ["serde"] }
+arrow-select = "54.0.0"
+datafusion = "45.0.0"
 object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
-parquet = "53.0.0"
+parquet = "54.0.0"
 
 # Web server and HTTP-related
 actix-cors = "0.7.0"
@@ -135,7 +135,7 @@ anyhow = "1.0"
 
 [dev-dependencies]
 rstest = "0.23.0"
-arrow = "53.0.0"
+arrow = "54.0.0"
 temp-dir = "0.1.14"
 
 [package.metadata.parseable_ui]

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 2 deletions
@@ -27,7 +27,7 @@ use tracing::{debug, error};
 
 use crate::{
     connectors::common::processor::Processor,
-    event::format::{json, EventFormat, LogSource},
+    event::format::{json, EventFormat, LogSource, LogSourceEntry},
     parseable::PARSEABLE,
     storage::StreamType,
 };
@@ -43,9 +43,14 @@ impl ParseableSinkProcessor {
             .first()
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
+        let log_source_entry = LogSourceEntry::default();
 
         PARSEABLE
-            .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
+            .create_stream_if_not_exists(
+                stream_name,
+                StreamType::UserDefined,
+                vec![log_source_entry],
+            )
             .await?;
 
         let mut json_vec = Vec::with_capacity(records.len());
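
Note: create_stream_if_not_exists now takes a Vec<LogSourceEntry> (the type added in src/event/format/mod.rs below) instead of a single LogSource, so one stream can record several known source formats. A minimal sketch of a caller that registers a known format up front, under the assumption that the caller knows its records are JSON; the stream name and field names here are illustrative, not from the repo:

use std::collections::HashSet;

// Illustrative caller: registers a JSON source along with the field names it
// expects, instead of falling back to LogSourceEntry::default() as above.
let fields = HashSet::from(["timestamp".to_string(), "message".to_string()]);
let entry = LogSourceEntry::new(LogSource::Json, fields);
PARSEABLE
    .create_stream_if_not_exists("demo_stream", StreamType::UserDefined, vec![entry])
    .await?;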

src/event/format/json.rs

Lines changed: 113 additions & 50 deletions
@@ -55,7 +55,7 @@ use crate::{
 struct JsonPartition {
     batch: Vec<Json>,
     schema: Vec<Arc<Field>>,
-    date: NaiveDate,
+    parsed_timestamp: NaiveDateTime,
 }
 
 pub struct Event {
@@ -209,7 +209,7 @@ impl EventFormat for Event {
            .collect()
        };
 
-        if fields_mismatch(&schema, data, schema_version) {
+        if fields_mismatch(&schema, data, schema_version, static_schema_flag) {
            return Err(anyhow!(
                "Could not process this event due to mismatch in datatype"
            ));
@@ -289,7 +289,7 @@ impl EventFormat for Event {
                JsonPartition {
                    batch: vec![json],
                    schema,
-                    date: parsed_timestamp.date(),
+                    parsed_timestamp,
                },
            );
        }
@@ -301,7 +301,7 @@ impl EventFormat for Event {
            JsonPartition {
                batch,
                schema,
-                date,
+                parsed_timestamp,
            },
        ) in json_partitions
        {
@@ -313,14 +313,19 @@ impl EventFormat for Event {
                schema_version,
            )?;
 
-            partitions.insert(prefix, PartitionEvent { rb: batch, date });
+            partitions.insert(
+                prefix,
+                PartitionEvent {
+                    rb: batch,
+                    parsed_timestamp,
+                },
+            );
        }
 
        Ok(super::Event {
            origin_format: "json",
            origin_size,
            is_first_event,
-            time_partition: None,
            partitions,
            stream_type,
        })
@@ -401,66 +406,124 @@ fn collect_keys(object: &Json) -> HashSet<&str> {
 }
 
 // Returns true when the field doesn't exist in schema or has an invalid type
-fn fields_mismatch(schema: &[Arc<Field>], body: &Json, schema_version: SchemaVersion) -> bool {
+fn fields_mismatch(
+    schema: &[Arc<Field>],
+    body: &Json,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
     body.iter().any(|(key, value)| {
         !value.is_null()
             && get_field(schema, key)
-                .is_none_or(|field| !valid_type(field.data_type(), value, schema_version))
+                .is_none_or(|field| !valid_type(field, value, schema_version, static_schema_flag))
     })
 }
 
-fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
-    match data_type {
+fn valid_type(
+    field: &Field,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    match field.data_type() {
         DataType::Boolean => value.is_boolean(),
-        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
+            validate_int(value, static_schema_flag)
+        }
         DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
         DataType::Float16 | DataType::Float32 => value.is_f64(),
-        // All numbers can be cast as Float64 from schema version v1
-        DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
-        DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
+        DataType::Float64 => validate_float(value, schema_version, static_schema_flag),
         DataType::Utf8 => value.is_string(),
-        DataType::List(field) => {
-            let data_type = field.data_type();
-            if let Value::Array(arr) = value {
-                for elem in arr {
-                    if elem.is_null() {
-                        continue;
-                    }
-                    if !valid_type(data_type, elem, schema_version) {
-                        return false;
-                    }
-                }
-            }
-            true
-        }
+        DataType::List(field) => validate_list(field, value, schema_version, static_schema_flag),
         DataType::Struct(fields) => {
-            if let Value::Object(val) = value {
-                for (key, value) in val {
-                    let field = (0..fields.len())
-                        .find(|idx| fields[*idx].name() == key)
-                        .map(|idx| &fields[idx]);
-
-                    if let Some(field) = field {
-                        if value.is_null() {
-                            continue;
-                        }
-                        if !valid_type(field.data_type(), value, schema_version) {
-                            return false;
-                        }
-                    } else {
-                        return false;
-                    }
-                }
-                true
-            } else {
-                false
+            validate_struct(fields, value, schema_version, static_schema_flag)
+        }
+        DataType::Date32 => {
+            if let Value::String(s) = value {
+                return NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok();
             }
+            false
         }
         DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
        _ => {
-            error!("Unsupported datatype {:?}, value {:?}", data_type, value);
-            unreachable!()
+            error!(
+                "Unsupported datatype {:?}, value {:?}",
+                field.data_type(),
+                value
+            );
+            false
+        }
+    }
+}
+
+fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
+    // allow casting string to int for static schema
+    if static_schema_flag {
+        if let Value::String(s) = value {
+            return s.trim().parse::<i64>().is_ok();
+        }
+    }
+    value.is_i64()
+}
+
+fn validate_float(value: &Value, schema_version: SchemaVersion, static_schema_flag: bool) -> bool {
+    // allow casting string to int for static schema
+    if static_schema_flag {
+        if let Value::String(s) = value.clone() {
+            let trimmed = s.trim();
+            return trimmed.parse::<f64>().is_ok() || trimmed.parse::<i64>().is_ok();
+        }
+        return value.is_number();
+    }
+    match schema_version {
+        SchemaVersion::V1 => value.is_number(),
+        _ => value.is_f64(),
+    }
+}
+
+fn validate_list(
+    field: &Field,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    if let Value::Array(arr) = value {
+        for elem in arr {
+            if elem.is_null() {
+                continue;
+            }
+            if !valid_type(field, elem, schema_version, static_schema_flag) {
+                return false;
+            }
+        }
+    }
+    true
+}
+
+fn validate_struct(
+    fields: &Fields,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    if let Value::Object(val) = value {
+        for (key, value) in val {
+            let field = fields.iter().find(|f| f.name() == key);
+
+            if let Some(field) = field {
+                if value.is_null() {
+                    continue;
+                }
+                if !valid_type(field, value, schema_version, static_schema_flag) {
+                    return false;
+                }
+            } else {
+                return false;
+            }
        }
+        true
+    } else {
+        false
     }
 }
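
The practical effect of threading static_schema_flag through validation: when a stream has a static schema, numeric values that arrive as JSON strings now pass type checks for integer and float columns instead of failing with a datatype mismatch. A self-contained sketch of the integer rule (validate_int is copied from the diff above; the sample values are illustrative):

use serde_json::{json, Value};

fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
    // allow casting string to int for static schema
    if static_schema_flag {
        if let Value::String(s) = value {
            return s.trim().parse::<i64>().is_ok();
        }
    }
    value.is_i64()
}

fn main() {
    assert!(validate_int(&json!(42), false));          // plain integer always passes
    assert!(!validate_int(&json!(" 42 "), false));     // string rejected for dynamic schemas
    assert!(validate_int(&json!(" 42 "), true));       // string accepted under a static schema
    assert!(!validate_int(&json!("forty-two"), true)); // non-numeric string still rejected
}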

src/event/format/mod.rs

Lines changed: 18 additions & 1 deletion
@@ -48,7 +48,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub enum LogSource {
     // AWS Kinesis sends logs in the format of a json array
     Kinesis,
@@ -98,6 +98,23 @@ impl Display for LogSource {
 
 pub type IsFirstEvent = bool;
 
+/// Contains the format name and a list of known field names that are associated with the said format.
+/// Stored on disk as part of `ObjectStoreFormat` in stream.json
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LogSourceEntry {
+    pub log_source_format: LogSource,
+    pub fields: HashSet<String>,
+}
+
+impl LogSourceEntry {
+    pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
+        LogSourceEntry {
+            log_source_format,
+            fields,
+        }
+    }
+}
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
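
For reference, a short sketch of constructing the new type; LogSourceEntry::new comes from the diff above, LogSource::Json is a variant visible elsewhere in this commit, and the concrete field names are made up for illustration:

use std::collections::HashSet;

// Hypothetical set of known fields for a JSON source.
let fields: HashSet<String> = ["timestamp", "level", "message"]
    .into_iter()
    .map(String::from)
    .collect();
let entry = LogSourceEntry::new(LogSource::Json, fields);
assert_eq!(entry.log_source_format, LogSource::Json);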

src/event/mod.rs

Lines changed: 18 additions & 7 deletions
@@ -26,42 +26,48 @@ use std::sync::Arc;
 
 use self::error::EventError;
 use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
-use chrono::NaiveDate;
+use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 
 #[derive(Debug)]
 pub struct PartitionEvent {
     pub rb: RecordBatch,
-    pub date: NaiveDate,
+    pub parsed_timestamp: NaiveDateTime,
 }
 
 #[derive(Debug)]
 pub struct Event {
     pub origin_format: &'static str,
     pub origin_size: usize,
     pub is_first_event: bool,
-    pub time_partition: Option<String>,
     pub partitions: HashMap<String, PartitionEvent>,
     pub stream_type: StreamType,
 }
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
     pub fn process(self, stream: &Stream) -> Result<(), EventError> {
-        for (prefix, PartitionEvent { rb, date }) in self.partitions {
+        for (
+            prefix,
+            PartitionEvent {
+                rb,
+                parsed_timestamp,
+            },
+        ) in self.partitions
+        {
             if self.is_first_event {
                 stream.commit_schema(rb.schema().as_ref().clone())?;
             }
-            stream.push(&prefix, &rb, self.stream_type)?;
+            stream.push(&prefix, parsed_timestamp, &rb, self.stream_type)?;
 
             update_stats(
                 &stream.stream_name,
                 self.origin_format,
                 self.origin_size,
                 rb.num_rows(),
-                date,
+                parsed_timestamp.date(),
             );
 
             crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
@@ -71,7 +77,12 @@ impl Event {
 
     pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
         for (prefix, partition) in &self.partitions {
-            stream.push(prefix, &partition.rb, self.stream_type)?;
+            stream.push(
+                prefix,
+                partition.parsed_timestamp,
+                &partition.rb,
+                self.stream_type,
+            )?;
         }
 
         Ok(())
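
This change replaces the per-partition NaiveDate with a full NaiveDateTime; the date handed to update_stats is now derived at processing time via parsed_timestamp.date(). A standalone chrono sketch of that derivation (the timestamp value is illustrative):

use chrono::{NaiveDate, NaiveDateTime};

let parsed_timestamp: NaiveDateTime = NaiveDate::from_ymd_opt(2025, 2, 10)
    .unwrap()
    .and_hms_opt(12, 30, 0)
    .unwrap();
// Equivalent to what Event::process now passes to update_stats:
assert_eq!(
    parsed_timestamp.date(),
    NaiveDate::from_ymd_opt(2025, 2, 10).unwrap()
);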
