Skip to content

Commit 07e0953

Browse files
Adds Confluent wire format handling to arrow-avro crate (#8242)
# Which issue does this PR close? - Part of #4886 - Extends work initiated in #8006 # Rationale for this change This introduces support for Confluent schema registry ID handling in the arrow-avro crate, adding compatibility with Confluent's wire format. These improvements enable streaming Apache Kafka, Redpanda, and Pulsar messages with Avro schemas directly into arrow-rs. # What changes are included in this PR? - Adds Confluent support - Adds initial support for SHA256 and MD5 algorithm types. Rabin remains the default. # Are these changes tested? Yes, existing tests are all passing, and tests for ID handling have been added. Benchmark results show no appreciable changes. # Are there any user-facing changes? - Confluent users need to provide the ID fingerprint when using the `set` method, unlike the `register` method which generates it from the schema on the fly. Existing API behavior has been maintained. - SchemaStore TryFrom now accepts a `&HashMap<Fingerprint, AvroSchema>`, rather than a `&[AvroSchema]` Huge shout out to @jecsand838 for his collaboration on this! --------- Co-authored-by: Connor Sanders <[email protected]>
1 parent 4506998 commit 07e0953

File tree

4 files changed

+634
-236
lines changed

4 files changed

+634
-236
lines changed

arrow-avro/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ default = ["deflate", "snappy", "zstd", "bzip2", "xz"]
4040
deflate = ["flate2"]
4141
snappy = ["snap", "crc"]
4242
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
43+
md5 = ["dep:md5"]
44+
sha256 = ["dep:sha2"]
4345

4446
[dependencies]
4547
arrow-schema = { workspace = true }
@@ -59,6 +61,8 @@ strum_macros = "0.27"
5961
uuid = "1.17"
6062
indexmap = "2.10"
6163
rand = "0.9"
64+
md5 = { version = "0.8", optional = true }
65+
sha2 = { version = "0.10", optional = true }
6266

6367
[dev-dependencies]
6468
arrow-data = { workspace = true }

arrow-avro/benches/decoder.rs

Lines changed: 149 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,42 @@ extern crate uuid;
2727

2828
use apache_avro::types::Value;
2929
use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
30-
use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
30+
use arrow_avro::schema::{Fingerprint, FingerprintAlgorithm, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC};
3131
use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
3232
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
3333
use once_cell::sync::Lazy;
3434
use std::{hint::black_box, time::Duration};
3535
use uuid::Uuid;
3636

37-
fn make_prefix(fp: Fingerprint) -> [u8; 10] {
38-
let Fingerprint::Rabin(val) = fp;
39-
let mut buf = [0u8; 10];
40-
buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
41-
buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
42-
buf
37+
fn make_prefix(fp: Fingerprint) -> Vec<u8> {
38+
match fp {
39+
Fingerprint::Rabin(val) => {
40+
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::<u64>());
41+
buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
42+
buf.extend_from_slice(&val.to_le_bytes()); // little-endian
43+
buf
44+
}
45+
Fingerprint::Id(id) => {
46+
let mut buf = Vec::with_capacity(CONFLUENT_MAGIC.len() + size_of::<u32>());
47+
buf.extend_from_slice(&CONFLUENT_MAGIC); // 00
48+
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
49+
buf
50+
}
51+
#[cfg(feature = "md5")]
52+
Fingerprint::MD5(val) => {
53+
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val));
54+
buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
55+
buf.extend_from_slice(&val);
56+
buf
57+
}
58+
#[cfg(feature = "sha256")]
59+
Fingerprint::SHA256(val) => {
60+
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val));
61+
buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
62+
buf.extend_from_slice(&val);
63+
buf
64+
}
65+
}
4366
}
4467

