Skip to content

Commit c266a8e

Browse files
committed
set hive.query.reexecution.enabled=false
1 parent 86b4a86 commit c266a8e

File tree

3 files changed

+44
-131
lines changed

3 files changed

+44
-131
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -712,55 +712,47 @@ private[hive] class HiveClientImpl(
712712
* in the sequence is one row.
713713
* Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when
714714
* running MapReduce jobs with `runHive`.
715+
* Since HIVE-17626(Hive 3.0.0), need to set hive.query.reexecution.enabled=false.
715716
*/
716717
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
717-
logDebug(s"Running hiveql '$cmd'")
718-
if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") }
719-
720-
def runProcessor(proc: CommandProcessor, tokens: Array[String], cmd_1: String): Seq[String] = {
721-
if (state.out != null) {
722-
// scalastyle:off println
723-
state.out.println(tokens(0) + " " + cmd_1)
724-
// scalastyle:on println
718+
def closeDriver(driver: Driver): Unit = {
719+
// Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
720+
// and the CommandProcessorFactory.clean function removed.
721+
driver.getClass.getMethod("close").invoke(driver)
722+
if (version != hive.v3_1) {
723+
CommandProcessorFactory.clean(conf)
725724
}
726-
Seq(proc.run(cmd_1).getResponseCode.toString)
727725
}
728726

727+
logDebug(s"Running hiveql '$cmd'")
728+
if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") }
729729
try {
730730
val cmd_trimmed: String = cmd.trim()
731731
val tokens: Array[String] = cmd_trimmed.split("\\s+")
732732
// The remainder of the command.
733733
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
734734
val proc = shim.getCommandProcessor(tokens(0), conf)
735735
proc match {
736-
// HIVE-18238(Hive 3.0.0) changed the close() function return type.
737-
// This change is not compatible with the built-in Hive.
738-
case driver: Driver if version != hive.v3_1 =>
736+
case driver: Driver =>
739737
val response: CommandProcessorResponse = driver.run(cmd)
740738
// Throw an exception if there is an error in query processing.
741739
if (response.getResponseCode != 0) {
742-
driver.close()
743-
CommandProcessorFactory.clean(conf)
740+
closeDriver(driver)
744741
throw new QueryExecutionException(response.getErrorMessage)
745742
}
746743
driver.setMaxRows(maxRows)
747744

748745
val results = shim.getDriverResults(driver)
749-
driver.close()
750-
CommandProcessorFactory.clean(conf)
746+
closeDriver(driver)
751747
results
752748

753-
case _: SetProcessor | _: ResetProcessor =>
754-
runProcessor(proc, tokens, cmd_1)
755-
case _: AddResourceProcessor | _: DeleteResourceProcessor | _: ListResourceProcessor =>
756-
runProcessor(proc, tokens, cmd_1)
757-
case _: CompileProcessor | _: CryptoProcessor | _: DfsProcessor | _: ReloadProcessor =>
758-
runProcessor(proc, tokens, cmd_1)
759-
760-
case unsupportedProcessor =>
761-
val className = unsupportedProcessor.getClass.getCanonicalName
762-
throw new AnalysisException(
763-
s"Dose not support Hive ${version.fullVersion} processor: $className")
749+
case _ =>
750+
if (state.out != null) {
751+
// scalastyle:off println
752+
state.out.println(tokens(0) + " " + cmd_1)
753+
// scalastyle:on println
754+
}
755+
Seq(proc.run(cmd_1).getResponseCode.toString)
764756
}
765757
} catch {
766758
case e: Exception =>
@@ -872,20 +864,31 @@ private[hive] class HiveClientImpl(
872864
}
873865

874866
def reset(): Unit = withHiveState {
875-
client.getAllTables("default").asScala.foreach { t =>
876-
logDebug(s"Deleting table $t")
877-
val table = client.getTable("default", t)
867+
val allTables = client.getAllTables("default")
868+
val (mvs, others) = allTables.asScala.map(t => client.getTable("default", t))
869+
.partition(_.getTableType.toString.equals("MATERIALIZED_VIEW"))
870+
871+
// Remove materialized view first, otherwise caused a violation of foreign key constraint.
872+
mvs.foreach { table =>
873+
val tableName = table.getTableName
874+
logDebug(s"Deleting materialized view $tableName")
875+
client.dropTable("default", table.getTableName)
876+
}
877+
878+
others.foreach { table =>
879+
val tableName = table.getTableName
880+
logDebug(s"Deleting table $tableName")
878881
try {
879-
client.getIndexes("default", t, 255).asScala.foreach { index =>
880-
shim.dropIndex(client, "default", t, index.getIndexName)
882+
client.getIndexes("default", tableName, 255).asScala.foreach { index =>
883+
shim.dropIndex(client, "default", tableName, index.getIndexName)
881884
}
882885
if (!table.isIndexTable) {
883-
client.dropTable("default", t)
886+
client.dropTable("default", tableName)
884887
}
885888
} catch {
886889
case _: NoSuchMethodError =>
887890
// HIVE-18448 Hive 3.0 remove index APIs
888-
client.dropTable("default", t)
891+
client.dropTable("default", tableName)
889892
}
890893
}
891894
client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/Hive_3_1_ClientSuite.scala

Lines changed: 0 additions & 93 deletions
This file was deleted.

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
123123
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
124124
hadoopConf.set("hive.metastore.schema.verification", "false")
125125
}
126-
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
127126
if (version == "3.1") {
127+
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
128128
hadoopConf.set("hive.in.test", "true")
129+
// Since HIVE-17626(Hive 3.0.0), need to set hive.query.reexecution.enabled=false.
130+
hadoopConf.set("hive.query.reexecution.enabled", "false")
129131
}
130132
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
131133
if (versionSpark != null) versionSpark.reset()
@@ -584,10 +586,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
584586

585587
test(s"$version: sql read hive materialized view") {
586588
// HIVE-14249 Since Hive 2.3.0, materialized view is supported.
587-
// But skip Hive 3.1 because of SPARK-27074.
588-
if (version == "2.3") {
589+
if (version == "2.3" || version == "3.1") {
590+
val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE"
589591
client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
590-
client.runSqlHive("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl")
592+
client.runSqlHive(
593+
s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
591594
val e = intercept[AnalysisException](versionSpark.table("mv1").collect()).getMessage
592595
assert(e.contains("Hive materialized view is not supported"))
593596
}

0 commit comments

Comments
 (0)