
Commit 6be8b93

SaurabhChawla100 authored and cloud-fan committed
[SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
### What changes were proposed in this pull request?

Spark SQL commands are failing when selecting from ORC tables.

Steps to reproduce:

Example 1 - Prerequisite: `/Users/test/tpcds_scale5data/date_dim` is the location of ORC data generated by Hive.

```
val table = """CREATE TABLE `date_dim` (
  `d_date_sk` INT, `d_date_id` STRING, `d_date` TIMESTAMP, `d_month_seq` INT, `d_week_seq` INT,
  `d_quarter_seq` INT, `d_year` INT, `d_dow` INT, `d_moy` INT, `d_dom` INT, `d_qoy` INT,
  `d_fy_year` INT, `d_fy_quarter_seq` INT, `d_fy_week_seq` INT, `d_day_name` STRING,
  `d_quarter_name` STRING, `d_holiday` STRING, `d_weekend` STRING, `d_following_holiday` STRING,
  `d_first_dom` INT, `d_last_dom` INT, `d_same_day_ly` INT, `d_same_day_lq` INT,
  `d_current_day` STRING, `d_current_week` STRING, `d_current_month` STRING,
  `d_current_quarter` STRING, `d_current_year` STRING)
USING orc
LOCATION '/Users/test/tpcds_scale5data/date_dim'"""
spark.sql(table).collect

val u = """select date_dim.d_date_id from date_dim limit 5"""
spark.sql(u).collect
```

Example 2

```
val table = """CREATE TABLE `test_orc_data` (
  `_col1` INT, `_col2` STRING, `_col3` INT)
USING orc"""
spark.sql(table).collect

spark.sql("insert into test_orc_data values(13, '155', 2020)").collect

val df = """select _col2 from test_orc_data limit 5"""
spark.sql(df).collect
```

Both fail with the error below:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, 192.168.0.103, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
	at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initBatch(OrcColumnarBatchReader.java:156)
	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$7(OrcFileFormat.scala:258)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:141)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:620)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:343)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:895)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:895)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:133)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:445)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1489)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

The reason is that `initBatch` does not receive the schema it needs to resolve the column values, set in `OrcFileFormat.scala`:

```
batchReader.initBatch(
  TypeDescription.fromString(resultSchemaString)
```

### Why are the changes needed?

Spark SQL queries on ORC tables are failing.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A unit test is added for this. The failing queries were also tested through spark-shell and spark-submit.

Closes #29045 from SaurabhChawla100/SPARK-32234.

Lead-authored-by: SaurabhChawla <[email protected]>
Co-authored-by: SaurabhChawla <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
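Note: to check whether a given ORC file falls into this case, the physical field names stored in its footer can be inspected directly with the ORC reader API. The snippet below is an illustrative spark-shell sketch only and is not part of this change; the file path is an assumption. Hive-written files typically record positional names such as `_col0`, `_col1`, ... instead of the logical column names, which is the case this fix handles by falling back to the full dataSchema instead of the pruned result schema.

```scala
// Hypothetical inspection snippet (not part of this commit): print the physical
// field names recorded in an ORC file's footer.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

val conf = new Configuration()
// Assumed path: point this at one of the files under the table's LOCATION.
val filePath = new Path("/Users/test/tpcds_scale5data/date_dim/part-00000")
val reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))
println(reader.getSchema.getFieldNames)  // e.g. [_col0, _col1, ...] for Hive-written data
```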
1 parent bdeb626 · commit 6be8b93

4 files changed: +78 -22 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 5 additions & 5 deletions

```diff
@@ -164,8 +164,6 @@ class OrcFileFormat
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)

     val broadcastedConf =
@@ -179,16 +177,18 @@ class OrcFileFormat

       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val requestedColIdsOrEmptyFile =
+      val resultedColPruneInfo =
         Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
           OrcUtils.requestedColumnIds(
             isCaseSensitive, dataSchema, requiredSchema, reader, conf)
         }

-      if (requestedColIdsOrEmptyFile.isEmpty) {
+      if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
       } else {
-        val requestedColIds = requestedColIdsOrEmptyFile.get
+        val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+        val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+          dataSchema, resultSchema, partitionSchema, conf)
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
         val taskConf = new Configuration(conf)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala

Lines changed: 34 additions & 7 deletions

```diff
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}

 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -116,15 +116,17 @@ object OrcUtils extends Logging {
   }

   /**
-   * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
-   * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
+   * @return the requested column ids from the given ORC file, together with a boolean flag
+   *         indicating whether column pruning (pruneCols) is allowed. A requested column id
+   *         can be -1, which means the requested column doesn't exist in the ORC file.
+   *         Returns None if the given ORC file is empty.
    */
   def requestedColumnIds(
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
       reader: Reader,
-      conf: Configuration): Option[Array[Int]] = {
+      conf: Configuration): Option[(Array[Int], Boolean)] = {
     val orcFieldNames = reader.getSchema.getFieldNames.asScala
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -136,14 +138,18 @@ object OrcUtils extends Logging {
       assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
         s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
         "no idea which columns were dropped, fail to read.")
+      // For an ORC file written by Hive, there are no field names in the
+      // physical schema, so the entire dataSchema has to be sent to the reader
+      // instead of the required schema.
+      // Column pruning (pruneCols) is therefore not done in this case.
       Some(requiredSchema.fieldNames.map { name =>
         val index = dataSchema.fieldIndex(name)
         if (index < orcFieldNames.length) {
           index
         } else {
           -1
         }
-      })
+      }, false)
     } else {
       if (isCaseSensitive) {
         Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
@@ -152,7 +158,7 @@ object OrcUtils extends Logging {
         } else {
           -1
         }
-      })
+      }, true)
       } else {
         // Do case-insensitive resolution only if in case-insensitive mode
         val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
@@ -170,7 +176,7 @@ object OrcUtils extends Logging {
             idx
           }
         }.getOrElse(-1)
-      })
+      }, true)
       }
     }
   }
@@ -199,4 +205,25 @@ object OrcUtils extends Logging {
       s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
     case _ => dt.catalogString
   }
+
+  /**
+   * @return the result schema string based on the canPruneCols flag.
+   *         If canPruneCols is true, the string is built from resultSchema;
+   *         if it is false, it is built from the actual dataSchema
+   *         (plus the partition schema).
+   */
+  def orcResultSchemaString(
+      canPruneCols: Boolean,
+      dataSchema: StructType,
+      resultSchema: StructType,
+      partitionSchema: StructType,
+      conf: Configuration): String = {
+    val resultSchemaString = if (canPruneCols) {
+      OrcUtils.orcTypeDescriptionString(resultSchema)
+    } else {
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+    }
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
+    resultSchemaString
+  }
 }
```
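For illustration only, the schema-selection behavior added by the new `orcResultSchemaString` helper can be sketched against the plain ORC `TypeDescription` API; the schemas and the `chooseSchema` helper below are made up for this example and are not part of the commit.

```scala
import org.apache.orc.TypeDescription

// Pruned result schema (only the selected column) vs. the full data schema.
val resultSchema = TypeDescription.fromString("struct<_col2:string>")
val dataSchema   = TypeDescription.fromString("struct<_col1:int,_col2:string,_col3:int>")

// canPruneCols is false when the ORC physical schema carries no usable field
// names (typical for Hive-written files); in that case the full data schema
// must be handed to the reader, otherwise the pruned result schema suffices.
def chooseSchema(canPruneCols: Boolean): TypeDescription =
  if (canPruneCols) resultSchema else dataSchema

println(chooseSchema(canPruneCols = true))   // struct<_col2:string>
println(chooseSchema(canPruneCols = false))  // struct<_col1:int,_col2:string,_col3:int>
```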

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala

Lines changed: 11 additions & 10 deletions

```diff
@@ -66,24 +66,24 @@ case class OrcPartitionReaderFactory(
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

     val filePath = new Path(new URI(file.filePath))

     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }

-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get
+      val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
       assert(requestedColIds.length == readDataSchema.length,
         "[BUG] requested column IDs do not match required schema")

@@ -112,24 +112,25 @@ case class OrcPartitionReaderFactory(
   override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value

-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

     val filePath = new Path(new URI(file.filePath))

     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }

-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
+      val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
       assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala

Lines changed: 28 additions & 0 deletions

```diff
@@ -288,4 +288,32 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-32234 read ORC table with column names all starting with '_col'") {
+    Seq("native", "hive").foreach { orcImpl =>
+      Seq("false", "true").foreach { vectorized =>
+        withSQLConf(
+          SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
+          SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
+          withTable("test_hive_orc_impl") {
+            spark.sql(
+              s"""
+                 | CREATE TABLE test_hive_orc_impl
+                 | (_col1 INT, _col2 STRING, _col3 INT)
+                 | STORED AS ORC
+               """.stripMargin)
+            spark.sql(
+              s"""
+                 | INSERT INTO
+                 | test_hive_orc_impl
+                 | VALUES(9, '12', 2020)
+               """.stripMargin)
+
+            val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl")
+            checkAnswer(df, Row("12"))
+          }
+        }
+      }
+    }
+  }
 }
```
