Skip to content

Commit 1561742

Browse files
author
Devdutt Shenoi
committed
refactor: simplification of static schema handling
1 parent 523ecc7 commit 1561742

File tree

2 files changed

+111
-156
lines changed

2 files changed

+111
-156
lines changed

src/parseable/mod.rs

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::{
4747
},
4848
metadata::{LogStreamMetadata, SchemaVersion},
4949
option::Mode,
50-
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
50+
static_schema::StaticSchema,
5151
storage::{
5252
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
5353
ObjectStoreFormat, Owner, Permisssion, StreamType,
@@ -485,13 +485,28 @@ impl Parseable {
485485
}
486486
}
487487

488-
let schema = validate_static_schema(
489-
body,
490-
stream_name,
491-
&time_partition,
492-
custom_partition.as_ref(),
493-
static_schema_flag,
494-
)?;
488+
let schema = if static_schema_flag {
489+
if body.is_empty() {
490+
return Err(CreateStreamError::Custom {
491+
msg: format!(
492+
"Please provide schema in the request body for static schema logstream {stream_name}"
493+
),
494+
status: StatusCode::BAD_REQUEST,
495+
}.into());
496+
}
497+
498+
let static_schema: StaticSchema = serde_json::from_slice(body)?;
499+
static_schema
500+
.convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
501+
.map_err(|_| CreateStreamError::Custom {
502+
msg: format!(
503+
"Unable to commit static schema, logstream {stream_name} not created"
504+
),
505+
status: StatusCode::BAD_REQUEST,
506+
})?
507+
} else {
508+
Arc::new(Schema::empty())
509+
};
495510

496511
self.create_stream(
497512
stream_name.to_string(),
@@ -778,37 +793,6 @@ impl Parseable {
778793
}
779794
}
780795

781-
pub fn validate_static_schema(
782-
body: &Bytes,
783-
stream_name: &str,
784-
time_partition: &str,
785-
custom_partition: Option<&String>,
786-
static_schema_flag: bool,
787-
) -> Result<Arc<Schema>, CreateStreamError> {
788-
if !static_schema_flag {
789-
return Ok(Arc::new(Schema::empty()));
790-
}
791-
792-
if body.is_empty() {
793-
return Err(CreateStreamError::Custom {
794-
msg: format!(
795-
"Please provide schema in the request body for static schema logstream {stream_name}"
796-
),
797-
status: StatusCode::BAD_REQUEST,
798-
});
799-
}
800-
801-
let static_schema: StaticSchema = serde_json::from_slice(body)?;
802-
let parsed_schema =
803-
convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
804-
.map_err(|_| CreateStreamError::Custom {
805-
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
806-
status: StatusCode::BAD_REQUEST,
807-
})?;
808-
809-
Ok(parsed_schema)
810-
}
811-
812796
pub fn validate_time_partition_limit(
813797
time_partition_limit: &str,
814798
) -> Result<NonZeroU32, CreateStreamError> {

src/static_schema.rs

Lines changed: 88 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,100 @@ pub struct StaticSchema {
2929
fields: Vec<SchemaFields>,
3030
}
3131

32+
impl StaticSchema {
33+
pub fn convert_to_arrow_schema(
34+
self,
35+
time_partition: &str,
36+
custom_partition: Option<&String>,
37+
) -> Result<Arc<Schema>, AnyError> {
38+
let mut fields = Vec::new();
39+
let mut time_partition_exists = false;
40+
41+
if let Some(custom_partition) = custom_partition {
42+
for partition in custom_partition.split(',') {
43+
if !self.fields.iter().any(|field| field.name == partition) {
44+
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
45+
}
46+
}
47+
}
48+
for mut field in self.fields {
49+
if !time_partition.is_empty() && field.name == time_partition {
50+
time_partition_exists = true;
51+
field.data_type = "datetime".to_string();
52+
}
53+
54+
let parsed_field = Fields {
55+
name: field.name.clone(),
56+
57+
data_type: {
58+
match field.data_type.as_str() {
59+
"int" => DataType::Int64,
60+
"double" | "float" => DataType::Float64,
61+
"boolean" => DataType::Boolean,
62+
"string" => DataType::Utf8,
63+
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
64+
"string_list" => {
65+
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
66+
}
67+
"int_list" => {
68+
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
69+
}
70+
"double_list" | "float_list" => {
71+
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
72+
}
73+
"boolean_list" => {
74+
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
75+
}
76+
_ => DataType::Null,
77+
}
78+
},
79+
nullable: default_nullable(),
80+
dict_id: default_dict_id(),
81+
dict_is_ordered: default_dict_is_ordered(),
82+
metadata: HashMap::new(),
83+
};
84+
85+
fields.push(parsed_field);
86+
}
87+
88+
if !time_partition.is_empty() && !time_partition_exists {
89+
return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream"));
90+
}
91+
92+
let mut schema: Vec<Arc<Field>> = Vec::new();
93+
for field in fields {
94+
let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
95+
schema.push(Arc::new(field));
96+
}
97+
98+
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
99+
return Err(anyhow!(
100+
"field {} is a reserved field",
101+
DEFAULT_TIMESTAMP_KEY
102+
));
103+
};
104+
105+
// add the p_timestamp field to the event schema to the 0th index
106+
schema.insert(
107+
0,
108+
Arc::new(Field::new(
109+
DEFAULT_TIMESTAMP_KEY,
110+
DataType::Timestamp(TimeUnit::Millisecond, None),
111+
true,
112+
)),
113+
);
114+
115+
// prepare the record batch and new fields to be added
116+
Ok(Arc::new(Schema::new(schema)))
117+
}
118+
}
119+
32120
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33121
pub struct SchemaFields {
34122
name: String,
35123
data_type: String,
36124
}
37125

