Commit ec545f2
Improve IcebergCommitExec to correctly populate properties/schema (apache#1721)
## Which issue does this PR close?
This PR fixes schema mismatch errors (similar to the example shown below)
that occur when using `IcebergCommitExec` with DataFusion. The error appears
when `IcebergCommitExec` is not the top-level plan but is instead wrapped as
the input to another plan node, for example one added by a custom
optimization rule (such as a cache invalidation step).
> An internal error occurred. Internal error: PhysicalOptimizer rule
'OutputRequirements' failed. Schema mismatch. Expected original schema:
Schema { fields: [Field { name: "count", data_type: UInt64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}, got new schema: Schema { fields: [Field { name: "r_regionkey",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {"PARQUET:field_id": "1"} }, Field { name: "r_name",
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {"PARQUET:field_id": "2"} }, Field { name: "r_comment",
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {"PARQUET:field_id": "3"} }], metadata: {} }.
This issue was likely caused by a bug in DataFusion's code. Please help
us to resolve this by filing a bug report in our issue tracker:
https://github.com/apache/datafusion/issues
## What changes are included in this PR?
This PR updates the `compute_properties` logic to use the target (output)
schema instead of the input schema. Below is an example from DataFusion's
`DataSinkExec` implementation demonstrating that properties must be created
based on the target schema, not the input schema.
https://github.com/apache/datafusion/blob/4eacb6046773b759dae0b3d801fe8cb1c6b65c0f/datafusion/datasource/src/sink.rs#L101C1-L117C6
```rust
impl DataSinkExec {
    /// Create a plan to write to `sink`
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        // Properties are derived from the count (output) schema, not from `input`.
        let cache = Self::create_schema(&input, count_schema);
        Self {
            input,
            sink,
            count_schema: make_count_schema(),
            sort_order,
            cache,
        }
    }

    // ... (remaining code elided in the original excerpt)

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}
```
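To make the failure mode concrete, here is a minimal, std-only sketch of the idea. Every type in it (`Schema`, `PlanProperties`, `ExecutionPlan`, `ScanExec`, `CommitExec`, `schemas_agree`) is a simplified, hypothetical stand-in written for illustration, not the real DataFusion or iceberg-rust API. It assumes a sink-like node that reports a single-column `count` schema, and contrasts building the cached properties from the input with building them from the output schema.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for illustration only;
// these are NOT the real DataFusion / iceberg-rust types.
#[derive(Debug, Clone, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

#[derive(Debug, Clone)]
struct PlanProperties {
    schema: Arc<Schema>,
}

trait ExecutionPlan {
    fn schema(&self) -> Arc<Schema>;
    fn properties(&self) -> &PlanProperties;
}

/// A leaf node that produces the table's columns.
struct ScanExec {
    cache: PlanProperties,
}

impl ScanExec {
    fn new(fields: &[&str]) -> Self {
        let schema = Arc::new(Schema {
            fields: fields.iter().map(|f| f.to_string()).collect(),
        });
        Self { cache: PlanProperties { schema } }
    }
}

impl ExecutionPlan for ScanExec {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.cache.schema)
    }
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}

fn make_count_schema() -> Arc<Schema> {
    Arc::new(Schema { fields: vec!["count".to_string()] })
}

/// A sink-like node: consumes `input` rows and outputs a single `count` column.
struct CommitExec {
    #[allow(dead_code)]
    input: Arc<dyn ExecutionPlan>,
    count_schema: Arc<Schema>,
    cache: PlanProperties,
}

impl CommitExec {
    /// Problematic variant: cached properties are copied from the input plan.
    fn with_input_properties(input: Arc<dyn ExecutionPlan>) -> Self {
        let cache = input.properties().clone();
        Self { input, count_schema: make_count_schema(), cache }
    }

    /// Corrected variant: cached properties are built from the output schema.
    fn with_output_properties(input: Arc<dyn ExecutionPlan>) -> Self {
        let count_schema = make_count_schema();
        let cache = PlanProperties { schema: Arc::clone(&count_schema) };
        Self { input, count_schema, cache }
    }
}

impl ExecutionPlan for CommitExec {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.count_schema)
    }
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}

/// A toy consistency check: the schema a node reports must match
/// the schema stored in its cached properties.
fn schemas_agree(plan: &dyn ExecutionPlan) -> bool {
    plan.schema() == plan.properties().schema
}

fn main() {
    let scan: Arc<dyn ExecutionPlan> =
        Arc::new(ScanExec::new(&["r_regionkey", "r_name", "r_comment"]));
    let buggy = CommitExec::with_input_properties(Arc::clone(&scan));
    let fixed = CommitExec::with_output_properties(scan);
    println!("input-based properties consistent: {}", schemas_agree(&buggy));
    println!("output-based properties consistent: {}", schemas_agree(&fixed));
}
```

In this sketch the input-based variant reports `count` from `schema()` while its cached properties still carry the three table columns, which mirrors the two schemas seen in the error message; the output-based variant keeps both in agreement.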
## Are these changes tested?
Tested manually, expanded an existing test to verify the output schema, and
tested as part of the [Spice Iceberg write automated
tests](https://github.com/spiceai/spiceai/blob/trunk/crates/runtime/tests/iceberg/write/mod.rs).

1 parent b4a44a3 commit ec545f2
1 file changed: 7 additions & 2 deletions