This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
```rust
/// which is much more efficient than scanning the entire table when you only need
/// the new data.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
```
Happy to remove this if we don't want it; Claude wrote it for me.
|
I paired up with Claude to review for a while today, cross-referencing the Java behavior and spec. Suggestion 3 is the one I feel the least strongly about and would benefit from another set of eyes. Here are the compiled notes:

1. Separate scan type, don't bolt onto TableScan
|
| Commit | PR | Change |
|---|---|---|
| beed94dc2134 | iceberg#4580 | `IncrementalAppendScan` interface |
| 7f472ebbec19 | iceberg#4744 | `BaseIncrementalAppendScan` impl |
| c69a3dd6171e | iceberg#4870 | `IncrementalScan<T>` parent, `IncrementalChangelogScan` |
| 40de4bc7dc12 | iceberg#5382 | `BaseIncrementalScan` extracted |
| 80ec14ff363b | iceberg#5577 | Deprecated `appendsBetween`/`appendsAfter` on `TableScan`, removal in 2.0.0 |
Entry point moved from TableScan.appendsBetween() to Table.newIncrementalAppendScan() (api/src/main/java/org/apache/iceberg/Table.java:71-84).
2. Reuse ancestors_between
The spec defines snapshot ancestry via parent-snapshot-id (Spec: Snapshots). This codebase already has the traversal in crates/iceberg/src/util/snapshot.rs:51-61:

```rust
pub fn ancestors_between(
    table_metadata: &TableMetadataRef,
    latest_snapshot_id: i64,
    oldest_snapshot_id: Option<i64>,
) -> impl Iterator<Item = SnapshotRef> + Send
```

It returns (oldest, latest], which is already exclusive-start, inclusive-end: exactly the default incremental range.
SnapshotRange::build() reimplements this walk from scratch. It could use the existing utility instead:

```rust
let snapshots_in_range: Vec<SnapshotRef> = if from_inclusive {
    ancestors_between(&metadata, to_id, from_parent_id).collect()
} else {
    ancestors_between(&metadata, to_id, Some(from_id)).collect()
};
let snapshot_ids: HashSet<i64> = snapshots_in_range
    .iter()
    .filter(|s| s.summary().operation == Operation::Append)
    .map(|s| s.snapshot_id())
    .collect();
```

One thing to watch: ancestors_between doesn't validate that oldest_snapshot_id is actually in the chain. If it's not, you get the full chain to root. Worth verifying ancestry explicitly (check that the walk terminated at the expected snapshot, not at the root).
Java equivalent: SnapshotUtil.ancestorsBetween (core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:212-229).
3. Skip non-APPEND snapshots, don't error
The spec defines four operation values: append, replace, overwrite, delete (Spec: Snapshot Summary). The spec doesn't define incremental scan semantics, so we have to decide what to do with non-APPEND snapshots in the range.
The PR returns ErrorKind::FeatureUnsupported for any non-APPEND snapshot. This seems too strict. A table that had a compaction (replace) in its history would break incremental append scans, even though the compaction added no logical data.
It would be better to silently skip non-APPEND snapshots. Only Operation::Append contributes new data files to an append scan:
- `Replace` = compaction/rewrite, no new logical data
- `Delete` = removes data
- `Overwrite` = mixed add/remove, ambiguous for append-only
```rust
let append_snapshot_ids: HashSet<i64> = ancestors_between(...)
    .filter(|s| s.summary().operation == Operation::Append)
    .map(|s| s.snapshot_id())
    .collect();
```

Properly surfacing `Overwrite` changes would be the domain of an `IncrementalChangelogScan`, which could be a separate scan type down the road.
Java's BaseIncrementalAppendScan (core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java:105-117) silently skips all non-APPEND snapshots. The deprecated IncrementalDataTableScan errored on OVERWRITE (lines 138-143) but that behavior was not carried forward.
4. Consider manifest-level filtering
The spec says each manifest list entry tracks the snapshot that created it via added-snapshot-id (Spec: Manifest Lists). A manifest whose added-snapshot-id is outside the scan range can't contain relevant ADDED entries. Skipping it avoids the I/O and parse cost entirely.
The PR only filters at the entry level (crates/iceberg/src/scan/context.rs, fetch_manifest_and_stream_manifest_entries). Every manifest in the manifest list is fetched and parsed, including inherited ones that can't contain relevant entries. For tables with many manifests, this matters.
It might be worth adding a check in PlanContext::build_manifest_file_contexts (crates/iceberg/src/scan/context.rs:195-254). ManifestFile already has the field (crates/iceberg/src/spec/manifest_list.rs):

```rust
// Inside the loop over manifest_files:
if let Some(ref snapshot_range) = self.snapshot_range {
    if manifest_file.content == ManifestContentType::Deletes {
        continue;
    }
    if !snapshot_range.contains(manifest_file.added_snapshot_id) {
        continue;
    }
}
```

