From de79e50779c0f2e17ea26301ac7d1216b37331c9 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 10 May 2017 14:55:53 +0900
Subject: [PATCH 1/4] Cache TimeZone instances per thread.

---
 .../expressions/datetimeExpressions.scala     | 17 ++++++++------
 .../spark/sql/catalyst/json/JSONOptions.scala |  2 +-
 .../catalyst/optimizer/finishAnalysis.scala   |  4 +---
 .../sql/catalyst/util/DateTimeUtils.scala     | 23 ++++++++++++++++---
 .../scala/org/apache/spark/sql/Dataset.scala  |  4 ++--
 .../spark/sql/execution/QueryExecution.scala  |  3 +--
 .../datasources/PartitioningUtils.scala       |  2 +-
 .../datasources/csv/CSVOptions.scala          |  2 +-
 .../streaming/ProgressReporter.scala          |  5 ++--
 9 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index a98cd33f2780..47dac8913ca1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
   /** Returns a copy of this expression with the specified timeZoneId. */
   def withTimeZone(timeZoneId: String): TimeZoneAwareExpression
 
-  @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
+  @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
 }
 
 /**
@@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
   override def dataType: DataType = IntegerType
 
   @transient private lazy val c = {
-    val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
     c.setFirstDayOfWeek(Calendar.MONDAY)
     c.setMinimalDaysInFirstWeek(4)
     c
@@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
     nullSafeCodeGen(ctx, ev, time => {
       val cal = classOf[Calendar].getName
       val c = ctx.freshName("cal")
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
       ctx.addMutableState(cal, c,
         s"""
-          $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
+          $c = $cal.getInstance($dtu.getTimeZone("UTC"));
           $c.setFirstDayOfWeek($cal.MONDAY);
           $c.setMinimalDaysInFirstWeek(4);
         """)
@@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
           |${eval.code}
@@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
$dtu.getTimeZone("$tz");""") + ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""") val eval = left.genCode(ctx) ev.copy(code = s""" |${eval.code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 23ba5ed4d50d..793051503835 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -70,7 +70,7 @@ private[sql] class JSONOptions( val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) - val timeZone: TimeZone = TimeZone.getTimeZone( + val timeZone: TimeZone = DateTimeUtils.getTimeZone( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 89e1dc9e322e..af0837e36e8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import java.util.TimeZone - import scala.collection.mutable import org.apache.spark.sql.catalyst.catalog.SessionCatalog @@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { case CurrentDate(Some(timeZoneId)) => currentDates.getOrElseUpdate(timeZoneId, { Literal.create( - DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)), + DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)), DateType) }) case CurrentTimestamp() => currentTime diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 6c1592fd8881..3224e8809b81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -23,6 +23,7 @@ import java.util.{Calendar, Locale, TimeZone} import javax.xml.bind.DatatypeConverter import scala.annotation.tailrec +import scala.collection.mutable import org.apache.spark.unsafe.types.UTF8String @@ -98,6 +99,21 @@ object DateTimeUtils { sdf } + private val threadLocalTimeZones = new ThreadLocal[mutable.Map[String, TimeZone]] { + override def initialValue(): mutable.Map[String, TimeZone] = mutable.Map.empty + } + + def getTimeZone(timeZoneId: String): TimeZone = { + val timeZones = threadLocalTimeZones.get() + if (timeZones.contains(timeZoneId)) { + timeZones(timeZoneId) + } else { + val timeZone = TimeZone.getTimeZone(timeZoneId) + timeZones(timeZoneId) = timeZone + timeZone + } + } + def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = { val sdf = new SimpleDateFormat(formatString, Locale.US) sdf.setTimeZone(timeZone) @@ -407,7 +423,7 @@ object DateTimeUtils { Calendar.getInstance(timeZone) } else { Calendar.getInstance( - TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d")) + getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d")) } 
     c.set(Calendar.MILLISECOND, 0)
@@ -1027,7 +1043,7 @@ object DateTimeUtils {
    * representation in their timezone.
    */
   def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
+    convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
   }
 
   /**
@@ -1035,7 +1051,7 @@ object DateTimeUtils {
    * string representation in their timezone.
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
+    convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
   }
 
   /**
@@ -1045,5 +1061,6 @@ object DateTimeUtils {
     threadLocalGmtCalendar.remove()
     threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
+    threadLocalTimeZones.remove()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61154e23b1b8..3eedce673a54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import java.io.CharArrayWriter
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -249,7 +248,8 @@ class Dataset[T] private[sql](
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
 
-    lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+    lazy val timeZone =
+      DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     // For array values, replace Seq and Array with square brackets
     // For cells that are beyond `truncate` characters, replace it with the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e8210e334a1..2e05e5d65923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
         DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+          DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (other, tpe) if primitiveTypes.contains(tpe) => other.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 2d70172487e1..f61c673baaa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -94,7 +94,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 62e4c6e4b4ea..78c16b75ee68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -117,7 +117,7 @@ class CSVOptions(
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 693933f95a23..a4e4ca821374 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.text.SimpleDateFormat
-import java.util.{Date, TimeZone, UUID}
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
   private var lastNoDataProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {

From 97d5bba544d26444dd945e848ec5da0d37a9a381 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 10 May 2017 18:59:51 +0900
Subject: [PATCH 2/4] Use `getOrElseUpdate()`.
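
`mutable.Map.getOrElseUpdate` looks the key up once and evaluates its
second argument only on a miss, so it collapses the explicit
contains-then-insert branch in `getTimeZone` into a single call with the
same caching behaviour. A minimal standalone sketch of the before/after
(the names below are illustrative, not part of this patch):

    import java.util.TimeZone
    import scala.collection.mutable

    val cache = mutable.Map.empty[String, TimeZone]

    // Before: branch on contains(), insert on miss.
    def getCachedVerbose(id: String): TimeZone = {
      if (cache.contains(id)) {
        cache(id)
      } else {
        val tz = TimeZone.getTimeZone(id)
        cache(id) = tz
        tz
      }
    }

    // After: the by-name second argument is evaluated, stored, and
    // returned only when the id is not cached yet.
    def getCached(id: String): TimeZone =
      cache.getOrElseUpdate(id, TimeZone.getTimeZone(id))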
---
 .../apache/spark/sql/catalyst/util/DateTimeUtils.scala | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 3224e8809b81..1615e6b8d180 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -104,14 +104,7 @@ object DateTimeUtils {
   }
 
   def getTimeZone(timeZoneId: String): TimeZone = {
-    val timeZones = threadLocalTimeZones.get()
-    if (timeZones.contains(timeZoneId)) {
-      timeZones(timeZoneId)
-    } else {
-      val timeZone = TimeZone.getTimeZone(timeZoneId)
-      timeZones(timeZoneId) = timeZone
-      timeZone
-    }
+    threadLocalTimeZones.get().getOrElseUpdate(timeZoneId, TimeZone.getTimeZone(timeZoneId))
   }
 
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {

From 7935a1a8d8336924e361559d7a708d73b8568e68 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 15 May 2017 09:57:29 +0900
Subject: [PATCH 3/4] Use ConcurrentHashMap instead of thread-local Map.

---
 .../apache/spark/sql/catalyst/util/DateTimeUtils.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 1615e6b8d180..1d791efe6e2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 import java.util.{Calendar, Locale, TimeZone}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
@@ -99,12 +101,13 @@ object DateTimeUtils {
     sdf
   }
 
-  private val threadLocalTimeZones = new ThreadLocal[mutable.Map[String, TimeZone]] {
-    override def initialValue(): mutable.Map[String, TimeZone] = mutable.Map.empty
+  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
+  private val computeTimeZone = new JFunction[String, TimeZone] {
+    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
   }
 
   def getTimeZone(timeZoneId: String): TimeZone = {
-    threadLocalTimeZones.get().getOrElseUpdate(timeZoneId, TimeZone.getTimeZone(timeZoneId))
+    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
@@ -1054,6 +1057,5 @@ object DateTimeUtils {
     threadLocalGmtCalendar.remove()
     threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
-    threadLocalTimeZones.remove()
   }
 }

From 3cdbb3acf12b2082056e8b4e2eb3f1645fa1bde7 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 15 May 2017 11:03:30 +0900
Subject: [PATCH 4/4] Remove unnecessary import.
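
The `scala.collection.mutable` import was only needed by the thread-local
`mutable.Map[String, TimeZone]` cache, which the previous commit replaced
with a `java.util.concurrent.ConcurrentHashMap`, so it is now unused.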
---
 .../scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 1d791efe6e2b..f5458280e13e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -25,7 +25,6 @@ import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
-import scala.collection.mutable
 
 import org.apache.spark.unsafe.types.UTF8String
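
Note (reviewer sketch, not part of the patches): after this series,
TimeZone instances are cached in a single JVM-wide map shared by all
threads rather than one map per thread, so each distinct timezone id is
resolved at most once per JVM and the `getTimeZone` fast path is a plain
`ConcurrentHashMap` read. A minimal standalone sketch of the same pattern
(illustrative names, not Spark code):

    import java.util.TimeZone
    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.{Function => JFunction}

    object TimeZoneCache {
      private val cache = new ConcurrentHashMap[String, TimeZone]
      // An explicit java.util.function.Function instance: Scala 2.11 (the
      // version targeted here) does not convert Scala lambdas to Java SAM
      // types by default, which is presumably why the patch also spells
      // the function out instead of passing a lambda.
      private val load = new JFunction[String, TimeZone] {
        override def apply(id: String): TimeZone = TimeZone.getTimeZone(id)
      }
      def get(id: String): TimeZone = cache.computeIfAbsent(id, load)
    }

    // Usage: concurrent callers observe the same cached instance.
    val tz1 = TimeZoneCache.get("America/Los_Angeles")
    val tz2 = TimeZoneCache.get("America/Los_Angeles")
    assert(tz1 eq tz2)

One caveat of `computeIfAbsent` versus the intermediate `getOrElseUpdate`
version: the mapping function runs while the map bin is locked, so it
should stay short and simple, which `TimeZone.getTimeZone` is.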