1717
1818//! Table scan api.
1919
20- use std:: collections:: HashMap ;
20+ use std:: collections:: { HashMap , HashSet } ;
2121use std:: sync:: { Arc , RwLock } ;
2222
2323use arrow_array:: RecordBatch ;
2424use futures:: channel:: mpsc:: { channel, Sender } ;
2525use futures:: stream:: BoxStream ;
2626use futures:: { SinkExt , StreamExt , TryFutureExt , TryStreamExt } ;
27+ use itertools:: Itertools ;
2728use serde:: { Deserialize , Serialize } ;
2829
2930use crate :: arrow:: ArrowReaderBuilder ;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
3839use crate :: runtime:: spawn;
3940use crate :: spec:: {
4041 DataContentType , DataFileFormat , ManifestContentType , ManifestEntryRef , ManifestFile ,
41- ManifestList , Schema , SchemaRef , SnapshotRef , TableMetadataRef ,
42+ ManifestList , ManifestStatus , Operation , Schema , SchemaRef , SnapshotRef , TableMetadataRef ,
4243} ;
4344use crate :: table:: Table ;
4445use crate :: utils:: available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
5556 // Defaults to none which means select all columns
5657 column_names : Option < Vec < String > > ,
5758 snapshot_id : Option < i64 > ,
59+ /// Exclusive. Used for incremental scan.
60+ from_snapshot_id : Option < i64 > ,
61+ /// Inclusive. Used for incremental scan.
62+ to_snapshot_id : Option < i64 > ,
5863 batch_size : Option < usize > ,
5964 case_sensitive : bool ,
6065 filter : Option < Predicate > ,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
7883 table,
7984 column_names : None ,
8085 snapshot_id : None ,
86+ from_snapshot_id : None ,
87+ to_snapshot_id : None ,
8188 batch_size : None ,
8289 case_sensitive : true ,
8390 filter : None ,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
140147 self
141148 }
142149
150+ /// Set the starting snapshot id (exclusive) for incremental scan.
151+ pub fn from_snapshot_id ( mut self , from_snapshot_id : i64 ) -> Self {
152+ self . from_snapshot_id = Some ( from_snapshot_id) ;
153+ self
154+ }
155+
156+ /// Set the ending snapshot id (inclusive) for incremental scan.
157+ pub fn to_snapshot_id ( mut self , to_snapshot_id : i64 ) -> Self {
158+ self . to_snapshot_id = Some ( to_snapshot_id) ;
159+ self
160+ }
161+
143162 /// Sets the concurrency limit for both manifest files and manifest
144163 /// entries for this scan
145164 pub fn with_concurrency_limit ( mut self , limit : usize ) -> Self {
@@ -227,6 +246,8 @@ impl<'a> TableScanBuilder<'a> {
227246 } ) ?
228247 . clone ( ) ,
229248 } ;
249+ // TODO: we should validate either snapshot (snapshot scan) or
250+ // from_snapshot_id and to_snapshot_id are set (incremental scan)
230251
231252 let schema = snapshot. schema ( self . table . metadata ( ) ) ?;
232253
@@ -297,6 +318,8 @@ impl<'a> TableScanBuilder<'a> {
297318 snapshot_bound_predicate : snapshot_bound_predicate. map ( Arc :: new) ,
298319 object_cache : self . table . object_cache ( ) ,
299320 field_ids : Arc :: new ( field_ids) ,
321+ from_snapshot_id : self . from_snapshot_id ,
322+ to_snapshot_id : self . to_snapshot_id ,
300323 partition_filter_cache : Arc :: new ( PartitionFilterCache :: new ( ) ) ,
301324 manifest_evaluator_cache : Arc :: new ( ManifestEvaluatorCache :: new ( ) ) ,
302325 expression_evaluator_cache : Arc :: new ( ExpressionEvaluatorCache :: new ( ) ) ,
@@ -354,6 +377,8 @@ struct PlanContext {
354377 snapshot_bound_predicate : Option < Arc < BoundPredicate > > ,
355378 object_cache : Arc < ObjectCache > ,
356379 field_ids : Arc < Vec < i32 > > ,
380+ from_snapshot_id : Option < i64 > ,
381+ to_snapshot_id : Option < i64 > ,
357382
358383 partition_filter_cache : Arc < PartitionFilterCache > ,
359384 manifest_evaluator_cache : Arc < ManifestEvaluatorCache > ,
@@ -375,13 +400,74 @@ impl TableScan {
375400 // used to stream the results back to the caller
376401 let ( file_scan_task_tx, file_scan_task_rx) = channel ( concurrency_limit_manifest_entries) ;
377402
403+ if let ( Some ( from_snapshot_id) , Some ( to_snapshot_id) ) = (
404+ self . plan_context . from_snapshot_id ,
405+ self . plan_context . to_snapshot_id ,
406+ ) {
407+ // Incremental scan mode
408+ let added_files = added_files_between (
409+ & self . plan_context . object_cache ,
410+ & self . plan_context . table_metadata ,
411+ to_snapshot_id,
412+ from_snapshot_id,
413+ )
414+ . await ?;
415+
416+ for entry in added_files {
417+ let manifest_entry_context = ManifestEntryContext {
418+ manifest_entry : entry,
419+ expression_evaluator_cache : self
420+ . plan_context
421+ . expression_evaluator_cache
422+ . clone ( ) ,
423+ field_ids : self . plan_context . field_ids . clone ( ) ,
424+ bound_predicates : None , // TODO: support predicates in incremental scan
425+ partition_spec_id : 0 , // TODO: get correct partition spec id
426+ // It's used to skip any data file whose partition data indicates that it can't contain
427+ // any data that matches this scan's filter
428+ snapshot_schema : self . plan_context . snapshot_schema . clone ( ) ,
429+ // delete is not supported in incremental scan
430+ delete_file_index : None ,
431+ } ;
432+
433+ manifest_entry_data_ctx_tx
434+ . clone ( )
435+ . send ( manifest_entry_context)
436+ . await
437+ . map_err ( |_| Error :: new ( ErrorKind :: Unexpected , "mpsc channel SendError" ) ) ?;
438+ }
439+
440+ let mut channel_for_manifest_entry_error = file_scan_task_tx. clone ( ) ;
441+
442+ // Process the [`ManifestEntry`] stream in parallel
443+ spawn ( async move {
444+ let result = manifest_entry_data_ctx_rx
445+ . map ( |me_ctx| Ok ( ( me_ctx, file_scan_task_tx. clone ( ) ) ) )
446+ . try_for_each_concurrent (
447+ concurrency_limit_manifest_entries,
448+ |( manifest_entry_context, tx) | async move {
449+ spawn ( async move {
450+ Self :: process_data_manifest_entry ( manifest_entry_context, tx) . await
451+ } )
452+ . await
453+ } ,
454+ )
455+ . await ;
456+
457+ if let Err ( error) = result {
458+ let _ = channel_for_manifest_entry_error. send ( Err ( error) ) . await ;
459+ }
460+ } ) ;
461+
462+ return Ok ( file_scan_task_rx. boxed ( ) ) ;
463+ }
464+
378465 let delete_file_idx_and_tx: Option < ( DeleteFileIndex , Sender < DeleteFileContext > ) > =
379466 if self . delete_file_processing_enabled {
380467 Some ( DeleteFileIndex :: new ( ) )
381468 } else {
382469 None
383470 } ;
384-
385471 let manifest_list = self . plan_context . get_manifest_list ( ) . await ?;
386472
387473 // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
@@ -1146,6 +1232,100 @@ impl FileScanTask {
11461232 }
11471233}
11481234
1235+ struct Ancestors {
1236+ next : Option < SnapshotRef > ,
1237+ get_snapshot : Box < dyn Fn ( i64 ) -> Option < SnapshotRef > + Send > ,
1238+ }
1239+
1240+ impl Iterator for Ancestors {
1241+ type Item = SnapshotRef ;
1242+
1243+ fn next ( & mut self ) -> Option < Self :: Item > {
1244+ let snapshot = self . next . take ( ) ?;
1245+ let result = snapshot. clone ( ) ;
1246+ self . next = snapshot
1247+ . parent_snapshot_id ( )
1248+ . and_then ( |id| ( self . get_snapshot ) ( id) ) ;
1249+ Some ( result)
1250+ }
1251+ }
1252+
1253+ /// Iterate starting from `snapshot` (inclusive) to the root snapshot.
1254+ fn ancestors_of (
1255+ table_metadata : & TableMetadataRef ,
1256+ snapshot : i64 ,
1257+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
1258+ if let Some ( snapshot) = table_metadata. snapshot_by_id ( snapshot) {
1259+ let table_metadata = table_metadata. clone ( ) ;
1260+ Box :: new ( Ancestors {
1261+ next : Some ( snapshot. clone ( ) ) ,
1262+ get_snapshot : Box :: new ( move |id| table_metadata. snapshot_by_id ( id) . cloned ( ) ) ,
1263+ } )
1264+ } else {
1265+ Box :: new ( std:: iter:: empty ( ) )
1266+ }
1267+ }
1268+
1269+ /// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
1270+ fn ancestors_between (
1271+ table_metadata : & TableMetadataRef ,
1272+ latest_snapshot_id : i64 ,
1273+ oldest_snapshot_id : i64 ,
1274+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
1275+ if latest_snapshot_id == oldest_snapshot_id {
1276+ return Box :: new ( std:: iter:: empty ( ) ) ;
1277+ }
1278+
1279+ Box :: new (
1280+ ancestors_of ( table_metadata, latest_snapshot_id)
1281+ . take_while ( move |snapshot| snapshot. snapshot_id ( ) != oldest_snapshot_id) ,
1282+ )
1283+ }
1284+
1285+ /// Get all added files between two snapshots.
1286+ /// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
1287+ async fn added_files_between (
1288+ object_cache : & ObjectCache ,
1289+ table_metadata : & TableMetadataRef ,
1290+ latest_snapshot_id : i64 ,
1291+ oldest_snapshot_id : i64 ,
1292+ ) -> Result < Vec < ManifestEntryRef > > {
1293+ let mut added_files = vec ! [ ] ;
1294+
1295+ let append_snapshots =
1296+ ancestors_between ( table_metadata, latest_snapshot_id, oldest_snapshot_id)
1297+ . filter ( |snapshot| matches ! ( snapshot. summary( ) . operation, Operation :: Append ) )
1298+ . collect_vec ( ) ;
1299+ let snapshot_ids: HashSet < i64 > = append_snapshots
1300+ . iter ( )
1301+ . map ( |snapshot| snapshot. snapshot_id ( ) )
1302+ . collect ( ) ;
1303+
1304+ for snapshot in append_snapshots {
1305+ let manifest_list = object_cache
1306+ . get_manifest_list ( & snapshot, & table_metadata)
1307+ . await ?;
1308+
1309+ for manifest_file in manifest_list. entries ( ) {
1310+ if !snapshot_ids. contains ( & manifest_file. added_snapshot_id ) {
1311+ continue ;
1312+ }
1313+ let manifest = object_cache. get_manifest ( & manifest_file) . await ?;
1314+ let entries = manifest. entries ( ) . into_iter ( ) . cloned ( ) . filter ( |entry| {
1315+ matches ! ( entry. status( ) , ManifestStatus :: Added )
1316+ && (
1317+ // Is it possible that the snapshot id here is not contained?
1318+ entry. snapshot_id ( ) . is_none ( )
1319+ || snapshot_ids. contains ( & entry. snapshot_id ( ) . unwrap ( ) )
1320+ )
1321+ } ) ;
1322+ added_files. extend ( entries) ;
1323+ }
1324+ }
1325+
1326+ Ok ( added_files)
1327+ }
1328+
11491329#[ cfg( test) ]
11501330pub mod tests {
11511331 use std:: collections:: HashMap ;
0 commit comments