Skip to content

Commit 44fcf99

Browse files
committed
[SPARK-12873][SQL] Add more comment in HiveTypeCoercion for type widening
I was reading this part of the analyzer code again and got confused by the difference between findWiderTypeForTwo and findTightestCommonTypeOfTwo. I also simplified WidenSetOperationTypes to make it a lot simpler. The easiest way to review this one is to just read the original code, and the new code. The logic is super simple. Author: Reynold Xin <[email protected]> Closes #10802 from rxin/SPARK-12873.
1 parent db9a860 commit 44fcf99

File tree

2 files changed

+49
-40
lines changed

2 files changed

+49
-40
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,20 @@ import org.apache.spark.sql.types._
2727

2828

2929
/**
30-
* A collection of [[Rule Rules]] that can be used to coerce differing types that
31-
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
32-
* but they do not introduce any dependencies on the hive codebase. For this reason they remain in
33-
* Catalyst until we have a more standard set of coercions.
30+
* A collection of [[Rule Rules]] that can be used to coerce differing types that participate in
31+
* operations into compatible ones.
32+
*
33+
* Most of these rules are based on Hive semantics, but they do not introduce any dependencies on
34+
* the hive codebase.
35+
*
36+
* Notes about type widening / tightest common types: Broadly, there are two cases when we need
37+
* to widen data types (e.g. union, binary comparison). In case 1, we are looking for a common
38+
* data type for two or more data types, and in this case no loss of precision is allowed. Examples
39+
* include type inference in JSON (e.g. what's the column's data type if one row is an integer
40+
* while the other row is a long?). In case 2, we are looking for a widened data type with
41+
* some acceptable loss of precision (e.g. there is no common type for double and decimal because
42+
* double's range is larger than decimal, and yet decimal is more precise than double, but in
43+
* union we would cast the decimal into double).
3444
*/
3545
object HiveTypeCoercion {
3646

@@ -63,6 +73,8 @@ object HiveTypeCoercion {
6373
DoubleType)
6474

6575
/**
76+
* Case 1 type widening (see the classdoc comment above for HiveTypeCoercion).
77+
*
6678
* Find the tightest common type of two types that might be used in a binary expression.
6779
* This handles all numeric types except fixed-precision decimals interacting with each other or
6880
* with primitive types, because in that case the precision and scale of the result depends on
@@ -118,16 +130,20 @@ object HiveTypeCoercion {
118130
})
119131
}
120132

133+
/**
134+
* Case 2 type widening (see the classdoc comment above for HiveTypeCoercion).
135+
*
136+
* i.e. the main difference with [[findTightestCommonTypeOfTwo]] is that here we allow some
137+
* loss of precision when widening decimal and double.
138+
*/
121139
private def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
122140
case (t1: DecimalType, t2: DecimalType) =>
123141
Some(DecimalPrecision.widerDecimalType(t1, t2))
124142
case (t: IntegralType, d: DecimalType) =>
125143
Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
126144
case (d: DecimalType, t: IntegralType) =>
127145
Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
128-
case (t: FractionalType, d: DecimalType) =>
129-
Some(DoubleType)
130-
case (d: DecimalType, t: FractionalType) =>
146+
case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) =>
131147
Some(DoubleType)
132148
case _ =>
133149
findTightestCommonTypeToString(t1, t2)
@@ -200,41 +216,37 @@ object HiveTypeCoercion {
200216
*/
201217
object WidenSetOperationTypes extends Rule[LogicalPlan] {
202218

203-
private[this] def widenOutputTypes(
204-
planName: String,
205-
left: LogicalPlan,
206-
right: LogicalPlan): (LogicalPlan, LogicalPlan) = {
207-
require(left.output.length == right.output.length)
219+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
220+
case p if p.analyzed => p
208221

209-
val castedTypes = left.output.zip(right.output).map {
210-
case (lhs, rhs) if lhs.dataType != rhs.dataType =>
211-
findWiderTypeForTwo(lhs.dataType, rhs.dataType)
212-
case other => None
213-
}
222+
case s @ SetOperation(left, right) if s.childrenResolved
223+
&& left.output.length == right.output.length && !s.resolved =>
214224

215-
def castOutput(plan: LogicalPlan): LogicalPlan = {
216-
val casted = plan.output.zip(castedTypes).map {
217-
case (e, Some(dt)) if e.dataType != dt =>
218-
Alias(Cast(e, dt), e.name)()
219-
case (e, _) => e
225+
// Tracks the list of data types to widen.
226+
// Some(dataType) means the right-hand side and the left-hand side have different types,
227+
// and there is a target type to widen both sides to.
228+
val targetTypes: Seq[Option[DataType]] = left.output.zip(right.output).map {
229+
case (lhs, rhs) if lhs.dataType != rhs.dataType =>
230+
findWiderTypeForTwo(lhs.dataType, rhs.dataType)
231+
case other => None
220232
}
221-
Project(casted, plan)
222-
}
223233

224-
if (castedTypes.exists(_.isDefined)) {
225-
(castOutput(left), castOutput(right))
226-
} else {
227-
(left, right)
228-
}
234+
if (targetTypes.exists(_.isDefined)) {
235+
// There is at least one column to widen.
236+
s.makeCopy(Array(widenTypes(left, targetTypes), widenTypes(right, targetTypes)))
237+
} else {
238+
// If we cannot find any column to widen, then just return the original set.
239+
s
240+
}
229241
}
230242

231-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
232-
case p if p.analyzed => p
233-
234-
case s @ SetOperation(left, right) if s.childrenResolved
235-
&& left.output.length == right.output.length && !s.resolved =>
236-
val (newLeft, newRight) = widenOutputTypes(s.nodeName, left, right)
237-
s.makeCopy(Array(newLeft, newRight))
243+
/** Given a plan, add an extra project on top to widen some columns' data types. */
244+
private def widenTypes(plan: LogicalPlan, targetTypes: Seq[Option[DataType]]): LogicalPlan = {
245+
val casted = plan.output.zip(targetTypes).map {
246+
case (e, Some(dt)) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
247+
case (e, _) => e
248+
}
249+
Project(casted, plan)
238250
}
239251
}
240252

@@ -372,8 +384,6 @@ object HiveTypeCoercion {
372384
* - INT gets turned into DECIMAL(10, 0)
373385
* - LONG gets turned into DECIMAL(20, 0)
374386
* - FLOAT and DOUBLE cause fixed-length decimals to turn into DOUBLE
375-
*
376-
* Note: Union/Except/Intersect is handled by WidenTypes
377387
*/
378388
// scalastyle:on
379389
object DecimalPrecision extends Rule[LogicalPlan] {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ class HiveTypeCoercionSuite extends PlanTest {
387387
)
388388
}
389389

390-
test("WidenSetOperationTypes for union except and intersect") {
390+
test("WidenSetOperationTypes for union, except, and intersect") {
391391
def checkOutput(logical: LogicalPlan, expectTypes: Seq[DataType]): Unit = {
392392
logical.output.zip(expectTypes).foreach { case (attr, dt) =>
393393
assert(attr.dataType === dt)
@@ -499,7 +499,6 @@ class HiveTypeCoercionSuite extends PlanTest {
499499
ruleTest(dateTimeOperations, Subtract(interval, interval), Subtract(interval, interval))
500500
}
501501

502-
503502
/**
504503
* There are rules that need to not fire before child expressions get resolved.
505504
* We use this test to make sure those rules do not fire early.

0 commit comments

Comments (0)