Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
/** Returns a copy of this expression with the specified timeZoneId. */
def withTimeZone(timeZoneId: String): TimeZoneAwareExpression

@transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
@transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
}

/**
Expand Down Expand Up @@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
override def dataType: DataType = IntegerType

@transient private lazy val c = {
val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
c.setFirstDayOfWeek(Calendar.MONDAY)
c.setMinimalDaysInFirstWeek(4)
c
Expand All @@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
nullSafeCodeGen(ctx, ev, time => {
val cal = classOf[Calendar].getName
val c = ctx.freshName("cal")
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
ctx.addMutableState(cal, c,
s"""
$c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
$c = $cal.getInstance($dtu.getTimeZone("UTC"));
$c.setFirstDayOfWeek($cal.MONDAY);
$c.setMinimalDaysInFirstWeek(4);
""")
Expand Down Expand Up @@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
val tzTerm = ctx.freshName("tz")
val utcTerm = ctx.freshName("utc")
val tzClass = classOf[TimeZone].getName
ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more efficient to save this value in a static (object) member somewhere or will it not matter much? I see it's used several times.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it will matter much for now because this will be processed only once per generating code.
What do you think?

ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
val eval = left.genCode(ctx)
ev.copy(code = s"""
|${eval.code}
Expand Down Expand Up @@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
val tzTerm = ctx.freshName("tz")
val utcTerm = ctx.freshName("utc")
val tzClass = classOf[TimeZone].getName
ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
val eval = left.genCode(ctx)
ev.copy(code = s"""
|${eval.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[sql] class JSONOptions(
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

val timeZone: TimeZone = TimeZone.getTimeZone(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.optimizer

import java.util.TimeZone

import scala.collection.mutable

import org.apache.spark.sql.catalyst.catalog.SessionCatalog
Expand Down Expand Up @@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
case CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
Literal.create(
DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)),
DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
DateType)
})
case CurrentTimestamp() => currentTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

import scala.annotation.tailrec
Expand Down Expand Up @@ -98,6 +100,15 @@ object DateTimeUtils {
sdf
}

private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
private val computeTimeZone = new JFunction[String, TimeZone] {
override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
}

def getTimeZone(timeZoneId: String): TimeZone = {
computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
Copy link
Member

@viirya viirya May 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Java 7 support completely removed now? Seems computeIfAbsent is only supported in Java 8.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Java 7 support was removed as of Spark 2.2.0.

}

def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
val sdf = new SimpleDateFormat(formatString, Locale.US)
sdf.setTimeZone(timeZone)
Expand Down Expand Up @@ -407,7 +418,7 @@ object DateTimeUtils {
Calendar.getInstance(timeZone)
} else {
Calendar.getInstance(
TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
}
c.set(Calendar.MILLISECOND, 0)

Expand Down Expand Up @@ -1027,15 +1038,15 @@ object DateTimeUtils {
* representation in their timezone.
*/
def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
}

/**
* Returns a utc timestamp from a given timestamp from a given timezone, with the same
* string representation in their timezone.
*/
def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql

import java.io.CharArrayWriter
import java.sql.{Date, Timestamp}
import java.util.TimeZone

import scala.collection.JavaConverters._
import scala.language.implicitConversions
Expand Down Expand Up @@ -249,7 +248,8 @@ class Dataset[T] private[sql](
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)

lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
lazy val timeZone =
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)

// For array values, replace Seq and Array with square brackets
// For cells that are beyond `truncate` characters, replace it with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.TimeZone

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
Expand Down Expand Up @@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
case (t: Timestamp, TimestampType) =>
DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (other, tpe) if primitiveTypes.contains(tpe) => other.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object PartitioningUtils {
typeInference: Boolean,
basePaths: Set[Path],
timeZoneId: String): PartitionSpec = {
parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
}

private[datasources] def parsePartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class CSVOptions(
name.map(CompressionCodecs.getCodecClassName)
}

val timeZone: TimeZone = TimeZone.getTimeZone(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package org.apache.spark.sql.execution.streaming

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}
import java.util.{Date, UUID}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
Expand Down Expand Up @@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
private var lastNoDataProgressEventTime = Long.MinValue

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))

@volatile
protected var currentStatus: StreamingQueryStatus = {
Expand Down