Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -704,19 +704,48 @@ object HiveTypeCoercion {

/**
* Casts types according to the expected input types for Expressions that have the trait
* [[AutoCastInputTypes]].
* [[ExpectsInputTypes]].
*/
object ImplicitTypeCasts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

case e: AutoCastInputTypes if e.children.map(_.dataType) != e.inputTypes =>
val newC = (e.children, e.children.map(_.dataType), e.inputTypes).zipped.map {
case (child, actual, expected) =>
if (actual == expected) child else Cast(child, expected)
case e: ExpectsInputTypes =>
val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
implicitCast(in, expected)
}
e.withNewChildren(newC)
e.withNewChildren(children)
}

/**
* If needed, cast the expression into the expected type.
* If the implicit cast is not allowed, return the expression itself.
*/
def implicitCast(e: Expression, expectedType: AbstractDataType): Expression = {
val inType = e.dataType
(inType, expectedType) match {
// Cast null type (usually from null literals) into target types
case (NullType, target: DataType) => Cast(e, target.defaultConcreteType)

// Implicit cast among numeric types
case (_: NumericType, target: NumericType) if e.dataType != target => Cast(e, target)

// Implicit cast between date time types
case (DateType, TimestampType) => Cast(e, TimestampType)
case (TimestampType, DateType) => Cast(e, DateType)

// Implicit cast from/to string
case (StringType, NumericType) => Cast(e, DoubleType)
case (StringType, target: NumericType) => Cast(e, target)
case (StringType, DateType) => Cast(e, DateType)
case (StringType, TimestampType) => Cast(e, TimestampType)
case (StringType, BinaryType) => Cast(e, BinaryType)
case (any, StringType) if any != StringType => Cast(e, StringType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to hive and discussion in #6551,
should we only allow atomic type(except boolean and binary) to string?


// Else, just return the same input expression
case _ => e
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.AbstractDataType


/**
Expand All @@ -32,28 +32,12 @@ trait ExpectsInputTypes { self: Expression =>
*
* The possible values at each position are:
* 1. a specific data type, e.g. LongType, StringType.
* 2. a non-leaf data type, e.g. NumericType, IntegralType, FractionalType.
* 3. a list of specific data types, e.g. Seq(StringType, BinaryType).
* 2. a non-leaf abstract data type, e.g. NumericType, IntegralType, FractionalType.
*/
def inputTypes: Seq[Any]
def inputTypes: Seq[AbstractDataType]

override def checkInputDataTypes(): TypeCheckResult = {
// We will do the type checking in `HiveTypeCoercion`, so always returning success here.
TypeCheckResult.TypeCheckSuccess
}
}

/**
* Expressions that require a specific `DataType` as input should implement this trait
* so that the proper type conversions can be performed in the analyzer.
*/
trait AutoCastInputTypes { self: Expression =>

def inputTypes: Seq[DataType]

override def checkInputDataTypes(): TypeCheckResult = {
// We will always do type casting for `AutoCastInputTypes` in `HiveTypeCoercion`,
// so type mismatch error won't be reported here, but for underling `Cast`s.
// TODO: implement proper type checking.
TypeCheckResult.TypeCheckSuccess
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ abstract class LeafMathExpression(c: Double, name: String)
* @param name The short name of the function
*/
abstract class UnaryMathExpression(f: Double => Double, name: String)
extends UnaryExpression with Serializable with AutoCastInputTypes {
self: Product =>
extends UnaryExpression with Serializable with ExpectsInputTypes { self: Product =>

override def inputTypes: Seq[DataType] = Seq(DoubleType)
override def dataType: DataType = DoubleType
Expand Down Expand Up @@ -96,7 +95,7 @@ abstract class UnaryMathExpression(f: Double => Double, name: String)
* @param name The short name of the function
*/
abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
extends BinaryExpression with Serializable with AutoCastInputTypes { self: Product =>
extends BinaryExpression with Serializable with ExpectsInputTypes { self: Product =>

override def inputTypes: Seq[DataType] = Seq(DoubleType, DoubleType)

Expand Down Expand Up @@ -208,7 +207,7 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
}

case class Bin(child: Expression)
extends UnaryExpression with Serializable with AutoCastInputTypes {
extends UnaryExpression with Serializable with ExpectsInputTypes {

override def inputTypes: Seq[DataType] = Seq(LongType)
override def dataType: DataType = StringType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import org.apache.spark.unsafe.types.UTF8String
* A function that calculates an MD5 128-bit checksum and returns it as a hex string
* For input of type [[BinaryType]]
*/
case class Md5(child: Expression)
extends UnaryExpression with AutoCastInputTypes {
case class Md5(child: Expression) extends UnaryExpression with ExpectsInputTypes {

override def dataType: DataType = StringType

Expand Down Expand Up @@ -62,12 +61,10 @@ case class Md5(child: Expression)
* the hash length is not one of the permitted values, the return value is NULL.
*/
case class Sha2(left: Expression, right: Expression)
extends BinaryExpression with Serializable with AutoCastInputTypes {
extends BinaryExpression with Serializable with ExpectsInputTypes {

override def dataType: DataType = StringType

override def toString: String = s"SHA2($left, $right)"

override def inputTypes: Seq[DataType] = Seq(BinaryType, IntegerType)

override def eval(input: InternalRow): Any = {
Expand Down Expand Up @@ -147,7 +144,7 @@ case class Sha2(left: Expression, right: Expression)
* A function that calculates a sha1 hash value and returns it as a hex string
* For input of type [[BinaryType]] or [[StringType]]
*/
case class Sha1(child: Expression) extends UnaryExpression with AutoCastInputTypes {
case class Sha1(child: Expression) extends UnaryExpression with ExpectsInputTypes {

override def dataType: DataType = StringType

Expand All @@ -174,8 +171,7 @@ case class Sha1(child: Expression) extends UnaryExpression with AutoCastInputTyp
* A function that computes a cyclic redundancy check value and returns it as a bigint
* For input of type [[BinaryType]]
*/
case class Crc32(child: Expression)
extends UnaryExpression with AutoCastInputTypes {
case class Crc32(child: Expression) extends UnaryExpression with ExpectsInputTypes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crc32 should be able to work with StringType, but StringType cannot be implicit casted BinaryType, right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i need to think about whether we should support implicit casts from string to binary. sql server does support that. hive doesn't, but hive chose to make a lot of the udfs work against both types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we can always cast a string to binary correctly, as it produces different binary when specifying different encoder. It's actually the case accept multiple DataType for an expression.
And also for Length, which support both StringType and BinaryType.

We probably need another PR for this improvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about having an AbstractDataType that's a TypeCollection, that expressions can put arbitrary types into it. Basically similar to the Seq[Any] idea, but with better type safety.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good idea for this, but it probably make thing more complicated for auto casting. (Which data type should be cast to?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for StringType -> BinaryType (UTF8 will be used)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we'd better leave the casting (StringType -> BinaryType) to be done within the UDF Crc32 itself, not via the generic auto casting rule. From the user perspective, the UDF Crc32 will support both StringType and BinaryType.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, I just checked the code of Hive, it does convert the StringType => BinaryType (UTF8 bytes), just as the generic rule. @davies +1


override def dataType: DataType = LongType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ trait PredicateHelper {
expr.references.subsetOf(plan.outputSet)
}

case class Not(child: Expression) extends UnaryExpression with Predicate with AutoCastInputTypes {
case class Not(child: Expression) extends UnaryExpression with Predicate with ExpectsInputTypes {
override def toString: String = s"NOT $child"

override def inputTypes: Seq[DataType] = Seq(BooleanType)
Expand Down Expand Up @@ -120,11 +120,11 @@ case class InSet(value: Expression, hset: Set[Any])
}

case class And(left: Expression, right: Expression)
extends BinaryOperator with Predicate with AutoCastInputTypes {
extends BinaryExpression with Predicate with ExpectsInputTypes {

override def inputTypes: Seq[DataType] = Seq(BooleanType, BooleanType)
override def toString: String = s"($left && $right)"

override def symbol: String = "&&"
override def inputTypes: Seq[DataType] = Seq(BooleanType, BooleanType)

override def eval(input: InternalRow): Any = {
val l = left.eval(input)
Expand Down Expand Up @@ -169,11 +169,11 @@ case class And(left: Expression, right: Expression)
}

case class Or(left: Expression, right: Expression)
extends BinaryOperator with Predicate with AutoCastInputTypes {
extends BinaryExpression with Predicate with ExpectsInputTypes {

override def inputTypes: Seq[DataType] = Seq(BooleanType, BooleanType)
override def toString: String = s"($left || $right)"

override def symbol: String = "||"
override def inputTypes: Seq[DataType] = Seq(BooleanType, BooleanType)

override def eval(input: InternalRow): Any = {
val l = left.eval(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

trait StringRegexExpression extends AutoCastInputTypes {
trait StringRegexExpression extends ExpectsInputTypes {
self: BinaryExpression =>

def escape(v: String): String
Expand Down Expand Up @@ -111,7 +111,7 @@ case class RLike(left: Expression, right: Expression)
override def toString: String = s"$left RLIKE $right"
}

trait CaseConversionExpression extends AutoCastInputTypes {
trait CaseConversionExpression extends ExpectsInputTypes {
self: UnaryExpression =>

def convert(v: UTF8String): UTF8String
Expand Down Expand Up @@ -154,7 +154,7 @@ case class Lower(child: Expression) extends UnaryExpression with CaseConversionE
}

/** A base trait for functions that compare two strings, returning a boolean. */
trait StringComparison extends AutoCastInputTypes {
trait StringComparison extends ExpectsInputTypes {
self: BinaryExpression =>

def compare(l: UTF8String, r: UTF8String): Boolean
Expand Down Expand Up @@ -215,7 +215,7 @@ case class EndsWith(left: Expression, right: Expression)
* Defined for String and Binary types.
*/
case class Substring(str: Expression, pos: Expression, len: Expression)
extends Expression with AutoCastInputTypes {
extends Expression with ExpectsInputTypes {

def this(str: Expression, pos: Expression) = {
this(str, pos, Literal(Integer.MAX_VALUE))
Expand Down Expand Up @@ -283,7 +283,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
/**
* A function that return the length of the given string expression.
*/
case class StringLength(child: Expression) extends UnaryExpression with AutoCastInputTypes {
case class StringLength(child: Expression) extends UnaryExpression with ExpectsInputTypes {
override def dataType: DataType = IntegerType
override def inputTypes: Seq[DataType] = Seq(StringType)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror}

import org.apache.spark.sql.catalyst.ScalaReflectionLock
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.util.Utils

/**
* A non-concrete data type, reserved for internal uses.
*/
private[sql] abstract class AbstractDataType {
private[sql] def defaultConcreteType: DataType
}


/**
* An internal type used to represent everything that is not null, UDTs, arrays, structs, and maps.
*/
protected[sql] abstract class AtomicType extends DataType {
private[sql] type InternalType
@transient private[sql] val tag: TypeTag[InternalType]
private[sql] val ordering: Ordering[InternalType]

@transient private[sql] val classTag = ScalaReflectionLock.synchronized {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
}
}


/**
* :: DeveloperApi ::
* Numeric data types.
*/
abstract class NumericType extends AtomicType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
// desugared by the compiler into an argument to the objects constructor. This means there is no
// longer an no argument constructor and thus the JVM cannot serialize the object anymore.
private[sql] val numeric: Numeric[InternalType]
}


private[sql] object NumericType extends AbstractDataType {
/**
* Enables matching against NumericType for expressions:
* {{{
* case Cast(child @ NumericType(), StringType) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exist: why we add unapply for it? Is it same with Cast(child: NumericType, StringType)? It looks to me that we only need this object NumericType in ExpectsInputTypes when an expression need any kind of numeric input.
And, should we add object AtomicType too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child is an expression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry didn't see that...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw this is old code. just got copied around.

* ...
* }}}
*/
def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]

private[sql] override def defaultConcreteType: DataType = IntegerType
}


private[sql] object IntegralType extends AbstractDataType {
/**
* Enables matching against IntegralType for expressions:
* {{{
* case Cast(child @ IntegralType(), StringType) =>
* ...
* }}}
*/
def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[IntegralType]

private[sql] override def defaultConcreteType: DataType = IntegerType
}


private[sql] abstract class IntegralType extends NumericType {
private[sql] val integral: Integral[InternalType]
}


private[sql] object FractionalType extends AbstractDataType {
/**
* Enables matching against FractionalType for expressions:
* {{{
* case Cast(child @ FractionalType(), StringType) =>
* ...
* }}}
*/
def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[FractionalType]

private[sql] override def defaultConcreteType: DataType = DoubleType
}


private[sql] abstract class FractionalType extends NumericType {
private[sql] val fractional: Fractional[InternalType]
private[sql] val asIntegral: Integral[InternalType]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.DeveloperApi


object ArrayType {
object ArrayType extends AbstractDataType {
/** Construct a [[ArrayType]] object with the given element type. The `containsNull` is true. */
def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)

override def defaultConcreteType: DataType = ArrayType(NullType, containsNull = true)
}


Expand All @@ -41,8 +43,6 @@ object ArrayType {
*
* @param elementType The data type of values.
* @param containsNull Indicates if values have `null` values
*
* @group dataType
*/
@DeveloperApi
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import org.apache.spark.sql.catalyst.util.TypeUtils
* :: DeveloperApi ::
* The data type representing `Array[Byte]` values.
* Please use the singleton [[DataTypes.BinaryType]].
*
* @group dataType
*/
@DeveloperApi
class BinaryType private() extends AtomicType {
Expand Down
Loading