
Conversation

@ueshin
Member

@ueshin ueshin commented Dec 16, 2016

What changes were proposed in this pull request?

As of Spark 2.1, Spark SQL assumes the machine timezone for datetime manipulation, which is bad if users are not in the same timezone as the machines, or if different users have different timezones.

We should introduce a session local timezone setting that is used for execution.

An explicit non-goal is locale handling.

Semantics

Setting the session local timezone means that the timezone-aware expressions listed below use that timezone to evaluate values, and that it is also used to convert (cast) between string and timestamp, and between timestamp and date.

  • CurrentDate
  • CurrentBatchTimestamp
  • Hour
  • Minute
  • Second
  • DateFormatClass
  • ToUnixTimestamp
  • UnixTimestamp
  • FromUnixTime

and the expressions below are implicitly timezone-aware through the cast from timestamp to date:

  • DayOfYear
  • Year
  • Quarter
  • Month
  • DayOfMonth
  • WeekOfYear
  • LastDay
  • NextDay
  • TruncDate

For example, if you have timestamp "2016-01-01 00:00:00" in GMT, the values produced by some of the timezone-aware expressions are:

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2016-01-01 00:00:00|2016                  |1                      |1                           |0       |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+

whereas after setting the session local timezone to "PST", they are:

scala> spark.conf.set("spark.sql.session.timeZone", "PST")

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2015-12-31 16:00:00|2015                  |12                     |31                          |16      |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+

Note that even if you set the session local timezone, it affects only DataFrame operations; it does not apply to Dataset operations, RDD operations, or ScalaUDFs. In those cases you need to handle timezones yourself.
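For instance, inside a ScalaUDF the session setting does not apply, so the timezone has to be handled explicitly. A minimal sketch of one way to do that (the UDF name and the assumption of a SparkSession named `spark` are illustrative only):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// A formatter pinned to an explicit timezone, independent of the session setting.
val pstFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
pstFormat.setTimeZone(TimeZone.getTimeZone("PST"))

// A plain Scala function used as a UDF only sees the JVM default timezone,
// so any timezone-sensitive formatting must be done explicitly like this.
val toPstString = (ts: java.sql.Timestamp) => pstFormat.format(ts)

// Hypothetical registration, assuming a SparkSession named `spark`:
// spark.udf.register("to_pst_string", toPstString)
```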

Design of the fix

I introduced an analyzer to pass session local timezone to timezone-aware expressions and modified DateTimeUtils to take the timezone argument.
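A condensed, self-contained sketch of the idea (simplified stand-ins, not the actual Catalyst classes or the exact code in this PR):

```scala
import java.util.TimeZone

// Timezone-aware expressions carry a timezone id that is null until resolved.
trait TimeZoneAwareExpression {
  def timeZoneId: String
  def withTimeZone(timeZoneId: String): TimeZoneAwareExpression
  def timeZoneResolved: Boolean = timeZoneId != null
  // lazy so the TimeZone object is not re-created per row in the interpreted path
  lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId)
}

// Example expression: the copy constructor implements withTimeZone.
case class Hour(child: Any, timeZoneId: String = null) extends TimeZoneAwareExpression {
  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = copy(timeZoneId = timeZoneId)
}

// The analyzer rule then, in effect, does this for every unresolved expression:
def resolveTimeZone(e: TimeZoneAwareExpression, sessionLocalTimeZone: String): TimeZoneAwareExpression =
  if (e.timeZoneResolved) e else e.withTimeZone(sessionLocalTimeZone)
```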

How was this patch tested?

Existing tests and added tests for timezone-aware expressions.

@ueshin
Member Author

ueshin commented Dec 16, 2016

I'd like to discuss the boundary of the session local timezone.

I assumed it affects only DataFrame operations, and not Dataset operations, data imported from or exported to RDDs, or ScalaUDFs, because we can't control their timezones; they are basically based on the system timezone, i.e. the TimeZone.getDefault() value.
(This is also for backward compatibility.)

What do you think?

@rxin
Copy link
Contributor

rxin commented Dec 16, 2016

Can you document the semantics of time zones and when they are used?

@SparkQA

SparkQA commented Dec 16, 2016

Test build #70242 has finished for PR 16308 at commit e5bb246.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 16, 2016

Test build #70244 has finished for PR 16308 at commit 32cc391.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 16, 2016

Test build #70254 has finished for PR 16308 at commit f434378.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override lazy val resolved: Boolean =
childrenResolved && checkInputDataTypes().isSuccess && timeZoneResolved

override def withTimeZone(zoneId: String): TimeZoneAwareExpression = copy(zoneId = zoneId)
Contributor

this is just a copy ctor isn't it? Maybe no need to add this? Not a big deal though.

copy(zoneId = zoneId)

Member Author

Yes, this is a copy ctor, but the analyzer ResolveTimeZone can't call the copy ctor because it doesn't know the actual expression class.

* Common base class for time zone aware expressions.
*/
trait TimeZoneAwareExpression extends Expression {

Contributor

Is the reason you are using null rather than Option to avoid a bunch of gets?

Member Author

Yes, I wanted to avoid a bunch of gets.

case _ => false
}

def needTimeZone(from: DataType, to: DataType): Boolean = (from, to) match {
Contributor

I think it's important to document this...

Member Author

I see, I'll add documentation.

10
""")
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with NullIntolerant {
case class Cast(child: Expression, dataType: DataType, zoneId: String = null)
Contributor

not 100% sure whether this is a good idea, but should we consider adding a Cast.unapply that does not match on zoneId?

Contributor

Also we should add classdoc to explain what zoneId is. I'd probably call it timeZoneId.

Contributor

An extra unapply is probably a bad idea, since then we can miss a pattern match.

Member Author

I agree that an extra unapply is a bad idea. I'll leave it as it is for now.

@rxin
Contributor

rxin commented Dec 17, 2016

I'd rename all the zoneIds to timeZoneId to reduce confusion...

case e if !e.resolved => u
case g: Generator => MultiAlias(g, Nil)
case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)()
case c @ Cast(ne: NamedExpression, _, _) => Alias(c, ne.name)()
Contributor

if we add a Cast.unapply that returns only the first two arguments, we can reduce a lot of the cast match changes. Not sure if it is worth it though.

InternalRow.fromSeq(partitionSchema.map { field =>
Cast(Literal(spec(field.name)), field.dataType).eval()
Cast(Literal(spec(field.name)), field.dataType,
DateTimeUtils.defaultTimeZone().getID).eval()
Contributor

Could this change the behavior of how we interpret partition values when the timezone setting changes?

Member Author

Currently the behavior doesn't change with the timezone setting, i.e. it uses the system timezone.

This is a part I wasn't sure about: should we handle partition values using the timezone setting or the system timezone?
Should we use the timezone setting?

Member Author

Hmm, now I think we should use the timezone setting for partition values, because the values are also part of the data, so they should be affected by the setting.
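To make the trade-off concrete, here is a small standalone illustration (plain Java time APIs, not the actual partitioning code path) of how the same partition value string maps to different instants under different timezones:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

val partitionValue = "2016-01-01 00:00:00"
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

format.setTimeZone(TimeZone.getTimeZone("GMT"))
val gmtMillis = format.parse(partitionValue).getTime  // 1451606400000

format.setTimeZone(TimeZone.getTimeZone("PST"))
val pstMillis = format.parse(partitionValue).getTime  // 1451635200000, 8 hours later
```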

case CurrentDate(tz) =>
currentDates.getOrElseUpdate(tz, {
val dateExpr = CurrentDate(tz)
Literal.create(dateExpr.eval(EmptyRow), dateExpr.dataType)
Contributor

This can technically return different absolute time values for dates, can't it?

Member Author

Good catch, I'll modify this.

def timeZoneResolved: Boolean = zoneId != null

def withTimeZone(zoneId: String): TimeZoneAwareExpression

Contributor

@rxin rxin Dec 17, 2016

Should this be a lazy val? Otherwise it is pretty expensive to keep creating a new timezone object (or doing a lookup) per row in the interpreted path.

Member Author

You are right. I'll use lazy val.

Member Author

@ueshin ueshin left a comment

@rxin Thank you for your review.
I'll address your comments soon.


@SparkQA

SparkQA commented Jan 23, 2017

Test build #71837 has finished for PR 16308 at commit 328399a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// "2016-01-01 08:00:00"
checkAnswer(
df.select("t").filter($"t" <= "2016-01-01 00:00:00"),
Row(Timestamp.valueOf("2015-12-31 16:00:00")))
Contributor

this shows that it will be very confusing if the session local timezone is different from JVM default timezone in driver...

Member Author

Ah, I see, let me modify it.

// | | df | timestamp | date_format |
// +---+---------------------+-------------+---------------------+
// | a | 16533|1428476400000|"2015-04-08 00:00:00"|
// | b |"2015-04-08 13:10:15"|1428523815000|"2015-04-08 13:10:15"|
Contributor

I'm a little confused, the d is already a Date, how can we have the time info back after converting the date to string?

Member Author

Do you mean you are wondering why sdf.format(d) has the time info 13:10:15?

If so, java.sql.Date DOES carry time info: if it was initialized with the constructor Date(long date) it keeps that instant, and even if it was initialized with the constructor Date(int year, int month, int day) or with Date.valueOf(String s), it carries the time info 00:00:00 of that day in the timezone TimeZone.getDefault().

scala> import java.sql.Date, java.text.SimpleDateFormat, java.util.TimeZone

scala> TimeZone.setDefault(TimeZone.getTimeZone("GMT"))

scala> val gmtDate = Date.valueOf("2017-01-24")
gmtDate: java.sql.Date = 2017-01-24

scala> val gmtTime = gmtDate.getTime
gmtTime: Long = 1485216000000

scala> TimeZone.setDefault(TimeZone.getTimeZone("PST"))

scala> val pstDate = Date.valueOf("2017-01-24")
pstDate: java.sql.Date = 2017-01-24

scala> val pstTime = pstDate.getTime
pstTime: Long = 1485244800000

scala> val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
sdf: java.text.SimpleDateFormat = java.text.SimpleDateFormat@4f76f1a0

scala> sdf.setTimeZone(TimeZone.getTimeZone("GMT"))

scala> sdf.format(gmtTime)
res12: String = 2017-01-24 00:00:00

scala> sdf.format(pstTime)
res13: String = 2017-01-24 08:00:00

scala> val d = new Date(sdf.parse("2015-04-08 13:10:15").getTime)
d: java.sql.Date = 2015-04-08

scala> sdf.format(d)
res14: String = 2015-04-08 13:10:15

Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
}

test("to_unix_timestamp with session local timezone") {
Contributor

These newly added tests are so similar that they all try to prove one thing: when you convert a string or date to a timestamp, the result changes according to the session local timezone, and when you convert a timestamp to a string or date, the result also changes with the session local timezone. All time-related expressions should respect this.

Shall we just write a general test for this? Then we don't need so many similar tests.

Member Author

I agree that there are many similar tests, but I don't know how to generalize them.
Would you please give me some code snippets? I'll be able to expand them.

Contributor

I don't think we need to add tests in this file at all. We should improve DateTimeUtilsSuite to make sure the newly added methods work well with different timezones, e.g. getHours, daysToMillis, etc. Then make sure these timezone-aware expressions call the newly added methods in DateTimeUtils that take a timezone parameter (we can remove the old versions that don't take a timezone parameter after we finish handling partition values).

This suite is an end-to-end test, and it's very annoying if we want to test all the changed expressions; we should write more low-level tests in DateTimeUtilsSuite.

Contributor

The problem is that, except for this suite, all the changes you made to the tests just fix existing tests to fit the timezone stuff. You added all the new tests in this suite as end-to-end tests, which is not good. We should add the new tests in DateTimeUtilsSuite as unit tests.

Member Author

Ah, I see! I'll move tests to DateTimeUtilsSuite soon. Thanks a lot!
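As a rough illustration, a timezone-parameterized unit check along the suggested lines could look like the sketch below (standalone code; the actual DateTimeUtils method names and signatures may differ):

```scala
import java.util.{Calendar, TimeZone}

// Standalone stand-in for the kind of method under test.
def getHours(millis: Long, tz: TimeZone): Int = {
  val c = Calendar.getInstance(tz)
  c.setTimeInMillis(millis)
  c.get(Calendar.HOUR_OF_DAY)
}

val millis = 1451606400000L  // 2016-01-01 00:00:00 GMT
for (tz <- Seq("GMT", "PST", "JST").map(TimeZone.getTimeZone)) {
  // Expected: GMT -> 0, PST -> 16 (on 2015-12-31), JST -> 9
  println(s"${tz.getID}: hour = ${getHours(millis, tz)}")
}
```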

@SparkQA

SparkQA commented Jan 23, 2017

Test build #71854 has finished for PR 16308 at commit 7352612.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 25, 2017

Test build #71983 has finished for PR 16308 at commit b99cf79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@hvanhovell hvanhovell left a comment

The patch is in good shape. I left a few small comments, let's try to get this in ASAP.

private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: SimpleDateFormat =
Try(new SimpleDateFormat(constFormat.toString, Locale.US)).getOrElse(null)
Try {
Contributor

Nit: just use try-catch...

Contributor

I see that this pattern is used quite often. Should we put it in a method?

Member Author

I see, I'll replace the Trys with try-catch and add a method to DateTimeUtils that creates a SimpleDateFormat from a format string and a timezone.
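For context, such a helper might look roughly like the following (a sketch under assumptions; the method name and the exact behavior of what was added to DateTimeUtils may differ):

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

// Build a SimpleDateFormat bound to a specific timezone.
def newDateFormat(formatString: String, timeZone: TimeZone): SimpleDateFormat = {
  val sdf = new SimpleDateFormat(formatString, Locale.US)
  sdf.setTimeZone(timeZone)
  sdf.setLenient(false)  // reject malformed input instead of rolling values over
  sdf
}

// Usage:
// newDateFormat("yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("PST")).format(new java.util.Date())
```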

Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)),
DecimalType(prec + 4, scale + 4))
DecimalType(prec + 4, scale + 4),
Option(conf.sessionLocalTimeZone))
Contributor

NIT: why a new line?

Member Author

I'll remove it.

Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)),
DecimalType(prec + 4, scale + 4))
DecimalType(prec + 4, scale + 4),
Option(conf.sessionLocalTimeZone))
Contributor

NIT: why a new line?

Member Author

I'll remove it.

checkEvaluation(Cast(Literal("20150318"), TimestampType), null)
checkEvaluation(Cast(Literal("2015-031-8"), TimestampType), null)
checkEvaluation(Cast(Literal("2015-03-18T12:03:17-0:70"), TimestampType), null)
for (tz <- ALL_TIMEZONES) {
Contributor

Can we try to parameterize this a little bit more? I know you didn't write it, but it is quite hard to get through.

val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)

lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
Contributor

What happens if a user changes the session timezone? What would be the preferred behavior? Currently show() generates the same result every time, but that might be unexpected.

c.set(Calendar.MILLISECOND, 123)
assert(stringToTimestamp(
UTF8String.fromString("2015-03-18T12:03:17.123121+7:30")).get ===
for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
Contributor

Can you also try to parameterize these tests?

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72012 has started for PR 16308 at commit 6fa1d6a.

@ueshin
Member Author

ueshin commented Jan 26, 2017

Jenkins, retest this please.

@hvanhovell
Contributor

LGTM - pending jenkins.

@SparkQA

SparkQA commented Jan 26, 2017

Test build #72019 has finished for PR 16308 at commit 6fa1d6a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

Merging to master. Thanks for the hard work, and the patience with the review process.

Can you open follow-up PRs for any remaining issues?

@asfgit asfgit closed this in 2969fb4 Jan 26, 2017
@ueshin
Member Author

ueshin commented Jan 26, 2017

Sure! I'll send follow-up PRs as soon as possible.
Thanks a lot for your review!

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Takuya UESHIN <[email protected]>

Closes apache#16308 from ueshin/issues/SPARK-18350.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
asfgit pushed a commit that referenced this pull request Feb 15, 2017
## What changes were proposed in this pull request?

This is a follow-up pr of #16308.

This pr enables timezone support in CSV/JSON parsing.

We should introduce a `timeZone` option for CSV/JSON datasources (the default value of the option is the session local timezone).

The datasources should use the `timeZone` option to format/parse timestamp values when writing/reading.
Notice that while reading, if the `timestampFormat` includes timezone info, the option will not be used because we should respect the timezone in the values.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if `timestampFormat` doesn't contain timezone info, we can properly read the values by setting the correct timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This pr also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression` to be able to evaluate values with timezone option.

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <[email protected]>

Closes #16750 from ueshin/issues/SPARK-18937.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 16, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 4, 2017
## What changes were proposed in this pull request?

This is a follow-up pr of apache#16308 and apache#16750.

This pr enables timezone support in partition values.

We should use the `timeZone` option introduced in apache#16750 to parse/format partition values of `TimestampType`.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written by the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS			ts=2016-01-01 00%3A00%3A00
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS			ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values by setting the correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <[email protected]>

Closes apache#17053 from ueshin/issues/SPARK-18939.
@grvishwanath

@ueshin Can you please explain to me whether this functionality is the same as blessing Spark with "Timestamp with Time Zone"? If not, how is it different?