38-
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39-
#[serde(rename_all = "camelCase")]
40-
pub struct ParsedSchema {
41-
pub fields: Vec<Fields>,
42-
pub metadata: HashMap<String, String>,
43-
}
44-
45126
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
46127
#[serde(rename_all = "camelCase")]
47128
pub struct Fields {
@@ -56,116 +137,6 @@ pub struct Fields {
56137
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
57138

58139
pub struct Metadata {}
59-
pub fn convert_static_schema_to_arrow_schema(
60-
static_schema: StaticSchema,
61-
time_partition: &str,
62-
custom_partition: Option<&String>,
63-
) -> Result<Arc<Schema>, AnyError> {
64-
let mut parsed_schema = ParsedSchema {
65-
fields: Vec::new(),
66-
metadata: HashMap::new(),
67-
};
68-
let mut time_partition_exists = false;
69-
70-
if let Some(custom_partition) = custom_partition {
71-
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
72-
let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());
73-
74-
for partition in &custom_partition_list {
75-
if static_schema
76-
.fields
77-
.iter()
78-
.any(|field| &field.name == partition)
79-
{
80-
custom_partition_exists.insert(partition.to_string(), true);
81-
}
82-
}
83-
84-
for partition in &custom_partition_list {
85-
if !custom_partition_exists.contains_key(*partition) {
86-
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
87-
}
88-
}
89-
}
90-
for mut field in static_schema.fields {
91-
if !time_partition.is_empty() && field.name == time_partition {
92-
time_partition_exists = true;
93-
field.data_type = "datetime".to_string();
94-
}
95-
96-
let parsed_field = Fields {
97-
name: field.name.clone(),
98-
99-
data_type: {
100-
match field.data_type.as_str() {
101-
"int" => DataType::Int64,
102-
"double" | "float" => DataType::Float64,
103-
"boolean" => DataType::Boolean,
104-
"string" => DataType::Utf8,
105-
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
106-
"string_list" => {
107-
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
108-
}
109-
"int_list" => {
110-
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
111-
}
112-
"double_list" | "float_list" => {
113-
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
114-
}
115-
"boolean_list" => {
116-
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
117-
}
118-
_ => DataType::Null,
119-
}
120-
},
121-
nullable: default_nullable(),
122-
dict_id: default_dict_id(),
123-
dict_is_ordered: default_dict_is_ordered(),
124-
metadata: HashMap::new(),
125-
};
126-
127-
parsed_schema.fields.push(parsed_field);
128-
}
129-
if !time_partition.is_empty() && !time_partition_exists {
130-
return Err(anyhow! {
131-
format!(
132-
"time partition field {time_partition} does not exist in the schema for the static schema logstream"
133-
),
134-
});
135-
}
136-
add_parseable_fields_to_static_schema(parsed_schema)
137-
}
138-
139-
fn add_parseable_fields_to_static_schema(
140-
parsed_schema: ParsedSchema,
141-
) -> Result<Arc<Schema>, AnyError> {
142-
let mut schema: Vec<Arc<Field>> = Vec::new();
143-
for field in parsed_schema.fields.iter() {
144-
let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
145-
schema.push(Arc::new(field));
146-
}
147-
148-
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
149-
return Err(anyhow!(
150-
"field {} is a reserved field",
151-
DEFAULT_TIMESTAMP_KEY
152-
));
153-
};
154-
155-
// add the p_timestamp field to the event schema to the 0th index
156-
schema.insert(
157-
0,
158-
Arc::new(Field::new(
159-
DEFAULT_TIMESTAMP_KEY,
160-
DataType::Timestamp(TimeUnit::Millisecond, None),
161-
true,
162-
)),
163-
);
164-
165-
// prepare the record batch and new fields to be added
166-
let schema = Arc::new(Schema::new(schema));
167-
Ok(schema)
168-
}
169140

170141
fn default_nullable() -> bool {
171142
true

0 commit comments

Comments
 (0)