
Commit 2bbd9bf

MaxGekk authored and fishcus committed
[SPARK-36034][SQL][3.1] Rebase datetime in pushed down filters to parquet
### What changes were proposed in this pull request?

In this PR, I propose to propagate the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses these settings when converting dates/timestamps from datasource filters into values pushed via `FilterApi` to the `parquet-column` lib. Before the changes, date/timestamp values expressed as days/microseconds/milliseconds were interpreted as offsets in the Proleptic Gregorian calendar and pushed to the Parquet library as is. That works fine if the timestamp/date values in the Parquet files were saved in the `CORRECTED` mode, but in the `LEGACY` mode the filter values may not match the actual values. After the changes, the timestamp/date values of filters pushed down to the Parquet lib, such as `FilterApi.eq(col1, -719162)`, are rebased according to the rebase settings. For example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is, but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description of apache#28067 shows the diffs between the two calendars.

### Why are the changes needed?

The changes fix the bug illustrated by the following example from SPARK-36034:

```python
>>> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```

The result must contain the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?

In some sense, yes. Query results can differ in some cases. For the example above:

```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?

By running the modified test suite `ParquetFilterSuite`:

```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit b09b7f7)
Signed-off-by: Max Gekk <max.gekk@gmail.com>

Closes apache#33375 from MaxGekk/fix-parquet-ts-filter-pushdown-3.1.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent a27f021 commit 2bbd9bf
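
To make the day-offset numbers quoted in the description concrete, here is a minimal sketch (not part of the commit) that reproduces them with Spark's `RebaseDateTime` helper used by this patch; the `RebaseExample` object and `main` wrapper are illustrative only.

```scala
import java.time.LocalDate

import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

object RebaseExample {
  def main(args: Array[String]): Unit = {
    // Days since 1970-01-01 for 0001-01-01 in the Proleptic Gregorian calendar.
    val gregorianDays = LocalDate.of(1, 1, 1).toEpochDay.toInt // -719162
    // The same local date rebased to the hybrid Julian/Gregorian calendar used by
    // parquet files written in LEGACY mode.
    val julianDays = rebaseGregorianToJulianDays(gregorianDays) // -719164
    println(s"CORRECTED pushes down $gregorianDays, LEGACY rebases it to $julianDays")
  }
}
```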

File tree

5 files changed, +135 -86 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 12 additions & 5 deletions
```diff
@@ -271,11 +271,21 @@ class ParquetFileFormat
       S3FileUtils.tryOpenClose(sharedConf, filePath)
       lazy val footerFileMetaData =
         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -301,9 +311,6 @@ class ParquetFileFormat
         None
       }
 
-      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-        footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
```
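
For context, the per-file rebase mode that is now computed before filter pushdown comes from the footer metadata. The following is a rough paraphrase (not taken from this diff) of what `DataSourceUtils.datetimeRebaseMode` does with the two arguments above; the metadata keys and the version check are stated as assumptions.

```scala
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

// Paraphrase/sketch only; see DataSourceUtils.datetimeRebaseMode for the real logic.
def resolveDatetimeRebaseMode(
    lookupFileMeta: String => String,
    modeByConfig: String): LegacyBehaviorPolicy.Value = {
  Option(lookupFileMeta("org.apache.spark.version")).map { version =>
    // Files from Spark 2.4 and earlier, or 3.x files flagged as written in LEGACY mode,
    // store datetimes in the hybrid Julian/Gregorian calendar and need rebasing.
    if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
      LegacyBehaviorPolicy.LEGACY
    } else {
      LegacyBehaviorPolicy.CORRECTED
    }
  }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}
```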

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 22 additions & 7 deletions
```diff
@@ -34,6 +34,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -47,7 +49,8 @@ class ParquetFilters(
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
-    caseSensitive: Boolean) {
+    caseSensitive: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
   // A map which contains parquet field name and data type, if predicate push down applies.
   //
   // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -123,14 +126,26 @@ class ParquetFilters(
   private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
   private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)
 
-  private def dateToDays(date: Any): Int = date match {
-    case d: Date => DateTimeUtils.fromJavaDate(d)
-    case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+  private def dateToDays(date: Any): Int = {
+    val gregorianDays = date match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
+      case _ => gregorianDays
+    }
   }
 
-  private def timestampToMicros(v: Any): JLong = v match {
-    case i: Instant => DateTimeUtils.instantToMicros(i)
-    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  private def timestampToMicros(v: Any): JLong = {
+    val gregorianMicros = v match {
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
+      case _ => gregorianMicros
+    }
   }
 
   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
```
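
A hedged sketch of how the new constructor argument changes what is pushed down, for example when pasted into a spark-shell. It assumes Spark 3.1 internals on the classpath and the internal `createFilter(Filter)` signature of this class; the column name `date`, the in-filter threshold of 10, and the boolean flags are arbitrary illustration values.

```scala
import java.sql.Date

import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{DateType, StructField, StructType}

// Parquet schema for a single DATE column (converter usage mirrors ParquetScanBuilder below).
val parquetSchema = new SparkToParquetSchemaConverter(new SQLConf())
  .convert(StructType(Seq(StructField("date", DateType))))

// Positional arguments follow the call sites in this patch:
// schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
// pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode.
def filtersFor(mode: LegacyBehaviorPolicy.Value): ParquetFilters =
  new ParquetFilters(parquetSchema, true, true, true, true, 10, false, mode)

// CORRECTED pushes eq(date, -719162); LEGACY rebases the same literal to eq(date, -719164).
val corrected = filtersFor(LegacyBehaviorPolicy.CORRECTED)
  .createFilter(sources.EqualTo("date", Date.valueOf("0001-01-01")))
val legacy = filtersFor(LegacyBehaviorPolicy.LEGACY)
  .createFilter(sources.EqualTo("date", Date.valueOf("0001-01-01")))
```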

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 12 additions & 5 deletions
```diff
@@ -135,11 +135,21 @@ case class ParquetPartitionReaderFactory(
 
     lazy val footerFileMetaData =
       ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (enableParquetFilterPushDown) {
       val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
       filters
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -172,9 +182,6 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-      footerFileMetaData.getKeyValueMetaData.get,
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     val int96RebaseMode = DataSourceUtils.int96RebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
       SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

Lines changed: 12 additions & 2 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
       new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
-    val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-      pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+    val parquetFilters = new ParquetFilters(
+      parquetSchema,
+      pushDownDate,
+      pushDownTimestamp,
+      pushDownDecimal,
+      pushDownStringStartWith,
+      pushDownInFilterThreshold,
+      isCaseSensitive,
+      // The rebase mode doesn't matter here because the filters are only used to determine
+      // whether they are convertible.
+      LegacyBehaviorPolicy.CORRECTED)
     parquetFilters.convertibleFilters(this.filters).toArray
   }
 
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 77 additions & 67 deletions
```diff
@@ -42,7 +42,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
@@ -70,11 +72,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   protected def createParquetFilters(
       schema: MessageType,
-      caseSensitive: Option[Boolean] = None): ParquetFilters =
+      caseSensitive: Option[Boolean] = None,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
+    ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
       conf.parquetFilterPushDownInFilterThreshold,
-      caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
+      caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
+      datetimeRebaseMode)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -521,98 +526,103 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       def date: Date = Date.valueOf(s)
     }
 
-    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+    val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
     import testImplicits._
 
     Seq(false, true).foreach { java8Api =>
-      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
-        val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
-        withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
-          implicit val df: DataFrame = inputDF
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
+          val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+          withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+            implicit val df: DataFrame = inputDF
+
+            def resultFun(dateStr: String): Any = {
+              val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
+              fun(parsed)
+            }
 
-          def resultFun(dateStr: String): Any = {
-            val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
-            fun(parsed)
+            val dateAttr: Expression = df(colName).expr
+            assert(df(colName).expr.dataType === DateType)
+
+            checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+            checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+              data.map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
+              Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(
+              dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+              classOf[Operators.Or],
+              Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
           }
-
-          val dateAttr: Expression = df(colName).expr
-          assert(df(colName).expr.dataType === DateType)
-
-          checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-          checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
-            data.map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
-            Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(
-            dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
-            classOf[Operators.Or],
-            Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
         }
       }
     }
   }
 
   test("filter pushdown - timestamp") {
     Seq(true, false).foreach { java8Api =>
-      withSQLConf(
-        SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
-        SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
-        SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
         val millisData = Seq(
           "1000-06-14 08:28:53.123",
           "1582-06-15 08:28:53.001",
           "1900-06-16 08:28:53.0",
           "2018-06-17 08:28:53.999")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
          testTimestampPushdown(millisData, java8Api)
        }
 
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
         val microsData = Seq(
           "1000-06-14 08:28:53.123456",
           "1582-06-15 08:28:53.123456",
           "1900-06-16 08:28:53.123456",
           "2018-06-17 08:28:53.123456")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
          testTimestampPushdown(microsData, java8Api)
        }
 
-        // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.INT96.toString) {
+        // INT96 doesn't support pushdown
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
          import testImplicits._
          withTempPath { file =>
            millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
```