From d11ce09d07f36f39c4a8a92c4f0ff0f19a35a724 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 10 Oct 2017 19:12:31 -0700
Subject: [PATCH 1/4] [SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema

---
 .../spark/sql/hive/orc/OrcFileFormat.scala |  30 ++++++----
 .../sql/hive/execution/SQLQuerySuite.scala |  58 ++++++++++++++++++-
 2 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index c76f0ebb36a60..00eeb29732483 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -138,8 +138,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (maybePhysicalSchema.isEmpty) {
         Iterator.empty
       } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)

         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
+          dataSchema,
           requiredSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
@@ -272,25 +272,35 @@ private[orc] object OrcRelation extends HiveInspectors {
   def unwrapOrcStructs(
       conf: Configuration,
       dataSchema: StructType,
+      requiredSchema: StructType,
       maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
+    val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)

     def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map {
+        case (field, ordinal) =>
+          var ref = oi.getStructFieldRef(field.name)
+          if (ref == null) {
+            val maybeIndex = dataSchema.getFieldIndex(field.name)
+            if (maybeIndex.isDefined) {
+              ref = oi.getStructFieldRef("_col" + maybeIndex.get)
+            }
+          }
+          ref -> ordinal
       }.unzip

-      val unwrappers = fieldRefs.map(unwrapperFor)
+      val unwrappers = fieldRefs.map(r => if (r == null) null else unwrapperFor(r))

       iterator.map { value =>
         val raw = deserializer.deserialize(value)
         var i = 0
         val length = fieldRefs.length
         while (i < length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldRef = fieldRefs(i)
+          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRefs(i))
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
           } else {
@@ -306,8 +316,8 @@ private[orc] object OrcRelation extends HiveInspectors {
   }

   def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
+      conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = {
+    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
     val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 09c59000b3e3f..f4e59c53fb9dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -2050,4 +2050,60 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-18355 Use Spark schema to read ORC table instead of ORC file schema") {
+    val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+    Seq("true", "false").foreach { value =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
+        withTempDatabase { db =>
+          client.runSqlHive(
+            s"""
+              |CREATE TABLE $db.t(
+              |  click_id string,
+              |  search_id string,
+              |  uid bigint)
+              |PARTITIONED BY (
+              |  ts string,
+              |  hour string)
+              |STORED AS ORC
+            """.stripMargin)
+
+          client.runSqlHive(
+            s"""
+              |INSERT INTO TABLE $db.t
+              |PARTITION (ts = '98765', hour = '01')
+              |VALUES (12, 2, 12345)
+            """.stripMargin
+          )
+
+          checkAnswer(
+            sql(s"SELECT * FROM $db.t"),
+            Row("12", "2", 12345, "98765", "01"))
+
+          client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+          checkAnswer(
+            sql(s"SELECT click_id, search_id FROM $db.t"),
+            Row("12", "2"))
+
+          checkAnswer(
+            sql(s"SELECT search_id, click_id FROM $db.t"),
+            Row("2", "12"))
+
+          checkAnswer(
+            sql(s"SELECT search_id FROM $db.t"),
+            Row("2"))
+
+          checkAnswer(
+            sql(s"SELECT dummy, click_id FROM $db.t"),
+            Row(null, "12"))
+
+          checkAnswer(
+            sql(s"SELECT * FROM $db.t"),
+            Row("12", "2", 12345, null, "98765", "01"))
+        }
+      }
+    }
+  }
 }

From 8ac1acfad74eba27a7123e04ca14682bff59a20b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 12 Oct 2017 12:26:03 -0700
Subject: [PATCH 2/4] Add SPARK-22267 test case to prevent regression.

---
 .../sql/hive/execution/SQLQuerySuite.scala | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f4e59c53fb9dc..54e6664e25973 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2106,4 +2106,24 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  // This test case is added to prevent regression.
+  test("SPARK-22267 Spark SQL incorrectly reads ORC files when column order is different") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+
+      Seq(1 -> 2).toDF("c1", "c2").write.format("orc").mode("overwrite").save(path)
+      checkAnswer(spark.read.orc(path), Row(1, 2))
+
+      Seq("true", "false").foreach { value =>
+        withTable("t") {
+          withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
+            sql(s"CREATE EXTERNAL TABLE t(c2 INT, c1 INT) STORED AS ORC LOCATION '$path'")
+            // The correct answer is Row(2, 1). SPARK-22267 should fix this later.
+            checkAnswer(spark.table("t"), if (value == "true") Row(2, 1) else Row(1, 2))
+          }
+        }
+      }
+    }
+  }
 }

From ef2123ecc516fce6feb2a4abe051b4ef862c51a0 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 12 Oct 2017 21:24:06 -0700
Subject: [PATCH 3/4] Address comments.

---
 .../spark/sql/hive/orc/OrcFileFormat.scala |  11 ++++-----
 .../sql/hive/execution/SQLQuerySuite.scala |  24 ++-----------------
 2 files changed, 6 insertions(+), 29 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 00eeb29732483..194e69c93e1a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -134,8 +134,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
      // using the given physical schema. Instead, we simply return an empty iterator.
-      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
-      if (maybePhysicalSchema.isEmpty) {
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty
+      if (isEmptyFile) {
         Iterator.empty
       } else {
         OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
@@ -284,10 +284,7 @@ private[orc] object OrcRelation extends HiveInspectors {
         case (field, ordinal) =>
           var ref = oi.getStructFieldRef(field.name)
           if (ref == null) {
-            val maybeIndex = dataSchema.getFieldIndex(field.name)
-            if (maybeIndex.isDefined) {
-              ref = oi.getStructFieldRef("_col" + maybeIndex.get)
-            }
+            ref = oi.getStructFieldRef("_col" + dataSchema.fieldIndex(field.name))
           }
           ref -> ordinal
       }.unzip
@@ -300,7 +297,7 @@ private[orc] object OrcRelation extends HiveInspectors {
         val length = fieldRefs.length
         while (i < length) {
           val fieldRef = fieldRefs(i)
-          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRef)
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
           } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 54e6664e25973..5caf31df527a0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2078,7 +2078,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           )

           checkAnswer(
-            sql(s"SELECT * FROM $db.t"),
+            sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
             Row("12", "2", 12345, "98765", "01"))

           client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
@@ -2100,30 +2100,10 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
             Row(null, "12"))

           checkAnswer(
-            sql(s"SELECT * FROM $db.t"),
+            sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"),
             Row("12", "2", 12345, null, "98765", "01"))
         }
       }
     }
   }
-
-  // This test case is added to prevent regression.
-  test("SPARK-22267 Spark SQL incorrectly reads ORC files when column order is different") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-
-      Seq(1 -> 2).toDF("c1", "c2").write.format("orc").mode("overwrite").save(path)
-      checkAnswer(spark.read.orc(path), Row(1, 2))
-
-      Seq("true", "false").foreach { value =>
-        withTable("t") {
-          withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
-            sql(s"CREATE EXTERNAL TABLE t(c2 INT, c1 INT) STORED AS ORC LOCATION '$path'")
-            // The correct answer is Row(2, 1). SPARK-22267 should fix this later.
-            checkAnswer(spark.table("t"), if (value == "true") Row(2, 1) else Row(1, 2))
-          }
-        }
-      }
-    }
-  }
 }

From 8e7fe9b9e3fc6121686caad45dcfdb4ff08f0c4a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 12 Oct 2017 22:06:58 -0700
Subject: [PATCH 4/4] Add parquet, too.

---
 .../sql/hive/execution/SQLQuerySuite.scala | 106 +++++++++---------
 1 file changed, 55 insertions(+), 51 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5caf31df527a0..94fa43dec7313 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2051,57 +2051,61 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }

-  test("SPARK-18355 Use Spark schema to read ORC table instead of ORC file schema") {
-    val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-
-    Seq("true", "false").foreach { value =>
-      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
-        withTempDatabase { db =>
-          client.runSqlHive(
-            s"""
-              |CREATE TABLE $db.t(
-              |  click_id string,
-              |  search_id string,
-              |  uid bigint)
-              |PARTITIONED BY (
-              |  ts string,
-              |  hour string)
-              |STORED AS ORC
-            """.stripMargin)
-
-          client.runSqlHive(
-            s"""
-              |INSERT INTO TABLE $db.t
-              |PARTITION (ts = '98765', hour = '01')
-              |VALUES (12, 2, 12345)
-            """.stripMargin
-          )
-
-          checkAnswer(
-            sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
-            Row("12", "2", 12345, "98765", "01"))
-
-          client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
-
-          checkAnswer(
-            sql(s"SELECT click_id, search_id FROM $db.t"),
-            Row("12", "2"))
-
-          checkAnswer(
-            sql(s"SELECT search_id, click_id FROM $db.t"),
-            Row("2", "12"))
-
-          checkAnswer(
-            sql(s"SELECT search_id FROM $db.t"),
-            Row("2"))
-
-          checkAnswer(
-            sql(s"SELECT dummy, click_id FROM $db.t"),
-            Row(null, "12"))
-
-          checkAnswer(
-            sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"),
-            Row("12", "2", 12345, null, "98765", "01"))
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-18355 Read data from a hive table with a new column - $format") {
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+      Seq("true", "false").foreach { value =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> value,
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> value) {
+          withTempDatabase { db =>
+            client.runSqlHive(
+              s"""
+                |CREATE TABLE $db.t(
+                |  click_id string,
+                |  search_id string,
+                |  uid bigint)
+                |PARTITIONED BY (
+                |  ts string,
+                |  hour string)
+                |STORED AS $format
+              """.stripMargin)
+
+            client.runSqlHive(
+              s"""
+                |INSERT INTO TABLE $db.t
+                |PARTITION (ts = '98765', hour = '01')
+                |VALUES (12, 2, 12345)
+              """.stripMargin
+            )
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, "98765", "01"))
+
+            client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id FROM $db.t"),
+              Row("12", "2"))
+
+            checkAnswer(
+              sql(s"SELECT search_id, click_id FROM $db.t"),
+              Row("2", "12"))
+
+            checkAnswer(
+              sql(s"SELECT search_id FROM $db.t"),
+              Row("2"))
+
+            checkAnswer(
+              sql(s"SELECT dummy, click_id FROM $db.t"),
+              Row(null, "12"))
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, null, "98765", "01"))
+          }
         }
       }
     }
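
Note on the approach taken across the four patches above: instead of trusting the physical schema stored in each ORC file, the reader resolves every required Spark column against the file by name and, for Hive-written files that only carry dummy field names, falls back to the positional name "_col<index>" computed from the Metastore schema; a column the file does not contain at all (for example one added later via ALTER TABLE ... ADD COLUMNS) is filled with null. The following is a minimal, self-contained Scala sketch of just that resolution step; the object and method names are illustrative only and are not part of the Spark code changed above.

// Sketch of the name-then-positional-then-null resolution described above (illustrative names).
object ColumnResolutionSketch {
  // For each required column, return the index of the matching physical ORC column,
  // or None when the file predates the column and the reader must emit null instead.
  def resolve(
      physicalColumns: Seq[String],   // field names found in the ORC file footer
      dataSchema: Seq[String],        // full table schema from the Metastore, in table order
      requiredSchema: Seq[String]): Seq[Option[Int]] = {
    requiredSchema.map { name =>
      val byName = physicalColumns.indexOf(name)
      if (byName >= 0) {
        Some(byName)
      } else {
        // Hive-written ORC files carry dummy field names: _col0, _col1, ...
        val positional = physicalColumns.indexOf("_col" + dataSchema.indexOf(name))
        if (positional >= 0) Some(positional) else None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A file written by Hive before `dummy` was added to the table schema.
    val physical = Seq("_col0", "_col1", "_col2")
    val table = Seq("click_id", "search_id", "uid", "dummy")
    // Prints List(None, Some(0)): `dummy` becomes null, `click_id` is read from _col0.
    println(resolve(physical, table, Seq("dummy", "click_id")))
  }
}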