feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer #1821
Conversation
…mer. This resolves ~50 tests in the spark-extensions Iceberg Java suite.
Thanks @mbutrovich for this PR, just finished the first round of review.
Thanks for the first round of feedback @liurenjie1024! I'll take a pass this week.
…test with DataFusion Comet first.
Hopefully I addressed all of your comments @liurenjie1024. The …
Force-pushed from d72e629 to 37b1513
Thanks, I'll take a look today.
Thanks @mbutrovich for this PR! I think it mostly LGTM; we just need adjustments to follow the projection rules.
```rust
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "serialize_not_implemented")]
#[serde(deserialize_with = "deserialize_not_implemented")]
```
It's not related to this PR, but this is why I don't like pub fields in structs. Adding a field requires changing a lot of unrelated things, and it's also error-prone, since this partition spec is supposed to be the one associated with the data file, not the table's default partition spec.
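As an aside, a minimal sketch (hypothetical names, assumed import path) of the encapsulation the reviewer is alluding to, where the field stays private and is exposed through an accessor instead of as a `pub` field:

```rust
use std::sync::Arc;

use iceberg::spec::PartitionSpec; // assumed import path

pub struct FileScanTask {
    // Private rather than `pub`: callers go through a constructor/builder,
    // so adding a field does not ripple through unrelated code.
    partition_spec: Option<Arc<PartitionSpec>>,
    // ... other fields ...
}

impl FileScanTask {
    /// The partition spec associated with this task's data file
    /// (deliberately not the table's default spec).
    pub fn partition_spec(&self) -> Option<&PartitionSpec> {
        self.partition_spec.as_deref()
    }
}
```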
```rust
|(source_field, source_index)| {
    let name_matches = source_field.name() == &iceberg_field.name;
    // …
    if name_mapping.is_some() && !name_matches {
```
We don't need this check, do we? I think if we already found a field by id, then we should just use this column?
Please see below.
```rust
// 3. "Return the default value if it has a defined initial-default"
// 4. "Return null in all other cases"
// …
let column_source = if let Some(constant_value) = constants_map.get(field_id) {
```
This should not be the first step. According to the projection rules, this only happens as the first check after lookup by id has failed.
Please see below.
I'll take another pass today, thanks for the further comments! It's a non-trivial change and I'm still learning my way around the codebase and spec, so I appreciate your patience.
I apologize: I kept posting comments as I thought I understood what was happening, then I'd second-guess myself and delete the comment, so I just set the PR to draft while I investigated. As always, I appreciate your patience. Basically all of these changes have to do with matching Iceberg Java's behavior. Here's what I can summarize from my findings today:
You're right that we normally trust field IDs, but we do need the name check when a name mapping is present.

Without name checking, when looking for Iceberg field_id=2 ("name"), we'd find Parquet field_id=2 ("dept") and read the wrong column. To fix this, we only check names when a name mapping is present, which signals that the file's field IDs may not be trustworthy.
You're right that the spec says to check these rules when a field is "not present." However, Java checks partition constants before Parquet field IDs (via `PartitionUtil.constantsMap()`). The spec's intent is that identity-partitioned fields are "not present" in data files by definition, even if they physically exist in the file. This design was the only way I could get all of the tests in Iceberg Java's test suite to pass. The subtlety seems to be that the spec is not totally clear on what to do when metadata conflicts between Iceberg and Parquet after a migration or schema change, so I had to choose to treat invalid metadata as "not present," as Java does. Please feel free to let me know if I misunderstood. I tried to make a test that describes the scenario and to add comments on why the design is the way it is.
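To make that ordering concrete, here is a minimal, self-contained sketch of the resolution cascade described in this thread. All names (`ColumnSource`, `resolve_column_source`, the simplified `Literal`) are illustrative stand-ins rather than the PR's actual code, and the name-mapping step is reduced to the name-agreement guard discussed above:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum Literal {
    Long(i64), // simplified stand-in for Iceberg literal values
}

#[derive(Debug, PartialEq)]
enum ColumnSource {
    Constant(Literal), // project a constant value
    FileColumn(usize), // read this column index from the data file
    Null,              // project all-null
}

fn resolve_column_source(
    field_id: i32,
    iceberg_name: &str,
    constants_map: &HashMap<i32, Literal>,    // identity-partition constants
    parquet_fields: &[(Option<i32>, String)], // (field_id, name) per file column
    name_mapping_present: bool,
    initial_default: Option<Literal>,
) -> ColumnSource {
    // 1. Identity-partition constants win first, mirroring Java: identity-
    //    partitioned fields count as "not present" even when a column with
    //    the same id physically exists in the file.
    if let Some(constant) = constants_map.get(&field_id) {
        return ColumnSource::Constant(constant.clone());
    }
    // 2. Look up by field id. When a name mapping is in play, distrust an id
    //    match whose name disagrees (the add_files field-id conflict case).
    if let Some(index) = parquet_fields.iter().position(|(id, name)| {
        *id == Some(field_id) && (!name_mapping_present || name == iceberg_name)
    }) {
        return ColumnSource::FileColumn(index);
    }
    // 3. "Return the default value if it has a defined initial-default".
    if let Some(default) = initial_default {
        return ColumnSource::Constant(default);
    }
    // 4. "Return null in all other cases".
    ColumnSource::Null
}
```

With this ordering, an identity-partitioned `dept` resolves at step 1 from partition metadata, while a `bucket(4, id)` source column is absent from the constants map and falls through to step 2, where it is read from the file.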
Which issue does this PR close?
Partially addresses #1749.
What changes are included in this PR?
This PR adds partition spec handling to `FileScanTask` and `RecordBatchTransformer` to correctly implement the Iceberg spec's "Column Projection" rules for fields "not present" in data files.

Problem Statement
Prior to this PR, iceberg-rust's `FileScanTask` had no mechanism to pass partition information to `RecordBatchTransformer`, causing two issues:

1. **Incorrect handling of bucket partitioning**: iceberg-rust couldn't distinguish identity transforms (which should use partition metadata constants) from non-identity transforms like bucket/truncate/year/month (which must read from the data file). For example, `bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition metadata, but the actual `id` values (100, 200, 300) exist only in the data file; see the sketch after this list. iceberg-rust was incorrectly treating bucket-partitioned source columns as constants, breaking runtime filtering and returning incorrect query results.
2. **Field ID conflicts in `add_files` scenarios**: When importing Hive tables via `add_files`, partition columns could have field IDs conflicting with Parquet data columns. Example: Parquet has field_id=1 → "name", but Iceberg expects field_id=1 → "id" (a partition column). Per the spec, the correct field is "not present" and requires name mapping fallback.
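As a runnable toy illustration of the first issue, using the numbers from the description (not the PR's code):

```rust
// Under bucket(4, id), partition metadata stores the transformed value (the
// bucket number), while the source values exist only in the data file.
fn main() {
    let id_bucket_from_metadata: i64 = 2; // transformed partition value
    let ids_in_data_file = [100_i64, 200, 300]; // actual source values
    // Treating `id` as a constant would overwrite every row with the bucket
    // number; correct behavior reads the column from the file.
    assert!(ids_in_data_file.iter().all(|&id| id != id_bucket_from_metadata));
}
```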
Iceberg Specification Requirements
Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), when a field ID is "not present" in a data file, it must be resolved using these rules:

1. Return the value from partition metadata if an identity transform exists for the field
2. Use `schema.name-mapping.default` metadata to map field id to columns without field id
3. Return the default value if it has a defined `initial-default`
4. Return null in all other cases

Why this matters:
- Identity transforms (e.g., `identity(dept)`) store actual column values in partition metadata that can be used as constants without reading the data file
- Non-identity transforms (e.g., `bucket(4, id)`, `day(timestamp)`) store transformed values in partition metadata (e.g., bucket number 2, not the actual `id` values 100, 200, 300) and must read source columns from the data file; the sketch below shows the corresponding constant detection
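A minimal sketch of transform-aware constant detection in the spirit of the PR's `constants_map()`; the `Transform`/`PartitionField` types and plain `i64` values are simplified stand-ins for iceberg-rust's real `PartitionSpec` and `Struct` types:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum Transform { Identity, Bucket(u32), Day } // simplified transform set

struct PartitionField { source_id: i32, transform: Transform }

// Build field-id -> constant value, keeping only identity-transformed fields,
// in the spirit of Java's PartitionUtil.constantsMap().
fn constants_map(
    spec_fields: &[PartitionField],
    partition_values: &[Option<i64>], // one slot per partition field
) -> HashMap<i32, i64> {
    spec_fields
        .iter()
        .zip(partition_values)
        .filter(|(field, _)| field.transform == Transform::Identity)
        .filter_map(|(field, value)| value.map(|v| (field.source_id, v)))
        .collect()
}
```

A `bucket(4, id)` field fails the identity filter, so `id` never becomes a constant and is instead read from the data file.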
Changes Made

- `FileScanTask` (scan/task.rs) gains three new fields:
  - `partition: Option<Struct>` - partition data from the manifest entry
  - `partition_spec: Option<Arc<PartitionSpec>>` - for transform-aware constant detection
  - `name_mapping: Option<Arc<NameMapping>>` - name mapping from table metadata
- New `constants_map()` function (arrow/record_batch_transformer.rs): mirrors Java's `PartitionUtil.constantsMap()` behavior, treating only `Transform::Identity` fields as constants
- `RecordBatchTransformer` (arrow/record_batch_transformer.rs): new `build_with_partition_data()` method to accept partition spec, partition data, and name mapping (a hypothetical call-site sketch follows this list)
- `ArrowReader` (arrow/reader.rs): calls `build_with_partition_data()` when partition information is available, and falls back to `build()` when it is not
- Scan planning (scan/context.rs): populates the new `FileScanTask` fields from manifest entry data
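A hypothetical call-site sketch of the fallback described above. All types here are local stand-ins so the sketch compiles on its own, and the constructor signatures are assumptions inferred from this description, not the PR's exact API:

```rust
use std::sync::Arc;

// Stand-in types; in the PR these are iceberg-rust's real FileScanTask,
// PartitionSpec, Struct, NameMapping, Schema, and RecordBatchTransformer.
#[derive(Clone)] struct PartitionSpec;
#[derive(Clone)] struct Struct;
#[derive(Clone)] struct NameMapping;
struct Schema;
struct RecordBatchTransformer;

struct FileScanTask {
    partition: Option<Struct>,
    partition_spec: Option<Arc<PartitionSpec>>,
    name_mapping: Option<Arc<NameMapping>>,
}

impl RecordBatchTransformer {
    // Assumed shape of the existing constructor.
    fn build(_schema: Arc<Schema>, _field_ids: &[i32]) -> Self {
        RecordBatchTransformer
    }
    // Assumed shape of the new constructor added by this PR.
    fn build_with_partition_data(
        _schema: Arc<Schema>,
        _field_ids: &[i32],
        _spec: Option<Arc<PartitionSpec>>,
        _partition: Option<Struct>,
        _name_mapping: Option<Arc<NameMapping>>,
    ) -> Self {
        RecordBatchTransformer
    }
}

// The reader picks the partition-aware path only when the task carries
// partition information; otherwise it keeps the existing build() path.
fn make_transformer(task: &FileScanTask, schema: Arc<Schema>, field_ids: &[i32]) -> RecordBatchTransformer {
    if task.partition_spec.is_some() && task.partition.is_some() {
        RecordBatchTransformer::build_with_partition_data(
            schema,
            field_ids,
            task.partition_spec.clone(),
            task.partition.clone(),
            task.name_mapping.clone(),
        )
    } else {
        RecordBatchTransformer::build(schema, field_ids)
    }
}
```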
Tests Added

- `bucket_partitioning_reads_source_column_from_file` - Verifies that bucket-partitioned source columns are read from data files (not treated as constants from partition metadata); a toy analogue follows this list
- `identity_partition_uses_constant_from_metadata` - Verifies that identity-transformed fields correctly use partition metadata constants
- `test_bucket_partitioning_with_renamed_source_column` - Verifies field-ID-based mapping works despite a column rename
- `add_files_partition_columns_without_field_ids` - Verifies name mapping resolution for Hive table imports without field IDs (spec rule #2)
- `add_files_with_true_field_id_conflict` - Verifies correct field ID conflict detection with name mapping fallback (spec rule #2)
- `test_all_four_spec_rules` - Integration test verifying all 4 spec rules work together

Are these changes tested?
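For flavor, a toy test against the `constants_map()` sketch shown earlier; it exercises the same rule as the first test above, but is not the PR's actual test code:

```rust
#[test]
fn bucket_fields_are_not_constants() {
    // Spec: bucket(4, id) on field 1, identity(dept) on field 2.
    let fields = [
        PartitionField { source_id: 1, transform: Transform::Bucket(4) },
        PartitionField { source_id: 2, transform: Transform::Identity },
    ];
    // Manifest partition data: id_bucket = 2, dept = 42.
    let values = [Some(2_i64), Some(42)];
    let constants = constants_map(&fields, &values);
    // The bucketed source column must be read from the data file...
    assert_eq!(constants.get(&1), None);
    // ...while the identity-partitioned column is a constant from metadata.
    assert_eq!(constants.get(&2), Some(&42));
}
```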
Yes, there are 6 new unit tests covering all 4 Iceberg spec rules. This change also resolved approximately 50 Iceberg Java tests when run with DataFusion Comet's experimental PR apache/datafusion-comet#2528.