Keep the entry-level filter too; a manifest can contain entries from multiple snapshots. But the manifest-level check avoids unnecessary I/O.
Java applies both: .filterManifests(m -> snapshotIds.contains(m.snapshotId())) (line 70) then .filterManifestEntries(...) (line 81), plus .ignoreDeleted(). See core/src/main/java/org/apache/iceberg/ManifestGroup.java lines 121, 132, 311-320, 375.
5. DataFusion: consider a scan range enum
The PR threads from_snapshot_id: Option<i64> and from_snapshot_inclusive: bool through IcebergTableScan::new(), get_batch_stream(), IcebergStaticTableProvider, and IcebergTableProvider (crates/integrations/datafusion/src/physical_plan/scan.rs:60-84). This triggers #[allow(clippy::too_many_arguments)].
An enum could clean this up:
```rust
pub(crate) enum ScanRange {
    CurrentSnapshot,
    PointInTime(i64),
    Incremental { from: i64, to: Option<i64>, from_inclusive: bool },
}
```

This would replace three parameters with one across all four structs, and would collapse IcebergStaticTableProvider's three near-identical constructors (try_new_incremental, try_new_incremental_inclusive, try_new_appends_after) into one.
6. Tests don't verify filtering behavior
Tautological assertions. These tests assert result.is_ok() || result.is_err(), which is always true:
- test_appends_after_convenience_method (crates/iceberg/src/scan/mod.rs)
- test_appends_between_convenience_method
- test_incremental_scan_from_snapshot_inclusive
Swallowed errors. DataFusion tests use if let Ok(provider) = provider { ... }, passing silently when construction fails:
- test_static_provider_incremental_creates_scan (crates/integrations/datafusion/src/table/mod.rs)
- test_static_provider_incremental_inclusive
- test_static_provider_appends_after
No end-to-end test. No test checks that an incremental scan actually returns only files from the expected snapshot range.
Something along the lines of test_plan_files_no_deletions (crates/iceberg/src/scan/mod.rs:1278) would work well here:
- Extend `TableTestFixture` with multiple APPEND snapshots and manifest entries with explicit `snapshot_id` and `ManifestStatus` values
- Run an incremental scan between two snapshots
- Collect `FileScanTask`s and assert only files from the expected snapshots come back
- Test edge cases: inclusive vs exclusive, same from/to, non-APPEND snapshots skipped
TableTestFixture::new_with_deep_history() already creates 5 chained snapshots (S1 through S5), which could be extended with manifest files and operation types in the test metadata JSON.
7. Minor
- Example file (`crates/examples/src/datafusion_incremental_read.rs`): author says "claude wrote it for me." Feels verbose; could probably be trimmed or dropped if the API doc comments cover the usage well enough.
- Comment style: `/// Optional snapshot range for incremental scans` explains what, not why. Might want to match codebase convention.
- Unrelated fix: `context.rs` changes `self.case_sensitive` to `case_sensitive` after destructuring. Correct but unrelated; might be better as a separate commit.
Thanks for the review Matt!

Moving the conversation here: #2337, since I can't seem to re-open this.
Which issue does this PR close?
What changes are included in this PR?
Link to spark docs
This PR adds incremental snapshot scanning support to iceberg-rust, similar to the Java client's `IncrementalDataTableScan`. This feature allows reading only the data files that were added between two snapshots.

Core Iceberg Changes (`crates/iceberg/src/scan/`)

- New API on `TableScanBuilder`
- Implementation details:
  - `SnapshotRange` struct to validate snapshot ancestry and track snapshot IDs in range
  - `ManifestFileContext` to filter entries with `status=ADDED` and `snapshot_id` within range

DataFusion Integration (`crates/integrations/datafusion/`)

- New constructors on `IcebergStaticTableProvider`

Example Added (`crates/examples/`)

- Added `datafusion_incremental_read.rs` example demonstrating `appends_after()` for checkpoint-based processing

Files Changed

- `crates/iceberg/src/scan/mod.rs`: `SnapshotRange`, incremental scan methods on `TableScanBuilder`
- `crates/iceberg/src/scan/context.rs`: added `snapshot_range` to contexts, manifest entry filtering
- `crates/integrations/datafusion/src/table/mod.rs`: `IcebergStaticTableProvider`
- `crates/integrations/datafusion/src/physical_plan/scan.rs`: `IcebergTableScan` and `get_batch_stream()`
- `crates/examples/src/datafusion_incremental_read.rs`
- `crates/examples/Cargo.toml`
Yes
Core Iceberg Tests (`crates/iceberg/src/scan/mod.rs`)

DataFusion Integration Tests (`crates/integrations/datafusion/src/table/mod.rs`)