
Commit b09b7f7

[SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
### What changes were proposed in this pull request?

In the PR, I propose to propagate the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses the setting when converting date/timestamp instances from data source filters into values pushed via `FilterApi` to the `parquet-column` library.

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds were interpreted as offsets in the Proleptic Gregorian calendar and pushed to the Parquet library as is. That works fine if the timestamp/date values in the Parquet files were saved in the `CORRECTED` mode, but in the `LEGACY` mode the filter values might not match the actual values. After the changes, timestamp/date values in filters pushed down to the Parquet library, such as `FilterApi.eq(col1, -719162)`, are rebased according to the rebase settings. For example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is, but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description of #28067 shows the diffs between the two calendars.

### Why are the changes needed?

The changes fix the bug shown by the following example from SPARK-36034:
```scala
In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```
The result must contain the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?

In some sense, yes. Query results can differ in some cases. For the example above:
```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?

By running the modified test suite `ParquetFilterSuite`:
```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
1 parent c8a3c22 commit b09b7f7
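
To make the day arithmetic above concrete, here is a minimal sketch (not part of the commit) of the rebase that the patch applies to pushed-down values, using the `DateTimeUtils` and `RebaseDateTime` helpers that the `ParquetFilters.scala` diff below imports; the `-719162`/`-719164` values are the ones quoted in the description:

```scala
import java.time.LocalDate

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

// Days since 1970-01-01 in the Proleptic Gregorian calendar, as Spark 3.x stores dates.
val gregorianDays = DateTimeUtils.localDateToDays(LocalDate.of(1, 1, 1))   // -719162

// In LEGACY mode the filter value is shifted to the hybrid Julian calendar before it is
// handed to parquet-column, so it matches data written by Spark 2.x / LEGACY writers.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)                // -719164
```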

File tree

5 files changed: +145 additions, −96 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 12 additions & 5 deletions
@@ -266,11 +266,21 @@ class ParquetFileFormat
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -296,9 +306,6 @@ class ParquetFileFormat
           None
       }
 
-      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         int96RebaseModeInRead)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 22 additions & 7 deletions
@@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -48,7 +50,8 @@ class ParquetFilters(
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
-    caseSensitive: Boolean) {
+    caseSensitive: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
   // A map which contains parquet field name and data type, if predicate push down applies.
   //
   // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -129,14 +132,26 @@ class ParquetFilters(
   private val ParquetTimestampMillisType =
     ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)
 
-  private def dateToDays(date: Any): Int = date match {
-    case d: Date => DateTimeUtils.fromJavaDate(d)
-    case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+  private def dateToDays(date: Any): Int = {
+    val gregorianDays = date match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
+      case _ => gregorianDays
+    }
   }
 
-  private def timestampToMicros(v: Any): JLong = v match {
-    case i: Instant => DateTimeUtils.instantToMicros(i)
-    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  private def timestampToMicros(v: Any): JLong = {
+    val gregorianMicros = v match {
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
+      case _ => gregorianMicros
+    }
   }
 
   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()

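As a usage sketch of the extended constructor above (the pushdown flags and the `In`-filter threshold are illustrative values, and `createFilter` is assumed to take a single `sources.Filter`, as hinted by the comments in the surrounding diffs):

```scala
import java.sql.Date

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources

// Builds ParquetFilters for a given rebase mode; all pushdown settings are sample values.
def filtersFor(parquetSchema: MessageType, mode: LegacyBehaviorPolicy.Value): ParquetFilters =
  new ParquetFilters(parquetSchema, true, true, true, true, 10, false, mode)

// The same source filter now yields different Parquet predicates depending on the mode:
// eq(date, -719162) under CORRECTED vs eq(date, -719164) under LEGACY.
def datePredicate(parquetSchema: MessageType, mode: LegacyBehaviorPolicy.Value) =
  filtersFor(parquetSchema, mode)
    .createFilter(sources.EqualTo("date", Date.valueOf("0001-01-01")))
```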
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 12 additions & 5 deletions
@@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory(
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (enableParquetFilterPushDown) {
       val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
       filters
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-      footerFileMetaData.getKeyValueMetaData.get,
-      datetimeRebaseModeInRead)
     val int96RebaseMode = DataSourceUtils.int96RebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
       int96RebaseModeInRead)

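On the read path, the rebase mode resolved here comes from the file footer metadata when it is present, and otherwise falls back to the knobs named in the commit message. A hedged usage sketch (a `spark-shell` style session; the path matches the example in the description):

```scala
// Per-query: pass the Parquet datasource option mentioned in the description.
val byOption = spark.read
  .option("datetimeRebaseMode", "LEGACY")
  .parquet("date_written_by_spark3_legacy")

// Session-wide: set the SQL config that this patch propagates into ParquetFilters.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
val byConf = spark.read.parquet("date_written_by_spark3_legacy")

// With the fix, the pushed-down predicate uses the rebased day value, so the row survives.
byOption.where("date = '0001-01-01'").show()
```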
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

Lines changed: 12 additions & 2 deletions
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
       new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
-    val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-      pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+    val parquetFilters = new ParquetFilters(
+      parquetSchema,
+      pushDownDate,
+      pushDownTimestamp,
+      pushDownDecimal,
+      pushDownStringStartWith,
+      pushDownInFilterThreshold,
+      isCaseSensitive,
+      // The rebase mode doesn't matter here because the filters are used to determine
+      // whether they are convertible.
+      LegacyBehaviorPolicy.CORRECTED)
     parquetFilters.convertibleFilters(this.filters).toArray
   }

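The hard-coded `CORRECTED` above only affects the concrete values inside generated predicates, not whether a filter can be pushed down at all. A small sketch of that distinction (it assumes a `parquetSchema` built as in the surrounding code; the other constructor arguments are sample values):

```scala
import java.sql.Date

import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources

// Two instances that differ only in the rebase mode.
val corrected = new ParquetFilters(parquetSchema, true, true, true, true, 10, false,
  LegacyBehaviorPolicy.CORRECTED)
val legacy = new ParquetFilters(parquetSchema, true, true, true, true, 10, false,
  LegacyBehaviorPolicy.LEGACY)

val dateFilter = sources.EqualTo("date", Date.valueOf("0001-01-01"))

// Convertibility is identical, so ParquetScanBuilder can safely pass CORRECTED...
assert(corrected.convertibleFilters(Seq(dateFilter)) == legacy.convertibleFilters(Seq(dateFilter)))
// ...while createFilter would embed -719162 vs -719164 respectively.
```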
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 87 additions & 77 deletions
@@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
@@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   protected def createParquetFilters(
       schema: MessageType,
-      caseSensitive: Option[Boolean] = None): ParquetFilters =
+      caseSensitive: Option[Boolean] = None,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
+    ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
       conf.parquetFilterPushDownInFilterThreshold,
-      caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
+      caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
+      datetimeRebaseMode)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       def date: Date = Date.valueOf(s)
     }
 
-    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+    val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
     import testImplicits._
 
     Seq(false, true).foreach { java8Api =>
-      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
-        val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
-        withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
-          implicit val df: DataFrame = inputDF
-
-          def resultFun(dateStr: String): Any = {
-            val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
-            fun(parsed)
-          }
-
-          val dateAttr: Expression = df(colName).expr
-          assert(df(colName).expr.dataType === DateType)
-
-          checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-          checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
-            data.map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
-            Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(
-            dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
-            classOf[Operators.Or],
-            Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
+          val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+          withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+            implicit val df: DataFrame = inputDF
+
+            def resultFun(dateStr: String): Any = {
+              val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
+              fun(parsed)
+            }
 
-          Seq(3, 20).foreach { threshold =>
-            withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
-              checkFilterPredicate(
-                In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
-                  "2018-03-22".date).map(Literal.apply)),
-                if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
-                Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
-                  Row(resultFun("2018-03-21"))))
+            val dateAttr: Expression = df(colName).expr
+            assert(df(colName).expr.dataType === DateType)
+
+            checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+            checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+              data.map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
+              Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(
+              dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+              classOf[Operators.Or],
+              Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
+
+            Seq(3, 20).foreach { threshold =>
+              withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
+                checkFilterPredicate(
+                  In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
+                    "2018-03-22".date).map(Literal.apply)),
+                  if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
+                  Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
+                    Row(resultFun("2018-03-21"))))
+              }
             }
           }
         }
@@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     Seq(true, false).foreach { java8Api =>
-      withSQLConf(
-        SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
-        SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
-        SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
         val millisData = Seq(
           "1000-06-14 08:28:53.123",
           "1582-06-15 08:28:53.001",
           "1900-06-16 08:28:53.0",
           "2018-06-17 08:28:53.999")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
          testTimestampPushdown(millisData, java8Api)
        }
 
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
        val microsData = Seq(
          "1000-06-14 08:28:53.123456",
          "1582-06-15 08:28:53.123456",
          "1900-06-16 08:28:53.123456",
          "2018-06-17 08:28:53.123456")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
          testTimestampPushdown(microsData, java8Api)
        }
 
-        // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.INT96.toString) {
+        // INT96 doesn't support pushdown
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
          import testImplicits._
          withTempPath { file =>
            millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF

0 commit comments
