Skip to content

Commit c8c6be2

Browse files
wForgetbeliefer
authored andcommitted
[SPARK-48922][SQL][3.5] Avoid redundant array transform of identical expression for map type
### What changes were proposed in this pull request? Backports #50245 to 3.5 Similar to #47843, this patch avoids ArrayTransform in `resolveMapType` function if the resolution expression is the same as input param. ### Why are the changes needed? My previous pr #47381 was not merged, but I still think it is an optimization, so I reopened it. During the upgrade from Spark 3.1.1 to 3.5.0, I found a performance regression in map type inserts. There are some extra conversion expressions in project before insert, which doesn't seem to be always necessary. ``` map_from_arrays(transform(map_keys(map#516), lambdafunction(lambda key#652, lambda key#652, false)), transform(map_values(map#516), lambdafunction(lambda value#654, lambda value#654, false))) AS map#656 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #50245 from wForget/SPARK-48922. Authored-by: wforget <643348094qq.com> Signed-off-by: beliefer <beliefer163.com> (cherry picked from commit 1be108e) Closes #50265 from wForget/SPARK-48922-3.5. Authored-by: wforget <[email protected]> Signed-off-by: beliefer <[email protected]>
1 parent 4231d58 commit c8c6be2

File tree

2 files changed

+53
-5
lines changed

2 files changed

+53
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -418,11 +418,28 @@ object TableOutputResolver {
418418
}
419419

420420
if (resKey.length == 1 && resValue.length == 1) {
421-
val keyFunc = LambdaFunction(resKey.head, Seq(keyParam))
422-
val valueFunc = LambdaFunction(resValue.head, Seq(valueParam))
423-
val newKeys = ArrayTransform(MapKeys(nullCheckedInput), keyFunc)
424-
val newValues = ArrayTransform(MapValues(nullCheckedInput), valueFunc)
425-
Some(Alias(MapFromArrays(newKeys, newValues), expected.name)())
421+
// If the key and value expressions have not changed, we just check original map field.
422+
// Otherwise, we construct a new map by adding transformations to the keys and values.
423+
if (resKey.head == keyParam && resValue.head == valueParam) {
424+
Some(
425+
Alias(nullCheckedInput, expected.name)(
426+
nonInheritableMetadataKeys =
427+
Seq(CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
428+
} else {
429+
val newKeys = if (resKey.head != keyParam) {
430+
val keyFunc = LambdaFunction(resKey.head, Seq(keyParam))
431+
ArrayTransform(MapKeys(nullCheckedInput), keyFunc)
432+
} else {
433+
MapKeys(nullCheckedInput)
434+
}
435+
val newValues = if (resValue.head != valueParam) {
436+
val valueFunc = LambdaFunction(resValue.head, Seq(valueParam))
437+
ArrayTransform(MapValues(nullCheckedInput), valueFunc)
438+
} else {
439+
MapValues(nullCheckedInput)
440+
}
441+
Some(Alias(MapFromArrays(newKeys, newValues), expected.name)())
442+
}
426443
} else {
427444
None
428445
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,37 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
334334
hasTransform = true)
335335
}
336336

337+
test("SPARK-48922: Avoid redundant array transform of identical expression for map type") {
338+
def assertMapField(fromType: MapType, toType: MapType, transformNum: Int): Unit = {
339+
val table = TestRelation(Seq($"a".int, Symbol("map").map(toType)))
340+
val query = TestRelation(Seq(Symbol("map").map(fromType), $"a".int))
341+
342+
val writePlan = byName(table, query).analyze
343+
344+
assertResolved(writePlan)
345+
checkAnalysis(writePlan, writePlan)
346+
347+
val transforms = writePlan.children.head.expressions.flatMap { e =>
348+
e.flatMap {
349+
case t: ArrayTransform => Some(t)
350+
case _ => None
351+
}
352+
}
353+
assert(transforms.size == transformNum)
354+
}
355+
356+
assertMapField(MapType(LongType, StringType), MapType(LongType, StringType), 0)
357+
assertMapField(
358+
MapType(LongType, new StructType().add("x", "int").add("y", "int")),
359+
MapType(LongType, new StructType().add("y", "int").add("x", "byte")),
360+
1)
361+
assertMapField(MapType(LongType, LongType), MapType(IntegerType, LongType), 1)
362+
assertMapField(
363+
MapType(LongType, new StructType().add("x", "int").add("y", "int")),
364+
MapType(IntegerType, new StructType().add("y", "int").add("x", "byte")),
365+
2)
366+
}
367+
337368
test("SPARK-33136: output resolved on complex types for V2 write commands") {
338369
def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
339370
val table = TestRelation(StructType(Seq(StructField("a", toType))))

0 commit comments

Comments
 (0)