From aa052f4d11929192b749752f4b73772664d0460c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Jan 2017 18:29:42 +0900 Subject: [PATCH 1/8] Add timeZone option to JSONOptions. --- .../expressions/jsonExpressions.scala | 53 +++++++++-- .../spark/sql/catalyst/json/JSONOptions.scala | 6 +- .../sql/catalyst/json/JacksonGenerator.scala | 2 +- .../expressions/JsonExpressionsSuite.scala | 89 +++++++++++++++++-- 4 files changed, 132 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index d55f85d56281..70b16d8337e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.ParseModes +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, ParseModes} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -482,19 +482,40 @@ case class JsonTuple(children: Seq[Expression]) /** * Converts an json input string to a [[StructType]] with the specified schema. */ -case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { +case class JsonToStruct( + schema: StructType, + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true + def this(schema: StructType, options: Map[String, String], child: Expression) = + this(schema, options, child, None) + + @transient + lazy val optionsWithTimeZone = { + val caseInsensitiveOptions: CaseInsensitiveMap = + new CaseInsensitiveMap(options + ("mode" -> ParseModes.FAIL_FAST_MODE)) + if (caseInsensitiveOptions.contains("timeZone")) { + caseInsensitiveOptions + } else { + new CaseInsensitiveMap(caseInsensitiveOptions + ("timeZone" -> timeZoneId.get)) + } + } + @transient lazy val parser = new JacksonParser( schema, "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`. - new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE))) + new JSONOptions(optionsWithTimeZone)) override def dataType: DataType = schema + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + override def nullSafeEval(json: Any): Any = { try parser.parse(json.toString).head catch { case _: SparkSQLJsonProcessingException => null @@ -507,10 +528,25 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: /** * Converts a [[StructType]] to a json output string. 
*/ -case class StructToJson(options: Map[String, String], child: Expression) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { +case class StructToJson( + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true + def this(options: Map[String, String], child: Expression) = this(options, child, None) + + @transient + lazy val optionsWithTimeZone = { + val caseInsensitiveOptions: CaseInsensitiveMap = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("timeZone")) { + caseInsensitiveOptions + } else { + new CaseInsensitiveMap(caseInsensitiveOptions + ("timeZone" -> timeZoneId.get)) + } + } + @transient lazy val writer = new CharArrayWriter() @@ -519,7 +555,7 @@ case class StructToJson(options: Map[String, String], child: Expression) new JacksonGenerator( child.dataType.asInstanceOf[StructType], writer, - new JSONOptions(options)) + new JSONOptions(optionsWithTimeZone)) override def dataType: DataType = StringType @@ -538,6 +574,9 @@ case class StructToJson(options: Map[String, String], child: Expression) } } + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + override def nullSafeEval(row: Any): Any = { gen.write(row.asInstanceOf[InternalRow]) gen.flush() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 38e191bbbad6..4fb667684a1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.json -import java.util.Locale +import java.util.{Locale, TimeZone} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.commons.lang3.time.FastDateFormat @@ -58,13 +58,15 @@ private[sql] class JSONOptions( private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") + val timeZone: TimeZone = TimeZone.getTimeZone(parameters("timeZone")) + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. 
val dateFormat: FastDateFormat = FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US) val timestampFormat: FastDateFormat = FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US) + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US) // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index bf8e3c812ee8..dec55279c9fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types._ private[sql] class JacksonGenerator( schema: StructType, writer: Writer, - options: JSONOptions = new JSONOptions(Map.empty[String, String])) { + options: JSONOptions) { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 618b8b29e8ee..ac47074ca3ec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.Calendar + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.ParseModes -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -344,11 +346,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc")))) } + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + test("from_json") { val jsonData = """{"a": 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData)), + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), InternalRow.fromSeq(1 :: Nil) ) } @@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val jsonData = """{"a" 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData)), + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null ) // Other modes should still return `null`. 
checkEvaluation( - JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)), + JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), null ) } @@ -371,16 +375,55 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(null, StringType)), + JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId), null ) } + test("from_json with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + + val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}""" + var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 123) + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), + InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + ) + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), + InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + ) + + val jsonData2 = """{"t": "2016-01-01T00:00:00"}""" + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + c = Calendar.getInstance(tz) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 0) + checkEvaluation( + JsonToStruct( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), + Literal(jsonData2), + Option(tz.getID)), + InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + ) + checkEvaluation( + JsonToStruct( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID), + Literal(jsonData2)), + InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + ) + } + } + test("to_json") { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(create_row(1), schema) checkEvaluation( - StructToJson(Map.empty, struct), + StructToJson(Map.empty, struct, gmtId), """{"a":1}""" ) } @@ -389,8 +432,38 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(null, schema) checkEvaluation( - StructToJson(Map.empty, struct), + StructToJson(Map.empty, struct, gmtId), null ) } + + test("to_json with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 0) + val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) + + checkEvaluation( + StructToJson(Map.empty, struct, gmtId), + """{"t":"2016-01-01T00:00:00.000Z"}""" + ) + checkEvaluation( + StructToJson(Map.empty, struct, Option("PST")), + """{"t":"2015-12-31T16:00:00.000-08:00"}""" + ) + + checkEvaluation( + StructToJson( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get), + struct), + """{"t":"2016-01-01T00:00:00"}""" + ) + checkEvaluation( + StructToJson( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"), + struct), + """{"t":"2015-12-31T16:00:00"}""" + ) + } } From 890879e24b3f63509a000585e18b288961a4e5cf Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 6 Jan 2017 14:11:41 +0900 Subject: [PATCH 2/8] Apply timeZone option to JSON datasources. 
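The rule applied here (and by the CSV patch that follows) is: an explicit `timeZone` option wins, otherwise the session-local time zone is filled in before the options become `JSONOptions`. A minimal plain-Scala sketch of that rule; the helper name `withDefaultTimeZone` is invented for illustration and is not the `getOptionsWithTimeZone` the diff adds:

    // Sketch of the defaulting rule, assuming a case-insensitive key lookup like the
    // CaseInsensitiveMap used in the diff. Not part of the commit.
    def withDefaultTimeZone(
        options: Map[String, String],
        sessionLocalTimeZone: String): Map[String, String] = {
      if (options.keys.exists(_.equalsIgnoreCase("timeZone"))) {
        options
      } else {
        options + ("timeZone" -> sessionLocalTimeZone)
      }
    }

    // withDefaultTimeZone(Map("timestampFormat" -> "yyyy/MM/dd HH:mm"), "GMT")
    //   => Map("timestampFormat" -> "yyyy/MM/dd HH:mm", "timeZone" -> "GMT")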
--- .../apache/spark/sql/DataFrameReader.scala | 15 ++++++- .../apache/spark/sql/DataFrameWriter.scala | 2 + .../scala/org/apache/spark/sql/Dataset.scala | 5 ++- .../datasources/json/JsonFileFormat.scala | 23 ++++++++-- .../sql/streaming/DataStreamReader.scala | 2 + .../datasources/json/JsonSuite.scala | 45 +++++++++++++++++-- .../sql/sources/ResolvedDataSourceSuite.scala | 6 ++- 7 files changed, 86 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index fe34d597dbf1..3ffce988c498 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -27,6 +27,7 @@ import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource @@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
  • * * * @since 2.0.0 @@ -329,7 +332,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap) + val optionsWithTimeZone = { + val options = extraOptions.toMap + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("timeZone")) { + caseInsensitiveOptions + } else { + new CaseInsensitiveMap( + options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) + } + } + val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ff1f0177e8ba..d891ba1daeee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -492,6 +492,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
  • * * * @since 1.4.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 391c34f1285e..062b59054db9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.json.JacksonGenerator +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans._ @@ -2670,10 +2670,11 @@ class Dataset[T] private[sql]( */ def toJSON: Dataset[String] = { val rowSchema = this.schema + val options = Map("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone) val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records - val gen = new JacksonGenerator(rowSchema, writer) + val gen = new JacksonGenerator(rowSchema, writer, new JSONOptions(options)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index be1f94dbad91..ad81852d7cc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} -import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { if (files.isEmpty) { None } else { - val parsedOptions: JSONOptions = new JSONOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val parsedOptions: JSONOptions = new JSONOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) parsedOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf)) - val parsedOptions: JSONOptions = new JSONOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -129,6 +132,18 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { classOf[Text]).map(_._2.toString) // get the text line } + private def getOptionsWithTimeZone( + sparkSession: SparkSession, + options: Map[String, String]): CaseInsensitiveMap = { + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("timeZone")) { + caseInsensitiveOptions + } else { + new CaseInsensitiveMap( + options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) + } + } + /** Constraints to be imposed on schema to be stored. */ private def checkConstraints(schema: StructType): Unit = { if (schema.fieldNames.length != schema.fieldNames.distinct.length) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 6d2cede6f5a2..e5179263f686 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
  • * * * @since 2.0.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 161a409d83ad..b37bf8880f63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - val dummyOption = new JSONOptions(Map.empty[String, String]) + val dummyOption = new JSONOptions(Map("timeZone" -> "GMT")) val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, "", dummyOption) @@ -1366,7 +1366,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String])) + val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map("timeZone" -> "GMT"))) assert(StructType(Seq()) === emptySchema) } @@ -1391,7 +1391,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-8093 Erase empty structs") { val emptySchema = InferSchema.infer( - emptyRecords, "", new JSONOptions(Map.empty[String, String])) + emptyRecords, "", new JSONOptions(Map("timeZone" -> "GMT"))) assert(StructType(Seq()) === emptySchema) } @@ -1723,7 +1723,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("Write timestamps correctly with dateFormat option") { + test("Write timestamps correctly with timestampFormat option") { val customSchema = new StructType(Array(StructField("date", TimestampType, true))) withTempDir { dir => // With dateFormat option. @@ -1751,6 +1751,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("Write timestamps correctly with timestampFormat option and timeZone option") { + val customSchema = new StructType(Array(StructField("date", TimestampType, true))) + withTempDir { dir => + // With dateFormat option and timeZone option. + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .schema(customSchema) + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .json(datesRecords) + timestampsWithFormat.write + .format("json") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .save(timestampsWithFormatPath) + + // This will load back the timestamps as string. 
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil) + val stringTimestampsWithFormat = spark.read + .schema(stringSchema) + .json(timestampsWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/27 01:00"), + Row("2014/10/28 01:30"), + Row("2016/01/29 04:00")) + + checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat) + + val readBack = spark.read + .schema(customSchema) + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .json(timestampsWithFormatPath) + + checkAnswer(readBack, timestampsWithFormat) + } + } + test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val records = sparkContext .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 76ffb949f129..9b5e364e512a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -19,11 +19,15 @@ package org.apache.spark.sql.sources import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSource class ResolvedDataSourceSuite extends SparkFunSuite { private def getProvidingClass(name: String): Class[_] = - DataSource(sparkSession = null, className = name).providingClass + DataSource( + sparkSession = null, + className = name, + options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass test("jdbc") { assert( From f08b78c16ac444550e7ea0857d0275b9a91b7561 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 6 Jan 2017 15:03:34 +0900 Subject: [PATCH 3/8] Apply timeZone option to CSV datasources. --- .../apache/spark/sql/DataFrameReader.scala | 2 + .../apache/spark/sql/DataFrameWriter.scala | 2 + .../datasources/csv/CSVFileFormat.scala | 23 +++++- .../datasources/csv/CSVOptions.scala | 15 +--- .../datasources/csv/UnivocityParser.scala | 2 +- .../sql/streaming/DataStreamReader.scala | 2 + .../datasources/csv/CSVInferSchemaSuite.scala | 20 ++--- .../execution/datasources/csv/CSVSuite.scala | 44 ++++++++++- .../csv/UnivocityParserSuite.scala | 73 +++++++++++-------- 9 files changed, 127 insertions(+), 56 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 3ffce988c498..0ec2d337c247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -414,6 +414,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
 * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
 * a record can have.</li>
  • `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d891ba1daeee..78d58154adc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -600,6 +600,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
  • * * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 38970160d5fb..60c5d5e7ab76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.functions.{length, trim} @@ -58,7 +58,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val csvOptions = new CSVOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val csvOptions = new CSVOptions(optionsWithTimeZone) val paths = files.map(_.getPath.toString) val lines: Dataset[String] = readText(sparkSession, csvOptions, paths) val firstLine: String = findFirstLine(csvOptions, lines) @@ -127,7 +128,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { dataSchema: StructType): OutputWriterFactory = { verifySchema(dataSchema) val conf = job.getConfiguration - val csvOptions = new CSVOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val csvOptions = new CSVOptions(optionsWithTimeZone) csvOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -154,7 +156,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val csvOptions = new CSVOptions(options) + val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) + val csvOptions = new CSVOptions(optionsWithTimeZone) val commentPrefix = csvOptions.comment.toString val broadcastedHadoopConf = @@ -231,6 +234,18 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { schema.foreach(field => verifyType(field.dataType)) } + + private def getOptionsWithTimeZone( + sparkSession: SparkSession, + options: Map[String, String]): CaseInsensitiveMap = { + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("timeZone")) { + caseInsensitiveOptions + } else { + new CaseInsensitiveMap( + options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) + } + } } private[csv] class CsvOutputWriter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 140ce23958dc..2c669242ae06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import 
java.nio.charset.StandardCharsets -import java.util.Locale +import java.util.{Locale, TimeZone} import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} import org.apache.commons.lang3.time.FastDateFormat @@ -106,13 +106,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive name.map(CompressionCodecs.getCodecClassName) } + val timeZone: TimeZone = TimeZone.getTimeZone(parameters("timeZone")) + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. val dateFormat: FastDateFormat = FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US) val timestampFormat: FastDateFormat = FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US) + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US) val maxColumns = getInt("maxColumns", 20480) @@ -161,12 +163,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive settings } } - -object CSVOptions { - - def apply(): CSVOptions = new CSVOptions(new CaseInsensitiveMap(Map.empty)) - - def apply(paramName: String, paramValue: String): CSVOptions = { - new CSVOptions(Map(paramName -> paramValue)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 3b42aa60b024..2e409b3f5fbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -76,7 +76,7 @@ private[csv] class UnivocityParser( name: String, dataType: DataType, nullable: Boolean = true, - options: CSVOptions = CSVOptions()): ValueConverter = dataType match { + options: CSVOptions): ValueConverter = dataType match { case _: ByteType => (d: String) => nullSafeDatum(d, name, nullable, options)(_.toByte) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index e5179263f686..9e089225c82b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -232,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
 * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
 * indicates a timestamp format. Custom date formats follow the formats at
 * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
 * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
 * a record can have.</li>
  • `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 8620bb9f65b9..e937790f0f6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite { test("String fields types are inferred correctly from null types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map("timeZone" -> "GMT")) assert(CSVInferSchema.inferField(NullType, "", options) == NullType) assert(CSVInferSchema.inferField(NullType, null, options) == NullType) assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType) @@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("String fields types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map("timeZone" -> "GMT")) assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType) assert(CSVInferSchema.inferField(LongType, "test", options) == StringType) assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType) @@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Timestamp field types are inferred correctly via custom data format") { - var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm")) + var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm", "timeZone" -> "GMT")) assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) - options = new CSVOptions(Map("timestampFormat" -> "yyyy")) + options = new CSVOptions(Map("timestampFormat" -> "yyyy", "timeZone" -> "GMT")) assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType) } test("Timestamp field types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map("timeZone" -> "GMT")) assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) } test("Boolean fields types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map("timeZone" -> "GMT")) assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType) } @@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Null fields are handled properly when a nullValue is specified") { - var options = new CSVOptions(Map("nullValue" -> "null")) + var options = new CSVOptions(Map("nullValue" -> "null", "timeZone" -> "GMT")) assert(CSVInferSchema.inferField(NullType, "null", options) == NullType) assert(CSVInferSchema.inferField(StringType, "null", options) == StringType) assert(CSVInferSchema.inferField(LongType, "null", options) == LongType) - options = new CSVOptions(Map("nullValue" -> "\\N")) + options = new CSVOptions(Map("nullValue" -> 
"\\N", "timeZone" -> "GMT")) assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) @@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm")) + val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm", "timeZone" -> "GMT")) assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) } test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map("timeZone" -> "GMT")) // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9). assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 491ff72337a8..2473d0900b94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -828,7 +828,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("Write timestamps correctly with dateFormat option") { + test("Write timestamps correctly with timestampFormat option") { withTempDir { dir => // With dateFormat option. val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv" @@ -859,6 +859,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("Write timestamps correctly with timestampFormat option and timeZone option") { + withTempDir { dir => + // With dateFormat option and timeZone option. + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv" + val timestampsWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + timestampsWithFormat.write + .format("csv") + .option("header", "true") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .save(timestampsWithFormatPath) + + // This will load back the timestamps as string. 
+ val stringTimestampsWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(timestampsWithFormatPath) + val expectedStringTimestampsWithFormat = Seq( + Row("2015/08/27 01:00"), + Row("2014/10/28 01:30"), + Row("2016/01/29 04:00")) + + checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat) + + val readBack = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .load(timestampsWithFormatPath) + + checkAnswer(readBack, timestampsWithFormat) + } + } + test("load duplicated field names consistently with null or empty strings - case sensitive") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { withTempPath { path => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala index 2ca6308852a7..03e166462acd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String class UnivocityParserSuite extends SparkFunSuite { private val parser = - new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String])) + new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map("timeZone" -> "GMT"))) private def assertNull(v: Any) = assert(v == null) @@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite { stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) => val decimalValue = new BigDecimal(decimalVal.toString) - assert(parser.makeConverter("_1", decimalType).apply(strVal) === + val options = new CSVOptions(Map("timeZone" -> "GMT")) + assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale)) } } @@ -74,20 +75,23 @@ class UnivocityParserSuite extends SparkFunSuite { // Nullable field with nullValue option. types.foreach { t => // Tests that a custom nullValue. + val nullValueOptions = new CSVOptions(Map("nullValue" -> "-", "timeZone" -> "GMT")) val converter = - parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-")) + parser.makeConverter("_1", t, nullable = true, options = nullValueOptions) assertNull(converter.apply("-")) assertNull(converter.apply(null)) // Tests that the default nullValue is empty string. - assertNull(parser.makeConverter("_1", t, nullable = true).apply("")) + val options = new CSVOptions(Map("timeZone" -> "GMT")) + assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply("")) } // Not nullable field with nullValue option. types.foreach { t => // Casts a null to not nullable field should throw an exception. + val options = new CSVOptions(Map("nullValue" -> "-", "timeZone" -> "GMT")) val converter = - parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-")) + parser.makeConverter("_1", t, nullable = false, options = options) var message = intercept[RuntimeException] { converter.apply("-") }.getMessage @@ -101,48 +105,52 @@ class UnivocityParserSuite extends SparkFunSuite { // If nullValue is different with empty string, then, empty string should not be casted into // null. 
Seq(true, false).foreach { b => + val options = new CSVOptions(Map("nullValue" -> "null", "timeZone" -> "GMT")) val converter = - parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null")) + parser.makeConverter("_1", StringType, nullable = b, options = options) assert(converter.apply("") == UTF8String.fromString("")) } } test("Throws exception for empty string with non null type") { + val options = new CSVOptions(Map("timeZone" -> "GMT")) val exception = intercept[RuntimeException]{ - parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("") + parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("") } assert(exception.getMessage.contains("null value found but field _1 is not nullable.")) } test("Types are cast correctly") { - assert(parser.makeConverter("_1", ByteType).apply("10") == 10) - assert(parser.makeConverter("_1", ShortType).apply("10") == 10) - assert(parser.makeConverter("_1", IntegerType).apply("10") == 10) - assert(parser.makeConverter("_1", LongType).apply("10") == 10) - assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", BooleanType).apply("true") == true) - - val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm") + val options = new CSVOptions(Map("timeZone" -> "GMT")) + assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true) + + val timestampsOptions = + new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm", "timeZone" -> "GMT")) val customTimestamp = "31/01/2015 00:00" val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime val castedTimestamp = - parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions) + parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions) .apply(customTimestamp) assert(castedTimestamp == expectedTime * 1000L) val customDate = "31/01/2015" - val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy") + val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy", "timeZone" -> "GMT")) val expectedDate = dateOptions.dateFormat.parse(customDate).getTime val castedDate = - parser.makeConverter("_1", DateType, nullable = true, dateOptions) + parser.makeConverter("_1", DateType, nullable = true, options = dateOptions) .apply(customTimestamp) assert(castedDate == DateTimeUtils.millisToDays(expectedDate)) val timestamp = "2015-01-01 00:00:00" - assert(parser.makeConverter("_1", TimestampType).apply(timestamp) == + assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) == DateTimeUtils.stringToTime(timestamp).getTime * 1000L) - assert(parser.makeConverter("_1", DateType).apply("2015-01-01") == + assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") == DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime)) } @@ -151,16 +159,18 @@ class 
UnivocityParserSuite extends SparkFunSuite { try { Locale.setDefault(new Locale("fr", "FR")) // Would parse as 1.0 in fr-FR - assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0) - assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0) + val options = new CSVOptions(Map("timeZone" -> "GMT")) + assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0) + assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0) } finally { Locale.setDefault(originalLocale) } } test("Float NaN values are parsed correctly") { + val options = new CSVOptions(Map("nanValue" -> "nn", "timeZone" -> "GMT")) val floatVal: Float = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn") + "_1", FloatType, nullable = true, options = options ).apply("nn").asInstanceOf[Float] // Java implements the IEEE-754 floating point standard which guarantees that any comparison @@ -169,36 +179,41 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Double NaN values are parsed correctly") { + val options = new CSVOptions(Map("nanValue" -> "-", "timeZone" -> "GMT")) val doubleVal: Double = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-") + "_1", DoubleType, nullable = true, options = options ).apply("-").asInstanceOf[Double] assert(doubleVal.isNaN) } test("Float infinite values can be parsed") { + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max", "timeZone" -> "GMT")) val floatVal1 = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max") + "_1", FloatType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Float] assert(floatVal1 == Float.NegativeInfinity) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max", "timeZone" -> "GMT")) val floatVal2 = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max") + "_1", FloatType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Float] assert(floatVal2 == Float.PositiveInfinity) } test("Double infinite values can be parsed") { + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max", "timeZone" -> "GMT")) val doubleVal1 = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max") + "_1", DoubleType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Double] assert(doubleVal1 == Double.NegativeInfinity) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max", "timeZone" -> "GMT")) val doubleVal2 = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max") + "_1", DoubleType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Double] assert(doubleVal2 == Double.PositiveInfinity) From 551cff99785927be3ef68c4393dca4dabb3c2ba0 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 6 Jan 2017 17:39:26 +0900 Subject: [PATCH 4/8] Modify python files. 
--- python/pyspark/sql/readwriter.py | 45 ++++++++++++++++++++------------ python/pyspark/sql/streaming.py | 24 ++++++++++------- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index d31f3fb8f604..df1c4768981d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -158,7 +158,8 @@ def load(self, path=None, format=None, schema=None, **options): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, + timeZone=None): """ Loads a JSON file (`JSON Lines text format or newline-delimited JSON `_) or an RDD of Strings storing JSON objects (one object per @@ -204,11 +205,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes @@ -225,7 +228,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat) + timestampFormat=timestampFormat, timeZone=timeZone) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -297,7 +300,7 @@ def text(self, paths): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + negativeInf=None, dateFormat=None, timestampFormat=None, timeZone=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. @@ -341,11 +344,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. 
This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -372,8 +377,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, - maxCharsPerColumn=maxCharsPerColumn, + dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone, + maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): path = [path] @@ -591,7 +596,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options) self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None): + def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None, + timeZone=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -607,17 +613,20 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to format timestamps. + If None is set, it uses the default value, session local timezone. >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts( - compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat) + compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat, + timeZone=timeZone) self._jwrite.json(path) @since(1.4) @@ -664,7 +673,7 @@ def text(self, path, compression=None): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, - timestampFormat=None): + timestampFormat=None, timeZone=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -699,18 +708,20 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No :param dateFormat: sets the string that indicates a date format. 
Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, - dateFormat=dateFormat, timestampFormat=timestampFormat) + dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index a10b185cd4c7..50d9a87fc8a7 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -429,7 +429,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, - timestampFormat=None): + timestampFormat=None, timeZone=None): """ Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON `_) and returns a :class`DataFrame`. @@ -476,11 +476,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. 
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming @@ -494,7 +496,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat) + timestampFormat=timestampFormat, timeZone=timeZone) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: @@ -551,8 +553,8 @@ def text(self, path): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, timeZone=None, + maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -597,11 +599,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the - default value value, ``yyyy-MM-dd``. + default value, ``yyyy-MM-dd``. :param timestampFormat: sets the string that indicates a timestamp format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the - default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -626,8 +630,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, - maxCharsPerColumn=maxCharsPerColumn, + dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone, + maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) From bdad5b72d9525fa5885e4131e7f0781d88b092ee Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 1 Feb 2017 11:55:27 +0900 Subject: [PATCH 5/8] Move new arguments to the end of the argument list. 
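Keeping the new `timeZone` keyword after every existing parameter preserves the
positional order of the signatures, so callers that pass arguments positionally
keep working. A minimal stand-alone sketch of the failure mode this avoids; the
function names and values below are hypothetical, not the real pyspark
signatures:

    # Existing signature and a caller that binds arguments by position.
    def csv_old(path, dateFormat=None, timestampFormat=None, maxColumns=None):
        return (dateFormat, timestampFormat, maxColumns)

    # Hypothetical variant with `timeZone` inserted in the middle.
    def csv_inserted(path, dateFormat=None, timestampFormat=None, timeZone=None,
                     maxColumns=None):
        return (dateFormat, timestampFormat, maxColumns)

    # Variant with `timeZone` appended at the end, as this patch does.
    def csv_appended(path, dateFormat=None, timestampFormat=None, maxColumns=None,
                     timeZone=None):
        return (dateFormat, timestampFormat, maxColumns)

    args = ("data.csv", "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", 20480)
    assert csv_appended(*args) == csv_old(*args)   # unchanged behaviour
    assert csv_inserted(*args) != csv_old(*args)   # 20480 silently lands in timeZone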
--- python/pyspark/sql/readwriter.py | 14 +++++++------- python/pyspark/sql/streaming.py | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index df1c4768981d..167833488980 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -300,8 +300,8 @@ def text(self, paths): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, timeZone=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -349,8 +349,6 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. - :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. - If None is set, it uses the default value, session local timezone. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -362,6 +360,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non uses the default value, ``10``. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. When a schema is set by user, it sets ``null`` for extra fields. 
@@ -377,9 +377,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone, - maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) if isinstance(path, basestring): path = [path] return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 50d9a87fc8a7..d988e596a86d 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -553,8 +553,8 @@ def text(self, path): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, timeZone=None, - maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -604,8 +604,6 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. - :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. - If None is set, it uses the default value, session local timezone. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -613,6 +611,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non ``-1`` meaning unlimited length. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. + :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. + If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. When a schema is set by user, it sets ``null`` for extra fields. 
@@ -630,9 +630,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone, - maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: From d5ab37c7e1bdcd79586b802f3450bfbc7a9a8f36 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 1 Feb 2017 14:42:23 +0900 Subject: [PATCH 6/8] Modify JSONOptions/CSVOptions to take default timezone id argument. --- .../expressions/jsonExpressions.scala | 27 ++------------- .../spark/sql/catalyst/json/JSONOptions.scala | 7 ++-- .../expressions/JsonExpressionsSuite.scala | 9 +++-- .../apache/spark/sql/DataFrameReader.scala | 13 ++----- .../scala/org/apache/spark/sql/Dataset.scala | 5 +-- .../datasources/csv/CSVFileFormat.scala | 21 ++---------- .../datasources/csv/CSVOptions.scala | 8 +++-- .../datasources/csv/UnivocityGenerator.scala | 2 +- .../datasources/json/JsonFileFormat.scala | 26 ++++---------- .../datasources/csv/CSVInferSchemaSuite.scala | 20 +++++------ .../csv/UnivocityParserSuite.scala | 34 +++++++++---------- .../datasources/json/JsonSuite.scala | 7 ++-- 12 files changed, 65 insertions(+), 114 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 70b16d8337e4..aea7106f629a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, ParseModes} +import org.apache.spark.sql.catalyst.util.ParseModes import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -493,23 +493,12 @@ case class JsonToStruct( def this(schema: StructType, options: Map[String, String], child: Expression) = this(schema, options, child, None) - @transient - lazy val optionsWithTimeZone = { - val caseInsensitiveOptions: CaseInsensitiveMap = - new CaseInsensitiveMap(options + ("mode" -> ParseModes.FAIL_FAST_MODE)) - if (caseInsensitiveOptions.contains("timeZone")) { - caseInsensitiveOptions - } else { - new CaseInsensitiveMap(caseInsensitiveOptions + ("timeZone" -> timeZoneId.get)) - } - } - @transient lazy val parser = new JacksonParser( schema, "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`. 
- new JSONOptions(optionsWithTimeZone)) + new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) override def dataType: DataType = schema @@ -537,16 +526,6 @@ case class StructToJson( def this(options: Map[String, String], child: Expression) = this(options, child, None) - @transient - lazy val optionsWithTimeZone = { - val caseInsensitiveOptions: CaseInsensitiveMap = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("timeZone")) { - caseInsensitiveOptions - } else { - new CaseInsensitiveMap(caseInsensitiveOptions + ("timeZone" -> timeZoneId.get)) - } - } - @transient lazy val writer = new CharArrayWriter() @@ -555,7 +534,7 @@ case class StructToJson( new JacksonGenerator( child.dataType.asInstanceOf[StructType], writer, - new JSONOptions(optionsWithTimeZone)) + new JSONOptions(options, timeZoneId.get)) override def dataType: DataType = StringType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 4fb667684a1f..5456dcc271bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: CaseInsensitiveMap) + @transient private val parameters: CaseInsensitiveMap, defaultTimeZoneId: String) extends Logging with Serializable { - def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, String], defaultTimeZoneId: String) = + this(new CaseInsensitiveMap(parameters), defaultTimeZoneId) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) @@ -58,7 +59,7 @@ private[sql] class JSONOptions( private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") - val timeZone: TimeZone = TimeZone.getTimeZone(parameters("timeZone")) + val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. 
val dateFormat: FastDateFormat = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index ac47074ca3ec..0ec5aa6f22f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -413,7 +413,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { JsonToStruct( schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID), - Literal(jsonData2)), + Literal(jsonData2), + gmtId), InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) ) } @@ -456,13 +457,15 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( StructToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get), - struct), + struct, + gmtId), """{"t":"2016-01-01T00:00:00"}""" ) checkEvaluation( StructToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"), - struct), + struct, + gmtId), """{"t":"2015-12-31T16:00:00"}""" ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0ec2d337c247..c7af2c88baff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -332,17 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - val optionsWithTimeZone = { - val options = extraOptions.toMap - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("timeZone")) { - caseInsensitiveOptions - } else { - new CaseInsensitiveMap( - options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) - } - } - val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) + val parsedOptions: JSONOptions = + new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 062b59054db9..8aca04ba6998 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2670,11 +2670,12 @@ class Dataset[T] private[sql]( */ def toJSON: Dataset[String] = { val rowSchema = this.schema - val options = Map("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone) + val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records - val gen = new JacksonGenerator(rowSchema, writer, new JSONOptions(options)) + val gen = new JacksonGenerator(rowSchema, writer, + new JSONOptions(Map.empty[String, String], sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 60c5d5e7ab76..ad18c8731119 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -58,8 +58,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val csvOptions = new CSVOptions(optionsWithTimeZone) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val paths = files.map(_.getPath.toString) val lines: Dataset[String] = readText(sparkSession, csvOptions, paths) val firstLine: String = findFirstLine(csvOptions, lines) @@ -128,8 +127,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { dataSchema: StructType): OutputWriterFactory = { verifySchema(dataSchema) val conf = job.getConfiguration - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val csvOptions = new CSVOptions(optionsWithTimeZone) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) csvOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -156,8 +154,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val csvOptions = new CSVOptions(optionsWithTimeZone) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val commentPrefix = csvOptions.comment.toString val broadcastedHadoopConf = @@ -234,18 +231,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { schema.foreach(field => verifyType(field.dataType)) } - - private def getOptionsWithTimeZone( - sparkSession: SparkSession, - options: Map[String, String]): CaseInsensitiveMap = { - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("timeZone")) { - caseInsensitiveOptions - } else { - new CaseInsensitiveMap( - options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) - } - } } private[csv] class CsvOutputWriter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 2c669242ae06..791ecaf7a699 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} -private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap) +private[csv] class CSVOptions( + @transient private val parameters: CaseInsensitiveMap, defaultTimeZoneId: String) extends Logging with Serializable { - def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, 
String], defaultTimeZoneId: String) = + this(new CaseInsensitiveMap(parameters), defaultTimeZoneId) private def getChar(paramName: String, default: Char): Char = { val paramValue = parameters.get(paramName) @@ -106,7 +108,7 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive name.map(CompressionCodecs.getCodecClassName) } - val timeZone: TimeZone = TimeZone.getTimeZone(parameters("timeZone")) + val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. val dateFormat: FastDateFormat = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala index ee79138c0f19..4082a0df8ba7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ private[csv] class UnivocityGenerator( schema: StructType, writer: Writer, - options: CSVOptions = new CSVOptions(Map.empty[String, String])) { + options: CSVOptions) { private val writerSettings = options.asWriterSettings writerSettings.setHeaders(schema.fieldNames: _*) private val gen = new CsvWriter(writer, writerSettings) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index ad81852d7cc8..78637c1f61c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} +import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -47,8 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { if (files.isEmpty) { None } else { - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -68,8 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) parsedOptions.compressionCodec.foreach { codec => 
CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -99,8 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val optionsWithTimeZone = getOptionsWithTimeZone(sparkSession, options) - val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -132,18 +132,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { classOf[Text]).map(_._2.toString) // get the text line } - private def getOptionsWithTimeZone( - sparkSession: SparkSession, - options: Map[String, String]): CaseInsensitiveMap = { - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("timeZone")) { - caseInsensitiveOptions - } else { - new CaseInsensitiveMap( - options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone)) - } - } - /** Constraints to be imposed on schema to be stored. */ private def checkConstraints(schema: StructType): Unit = { if (schema.fieldNames.length != schema.fieldNames.distinct.length) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index e937790f0f6d..288dbc40547f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite { test("String fields types are inferred correctly from null types") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(NullType, "", options) == NullType) assert(CSVInferSchema.inferField(NullType, null, options) == NullType) assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType) @@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("String fields types are inferred correctly from other types") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType) assert(CSVInferSchema.inferField(LongType, "test", options) == StringType) assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType) @@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Timestamp field types are inferred correctly via custom data format") { - var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm", "timeZone" -> "GMT")) + var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) - options = new CSVOptions(Map("timestampFormat" -> "yyyy", "timeZone" -> "GMT")) + options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType) } test("Timestamp field types are inferred correctly from other types") { 
- val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) } test("Boolean fields types are inferred correctly from other types") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType) } @@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Null fields are handled properly when a nullValue is specified") { - var options = new CSVOptions(Map("nullValue" -> "null", "timeZone" -> "GMT")) + var options = new CSVOptions(Map("nullValue" -> "null"), "GMT") assert(CSVInferSchema.inferField(NullType, "null", options) == NullType) assert(CSVInferSchema.inferField(StringType, "null", options) == StringType) assert(CSVInferSchema.inferField(LongType, "null", options) == LongType) - options = new CSVOptions(Map("nullValue" -> "\\N", "timeZone" -> "GMT")) + options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT") assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) @@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm", "timeZone" -> "GMT")) + val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) } test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9). 
assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala index 03e166462acd..447e71e1675d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String class UnivocityParserSuite extends SparkFunSuite { private val parser = - new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map("timeZone" -> "GMT"))) + new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT")) private def assertNull(v: Any) = assert(v == null) @@ -38,7 +38,7 @@ class UnivocityParserSuite extends SparkFunSuite { stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) => val decimalValue = new BigDecimal(decimalVal.toString) - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale)) } @@ -75,21 +75,21 @@ class UnivocityParserSuite extends SparkFunSuite { // Nullable field with nullValue option. types.foreach { t => // Tests that a custom nullValue. - val nullValueOptions = new CSVOptions(Map("nullValue" -> "-", "timeZone" -> "GMT")) + val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT") val converter = parser.makeConverter("_1", t, nullable = true, options = nullValueOptions) assertNull(converter.apply("-")) assertNull(converter.apply(null)) // Tests that the default nullValue is empty string. - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply("")) } // Not nullable field with nullValue option. types.foreach { t => // Casts a null to not nullable field should throw an exception. - val options = new CSVOptions(Map("nullValue" -> "-", "timeZone" -> "GMT")) + val options = new CSVOptions(Map("nullValue" -> "-"), "GMT") val converter = parser.makeConverter("_1", t, nullable = false, options = options) var message = intercept[RuntimeException] { @@ -105,7 +105,7 @@ class UnivocityParserSuite extends SparkFunSuite { // If nullValue is different with empty string, then, empty string should not be casted into // null. 
Seq(true, false).foreach { b => - val options = new CSVOptions(Map("nullValue" -> "null", "timeZone" -> "GMT")) + val options = new CSVOptions(Map("nullValue" -> "null"), "GMT") val converter = parser.makeConverter("_1", StringType, nullable = b, options = options) assert(converter.apply("") == UTF8String.fromString("")) @@ -113,7 +113,7 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Throws exception for empty string with non null type") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") val exception = intercept[RuntimeException]{ parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("") } @@ -121,7 +121,7 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Types are cast correctly") { - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10) assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10) assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10) @@ -131,7 +131,7 @@ class UnivocityParserSuite extends SparkFunSuite { assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true) val timestampsOptions = - new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm", "timeZone" -> "GMT")) + new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT") val customTimestamp = "31/01/2015 00:00" val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime val castedTimestamp = @@ -140,7 +140,7 @@ class UnivocityParserSuite extends SparkFunSuite { assert(castedTimestamp == expectedTime * 1000L) val customDate = "31/01/2015" - val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy", "timeZone" -> "GMT")) + val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT") val expectedDate = dateOptions.dateFormat.parse(customDate).getTime val castedDate = parser.makeConverter("_1", DateType, nullable = true, options = dateOptions) @@ -159,7 +159,7 @@ class UnivocityParserSuite extends SparkFunSuite { try { Locale.setDefault(new Locale("fr", "FR")) // Would parse as 1.0 in fr-FR - val options = new CSVOptions(Map("timeZone" -> "GMT")) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0) assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0) } finally { @@ -168,7 +168,7 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Float NaN values are parsed correctly") { - val options = new CSVOptions(Map("nanValue" -> "nn", "timeZone" -> "GMT")) + val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT") val floatVal: Float = parser.makeConverter( "_1", FloatType, nullable = true, options = options ).apply("nn").asInstanceOf[Float] @@ -179,7 +179,7 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Double NaN values are parsed correctly") { - val options = new CSVOptions(Map("nanValue" -> "-", "timeZone" -> "GMT")) + val options = new CSVOptions(Map("nanValue" -> "-"), "GMT") val doubleVal: Double = parser.makeConverter( "_1", DoubleType, nullable = true, options = options ).apply("-").asInstanceOf[Double] @@ -188,14 +188,14 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Float infinite values can be parsed") { - val negativeInfOptions = 
new CSVOptions(Map("negativeInf" -> "max", "timeZone" -> "GMT")) + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT") val floatVal1 = parser.makeConverter( "_1", FloatType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Float] assert(floatVal1 == Float.NegativeInfinity) - val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max", "timeZone" -> "GMT")) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT") val floatVal2 = parser.makeConverter( "_1", FloatType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Float] @@ -204,14 +204,14 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Double infinite values can be parsed") { - val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max", "timeZone" -> "GMT")) + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT") val doubleVal1 = parser.makeConverter( "_1", DoubleType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Double] assert(doubleVal1 == Double.NegativeInfinity) - val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max", "timeZone" -> "GMT")) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT") val doubleVal2 = parser.makeConverter( "_1", DoubleType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Double] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b37bf8880f63..9e000abfd28e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - val dummyOption = new JSONOptions(Map("timeZone" -> "GMT")) + val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, "", dummyOption) @@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map("timeZone" -> "GMT"))) + val emptySchema = InferSchema.infer( + empty, "", new JSONOptions(Map.empty[String, String], "GMT")) assert(StructType(Seq()) === emptySchema) } @@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-8093 Erase empty structs") { val emptySchema = InferSchema.infer( - emptyRecords, "", new JSONOptions(Map("timeZone" -> "GMT"))) + emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT")) assert(StructType(Seq()) === emptySchema) } From dec84e05dc6a1dde778248f362891faafdd51c34 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 9 Feb 2017 12:45:19 +0900 Subject: [PATCH 7/8] Use `InternalRow.apply()` instead of `InternalRow.fromSeq()`. 
--- .../expressions/JsonExpressionsSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 0ec5aa6f22f3..98556dead63d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -307,43 +307,43 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("json_tuple - hive key 4 - null json") { checkJsonTuple( JsonTuple(Literal(null) :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - hive key 5 - null and empty fields") { checkJsonTuple( JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null))) + InternalRow(UTF8String.fromString(""), null, null, null, null)) } test("json_tuple - hive key 6 - invalid json (array)") { checkJsonTuple( JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (object start only)") { checkJsonTuple( JsonTuple(Literal("{") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (no object end)") { checkJsonTuple( JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (invalid json)") { checkJsonTuple( JsonTuple(Literal("\\") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - preserve newlines") { checkJsonTuple( JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil), - InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc")))) + InternalRow(UTF8String.fromString("b\nc"))) } val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) @@ -353,7 +353,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), - InternalRow.fromSeq(1 :: Nil) + InternalRow(1) ) } @@ -389,11 +389,11 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { c.set(Calendar.MILLISECOND, 123) checkEvaluation( JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), - InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + InternalRow(c.getTimeInMillis * 1000L) ) checkEvaluation( JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), - InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + InternalRow(c.getTimeInMillis * 1000L) ) val jsonData2 = """{"t": "2016-01-01T00:00:00"}""" @@ -407,7 +407,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), Literal(jsonData2), Option(tz.getID)), - InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + InternalRow(c.getTimeInMillis * 1000L) ) checkEvaluation( JsonToStruct( @@ -415,7 +415,7 @@ class 
JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID), Literal(jsonData2), gmtId), - InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil) + InternalRow(c.getTimeInMillis * 1000L) ) } } From ffc4912e17cc900fc9d7ceefd0f66461109728e9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 9 Feb 2017 13:06:16 +0900 Subject: [PATCH 8/8] Add a comment to explain the test result. --- .../spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 98556dead63d..136627bfa945 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -391,6 +391,9 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), InternalRow(c.getTimeInMillis * 1000L) ) + // The result doesn't change because the json string includes timezone string ("Z" here), + // which means the string represents the timestamp string in the timezone regardless of + // the timeZoneId parameter. checkEvaluation( JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), InternalRow(c.getTimeInMillis * 1000L)