@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
/**
* Retrieves an ORC file reader from a given path. The path can point to either a directory or a
* single ORC file. If it points to a directory, it picks any non-empty ORC file within that
* directory.
*
* The reader returned by this method is mainly used for two purposes:
*
* 1. Retrieving file metadata (schema and compression codecs, etc.)
* 2. Reading the actual file content (in this case, the given path should point to the target file)
*
* @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code>) to an
* ORC file if the file contains zero rows. This is OK for Hive since the schema of the
* table is managed by metastore. But this becomes a problem when reading ORC files
* directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
* files. So this method always tries to find an ORC file whose schema is non-empty, and
* create the result reader from that file. If no such file is found, it returns `None`.
*
* @todo Needs to consider all files when schema evolution is taken into account.
*/
def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
reader.getObjectInspector match {
case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
logInfo(
s"ORC file $path has empty schema, it probably contains no rows. " +
"Trying to read another ORC file to figure out the schema.")
false
case _ => true
}
}

val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
val fs = {
val hdfsPath = new Path(basePath)
hdfsPath.getFileSystem(conf)
}

listOrcFiles(basePath, conf).iterator.map { path =>
path -> OrcFile.createReader(fs, path)
}.collectFirst {
case (path, reader) if isWithNonEmptySchema(path, reader) => reader
}
}

def readSchema(path: String, conf: Option[Configuration]): StructType = {
val reader = getFileReader(path, conf)
val reader = getFileReader(path, conf).getOrElse {
throw new AnalysisException(
s"Failed to discover schema from ORC files stored in $path. " +
"Probably there are either no ORC files or only empty ORC files.")
}
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
def getObjectInspector(
path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
}

def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
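For reference, a minimal caller-side sketch of the new `Option[Reader]` contract. The helper below is hypothetical and not part of this patch; it is assumed to live inside the `org.apache.spark.sql.hive.orc` package, since `OrcFileOperator` is `private[orc]`.

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper: report the compression kind of the first ORC file under
// `path` that has a non-empty schema, or None if only zero-row files are found
// (the SPARK-8501 case).
def compressionKind(path: String): Option[String] = {
  val conf = new Configuration()
  OrcFileOperator.getFileReader(path, Some(conf)).map(_.getCompression.name())
}
```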
@@ -242,26 +242,34 @@ private[orc] case class OrcTableScan(
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
}.unzip
val unwrappers = fieldRefs.map(unwrapperFor)
// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
if (fieldValue == null) {
mutableRow.setNullAt(fieldOrdinals(i))
} else {
unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))

// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
// partition since we know that this file is empty.
maybeStructOI.map { soi =>
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
}.unzip
val unwrappers = fieldRefs.map(unwrapperFor)
// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
if (fieldValue == null) {
mutableRow.setNullAt(fieldOrdinals(i))
} else {
unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
}
i += 1
}
i += 1
mutableRow: InternalRow
}
mutableRow: InternalRow
}.getOrElse {
Iterator.empty
}
}

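The change above reduces to `maybeStructOI.map(...).getOrElse(Iterator.empty)`: when no usable `ObjectInspector` can be obtained because the file has zero rows and an empty schema, the task yields an empty partition instead of failing. A self-contained sketch of that shape, with illustrative names only:

```scala
// Illustrative shape of the fix: absent metadata => empty partition, not a crash.
def deserializeAll[M, A, B](maybeMeta: Option[M], values: Iterator[A])(
    decode: (M, A) => B): Iterator[B] = {
  maybeMeta.map { meta =>
    values.map(value => decode(meta, value))
  }.getOrElse(Iterator.empty)
}

// With no metadata the result is empty regardless of the input values:
// deserializeAll(Option.empty[Int], Iterator("a", "b"))((_, s) => s).isEmpty == true
```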
@@ -23,10 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.InternalRow
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._

@@ -170,7 +167,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("Default compression options for writing to an ORC file") {
withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
assertResult(CompressionKind.ZLIB) {
OrcFileOperator.getFileReader(file).getCompression
OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -183,21 +180,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
withOrcFile(data) { file =>
assertResult(CompressionKind.SNAPPY) {
OrcFileOperator.getFileReader(file).getCompression
OrcFileOperator.getFileReader(file).get.getCompression
}
}

conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
withOrcFile(data) { file =>
assertResult(CompressionKind.NONE) {
OrcFileOperator.getFileReader(file).getCompression
OrcFileOperator.getFileReader(file).get.getCompression
}
}

conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
withOrcFile(data) { file =>
assertResult(CompressionKind.LZO) {
OrcFileOperator.getFileReader(file).getCompression
OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -289,4 +286,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
List(Row("same", "run_5", 100)))
}
}

test("SPARK-8501: Avoids discovery schema from empty ORC files") {
withTempPath { dir =>
val path = dir.getCanonicalPath

withTable("empty_orc") {
withTempTable("empty", "single") {
sqlContext.sql(
s"""CREATE TABLE empty_orc(key INT, value STRING)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
emptyDF.registerTempTable("empty")

// This creates a single empty ORC file with the Hive ORC SerDe. We use this trick because
// the Spark SQL ORC data source always avoids writing empty ORC files.
sqlContext.sql(
s"""INSERT INTO TABLE empty_orc
|SELECT key, value FROM empty
""".stripMargin)

val errorMessage = intercept[AnalysisException] {
sqlContext.read.format("orc").load(path)
}.getMessage

assert(errorMessage.contains("Failed to discover schema from ORC files"))

val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
singleRowDF.registerTempTable("single")

sqlContext.sql(
s"""INSERT INTO TABLE empty_orc
|SELECT key, value FROM single
""".stripMargin)

val df = sqlContext.read.format("orc").load(path)
assert(df.schema === singleRowDF.schema.asNullable)
checkAnswer(df, singleRowDF)
}
}
}
}
}
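The user-visible effect of the changes above, sketched as a hypothetical spark-shell session against the Spark 1.4-era API (the path is illustrative):

```scala
// A directory that contains only zero-row ORC files (e.g. written by Hive, as in
// the test above) now fails fast with a clear message instead of tripping over
// the empty struct<> schema.
try {
  sqlContext.read.format("orc").load("/tmp/only-empty-orc-files")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    // "Failed to discover schema from ORC files stored in /tmp/only-empty-orc-files. ..."
    println(e.getMessage)
}

// Once at least one non-empty ORC file exists under the path, load() succeeds and
// the schema is discovered from that file.
```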
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._

// Originally we were using a 10-row RDD for testing. However, when default parallelism is
// greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
// which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
// causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
// for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
// number in this RDD to avoid empty partitions.
sparkContext
.makeRDD(1 to 100)
.makeRDD(1 to 10)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -76,43 +70,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 100).map(i => Row(i, s"part-$i")))
(1 to 10).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
(6 to 100).map(i => Row(i, s"part-$i")))
(6 to 10).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 100).map(i => Row(1, s"part-$i")))
(1 to 10).map(i => Row(1, s"part-$i")))
}

test("create temporary orc table as") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 100).map(i => Row(i, s"part-$i")))
(1 to 10).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
(6 to 100).map(i => Row(i, s"part-$i")))
(6 to 10).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 100).map(i => Row(1, s"part-$i")))
(1 to 10).map(i => Row(1, s"part-$i")))
}

test("appending insert") {
sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
(6 to 100).map(i => Row(i, s"part-$i")))
(6 to 10).map(i => Row(i, s"part-$i")))
}
}
