Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io.{DataInputStream, DataOutputStream}
import java.sql.{Date, Time}
import java.sql.{Timestamp, Date, Time}

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -107,9 +107,12 @@ private[spark] object SerDe {
Date.valueOf(readString(in))
}

def readTime(in: DataInputStream): Time = {
val t = in.readDouble()
new Time((t * 1000L).toLong)
/**
 * Reads one double from the stream, treats it as (possibly fractional) seconds, and
 * builds a Timestamp from it.
 *
 * @param in stream positioned at the encoded double
 * @return a Timestamp whose millisecond value is the floored second count times 1000 and
 *         whose nanos field holds the remaining fraction
 */
def readTime(in: DataInputStream): Timestamp = {
  val raw = in.readDouble()
  // Floor (rather than truncate) so the leftover fraction is never negative.
  val wholeSeconds = Math.floor(raw).toLong
  val fractionNanos = ((raw - wholeSeconds) * 1e9).toInt
  val result = new Timestamp(wholeSeconds * 1000L)
  result.setNanos(fractionNanos)
  result
}

def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
Expand Down Expand Up @@ -227,6 +230,9 @@ private[spark] object SerDe {
case "java.sql.Time" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Time])
case "java.sql.Timestamp" =>
writeType(dos, "time")
writeTime(dos, value.asInstanceOf[Timestamp])
case "[B" =>
writeType(dos, "raw")
writeBytes(dos, value.asInstanceOf[Array[Byte]])
Expand Down Expand Up @@ -289,6 +295,9 @@ private[spark] object SerDe {
out.writeDouble(value.getTime.toDouble / 1000.0)
}

/**
 * Writes a Timestamp to the stream as a single double: seconds since the epoch, with the
 * sub-second part taken from the timestamp's nanos field.
 *
 * @param out   stream to write the double to
 * @param value timestamp to encode
 */
def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
  // getNanos is the non-negative fraction of the *floored* second, so the whole-second part
  // has to be floored as well. Integer division truncates toward zero, which is one second
  // too large for timestamps before the epoch (e.g. -0.5s would be written as +0.5).
  val wholeSeconds = math.floor(value.getTime / 1000.0)
  out.writeDouble(wholeSeconds + value.getNanos.toDouble / 1e9)
}

// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {
Expand Down
11 changes: 11 additions & 0 deletions python/pyspark/sql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import decimal
import time
import datetime
import calendar
import keyword
import warnings
import json
Expand Down Expand Up @@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
elif isinstance(dataType, TimestampType):
return True
else:
return False

Expand Down Expand Up @@ -707,6 +710,14 @@ def converter(obj):
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
elif isinstance(dataType, UserDefinedType):
return lambda obj: dataType.serialize(obj)
elif isinstance(dataType, TimestampType):

def to_posix_timstamp(dt):
    """Convert a datetime to an integer count of 100ns units since the epoch.

    Naive datetimes go through time.mktime on their timetuple; aware ones go
    through calendar.timegm on their UTC timetuple.
    """
    if dt.tzinfo is None:
        seconds = time.mktime(dt.timetuple())
    else:
        seconds = calendar.timegm(dt.utctimetuple())
    # Scale with integer arithmetic: seconds * 1e7 as a float is already above
    # 2**53 for present-day dates and cannot hold every 100ns step past 2**54,
    # so the float form silently corrupts the low digits.
    return int(seconds) * 10000000 + dt.microsecond * 10
return to_posix_timstamp
else:
raise ValueError("Unexpected type %r" % dataType)

Expand Down
6 changes: 6 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;

import scala.collection.Seq;
Expand Down Expand Up @@ -103,6 +104,11 @@ public Date getDate(int i) {
throw new UnsupportedOperationException();
}

/**
 * Timestamp accessor; this implementation does not support it and always throws.
 *
 * @param i column position (unused here)
 * @throws UnsupportedOperationException on every call
 */
@Override
public Timestamp getTimestamp(int i) {
throw new UnsupportedOperationException();
}

