sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.sql.Timestamp
-
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -110,7 +108,7 @@ trait ScalaReflection {
           StructField(p.name.toString, dataType, nullable)
         }), nullable = true)
     case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
-    case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
+    case t if t <:< typeOf[java.sql.Timestamp] => Schema(TimestampType, nullable = true)
     case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
     case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
     case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
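
For context, a small sketch of how this code path is exercised (not part of the patch; the Event case class is invented): schemaFor maps a Scala type to a Catalyst schema, and a java.sql.Timestamp field resolves to TimestampType exactly as before; the pattern now just spells out the fully qualified name instead of relying on the removed import.

import org.apache.spark.sql.catalyst.ScalaReflection

// Hypothetical case class, for illustration only.
case class Event(id: Long, createdAt: java.sql.Timestamp)

// The Timestamp field should still map to TimestampType; only the
// spelling of the pattern in schemaFor changed, not the mapping.
val schema = ScalaReflection.schemaFor[Event]
// schema.dataType ~ StructType(StructField(id,LongType,false),
//                              StructField(createdAt,TimestampType,true))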
@@ -136,20 +134,20 @@ trait ScalaReflection {
 
   def typeOfObject: PartialFunction[Any, DataType] = {
     // The data type can be determined without ambiguity.
-    case obj: BooleanType.JvmType => BooleanType
-    case obj: BinaryType.JvmType => BinaryType
+    case obj: Boolean => BooleanType
+    case obj: Array[Byte] => BinaryType
+    case obj: String => StringType
-    case obj: StringType.JvmType => StringType
-    case obj: ByteType.JvmType => ByteType
-    case obj: ShortType.JvmType => ShortType
-    case obj: IntegerType.JvmType => IntegerType
-    case obj: LongType.JvmType => LongType
-    case obj: FloatType.JvmType => FloatType
-    case obj: DoubleType.JvmType => DoubleType
+    case obj: UTF8String => StringType
+    case obj: Byte => ByteType
+    case obj: Short => ShortType
+    case obj: Int => IntegerType
+    case obj: Long => LongType
+    case obj: Float => FloatType
+    case obj: Double => DoubleType
     case obj: java.sql.Date => DateType
     case obj: java.math.BigDecimal => DecimalType.Unlimited
     case obj: Decimal => DecimalType.Unlimited
-    case obj: TimestampType.JvmType => TimestampType
+    case obj: java.sql.Timestamp => TimestampType
     case null => NullType
     // For other cases, there is no obvious mapping from the type of the given object to a
     // Catalyst data type. A user should provide his/her specific rules
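
Since typeOfObject is a PartialFunction[Any, DataType], those user-specific rules can be chained on with orElse. A minimal sketch (MyPoint and its mapping are invented for illustration):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types._

// A user type with no obvious Catalyst mapping, invented for this sketch.
class MyPoint(val x: Double, val y: Double)

val userRules: PartialFunction[Any, DataType] = {
  case _: MyPoint => StructType(Seq(
    StructField("x", DoubleType, nullable = false),
    StructField("y", DoubleType, nullable = false)))
}

// Built-in rules first, then the user's rules for anything they miss.
val resolveType = ScalaReflection.typeOfObject orElse userRules
resolveType(42)                    // IntegerType
resolveType(new MyPoint(0.0, 1.0)) // the StructType above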

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -346,7 +346,7 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
   }
 
   lazy val ordering = left.dataType match {
-    case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
     case other => sys.error(s"Type $other does not support ordered operations")
   }
 
@@ -391,7 +391,7 @@ case class MinOf(left: Expression, right: Expression) extends Expression {
   }
 
   lazy val ordering = left.dataType match {
-    case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
     case other => sys.error(s"Type $other does not support ordered operations")
   }
 
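
For background (a sketch, not from the diff): every AtomicType carries an Ordering for its underlying JVM type, and MaxOf and MinOf cast it to Ordering[Any] so one code path can compare erased values. The ordering field is private[sql], so this sketch substitutes a plain Scala Ordering:

// The same trick the expressions use: compare boxed values through an
// Ordering that was really defined for the unboxed type.
val intOrdering = Ordering.Int.asInstanceOf[Ordering[Any]]

def maxOfAny(ord: Ordering[Any], a: Any, b: Any): Any =
  if (ord.compare(a, b) >= 0) a else b

maxOfAny(intOrdering, 1, 2) // 2, the comparison MaxOf performs per row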

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -623,7 +623,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
   protected def getColumn(inputRow: TermName, dataType: DataType, ordinal: Int) = {
     dataType match {
       case StringType => q"$inputRow($ordinal).asInstanceOf[org.apache.spark.sql.types.UTF8String]"
-      case dt @ NativeType() => q"$inputRow.${accessorForType(dt)}($ordinal)"
+      case dt: DataType if isNativeType(dt) => q"$inputRow.${accessorForType(dt)}($ordinal)"
       case _ => q"$inputRow.apply($ordinal).asInstanceOf[${termForType(dataType)}]"
     }
   }
@@ -635,7 +635,8 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
       value: TermName) = {
     dataType match {
       case StringType => q"$destinationRow.update($ordinal, $value)"
-      case dt @ NativeType() => q"$destinationRow.${mutatorForType(dt)}($ordinal, $value)"
+      case dt: DataType if isNativeType(dt) =>
+        q"$destinationRow.${mutatorForType(dt)}($ordinal, $value)"
       case _ => q"$destinationRow.update($ordinal, $value)"
     }
   }
@@ -675,7 +676,18 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
   }
 
   protected def termForType(dt: DataType) = dt match {
-    case n: NativeType => n.tag
+    case n: AtomicType => n.tag
     case _ => typeTag[Any]
   }
+
+  /**
+   * List of data types that have special accessors and setters in [[Row]].
+   */
+  protected val nativeTypes =
+    Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+
+  /**
+   * Returns true if the data type has a special accessor and setter in [[Row]].
+   */
+  protected def isNativeType(dt: DataType) = nativeTypes.contains(dt)
 }

Review comment (Contributor): So now "native type" refers to those data types that have special accessors and setters in Row?

Reply (Author): This is specific to the code generator, which I don't care about as much.
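
To make the question above concrete (an illustrative sketch, not part of the patch): the "special accessors and setters" are the primitive-typed methods on Row, which generated code calls to read and write those eight types without boxing; every other type goes through the generic apply.

import org.apache.spark.sql.Row

val row = Row(42, 3.14, "s")
row.getInt(0)     // 42 as an unboxed Int   (IntegerType is in nativeTypes)
row.getDouble(1)  // 3.14 as an unboxed Double
row(2)            // "s" through the generic apply(i): Any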

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -109,7 +109,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
     q"override def update(i: Int, value: Any): Unit = { ..$cases; $accessorFailure }"
   }
 
-  val specificAccessorFunctions = NativeType.all.map { dataType =>
+  val specificAccessorFunctions = nativeTypes.map { dataType =>
     val ifStatements = expressions.zipWithIndex.flatMap {
       // getString() is not used by expressions
       case (e, i) if e.dataType == dataType && dataType != StringType =>
@@ -135,7 +135,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
     }
   }
 
-  val specificMutatorFunctions = NativeType.all.map { dataType =>
+  val specificMutatorFunctions = nativeTypes.map { dataType =>
     val ifStatements = expressions.zipWithIndex.flatMap {
       // setString() is not used by expressions
       case (e, i) if e.dataType == dataType && dataType != StringType =>
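
Roughly, the quasiquotes above expand into one specialized accessor (and mutator) per entry in nativeTypes. A hand-written sketch of the generated shape, simplified and not the literal codegen output:

// Sketch: for a projection whose column 0 is an Int and column 2 a Long,
// the generated row overrides the specific accessors along these lines.
final class SpecificRowSketch(c0: Int, c1: Boolean, c2: Long) {
  def getInt(i: Int): Int = i match {
    case 0 => c0
    case _ => throw new IllegalArgumentException(s"getInt on column $i")
  }
  def getLong(i: Int): Long = i match {
    case 2 => c2
    case _ => throw new IllegalArgumentException(s"getLong on column $i")
  }
}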

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{DataType, BinaryType, BooleanType, NativeType}
+import org.apache.spark.sql.types.{DataType, BinaryType, BooleanType, AtomicType}
 
 object InterpretedPredicate {
   def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -211,7 +211,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
         s"Types do not match ${left.dataType} != ${right.dataType}")
     }
     left.dataType match {
-      case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
       case other => sys.error(s"Type $other does not support ordered operations")
     }
   }
@@ -240,7 +240,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
         s"Types do not match ${left.dataType} != ${right.dataType}")
     }
     left.dataType match {
-      case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
       case other => sys.error(s"Type $other does not support ordered operations")
     }
   }
@@ -269,7 +269,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
         s"Types do not match ${left.dataType} != ${right.dataType}")
     }
     left.dataType match {
-      case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
       case other => sys.error(s"Type $other does not support ordered operations")
     }
   }
@@ -298,7 +298,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
         s"Types do not match ${left.dataType} != ${right.dataType}")
     }
     left.dataType match {
-      case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
       case other => sys.error(s"Type $other does not support ordered operations")
     }
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.types.{UTF8String, DataType, StructType, NativeType}
+import org.apache.spark.sql.types.{UTF8String, DataType, StructType, AtomicType}
 
 /**
  * An extended interface to [[Row]] that allows the values for each column to be updated. Setting
@@ -227,9 +227,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
         return if (order.direction == Ascending) 1 else -1
       } else {
         val comparison = order.dataType match {
-          case n: NativeType if order.direction == Ascending =>
+          case n: AtomicType if order.direction == Ascending =>
             n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case n: NativeType if order.direction == Descending =>
+          case n: AtomicType if order.direction == Descending =>
             n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
           case other => sys.error(s"Type $other does not support ordered operations")
         }
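
A usage sketch (the column layout is invented): RowOrdering turns a list of SortOrders over atomic types into an Ordering[Row], which is what sort operators hand to sorting routines.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Order rows by column 0 (Int) descending, then column 1 (Int) ascending.
val rowOrdering = new RowOrdering(Seq(
  SortOrder(BoundReference(0, IntegerType, nullable = true), Descending),
  SortOrder(BoundReference(1, IntegerType, nullable = true), Ascending)))

val rows = Seq(Row(1, 10), Row(2, 5), Row(1, 7))
rows.sorted(rowOrdering) // Row(2,5), Row(1,7), Row(1,10)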