
Commit ad87946

xxchan authored and chenzl25 committed
feat: support incremental scan between 2 snapshots (#13)
1 parent 5c46965 commit ad87946

File tree: 2 files changed (+173, −12)


crates/iceberg/src/scan/context.rs

Lines changed: 130 additions & 7 deletions
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use futures::channel::mpsc::Sender;
 use futures::{SinkExt, TryFutureExt};
+use itertools::Itertools;
 
 use crate::delete_file_index::DeleteFileIndex;
 use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -28,11 +30,12 @@ use crate::scan::{
     PartitionFilterCache,
 };
 use crate::spec::{
-    ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
-    TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
+    ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::{Error, ErrorKind, Result};
 
+type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
 /// Wraps a [`ManifestFile`] alongside the objects that are needed
 /// to process it in a thread-safe manner
 pub(crate) struct ManifestFileContext {
@@ -46,6 +49,10 @@ pub(crate) struct ManifestFileContext {
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
     delete_file_index: DeleteFileIndex,
+
+    /// filter manifest entries.
+    /// Used for different kind of scans, e.g., only scan newly added files without delete files.
+    filter_fn: Option<Arc<ManifestEntryFilterFn>>,
 }
 
 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -74,12 +81,13 @@ impl ManifestFileContext {
             mut sender,
             expression_evaluator_cache,
             delete_file_index,
-            ..
+            filter_fn,
         } = self;
+        let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));
 
         let manifest = object_cache.get_manifest(&manifest_file).await?;
 
-        for manifest_entry in manifest.entries() {
+        for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
             let manifest_entry_context = ManifestEntryContext {
                 // TODO: refactor to avoid the expensive ManifestEntry clone
                 manifest_entry: manifest_entry.clone(),
@@ -149,6 +157,11 @@ pub(crate) struct PlanContext {
     pub partition_filter_cache: Arc<PartitionFilterCache>,
     pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
     pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+
+    // for incremental scan.
+    // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
+    pub from_snapshot_id: Option<i64>,
+    pub to_snapshot_id: Option<i64>,
 }
 
 impl PlanContext {
@@ -180,18 +193,71 @@ impl PlanContext {
         Ok(partition_filter)
     }
 
-    pub(crate) fn build_manifest_file_contexts(
+    pub(crate) async fn build_manifest_file_contexts(
        &self,
        manifest_list: Arc<ManifestList>,
        tx_data: Sender<ManifestEntryContext>,
        delete_file_idx: DeleteFileIndex,
        delete_file_tx: Sender<ManifestEntryContext>,
    ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
-        let manifest_files = manifest_list.entries().iter();
+        let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
+        let manifest_files = {
+            if let Some(to_snapshot_id) = self.to_snapshot_id {
+                // Incremental scan mode:
+                // Get all added files between two snapshots.
+                // - data files in `Append` and `Overwrite` snapshots are included.
+                // - delete files are ignored
+                // - `Replace` snapshots (e.g., compaction) are ignored.
+                //
+                // `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
+
+                let snapshots =
+                    ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
+                        .filter(|snapshot| {
+                            matches!(
+                                snapshot.summary().operation,
+                                Operation::Append | Operation::Overwrite
+                            )
+                        })
+                        .collect_vec();
+                let snapshot_ids: HashSet<i64> = snapshots
+                    .iter()
+                    .map(|snapshot| snapshot.snapshot_id())
+                    .collect();
+
+                let mut manifest_files = vec![];
+                for snapshot in snapshots {
+                    let manifest_list = self
+                        .object_cache
+                        .get_manifest_list(&snapshot, &self.table_metadata)
+                        .await?;
+                    for entry in manifest_list.entries() {
+                        if !snapshot_ids.contains(&entry.added_snapshot_id) {
+                            continue;
+                        }
+                        manifest_files.push(entry.clone());
+                    }
+                }
+
+                filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
+                    matches!(entry.status(), ManifestStatus::Added)
+                        && matches!(entry.data_file().content_type(), DataContentType::Data)
+                        && (
+                            // Is it possible that the snapshot id here is not contained?
+                            entry.snapshot_id().is_none()
+                                || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                        )
+                }));
+
+                manifest_files
+            } else {
+                manifest_list.entries().to_vec()
+            }
+        };
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
-        for manifest_file in manifest_files {
+        for manifest_file in &manifest_files {
             let tx = if manifest_file.content == ManifestContentType::Deletes {
                 delete_file_tx.clone()
             } else {
@@ -224,6 +290,7 @@ impl PlanContext {
                 partition_bound_predicate,
                 tx,
                 delete_file_idx.clone(),
+                filter_fn.clone(),
             );
 
             filtered_mfcs.push(Ok(mfc));
@@ -238,6 +305,7 @@ impl PlanContext {
         partition_filter: Option<Arc<BoundPredicate>>,
         sender: Sender<ManifestEntryContext>,
         delete_file_index: DeleteFileIndex,
+        filter_fn: Option<Arc<ManifestEntryFilterFn>>,
     ) -> ManifestFileContext {
         let bound_predicates =
             if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -260,6 +328,61 @@ impl PlanContext {
             field_ids: self.field_ids.clone(),
             expression_evaluator_cache: self.expression_evaluator_cache.clone(),
             delete_file_index,
+            filter_fn,
         }
     }
 }
+
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
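The ancestry walk added above is the core of snapshot selection: it starts at `to_snapshot_id` and follows `parent_snapshot_id` links until it reaches `from_snapshot_id`, which is excluded (the patch additionally filters out non-Append/Overwrite snapshots, which the sketch below omits). A minimal standalone sketch of just the inclusive/exclusive bounds, using a toy `Snap` type and hard-coded ids that are illustrative only and not part of the crate:

use std::collections::HashMap;

// Toy stand-in for SnapshotRef: an id plus an optional parent id.
#[derive(Clone, Copy)]
struct Snap {
    id: i64,
    parent: Option<i64>,
}

/// Walk from `to` (inclusive) back towards `from` (exclusive),
/// mirroring the bounds of `ancestors_between` in the patch.
fn ancestors_between(snaps: &HashMap<i64, Snap>, to: i64, from: Option<i64>) -> Vec<i64> {
    let mut out = vec![];
    let mut next = snaps.get(&to).copied();
    while let Some(s) = next {
        if Some(s.id) == from {
            break; // `from` is exclusive
        }
        out.push(s.id);
        next = s.parent.and_then(|p| snaps.get(&p).copied());
    }
    out
}

fn main() {
    // Linear history: 1 <- 2 <- 3 <- 4
    let snaps: HashMap<i64, Snap> = [
        (1, Snap { id: 1, parent: None }),
        (2, Snap { id: 2, parent: Some(1) }),
        (3, Snap { id: 3, parent: Some(2) }),
        (4, Snap { id: 4, parent: Some(3) }),
    ]
    .into_iter()
    .collect();

    assert_eq!(ancestors_between(&snaps, 4, Some(2)), vec![4, 3]); // snapshot 2 excluded
    assert_eq!(ancestors_between(&snaps, 4, None), vec![4, 3, 2, 1]); // full ancestry
    println!("ancestor selection matches the inclusive/exclusive bounds");
}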

crates/iceberg/src/scan/mod.rs

Lines changed: 43 additions & 5 deletions
@@ -21,6 +21,7 @@ mod cache;
 use cache::*;
 mod context;
 use context::*;
+pub use task::*;
 mod task;
 
 use std::sync::Arc;
@@ -29,7 +30,6 @@ use arrow_array::RecordBatch;
 use futures::channel::mpsc::{Sender, channel};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryStreamExt};
-pub use task::*;
 
 use crate::arrow::ArrowReaderBuilder;
 use crate::delete_file_index::DeleteFileIndex;
@@ -51,6 +51,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -69,6 +73,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -130,6 +136,18 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
@@ -185,6 +203,25 @@ impl<'a> TableScanBuilder<'a> {
 
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        // Validate that we have either a snapshot scan or an incremental scan configuration
+        if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
+            // For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
+            if self.to_snapshot_id.is_none() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Incremental scan requires to_snapshot_id to be set",
+                ));
+            }
+
+            // snapshot_id should not be set for incremental scan
+            if self.snapshot_id.is_some() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+                ));
+            }
+        }
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
@@ -214,7 +251,6 @@ impl<'a> TableScanBuilder<'a> {
                 current_snapshot_id.clone()
             }
         };
-
         let schema = snapshot.schema(self.table.metadata())?;
 
         // Check that all column names exist in the schema.
@@ -277,6 +313,8 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -353,7 +391,7 @@ impl TableScan {
             manifest_entry_data_ctx_tx,
             delete_file_idx.clone(),
             manifest_entry_delete_ctx_tx,
-        )?;
+        ).await?;
 
         let mut channel_for_manifest_error = file_scan_task_tx.clone();
 
@@ -383,7 +421,7 @@ impl TableScan {
                     spawn(async move {
                         Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                     })
-                    .await
+                    .await
                 },
             )
             .await;
@@ -406,7 +444,7 @@ impl TableScan {
                     spawn(async move {
                         Self::process_data_manifest_entry(manifest_entry_context, tx).await
                     })
-                    .await
+                    .await
                 },
             )
             .await;
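Taken together, the builder additions mean an incremental scan is configured like a point-in-time scan, only with a snapshot range instead of a single snapshot id, and the validation in build() rejects mixing the two. A rough usage sketch under assumptions: the `table` handle, the id values, the `incremental_tasks` helper name, and the exact crate paths are illustrative and not taken from this commit.

use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Hypothetical helper: plan only the data files added after `from_id`,
// up to and including `to_id`.
async fn incremental_tasks(table: &Table, from_id: i64, to_id: i64) -> Result<()> {
    let scan = table
        .scan()
        // `from_snapshot_id` is exclusive and optional; omit it to scan
        // everything reachable from `to_snapshot_id`.
        .from_snapshot_id(from_id)
        // `to_snapshot_id` is inclusive and required for incremental mode.
        .to_snapshot_id(to_id)
        .build()?;

    // Plan the scan; per this commit, only `Added` data-file entries from
    // Append/Overwrite snapshots in the range should be produced.
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    println!("planned {} file scan tasks", tasks.len());
    Ok(())
}

Note that `snapshot_id(...)` must not be set alongside the range, otherwise build() returns a DataInvalid error as shown in the diff above.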
