@@ -234,14 +234,15 @@ impl StandardTableProvider {
234
234
let Ok ( staging) = PARSEABLE . get_stream ( & self . stream ) else {
235
235
return Ok ( ( ) ) ;
236
236
} ;
237
- let records = staging. recordbatches_cloned ( & self . schema ) ;
238
- let reversed_mem_table = reversed_mem_table ( records, self . schema . clone ( ) ) ?;
239
237
240
- let memory_exec = reversed_mem_table
238
+ // Staging arrow execution plan
239
+ let records = staging. recordbatches_cloned ( & self . schema ) ;
240
+ let arrow_exec = reversed_mem_table ( records, self . schema . clone ( ) ) ?
241
241
. scan ( state, projection, filters, limit)
242
242
. await ?;
243
- execution_plans. push ( memory_exec ) ;
243
+ execution_plans. push ( arrow_exec ) ;
244
244
245
+ // Partition parquet files on disk among the available CPUs
245
246
let target_partition = num_cpus:: get ( ) ;
246
247
let mut partitioned_files = Vec :: from_iter ( ( 0 ..target_partition) . map ( |_| Vec :: new ( ) ) ) ;
247
248
for ( index, file_path) in staging. parquet_files ( ) . into_iter ( ) . enumerate ( ) {
@@ -252,6 +253,9 @@ impl StandardTableProvider {
252
253
partitioned_files[ index % target_partition] . push ( file)
253
254
}
254
255
256
+ // NOTE: There is the possibility of a parquet file being pushed to object store
257
+ // and deleted from staging in the time it takes for datafusion to get to it.
258
+ // Staging parquet execution plan
255
259
self . create_parquet_physical_plan (
256
260
execution_plans,
257
261
ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
0 commit comments