Skip to content

Commit db821ed

Browse files
adrian-wangmarmbrus
authored andcommitted
[SPARK-4508] [SQL] build native date type to conform behavior to Hive
The previous #3732 is reverted due to some test failure. Have fixed that. Author: Daoyuan Wang <[email protected]> Closes #4325 from adrian-wang/datenative and squashes the following commits: 096e20d [Daoyuan Wang] fix for mixed timezone 0ed0fdc [Daoyuan Wang] fix test data a2fdd4e [Daoyuan Wang] getDate c37832b [Daoyuan Wang] row to catalyst f0005b1 [Daoyuan Wang] add date in sql parser and java type conversion 024c9a6 [Daoyuan Wang] clean some import order d6715fc [Daoyuan Wang] refactoring Date as Primitive Int internally 374abd5 [Daoyuan Wang] spark native date type support
1 parent 5adbb39 commit db821ed

File tree

49 files changed

+204
-125
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+204
-125
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql
2020
import scala.util.hashing.MurmurHash3
2121

2222
import org.apache.spark.sql.catalyst.expressions.GenericRow
23-
23+
import org.apache.spark.sql.types.DateUtils
2424

2525
object Row {
2626
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20-
import java.sql.{Date, Timestamp}
20+
import java.sql.Timestamp
2121

2222
import org.apache.spark.util.Utils
2323
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row}
2424
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
2525
import org.apache.spark.sql.types._
2626

27-
2827
/**
2928
* A default version of ScalaReflection that uses the runtime universe.
3029
*/
@@ -72,6 +71,7 @@ trait ScalaReflection {
7271
}.toArray)
7372
case (d: BigDecimal, _) => Decimal(d)
7473
case (d: java.math.BigDecimal, _) => Decimal(d)
74+
case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
7575
case (other, _) => other
7676
}
7777

@@ -85,6 +85,7 @@ trait ScalaReflection {
8585
}
8686
case (r: Row, s: StructType) => convertRowToScala(r, s)
8787
case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
88+
case (i: Int, DateType) => DateUtils.toJavaDate(i)
8889
case (other, _) => other
8990
}
9091

@@ -159,7 +160,7 @@ trait ScalaReflection {
159160
valueDataType, valueContainsNull = valueNullable), nullable = true)
160161
case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
161162
case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
162-
case t if t <:< typeOf[Date] => Schema(DateType, nullable = true)
163+
case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
163164
case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
164165
case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
165166
case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true)
@@ -191,7 +192,7 @@ trait ScalaReflection {
191192
case obj: LongType.JvmType => LongType
192193
case obj: FloatType.JvmType => FloatType
193194
case obj: DoubleType.JvmType => DoubleType
194-
case obj: DateType.JvmType => DateType
195+
case obj: java.sql.Date => DateType
195196
case obj: java.math.BigDecimal => DecimalType.Unlimited
196197
case obj: Decimal => DecimalType.Unlimited
197198
case obj: TimestampType.JvmType => TimestampType

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class SqlParser extends AbstractSparkSQLParser {
5252
protected val CAST = Keyword("CAST")
5353
protected val COALESCE = Keyword("COALESCE")
5454
protected val COUNT = Keyword("COUNT")
55+
protected val DATE = Keyword("DATE")
5556
protected val DECIMAL = Keyword("DECIMAL")
5657
protected val DESC = Keyword("DESC")
5758
protected val DISTINCT = Keyword("DISTINCT")
@@ -383,6 +384,7 @@ class SqlParser extends AbstractSparkSQLParser {
383384
| DOUBLE ^^^ DoubleType
384385
| fixedDecimalType
385386
| DECIMAL ^^^ DecimalType.Unlimited
387+
| DATE ^^^ DateType
386388
)
387389

388390
protected lazy val fixedDecimalType: Parser[DataType] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
113113
// UDFToString
114114
private[this] def castToString(from: DataType): Any => Any = from match {
115115
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
116-
case DateType => buildCast[Date](_, dateToString)
116+
case DateType => buildCast[Int](_, d => DateUtils.toString(d))
117117
case TimestampType => buildCast[Timestamp](_, timestampToString)
118118
case _ => buildCast[Any](_, _.toString)
119119
}
@@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
131131
buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
132132
case DateType =>
133133
// Hive would return null when cast from date to boolean
134-
buildCast[Date](_, d => null)
134+
buildCast[Int](_, d => null)
135135
case LongType =>
136136
buildCast[Long](_, _ != 0)
137137
case IntegerType =>
@@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
171171
case ByteType =>
172172
buildCast[Byte](_, b => new Timestamp(b))
173173
case DateType =>
174-
buildCast[Date](_, d => new Timestamp(d.getTime))
174+
buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
175175
// TimestampWritable.decimalToTimestamp
176176
case DecimalType() =>
177177
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -224,37 +224,24 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
224224
}
225225
}
226226

