Skip to content

Commit fe1fb0f

Browse files
committed
address review comments, unit test fix
1 parent 09cfb67 commit fe1fb0f

File tree

5 files changed

+22
-10
lines changed

5 files changed

+22
-10
lines changed

python/pyspark/sql/functions.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2766,15 +2766,13 @@ def map_concat(*cols):
27662766
:param cols: list of column names (string) or list of :class:`Column` expressions
27672767
27682768
>>> from pyspark.sql.functions import map_concat
2769-
>>> spark.conf.set("spark.sql.deduplicateMapKey.lastWinsPolicy.enabled", "true")
2770-
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c', 1, 'd') as map2")
2769+
>>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c') as map2")
27712770
>>> df.select(map_concat("map1", "map2").alias("map3")).show(truncate=False)
27722771
+------------------------+
27732772
|map3 |
27742773
+------------------------+
27752774
|[1 -> d, 2 -> b, 3 -> c]|
27762775
+------------------------+
2777-
>>> spark.conf.unset("spark.sql.deduplicateMapKey.lastWinsPolicy.enabled")
27782776
"""
27792777
sc = SparkContext._active_spark_context
27802778
if len(cols) == 1 and isinstance(cols[0], (list, set)):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
5050
private lazy val keyGetter = InternalRow.getAccessor(keyType)
5151
private lazy val valueGetter = InternalRow.getAccessor(valueType)
5252

53+
private val deduplicateWithLastWinsPolicy =
54+
SQLConf.get.getConf(DEDUPLICATE_MAP_KEY_WITH_LAST_WINS_POLICY)
55+
5356
def put(key: Any, value: Any): Unit = {
5457
if (key == null) {
5558
throw new RuntimeException("Cannot use null as map key.")
@@ -65,7 +68,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
6568
keys.append(key)
6669
values.append(value)
6770
} else {
68-
if (!SQLConf.get.getConf(DEDUPLICATE_MAP_KEY_WITH_LAST_WINS_POLICY)) {
71+
if (!deduplicateWithLastWinsPolicy) {
6972
throw new RuntimeException(s"Duplicate map key $key was founded, please set " +
7073
s"${DEDUPLICATE_MAP_KEY_WITH_LAST_WINS_POLICY.key} to true to remove it with " +
7174
"last wins policy.")

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2171,7 +2171,9 @@ object SQLConf {
21712171
buildConf("spark.sql.deduplicateMapKey.lastWinsPolicy.enabled")
21722172
.doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " +
21732173
"this config takes effect in the following built-in functions: CreateMap, MapFromArrays, " +
2174-
"MapFromEntries, StringToMap, MapConcat and TransformKeys.")
2174+
"MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " +
2175+
"which is the default, Spark will throw an exception when duplicate map keys are " +
2176+
"detected.")
21752177
.booleanConf
21762178
.createWithDefault(false)
21772179

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
4444
assert(e.getMessage.contains("Cannot use null as map key"))
4545
}
4646

47+
test("fail while duplicated keys detected") {
48+
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
49+
builder.put(1, 1)
50+
val e = intercept[RuntimeException](builder.put(1, 2))
51+
assert(e.getMessage.contains("Duplicate map key 1 was founded"))
52+
}
53+
4754
test("remove duplicated keys with last wins policy") {
4855
withSQLConf(SQLConf.DEDUPLICATE_MAP_KEY_WITH_LAST_WINS_POLICY.key -> "true") {
4956
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
188188
checkExampleSyntax(example)
189189
example.split(" > ").toList.foreach(_ match {
190190
case exampleRe(sql, output) =>
191-
val df = clonedSpark.sql(sql)
192-
val actual = unindentAndTrim(
193-
hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
194-
val expected = unindentAndTrim(output)
195-
assert(actual === expected)
191+
withSQLConf(SQLConf.DEDUPLICATE_MAP_KEY_WITH_LAST_WINS_POLICY.key -> "true") {
192+
val df = clonedSpark.sql(sql)
193+
val actual = unindentAndTrim(
194+
hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
195+
val expected = unindentAndTrim(output)
196+
assert(actual === expected)
197+
}
196198
case _ =>
197199
})
198200
}

0 commit comments

Comments
 (0)