// under the License.

use std::collections::HashMap;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, OnceLock, RwLock};
-use std::task::{Context, Poll};
+use std::sync::Arc;

use futures::channel::oneshot;
use futures::future::join_all;
use futures::{StreamExt, TryStreamExt};
+use tokio::sync::oneshot::{channel, Receiver};

+use super::delete_filter::{DeleteFilter, EqDelFuture};
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::ArrowReader;
use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate::AlwaysTrue;
-use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::expr::Predicate;
use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};

#[allow(unused)]
-pub trait DeleteFileManager {
+pub trait DeleteFileLoader {
    /// Read the delete file referred to in the task
    ///
-    /// Returns the raw contents of the delete file as a RecordBatch stream
-    fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream>;
+    /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
+    async fn read_delete_file(
+        &self,
+        task: &FileScanTaskDeleteFile,
+        schema: SchemaRef,
+    ) -> Result<ArrowRecordBatchStream>;
}

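// Illustrative sketch (not part of this patch): the trait method is now async and
// schema-aware. Assuming a `loader: impl DeleteFileLoader`, a
// `delete_task: FileScanTaskDeleteFile`, and a `schema: SchemaRef` (hypothetical names),
// a caller could drain the returned stream roughly like this, using the
// `futures::TryStreamExt` already imported above:
//
//     let mut stream = loader.read_delete_file(&delete_task, schema.clone()).await?;
//     while let Some(batch) = stream.try_next().await? {
//         // each `batch` is an Arrow RecordBatch of delete rows, already evolved to `schema`
//     }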
#[allow(unused)]
#[derive(Clone, Debug)]
-pub(crate) struct CachingDeleteFileManager {
+pub(crate) struct CachingDeleteFileLoader {
    file_io: FileIO,
    concurrency_limit_data_files: usize,
-    state: Arc<RwLock<DeleteFileManagerState>>,
-}
-
-impl DeleteFileManager for CachingDeleteFileManager {
-    fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream> {
-        // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982
-
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Reading delete files is not yet supported",
-        ))
-    }
-}
-// Equality deletes may apply to more than one DataFile in a scan, and so
-// the same equality delete file may be present in more than one invocation of
-// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these
-// to avoid having to load them twice, so we immediately store cloneable futures in the
-// state that can be awaited upon to get te EQ deletes. That way we can check to see if
-// a load of each Eq delete file is already in progress and avoid starting another one.
-#[derive(Debug, Clone)]
-struct EqDelFuture {
-    result: OnceLock<Predicate>,
-}
-
-impl EqDelFuture {
-    pub fn new() -> (oneshot::Sender<Predicate>, Self) {
-        let (tx, rx) = oneshot::channel();
-        let result = OnceLock::new();
-
-        crate::runtime::spawn({
-            let result = result.clone();
-            async move { result.set(rx.await.unwrap()) }
-        });
-
-        (tx, Self { result })
-    }
+    del_filter: DeleteFilter,
}

-impl Future for EqDelFuture {
-    type Output = Predicate;
+impl DeleteFileLoader for CachingDeleteFileLoader {
+    async fn read_delete_file(
+        &self,
+        task: &FileScanTaskDeleteFile,
+        schema: SchemaRef,
+    ) -> Result<ArrowRecordBatchStream> {
+        let raw_batch_stream =
+            CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone())
+                .await?;

-    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
-        match self.result.get() {
-            None => Poll::Pending,
-            Some(predicate) => Poll::Ready(predicate.clone()),
-        }
+        Self::evolve_schema(raw_batch_stream, schema).await
    }
}

-#[derive(Debug, Default)]
-struct DeleteFileManagerState {
-    // delete vectors and positional deletes get merged when loaded into a single delete vector
-    // per data file
-    delete_vectors: HashMap<String, Arc<RwLock<DeleteVector>>>,
-
-    // equality delete files are parsed into unbound `Predicate`s. We store them here as
-    // cloneable futures (see note below)
-    equality_deletes: HashMap<String, EqDelFuture>,
-}
-
-type StateRef = Arc<RwLock<DeleteFileManagerState>>;
-
// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
    // TODO: Delete Vector loader from Puffin files
@@ -130,12 +87,12 @@ enum ParsedDeleteFileContext {
}

#[allow(unused_variables)]
-impl CachingDeleteFileManager {
+impl CachingDeleteFileLoader {
    pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
-        CachingDeleteFileManager {
+        CachingDeleteFileLoader {
            file_io,
            concurrency_limit_data_files,
-            state: Arc::new(Default::default()),
+            del_filter: DeleteFilter::default(),
        }
    }

@@ -169,7 +126,7 @@ impl CachingDeleteFileManager {
    /// vector maps that resulted from any positional delete or delete vector files into a
    /// single map and persist it in the state.
    ///
-    ///
+    ///
    /// Conceptually, the data flow is like this:
    /// ```none
    /// FileScanTaskDeleteFile
@@ -204,59 +161,74 @@ impl CachingDeleteFileManager {
    ///            |
    ///         [join!]
    /// ```
-    pub(crate) async fn load_deletes(
+    pub(crate) fn load_deletes(
        &self,
        delete_file_entries: &[FileScanTaskDeleteFile],
        schema: SchemaRef,
-    ) -> Result<()> {
+    ) -> Receiver<Result<DeleteFilter>> {
+        let (tx, rx) = channel();
+
        let stream_items = delete_file_entries
            .iter()
            .map(|t| {
                (
                    t.clone(),
                    self.file_io.clone(),
-                    self.state.clone(),
+                    self.del_filter.clone(),
                    schema.clone(),
                )
            })
            .collect::<Vec<_>>();
-        // NOTE: removing the collect and just passing the iterator to futures::stream:iter
-        // results in an error 'implementation of `std::ops::FnOnce` is not general enough'
-
-        let task_stream = futures::stream::iter(stream_items.into_iter());
+        let task_stream = futures::stream::iter(stream_items);
+        let del_filter = self.del_filter.clone();
+        let concurrency_limit_data_files = self.concurrency_limit_data_files;
+        crate::runtime::spawn(async move {
+            let result = async move {
+                let mut del_filter = del_filter;
+
+                let results: Vec<ParsedDeleteFileContext> = task_stream
+                    .map(move |(task, file_io, del_filter, schema)| async move {
+                        Self::load_file_for_task(&task, file_io, del_filter, schema).await
+                    })
+                    .map(move |ctx| {
+                        Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
+                    })
+                    .try_buffer_unordered(concurrency_limit_data_files)
+                    .try_collect::<Vec<_>>()
+                    .await?;
+
+                // wait for all in-progress EQ deletes from other tasks
+                let _ = join_all(results.iter().filter_map(|i| {
+                    if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
+                        Some(fut.clone())
+                    } else {
+                        None
+                    }
+                }))
+                .await;
+
+                for item in results {
+                    if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
+                        for (data_file_path, delete_vector) in hash_map.into_iter() {
+                            del_filter.upsert_delete_vector(data_file_path, delete_vector);
+                        }
+                    }
+                }

-        let results: Vec<ParsedDeleteFileContext> = task_stream
-            .map(move |(task, file_io, state_ref, schema)| async {
-                Self::load_file_for_task(task, file_io, state_ref, schema).await
-            })
-            .map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
-            .try_buffer_unordered(self.concurrency_limit_data_files)
-            .try_collect::<Vec<_>>()
-            .await?;
-
-        // wait for all in-progress EQ deletes from other tasks
-        let _ = join_all(results.iter().filter_map(|i| {
-            if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
-                Some(fut.clone())
-            } else {
-                None
+                Ok(del_filter)
            }
-        }))
-        .await;
-
-        let merged_delete_vectors = results
-            .into_iter()
-            .fold(HashMap::default(), Self::merge_delete_vectors);
+            .await;

-        self.state.write().unwrap().delete_vectors = merged_delete_vectors;
+            let _ = tx.send(result);
+        });

-        Ok(())
+        rx
    }

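// Illustrative sketch (not part of this patch): `load_deletes` now returns immediately
// with a `tokio` oneshot `Receiver`, while the actual loading runs on the spawned task
// above. Mirroring the test at the bottom of this file, a caller would await the
// receiver and then unwrap the channel result before handling the load result
// (`loader`, `task`, and `file_io` are hypothetical names):
//
//     let loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
//     let rx = loader.load_deletes(&task.deletes, task.schema_ref());
//     let del_filter = rx
//         .await      // Result<Result<DeleteFilter>, RecvError> from the oneshot channel
//         .unwrap()?; // panic if the sender was dropped, propagate any load error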
    async fn load_file_for_task(
-        task: FileScanTaskDeleteFile,
+        task: &FileScanTaskDeleteFile,
        file_io: FileIO,
-        state: StateRef,
+        del_filter: DeleteFilter,
        schema: SchemaRef,
    ) -> Result<DeleteFileContext> {
        match task.file_type {
@@ -266,16 +238,15 @@ impl CachingDeleteFileManager {

            DataContentType::EqualityDeletes => {
                let sender = {
-                    let mut state = state.write().unwrap();
-                    if let Some(existing) = state.equality_deletes.get(&task.file_path) {
+                    if let Some(existing) = del_filter
+                        .get_equality_delete_predicate_for_delete_file_path(&task.file_path)
+                    {
                        return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
                    }

                    let (sender, fut) = EqDelFuture::new();

-                    state
-                        .equality_deletes
-                        .insert(task.file_path.to_string(), fut);
+                    del_filter.insert_equality_delete(task.file_path.to_string(), fut);

                    sender
                };
@@ -327,23 +298,6 @@ impl CachingDeleteFileManager {
        }
    }

-    fn merge_delete_vectors(
-        mut merged_delete_vectors: HashMap<String, Arc<RwLock<DeleteVector>>>,
-        item: ParsedDeleteFileContext,
-    ) -> HashMap<String, Arc<RwLock<DeleteVector>>> {
-        if let ParsedDeleteFileContext::DelVecs(del_vecs) = item {
-            del_vecs.into_iter().for_each(|(key, val)| {
-                let entry = merged_delete_vectors.entry(key).or_default();
-                {
-                    let mut inner = entry.write().unwrap();
-                    (*inner).intersect_assign(&val);
-                }
-            });
-        }
-
-        merged_delete_vectors
-    }
-
    /// Loads a RecordBatchStream for a given datafile.
    async fn parquet_to_batch_stream(
        data_file_path: &str,
@@ -416,72 +370,6 @@ impl CachingDeleteFileManager {
416370 "parsing of equality deletes is not yet supported" ,
417371 ) )
418372 }
419-
420- /// Builds eq delete predicate for the provided task.
421- ///
422- /// Must await on load_deletes before calling this.
423- pub ( crate ) async fn build_delete_predicate_for_task (
424- & self ,
425- file_scan_task : & FileScanTask ,
426- ) -> Result < Option < BoundPredicate > > {
427- // * Filter the task's deletes into just the Equality deletes
428- // * Retrieve the unbound predicate for each from self.state.equality_deletes
429- // * Logical-AND them all together to get a single combined `Predicate`
430- // * Bind the predicate to the task's schema to get a `BoundPredicate`
431-
432- let mut combined_predicate = AlwaysTrue ;
433- for delete in & file_scan_task. deletes {
434- if !is_equality_delete ( delete) {
435- continue ;
436- }
437-
438- let predicate = {
439- let state = self . state . read ( ) . unwrap ( ) ;
440-
441- let Some ( predicate) = state. equality_deletes . get ( & delete. file_path ) else {
442- return Err ( Error :: new (
443- ErrorKind :: Unexpected ,
444- format ! (
445- "Missing predicate for equality delete file '{}'" ,
446- delete. file_path
447- ) ,
448- ) ) ;
449- } ;
450-
451- predicate. clone ( )
452- } ;
453-
454- combined_predicate = combined_predicate. and ( predicate. await ) ;
455- }
456-
457- if combined_predicate == AlwaysTrue {
458- return Ok ( None ) ;
459- }
460-
461- // TODO: handle case-insensitive case
462- let bound_predicate = combined_predicate. bind ( file_scan_task. schema . clone ( ) , false ) ?;
463- Ok ( Some ( bound_predicate) )
464- }
465-
466- /// Retrieve a delete vector for the data file associated with a given file scan task
467- ///
468- /// Should only be called after awaiting on load_deletes. Takes the vector to avoid a
469- /// clone since each item is specific to a single data file and won't need to be used again
470- pub ( crate ) fn get_delete_vector_for_task (
471- & self ,
472- file_scan_task : & FileScanTask ,
473- ) -> Option < Arc < RwLock < DeleteVector > > > {
474- self . state
475- . write ( )
476- . unwrap ( )
477- . delete_vectors
478- . get ( file_scan_task. data_file_path ( ) )
479- . map ( Clone :: clone)
480- }
481- }
482-
483- pub ( crate ) fn is_equality_delete ( f : & FileScanTaskDeleteFile ) -> bool {
484- matches ! ( f. file_type, DataContentType :: EqualityDeletes )
485373}
486374
487375#[ cfg( test) ]
@@ -498,6 +386,7 @@ mod tests {
    use tempfile::TempDir;

    use super::*;
+    use crate::scan::FileScanTask;
    use crate::spec::{DataFileFormat, Schema};

    type ArrowSchemaRef = Arc<ArrowSchema>;
@@ -516,13 +405,14 @@ mod tests {

        // Note that with the delete file parsing not yet in place, all we can test here is that
        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);
+        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);

        let file_scan_tasks = setup(table_location);

        let result = delete_file_manager
            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
-            .await;
+            .await
+            .unwrap();

        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
    }