sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (2 additions, 2 deletions)

@@ -345,8 +345,8 @@ private[spark] object SQLConf {
doc = "Enables using the custom ParquetUnsafeRowRecordReader.")

val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "When true, enable filter pushdown for ORC files.")
defaultValue = Some(true),
doc = "When false, disables filter pushdown for ORC files.")

val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
defaultValue = Some(false),
Expand Down
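With the default flipped to true, ORC predicate pushdown is now on out of the box. A minimal sketch of opting back out for a session, assuming a standard SQLContext in scope (the config key is the one defined above):

    // Turn ORC predicate pushdown back off for this session.
    sqlContext.setConf("spark.sql.orc.filterPushdown", "false")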
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

@@ -162,6 +162,10 @@ private[sql] class OrcRelation(
extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {

/* org.apache.orc contains OrcConf which can be used later */
Contributor: it's unclear what this message actually means - is this a todo for the future?

private[sql] val ORC_BLOOM_FILTER_COLUMNS = "orc.bloom.filter.columns"
Contributor: it is really confusing to have ORC_BLOOM_FILTER_COLUMNS and orcBloomFilterCols in the same class.

I'd just inline the string in orcBloomFilterCols. Same for orcBloomFilterFpp.
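A minimal sketch of the inlining suggested here, with the two constants folded away (illustrative only, not the committed code):

    // Option keys inlined; no separate ORC_BLOOM_FILTER_* constants.
    private val orcBloomFilterCols = parameters.get("orc.bloom.filter.columns")
    private val orcBloomFilterFpp = parameters.get("orc.bloom.filter.fpp")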

private[sql] val ORC_BLOOM_FILTER_FPP = "orc.bloom.filter.fpp"

private[sql] def this(
paths: Array[String],
maybeDataSchema: Option[StructType],
@@ -177,6 +181,9 @@
parameters)(sqlContext)
}

private val orcBloomFilterCols = parameters.get(ORC_BLOOM_FILTER_COLUMNS)
private val orcBloomFilterFpp = parameters.get(ORC_BLOOM_FILTER_FPP)

override val dataSchema: StructType = maybeDataSchema.getOrElse {
OrcFileOperator.readSchema(
paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
@@ -211,6 +218,9 @@
}

override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
val conf = job.getConfiguration
orcBloomFilterCols.map(conf.set(ORC_BLOOM_FILTER_COLUMNS, _))
orcBloomFilterFpp.map(conf.set(ORC_BLOOM_FILTER_FPP, _))
job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
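For context, a sketch of how a caller would request bloom filters once these options are propagated into the job configuration; `df` and the output path are assumed, and the option keys are the ones wired through above:

    // Ask ORC to build bloom filters for two columns with a 5% false
    // positive rate; the options now reach OrcOutputFormat via the job conf.
    df.write
      .option("orc.bloom.filter.columns", "id,name")
      .option("orc.bloom.filter.fpp", "0.05")
      .orc("/tmp/orc-with-bloom")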
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala

@@ -17,10 +17,10 @@

package org.apache.spark.sql.hive.orc

-import java.io.File
+import java.io.{PrintStream, ByteArrayOutputStream, File}

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
+import org.apache.hadoop.hive.ql.io.orc.{FileDump, CompressionKind}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
@@ -394,4 +394,42 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
}

test("SPARK-12417. Orc bloom filter options are not propagated during file " +
Contributor: Change the title to

"SPARK-12417: Propagate Bloom filters when writing out ORC files"

"generation") {
withTempPath { dir =>
// Create separate sub dir for bloom filter testing
val path = new File(dir, "orc").getCanonicalPath

// Write some data
val data = (0 until 10).map { i =>
Contributor: why do you need data to be in this form? can't it just be a list of numbers from 0 to 10?

(A sketch of this simplification appears after the test below.)

val maybeInt = if (i % 2 == 0) None else Some(i)
val nullValue: Option[String] = None
(maybeInt, nullValue)
}

// Dump data to orc
createDataFrame(data).toDF("a", "b")
.write.option("orc.bloom.filter.columns", "*").orc(path)

// Verify if orc bloom filters are present. This can be verified via
// ORC RecordReaderImpl when it is made public. Until then, verify by
// dumping file statistics and checking whether bloom filter was added.
new File(path).listFiles().filter(_.getName.endsWith("orc")) map { file =>
withTempStream { buf =>
val fileDumpArgs = Array(file.getCanonicalPath)
FileDump.main(fileDumpArgs)
Contributor: this seems really hacky and is depending on some non-public API.

Contributor: Can't you just read the files directly?

assert(buf.toString.contains("BLOOM_FILTER"))
}
}
}
}
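A minimal sketch of the simpler fixture asked about above, assuming the null handling is not what this test targets (it reuses the test's own `path`):

    // Hypothetical simplification: plain integers in a single column are
    // enough to trigger bloom filter creation via "*".
    val simpleData = (0 until 10).map(Tuple1(_))
    createDataFrame(simpleData).toDF("a")
      .write.option("orc.bloom.filter.columns", "*").orc(path)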

  def withTempStream(f: ByteArrayOutputStream => Unit): Unit = {
    // Capture everything written to System.out (here, FileDump's output)
    // in an in-memory buffer, restoring the original stream afterwards.
    val oriStream = System.out
    val buf = new ByteArrayOutputStream()
    val stream = new PrintStream(buf)
    System.setOut(stream)
    try f(buf) finally System.setOut(oriStream)
  }
}