@@ -24,7 +24,7 @@ use std::{
24
24
sync:: Arc ,
25
25
} ;
26
26
27
- use anyhow:: { anyhow, Error as AnyError } ;
27
+ use anyhow:: anyhow;
28
28
use arrow_array:: RecordBatch ;
29
29
use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
30
30
use chrono:: { DateTime , Utc } ;
@@ -119,14 +119,14 @@ pub trait EventFormat: Sized {
119
119
schema_version : SchemaVersion ,
120
120
) -> anyhow:: Result < ( EventSchema , IsFirstEvent ) > ;
121
121
122
- fn decode ( data : Self :: Data , schema : Arc < Schema > ) -> Result < RecordBatch , AnyError > ;
122
+ fn decode ( data : Self :: Data , schema : Arc < Schema > ) -> anyhow :: Result < RecordBatch > ;
123
123
124
124
/// Updates inferred schema with `p_timestamp` field and ensures it adheres to expectations
125
125
fn prepare_and_validate_schema (
126
126
mut schema : EventSchema ,
127
127
storage_schema : & HashMap < String , Arc < Field > > ,
128
128
static_schema_flag : bool ,
129
- ) -> Result < EventSchema , AnyError > {
129
+ ) -> anyhow :: Result < EventSchema > {
130
130
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
131
131
return Err ( anyhow ! ( "field {DEFAULT_TIMESTAMP_KEY} is a reserved field" , ) ) ;
132
132
}
@@ -160,7 +160,7 @@ pub trait EventFormat: Sized {
160
160
schema : & EventSchema ,
161
161
time_partition : Option < & String > ,
162
162
schema_version : SchemaVersion ,
163
- ) -> Result < RecordBatch , AnyError > {
163
+ ) -> anyhow :: Result < RecordBatch > {
164
164
// prepare the record batch and new fields to be added
165
165
let mut new_schema = Arc :: new ( Schema :: new ( schema. clone ( ) ) ) ;
166
166
new_schema =
@@ -176,7 +176,7 @@ pub trait EventFormat: Sized {
176
176
Ok ( rb)
177
177
}
178
178
179
- fn into_event ( self , stream : & Stream ) -> Result < Event , AnyError > ;
179
+ fn into_event ( self , stream : & Stream ) -> anyhow :: Result < Event > ;
180
180
}
181
181
182
182
pub fn get_existing_field_names (
@@ -234,7 +234,7 @@ pub fn update_field_type_in_schema(
234
234
inferred_schema : Arc < Schema > ,
235
235
existing_schema : Option < & HashMap < String , Arc < Field > > > ,
236
236
time_partition : Option < & String > ,
237
- log_records : Option < & [ Json ] > ,
237
+ log_records : Option < & Json > ,
238
238
schema_version : SchemaVersion ,
239
239
) -> Arc < Schema > {
240
240
let mut updated_schema = inferred_schema. clone ( ) ;
@@ -245,11 +245,9 @@ pub fn update_field_type_in_schema(
245
245
updated_schema = override_existing_timestamp_fields ( existing_schema, updated_schema) ;
246
246
}
247
247
248
- if let Some ( log_records) = log_records {
249
- for log_record in log_records {
250
- updated_schema =
251
- override_data_type ( updated_schema. clone ( ) , log_record. clone ( ) , schema_version) ;
252
- }
248
+ if let Some ( log_record) = log_records {
249
+ updated_schema =
250
+ override_data_type ( updated_schema. clone ( ) , log_record. clone ( ) , schema_version) ;
253
251
}
254
252
255
253
let Some ( time_partition) = time_partition else {
0 commit comments