18
18
19
19
use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
20
20
use crate :: utils:: arrow:: get_field;
21
- use anyhow:: { anyhow, Error as AnyError } ;
22
21
use serde:: { Deserialize , Serialize } ;
23
22
use std:: str;
24
23
@@ -60,7 +59,7 @@ pub fn convert_static_schema_to_arrow_schema(
60
59
static_schema : StaticSchema ,
61
60
time_partition : & str ,
62
61
custom_partition : Option < & String > ,
63
- ) -> Result < Arc < Schema > , AnyError > {
62
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
64
63
let mut parsed_schema = ParsedSchema {
65
64
fields : Vec :: new ( ) ,
66
65
metadata : HashMap :: new ( ) ,
@@ -83,7 +82,7 @@ pub fn convert_static_schema_to_arrow_schema(
83
82
84
83
for partition in & custom_partition_list {
85
84
if !custom_partition_exists. contains_key ( * partition) {
86
- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
85
+ return Err ( StaticSchemaError :: MissingCustomPartition ( partition. to_string ( ) ) ) ;
87
86
}
88
87
}
89
88
}
@@ -132,18 +131,14 @@ pub fn convert_static_schema_to_arrow_schema(
132
131
parsed_schema. fields . push ( parsed_field) ;
133
132
}
134
133
if !time_partition. is_empty ( ) && !time_partition_exists {
135
- return Err ( anyhow ! {
136
- format!(
137
- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
138
- ) ,
139
- } ) ;
134
+ return Err ( StaticSchemaError :: MissingTimePartition ( time_partition. to_string ( ) ) ) ;
140
135
}
141
136
add_parseable_fields_to_static_schema ( parsed_schema)
142
137
}
143
138
144
139
fn add_parseable_fields_to_static_schema (
145
140
parsed_schema : ParsedSchema ,
146
- ) -> Result < Arc < Schema > , AnyError > {
141
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
147
142
148
143
let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
149
144
for field in parsed_schema. fields . iter ( ) {
@@ -152,10 +147,7 @@ fn add_parseable_fields_to_static_schema(
152
147
}
153
148
154
149
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
155
- return Err ( anyhow ! (
156
- "field {} is a reserved field" ,
157
- DEFAULT_TIMESTAMP_KEY
158
- ) ) ;
150
+ return Err ( StaticSchemaError :: DefaultTime ) ;
159
151
} ;
160
152
161
153
// add the p_timestamp field to the event schema to the 0th index
@@ -183,19 +175,43 @@ fn default_dict_is_ordered() -> bool {
183
175
false
184
176
}
185
177
186
- fn validate_field_names ( field_name : & str , existing_fields : & mut HashSet < String > ) -> Result < ( ) , AnyError > {
178
+ fn validate_field_names ( field_name : & str , existing_fields : & mut HashSet < String > ) -> Result < ( ) , StaticSchemaError > {
187
179
188
180
if field_name. is_empty ( ) {
189
- return Err ( anyhow ! ( "field names should not be empty" ) ) ;
181
+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
190
182
}
191
183
192
184
if !existing_fields. insert ( field_name. to_string ( ) ) {
193
- return Err ( anyhow ! ( "duplicate field name: {}" , field_name) ) ;
185
+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
194
186
}
195
187
196
188
Ok ( ( ) )
197
189
}
198
190
191
+
192
+ #[ derive( Debug , thiserror:: Error ) ]
193
+ pub enum StaticSchemaError {
194
+
195
+ #[ error(
196
+ "custom partition field {0} does not exist in the schema for the static schema logstream"
197
+ ) ]
198
+ MissingCustomPartition ( String ) ,
199
+
200
+ #[ error(
201
+ "time partition field {0} does not exist in the schema for the static schema logstream"
202
+ ) ]
203
+ MissingTimePartition ( String ) ,
204
+
205
+ #[ error( "field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field" ) ]
206
+ DefaultTime ,
207
+
208
+ #[ error( "field name cannot be empty" ) ]
209
+ EmptyFieldName ,
210
+
211
+ #[ error( "duplicate field name: {0}" ) ]
212
+ DuplicateField ( String ) ,
213
+ }
214
+
199
215
#[ cfg( test) ]
200
216
mod tests {
201
217
use super :: * ;
0 commit comments