1818use std:: collections:: HashMap ;
1919use std:: sync:: Arc ;
2020
21+ use arrow_array:: { Int64Array , StringArray } ;
2122use futures:: channel:: oneshot;
2223use futures:: future:: join_all;
2324use futures:: { StreamExt , TryStreamExt } ;
@@ -347,14 +348,44 @@ impl CachingDeleteFileLoader {
347348 ///
348349 /// Returns a map of data file path to a delete vector
349350 async fn parse_positional_deletes_record_batch_stream (
350- stream : ArrowRecordBatchStream ,
351+ mut stream : ArrowRecordBatchStream ,
351352 ) -> Result < HashMap < String , DeleteVector > > {
352- // TODO
353+ let mut result: HashMap < String , DeleteVector > = HashMap :: default ( ) ;
354+
355+ while let Some ( batch) = stream. next ( ) . await {
356+ let batch = batch?;
357+ let schema = batch. schema ( ) ;
358+ let columns = batch. columns ( ) ;
359+
360+ let Some ( file_paths) = columns[ 0 ] . as_any ( ) . downcast_ref :: < StringArray > ( ) else {
361+ return Err ( Error :: new (
362+ ErrorKind :: DataInvalid ,
363+ "Could not downcast file paths array to StringArray" ,
364+ ) ) ;
365+ } ;
366+ let Some ( positions) = columns[ 1 ] . as_any ( ) . downcast_ref :: < Int64Array > ( ) else {
367+ return Err ( Error :: new (
368+ ErrorKind :: DataInvalid ,
369+ "Could not downcast positions array to Int64Array" ,
370+ ) ) ;
371+ } ;
372+
373+ for ( file_path, pos) in file_paths. iter ( ) . zip ( positions. iter ( ) ) {
374+ let ( Some ( file_path) , Some ( pos) ) = ( file_path, pos) else {
375+ return Err ( Error :: new (
376+ ErrorKind :: DataInvalid ,
377+ "null values in delete file" ,
378+ ) ) ;
379+ } ;
353380
354- Err ( Error :: new (
355- ErrorKind :: FeatureUnsupported ,
356- "parsing of positional deletes is not yet supported" ,
357- ) )
381+ result
382+ . entry ( file_path. to_string ( ) )
383+ . or_default ( )
384+ . insert ( pos as u64 ) ;
385+ }
386+ }
387+
388+ Ok ( result)
358389 }
359390
360391 /// Parses record batch streams from individual equality delete files
@@ -395,7 +426,7 @@ mod tests {
395426 const FIELD_ID_POSITIONAL_DELETE_POS : u64 = 2147483545 ;
396427
397428 #[ tokio:: test]
398- async fn test_delete_file_manager_load_deletes ( ) {
429+ async fn test_delete_file_loader_load_deletes ( ) {
399430 let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
400431 let table_location = tmp_dir. path ( ) ;
401432 let file_io = FileIO :: from_path ( table_location. as_os_str ( ) . to_str ( ) . unwrap ( ) )
@@ -405,37 +436,76 @@ mod tests {
405436
406437 // Load the positional deletes referenced by the first scan task, then check that the
407438 // resulting delete filter exposes the expected delete vector for each scan task.
408- let delete_file_manager = CachingDeleteFileLoader :: new ( file_io. clone ( ) , 10 ) ;
439+ let delete_file_loader = CachingDeleteFileLoader :: new ( file_io. clone ( ) , 10 ) ;
409440
410441 let file_scan_tasks = setup ( table_location) ;
411442
412- let result = delete_file_manager
443+ let delete_filter = delete_file_loader
413444 . load_deletes ( & file_scan_tasks[ 0 ] . deletes , file_scan_tasks[ 0 ] . schema_ref ( ) )
414445 . await
446+ . unwrap ( )
447+ . unwrap ( ) ;
448+
449+ let result = delete_filter
450+ . get_delete_vector ( & file_scan_tasks[ 0 ] )
415451 . unwrap ( ) ;
452+ assert_eq ! ( result. lock( ) . unwrap( ) . len( ) , 3 ) ; // pos dels from pos del file 1 and 2
416453
417- assert ! ( result. is_err_and( |e| e. kind( ) == ErrorKind :: FeatureUnsupported ) ) ;
454+ let result = delete_filter
455+ . get_delete_vector ( & file_scan_tasks[ 1 ] )
456+ . unwrap ( ) ;
457+ assert_eq ! ( result. lock( ) . unwrap( ) . len( ) , 3 ) ; // pos dels from pos del file 3
418458 }
419459
420460 fn setup ( table_location : & Path ) -> Vec < FileScanTask > {
421461 let data_file_schema = Arc :: new ( Schema :: builder ( ) . build ( ) . unwrap ( ) ) ;
422462 let positional_delete_schema = create_pos_del_schema ( ) ;
423463
424- let file_path_values = vec ! [ format!( "{}/1.parquet" , table_location. to_str( ) . unwrap( ) ) ; 8 ] ;
425- let pos_values = vec ! [ 0 , 1 , 3 , 5 , 6 , 8 , 1022 , 1023 ] ;
426-
427- let file_path_col = Arc :: new ( StringArray :: from_iter_values ( file_path_values) ) ;
428- let pos_col = Arc :: new ( Int64Array :: from_iter_values ( pos_values) ) ;
464+ let mut file_path_values = vec ! [ ] ;
465+ let mut pos_values = vec ! [ ] ;
466+
467+ file_path_values. push ( vec ! [
468+ format!(
469+ "{}/1.parquet" ,
470+ table_location. to_str( ) . unwrap( )
471+ ) ;
472+ 3
473+ ] ) ;
474+ pos_values. push ( vec ! [ 0 , 1 , 3 ] ) ;
475+
476+ file_path_values. push ( vec ! [
477+ format!(
478+ "{}/1.parquet" ,
479+ table_location. to_str( ) . unwrap( )
480+ ) ;
481+ 3
482+ ] ) ;
483+ pos_values. push ( vec ! [ 5 , 6 , 8 ] ) ;
484+
485+ file_path_values. push ( vec ! [
486+ format!(
487+ "{}/2.parquet" ,
488+ table_location. to_str( ) . unwrap( )
489+ ) ;
490+ 3
491+ ] ) ;
492+ pos_values. push ( vec ! [ 1022 , 1023 , 1024 ] ) ;
493+ // 9 rows in total pos deleted across 3 files
429494
430495 let props = WriterProperties :: builder ( )
431496 . set_compression ( Compression :: SNAPPY )
432497 . build ( ) ;
433498
434499 for n in 1 ..=3 {
500+ let file_path_col = Arc :: new ( StringArray :: from_iter_values (
501+ file_path_values. pop ( ) . unwrap ( ) ,
502+ ) ) ;
503+ let pos_col = Arc :: new ( Int64Array :: from_iter_values ( pos_values. pop ( ) . unwrap ( ) ) ) ;
504+
435505 let positional_deletes_to_write =
436506 RecordBatch :: try_new ( positional_delete_schema. clone ( ) , vec ! [
437507 file_path_col. clone( ) ,
438- pos_col. clone ( ) ,
508+ pos_col,
439509 ] )
440510 . unwrap ( ) ;
441511
@@ -486,7 +556,7 @@ mod tests {
486556 start: 0 ,
487557 length: 0 ,
488558 record_count: None ,
489- data_file_path: "" . to_string ( ) ,
559+ data_file_path: format! ( "{}/1.parquet" , table_location . to_str ( ) . unwrap ( ) ) ,
490560 data_file_content: DataContentType :: Data ,
491561 data_file_format: DataFileFormat :: Parquet ,
492562 schema: data_file_schema. clone( ) ,
@@ -498,13 +568,13 @@ mod tests {
498568 start: 0 ,
499569 length: 0 ,
500570 record_count: None ,
501- data_file_path: "" . to_string ( ) ,
571+ data_file_path: format! ( "{}/2.parquet" , table_location . to_str ( ) . unwrap ( ) ) ,
502572 data_file_content: DataContentType :: Data ,
503573 data_file_format: DataFileFormat :: Parquet ,
504574 schema: data_file_schema. clone( ) ,
505575 project_field_ids: vec![ ] ,
506576 predicate: None ,
507- deletes: vec![ pos_del_2 , pos_del_3] ,
577+ deletes: vec![ pos_del_3] ,
508578 } ,
509579 ] ;
510580
0 commit comments