Commit f91614f
[SPARK-17270][SQL] Move object optimization rules into its own file (branch-2.0)
## What changes were proposed in this pull request?

As part of breaking Optimizer.scala apart, this patch moves the various Dataset object optimization rules into a single file. I'm submitting separate pull requests so we can more easily merge this into branch-2.0 to simplify optimizer backports. This is #14839, but for branch-2.0.

## How was this patch tested?

This should be covered by existing tests.

Author: Reynold Xin <[email protected]>

Closes #14843 from rxin/SPARK-17270-branch-2.0.
1 parent 94d52d7 commit f91614f

File tree: 2 files changed, +101 −72 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 0 additions & 72 deletions
```diff
@@ -186,25 +186,6 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
   }
 }
 
-/**
- * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
- * representation of data item. For example back to back map operations.
- */
-object EliminateSerialization extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjectType == s.inputObjectType =>
-      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
-      // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr =
-        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
-      Project(objAttr :: Nil, s.child)
-    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjectType =>
-      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-  }
-}
-
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
@@ -1582,59 +1563,6 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
   }
 }
 
-/**
- * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
- * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
- * the deserializer in filter condition to save the extra serialization at last.
- */
-object EmbedSerializerInFilter extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
-        // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
-        // optimization rule also relies on this assumption. However, Dataset typed filter operator
-        // does introduce schema changes in some cases. Thus, we only enable this optimization when
-        //
-        // 1. either input and output schemata are exactly the same, or
-        // 2. both input and output schemata are single-field schema and share the same type.
-        //
-        // The 2nd case is included because encoders for primitive types always have only a single
-        // field with hard-coded field name "value".
-        // TODO Cleans this up after fixing SPARK-15632.
-        if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
-
-      val numObjects = condition.collect {
-        case a: Attribute if a == d.output.head => a
-      }.length
-
-      if (numObjects > 1) {
-        // If the filter condition references the object more than one times, we should not embed
-        // deserializer in it as the deserialization will happen many times and slow down the
-        // execution.
-        // TODO: we can still embed it if we can make sure subexpression elimination works here.
-        s
-      } else {
-        val newCondition = condition transform {
-          case a: Attribute if a == d.output.head => d.deserializer
-        }
-        val filter = Filter(newCondition, d.child)
-
-        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
-        // We will remove it later in RemoveAliasOnlyProject rule.
-        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
-          Alias(fout, fout.name)(exprId = sout.exprId)
-        }
-        Project(objAttrs, filter)
-      }
-  }
-
-  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
-    (lhs, rhs) match {
-      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
-      case _ => false
-    }
-  }
-}
-
 /**
  * This rule rewrites predicate sub-queries into left semi/anti joins. The following predicates
  * are supported:
```
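
For context, here is a minimal sketch of the Dataset pattern that `EliminateSerialization` targets. The session setup (the `local[*]` master, app name, and the `spark` value) is illustrative and not part of this commit; it only assumes the Spark 2.0 Dataset API:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; the master URL and app name are assumptions.
val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

// Each typed map plans as SerializeFromObject(MapElements(..., DeserializeToObject(...))).
// EliminateSerialization removes the adjacent DeserializeToObject/SerializeFromObject
// pair between the two maps, so objects flow from one map into the next directly.
val doubled = spark.range(100).as[Long].map(_ + 1).map(_ * 2)

// Inspecting the optimized logical plan should show no intermediate round trip.
doubled.explain(true)
```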
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala

Lines changed: 101 additions & 0 deletions
```diff
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types.StructType
+
+/*
+ * This file defines optimization rules related to object manipulation (for the Dataset API).
+ */
+
+
+/**
+ * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
+ * representation of data item. For example back to back map operations.
+ */
+object EliminateSerialization extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
+        if d.outputObjectType == s.inputObjectType =>
+      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
+      // We will remove it later in RemoveAliasOnlyProject rule.
+      val objAttr =
+        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
+      Project(objAttr :: Nil, s.child)
+    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
+        if a.deserializer.dataType == s.inputObjectType =>
+      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
+  }
+}
+
+
+/**
+ * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
+ * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
+ * the deserializer in filter condition to save the extra serialization at last.
+ */
+object EmbedSerializerInFilter extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+        // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+        // optimization rule also relies on this assumption. However, Dataset typed filter operator
+        // does introduce schema changes in some cases. Thus, we only enable this optimization when
+        //
+        // 1. either input and output schemata are exactly the same, or
+        // 2. both input and output schemata are single-field schema and share the same type.
+        //
+        // The 2nd case is included because encoders for primitive types always have only a single
+        // field with hard-coded field name "value".
+        // TODO Cleans this up after fixing SPARK-15632.
+        if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
+      val numObjects = condition.collect {
+        case a: Attribute if a == d.output.head => a
+      }.length
+
+      if (numObjects > 1) {
+        // If the filter condition references the object more than one times, we should not embed
+        // deserializer in it as the deserialization will happen many times and slow down the
+        // execution.
+        // TODO: we can still embed it if we can make sure subexpression elimination works here.
+        s
+      } else {
+        val newCondition = condition transform {
+          case a: Attribute if a == d.output.head => d.deserializer
+        }
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
+      }
+  }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
+    }
+  }
+}
```
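
And a hedged sketch of the shape `EmbedSerializerInFilter` rewrites, reusing the illustrative `spark` session from the sketch above. A typed filter plans roughly as `SerializeFromObject(Filter(condition, DeserializeToObject(child)))`; when the schema guard holds and the object is referenced only once, the rule substitutes the deserializer into the condition and filters the child rows directly, adding only a Project to preserve output expression ids:

```scala
import spark.implicits._  // reuses the illustrative `spark` session from the earlier sketch

// A typed filter over a primitive-encoded Dataset: input and output are both
// single-field schemas of the same type, so the samePrimitiveType guard is satisfied
// (primitive encoders expose a single field named "value").
// The lambda references the object exactly once, so the deserializer is embedded
// into the filter condition instead of round-tripping every row through the
// object representation.
val evens = spark.range(100).as[Long].filter(_ % 2 == 0)
evens.explain(true)
```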
