@@ -363,9 +363,11 @@ private[sql] class ParquetFileFormat
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
vectorizedReader

val iter = new RecordReaderIterator(vectorizedReader)
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
logDebug(s"Use parquet-mr")
val reader = pushed match {
case Some(filter) =>
new ParquetRecordReader[InternalRow](
@@ -375,26 +377,20 @@ private[sql] class ParquetFileFormat
new ParquetRecordReader[InternalRow](new ParquetReadSupport)
}
reader.initialize(split, hadoopAttemptContext)
reader
}

val iter = new RecordReaderIterator(parquetReader)

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
enableVectorizedReader) {
iter.asInstanceOf[Iterator[InternalRow]]
} else {
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

val iter = new RecordReaderIterator(reader)
// This is a horrible erasure hack... if we type the iterator above, then it actually checks
// the type in next() and we get a class cast exception. If we make that function return
// Object, then we can defer the cast until later!
iter.asInstanceOf[Iterator[InternalRow]]
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
}
parquetReader
}
}
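
The "horrible erasure hack" comment kept in this hunk relies on the fact that casting an `Iterator` with `asInstanceOf` does not check element types up front; the cast only fails once elements are consumed. A minimal, self-contained sketch of that behaviour (plain Scala, no Spark types; `ErasureDemo` is just an illustrative name):

```scala
object ErasureDemo extends App {
  // The element type of an Iterator is erased at runtime, so this cast succeeds immediately.
  val anyIter: Iterator[Any] = Iterator(1, 2, 3)
  val strIter = anyIter.asInstanceOf[Iterator[String]]

  // The failure is deferred: only when an element is actually consumed as a String
  // does the ClassCastException surface, which is what the comment above exploits
  // to defer the cast to InternalRow.
  try {
    strIter.map(_.length).foreach(println)
  } catch {
    case e: ClassCastException => println(s"Deferred failure: $e")
  }
}
```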

@@ -215,7 +215,8 @@ object SQLConf
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")
.doc("Enables Parquet filter push-down optimization when set to true and vectorized parquet " +
"decoding is not being used.")
.booleanConf
.createWithDefault(true)

@@ -236,7 +237,8 @@ object SQLConf

val PARQUET_VECTORIZED_READER_ENABLED =
SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
.doc("Enables vectorized parquet decoding.")
.doc("Enables vectorized parquet decoding when set to true and all the data types of " +
"the table schemas are atomic types.")
.booleanConf
.createWithDefault(true)
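
For reference, a minimal user-level sketch of how these two settings interact at runtime, assuming an existing `SparkSession` named `spark` (for example in `spark-shell`); the keys are the ones defined above:

```scala
// Per the updated doc strings above, filter pushdown only takes effect when
// vectorized Parquet decoding is not used, so disable the vectorized reader
// to exercise pushdown.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

println(spark.conf.get("spark.sql.parquet.enableVectorizedReader")) // "false"
println(spark.conf.get("spark.sql.parquet.filterPushdown"))         // "true"
```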

@@ -452,8 +452,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
val df = spark.read.parquet(path).filter("a = 2")

// The result should be a single row.
// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// When a filter is pushed to Parquet, Parquet can apply it to every row group and
// then every row. So, we can check the number of rows returned from Parquet
// to make sure our filter pushdown works.
assert(stripSparkFilter(df).count == 1)
}
@@ -524,8 +524,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)

// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// When a filter is pushed to Parquet, Parquet can apply it to every row group and
// then every row. So, we can check the number of rows returned from Parquet
// to make sure our filter pushdown works.
val df = spark.read.parquet(path).where("b in (0,2)")
assert(stripSparkFilter(df).count == 3)
@@ -545,4 +545,28 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("Verify SQLConf PARQUET_FILTER_PUSHDOWN_ENABLED") {
import testImplicits._

Seq("true", "false").foreach { pushDown =>
// When SQLConf.PARQUET_VECTORIZED_READER_ENABLED is set to true and all the data types
// of the table schema are AtomicType, the Parquet reader uses the vectorized reader.
// In this mode, filters are not pushed down, regardless of whether
// SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED is set to true.
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown,
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
// When a filter is pushed to Parquet, Parquet can apply it to every row group and
// then every row. So, we can check the number of rows returned from Parquet
// to make sure our filter pushdown works.
val df = spark.read.parquet(path).where("a > 2")
val numExpectedRows = if (pushDown == "true") 1 else 3
assert(stripSparkFilter(df).count == numExpectedRows)
}
}
}
}
}
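
Outside the test harness (`stripSparkFilter` is a test-only helper), a rough way to see whether the predicate reached the Parquet scan is to inspect the physical plan. A sketch assuming an existing `SparkSession` named `spark` and a writable temp directory; the exact plan text (e.g. a `PushedFilters` entry) varies across Spark versions:

```scala
import java.nio.file.Files
import spark.implicits._

// Write a tiny Parquet table to a temp location.
val path = Files.createTempDirectory("parquet-pushdown-demo").resolve("table1").toString
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

// Disable the vectorized reader so that filter pushdown applies.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

val df = spark.read.parquet(path).where("a > 2")
df.explain() // the file scan node should list the pushed-down predicate on `a`
```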
@@ -451,7 +451,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
val extraOptions = Map(
SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
"spark.sql.parquet.output.committer.class" ->
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
)
withTempPath { dir =>
@@ -462,23 +462,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("SPARK-6330 regression test") {
test("SPARK-6330 non-existent file or host") {
// In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
// IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
intercept[Throwable] {
var e = intercept[AnalysisException] {
spark.read.parquet("file:///nonexistent")
}
val errorMessage = intercept[Throwable] {
}.getMessage
assert(e.contains("Path does not exist: file:/nonexistent"))

e = intercept[IllegalArgumentException] {
spark.read.parquet("hdfs://nonexistent")
}.toString
assert(errorMessage.contains("UnknownHostException"))
}.getMessage
assert(e.contains("java.net.UnknownHostException: nonexistent"))
}
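
Outside ScalaTest, the same two failure modes can be observed with a plain try/catch instead of `intercept`. A sketch assuming an existing `SparkSession` named `spark`; the exact message wording differs between Spark versions:

```scala
import org.apache.spark.sql.AnalysisException

try {
  // A non-existent local path fails analysis, e.g. "Path does not exist: file:/nonexistent".
  spark.read.parquet("file:///nonexistent")
} catch {
  case e: AnalysisException => println(e.getMessage)
}

try {
  // An unresolvable host surfaces as an IllegalArgumentException whose message
  // includes java.net.UnknownHostException, as asserted in the test above.
  spark.read.parquet("hdfs://nonexistent")
} catch {
  case e: IllegalArgumentException => println(e.getMessage)
}
```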

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
// Using an output committer that always fails when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
"spark.sql.parquet.output.committer.class" ->
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)
