1717
1818//! Table scan api.
1919
20- use std:: collections:: HashMap ;
20+ use std:: collections:: { HashMap , HashSet } ;
2121use std:: sync:: { Arc , RwLock } ;
2222
2323use arrow_array:: RecordBatch ;
2424use futures:: channel:: mpsc:: { channel, Sender } ;
2525use futures:: stream:: BoxStream ;
2626use futures:: { SinkExt , StreamExt , TryFutureExt , TryStreamExt } ;
27+ use itertools:: Itertools ;
2728use serde:: { Deserialize , Serialize } ;
2829
2930use crate :: arrow:: ArrowReaderBuilder ;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
3839use crate :: runtime:: spawn;
3940use crate :: spec:: {
4041 DataContentType , DataFileFormat , ManifestContentType , ManifestEntryRef , ManifestFile ,
41- ManifestList , Schema , SchemaRef , SnapshotRef , TableMetadataRef ,
42+ ManifestList , ManifestStatus , Operation , Schema , SchemaRef , SnapshotRef , TableMetadataRef ,
4243} ;
4344use crate :: table:: Table ;
4445use crate :: utils:: available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
5556 // Defaults to none which means select all columns
5657 column_names : Option < Vec < String > > ,
5758 snapshot_id : Option < i64 > ,
59+ /// Exclusive. Used for incremental scan.
60+ from_snapshot_id : Option < i64 > ,
61+ /// Inclusive. Used for incremental scan.
62+ to_snapshot_id : Option < i64 > ,
5863 batch_size : Option < usize > ,
5964 case_sensitive : bool ,
6065 filter : Option < Predicate > ,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
7883 table,
7984 column_names : None ,
8085 snapshot_id : None ,
86+ from_snapshot_id : None ,
87+ to_snapshot_id : None ,
8188 batch_size : None ,
8289 case_sensitive : true ,
8390 filter : None ,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
140147 self
141148 }
142149
150+ /// Set the starting snapshot id (exclusive) for incremental scan.
151+ pub fn from_snapshot_id ( mut self , from_snapshot_id : i64 ) -> Self {
152+ self . from_snapshot_id = Some ( from_snapshot_id) ;
153+ self
154+ }
155+
156+ /// Set the ending snapshot id (inclusive) for incremental scan.
157+ pub fn to_snapshot_id ( mut self , to_snapshot_id : i64 ) -> Self {
158+ self . to_snapshot_id = Some ( to_snapshot_id) ;
159+ self
160+ }
161+
143162 /// Sets the concurrency limit for both manifest files and manifest
144163 /// entries for this scan
145164 pub fn with_concurrency_limit ( mut self , limit : usize ) -> Self {
@@ -206,6 +225,25 @@ impl<'a> TableScanBuilder<'a> {
206225
207226 /// Build the table scan.
208227 pub fn build ( self ) -> Result < TableScan > {
228+ // Validate that we have either a snapshot scan or an incremental scan configuration
229+ if self . from_snapshot_id . is_some ( ) || self . to_snapshot_id . is_some ( ) {
230+ // For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
231+ if self . to_snapshot_id . is_none ( ) {
232+ return Err ( Error :: new (
233+ ErrorKind :: DataInvalid ,
234+ "Incremental scan requires to_snapshot_id to be set" ,
235+ ) ) ;
236+ }
237+
238+ // snapshot_id should not be set for incremental scan
239+ if self . snapshot_id . is_some ( ) {
240+ return Err ( Error :: new (
241+ ErrorKind :: DataInvalid ,
242+ "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead." ,
243+ ) ) ;
244+ }
245+ }
246+
209247 let snapshot = match self . snapshot_id {
210248 Some ( snapshot_id) => self
211249 . table
@@ -227,7 +265,6 @@ impl<'a> TableScanBuilder<'a> {
227265 } ) ?
228266 . clone ( ) ,
229267 } ;
230-
231268 let schema = snapshot. schema ( self . table . metadata ( ) ) ?;
232269
233270 // Check that all column names exist in the schema.
@@ -297,6 +334,8 @@ impl<'a> TableScanBuilder<'a> {
297334 snapshot_bound_predicate : snapshot_bound_predicate. map ( Arc :: new) ,
298335 object_cache : self . table . object_cache ( ) ,
299336 field_ids : Arc :: new ( field_ids) ,
337+ from_snapshot_id : self . from_snapshot_id ,
338+ to_snapshot_id : self . to_snapshot_id ,
300339 partition_filter_cache : Arc :: new ( PartitionFilterCache :: new ( ) ) ,
301340 manifest_evaluator_cache : Arc :: new ( ManifestEvaluatorCache :: new ( ) ) ,
302341 expression_evaluator_cache : Arc :: new ( ExpressionEvaluatorCache :: new ( ) ) ,
@@ -358,6 +397,11 @@ struct PlanContext {
358397 partition_filter_cache : Arc < PartitionFilterCache > ,
359398 manifest_evaluator_cache : Arc < ManifestEvaluatorCache > ,
360399 expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
400+
401+ // for incremental scan.
402+ // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
403+ from_snapshot_id : Option < i64 > ,
404+ to_snapshot_id : Option < i64 > ,
361405}
362406
363407impl TableScan {
@@ -375,6 +419,65 @@ impl TableScan {
375419 // used to stream the results back to the caller
376420 let ( file_scan_task_tx, file_scan_task_rx) = channel ( concurrency_limit_manifest_entries) ;
377421
422+ if let Some ( to_snapshot_id) = self . plan_context . to_snapshot_id {
423+ // Incremental scan mode
424+ let added_files = added_files_between (
425+ & self . plan_context . object_cache ,
426+ & self . plan_context . table_metadata ,
427+ to_snapshot_id,
428+ self . plan_context . from_snapshot_id ,
429+ )
430+ . await ?;
431+
432+ for entry in added_files {
433+ let manifest_entry_context = ManifestEntryContext {
434+ manifest_entry : entry,
435+ expression_evaluator_cache : self
436+ . plan_context
437+ . expression_evaluator_cache
438+ . clone ( ) ,
439+ field_ids : self . plan_context . field_ids . clone ( ) ,
440+ bound_predicates : None , // TODO: support predicates in incremental scan
441+ partition_spec_id : 0 , // TODO: get correct partition spec id
442+ // It's used to skip any data file whose partition data indicates that it can't contain
443+ // any data that matches this scan's filter
444+ snapshot_schema : self . plan_context . snapshot_schema . clone ( ) ,
445+ // delete is not supported in incremental scan
446+ delete_file_index : None ,
447+ } ;
448+
449+ manifest_entry_data_ctx_tx
450+ . clone ( )
451+ . send ( manifest_entry_context)
452+ . await
453+ . map_err ( |_| Error :: new ( ErrorKind :: Unexpected , "mpsc channel SendError" ) ) ?;
454+ }
455+
456+ let mut channel_for_manifest_entry_error = file_scan_task_tx. clone ( ) ;
457+
458+ // Process the [`ManifestEntry`] stream in parallel
459+ spawn ( async move {
460+ let result = manifest_entry_data_ctx_rx
461+ . map ( |me_ctx| Ok ( ( me_ctx, file_scan_task_tx. clone ( ) ) ) )
462+ . try_for_each_concurrent (
463+ concurrency_limit_manifest_entries,
464+ |( manifest_entry_context, tx) | async move {
465+ spawn ( async move {
466+ Self :: process_data_manifest_entry ( manifest_entry_context, tx) . await
467+ } )
468+ . await
469+ } ,
470+ )
471+ . await ;
472+
473+ if let Err ( error) = result {
474+ let _ = channel_for_manifest_entry_error. send ( Err ( error) ) . await ;
475+ }
476+ } ) ;
477+
478+ return Ok ( file_scan_task_rx. boxed ( ) ) ;
479+ }
480+
378481 let delete_file_idx_and_tx: Option < ( DeleteFileIndex , Sender < DeleteFileContext > ) > =
379482 if self . delete_file_processing_enabled {
380483 Some ( DeleteFileIndex :: new ( ) )
@@ -1146,6 +1249,104 @@ impl FileScanTask {
11461249 }
11471250}
11481251
1252+ struct Ancestors {
1253+ next : Option < SnapshotRef > ,
1254+ get_snapshot : Box < dyn Fn ( i64 ) -> Option < SnapshotRef > + Send > ,
1255+ }
1256+
1257+ impl Iterator for Ancestors {
1258+ type Item = SnapshotRef ;
1259+
1260+ fn next ( & mut self ) -> Option < Self :: Item > {
1261+ let snapshot = self . next . take ( ) ?;
1262+ let result = snapshot. clone ( ) ;
1263+ self . next = snapshot
1264+ . parent_snapshot_id ( )
1265+ . and_then ( |id| ( self . get_snapshot ) ( id) ) ;
1266+ Some ( result)
1267+ }
1268+ }
1269+
1270+ /// Iterate starting from `snapshot` (inclusive) to the root snapshot.
1271+ fn ancestors_of (
1272+ table_metadata : & TableMetadataRef ,
1273+ snapshot : i64 ,
1274+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
1275+ if let Some ( snapshot) = table_metadata. snapshot_by_id ( snapshot) {
1276+ let table_metadata = table_metadata. clone ( ) ;
1277+ Box :: new ( Ancestors {
1278+ next : Some ( snapshot. clone ( ) ) ,
1279+ get_snapshot : Box :: new ( move |id| table_metadata. snapshot_by_id ( id) . cloned ( ) ) ,
1280+ } )
1281+ } else {
1282+ Box :: new ( std:: iter:: empty ( ) )
1283+ }
1284+ }
1285+
1286+ /// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
1287+ fn ancestors_between (
1288+ table_metadata : & TableMetadataRef ,
1289+ latest_snapshot_id : i64 ,
1290+ oldest_snapshot_id : Option < i64 > ,
1291+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
1292+ let Some ( oldest_snapshot_id) = oldest_snapshot_id else {
1293+ return Box :: new ( ancestors_of ( table_metadata, latest_snapshot_id) ) ;
1294+ } ;
1295+
1296+ if latest_snapshot_id == oldest_snapshot_id {
1297+ return Box :: new ( std:: iter:: empty ( ) ) ;
1298+ }
1299+
1300+ Box :: new (
1301+ ancestors_of ( table_metadata, latest_snapshot_id)
1302+ . take_while ( move |snapshot| snapshot. snapshot_id ( ) != oldest_snapshot_id) ,
1303+ )
1304+ }
1305+
1306+ /// Get all added files between two snapshots.
1307+ /// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
1308+ async fn added_files_between (
1309+ object_cache : & ObjectCache ,
1310+ table_metadata : & TableMetadataRef ,
1311+ latest_snapshot_id : i64 ,
1312+ oldest_snapshot_id : Option < i64 > ,
1313+ ) -> Result < Vec < ManifestEntryRef > > {
1314+ let mut added_files = vec ! [ ] ;
1315+
1316+ let append_snapshots =
1317+ ancestors_between ( table_metadata, latest_snapshot_id, oldest_snapshot_id)
1318+ . filter ( |snapshot| matches ! ( snapshot. summary( ) . operation, Operation :: Append ) )
1319+ . collect_vec ( ) ;
1320+ let snapshot_ids: HashSet < i64 > = append_snapshots
1321+ . iter ( )
1322+ . map ( |snapshot| snapshot. snapshot_id ( ) )
1323+ . collect ( ) ;
1324+
1325+ for snapshot in append_snapshots {
1326+ let manifest_list = object_cache
1327+ . get_manifest_list ( & snapshot, table_metadata)
1328+ . await ?;
1329+
1330+ for manifest_file in manifest_list. entries ( ) {
1331+ if !snapshot_ids. contains ( & manifest_file. added_snapshot_id ) {
1332+ continue ;
1333+ }
1334+ let manifest = object_cache. get_manifest ( manifest_file) . await ?;
1335+ let entries = manifest. entries ( ) . iter ( ) . filter ( |entry| {
1336+ matches ! ( entry. status( ) , ManifestStatus :: Added )
1337+ && (
1338+ // Is it possible that the snapshot id here is not contained?
1339+ entry. snapshot_id ( ) . is_none ( )
1340+ || snapshot_ids. contains ( & entry. snapshot_id ( ) . unwrap ( ) )
1341+ )
1342+ } ) ;
1343+ added_files. extend ( entries. cloned ( ) ) ;
1344+ }
1345+ }
1346+
1347+ Ok ( added_files)
1348+ }
1349+
11491350#[ cfg( test) ]
11501351pub mod tests {
11511352 use std:: collections:: HashMap ;
0 commit comments