diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 3475c2c9db080..50e5964e37590 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -30,10 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.sql.catalyst.util.getTempFilePath class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { + val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined + def runCliWithin( timeout: FiniteDuration, extraArgs: Seq[String] = Seq.empty)( @@ -61,6 +63,9 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { val buffer = new ArrayBuffer[String]() def captureOutput(source: String)(line: String) { + if (verbose) { + logInfo(s"$source> $line") + } buffer += s"$source> $line" if (line.contains(expectedAnswers(next.get()))) { if (next.incrementAndGet() == expectedAnswers.size) { @@ -75,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { Future { val exitValue = process.exitValue() - logInfo(s"Spark SQL CLI process exit value: $exitValue") + foundAllExpectedAnswers.tryFailure( + new SparkException(s"Spark SQL CLI process exit value: $exitValue")) } try { @@ -118,7 +124,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { -> "Time taken: ", "SELECT COUNT(*) FROM hive_test;" -> "5", - "DROP TABLE hive_test" + "DROP TABLE hive_test;" -> "Time taken: " ) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 38977ff162097..3644dadbb4f26 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.jdbc.HiveDriver import org.scalatest.FunSuite -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.sql.catalyst.util.getTempFilePath /** @@ -41,22 +41,22 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) - private val listeningHost = "localhost" - private val listeningPort = { - // Let the system to choose a random available port to avoid collision with other parallel - // builds. - val socket = new ServerSocket(0) - val port = socket.getLocalPort - socket.close() - port - } - - private val warehousePath = getTempFilePath("warehouse") - private val metastorePath = getTempFilePath("metastore") - private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) { val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) + val warehousePath = getTempFilePath("warehouse") + val metastorePath = getTempFilePath("metastore") + val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val listeningHost = "localhost" + val listeningPort = { + // Let the system to choose a random available port to avoid collision with other parallel + // builds. + val socket = new ServerSocket(0) + val port = socket.getLocalPort + socket.close() + port + } val command = s"""$serverScript @@ -68,13 +68,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging { | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort """.stripMargin.split("\\s+").toSeq - val serverStarted = Promise[Unit]() + val serverRunning = Promise[Unit]() val buffer = new ArrayBuffer[String]() def captureOutput(source: String)(line: String) { + if (verbose) { + logInfo(s"$source> $line") + } buffer += s"$source> $line" if (line.contains("ThriftBinaryCLIService listening on")) { - serverStarted.success(()) + serverRunning.success(()) } } @@ -83,14 +86,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging { Future { val exitValue = process.exitValue() - logInfo(s"Spark SQL Thrift server process exit value: $exitValue") + // Stop waiting for server start in case the Thrift server exits prematurely. + serverRunning.tryFailure( + new SparkException(s"Spark SQL Thrift server process exit value: $exitValue")) } val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/" val user = System.getProperty("user.name") try { - Await.result(serverStarted.future, timeout) + Await.result(serverRunning.future, timeout) val connection = DriverManager.getConnection(jdbcUri, user, "") val statement = connection.createStatement() @@ -122,6 +127,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { |End HiveThriftServer2Suite failure output |========================================= """.stripMargin, cause) + throw cause } finally { warehousePath.delete() metastorePath.delete()