From 8615764056bc9039933ca97d85564cf60097fb5a Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Tue, 19 Jan 2016 09:46:28 +0530
Subject: [PATCH] SPARK-12417. [SQL] Orc bloom filter options are not
 propagated during file generation

---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  4 +-
 .../spark/sql/hive/orc/OrcRelation.scala      | 10 +++++
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 42 ++++++++++++++++++-
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2d664d3ee691..3b2cdcb5972c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -345,8 +345,8 @@ private[spark] object SQLConf {
     doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
 
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
-    defaultValue = Some(false),
-    doc = "When true, enable filter pushdown for ORC files.")
+    defaultValue = Some(true),
+    doc = "When false, disables filter pushdown for ORC files.")
 
   val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
     defaultValue = Some(false),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 40409169b095..cf7a852c757a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -162,6 +162,10 @@ private[sql] class OrcRelation(
   extends HadoopFsRelation(maybePartitionSpec, parameters)
   with Logging {
 
+  /* org.apache.orc contains OrcConf, which can be used later */
+  private[sql] val ORC_BLOOM_FILTER_COLUMNS = "orc.bloom.filter.columns"
+  private[sql] val ORC_BLOOM_FILTER_FPP = "orc.bloom.filter.fpp"
+
   private[sql] def this(
       paths: Array[String],
       maybeDataSchema: Option[StructType],
@@ -177,6 +181,9 @@ private[sql] class OrcRelation(
       parameters)(sqlContext)
   }
 
+  private val orcBloomFilterCols = parameters.get(ORC_BLOOM_FILTER_COLUMNS)
+  private val orcBloomFilterFpp = parameters.get(ORC_BLOOM_FILTER_FPP)
+
   override val dataSchema: StructType = maybeDataSchema.getOrElse {
     OrcFileOperator.readSchema(
       paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
@@ -211,6 +218,9 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+    val conf = job.getConfiguration
+    orcBloomFilterCols.foreach(conf.set(ORC_BLOOM_FILTER_COLUMNS, _))
+    orcBloomFilterFpp.foreach(conf.set(ORC_BLOOM_FILTER_FPP, _))
     job.getConfiguration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 2156806d21f9..34ed07e5eeb2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.hive.orc
 
-import java.io.File
+import java.io.{ByteArrayOutputStream, File, PrintStream}
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
+import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, FileDump}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
@@ -394,4 +394,42 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("SPARK-12417. Orc bloom filter options are not propagated during file " +
+    "generation") {
+    withTempPath { dir =>
+      // Create a separate subdirectory for the bloom filter test output
+      val path = new File(dir, "orc").getCanonicalPath
+
+      // Write some data
+      val data = (0 until 10).map { i =>
+        val maybeInt = if (i % 2 == 0) None else Some(i)
+        val nullValue: Option[String] = None
+        (maybeInt, nullValue)
+      }
+
+      // Write the data as ORC, requesting bloom filters on all columns
+      createDataFrame(data).toDF("a", "b")
+        .write.option("orc.bloom.filter.columns", "*").orc(path)
+
+      // Verify that ORC bloom filters are present. This could be verified via
+      // ORC RecordReaderImpl once it is made public. Until then, dump the file
+      // statistics and check whether a bloom filter section was written.
+      new File(path).listFiles().filter(_.getName.endsWith("orc")).foreach { file =>
+        withTempStream { buf =>
+          val fileDumpArgs = Array(file.getCanonicalPath)
+          FileDump.main(fileDumpArgs)
+          assert(buf.toString.contains("BLOOM_FILTER"))
+        }
+      }
+    }
+  }
+
+  def withTempStream(f: ByteArrayOutputStream => Unit): Unit = {
+    val oriStream = System.out
+    val buf = new ByteArrayOutputStream()
+    val stream = new PrintStream(buf)
+    System.setOut(stream)
+    try f(buf) finally System.setOut(oriStream)
+  }
 }
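For reference, a minimal usage sketch of the options this patch propagates, assuming it is applied; `sqlContext` and the output path below are illustrative stand-ins, not part of the patch, and the option keys are the same ones read in OrcRelation.scala above.

    // Bloom filter options travel as DataFrameWriter options; the patched
    // prepareJobForWrite copies them into the Hadoop job configuration, so
    // the ORC OutputFormat sees them when the files are written.
    val df = sqlContext.range(0, 100).toDF("id")
    df.write
      .option("orc.bloom.filter.columns", "id")  // comma-separated columns, or "*" for all
      .option("orc.bloom.filter.fpp", "0.05")    // bloom filter false-positive probability
      .orc("/tmp/orc_bloom_example")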