From 225c3e5c04ac385a935689b64932918f692d633e Mon Sep 17 00:00:00 2001 From: 7h3cyb3rm0nk Date: Fri, 21 Feb 2025 22:19:09 +0530 Subject: [PATCH 1/3] fix: field name validation before processing static schema --- src/static_schema.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index 5b1a5cada..d9b9a479c 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use std::str; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::{HashMap,HashSet} , sync::Arc}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec, @@ -87,7 +87,12 @@ pub fn convert_static_schema_to_arrow_schema( } } } + + let mut existing_field_names: HashSet = HashSet::new(); + for mut field in static_schema.fields { + + validate_field_names(&field.name, &mut existing_field_names)?; if !time_partition.is_empty() && field.name == time_partition { time_partition_exists = true; field.data_type = "datetime".to_string(); @@ -139,6 +144,7 @@ pub fn convert_static_schema_to_arrow_schema( fn add_parseable_fields_to_static_schema( parsed_schema: ParsedSchema, ) -> Result, AnyError> { + let mut schema: Vec> = Vec::new(); for field in parsed_schema.fields.iter() { let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); @@ -176,3 +182,16 @@ fn default_dict_id() -> i64 { fn default_dict_is_ordered() -> bool { false } + +fn validate_field_names(field_name: &str, existing_fields: &mut HashSet) -> Result<(), AnyError> { + + if field_name.is_empty() { + return Err(anyhow!("field names should not be empty")); + } + + if !existing_fields.insert(field_name.to_string()) { + return Err(anyhow!("duplicate field name: {}", field_name)); + } + + Ok(()) +} From 98676f7ede7c627e259ebd8bacad8879c47fb63b Mon Sep 17 00:00:00 2001 From: 7h3cyb3rm0nk Date: Fri, 21 Feb 2025 22:51:33 +0530 Subject: [PATCH 2/3] add: tests for validate_field_names --- src/static_schema.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index d9b9a479c..e17eca83b 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -23,7 +23,10 @@ use serde::{Deserialize, Serialize}; use std::str; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use std::{collections::{HashMap,HashSet} , sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec, @@ -91,7 +94,6 @@ pub fn convert_static_schema_to_arrow_schema( let mut existing_field_names: HashSet = HashSet::new(); for mut field in static_schema.fields { - validate_field_names(&field.name, &mut existing_field_names)?; if !time_partition.is_empty() && field.name == time_partition { time_partition_exists = true; @@ -144,7 +146,6 @@ pub fn convert_static_schema_to_arrow_schema( fn add_parseable_fields_to_static_schema( parsed_schema: ParsedSchema, ) -> Result, AnyError> { - let mut schema: Vec> = Vec::new(); for field in parsed_schema.fields.iter() { let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); @@ -183,8 +184,10 @@ fn default_dict_is_ordered() -> bool { false } -fn validate_field_names(field_name: &str, existing_fields: &mut HashSet) -> Result<(), AnyError> { - +fn validate_field_names( + field_name: &str, + existing_fields: &mut HashSet, +) -> Result<(), AnyError> { if field_name.is_empty() { return Err(anyhow!("field names should not be empty")); } @@ -195,3 +198,21 @@ fn validate_field_names(field_name: &str, existing_fields: &mut HashSet) Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + #[test] + fn empty_field_names() { + let mut existing_field_names: HashSet = HashSet::new(); + assert!(validate_field_names("", &mut existing_field_names).is_err()); + } + + #[test] + fn duplicate_field_names() { + let mut existing_field_names: HashSet = HashSet::new(); + let _ = validate_field_names("test_field", &mut existing_field_names); + assert!(validate_field_names("test_field", &mut existing_field_names).is_err()); + } +} From ec9827e7043b735bf7d8c8be866783cf8e8e09de Mon Sep 17 00:00:00 2001 From: 7h3cyb3rm0nk Date: Mon, 24 Feb 2025 23:35:14 +0530 Subject: [PATCH 3/3] refactor: change anyhow error to custom error variants --- src/static_schema.rs | 53 +++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index e17eca83b..286ec65ad 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -18,7 +18,6 @@ use crate::event::DEFAULT_TIMESTAMP_KEY; use crate::utils::arrow::get_field; -use anyhow::{anyhow, Error as AnyError}; use serde::{Deserialize, Serialize}; use std::str; @@ -27,6 +26,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec, @@ -57,13 +57,12 @@ pub struct Fields { } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] - pub struct Metadata {} pub fn convert_static_schema_to_arrow_schema( static_schema: StaticSchema, time_partition: &str, custom_partition: Option<&String>, -) -> Result, AnyError> { +) -> Result, StaticSchemaError> { let mut parsed_schema = ParsedSchema { fields: Vec::new(), metadata: HashMap::new(), @@ -86,7 +85,9 @@ pub fn convert_static_schema_to_arrow_schema( for partition in &custom_partition_list { if !custom_partition_exists.contains_key(*partition) { - return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); + return Err(StaticSchemaError::MissingCustomPartition( + partition.to_string(), + )); } } } @@ -134,18 +135,16 @@ pub fn convert_static_schema_to_arrow_schema( parsed_schema.fields.push(parsed_field); } if !time_partition.is_empty() && !time_partition_exists { - return Err(anyhow! { - format!( - "time partition field {time_partition} does not exist in the schema for the static schema logstream" - ), - }); + return Err(StaticSchemaError::MissingTimePartition( + time_partition.to_string(), + )); } add_parseable_fields_to_static_schema(parsed_schema) } fn add_parseable_fields_to_static_schema( parsed_schema: ParsedSchema, -) -> Result, AnyError> { +) -> Result, StaticSchemaError> { let mut schema: Vec> = Vec::new(); for field in parsed_schema.fields.iter() { let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); @@ -153,10 +152,7 @@ fn add_parseable_fields_to_static_schema( } if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(anyhow!( - "field {} is a reserved field", - DEFAULT_TIMESTAMP_KEY - )); + return Err(StaticSchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY)); }; // add the p_timestamp field to the event schema to the 0th index @@ -187,22 +183,43 @@ fn default_dict_is_ordered() -> bool { fn validate_field_names( field_name: &str, existing_fields: &mut HashSet, -) -> Result<(), AnyError> { +) -> Result<(), StaticSchemaError> { if field_name.is_empty() { - return Err(anyhow!("field names should not be empty")); + return Err(StaticSchemaError::EmptyFieldName); } if !existing_fields.insert(field_name.to_string()) { - return Err(anyhow!("duplicate field name: {}", field_name)); + return Err(StaticSchemaError::DuplicateField(field_name.to_string())); } Ok(()) } +#[derive(Debug, thiserror::Error)] +pub enum StaticSchemaError { + #[error( + "custom partition field {0} does not exist in the schema for the static schema logstream" + )] + MissingCustomPartition(String), + + #[error( + "time partition field {0} does not exist in the schema for the static schema logstream" + )] + MissingTimePartition(String), + + #[error("field {0:?} is a reserved field")] + ReservedKey(&'static str), + + #[error("field name cannot be empty")] + EmptyFieldName, + + #[error("duplicate field name: {0}")] + DuplicateField(String), +} + #[cfg(test)] mod tests { use super::*; - use std::collections::HashSet; #[test] fn empty_field_names() { let mut existing_field_names: HashSet = HashSet::new();