16
16
*
17
17
*/
18
18
19
- use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
20
- use crate :: utils:: arrow:: get_field;
21
- use anyhow:: { anyhow, Error as AnyError } ;
22
- use serde:: { Deserialize , Serialize } ;
23
- use std:: str;
19
+ use std:: { collections:: HashMap , sync:: Arc } ;
24
20
25
21
use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
26
- use std:: { collections:: HashMap , sync:: Arc } ;
22
+ use serde:: { Deserialize , Serialize } ;
23
+
24
+ use crate :: { event:: DEFAULT_TIMESTAMP_KEY , utils:: arrow:: get_field} ;
25
+
27
26
#[ derive( Debug , Clone , PartialEq , Serialize , Deserialize ) ]
28
27
pub struct StaticSchema {
29
28
fields : Vec < SchemaFields > ,
30
29
}
31
30
31
+ #[ derive( Debug , thiserror:: Error ) ]
32
+ pub enum StaticSchemaError {
33
+ #[ error(
34
+ "custom partition field {0} does not exist in the schema for the static schema logstream"
35
+ ) ]
36
+ MissingCustomPartition ( String ) ,
37
+ #[ error(
38
+ "time partition field {0} does not exist in the schema for the static schema logstream"
39
+ ) ]
40
+ MissingTimePartition ( String ) ,
41
+ #[ error( "field {DEFAULT_TIMESTAMP_KEY} is a reserved field" ) ]
42
+ MissingDefaultTimePartition ,
43
+ }
44
+
32
45
impl StaticSchema {
33
46
pub fn convert_to_arrow_schema (
34
47
self ,
35
48
time_partition : & str ,
36
49
custom_partition : Option < & String > ,
37
- ) -> Result < Arc < Schema > , AnyError > {
50
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
38
51
let mut fields = Vec :: new ( ) ;
39
52
let mut time_partition_exists = false ;
40
53
41
54
if let Some ( custom_partition) = custom_partition {
42
55
for partition in custom_partition. split ( ',' ) {
43
56
if !self . fields . iter ( ) . any ( |field| field. name == partition) {
44
- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
57
+ return Err ( StaticSchemaError :: MissingCustomPartition (
58
+ partition. to_owned ( ) ,
59
+ ) ) ;
45
60
}
46
61
}
47
62
}
@@ -86,7 +101,9 @@ impl StaticSchema {
86
101
}
87
102
88
103
if !time_partition. is_empty ( ) && !time_partition_exists {
89
- return Err ( anyhow ! ( "time partition field {time_partition} does not exist in the schema for the static schema logstream" ) ) ;
104
+ return Err ( StaticSchemaError :: MissingTimePartition (
105
+ time_partition. to_owned ( ) ,
106
+ ) ) ;
90
107
}
91
108
92
109
let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
@@ -96,10 +113,7 @@ impl StaticSchema {
96
113
}
97
114
98
115
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
99
- return Err ( anyhow ! (
100
- "field {} is a reserved field" ,
101
- DEFAULT_TIMESTAMP_KEY
102
- ) ) ;
116
+ return Err ( StaticSchemaError :: MissingDefaultTimePartition ) ;
103
117
} ;
104
118
105
119
// add the p_timestamp field to the event schema to the 0th index
0 commit comments