diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index a649daf5a1c1..66fad85ea026 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -176,6 +176,10 @@
       <groupId>org.apache.thrift</groupId>
       <artifactId>libfb303</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
deleted file mode 100644
index 3bd3d0d6db35..000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-
-
-class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
- with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-
- val tempDir = Utils.createTempDir().getCanonicalFile
- val tempDirUri = tempDir.toURI
- val tempDirStr = tempDir.getAbsolutePath
-
- override def beforeEach(): Unit = {
- sql("CREATE DATABASE test_db")
- for ((tbl, _) <- rawTablesAndExpectations) {
- hiveClient.createTable(tbl, ignoreIfExists = false)
- }
- }
-
- override def afterEach(): Unit = {
- Utils.deleteRecursively(tempDir)
- hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
- }
-
- private def getTableMetadata(tableName: String): CatalogTable = {
- spark.sharedState.externalCatalog.getTable("test_db", tableName)
- }
-
- private def defaultTableURI(tableName: String): URI = {
- spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
- }
-
- // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
- // versions prior to 2.1 would generate almost same raw table metadata for a specific table.
- val simpleSchema = new StructType().add("i", "int")
- val partitionedSchema = new StructType().add("i", "int").add("j", "int")
-
- lazy val hiveTable = CatalogTable(
- identifier = TableIdentifier("tbl1", Some("test_db")),
- tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(
- inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
- schema = simpleSchema)
-
- lazy val externalHiveTable = CatalogTable(
- identifier = TableIdentifier("tbl2", Some("test_db")),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(tempDirUri),
- inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
- schema = simpleSchema)
-
- lazy val partitionedHiveTable = CatalogTable(
- identifier = TableIdentifier("tbl3", Some("test_db")),
- tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(
- inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
- schema = partitionedSchema,
- partitionColumnNames = Seq("j"))
-
-
- val simpleSchemaJson =
- """
- |{
- | "type": "struct",
- | "fields": [{
- | "name": "i",
- | "type": "integer",
- | "nullable": true,
- | "metadata": {}
- | }]
- |}
- """.stripMargin
-
- val partitionedSchemaJson =
- """
- |{
- | "type": "struct",
- | "fields": [{
- | "name": "i",
- | "type": "integer",
- | "nullable": true,
- | "metadata": {}
- | },
- | {
- | "name": "j",
- | "type": "integer",
- | "nullable": true,
- | "metadata": {}
- | }]
- |}
- """.stripMargin
-
- lazy val dataSourceTable = CatalogTable(
- identifier = TableIdentifier("tbl4", Some("test_db")),
- tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(
- properties = Map("path" -> defaultTableURI("tbl4").toString)),
- schema = new StructType(),
- provider = Some("json"),
- properties = Map(
- "spark.sql.sources.provider" -> "json",
- "spark.sql.sources.schema.numParts" -> "1",
- "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
- lazy val hiveCompatibleDataSourceTable = CatalogTable(
- identifier = TableIdentifier("tbl5", Some("test_db")),
- tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(
- properties = Map("path" -> defaultTableURI("tbl5").toString)),
- schema = simpleSchema,
- provider = Some("parquet"),
- properties = Map(
- "spark.sql.sources.provider" -> "parquet",
- "spark.sql.sources.schema.numParts" -> "1",
- "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
- lazy val partitionedDataSourceTable = CatalogTable(
- identifier = TableIdentifier("tbl6", Some("test_db")),
- tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(
- properties = Map("path" -> defaultTableURI("tbl6").toString)),
- schema = new StructType(),
- provider = Some("json"),
- properties = Map(
- "spark.sql.sources.provider" -> "json",
- "spark.sql.sources.schema.numParts" -> "1",
- "spark.sql.sources.schema.part.0" -> partitionedSchemaJson,
- "spark.sql.sources.schema.numPartCols" -> "1",
- "spark.sql.sources.schema.partCol.0" -> "j"))
-
- lazy val externalDataSourceTable = CatalogTable(
- identifier = TableIdentifier("tbl7", Some("test_db")),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(new URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")),
- properties = Map("path" -> tempDirStr)),
- schema = new StructType(),
- provider = Some("json"),
- properties = Map(
- "spark.sql.sources.provider" -> "json",
- "spark.sql.sources.schema.numParts" -> "1",
- "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
- lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
- identifier = TableIdentifier("tbl8", Some("test_db")),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(tempDirUri),
- properties = Map("path" -> tempDirStr)),
- schema = simpleSchema,
- properties = Map(
- "spark.sql.sources.provider" -> "parquet",
- "spark.sql.sources.schema.numParts" -> "1",
- "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
- lazy val dataSourceTableWithoutSchema = CatalogTable(
- identifier = TableIdentifier("tbl9", Some("test_db")),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")),
- properties = Map("path" -> tempDirStr)),
- schema = new StructType(),
- provider = Some("json"),
- properties = Map("spark.sql.sources.provider" -> "json"))
-
- // A list of all raw tables we want to test, with their expected schema.
- lazy val rawTablesAndExpectations = Seq(
- hiveTable -> simpleSchema,
- externalHiveTable -> simpleSchema,
- partitionedHiveTable -> partitionedSchema,
- dataSourceTable -> simpleSchema,
- hiveCompatibleDataSourceTable -> simpleSchema,
- partitionedDataSourceTable -> partitionedSchema,
- externalDataSourceTable -> simpleSchema,
- hiveCompatibleExternalDataSourceTable -> simpleSchema,
- dataSourceTableWithoutSchema -> new StructType())
-
- test("make sure we can read table created by old version of Spark") {
- for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
- val readBack = getTableMetadata(tbl.identifier.table)
- assert(readBack.schema.sameType(expectedSchema))
-
- if (tbl.tableType == CatalogTableType.EXTERNAL) {
- // trim the URI prefix
- val tableLocation = readBack.storage.locationUri.get.getPath
- val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
- assert(tableLocation == expectedLocation)
- }
- }
- }
-
- test("make sure we can alter table location created by old version of Spark") {
- withTempDir { dir =>
- for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
- val path = dir.toURI.toString.stripSuffix("/")
- sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '$path'")
-
- val readBack = getTableMetadata(tbl.identifier.table)
-
- // trim the URI prefix
- val actualTableLocation = readBack.storage.locationUri.get.getPath
- val expected = dir.toURI.getPath.stripSuffix("/")
- assert(actualTableLocation == expected)
- }
- }
- }
-
- test("make sure we can rename table created by old version of Spark") {
- for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
- val newName = tbl.identifier.table + "_renamed"
- sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
-
- val readBack = getTableMetadata(newName)
- assert(readBack.schema.sameType(expectedSchema))
-
- // trim the URI prefix
- val actualTableLocation = readBack.storage.locationUri.get.getPath
- val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
- tempDir.toURI.getPath.stripSuffix("/")
- } else {
- // trim the URI prefix
- defaultTableURI(newName).getPath
- }
- assert(actualTableLocation == expectedLocation)
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
new file mode 100644
index 000000000000..2928a734a7e3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+/**
+ * Test HiveExternalCatalog backward compatibility.
+ *
+ * Note that this test suite will automatically download Spark binary packages of different
+ * versions to a local directory `/tmp/spark-test`. If a Spark folder with the expected version
+ * already exists under this directory, e.g. `/tmp/spark-test/spark-2.0.3`, the download for that
+ * version is skipped.
+ */
+class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
+ private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
+ private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
+ private val sparkTestingDir = "/tmp/spark-test"
+ private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+ override def afterAll(): Unit = {
+ Utils.deleteRecursively(wareHousePath)
+ Utils.deleteRecursively(tmpDataDir)
+ super.afterAll()
+ }
+
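+ // Downloads the pre-built Spark distribution for the given version into `sparkTestingDir`,
+ // extracts it into a `spark-<version>` directory, and removes the downloaded tarball.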
+ private def downloadSpark(version: String): Unit = {
+ import scala.sys.process._
+
+ val url = s"https://d3kbcqa49mib13.cloudfront.net/spark-$version-bin-hadoop2.7.tgz"
+
+ Seq("wget", url, "-q", "-P", sparkTestingDir).!
+
+ val downloaded = new File(sparkTestingDir, s"spark-$version-bin-hadoop2.7.tgz").getCanonicalPath
+ val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
+
+ Seq("mkdir", targetDir).!
+
+ Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
+
+ Seq("rm", downloaded).!
+ }
+
+ private def genDataDir(name: String): String = {
+ new File(tmpDataDir, name).getCanonicalPath
+ }
+
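+ // Writes a small PySpark script that creates data source tables, external tables and a
+ // permanent view, then runs it with each old Spark version via spark-submit so that their
+ // metadata lands in the shared Derby metastore and warehouse directory.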
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ val tempPyFile = File.createTempFile("test", ".py")
+ Files.write(tempPyFile.toPath,
+ s"""
+ |from pyspark.sql import SparkSession
+ |
+ |spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+ |version_index = spark.conf.get("spark.sql.test.version.index", None)
+ |
+ |spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index))
+ |
+ |spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\
+ | " using parquet as select 1 i")
+ |
+ |json_file = "${genDataDir("json_")}" + str(version_index)
+ |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file)
+ |spark.sql("create table external_data_source_tbl_" + version_index + \\
+ | "(i int) using json options (path '{}')".format(json_file))
+ |
+ |parquet_file = "${genDataDir("parquet_")}" + str(version_index)
+ |spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file)
+ |spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index + \\
+ | "(i int) using parquet options (path '{}')".format(parquet_file))
+ |
+ |json_file2 = "${genDataDir("json2_")}" + str(version_index)
+ |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2)
+ |spark.sql("create table external_table_without_schema_" + version_index + \\
+ | " using json options (path '{}')".format(json_file2))
+ |
+ |spark.sql("create view v_{} as select 1 i".format(version_index))
+ """.stripMargin.getBytes("utf8"))
+
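+ // Run the script once per tested version, downloading that Spark release first if it is not
+ // already cached under `sparkTestingDir`.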
+ PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) =>
+ val sparkHome = new File(sparkTestingDir, s"spark-$version")
+ if (!sparkHome.exists()) {
+ downloadSpark(version)
+ }
+
+ val args = Seq(
+ "--name", "prepare testing tables",
+ "--master", "local[2]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+ "--conf", s"spark.sql.test.version.index=$index",
+ "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+ tempPyFile.getCanonicalPath)
+ runSparkSubmit(args, Some(sparkHome.getCanonicalPath))
+ }
+
+ tempPyFile.delete()
+ }
+
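+ // Runs PROCESS_TABLES with the current Spark build against the same warehouse and metastore,
+ // verifying that it can still operate on the tables created by the older versions above.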
+ test("backward compatibility") {
+ val args = Seq(
+ "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"),
+ "--name", "HiveExternalCatalog backward compatibility test",
+ "--master", "local[2]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+ "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+}
+
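+// Launched through spark-submit by the "backward compatibility" test; reads, queries, writes,
+// renames and relocates the tables and views created by each old Spark version.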
+object PROCESS_TABLES extends QueryTest with SQLTestUtils {
+ // Tests the latest version of every release line.
+ val testingVersions = Seq("2.0.2", "2.1.1", "2.2.0")
+
+ protected var spark: SparkSession = _
+
+ def main(args: Array[String]): Unit = {
+ val session = SparkSession.builder()
+ .enableHiveSupport()
+ .getOrCreate()
+ spark = session
+
+ testingVersions.indices.foreach { index =>
+ Seq(
+ s"data_source_tbl_$index",
+ s"hive_compatible_data_source_tbl_$index",
+ s"external_data_source_tbl_$index",
+ s"hive_compatible_external_data_source_tbl_$index",
+ s"external_table_without_schema_$index").foreach { tbl =>
+ val tableMeta = spark.sharedState.externalCatalog.getTable("default", tbl)
+
+ // make sure we can insert and query these tables.
+ session.sql(s"insert into $tbl select 2")
+ checkAnswer(session.sql(s"select * from $tbl"), Row(1) :: Row(2) :: Nil)
+ checkAnswer(session.sql(s"select i from $tbl where i > 1"), Row(2))
+
+ // make sure we can rename table.
+ val newName = tbl + "_renamed"
+ sql(s"ALTER TABLE $tbl RENAME TO $newName")
+ val readBack = spark.sharedState.externalCatalog.getTable("default", newName)
+
+ val actualTableLocation = readBack.storage.locationUri.get.getPath
+ val expectedLocation = if (tableMeta.tableType == CatalogTableType.EXTERNAL) {
+ tableMeta.storage.locationUri.get.getPath
+ } else {
+ spark.sessionState.catalog.defaultTablePath(TableIdentifier(newName, None)).getPath
+ }
+ assert(actualTableLocation == expectedLocation)
+
+ // make sure we can alter table location.
+ withTempDir { dir =>
+ val path = dir.toURI.toString.stripSuffix("/")
+ sql(s"ALTER TABLE ${tbl}_renamed SET LOCATION '$path'")
+ val readBack = spark.sharedState.externalCatalog.getTable("default", tbl + "_renamed")
+ val actualTableLocation = readBack.storage.locationUri.get.getPath
+ val expected = dir.toURI.getPath.stripSuffix("/")
+ assert(actualTableLocation == expected)
+ }
+ }
+
+ // test permanent view
+ checkAnswer(sql(s"select i from v_$index"), Row(1))
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index be6aa6d8dc3c..21b3e281490c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -18,17 +18,11 @@
package org.apache.spark.sql.hive
import java.io.{BufferedWriter, File, FileWriter}
-import java.sql.Timestamp
-import java.util.Date
-import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.Properties
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.TimeLimits
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
@@ -38,7 +32,6 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -46,11 +39,10 @@ import org.apache.spark.util.{ResetSystemProperties, Utils}
* This suite tests spark-submit with applications using HiveContext.
*/
class HiveSparkSubmitSuite
- extends SparkFunSuite
+ extends SparkSubmitTestUtils
with Matchers
with BeforeAndAfterEach
- with ResetSystemProperties
- with TimeLimits {
+ with ResetSystemProperties {
// TODO: rewrite these or mark them as slow tests to be run sparingly
@@ -333,71 +325,6 @@ class HiveSparkSubmitSuite
unusedJar.toString)
runSparkSubmit(argsForShowTables)
}
-
- // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
- // This is copied from org.apache.spark.deploy.SparkSubmitSuite
- private def runSparkSubmit(args: Seq[String]): Unit = {
- val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- val history = ArrayBuffer.empty[String]
- val sparkSubmit = if (Utils.isWindows) {
- // On Windows, `ProcessBuilder.directory` does not change the current working directory.
- new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
- } else {
- "./bin/spark-submit"
- }
- val commands = Seq(sparkSubmit) ++ args
- val commandLine = commands.mkString("'", "' '", "'")
-
- val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
- val env = builder.environment()
- env.put("SPARK_TESTING", "1")
- env.put("SPARK_HOME", sparkHome)
-
- def captureOutput(source: String)(line: String): Unit = {
- // This test suite has some weird behaviors when executed on Jenkins:
- //
- // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins. Here we add a
- // timestamp to provide more diagnosis information.
- // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
- // them out for debugging purposes.
- val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
- // scalastyle:off println
- println(logLine)
- // scalastyle:on println
- history += logLine
- }
-
- val process = builder.start()
- new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
- new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
-
- try {
- val exitCode = failAfter(300.seconds) { process.waitFor() }
- if (exitCode != 0) {
- // include logs in output. Note that logging is async and may not have completed
- // at the time this exception is raised
- Thread.sleep(1000)
- val historyLog = history.mkString("\n")
- fail {
- s"""spark-submit returned with exit code $exitCode.
- |Command line: $commandLine
- |
- |$historyLog
- """.stripMargin
- }
- }
- } catch {
- case to: TestFailedDueToTimeoutException =>
- val historyLog = history.mkString("\n")
- fail(s"Timeout of $commandLine" +
- s" See the log4j logs for more detail." +
- s"\n$historyLog", to)
- case t: Throwable => throw t
- } finally {
- // Ensure we still kill the process in case it timed out
- process.destroy()
- }
- }
}
object SetMetastoreURLTest extends Logging {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 83cee5d1b8a4..29b0e6c8533e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1354,31 +1354,4 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
}
}
-
- test("SPARK-18464: support old table which doesn't store schema in table properties") {
- withTable("old") {
- withTempPath { path =>
- Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
- val tableDesc = CatalogTable(
- identifier = TableIdentifier("old", Some("default")),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- properties = Map("path" -> path.getAbsolutePath)
- ),
- schema = new StructType(),
- provider = Some("parquet"),
- properties = Map(
- HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
- hiveClient.createTable(tableDesc, ignoreIfExists = false)
-
- checkAnswer(spark.table("old"), Row(1, "a"))
- checkAnswer(sql("select * from old"), Row(1, "a"))
-
- val expectedSchema = StructType(Seq(
- StructField("i", IntegerType, nullable = true),
- StructField("j", StringType, nullable = true)))
- assert(table("old").schema === expectedSchema)
- }
- }
- }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
new file mode 100644
index 000000000000..ede44df4afe1
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.sql.Timestamp
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
+import org.apache.spark.util.Utils
+
+trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
+
+ // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+ // This is copied from org.apache.spark.deploy.SparkSubmitSuite
+ protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = {
+ val sparkHome = sparkHomeOpt.getOrElse(
+ sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
+ val history = ArrayBuffer.empty[String]
+ val sparkSubmit = if (Utils.isWindows) {
+ // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+ new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+ } else {
+ "./bin/spark-submit"
+ }
+ val commands = Seq(sparkSubmit) ++ args
+ val commandLine = commands.mkString("'", "' '", "'")
+
+ val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
+ val env = builder.environment()
+ env.put("SPARK_TESTING", "1")
+ env.put("SPARK_HOME", sparkHome)
+
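+ // Mirrors each line of the child process output to the console with a timestamp and records
+ // it so that it can be included in the failure message.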
+ def captureOutput(source: String)(line: String): Unit = {
+ // This test suite has some weird behaviors when executed on Jenkins:
+ //
+ // 1. Sometimes it gets extremely slow on Jenkins for unknown reasons. Here we add a
+ // timestamp to provide more diagnostic information.
+ // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
+ // them out for debugging purposes.
+ val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
+ // scalastyle:off println
+ println(logLine)
+ // scalastyle:on println
+ history += logLine
+ }
+
+ val process = builder.start()
+ new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+ new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
+
+ try {
+ val exitCode = failAfter(300.seconds) { process.waitFor() }
+ if (exitCode != 0) {
+ // include logs in output. Note that logging is async and may not have completed
+ // at the time this exception is raised
+ Thread.sleep(1000)
+ val historyLog = history.mkString("\n")
+ fail {
+ s"""spark-submit returned with exit code $exitCode.
+ |Command line: $commandLine
+ |
+ |$historyLog
+ """.stripMargin
+ }
+ }
+ } catch {
+ case to: TestFailedDueToTimeoutException =>
+ val historyLog = history.mkString("\n")
+ fail(s"Timeout of $commandLine" +
+ s" See the log4j logs for more detail." +
+ s"\n$historyLog", to)
+ case t: Throwable => throw t
+ } finally {
+ // Ensure we still kill the process in case it timed out
+ process.destroy()
+ }
+ }
+}