Commit a69d8d1

mgaido91 authored and gatorsmile committed
[SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date
## What changes were proposed in this pull request?

Before the patch, Spark could infer as Date a partition value that cannot be cast to Date (this can happen when there are extra characters after a valid date, like `2018-02-15AAA`). When this happens and the input format has metadata that defines the schema of the table, `null` is returned as the value for the partition column, because the `cast` operator used in `PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value. The PR makes partition inference check that values can be cast to Date and Timestamp before inferring those data types for them.

## How was this patch tested?

Added UT.

Author: Marco Gaido <[email protected]>

Closes #20621 from mgaido91/SPARK-23436.
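For context, the root cause is that `java.text.DateFormat.parse(String)` only consumes as much of the input as it can and ignores any trailing text. A minimal standalone sketch (not part of the patch) demonstrating this:

```scala
import java.text.SimpleDateFormat

// Minimal sketch of the root cause: parse(String) uses a ParsePosition
// internally and stops at the first character it cannot consume, so a
// value with trailing garbage still "parses" successfully.
object LenientParseDemo extends App {
  val format = new SimpleDateFormat("yyyy-MM-dd")

  // Prints the date 2018-02-15; the trailing "AAA" is silently ignored.
  println(format.parse("2018-02-15AAA"))
}
```

Spark's `Cast` of the same string to `DateType` returns `null`, which is why partition values inferred as Date this way read back as `null`.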
1 parent 8cd6a96 · commit a69d8d1

File tree

2 files changed: +44 −10 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 30 additions & 10 deletions
@@ -407,6 +407,34 @@ object PartitioningUtils {
       Literal(bigDecimal)
     }

+    val dateTry = Try {
+      // try and parse the date, if no exception occurs this is a candidate to be resolved as
+      // DateType
+      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // This can happen since DateFormat.parse may not use the entire text of the given string:
+      // if there are extra characters after the date, it still returns successfully.
+      // We need to check that we can cast the raw string, since we later use Cast to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val dateValue = Cast(Literal(raw), DateType).eval()
+      // Disallow DateType if the cast returned null
+      require(dateValue != null)
+      Literal.create(dateValue, DateType)
+    }
+
+    val timestampTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // try and parse the timestamp, if no exception occurs this is a candidate to be resolved
+      // as TimestampType
+      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      // SPARK-23436: see comment for date
+      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+      // Disallow TimestampType if the cast returned null
+      require(timestampValue != null)
+      Literal.create(timestampValue, TimestampType)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +443,8 @@ object PartitioningUtils {
         // Then falls back to fractional types
         .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
         // Then falls back to date/timestamp types
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
-              .parse(unescapePathName(raw)).getTime * 1000L,
-            TimestampType)))
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.millisToDays(
-              DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
-            DateType)))
+        .orElse(timestampTry)
+        .orElse(dateTry)
         // Then falls back to string
         .getOrElse {
           if (raw == DEFAULT_PARTITION_NAME) {
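To illustrate the parse-then-verify pattern the patch introduces, here is a minimal self-contained sketch; `java.sql.Date.valueOf` stands in for Spark's `Cast(Literal(raw), DateType).eval()`, and the object name and `inferDate` helper are assumptions of this illustration, not Spark code:

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object ParseThenVerifyDemo extends App {
  // Lenient step: mirrors DateTimeUtils.getThreadLocalDateFormat.parse(raw),
  // which accepts trailing characters after a valid date.
  private val format = new SimpleDateFormat("yyyy-MM-dd")

  // Strict step: java.sql.Date.valueOf throws unless the whole string is a
  // date, playing the role of the null check on Cast's result.
  def inferDate(raw: String): Option[java.sql.Date] = Try {
    format.parse(raw)          // throws if raw does not even start with a date
    java.sql.Date.valueOf(raw) // throws if raw is not exactly a date
  }.toOption

  println(inferDate("2018-02-15"))    // Some(2018-02-15): candidate for DateType
  println(inferDate("2018-02-15AAA")) // None: falls through to StringType
}
```

Only when both steps succeed does the candidate become a successful `Try`, which lets the existing `orElse` precedence (integral → fractional → timestamp → date → string) stay unchanged.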

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 14 additions & 0 deletions
@@ -1120,4 +1120,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         Row(3, BigDecimal("2" * 30)) :: Nil)
     }
   }
+
+  test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
+    withTempPath { path =>
+      val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
+        .toDF("id", "date_month", "date_hour", "data")
+
+      data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
+      val input = spark.read.parquet(path.getAbsolutePath).select("id",
+        "date_month", "date_hour", "data")
+
+      assert(input.schema.sameType(data.schema))
+      checkAnswer(input, data)
+    }
+  }
 }
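For context, a hypothetical spark-shell session (the output path is made up for illustration) showing the behavior this test locks in:

```scala
// Hypothetical spark-shell session; `spark` and `toDF` come from the shell.
val df = Seq(("1", "2018-01", "2018-01-01-04", "test"))
  .toDF("id", "date_month", "date_hour", "data")
df.write.partitionBy("date_month", "date_hour").parquet("/tmp/spark-23436")

val read = spark.read.parquet("/tmp/spark-23436")
read.printSchema()
// With the patch, date_month and date_hour are inferred as StringType, so
// "2018-01-01-04" round-trips. Before it, the lenient parse stopped after
// "2018-01-01", the column could be inferred as a date/timestamp type, and
// the value was read back as null.
```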