227-
// Converts Timestamp to string according to Hive TimestampWritable convention
228-
private[this] def timestampToDateString(ts: Timestamp): String = {
229-
Cast.threadLocalDateFormat.get.format(ts)
230-
}
231-
232227
// DateConverter
233228
private[this] def castToDate(from: DataType): Any => Any = from match {
234229
case StringType =>
235230
buildCast[String](_, s =>
236-
try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null })
231+
try DateUtils.fromJavaDate(Date.valueOf(s))
232+
catch { case _: java.lang.IllegalArgumentException => null }
233+
)
237234
case TimestampType =>
238235
// throw valid precision more than seconds, according to Hive.
239236
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
240-
buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000))
237+
buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
241238
// Hive throws this exception as a Semantic Exception
242-
// It is never possible to compare result when hive return with exception, so we can return null
239+
// It is never possible to compare result when hive return with exception,
240+
// so we can return null
243241
// NULL is more reasonable here, since the query itself obeys the grammar.
244242
case _ => _ => null
245243
}
246244

247-
// Date cannot be cast to long, according to hive
248-
private[this] def dateToLong(d: Date) = null
249-
250-
// Date cannot be cast to double, according to hive
251-
private[this] def dateToDouble(d: Date) = null
252-
253-
// Converts Date to string according to Hive DateWritable convention
254-
private[this] def dateToString(d: Date): String = {
255-
Cast.threadLocalDateFormat.get.format(d)
256-
}
257-
258245
// LongConverter
259246
private[this] def castToLong(from: DataType): Any => Any = from match {
260247
case StringType =>
@@ -264,7 +251,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
264251
case BooleanType =>
265252
buildCast[Boolean](_, b => if (b) 1L else 0L)
266253
case DateType =>
267-
buildCast[Date](_, d => dateToLong(d))
254+
buildCast[Int](_, d => null)
268255
case TimestampType =>
269256
buildCast[Timestamp](_, t => timestampToLong(t))
270257
case x: NumericType =>
@@ -280,7 +267,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
280267
case BooleanType =>
281268
buildCast[Boolean](_, b => if (b) 1 else 0)
282269
case DateType =>
283-
buildCast[Date](_, d => dateToLong(d))
270+
buildCast[Int](_, d => null)
284271
case TimestampType =>
285272
buildCast[Timestamp](_, t => timestampToLong(t).toInt)
286273
case x: NumericType =>
@@ -296,7 +283,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
296283
case BooleanType =>
297284
buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort)
298285
case DateType =>
299-
buildCast[Date](_, d => dateToLong(d))
286+
buildCast[Int](_, d => null)
300287
case TimestampType =>
301288
buildCast[Timestamp](_, t => timestampToLong(t).toShort)
302289
case x: NumericType =>
@@ -312,7 +299,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
312299
case BooleanType =>
313300
buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte)
314301
case DateType =>
315-
buildCast[Date](_, d => dateToLong(d))
302+
buildCast[Int](_, d => null)
316303
case TimestampType =>
317304
buildCast[Timestamp](_, t => timestampToLong(t).toByte)
318305
case x: NumericType =>
@@ -342,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
342329
case BooleanType =>
343330
buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target))
344331
case DateType =>
345-
buildCast[Date](_, d => null) // date can't cast to decimal in Hive
332+
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
346333
case TimestampType =>
347334
// Note that we lose precision here.
348335
buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
@@ -367,7 +354,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
367354
case BooleanType =>
368355
buildCast[Boolean](_, b => if (b) 1d else 0d)
369356
case DateType =>
370-
buildCast[Date](_, d => dateToDouble(d))
357+
buildCast[Int](_, d => null)
371358
case TimestampType =>
372359
buildCast[Timestamp](_, t => timestampToDouble(t))
373360
case x: NumericType =>
@@ -383,7 +370,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
383370
case BooleanType =>
384371
buildCast[Boolean](_, b => if (b) 1f else 0f)
385372
case DateType =>
386-
buildCast[Date](_, d => dateToDouble(d))
373+
buildCast[Int](_, d => null)
387374
case TimestampType =>
388375
buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
389376
case x: NumericType =>
@@ -442,16 +429,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
442429

