Skip to content

Commit 3bc2f97

Browse files
authored
Spark 3.2, 3.3: Fix nullability propagation for MergeRows node (#5679)
1 parent ecf004b commit 3bc2f97

File tree

4 files changed

+72
-2
lines changed

4 files changed

+72
-2
lines changed

spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
360360
}.toMap
361361

362362
attrs.zipWithIndex.map { case (attr, index) =>
363-
attr.withNullability(nullabilityMap(index))
363+
AttributeReference(attr.name, attr.dataType, nullabilityMap(index), attr.metadata)()
364364
}
365365
}
366366

spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import java.util.Map;
2222
import org.apache.iceberg.TableProperties;
23+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
2324
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
25+
import org.junit.Test;
2426

2527
public class TestCopyOnWriteMerge extends TestMerge {
2628

@@ -38,4 +40,37 @@ public TestCopyOnWriteMerge(
3840
protected Map<String, String> extraTableProperties() {
3941
return ImmutableMap.of(TableProperties.MERGE_MODE, "copy-on-write");
4042
}
43+
44+
@Test
45+
public void testMergeWithTableWithNonNullableColumn() {
46+
createAndInitTable(
47+
"id INT NOT NULL, dep STRING",
48+
"{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
49+
50+
createOrReplaceView(
51+
"source",
52+
"id INT NOT NULL, dep STRING",
53+
"{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
54+
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
55+
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
56+
57+
sql(
58+
"MERGE INTO %s AS t USING source AS s "
59+
+ "ON t.id == s.id "
60+
+ "WHEN MATCHED AND t.id = 1 THEN "
61+
+ " UPDATE SET * "
62+
+ "WHEN MATCHED AND t.id = 6 THEN "
63+
+ " DELETE "
64+
+ "WHEN NOT MATCHED AND s.id = 2 THEN "
65+
+ " INSERT *",
66+
tableName);
67+
68+
ImmutableList<Object[]> expectedRows =
69+
ImmutableList.of(
70+
row(1, "emp-id-1"), // updated
71+
row(2, "emp-id-2") // new
72+
);
73+
assertEquals(
74+
"Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
75+
}
4176
}

spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
360360
}.toMap
361361

362362
attrs.zipWithIndex.map { case (attr, index) =>
363-
attr.withNullability(nullabilityMap(index))
363+
AttributeReference(attr.name, attr.dataType, nullabilityMap(index), attr.metadata)()
364364
}
365365
}
366366

spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import java.util.Map;
2222
import org.apache.iceberg.TableProperties;
23+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
2324
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
25+
import org.junit.Test;
2426

2527
public class TestCopyOnWriteMerge extends TestMerge {
2628

@@ -38,4 +40,37 @@ public TestCopyOnWriteMerge(
3840
protected Map<String, String> extraTableProperties() {
3941
return ImmutableMap.of(TableProperties.MERGE_MODE, "copy-on-write");
4042
}
43+
44+
@Test
45+
public void testMergeWithTableWithNonNullableColumn() {
46+
createAndInitTable(
47+
"id INT NOT NULL, dep STRING",
48+
"{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
49+
50+
createOrReplaceView(
51+
"source",
52+
"id INT NOT NULL, dep STRING",
53+
"{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
54+
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
55+
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
56+
57+
sql(
58+
"MERGE INTO %s AS t USING source AS s "
59+
+ "ON t.id == s.id "
60+
+ "WHEN MATCHED AND t.id = 1 THEN "
61+
+ " UPDATE SET * "
62+
+ "WHEN MATCHED AND t.id = 6 THEN "
63+
+ " DELETE "
64+
+ "WHEN NOT MATCHED AND s.id = 2 THEN "
65+
+ " INSERT *",
66+
tableName);
67+
68+
ImmutableList<Object[]> expectedRows =
69+
ImmutableList.of(
70+
row(1, "emp-id-1"), // updated
71+
row(2, "emp-id-2") // new
72+
);
73+
assertEquals(
74+
"Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
75+
}
4176
}

0 commit comments

Comments
 (0)