Skip to content

Commit d8fa6e4

Browse files
wangyumGitHub Enterprise
authored and committed
[CARMEL-6621][SPARK-42789][SQL] Rewrite multiple GetJsonObjects to a JsonTuple if their json expressions are the same (#1286)
* RewriteGetJsonObject * Fix * Fix * fix * fix
1 parent 5b0fffe commit d8fa6e4

File tree

5 files changed

+268
-0
lines changed

5 files changed

+268
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ case class GetJsonObject(json: Expression, path: Expression)
142142
val useExternalJsonPathLib = SQLConf.get.getConf(SQLConf.USE_EXTERNAL_JSON_PATH_LIB)
143143
@transient private lazy val om = new ObjectMapper
144144
@transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String])
145+
// Field-name literal to use when this GetJsonObject is rewritten to a JsonTuple.
// Defined only when the path is foldable and parses to exactly a root key followed by a
// single named field (for example `$.c1`); every other path shape yields None.
@transient private[catalyst] lazy val rewriteToJsonTuplePath: Option[Expression] = {
  if (!path.foldable) {
    // parsedPath evaluates `path` without an input row, so only consult it for foldable paths.
    None
  } else {
    parsedPath.collect {
      case List(Key, Named(fieldName)) => Literal.create(fieldName, StringType)
    }
  }
}
145156