4568
fn encode_records_with_prefix(
@@ -336,6 +359,27 @@ fn new_decoder(
336359
.expect("failed to build decoder")
337360
}
338361

362+
fn new_decoder_id(
363+
schema_json: &'static str,
364+
batch_size: usize,
365+
utf8view: bool,
366+
id: u32,
367+
) -> arrow_avro::reader::Decoder {
368+
let schema = AvroSchema::new(schema_json.parse().unwrap());
369+
let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None);
370+
// Register the schema with a provided Confluent-style ID
371+
store
372+
.set(Fingerprint::Id(id), schema.clone())
373+
.expect("failed to set schema with id");
374+
ReaderBuilder::new()
375+
.with_writer_schema_store(store)
376+
.with_active_fingerprint(Fingerprint::Id(id))
377+
.with_batch_size(batch_size)
378+
.with_utf8_view(utf8view)
379+
.build_decoder()
380+
.expect("failed to build decoder for id")
381+
}
382+
339383
const SIZES: [usize; 3] = [100, 10_000, 1_000_000];
340384

341385
const INT_SCHEMA: &str =
@@ -373,7 +417,7 @@ macro_rules! dataset {
373417
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
374418
let schema =
375419
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
376-
let arrow_schema = AvroSchema::new($schema_json.to_string());
420+
let arrow_schema = AvroSchema::new($schema_json.parse().unwrap());
377421
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
378422
let prefix = make_prefix(fingerprint);
379423
SIZES
@@ -384,6 +428,24 @@ macro_rules! dataset {
384428
};
385429
}
386430

431+
/// Additional helper for Confluent's ID-based wire format (00 + BE u32).
432+
macro_rules! dataset_id {
433+
($name:ident, $schema_json:expr, $gen_fn:ident, $id:expr) => {
434+
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
435+
let schema =
436+
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
437+
let prefix = make_prefix(Fingerprint::Id($id));
438+
SIZES
439+
.iter()
440+
.map(|&n| $gen_fn(&schema, n, &prefix))
441+
.collect()
442+
});
443+
};
444+
}
445+
446+
const ID_BENCH_ID: u32 = 7;
447+
448+
dataset_id!(INT_DATA_ID, INT_SCHEMA, gen_int, ID_BENCH_ID);
387449
dataset!(INT_DATA, INT_SCHEMA, gen_int);
388450
dataset!(LONG_DATA, LONG_SCHEMA, gen_long);
389451
dataset!(FLOAT_DATA, FLOAT_SCHEMA, gen_float);
@@ -406,19 +468,20 @@ dataset!(ENUM_DATA, ENUM_SCHEMA, gen_enum);
406468
dataset!(MIX_DATA, MIX_SCHEMA, gen_mixed);
407469
dataset!(NEST_DATA, NEST_SCHEMA, gen_nested);
408470