443430
object Cast {
444431
// `SimpleDateFormat` is not thread-safe.
445-
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
432+
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
446433
override def initialValue() = {
447-
new SimpleDateFormat("yyyy-MM-dd")
434+
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
448435
}
449436
}
450437

451438
// `SimpleDateFormat` is not thread-safe.
452-
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
439+
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
453440
override def initialValue() = {
454-
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
441+
new SimpleDateFormat("yyyy-MM-dd")
455442
}
456443
}
457444
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
246246
new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]])
247247
""".children
248248

249+
case Cast(child @ DateType(), StringType) =>
250+
child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
251+
249252
case Cast(child @ NumericType(), IntegerType) =>
250253
child.castOrNull(c => q"$c.toInt", IntegerType)
251254

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object Literal {
3535
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
3636
case d: Decimal => Literal(d, DecimalType.Unlimited)
3737
case t: Timestamp => Literal(t, TimestampType)
38-
case d: Date => Literal(d, DateType)
38+
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
3939
case a: Array[Byte] => Literal(a, BinaryType)
4040
case null => Literal(null, NullType)
4141
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.types
19+
20+
import java.sql.Date
21+
import java.util.{Calendar, TimeZone}
22+
23+
import org.apache.spark.sql.catalyst.expressions.Cast
24+
25+
/**
26+
* helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
27+
*/
28+
object DateUtils {
29+
private val MILLIS_PER_DAY = 86400000
30+
31+
// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
32+
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
33+
override protected def initialValue: TimeZone = {
34+
Calendar.getInstance.getTimeZone
35+
}
36+
}
37+
38+
private def javaDateToDays(d: Date): Int = {
39+
millisToDays(d.getTime)
40+
}
41+
42+
def millisToDays(millisLocal: Long): Int = {
43+
((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
44+
}
45+
46+
private def toMillisSinceEpoch(days: Int): Long = {
47+
val millisUtc = days.toLong * MILLIS_PER_DAY
48+
millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
49+
}
50+
51+
def fromJavaDate(date: java.sql.Date): Int = {
52+
javaDateToDays(date)
53+
}
54+
55+
def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
56+
new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
57+
}
58+
59+
def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
60+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.types
1919

20-
import java.sql.{Date, Timestamp}
20+
import java.sql.Timestamp
2121

2222
import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
2323
import scala.reflect.ClassTag
@@ -387,18 +387,16 @@ case object TimestampType extends NativeType {
387387
*/
388388
@DeveloperApi
389389
case object DateType extends NativeType {
390-
private[sql] type JvmType = Date
390+
private[sql] type JvmType = Int
391391

392392
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
393393

394-
private[sql] val ordering = new Ordering[JvmType] {
395-
def compare(x: Date, y: Date) = x.compareTo(y)
396-
}
394+
private[sql] val ordering = implicitly[Ordering[JvmType]]
397395

398396
/**
399-
* The default size of a value of the DateType is 8 bytes.
397+
* The default size of a value of the DateType is 4 bytes.
400398
*/
401-
override def defaultSize: Int = 8
399+
override def defaultSize: Int = 4
402400
}
403401

404402

0 commit comments

Comments
 (0)