Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,34 @@ object PartitioningUtils {
Literal(bigDecimal)
}

// Candidate DateType literal for `raw`; a Failure means `raw` must not be inferred as a date.
// Declared lazy because it is only consumed through `Try.orElse`, whose argument is by-name:
// the parse/cast work (and the exception raised for non-date values) is then skipped when type
// inference is disabled or an earlier candidate type already matched.
lazy val dateTry = Try {
  // First gate: the date format must accept the string. A parse exception is captured by the
  // enclosing Try and rules DateType out.
  DateTimeUtils.getThreadLocalDateFormat.parse(raw)
  // SPARK-23436: a successful parse is not sufficient. DateFormat.parse may not use the entire
  // text of the given string, so a value with extra characters after a valid date prefix still
  // parses. Casting the raw string can nevertheless return null for such a value, and Cast is
  // what is later used to obtain the partition values with the right DataType (see
  // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning),
  // so the cast has to succeed here as well.
  val dateValue = Cast(Literal(raw), DateType).eval()
  // Disallow DateType if the cast returned null
  require(dateValue != null)
  Literal.create(dateValue, DateType)
}

// Candidate TimestampType literal for `raw`; a Failure means `raw` must not be inferred as a
// timestamp. Declared lazy because it is only consumed through `Try.orElse`, whose argument is
// by-name: the unescape/parse/cast work is then skipped when type inference is disabled or an
// earlier candidate type already matched.
lazy val timestampTry = Try {
  // The value comes from a path segment, so it is unescaped before being parsed or cast.
  val unescapedRaw = unescapePathName(raw)
  // First gate: the timestamp format for the given time zone must accept the string. A parse
  // exception is captured by the enclosing Try and rules TimestampType out.
  DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
  // SPARK-23436: a successful parse is not sufficient, because DateFormat.parse may not use the
  // entire text of the given string. The cast can still return null for such a value, so it is
  // checked here too.
  val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
  // Disallow TimestampType if the cast returned null
  require(timestampValue != null)
  Literal.create(timestampValue, TimestampType)
}

if (typeInference) {
// First tries integral types
Try(Literal.create(Integer.parseInt(raw), IntegerType))
Expand All @@ -415,16 +443,8 @@ object PartitioningUtils {
// Then falls back to fractional types
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
// Then falls back to date/timestamp types
.orElse(Try(
Literal.create(
DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
.parse(unescapePathName(raw)).getTime * 1000L,
TimestampType)))
.orElse(Try(
Literal.create(
DateTimeUtils.millisToDays(
DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
DateType)))
.orElse(timestampTry)
.orElse(dateTry)
// Then falls back to string
.getOrElse {
if (raw == DEFAULT_PARTITION_NAME) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,4 +1120,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
Row(3, BigDecimal("2" * 30)) :: Nil)
}
}

// Regression test for SPARK-23436: partition values that merely start like a date
// ("2018-01", "2018-01-01-04") must survive a write/read round trip as strings.
test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
  withTempPath { path =>
    // Every column is written as a string; the two partition columns hold date-like values
    // that are not complete, valid dates.
    val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
      .toDF("id", "date_month", "date_hour", "data")

    data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
    // Select explicitly so the column order matches `data` when the two are compared.
    val input = spark.read.parquet(path.getAbsolutePath).select("id",
      "date_month", "date_hour", "data")

    // Compare the schema that was written with the schema that was read back. Comparing
    // `input.schema` with itself would always pass and verify nothing about inference.
    assert(data.schema.sameType(input.schema))
    checkAnswer(input, data)
  }
}
}