Skip to content

Commit 02610d0

Browse files
fix: data type for static schema
If a string is parsable to an int, consider it valid; if a string is parsable to a float, consider it valid.
1 parent 506527d commit 02610d0

File tree

2 files changed

+50
-15
lines changed

2 files changed

+50
-15
lines changed

src/event/format/json.rs

Lines changed: 43 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@ impl EventFormat for Event {
6262
schema: &HashMap<String, Arc<Field>>,
6363
time_partition: Option<&String>,
6464
schema_version: SchemaVersion,
65+
static_schema_flag: bool,
6566
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
6667
let stream_schema = schema;
6768

@@ -111,7 +112,7 @@ impl EventFormat for Event {
111112

112113
if value_arr
113114
.iter()
114-
.any(|value| fields_mismatch(&schema, value, schema_version))
115+
.any(|value| fields_mismatch(&schema, value, schema_version, static_schema_flag))
115116
{
116117
return Err(anyhow!(
117118
"Could not process this event due to mismatch in datatype"
@@ -253,39 +254,64 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
253254
Ok(keys)
254255
}
255256

256-
fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion) -> bool {
257+
fn fields_mismatch(
258+
schema: &[Arc<Field>],
259+
body: &Value,
260+
schema_version: SchemaVersion,
261+
static_schema_flag: bool,
262+
) -> bool {
257263
for (name, val) in body.as_object().expect("body is of object variant") {
258264
if val.is_null() {
259265
continue;
260266
}
261267
let Some(field) = get_field(schema, name) else {
262268
return true;
263269
};
264-
if !valid_type(field.data_type(), val, schema_version) {
270+
if !valid_type(field, val, schema_version, static_schema_flag) {
265271
return true;
266272
}
267273
}
268274
false
269275
}
270276

271-
fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
272-
match data_type {
277+
fn valid_type(
278+
field: &Field,
279+
value: &Value,
280+
schema_version: SchemaVersion,
281+
static_schema_flag: bool,
282+
) -> bool {
283+
match field.data_type() {
273284
DataType::Boolean => value.is_boolean(),
274-
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
285+
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
286+
if static_schema_flag {
287+
if let Value::String(s) = value {
288+
return s.parse::<i64>().is_ok();
289+
}
290+
}
291+
value.is_i64()
292+
}
275293
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
276294
DataType::Float16 | DataType::Float32 => value.is_f64(),
277-
// All numbers can be cast as Float64 from schema version v1
278-
DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
279-
DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
295+
DataType::Float64 => {
296+
if static_schema_flag {
297+
if let Value::String(s) = value.clone() {
298+
return s.parse::<f64>().is_ok() || s.parse::<i64>().is_ok();
299+
}
300+
return value.is_number();
301+
}
302+
match schema_version {
303+
SchemaVersion::V1 => value.is_number(),
304+
_ => value.is_f64(),
305+
}
306+
}
280307
DataType::Utf8 => value.is_string(),
281308
DataType::List(field) => {
282-
let data_type = field.data_type();
283309
if let Value::Array(arr) = value {
284310
for elem in arr {
285311
if elem.is_null() {
286312
continue;
287313
}
288-
if !valid_type(data_type, elem, schema_version) {
314+
if !valid_type(field, elem, schema_version, static_schema_flag) {
289315
return false;
290316
}
291317
}
@@ -303,7 +329,7 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
303329
if value.is_null() {
304330
continue;
305331
}
306-
if !valid_type(field.data_type(), value, schema_version) {
332+
if !valid_type(field, value, schema_version, static_schema_flag) {
307333
return false;
308334
}
309335
} else {
@@ -317,7 +343,11 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
317343
}
318344
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
319345
_ => {
320-
error!("Unsupported datatype {:?}, value {:?}", data_type, value);
346+
error!(
347+
"Unsupported datatype {:?}, value {:?}",
348+
field.data_type(),
349+
value
350+
);
321351
unreachable!()
322352
}
323353
}

src/event/format/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -102,6 +102,7 @@ pub trait EventFormat: Sized {
102102
schema: &HashMap<String, Arc<Field>>,
103103
time_partition: Option<&String>,
104104
schema_version: SchemaVersion,
105+
static_schema_flag: bool,
105106
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
106107

107108
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -117,8 +118,12 @@ pub trait EventFormat: Sized {
117118
schema_version: SchemaVersion,
118119
) -> Result<(RecordBatch, bool), AnyError> {
119120
let p_timestamp = self.get_p_timestamp();
120-
let (data, mut schema, is_first) =
121-
self.to_data(storage_schema, time_partition, schema_version)?;
121+
let (data, mut schema, is_first) = self.to_data(
122+
storage_schema,
123+
time_partition,
124+
schema_version,
125+
static_schema_flag,
126+
)?;
122127

123128
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
124129
return Err(anyhow!(

0 commit comments

Comments
 (0)