4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -329,8 +329,8 @@ private[spark] object SQLConf {
     doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
 
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
-    defaultValue = Some(false),
-    doc = "When true, enable filter pushdown for ORC files.")
+    defaultValue = Some(true),
+    doc = "When false, disables filter pushdown for ORC files.")
 
   val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
     defaultValue = Some(false),
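
With this change, spark.sql.orc.filterPushdown defaults to true, so predicate pushdown into ORC readers is on out of the box. If it needs to be switched back off, the flag can be set per session; a minimal sketch, assuming a Spark 1.x-style SQLContext bound to the name sqlContext:

  // Illustrative only: restore the previous behaviour for this session
  sqlContext.setConf("spark.sql.orc.filterPushdown", "false")
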
OrcRelation.scala

@@ -160,6 +160,10 @@ private[sql] class OrcRelation(
   extends HadoopFsRelation(maybePartitionSpec, parameters)
   with Logging {
 
+  /* org.apache.orc contains OrcConf which can be used later */
+  private[sql] val ORC_BLOOM_FILTER_COLUMNS = "orc.bloom.filter.columns"
+  private[sql] val ORC_BLOOM_FILTER_FPP = "orc.bloom.filter.fpp"
+
   private[sql] def this(
       paths: Array[String],
       maybeDataSchema: Option[StructType],
@@ -174,6 +178,9 @@ private[sql] class OrcRelation(
       parameters)(sqlContext)
   }
 
+  private val orcBloomFilterCols = parameters.get(ORC_BLOOM_FILTER_COLUMNS)
+  private val orcBloomFilterFpp = parameters.get(ORC_BLOOM_FILTER_FPP)
+
   override val dataSchema: StructType = maybeDataSchema.getOrElse {
     OrcFileOperator.readSchema(
       paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
@@ -208,6 +215,9 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    orcBloomFilterCols.map(conf.set(ORC_BLOOM_FILTER_COLUMNS, _))
+    orcBloomFilterFpp.map(conf.set(ORC_BLOOM_FILTER_FPP, _))
     SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
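
Since OrcRelation now picks up orc.bloom.filter.columns and orc.bloom.filter.fpp from its parameters and copies them into the job's Hadoop configuration in prepareJobForWrite, the options can be passed straight through the DataFrame writer. A rough usage sketch; the column names, fpp value, and output path below are made up for illustration:

  // Illustrative only: request ORC bloom filters for two columns on write
  df.write
    .option("orc.bloom.filter.columns", "id,name")   // hypothetical columns
    .option("orc.bloom.filter.fpp", "0.05")          // hypothetical false-positive probability
    .orc("/tmp/orc_bloom_example")                   // hypothetical output path
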
OrcQuerySuite.scala

@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.hive.orc
 
-import java.io.File
+import java.io.{PrintStream, ByteArrayOutputStream, File}
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
+import org.apache.hadoop.hive.ql.io.orc.{FileDump, CompressionKind}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
@@ -394,4 +394,42 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("SPARK-12417. Orc bloom filter options are not propagated during file " +
+    "generation") {
+    withTempPath { dir =>
+      // Create separate sub dir for bloom filter testing
+      val path = new File(dir, "orc").getCanonicalPath
+
+      // Write some data
+      val data = (0 until 10).map { i =>
+        val maybeInt = if (i % 2 == 0) None else Some(i)
+        val nullValue: Option[String] = None
+        (maybeInt, nullValue)
+      }
+
+      // Dump data to orc
+      createDataFrame(data).toDF("a", "b")
+        .write.option("orc.bloom.filter.columns", "*").orc(path)
+
+      // Verify if orc bloom filters are present. This can be verified via
+      // ORC RecordReaderImpl when it is made public. Until then, verify by
+      // dumping file statistics and checking whether bloom filter was added.
+      new File(path).listFiles().filter(_.getName.endsWith("orc")) map { file =>
+        withTempStream { buf =>
+          val fileDumpArgs = Array(file.getCanonicalPath)
+          FileDump.main(fileDumpArgs)
+          assert(buf.toString.contains("BLOOM_FILTER"))
+        }
+      }
+    }
+  }
+
+  def withTempStream(f: ByteArrayOutputStream => Unit): Unit = {
+    val oriStream = System.out
+    val buf = new ByteArrayOutputStream()
+    val stream = new PrintStream(buf)
+    System.setOut(stream)
+    try f(buf) finally System.setOut(oriStream)
+  }
 }
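
The withTempStream helper added above swaps System.out for an in-memory stream while the body runs, which is what lets the test assert on FileDump's console output. A tiny standalone sketch of the same pattern; the printed text is only a stand-in for what FileDump actually emits:

  withTempStream { buf =>
    // FileDump writes to java.lang.System.out, so anything sent there is captured
    System.out.println("Stream: column 1 section BLOOM_FILTER ...")  // stand-in output
    assert(buf.toString.contains("BLOOM_FILTER"))
  }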