From 2f05ab8e82b0940e84cbe407abe49f72cddeef11 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 15 Feb 2018 17:59:20 +0100
Subject: [PATCH 1/5] [SPARK-23436][SQL] Infer partition as Date only if it
 can be cast to Date

---
 .../datasources/PartitioningUtils.scala       | 38 +++++++++++++------
 .../ParquetPartitionDiscoverySuite.scala      | 12 ++++++
 2 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 472bf82d3604d..37a33af0db37c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -23,9 +23,7 @@ import java.util.{Locale, TimeZone}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
-
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
@@ -34,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 
 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
@@ -407,6 +406,29 @@ object PartitioningUtils {
       Literal(bigDecimal)
     }
 
+    val dateTry = Try {
+      // Try to parse the date; if no exception occurs, this is a candidate to be resolved as
+      // DateType
+      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // We need to check that the raw string can be cast, since Cast is later used to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val dateOption = Option(Cast(Literal(raw), DateType).eval())
+      Literal.create(dateOption.get, DateType)
+    }
+
+    val timestampTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // Try to parse the timestamp; if no exception occurs, this is a candidate to be resolved as
+      // TimestampType
+      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      // SPARK-23436: see the comment above for the date case
+      val timestampOption = Option(Cast(Literal(unescapedRaw), TimestampType,
+        Some(timeZone.getID)).eval())
+      Literal.create(timestampOption.get, TimestampType)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +437,8 @@ object PartitioningUtils {
       // Then falls back to fractional types
       .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
       // Then falls back to date/timestamp types
-      .orElse(Try(
-        Literal.create(
-          DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
-            .parse(unescapePathName(raw)).getTime * 1000L,
-          TimestampType)))
-      .orElse(Try(
-        Literal.create(
-          DateTimeUtils.millisToDays(
-            DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
-          DateType)))
+      .orElse(timestampTry)
+      .orElse(dateTry)
       // Then falls back to string
       .getOrElse {
         if (raw == DEFAULT_PARTITION_NAME) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index d4902641e335f..ad06bdb1a8429 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1120,4 +1120,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Row(3, BigDecimal("2" * 30)) :: Nil)
     }
   }
+
+  test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
+    withTempPath { path =>
+      val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
+        .toDF("id", "date_month", "date_hour", "data")
+
+      data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
+      val input = spark.read.parquet(path.getAbsolutePath)
+      checkAnswer(input.select("id", "date_month", "date_hour", "data"),
+        data.select("id", "date_month", "date_hour", "data"))
+    }
+  }
 }
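Context for the fix above: java.text.DateFormat.parse(String) succeeds as long as a leading prefix of the input parses, so a partition value such as "2018-01-01-04" passes the parse() check even though it is not a valid date, while the subsequent Cast to DateType evaluates to null. A minimal, self-contained sketch of that prefix behavior; the "yyyy-MM-dd" pattern is an assumption standing in for DateTimeUtils.getThreadLocalDateFormat:

    import java.text.SimpleDateFormat

    object PrefixParseDemo extends App {
      // Assumed pattern; stands in for DateTimeUtils.getThreadLocalDateFormat.
      val fmt = new SimpleDateFormat("yyyy-MM-dd")
      // parse(String) only needs a parsable prefix: this prints a Date for
      // 2018-01-01 and silently ignores the trailing "-04", which is why a
      // successful parse alone cannot justify inferring DateType.
      println(fmt.parse("2018-01-01-04"))
    }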
From 5d60f88bdc4fbe4833dea70996034b1f6824b3c4 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 15 Feb 2018 19:50:57 +0100
Subject: [PATCH 2/5] fix scalastyle

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 37a33af0db37c..b18e2b1fb420d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -23,7 +23,9 @@ import java.util.{Locale, TimeZone}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
+
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
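For reference, the scalastyle failure behind PATCH 2/5 comes from Spark's import-ordering check, which (per the project's style configuration) expects a blank line between import groups: java, scala, third-party, and org.apache.spark. The layout the checker accepts, as restored by the hunk above:

    import java.util.{Locale, TimeZone}

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Try

    import org.apache.hadoop.fs.Path

    import org.apache.spark.sql.AnalysisException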
From 6b5640833a2d45986a0cf6074d7211a8ba9d2b3e Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 15 Feb 2018 19:52:00 +0100
Subject: [PATCH 3/5] remove unneeded import

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b18e2b1fb420d..0f1c39c50816a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.unsafe.types.UTF8String
 
 
 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.

From 6274537139b2282ac5f9ded605037f63c7bee2f9 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 16 Feb 2018 10:34:37 +0100
Subject: [PATCH 4/5] address comments

---
 .../execution/datasources/PartitioningUtils.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0f1c39c50816a..379acb67f7c71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -412,11 +412,15 @@ object PartitioningUtils {
       // DateType
       DateTimeUtils.getThreadLocalDateFormat.parse(raw)
       // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // This can happen because DateFormat.parse may not use the entire text of the given
+      // string: if there are extra characters after the date, it still parses successfully.
       // We need to check that the raw string can be cast, since Cast is later used to get
       // the partition values with the right DataType (see
       // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
-      val dateOption = Option(Cast(Literal(raw), DateType).eval())
-      Literal.create(dateOption.get, DateType)
+      val dateValue = Cast(Literal(raw), DateType).eval()
+      // Disallow DateType if the cast returned null
+      require(dateValue != null)
+      Literal.create(dateValue, DateType)
     }
 
     val timestampTry = Try {
@@ -425,9 +429,10 @@
       // TimestampType
       DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
       // SPARK-23436: see the comment above for the date case
-      val timestampOption = Option(Cast(Literal(unescapedRaw), TimestampType,
-        Some(timeZone.getID)).eval())
-      Literal.create(timestampOption.get, TimestampType)
+      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+      // Disallow TimestampType if the cast returned null
+      require(timestampValue != null)
+      Literal.create(timestampValue, TimestampType)
     }
 
     if (typeInference) {
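With PATCH 4/5, a null result from Cast fails the enclosing Try via require (which throws IllegalArgumentException), so inference falls through to StringType instead of blowing up on Option.get. A minimal sketch of the pattern, assuming a hypothetical castToDate stub in place of Spark's Cast(Literal(raw), DateType).eval():

    import scala.util.Try

    object RequireFallbackDemo extends App {
      // Hypothetical stand-in for Cast(...).eval(): returns null unless the
      // whole string is a valid yyyy-MM-dd date.
      def castToDate(raw: String): java.sql.Date =
        if (raw.matches("""\d{4}-\d{2}-\d{2}""")) java.sql.Date.valueOf(raw) else null

      def infer(raw: String): (Any, String) = {
        val dateTry = Try {
          val dateValue = castToDate(raw)
          require(dateValue != null) // a null cast fails the Try...
          (dateValue, "DateType")
        }
        dateTry.getOrElse((raw, "StringType")) // ...and we fall back to string
      }

      println(infer("2018-01-01"))    // (2018-01-01,DateType)
      println(infer("2018-01-01-04")) // (2018-01-01-04,StringType)
    }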
From 8698f4de7b6e1b9453a12bc949ce8666d6322a87 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Sat, 17 Feb 2018 18:53:36 +0100
Subject: [PATCH 5/5] add check on output schema

---
 .../parquet/ParquetPartitionDiscoverySuite.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index ad06bdb1a8429..edb3da904d10d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1127,9 +1127,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         .toDF("id", "date_month", "date_hour", "data")
 
       data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
-      val input = spark.read.parquet(path.getAbsolutePath)
-      checkAnswer(input.select("id", "date_month", "date_hour", "data"),
-        data.select("id", "date_month", "date_hour", "data"))
+      val input = spark.read.parquet(path.getAbsolutePath).select("id",
+        "date_month", "date_hour", "data")
+
+      assert(input.schema.sameType(data.schema))
+      checkAnswer(input, data)
     }
   }
 }
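End to end, the series changes the schema a reader sees for malformed date-like partition values. A sketch of the observable behavior in spark-shell; the output path is illustrative, and `toDF` assumes the session's implicits are in scope:

    val df = Seq((1, "2018-01-01-04", "a")).toDF("id", "dt", "value")
    df.write.partitionBy("dt").parquet("/tmp/spark-23436-demo")

    val readBack = spark.read.parquet("/tmp/spark-23436-demo")
    readBack.printSchema()
    // Before this series: dt was inferred as DateType, and re-casting the raw
    // string "2018-01-01-04" to that type produced null partition values.
    // After this series:  dt stays StringType and the value round-trips intact.
    readBack.show()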