@Override
public <T> Seq<T> getSeq(int i) {
throw new UnsupportedOperationException();
Expand Down
8 changes: 7 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,15 @@ trait Row extends Serializable {
*
* @throws ClassCastException when data type does not match.
*/
// TODO(davies): This is not the right default implementation, we use Int as Date internally
def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date]

/**
 * Returns the value at position i of timestamp type as java.sql.Timestamp.
 *
 * The value is obtained with `apply(i)` and cast; no conversion is performed here.
 *
 * @throws ClassCastException when data type does not match.
 */
def getTimestamp(i: Int): java.sql.Timestamp = apply(i).asInstanceOf[java.sql.Timestamp]

/**
* Returns the value at position i of array type as a Scala Seq.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst

import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.sql.Date
import java.sql.{Timestamp, Date}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

Expand Down Expand Up @@ -58,6 +58,7 @@ object CatalystTypeConverters {
case structType: StructType => StructConverter(structType)
case StringType => StringConverter
case DateType => DateConverter
case TimestampType => TimestampConverter
case dt: DecimalType => BigDecimalConverter
case BooleanType => BooleanConverter
case ByteType => ByteConverter
Expand Down Expand Up @@ -274,6 +275,15 @@ object CatalystTypeConverters {
override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
}

/**
 * Converter between java.sql.Timestamp and the Long produced/consumed by
 * DateUtils.fromJavaTimestamp / DateUtils.toJavaTimestamp.
 */
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
  override def toCatalystImpl(scalaValue: Timestamp): Long = {
    DateUtils.fromJavaTimestamp(scalaValue)
  }

  // A null catalyst value maps to a null Timestamp; anything else is treated as a Long.
  override def toScala(catalystValue: Any): Timestamp = catalystValue match {
    case null => null
    case internal => DateUtils.toJavaTimestamp(internal.asInstanceOf[Long])
  }

  override def toScalaImpl(row: Row, column: Int): Timestamp = {
    toScala(row.getLong(column))
  }
}

private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match {
case d: BigDecimal => Decimal(d)
Expand Down Expand Up @@ -367,6 +377,7 @@ object CatalystTypeConverters {
def convertToCatalyst(a: Any): Any = a match {
case s: String => StringConverter.toCatalyst(s)
case d: Date => DateConverter.toCatalyst(d)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
case seq: Seq[Any] => seq.map(convertToCatalyst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
case TimestampType => buildCast[Timestamp](_, t => UTF8String(timestampToString(t)))
case TimestampType => buildCast[Long](_,
t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String(o.toString))
}

Expand All @@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case StringType =>
buildCast[UTF8String](_, _.length() != 0)
case TimestampType =>
buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
buildCast[Long](_, t => t != 0)
case DateType =>
// Hive would return null when cast from date to boolean
buildCast[Int](_, d => null)
Expand Down Expand Up @@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null }
try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0))
buildCast[Boolean](_, b => (if (b) 1L else 0))
case LongType =>
buildCast[Long](_, l => new Timestamp(l))
buildCast[Long](_, l => longToTimestamp(l))
case IntegerType =>
buildCast[Int](_, i => new Timestamp(i))
buildCast[Int](_, i => longToTimestamp(i.toLong))
case ShortType =>
buildCast[Short](_, s => new Timestamp(s))
buildCast[Short](_, s => longToTimestamp(s.toLong))
case ByteType =>
buildCast[Byte](_, b => new Timestamp(b))
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
Expand All @@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
})
}

private[this] def decimalToTimestamp(d: Decimal) = {
val seconds = Math.floor(d.toDouble).toLong
val bd = (d.toBigDecimal - seconds) * 1000000000
val nanos = bd.intValue()

val millis = seconds * 1000
val t = new Timestamp(millis)

// remaining fractional portion as nanos
t.setNanos(nanos)
t
// Scales the decimal by 10,000,000 and keeps the integral part as a Long.
// NOTE(review): presumably the input is seconds and the result is in 100ns units — confirm.
private[this] def decimalToTimestamp(d: Decimal): Long = {
  val scaled = d.toBigDecimal * 10000000L
  scaled.longValue()
}

// Timestamp to long, converting milliseconds to seconds
private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 1000.0).toLong