409-
fn bench_scenario(
471+
fn bench_with_decoder<F>(
410472
c: &mut Criterion,
411473
name: &str,
412-
schema_json: &'static str,
413474
data_sets: &[Vec<u8>],
414-
utf8view: bool,
415-
batch_size: usize,
416-
) {
475+
rows: &[usize],
476+
mut new_decoder: F,
477+
) where
478+
F: FnMut() -> arrow_avro::reader::Decoder,
479+
{
417480
let mut group = c.benchmark_group(name);
418-
for (idx, &rows) in SIZES.iter().enumerate() {
481+
for (idx, &row_count) in rows.iter().enumerate() {
419482
let datum = &data_sets[idx];
420483
group.throughput(Throughput::Bytes(datum.len() as u64));
421-
match rows {
484+
match row_count {
422485
10_000 => {
423486
group
424487
.sample_size(25)
@@ -433,9 +496,9 @@ fn bench_scenario(
433496
}
434497
_ => {}
435498
}
436-
group.bench_function(BenchmarkId::from_parameter(rows), |b| {
499+
group.bench_function(BenchmarkId::from_parameter(row_count), |b| {
437500
b.iter_batched_ref(
438-
|| new_decoder(schema_json, batch_size, utf8view),
501+
&mut new_decoder,
439502
|decoder| {
440503
black_box(decoder.decode(datum).unwrap());
441504
black_box(decoder.flush().unwrap().unwrap());
@@ -449,105 +512,75 @@ fn bench_scenario(
449512

450513
fn criterion_benches(c: &mut Criterion) {
451514
for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
452-
bench_scenario(
453-
c,
454-
"Interval",
455-
INTERVAL_SCHEMA,
456-
&INTERVAL_DATA,
457-
false,
458-
batch_size,
459-
);
460-
bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size);
461-
bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size);
462-
bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, batch_size);
463-
bench_scenario(c, "Boolean", BOOL_SCHEMA, &BOOL_DATA, false, batch_size);
464-
bench_scenario(c, "Float64", DOUBLE_SCHEMA, &DOUBLE_DATA, false, batch_size);
465-
bench_scenario(
466-
c,
467-
"Binary(Bytes)",
468-
BYTES_SCHEMA,
469-
&BYTES_DATA,
470-
false,
471-
batch_size,
472-
);
473-
bench_scenario(c, "String", STRING_SCHEMA, &STRING_DATA, false, batch_size);
474-
bench_scenario(
475-
c,
476-
"StringView",
477-
STRING_SCHEMA,
478-
&STRING_DATA,
479-
true,
480-
batch_size,
481-
);
482-
bench_scenario(c, "Date32", DATE_SCHEMA, &DATE_DATA, false, batch_size);
483-
bench_scenario(
484-
c,
485-
"TimeMillis",
486-
TMILLIS_SCHEMA,
487-
&TMILLIS_DATA,
488-
false,
489-
batch_size,
490-
);
491-
bench_scenario(
492-
c,
493-
"TimeMicros",
494-
TMICROS_SCHEMA,
495-
&TMICROS_DATA,
496-
false,
497-
batch_size,
498-
);
499-
bench_scenario(
500-
c,
501-
"TimestampMillis",
502-
TSMILLIS_SCHEMA,
503-
&TSMILLIS_DATA,
504-
false,
505-
batch_size,
506-
);
507-
bench_scenario(
508-
c,
509-
"TimestampMicros",
510-
TSMICROS_SCHEMA,
511-
&TSMICROS_DATA,
512-
false,
513-
batch_size,
514-
);
515-
bench_scenario(c, "Map", MAP_SCHEMA, &MAP_DATA, false, batch_size);
516-
bench_scenario(c, "Array", ARRAY_SCHEMA, &ARRAY_DATA, false, batch_size);
517-
bench_scenario(
518-
c,
519-
"Decimal128",
520-
DECIMAL_SCHEMA,
521-
&DECIMAL_DATA,
522-
false,
523-
batch_size,
524-
);
525-
bench_scenario(c, "UUID", UUID_SCHEMA, &UUID_DATA, false, batch_size);
526-
bench_scenario(
527-
c,
528-
"FixedSizeBinary",
529-
FIXED_SCHEMA,
530-
&FIXED_DATA,
531-
false,
532-
batch_size,
533-
);
534-
bench_scenario(
535-
c,
536-
"Enum(Dictionary)",
537-
ENUM_SCHEMA,
538-
&ENUM_DATA,
539-
false,
540-
batch_size,
541-
);
542-
bench_scenario(c, "Mixed", MIX_SCHEMA, &MIX_DATA, false, batch_size);
543-
bench_scenario(
544-
c,
545-
"Nested(Struct)",
546-
NEST_SCHEMA,
547-
&NEST_DATA,
548-
false,
549-
batch_size,
550-
);
515+
bench_with_decoder(c, "Interval", &INTERVAL_DATA, &SIZES, || {
516+
new_decoder(INTERVAL_SCHEMA, batch_size, false)
517+
});
518+
bench_with_decoder(c, "Int32", &INT_DATA, &SIZES, || {
519+
new_decoder(INT_SCHEMA, batch_size, false)
520+
});
521+
bench_with_decoder(c, "Int32_Id", &INT_DATA_ID, &SIZES, || {
522+
new_decoder_id(INT_SCHEMA, batch_size, false, ID_BENCH_ID)
523+
});
524+
bench_with_decoder(c, "Int64", &LONG_DATA, &SIZES, || {
525+
new_decoder(LONG_SCHEMA, batch_size, false)
526+
});
527+
bench_with_decoder(c, "Float32", &FLOAT_DATA, &SIZES, || {
528+
new_decoder(FLOAT_SCHEMA, batch_size, false)
529+
});
530+
bench_with_decoder(c, "Boolean", &BOOL_DATA, &SIZES, || {
531+
new_decoder(BOOL_SCHEMA, batch_size, false)
532+
});
533+
bench_with_decoder(c, "Float64", &DOUBLE_DATA, &SIZES, || {
534+
new_decoder(DOUBLE_SCHEMA, batch_size, false)
535+
});
536+
bench_with_decoder(c, "Binary(Bytes)", &BYTES_DATA, &SIZES, || {
537+
new_decoder(BYTES_SCHEMA, batch_size, false)
538+
});
539+
bench_with_decoder(c, "String", &STRING_DATA, &SIZES, || {
540+
new_decoder(STRING_SCHEMA, batch_size, false)
541+
});
542+
bench_with_decoder(c, "StringView", &STRING_DATA, &SIZES, || {
543+
new_decoder(STRING_SCHEMA, batch_size, true)
544+
});
545+
bench_with_decoder(c, "Date32", &DATE_DATA, &SIZES, || {
546+
new_decoder(DATE_SCHEMA, batch_size, false)
547+
});
548+
bench_with_decoder(c, "TimeMillis", &TMILLIS_DATA, &SIZES, || {
549+
new_decoder(TMILLIS_SCHEMA, batch_size, false)
550+
});
551+
bench_with_decoder(c, "TimeMicros", &TMICROS_DATA, &SIZES, || {
552+
new_decoder(TMICROS_SCHEMA, batch_size, false)
553+
});
554+
bench_with_decoder(c, "TimestampMillis", &TSMILLIS_DATA, &SIZES, || {
555+
new_decoder(TSMILLIS_SCHEMA, batch_size, false)
556+
});
557+
bench_with_decoder(c, "TimestampMicros", &TSMICROS_DATA, &SIZES, || {
558+
new_decoder(TSMICROS_SCHEMA, batch_size, false)
559+
});
560+
bench_with_decoder(c, "Map", &MAP_DATA, &SIZES, || {
561+
new_decoder(MAP_SCHEMA, batch_size, false)
562+
});
563+
bench_with_decoder(c, "Array", &ARRAY_DATA, &SIZES, || {
564+
new_decoder(ARRAY_SCHEMA, batch_size, false)
565+
});
566+
bench_with_decoder(c, "Decimal128", &DECIMAL_DATA, &SIZES, || {
567+
new_decoder(DECIMAL_SCHEMA, batch_size, false)
568+
});
569+
bench_with_decoder(c, "UUID", &UUID_DATA, &SIZES, || {
570+
new_decoder(UUID_SCHEMA, batch_size, false)
571+
});
572+
bench_with_decoder(c, "FixedSizeBinary", &FIXED_DATA, &SIZES, || {
573+
new_decoder(FIXED_SCHEMA, batch_size, false)
574+
});
575+
bench_with_decoder(c, "Enum(Dictionary)", &ENUM_DATA, &SIZES, || {
576+
new_decoder(ENUM_SCHEMA, batch_size, false)
577+
});
578+
bench_with_decoder(c, "Mixed", &MIX_DATA, &SIZES, || {
579+
new_decoder(MIX_SCHEMA, batch_size, false)
580+
});
581+
bench_with_decoder(c, "Nested(Struct)", &NEST_DATA, &SIZES, || {
582+
new_decoder(NEST_SCHEMA, batch_size, false)
583+
});
551584
}
552585
}
553586

0 commit comments

Comments
 (0)