Skip to content

Commit 0608fd1

Browse files
committed
Address Remaining PR Comments
1 parent 8e6face commit 0608fd1

File tree

3 files changed

+81
-160
lines changed

3 files changed

+81
-160
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ bzip2 = { version = "0.6.0", optional = true }
5656
xz = { version = "0.1", default-features = false, optional = true }
5757
crc = { version = "3.0", optional = true }
5858
uuid = "1.17"
59-
indexmap = "2"
59+
indexmap = "2.10"
6060

6161
[dev-dependencies]
6262
arrow-data = { workspace = true }

arrow-avro/src/reader/mod.rs

Lines changed: 39 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@
8888
8989
use crate::codec::{AvroField, AvroFieldBuilder};
9090
use crate::schema::{
91-
compare_schemas, Fingerprint, FingerprintAlgorithm, Schema as AvroSchema, SchemaStore,
92-
SINGLE_OBJECT_MAGIC,
91+
compare_schemas, generate_fingerprint, Fingerprint, FingerprintAlgorithm, Schema as AvroSchema,
92+
SchemaStore, SINGLE_OBJECT_MAGIC,
9393
};
9494
use arrow_array::{RecordBatch, RecordBatchReader};
9595
use arrow_schema::{ArrowError, SchemaRef};
@@ -140,13 +140,12 @@ pub struct Decoder {
140140
active_decoder: RecordDecoder,
141141
active_fingerprint: Option<Fingerprint>,
142142
batch_size: usize,
143-
decoded_rows: usize,
143+
remaining_capacity: usize,
144144
cache: IndexMap<Fingerprint, RecordDecoder>,
145145
max_cache_size: usize,
146146
reader_schema: Option<AvroSchema<'static>>,
147-
schema_store: Option<SchemaStore<'static>>,
147+
writer_schema_store: Option<SchemaStore<'static>>,
148148
utf8_view: bool,
149-
static_store_mode: bool,
150149
strict_mode: bool,
151150
pending_schema: Option<(Fingerprint, RecordDecoder)>,
152151
}
@@ -169,33 +168,33 @@ impl Decoder {
169168
/// Returns the number of bytes consumed.
170169
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
171170
let mut total_consumed = 0usize;
172-
let hash_type = self.schema_store.as_ref().map_or(
171+
let hash_type = self.writer_schema_store.as_ref().map_or(
173172
FingerprintAlgorithm::Rabin,
174173
SchemaStore::fingerprint_algorithm,
175174
);
176-
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
175+
while total_consumed < data.len() && self.remaining_capacity > 0 {
177176
if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
178-
// Schema change detected (> 0) or there were insufficient bytes to read the next prefix (= 0).
179-
// If the former, the batch must end because the next record has a different schema.
180-
// If the latter, batch ends because the caller needs to fetch more bytes.
177+
// A batch is complete when its `remaining_capacity` is 0. It may be completed early if
178+
// a schema change is detected or there are insufficient bytes to read the next prefix.
179+
// A schema change requires a new batch.
181180
total_consumed += prefix_bytes;
182181
break;
183182
}
184183
let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
185184
total_consumed += n;
186-
self.decoded_rows += 1;
185+
self.remaining_capacity -= 1;
187186
}
188187
Ok(total_consumed)
189188
}
190189

191190
/// Produce a `RecordBatch` if at least one row is fully decoded, returning
192191
/// `Ok(None)` if no new rows are available.
193192
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
194-
if self.decoded_rows == 0 {
193+
if self.remaining_capacity == self.batch_size {
195194
return Ok(None);
196195
}
197196
let batch = self.active_decoder.flush()?;
198-
self.decoded_rows = 0;
197+
self.remaining_capacity = self.batch_size;
199198
// Apply a pending schema switch if one is staged
200199
if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
201200
// Cache the old decoder before replacing it
@@ -210,7 +209,6 @@ impl Decoder {
210209
self.active_decoder = new_decoder;
211210
}
212211
}
213-
self.evict_cache();
214212
Ok(Some(batch))
215213
}
216214

