@@ -27,68 +27,89 @@ extern crate uuid;
2727
2828use apache_avro:: types:: Value ;
2929use apache_avro:: { to_avro_datum, Decimal , Schema as ApacheSchema } ;
30- use arrow_avro:: { reader:: ReaderBuilder , schema:: Schema as AvroSchema } ;
30+ use arrow_avro:: schema:: { Fingerprint , SINGLE_OBJECT_MAGIC } ;
31+ use arrow_avro:: { reader:: ReaderBuilder , schema:: AvroSchema } ;
3132use criterion:: { criterion_group, criterion_main, BatchSize , BenchmarkId , Criterion , Throughput } ;
3233use once_cell:: sync:: Lazy ;
33- use std:: { hint:: black_box, io , time:: Duration } ;
34+ use std:: { hint:: black_box, time:: Duration } ;
3435use uuid:: Uuid ;
3536
36- fn encode_records ( schema : & ApacheSchema , rows : impl Iterator < Item = Value > ) -> Vec < u8 > {
37+ fn make_prefix ( fp : Fingerprint ) -> [ u8 ; 10 ] {
38+ let Fingerprint :: Rabin ( val) = fp;
39+ let mut buf = [ 0u8 ; 10 ] ;
40+ buf[ ..2 ] . copy_from_slice ( & SINGLE_OBJECT_MAGIC ) ; // C3 01
41+ buf[ 2 ..] . copy_from_slice ( & val. to_le_bytes ( ) ) ; // little‑endian 64‑bit
42+ buf
43+ }
44+
45+ fn encode_records_with_prefix (
46+ schema : & ApacheSchema ,
47+ prefix : & [ u8 ] ,
48+ rows : impl Iterator < Item = Value > ,
49+ ) -> Vec < u8 > {
3750 let mut out = Vec :: new ( ) ;
3851 for v in rows {
52+ out. extend_from_slice ( prefix) ;
3953 out. extend_from_slice ( & to_avro_datum ( schema, v) . expect ( "encode datum failed" ) ) ;
4054 }
4155 out
4256}
4357
44- fn gen_int ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
45- encode_records (
58+ fn gen_int ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
59+ encode_records_with_prefix (
4660 sc,
61+ prefix,
4762 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Int ( i as i32 ) ) ] ) ) ,
4863 )
4964}
5065
51- fn gen_long ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
52- encode_records (
66+ fn gen_long ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
67+ encode_records_with_prefix (
5368 sc,
69+ prefix,
5470 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Long ( i as i64 ) ) ] ) ) ,
5571 )
5672}
5773
58- fn gen_float ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
59- encode_records (
74+ fn gen_float ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
75+ encode_records_with_prefix (
6076 sc,
77+ prefix,
6178 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Float ( i as f32 + 0.5678 ) ) ] ) ) ,
6279 )
6380}
6481
65- fn gen_bool ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
66- encode_records (
82+ fn gen_bool ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
83+ encode_records_with_prefix (
6784 sc,
85+ prefix,
6886 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Boolean ( i % 2 == 0 ) ) ] ) ) ,
6987 )
7088}
7189
72- fn gen_double ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
73- encode_records (
90+ fn gen_double ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
91+ encode_records_with_prefix (
7492 sc,
93+ prefix,
7594 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Double ( i as f64 + 0.1234 ) ) ] ) ) ,
7695 )
7796}
7897
79- fn gen_bytes ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
80- encode_records (
98+ fn gen_bytes ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
99+ encode_records_with_prefix (
81100 sc,
101+ prefix,
82102 ( 0 ..n) . map ( |i| {
83103 let payload = vec ! [ ( i & 0xFF ) as u8 ; 16 ] ;
84104 Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Bytes ( payload) ) ] )
85105 } ) ,
86106 )
87107}
88108
89- fn gen_string ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
90- encode_records (
109+ fn gen_string ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
110+ encode_records_with_prefix (
91111 sc,
112+ prefix,
92113 ( 0 ..n) . map ( |i| {
93114 let s = if i % 3 == 0 {
94115 format ! ( "value-{i}" )
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
100121 )
101122}
102123
103- fn gen_date ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
104- encode_records (
124+ fn gen_date ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
125+ encode_records_with_prefix (
105126 sc,
127+ prefix,
106128 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Int ( i as i32 ) ) ] ) ) ,
107129 )
108130}
109131
110- fn gen_timemillis ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
111- encode_records (
132+ fn gen_timemillis ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
133+ encode_records_with_prefix (
112134 sc,
135+ prefix,
113136 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Int ( ( i * 37 ) as i32 ) ) ] ) ) ,
114137 )
115138}
116139
117- fn gen_timemicros ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
118- encode_records (
140+ fn gen_timemicros ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
141+ encode_records_with_prefix (
119142 sc,
143+ prefix,
120144 ( 0 ..n) . map ( |i| Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Long ( ( i * 1_001 ) as i64 ) ) ] ) ) ,
121145 )
122146}
123147
124- fn gen_ts_millis ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
125- encode_records (
148+ fn gen_ts_millis ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
149+ encode_records_with_prefix (
126150 sc,
151+ prefix,
127152 ( 0 ..n) . map ( |i| {
128153 Value :: Record ( vec ! [ (
129154 "field1" . into( ) ,
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
133158 )
134159}
135160
136- fn gen_ts_micros ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
137- encode_records (
161+ fn gen_ts_micros ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
162+ encode_records_with_prefix (
138163 sc,
164+ prefix,
139165 ( 0 ..n) . map ( |i| {
140166 Value :: Record ( vec ! [ (
141167 "field1" . into( ) ,
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
145171 )
146172}
147173
148- fn gen_map ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
174+ fn gen_map ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
149175 use std:: collections:: HashMap ;
150- encode_records (
176+ encode_records_with_prefix (
151177 sc,
178+ prefix,
152179 ( 0 ..n) . map ( |i| {
153180 let mut m = HashMap :: new ( ) ;
154181 let int_val = |v : i32 | Value :: Union ( 0 , Box :: new ( Value :: Int ( v) ) ) ;
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
165192 )
166193}
167194
168- fn gen_array ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
169- encode_records (
195+ fn gen_array ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
196+ encode_records_with_prefix (
170197 sc,
198+ prefix,
171199 ( 0 ..n) . map ( |i| {
172200 let items = ( 0 ..5 ) . map ( |j| Value :: Int ( i as i32 + j) ) . collect ( ) ;
173201 Value :: Record ( vec ! [ ( "field1" . into( ) , Value :: Array ( items) ) ] )
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
189217 full[ first..] . to_vec ( )
190218}
191219
192- fn gen_decimal ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
193- encode_records (
220+ fn gen_decimal ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
221+ encode_records_with_prefix (
194222 sc,
223+ prefix,
195224 ( 0 ..n) . map ( |i| {
196225 let unscaled = if i % 2 == 0 { i as i128 } else { -( i as i128 ) } ;
197226 Value :: Record ( vec ! [ (
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
202231 )
203232}
204233
205- fn gen_uuid ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
206- encode_records (
234+ fn gen_uuid ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
235+ encode_records_with_prefix (
207236 sc,
237+ prefix,
208238 ( 0 ..n) . map ( |i| {
209239 let mut raw = ( i as u128 ) . to_be_bytes ( ) ;
210240 raw[ 6 ] = ( raw[ 6 ] & 0x0F ) | 0x40 ;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
214244 )
215245}
216246
217- fn gen_fixed ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
218- encode_records (
247+ fn gen_fixed ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
248+ encode_records_with_prefix (
219249 sc,
250+ prefix,
220251 ( 0 ..n) . map ( |i| {
221252 let mut buf = vec ! [ 0u8 ; 16 ] ;
222253 buf[ ..8 ] . copy_from_slice ( & ( i as u64 ) . to_be_bytes ( ) ) ;
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
225256 )
226257}
227258
228- fn gen_interval ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
229- encode_records (
259+ fn gen_interval ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
260+ encode_records_with_prefix (
230261 sc,
262+ prefix,
231263 ( 0 ..n) . map ( |i| {
232264 let months = ( i % 24 ) as u32 ;
233265 let days = ( i % 32 ) as u32 ;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
241273 )
242274}
243275
244- fn gen_enum ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
276+ fn gen_enum ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
245277 const SYMBOLS : [ & str ; 3 ] = [ "A" , "B" , "C" ] ;
246- encode_records (
278+ encode_records_with_prefix (
247279 sc,
280+ prefix,
248281 ( 0 ..n) . map ( |i| {
249282 let idx = i % 3 ;
250283 Value :: Record ( vec ! [ (
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
255288 )
256289}
257290
258- fn gen_mixed ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
259- encode_records (
291+ fn gen_mixed ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
292+ encode_records_with_prefix (
260293 sc,
294+ prefix,
261295 ( 0 ..n) . map ( |i| {
262296 Value :: Record ( vec ! [
263297 ( "f1" . into( ) , Value :: Int ( i as i32 ) ) ,
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
269303 )
270304}
271305
272- fn gen_nested ( sc : & ApacheSchema , n : usize ) -> Vec < u8 > {
273- encode_records (
306+ fn gen_nested ( sc : & ApacheSchema , n : usize , prefix : & [ u8 ] ) -> Vec < u8 > {
307+ encode_records_with_prefix (
274308 sc,
309+ prefix,
275310 ( 0 ..n) . map ( |i| {
276311 let sub = Value :: Record ( vec ! [
277312 ( "x" . into( ) , Value :: Int ( i as i32 ) ) ,
@@ -290,12 +325,14 @@ fn new_decoder(
290325 batch_size : usize ,
291326 utf8view : bool ,
292327) -> arrow_avro:: reader:: Decoder {
293- let schema: AvroSchema < ' static > = serde_json:: from_str ( schema_json) . unwrap ( ) ;
328+ let schema = AvroSchema :: new ( schema_json. parse ( ) . unwrap ( ) ) ;
329+ let mut store = arrow_avro:: schema:: SchemaStore :: new ( ) ;
330+ store. register ( schema. clone ( ) ) . unwrap ( ) ;
294331 ReaderBuilder :: new ( )
295- . with_schema ( schema )
332+ . with_writer_schema_store ( store )
296333 . with_batch_size ( batch_size)
297334 . with_utf8_view ( utf8view)
298- . build_decoder ( io :: empty ( ) )
335+ . build_decoder ( )
299336 . expect ( "failed to build decoder" )
300337}
301338
@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str = r#"{"type":"record","name":"ArrRec","fields":[{"name"
325362const DECIMAL_SCHEMA : & str = r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"# ;
326363const UUID_SCHEMA : & str = r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"# ;
327364const FIXED_SCHEMA : & str = r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"# ;
328- const INTERVAL_SCHEMA_ENCODE : & str = r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"# ;
329365const INTERVAL_SCHEMA : & str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"# ;
366+ const INTERVAL_SCHEMA_ENCODE : & str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"# ;
330367const ENUM_SCHEMA : & str = r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"# ;
331368const MIX_SCHEMA : & str = r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"# ;
332369const NEST_SCHEMA : & str = r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"# ;
@@ -336,7 +373,13 @@ macro_rules! dataset {
336373 static $name: Lazy <Vec <Vec <u8 >>> = Lazy :: new( || {
337374 let schema =
338375 ApacheSchema :: parse_str( $schema_json) . expect( "invalid schema for generator" ) ;
339- SIZES . iter( ) . map( |& n| $gen_fn( & schema, n) ) . collect( )
376+ let arrow_schema = AvroSchema :: new( $schema_json. to_string( ) ) ;
377+ let fingerprint = arrow_schema. fingerprint( ) . expect( "fingerprint failed" ) ;
378+ let prefix = make_prefix( fingerprint) ;
379+ SIZES
380+ . iter( )
381+ . map( |& n| $gen_fn( & schema, n, & prefix) )
382+ . collect( )
340383 } ) ;
341384 } ;
342385}
@@ -406,6 +449,14 @@ fn bench_scenario(
406449
407450fn criterion_benches ( c : & mut Criterion ) {
408451 for & batch_size in & [ SMALL_BATCH , LARGE_BATCH ] {
452+ bench_scenario (
453+ c,
454+ "Interval" ,
455+ INTERVAL_SCHEMA ,
456+ & INTERVAL_DATA ,
457+ false ,
458+ batch_size,
459+ ) ;
409460 bench_scenario ( c, "Int32" , INT_SCHEMA , & INT_DATA , false , batch_size) ;
410461 bench_scenario ( c, "Int64" , LONG_SCHEMA , & LONG_DATA , false , batch_size) ;
411462 bench_scenario ( c, "Float32" , FLOAT_SCHEMA , & FLOAT_DATA , false , batch_size) ;
@@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) {
480531 false ,
481532 batch_size,
482533 ) ;
483- bench_scenario (
484- c,
485- "Interval" ,
486- INTERVAL_SCHEMA ,
487- & INTERVAL_DATA ,
488- false ,
489- batch_size,
490- ) ;
491534 bench_scenario (
492535 c,
493536 "Enum(Dictionary)" ,
0 commit comments