Skip to content

Commit 59db9e9

Browse files
HyukjinKwon authored and liancheng committed
[SPARK-11103][SQL] Filter applied on merged Parquet schema with new column fails
When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema. This is related to a Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389). For now, this PR simply disables predicate push-down when using a merged schema. Author: hyukjinkwon <[email protected]> Closes #9327 from HyukjinKwon/SPARK-11103.
1 parent 86d6526 commit 59db9e9

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
292292
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
293293
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
294294

295+
// When merging schemas is enabled and the column of the given filter does not exist,
296+
// Parquet emits an exception which is an issue of Parquet (PARQUET-389).
297+
val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
298+
295299
// Parquet row group size. We will use this value as the value for
296300
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
297301
// of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
305309
dataSchema,
306310
parquetBlockSize,
307311
useMetadataCache,
308-
parquetFilterPushDown,
312+
safeParquetFilterPushDown,
309313
assumeBinaryIsString,
310314
assumeInt96IsTimestamp) _
311315

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
316316
}
317317
}
318318
}
319+
320+
test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
321+
import testImplicits._
322+
323+
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
324+
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
325+
withTempPath { dir =>
326+
var pathOne = s"${dir.getCanonicalPath}/table1"
327+
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
328+
var pathTwo = s"${dir.getCanonicalPath}/table2"
329+
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
330+
331+
// If the "c = 1" filter gets pushed down, this query will throw an exception which
332+
// Parquet emits. This is a Parquet issue (PARQUET-389).
333+
checkAnswer(
334+
sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
335+
(1 to 1).map(i => Row(i, i.toString, null)))
336+
}
337+
}
338+
}
319339
}

0 commit comments

Comments (0)