feat: support incremental scan between 2 snapshots #13
Conversation
```rust
if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
    continue;
}
let manifest = object_cache.get_manifest(manifest_file).await?;
let entries = manifest.entries().iter().filter(|entry| {
    matches!(entry.status(), ManifestStatus::Added)
        && (
            // Is it possible that the snapshot id here is not contained?
            entry.snapshot_id().is_none()
                || snapshot_ids.contains(&entry.snapshot_id().unwrap())
        )
});
```
I think simply checking for manifest status Added is not enough, since compaction also adds manifest entries with status Added, and we don't want to consume a compaction snapshot. We should detect whether a snapshot is a compaction snapshot and skip it if so. To detect a compaction snapshot, I think we can just check whether any manifest entry has ManifestStatus::Deleted.
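A minimal sketch of that check, assuming the snapshot's manifests are already loaded; the helper name is hypothetical and the `iceberg::spec` import path is my assumption:

```rust
use iceberg::spec::{Manifest, ManifestStatus};

// Hypothetical helper: treat a snapshot as a compaction (or other
// replace-style commit) if any of its manifests contains a Deleted entry.
fn is_compaction_snapshot(manifests: &[Manifest]) -> bool {
    manifests.iter().any(|manifest| {
        manifest
            .entries()
            .iter()
            .any(|entry| matches!(entry.status(), ManifestStatus::Deleted))
    })
}
```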
We have filtered Append snapshots above:

```rust
let append_snapshots =
    ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
        .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append));
```

The definition of snapshot operation is (so a compaction snapshot should be Replace):

```rust
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
pub enum Operation {
    /// Only data files were added and no files were removed.
    Append,
    /// Data and delete files were added and removed without changing table data;
    /// i.e., compaction, changing the data file format, or relocating data files.
    Replace,
    /// Data and delete files were added and removed in a logical overwrite operation.
    Overwrite,
    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
    Delete,
}
```
cc @Li0k
Found a problem: it seems delete files are also included in Append snapshots. 🤔
Encountered an entry for a delete file in a data file manifest:

```
ManifestEntry { status: Added, snapshot_id: Some(4746599406424318107), sequence_number: Some(4), file_sequence_number: Some(4), data_file: DataFile { content: EqualityDeletes, file_path: "s3://hummock001/iceberg_connection/public/t/data/11-00077-eq-del-0195a491-109c-75b1-a93c-e29d371c2d0b.parquet", file_format: Parquet, partition: Struct { fields: [], null_bitmap: BitVec<usize, bitvec::order::Lsb0> { addr: 0x8, head: 000000, bits: 0, capacity: 0 } [] }, record_count: 2, file_size_in_bytes: 680, column_sizes: {1: 82}, value_counts: {1: 2}, null_value_counts: {1: 0}, nan_value_counts: {}, lower_bounds: {1: Datum { type: Long, literal: Long(524235685507891200) }}, upper_bounds: {1: Datum { type: Long, literal: Long(524235873517572096) }}, key_metadata: None, split_offsets: [4], equality_ids: [1], sort_order_id: None } }
```

The Java code uses an `ignoreDeleted` filter to further filter the manifests:
https://github.com/apache/iceberg/blob/6ec3de390d3fa6e797c6975b1eaaea41719db0fe/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L86
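Separately from the Java `ignoreDeleted` filter, one way to skip delete-file entries on the Rust side would be to filter on content type as well as status. This is only a sketch: the `DataContentType` name and the `data_file()` / `content_type()` accessors are inferred from the debug output above rather than verified against the crate.

```rust
use iceberg::spec::{DataContentType, ManifestEntry, ManifestStatus};

// Keep only entries that add plain data files, skipping the position/equality
// delete files that may have been committed into an "Append" snapshot.
fn is_added_data_file(entry: &ManifestEntry) -> bool {
    matches!(entry.status(), ManifestStatus::Added)
        && matches!(entry.data_file().content_type(), DataContentType::Data)
}
```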
```rust
/// Only data files were added and no files were removed.
Append,
```
Indeed, by definition there should be no delete files in an Append snapshot. However, RisingWave's iceberg sink includes delete files in the Append snapshot. Shall we change the snapshot operation to Overwrite if the commit is not append-only? But once we change it to Overwrite, we can no longer read any incremental data between the snapshots, so it seems we should also include the added data of Overwrite snapshots in the incremental read.
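A sketch of that direction, reusing the filter shape shown above; `ancestors_between`, `table_metadata`, and the snapshot ids are the names already used in this PR, and widening the filter to Operation::Overwrite is the proposal here, not existing behavior:

```rust
// Proposal sketch: scan both Append and Overwrite snapshots, and rely on a
// content-type filter (e.g. keeping only added data files, as sketched above)
// to ignore the delete files that an Overwrite snapshot may carry.
let incremental_snapshots =
    ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
        .filter(|snapshot| {
            matches!(
                snapshot.summary().operation,
                Operation::Append | Operation::Overwrite
            )
        });
```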
Some facts I found:
- Changelog scan with deletes is not supported yet: Support changelog scan for table with delete files apache/iceberg#10935
- Changelog scan is not used by Flink
Oh, it's used in SparkChangelogTable https://iceberg.apache.org/docs/latest/spark-procedures/?h=changes#carry-over-rows
cc @ZENOTME If we commit delete files to Iceberg, it seems we should change the snapshot operation to Overwrite instead of Append.
Oh, it's used in SparkChangelogTable https://iceberg.apache.org/docs/latest/spark-procedures/?h=changes#carry-over-rows
I think we should handle Overwrite as well. Ignoring delete files is acceptable.
+1. kind of like "force-append-only"
chenzl25 left a comment
LGTM!
Merged commit 4373389 into risingwavelabs:dev_rebase_main_20250307
```rust
manifest_entry_data_ctx_tx
    .clone()
    .send(manifest_entry_context)
    .await
```
I was looking over the code, and I’m wondering if there might be a chance for the logic to get stuck here when the channel’s buffer size is smaller than the number of entries.
It seems like the channel could block once the buffer fills up, and since the consumers only start after all entries are sent, it might cause a hang.
I could be missing something, though—do you think this could be an issue, or is there a detail I might not be seeing? I’d really appreciate your thoughts!
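A standalone illustration of the concern (not the PR's actual code), using a bounded tokio mpsc channel: if every entry must be sent before any consumer starts, the producer blocks once the buffer fills up; spawning the consumer first avoids the hang.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Buffer deliberately smaller than the number of entries to send.
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    // If all sends happened before this consumer started, the third send
    // would await forever because nothing drains the channel yet.
    // Starting the consumer first (or sending from a separate task) avoids it.
    let consumer = tokio::spawn(async move {
        while let Some(v) = rx.recv().await {
            println!("got {v}");
        }
    });

    for v in 0..5 {
        // With the consumer already running, this no longer blocks forever.
        tx.send(v).await.unwrap();
    }
    drop(tx);
    consumer.await.unwrap();
}
```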
Hi, I'm curious how you found this place 👀
I came across your PR while browsing through this repo, and it caught my attention because I'm looking for a way to implement incremental ingestion for an Iceberg table using Rust. Great work on it!
Thanks, I think your insight is correct here. I've refined the implementation in #36.
* fixup! feat: support incremental scan between 2 snapshots (#13)
* refactor: refactor incremental scan
* clippy
* add some check

Signed-off-by: xxchan <[email protected]>