
Commit 990c9f7

viirya authored and liancheng committed
[SPARK-9170] [SQL] Use OrcStructInspector to be case preserving when writing ORC files
JIRA: https://issues.apache.org/jira/browse/SPARK-9170

`StandardStructObjectInspector` will implicitly lowercase column names, but the ORC format doesn't have such a requirement. In fact, there is an `OrcStructInspector` specific to the ORC format; we should use it when serializing rows to ORC files, so that column-name case is preserved when writing ORC files.

Author: Liang-Chi Hsieh <[email protected]>

Closes #7520 from viirya/use_orcstruct.
1 parent 6ceed85 commit 990c9f7
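
To make the inspector difference concrete, here is a small standalone sketch (not part of the patch) that builds both kinds of object inspectors from the same `StructTypeInfo` and prints the field names each reports. The `struct<Acol:bigint>` type string and the expected output in the comments are illustrative, based on the behavior described in the commit message, and the sketch assumes the Hive serde2/ORC classes that Spark builds against are on the classpath.

    import scala.collection.JavaConverters._

    import org.apache.hadoop.hive.ql.io.orc.OrcStruct
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
    import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}

    object InspectorCaseDemo {
      def main(args: Array[String]): Unit = {
        // A struct type with a mixed-case column name, written in Hive type syntax.
        val typeInfo = TypeInfoUtils
          .getTypeInfoFromTypeString("struct<Acol:bigint>")
          .asInstanceOf[StructTypeInfo]

        // The inspector used before this patch: field names are lowercased.
        val standardOI = TypeInfoUtils
          .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
          .asInstanceOf[StructObjectInspector]

        // The ORC-specific inspector used after this patch: field names keep their case.
        val orcOI = OrcStruct
          .createObjectInspector(typeInfo)
          .asInstanceOf[StructObjectInspector]

        // Per the commit description, the first line prints "acol" and the second "Acol".
        println(standardOI.getAllStructFieldRefs.asScala.map(_.getFieldName).mkString(", "))
        println(orcOI.getAllStructFieldRefs.asScala.map(_.getFieldName).mkString(", "))
      }
    }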

File tree: 2 files changed (+40, -21 lines)


sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 26 additions & 21 deletions
@@ -25,9 +25,9 @@ import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit}
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter(
     TypeInfoUtils.getTypeInfoFromTypeString(
       HiveMetastoreTypes.toMetastoreType(dataSchema))

-    TypeInfoUtils
-      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-      .asInstanceOf[StructObjectInspector]
+    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+      .asInstanceOf[SettableStructObjectInspector]
   }

-  // Used to hold temporary `Writable` fields of the next row to be written.
-  private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
-  // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.asScala
-    .zip(dataSchema.fields.map(_.dataType))
-    .map { case (ref, dt) =>
-      wrapperFor(ref.getFieldObjectInspector, dt)
-    }.toArray
-
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
@@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter(

   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

-  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+  private def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
     var i = 0
-    while (i < row.numFields) {
-      reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))
+    while (i < fieldRefs.size) {
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
       i += 1
     }
+  }
+
+  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)

     recordWriter.write(
       NullWritable.get(),
-      serializer.serialize(reusableOutputBuffer, structOI))
+      serializer.serialize(cachedOrcStruct, structOI))
   }

   override def close(): Unit = {
@@ -259,7 +264,7 @@ private[orc] case class OrcTableScan(
     maybeStructOI.map { soi =>
       val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
         case (attr, ordinal) =>
-          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+          soi.getStructFieldRef(attr.name) -> ordinal
       }.unzip
       val unwrappers = fieldRefs.map(unwrapperFor)
       // Map each tuple to a row object
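
The rewritten write path above boils down to: build a case-preserving `SettableStructObjectInspector` once, reuse a single `OrcStruct`, set each field on it through the inspector, and hand the struct plus inspector to the `OrcSerde`. Below is a minimal sketch of that pattern against the Hive ORC classes directly; the `struct<Id:bigint,Name:string>` schema, the field values, and the `LongWritable`/`Text` wrappers are illustrative and not taken from the patch (Spark's own code converts Catalyst values through its `wrap` helper instead).

    import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcStruct}
    import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
    import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
    import org.apache.hadoop.io.{LongWritable, Text}

    object OrcStructWriteSketch {
      def main(args: Array[String]): Unit = {
        // Case-preserving inspector, built the same way as in the patched OrcOutputWriter.
        val typeInfo = TypeInfoUtils
          .getTypeInfoFromTypeString("struct<Id:bigint,Name:string>")
          .asInstanceOf[StructTypeInfo]
        val structOI = OrcStruct
          .createObjectInspector(typeInfo)
          .asInstanceOf[SettableStructObjectInspector]

        // One reusable OrcStruct, mirroring `cachedOrcStruct` in the diff.
        val struct = structOI.create().asInstanceOf[OrcStruct]

        // Fill each field through the inspector, as wrapOrcStruct does.
        val fieldRefs = structOI.getAllStructFieldRefs
        structOI.setStructFieldData(struct, fieldRefs.get(0), new LongWritable(1L))
        structOI.setStructFieldData(struct, fieldRefs.get(1), new Text("alice"))

        // The diff then hands the struct and inspector to the serde before writing:
        //   recordWriter.write(NullWritable.get(), serializer.serialize(cachedOrcStruct, structOI))
        val serialized = new OrcSerde().serialize(struct, structOI)
        println(serialized.getClass.getName)
      }
    }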

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala

Lines changed: 14 additions & 0 deletions
@@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }

+  test("SPARK-9170: Don't implicitly lowercase of user-provided columns") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path)
+      sqlContext.read.format("orc").load(path).schema("Acol")
+      intercept[IllegalArgumentException] {
+        sqlContext.read.format("orc").load(path).schema("acol")
+      }
+      checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"),
+        (0 until 10).map(Row(_)))
+    }
+  }
+
   test("SPARK-8501: Avoids discovery schema from empty ORC files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
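
At the DataFrame API level, the behavior this new test pins down can be exercised roughly as in the snippet below. It is illustrative rather than part of the patch: it assumes a spark-shell-style session with a `sqlContext` in scope and `import sqlContext.implicits._` for the `'id` syntax, and the output path is made up.

    // Write a column whose name contains an uppercase letter.
    sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save("/tmp/orc_case_demo")

    val df = sqlContext.read.format("orc").load("/tmp/orc_case_demo")

    // With this patch the ORC schema keeps "Acol", so the exact-case lookup succeeds,
    // while a lowercase schema lookup throws IllegalArgumentException, as the test asserts.
    df.schema("Acol")

    // Column resolution during analysis is still case insensitive by default,
    // so selecting "acol" continues to work.
    df.select("acol").sort("acol").show()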