@@ -220,10 +218,7 @@ impl Decoder {
220218
buf: &[u8],
221219
hash_type: FingerprintAlgorithm,
222220
) -> Result<Option<usize>, ArrowError> {
223-
if self.schema_store.is_none()
224-
|| self.static_store_mode
225-
|| !buf.starts_with(&SINGLE_OBJECT_MAGIC)
226-
{
221+
if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) {
227222
return Ok(None);
228223
}
229224
let full_len = prefix_len(hash_type);
@@ -247,8 +242,8 @@ impl Decoder {
247242
self.prepare_schema_switch(new_fp)?;
248243
// If there are already decoded rows, we must flush them first.
249244
// Forcing the batch to be full ensures `flush` is called next.
250-
if self.decoded_rows > 0 {
251-
self.decoded_rows = self.batch_size;
245+
if self.remaining_capacity < self.batch_size {
246+
self.remaining_capacity = 0;
252247
}
253248
}
254249
Ok(Some(full_len))
@@ -260,7 +255,7 @@ impl Decoder {
260255
} else {
261256
// No cached decoder, create a new one
262257
let store = self
263-
.schema_store
258+
.writer_schema_store
264259
.as_ref()
265260
.ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?;
266261
let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| {
@@ -282,23 +277,14 @@ impl Decoder {
282277
Ok(())
283278
}
284279

285-
#[inline]
286-
fn evict_cache(&mut self) {
287-
while self.cache.len() > self.max_cache_size {
288-
if let Some(lru_key) = self.cache.keys().next().cloned() {
289-
self.cache.shift_remove(&lru_key);
290-
}
291-
}
292-
}
293-
294280
/// Returns the number of rows that can be added to this decoder before it is full.
295281
pub fn capacity(&self) -> usize {
296-
self.batch_size.saturating_sub(self.decoded_rows)
282+
self.remaining_capacity
297283
}
298284

299285
/// Returns true if the decoder has reached its capacity for the current batch.
300286
pub fn batch_is_full(&self) -> bool {
301-
self.capacity() == 0
287+
self.remaining_capacity == 0
302288
}
303289
}
304290

@@ -312,7 +298,6 @@ pub struct ReaderBuilder {
312298
reader_schema: Option<AvroSchema<'static>>,
313299
writer_schema_store: Option<SchemaStore<'static>>,
314300
active_fingerprint: Option<Fingerprint>,
315-
static_store_mode: bool,
316301
decoder_cache_size: usize,
317302
}
318303

@@ -325,7 +310,6 @@ impl Default for ReaderBuilder {
325310
reader_schema: None,
326311
writer_schema_store: None,
327312
active_fingerprint: None,
328-
static_store_mode: false,
329313
decoder_cache_size: 20,
330314
}
331315
}
@@ -367,20 +351,18 @@ impl ReaderBuilder {
367351
active_decoder: RecordDecoder,
368352
active_fingerprint: Option<Fingerprint>,
369353
reader_schema: Option<AvroSchema<'static>>,
370-
schema_store: Option<SchemaStore<'static>>,
371-
static_store_mode: bool,
354+
writer_schema_store: Option<SchemaStore<'static>>,
372355
) -> Decoder {
373356
Decoder {
374357
batch_size: self.batch_size,
375-
decoded_rows: 0,
358+
remaining_capacity: self.batch_size,
376359
active_fingerprint,
377360
active_decoder,
378361
cache: IndexMap::new(),
379362
max_cache_size: self.decoder_cache_size,
380363
reader_schema,
381364
utf8_view: self.utf8_view,
382-
schema_store,
383-
static_store_mode,
365+
writer_schema_store,
384366
strict_mode: self.strict_mode,
385367
pending_schema: None,
386368
}
@@ -397,39 +379,36 @@ impl ReaderBuilder {
397379
})?;
398380
let record_decoder =
399381
self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?;
400-
Ok(self.make_decoder_with_parts(record_decoder, None, None, None, true))
382+
Ok(self.make_decoder_with_parts(record_decoder, None, None, None))
401383
}
402384
None => {
403385
let reader_schema = self.reader_schema.clone().ok_or_else(|| {
404386
ArrowError::ParseError("Reader schema required for raw Avro".into())
405387
})?;
406388
let (init_fingerprint, initial_decoder) =
407-
match (&self.writer_schema_store, self.active_fingerprint) {
389+
if let (Some(schema_store), Some(fingerprint)) =
390+
(&self.writer_schema_store, self.active_fingerprint)
391+
{
408392
// An initial fingerprint is provided, use it to look up the first schema.
409-
(Some(schema_store), Some(fingerprint)) => {
410-
let writer_schema =
411-
schema_store.lookup(&fingerprint).ok_or_else(|| {
412-
ArrowError::ParseError(
413-
"Active fingerprint not found in schema store".into(),
414-
)
415-
})?;
416-
let decoder =
417-
self.make_record_decoder(writer_schema, Some(&reader_schema))?;
418-
(Some(fingerprint), decoder)
419-
}
393+
let writer_schema = schema_store.lookup(&fingerprint).ok_or_else(|| {
394+
ArrowError::ParseError(
395+
"Active fingerprint not found in schema store".into(),
396+
)
397+
})?;
398+
let decoder =
399+
self.make_record_decoder(writer_schema, Some(&reader_schema))?;
400+
(Some(fingerprint), decoder)
401+
} else {
420402
// No initial fingerprint; the first record must contain one.
421-
// A temporary decoder is created from the reader schema.
422-
_ => {
423-
let decoder = self.make_record_decoder(&reader_schema, None)?;
424-
(None, decoder)
425-
}
403+
// A decoder is created from the reader schema only.
404+
let decoder = self.make_record_decoder(&reader_schema, None)?;
405+
(None, decoder)
426406
};
427407
Ok(self.make_decoder_with_parts(
428408
initial_decoder,
429409
init_fingerprint,
430410
Some(reader_schema),
431411
self.writer_schema_store.clone(),
432-
self.static_store_mode,
433412
))
434413
}
435414
}
@@ -493,18 +472,6 @@ impl ReaderBuilder {
493472
self
494473
}
495474

496-
/// If `true`, all schemas must be pre-registered in the `SchemaStore`.
497-
///
498-
/// When this mode is enabled, decoding will fail if a schema fingerprint is
499-
/// encountered that does not already exist in the store. This prevents the
500-
/// dynamic resolution of schemas and ensures that only known schemas are used.
501-
///
502-
/// Defaults to `false`.
503-
pub fn with_static_store_mode(mut self, enabled: bool) -> Self {
504-
self.static_store_mode = enabled;
505-
self
506-
}
507-
508475
/// Set the maximum number of decoders to cache.
509476
///
510477
/// When dealing with Avro files that contain multiple schemas, we may need to switch
@@ -521,20 +488,13 @@ impl ReaderBuilder {
521488
self.writer_schema_store.as_ref(),
522489
self.reader_schema.as_ref(),
523490
self.active_fingerprint.as_ref(),
524-
self.static_store_mode,
525491
) {
526-
(Some(_), None, _, _) => Err(ArrowError::ParseError(
492+
(Some(_), None, _) => Err(ArrowError::ParseError(
527493
"Reader schema must be set when writer schema store is provided".into(),
528494
)),
529-
(None, _, Some(_), _) => Err(ArrowError::ParseError(
495+
(None, _, Some(_)) => Err(ArrowError::ParseError(
530496
"Active fingerprint requires a writer schema store".into(),
531497
)),
532-
(None, _, _, true) => Err(ArrowError::ParseError(
533-
"static_store_mode=true requires a writer schema store".into(),
534-
)),
535-
(Some(_), _, None, true) => Err(ArrowError::ParseError(
536-
"static_store_mode=true requires an active fingerprint".into(),
537-
)),
538498
_ => Ok(()),
539499
}
540500
}
@@ -780,32 +740,6 @@ mod test {
780740
assert_eq!(store.fingerprint_algorithm(), FingerprintAlgorithm::Rabin);
781741
}
782742

783-
#[test]
784-
fn test_static_store_mode_ignores_subsequent_prefix() {
785-
let (store, fp_int, fp_long, schema_int, _schema_long) = make_two_schema_store();
786-
let mut decoder = ReaderBuilder::new()
787-
.with_batch_size(8)
788-
.with_reader_schema(schema_int.clone())
789-
.with_writer_schema_store(store)
790-
.with_active_fingerprint(fp_int)
791-
.with_static_store_mode(true)
792-
.build_decoder()
793-
.expect("build decoder");
794-
let prefix = make_prefix(fp_long);
795-
match decoder.decode(&prefix) {
796-
Err(ArrowError::ParseError(_)) => {
797-
assert!(
798-
decoder.pending_schema.is_none(),
799-
"no schema switch should be staged"
800-
);
801-
}
802-
Ok(n) => {
803-
panic!("decode unexpectedly succeeded (consumed {n} bytes) in static_store_mode")
804-
}
805-
Err(e) => panic!("unexpected error kind: {e}"),
806-
}
807-
}
808-
809743
#[test]
810744
fn test_unknown_fingerprint_is_error() {
811745
let (mut store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();

0 commit comments

Comments
 (0)