146157
override def eval(input: InternalRow): Any = {
147158
val jsonStr = json.eval(input).asInstanceOf[UTF8String]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
213213
Batch("Partial Aggregation Optimization", fixedPoint,
214214
PushPartialAggregationThroughJoin,
215215
DeduplicateRightSideOfLeftSemiAntiJoin) :+
216+
Batch("Rewrite GetJsonObject", Once,
217+
RewriteGetJsonObject) :+
216218
Batch("Object Expressions Optimization", fixedPoint,
217219
EliminateMapObjects,
218220
CombineTypedFilters,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project}
24+
import org.apache.spark.sql.catalyst.rules.Rule
25+
import org.apache.spark.sql.internal.SQLConf
26+
import org.apache.spark.sql.types._
27+
28+
/**
 * Replaces groups of [[GetJsonObject]] expressions in a [[Project]] that read from the same
 * json expression with one [[JsonTuple]] generator per json expression.
 *
 * Only expressions whose `rewriteToJsonTuplePath` is defined take part, and a json expression
 * is rewritten only when at least two distinct extractions share it. The rule does nothing
 * unless `SQLConf.REWRITE_TO_JSONTUPLE` is enabled.
 */
object RewriteGetJsonObject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    if (conf.getConf(SQLConf.REWRITE_TO_JSONTUPLE)) {
      plan.transform { case p: Project => rewriteProject(p) }
    } else {
      plan
    }
  }

  /**
   * Rewrites a single Project. Returns `p` unchanged when no json expression is shared by at
   * least two distinct rewritable GetJsonObject expressions.
   */
  private def rewriteProject(p: Project): LogicalPlan = {
    // Distinct rewritable extractions paired with their JsonTuple field literal.
    // - `distinct`: the same extraction used twice must not count as a group of two, nor
    //   produce duplicate JsonTuple fields.
    // - `deterministic`: merging calls changes how often the json input is evaluated, which
    //   is only safe for deterministic inputs.
    val candidates: Seq[(GetJsonObject, Expression)] = p.projectList.flatMap {
      _.collect {
        case gjo: GetJsonObject if gjo.json.deterministic => gjo
      }
    }.distinct.flatMap(gjo => gjo.rewriteToJsonTuplePath.map(field => (gjo, field)))

    // Group by json expression in order of first appearance so that the nesting order of the
    // generated plan is stable instead of depending on hash iteration order.
    val grouped =
      mutable.LinkedHashMap.empty[Expression, mutable.ArrayBuffer[(GetJsonObject, Expression)]]
    candidates.foreach { case candidate @ (gjo, _) =>
      grouped.getOrElseUpdate(gjo.json, mutable.ArrayBuffer.empty) += candidate
    }
    val rewritable = grouped.toSeq.collect {
      case (json, fields) if fields.size > 1 => (json, fields.toSeq)
    }

    if (rewritable.isEmpty) {
      p
    } else {
      // Maps each rewritten GetJsonObject to the generator output attribute that replaces it.
      val replacements = mutable.Map.empty[Expression, AttributeReference]

      // Stack one Generate per shared json expression on top of the original child.
      val newChild = rewritable.foldLeft(p.child) { case (child, (json, fields)) =>
        val generatorOutput = fields.map { case (gjo, field) =>
          val attr = AttributeReference(field.toString, StringType)()
          replacements.put(gjo, attr)
          attr
        }
        Generate(
          JsonTuple(json +: fields.map(_._2)),
          Nil,
          outer = false,
          Some(json.sql),
          generatorOutput,
          child)
      }

      // Top-down on purpose: a bottom-up pass would first replace a GetJsonObject nested in
      // another one's json input, after which the outer expression no longer equals the key
      // recorded in `replacements` and its generator output would never be used.
      val newProjectList = p.projectList.map {
        _.transformDown {
          case gjo: GetJsonObject => replacements.getOrElse(gjo, gjo)
        }.asInstanceOf[NamedExpression]
      }
      p.copy(newProjectList, newChild)
    }
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,6 +1234,13 @@ object SQLConf {
12341234
.booleanConf
12351235
.createWithDefault(false)
12361236

1237+
// Feature flag read by the RewriteGetJsonObject optimizer rule; disabled by default.
val REWRITE_TO_JSONTUPLE = buildConf("spark.sql.optimizer.rewriteToJsonTuple")
  .internal()
  .doc("When true, multiple GetJsonObject expressions in a Project that read from the same " +
    "json expression with a simple top-level path such as '$.name' are rewritten to a " +
    "single JsonTuple.")
  .version("3.0.0")
  .booleanConf
  .createWithDefault(false)
1243+
12371244
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
12381245
.doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
12391246
"to parse.")
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.expressions._
21+
import org.apache.spark.sql.catalyst.dsl.plans._
22+
import org.apache.spark.sql.catalyst.expressions.{Concat, GetJsonObject, JsonTuple}
23+
import org.apache.spark.sql.catalyst.plans.PlanTest
24+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
25+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
26+
import org.apache.spark.sql.internal.SQLConf
27+
28+
/**
 * Plan-level tests for the [[RewriteGetJsonObject]] optimizer rule.
 *
 * Each test first checks `rewriteToJsonTuplePath` on every GetJsonObject in the query (so the
 * precondition of the scenario is actually verified), then compares the optimized plan with
 * the expected one.
 */
class RewriteGetJsonObjectSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Rewrite GetJsonObject", Once,
        RewriteGetJsonObject) :: Nil
  }

  private val testRelation = LocalRelation('a.string, 'b.string, 'c.string)

  /** Collects every GetJsonObject in the plan's expressions, rewritable or not. */
  private def collectGetJsonObjects(plan: LogicalPlan): Seq[GetJsonObject] = {
    plan.expressions.flatMap(_.collect { case gjo: GetJsonObject => gjo })
  }

  /**
   * With the rule enabled, asserts that every GetJsonObject in `query` has a rewritable path
   * and that optimizing `query` yields `expected`.
   */
  private def checkRewritten(query: LogicalPlan, expected: LogicalPlan): Unit = {
    withSQLConf(SQLConf.REWRITE_TO_JSONTUPLE.key -> "true") {
      val getJsonObjects = collectGetJsonObjects(query)
      // Guard against a vacuous `forall` on an empty collection.
      assert(getJsonObjects.nonEmpty)
      assert(getJsonObjects.forall(_.rewriteToJsonTuplePath.nonEmpty))
      comparePlans(Optimize.execute(query.analyze), expected.analyze)
    }
  }

  /**
   * With the rule enabled, asserts that no GetJsonObject in `query` has a rewritable path and
   * that the rule leaves the plan untouched.
   */
  private def checkNotRewritable(query: LogicalPlan): Unit = {
    withSQLConf(SQLConf.REWRITE_TO_JSONTUPLE.key -> "true") {
      val getJsonObjects = collectGetJsonObjects(query)
      // Guard against a vacuous `forall` on an empty collection.
      assert(getJsonObjects.nonEmpty)
      assert(getJsonObjects.forall(_.rewriteToJsonTuplePath.isEmpty))
      comparePlans(Optimize.execute(query.analyze), query.analyze)
    }
  }

  test("Rewrite to a single JsonTuple") {
    val query = testRelation
      .select(
        GetJsonObject($"a", stringToLiteral("$.c1")).as("c1"),
        GetJsonObject($"a", stringToLiteral("$.c2")).as("c2"),
        GetJsonObject($"a", stringToLiteral("$.c3")).as("c3"))

    val correctAnswer = testRelation
      .generate(
        JsonTuple(Seq($"a", stringToLiteral("c1"), stringToLiteral("c2"), stringToLiteral("c3"))),
        Nil,
        alias = Some("`a`"),
        outputNames = Seq("c1", "c2", "c3"))
      .select($"c1".as("c1"), $"c2".as("c2"), $"c3".as("c3"))

    checkRewritten(query, correctAnswer)
  }

  test("Rewrite to multiple JsonTuples") {
    val query = testRelation
      .select(
        GetJsonObject($"a", stringToLiteral("$.c1")).as("c1"),
        GetJsonObject($"a", stringToLiteral("$.c2")).as("c2"),
        GetJsonObject($"b", stringToLiteral("$.c1")).as("c3"),
        GetJsonObject($"b", stringToLiteral("$.c2")).as("c4"),
        GetJsonObject($"c", stringToLiteral("$.c1")).as("c5"))

    // `c` is read by a single GetJsonObject only, so it must stay as it is.
    val correctAnswer = testRelation
      .generate(
        JsonTuple(Seq($"a", stringToLiteral("c1"), stringToLiteral("c2"))),
        Nil,
        alias = Some("`a`"),
        outputNames = Seq("c1", "c2"))
      .generate(
        JsonTuple(Seq($"b", stringToLiteral("c1"), stringToLiteral("c2"))),
        Nil,
        alias = Some("`b`"),
        outputNames = Seq("c1", "c2"))
      .select(
        $"`a`.c1".as("c1"), $"`a`.c2".as("c2"),
        $"`b`.c1".as("c3"), $"`b`.c2".as("c4"),
        GetJsonObject($"c", stringToLiteral("$.c1")).as("c5"))

    checkRewritten(query, correctAnswer)
  }

  test("Rewrite GetJsonObject with other UDFs") {
    val query = testRelation
      .select(
        Concat(Seq(GetJsonObject($"a", stringToLiteral("$.c1")),
          GetJsonObject($"a", stringToLiteral("$.c2")),
          GetJsonObject($"a", stringToLiteral("$.c3")))).as("c"))

    val correctAnswer = testRelation
      .generate(
        JsonTuple(Seq($"a", stringToLiteral("c1"), stringToLiteral("c2"), stringToLiteral("c3"))),
        Nil,
        alias = Some("`a`"),
        outputNames = Seq("c1", "c2", "c3"))
      .select(Concat(Seq($"c1", $"c2", $"c3")).as("c"))

    checkRewritten(query, correctAnswer)
  }

  test("Do not rewrite if the conf is disabled") {
    withSQLConf(SQLConf.REWRITE_TO_JSONTUPLE.key -> "false") {
      val query = testRelation
        .select(
          GetJsonObject($"a", stringToLiteral("$.c1")).as("c1"),
          GetJsonObject($"a", stringToLiteral("$.c2")).as("c2"))

      comparePlans(Optimize.execute(query.analyze), query.analyze)
    }
  }

  test("Do not rewrite if parsed path is empty") {
    val query = testRelation
      .select(
        GetJsonObject($"a", stringToLiteral("c1")).as("c1"),
        GetJsonObject($"a", stringToLiteral("c2")).as("c2"),
        GetJsonObject($"a", stringToLiteral("c3")).as("c3"))

    checkNotRewritable(query)
  }

  test("Do not rewrite if parsed path is not Key and Named") {
    val query = testRelation
      .select(
        GetJsonObject($"a", stringToLiteral("$[0]")).as("c1"),
        GetJsonObject($"a", stringToLiteral("$[1]")).as("c2"))

    checkNotRewritable(query)
  }

  test("Do not rewrite if parsed path contains multiple Named") {
    val query = testRelation
      .select(
        GetJsonObject($"a", stringToLiteral("$.a.b")).as("c1"),
        GetJsonObject($"a", stringToLiteral("$.c.d")).as("c2"))

    checkNotRewritable(query)
  }
}

0 commit comments

Comments
 (0)