private[this] def timestampToDouble(ts: Timestamp) = {
// First part is the seconds since the beginning of time, followed by nanosecs.
Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000
// Milliseconds -> 100ns units (10,000 units per millisecond).
private[this] def longToTimestamp(t: Long): Long = {
  val unitsPerMilli = 10000L
  t * unitsPerMilli
}
// Converting 100ns units to whole seconds, rounding toward negative infinity.
// Done in integer arithmetic: going through Double loses precision above 2^53 (which
// present-day timestamps in 100ns units already exceed), so a value one unit before a
// second boundary could be rounded up into the next second.
private[this] def timestampToLong(ts: Long): Long = {
  val unitsPerSecond = 10000000L
  val quotient = ts / unitsPerSecond
  // Long division truncates toward zero; step down once for negative remainders to floor.
  if (ts % unitsPerSecond < 0) quotient - 1 else quotient
}
// 100ns units -> seconds, as a Double (fractional seconds are kept).
private[this] def timestampToDouble(ts: Long): Double = ts.toDouble / 1e7

// Converts Timestamp to string according to Hive TimestampWritable convention
Expand All @@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception,
// so we can return null
Expand All @@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t))
buildCast[Long](_, t => timestampToLong(t))
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
}
Expand All @@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toInt)
buildCast[Long](_, t => timestampToLong(t).toInt)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
}
Expand All @@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toShort)
buildCast[Long](_, t => timestampToLong(t).toShort)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
}
Expand All @@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toByte)
buildCast[Long](_, t => timestampToLong(t).toByte)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
}
Expand Down Expand Up @@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
case TimestampType =>
// Note that we lose precision here.
buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
case DecimalType() =>
b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
case LongType =>
Expand All @@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t))
buildCast[Long](_, t => timestampToDouble(t))
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
}
Expand All @@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case DateType =>
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
buildCast[Long](_, t => timestampToDouble(t).toFloat)
case x: NumericType =>
b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
case BooleanType => new MutableBoolean
case LongType => new MutableLong
case DateType => new MutableInt // We use INT for DATE internally
case TimestampType => new MutableLong // We use Long for Timestamp internally
case _ => new MutableAny
}.toArray)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class CodeGenContext {
case BinaryType => "byte[]"
case StringType => stringType
case DateType => "int"
case TimestampType => "java.sql.Timestamp"
case TimestampType => "long"
case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName
case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName
case _ => "Object"
Expand All @@ -140,6 +140,7 @@ class CodeGenContext {
case FloatType => "Float"
case BooleanType => "Boolean"
case DateType => "Integer"
case TimestampType => "Long"
case _ => javaType(dt)
}

Expand All @@ -155,6 +156,7 @@ class CodeGenContext {
case DoubleType => "-1.0"
case IntegerType => "-1"
case DateType => "-1"
case TimestampType => "-1L"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use -1 as an uninitialized value, easy for debug.

case _ => "null"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {

val specificAccessorFunctions = ctx.nativeTypes.map { dataType =>
val cases = expressions.zipWithIndex.map {
case (e, i) if e.dataType == dataType =>
case (e, i) if e.dataType == dataType
|| dataType == IntegerType && e.dataType == DateType
|| dataType == LongType && e.dataType == TimestampType =>
s"case $i: return c$i;"
case _ => ""
}.mkString("\n ")
Expand All @@ -96,7 +98,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {

val specificMutatorFunctions = ctx.nativeTypes.map { dataType =>
val cases = expressions.zipWithIndex.map {
case (e, i) if e.dataType == dataType =>
case (e, i) if e.dataType == dataType
|| dataType == IntegerType && e.dataType == DateType
|| dataType == LongType && e.dataType == TimestampType =>
s"case $i: { c$i = value; return; }"
case _ => ""
}.mkString("\n")
Expand All @@ -119,7 +123,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
val nonNull = e.dataType match {
case BooleanType => s"$col ? 0 : 1"
case ByteType | ShortType | IntegerType | DateType => s"$col"
case LongType => s"$col ^ ($col >>> 32)"
case LongType | TimestampType => s"$col ^ ($col >>> 32)"
case FloatType => s"Float.floatToIntBits($col)"
case DoubleType =>
s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))"
Expand Down
Loading