From 7fd675783f399b30ab186d01a0629a148127d0c6 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 6 Oct 2014 22:15:21 +0800 Subject: [PATCH 1/6] Fixes test suites in hive-thriftserver --- .../sql/hive/thriftserver/CliSuite.scala | 8 +- .../thriftserver/HiveThriftServer2Suite.scala | 92 +++++++++++-------- 2 files changed, 61 insertions(+), 39 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index d68dd090b5e6c..73240997e2ad6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.sql.catalyst.util.getTempFilePath class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { @@ -77,7 +77,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { Future { val exitValue = process.exitValue() - logInfo(s"Spark SQL CLI process exit value: $exitValue") + foundAllExpectedAnswers.tryFailure( + new SparkException(s"Spark SQL CLI process exit value: $exitValue")) } try { @@ -98,6 +99,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { |End CliSuite failure output |=========================== """.stripMargin, cause) + throw cause } finally { warehousePath.delete() metastorePath.delete() @@ -120,7 +122,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { -> "Time taken: ", "SELECT COUNT(*) FROM hive_test;" -> "5", - "DROP TABLE hive_test" + "DROP TABLE hive_test;" -> "Time taken: " ) } diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 38977ff162097..e8abfbf74e552 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -17,17 +17,17 @@ package org.apache.spark.sql.hive.thriftserver -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future, Promise} -import scala.sys.process.{Process, ProcessLogger} - import java.io.File import java.net.ServerSocket import java.sql.{DriverManager, Statement} import java.util.concurrent.TimeoutException +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.concurrent.{Await, Promise} +import scala.sys.process.{Process, ProcessLogger} +import scala.util.Try + import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.jdbc.HiveDriver import org.scalatest.FunSuite @@ -41,25 +41,28 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) - private val listeningHost = "localhost" - private val listeningPort = { - // Let the system to choose a random available port to avoid collision with other parallel - // builds. 
- val socket = new ServerSocket(0) - val port = socket.getLocalPort - socket.close() - port - } - - private val warehousePath = getTempFilePath("warehouse") - private val metastorePath = getTempFilePath("metastore") - private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" - - def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) { - val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) + val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined + + def startThriftServerWithin(timeout: FiniteDuration = 10.seconds)(f: Statement => Unit) { + Thread.sleep(5000) + + val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) + val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) + val warehousePath = getTempFilePath("warehouse") + val metastorePath = getTempFilePath("metastore") + val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val listeningHost = "localhost" + val listeningPort = { + // Let the system choose a random available port to avoid collision with other parallel + // builds.
+ val socket = new ServerSocket(0) + val port = socket.getLocalPort + socket.close() + port + } val command = - s"""$serverScript + s"""$startScript | --master local | --hiveconf hive.root.logger=INFO,console | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri @@ -68,29 +71,41 @@ class HiveThriftServer2Suite extends FunSuite with Logging { | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort """.stripMargin.split("\\s+").toSeq - val serverStarted = Promise[Unit]() + val serverRunning = Promise[Unit]() val buffer = new ArrayBuffer[String]() - - def captureOutput(source: String)(line: String) { - buffer += s"$source> $line" + val LOGGING_MARK = + s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to " + var logTailingProcess: Process = null + var logFilePath: String = null + + def captureLogOutput(line: String): Unit = { + logInfo(s"server log | $line") + buffer += line if (line.contains("ThriftBinaryCLIService listening on")) { - serverStarted.success(()) + serverRunning.success(()) } } - val process = Process(command).run( - ProcessLogger(captureOutput("stdout"), captureOutput("stderr"))) - - Future { - val exitValue = process.exitValue() - logInfo(s"Spark SQL Thrift server process exit value: $exitValue") + def captureThriftServerOutput(source: String)(line: String): Unit = { + logInfo(s"server $source | $line") + if (line.startsWith(LOGGING_MARK)) { + logFilePath = line.drop(LOGGING_MARK.length).trim + // Ensure that the log file is created so that the `tail' command won't fail + Try(new File(logFilePath).createNewFile()) + logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath") + .run(ProcessLogger(captureLogOutput, _ => ())) + } } + Process(command).run(ProcessLogger( + captureThriftServerOutput("stdout"), + captureThriftServerOutput("stderr"))) + val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/" val user = System.getProperty("user.name") try { - Await.result(serverStarted.future, 
timeout) + Await.result(serverRunning.future, timeout) val connection = DriverManager.getConnection(jdbcUri, user, "") val statement = connection.createStatement() @@ -122,10 +137,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging { |End HiveThriftServer2Suite failure output |========================================= """.stripMargin, cause) + throw cause } finally { warehousePath.delete() metastorePath.delete() - process.destroy() + Process(stopScript).run().exitValue() + // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while. + Thread.sleep(3.seconds.toMillis) + Option(logTailingProcess).map(_.destroy()) + Option(logFilePath).map(new File(_).delete()) } } From ee92a82e606fe56dbd2a3f9778c30fcce25cb939 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 7 Oct 2014 00:20:02 +0800 Subject: [PATCH 2/6] Relaxes timeout --- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 5 ++++- .../spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 73240997e2ad6..fc97a25be34be 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -62,8 +62,11 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { def captureOutput(source: String)(line: String) { buffer += s"$source> $line" + // If we haven't found all expected answers... if (next.get() < expectedAnswers.size) { + // If another expected answer is found... if (line.startsWith(expectedAnswers(next.get()))) { + // If all expected answers have been found... 
if (next.incrementAndGet() == expectedAnswers.size) { foundAllExpectedAnswers.trySuccess(()) } @@ -111,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - runCliWithin(1.minute)( + runCliWithin(3.minute)( "CREATE TABLE hive_test(key INT, val STRING);" -> "OK", "SHOW TABLES;" diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index e8abfbf74e552..e58a0edde6df2 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -43,7 +43,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined - def startThriftServerWithin(timeout: FiniteDuration = 10.seconds)(f: Statement => Unit) { + def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) { Thread.sleep(5000) val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) @@ -79,7 +79,6 @@ class HiveThriftServer2Suite extends FunSuite with Logging { var logFilePath: String = null def captureLogOutput(line: String): Unit = { - logInfo(s"server log | $line") buffer += line if (line.contains("ThriftBinaryCLIService listening on")) { serverRunning.success(()) @@ -87,7 +86,6 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } def captureThriftServerOutput(source: String)(line: String): Unit = { - logInfo(s"server $source | $line") if (line.startsWith(LOGGING_MARK)) { logFilePath = line.drop(LOGGING_MARK.length).trim // Ensure that the log file is created so that the `tail' command won't fail From 
d116405a69bc828c7da55ce12e0d2b2cf741bbb1 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 8 Oct 2014 00:23:07 +0800 Subject: [PATCH 3/6] make sure that log4j level is INFO --- .../sql/hive/thriftserver/HiveThriftServer2Suite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index e58a0edde6df2..e67ecf858ae5d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -23,6 +23,7 @@ import java.sql.{DriverManager, Statement} import java.util.concurrent.TimeoutException import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} import scala.sys.process.{Process, ProcessLogger} @@ -42,7 +43,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined - + // Since we use info to assert server process started successfully, + // make sure that log4j level is INFO + org.apache.log4j.LogManager.getCurrentLoggers.foreach { log => + log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.INFO) + } + def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) { Thread.sleep(5000) From af2b5a973a83236deae49f6874bf8ee85d758e2b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 8 Oct 2014 21:44:00 +0800 Subject: [PATCH 4/6] Removes log level hacks from TestHiveContext --- .../src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index a4354c1379c63..66ce64a696fd7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -378,11 +378,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { */ def reset() { try { - // HACK: Hive is too noisy by default. - org.apache.log4j.LogManager.getCurrentLoggers.foreach { log => - log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN) - } - clearCache() loadedTables.clear() catalog.client.getAllTables("default").foreach { t => From 7805c33c71991d2a24ef5b82d4674a96ee0cb54b Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 9 Oct 2014 16:29:53 +0800 Subject: [PATCH 5/6] reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths --- .../sql/hive/thriftserver/HiveThriftServer2Suite.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index e67ecf858ae5d..e28d23f2ce089 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -23,7 +23,6 @@ import java.sql.{DriverManager, Statement} import java.util.concurrent.TimeoutException import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} import scala.sys.process.{Process, ProcessLogger} @@ -43,11 +42,6 @@ class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) 
val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined - // Since we use info to assert server process started successfully, - // make sure that log4j level is INFO - org.apache.log4j.LogManager.getCurrentLoggers.foreach { log => - log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.INFO) - } def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) { Thread.sleep(5000) @@ -100,8 +94,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { .run(ProcessLogger(captureLogOutput, _ => ())) } } - - Process(command).run(ProcessLogger( + // reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths + Process(command, None, ("SPARK_TESTING", "0")).run(ProcessLogger( captureThriftServerOutput("stdout"), captureThriftServerOutput("stderr"))) From 1c384b7bc8b0b8d5b9b6bf294f399de5bb8a9976 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 9 Oct 2014 19:11:25 +0800 Subject: [PATCH 6/6] Minor code cleanup, restore the logging level hack in TestHive.scala --- .../sql/hive/thriftserver/HiveThriftServer2Suite.scala | 10 ++++------ .../scala/org/apache/spark/sql/hive/TestHive.scala | 5 +++++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index e28d23f2ce089..e3b4e45a3d68c 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -41,13 +41,10 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath class HiveThriftServer2Suite extends FunSuite with Logging { Class.forName(classOf[HiveDriver].getCanonicalName) - val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined - def 
startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) { - Thread.sleep(5000) - val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) + val warehousePath = getTempFilePath("warehouse") val metastorePath = getTempFilePath("metastore") val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" @@ -94,8 +91,9 @@ class HiveThriftServer2Suite extends FunSuite with Logging { .run(ProcessLogger(captureLogOutput, _ => ())) } } - // reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths - Process(command, None, ("SPARK_TESTING", "0")).run(ProcessLogger( + + // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths + Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger( captureThriftServerOutput("stdout"), captureThriftServerOutput("stderr"))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 66ce64a696fd7..a4354c1379c63 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -378,6 +378,11 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { */ def reset() { try { + // HACK: Hive is too noisy by default. + org.apache.log4j.LogManager.getCurrentLoggers.foreach { log => + log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN) + } + clearCache() loadedTables.clear() catalog.client.getAllTables("default").foreach { t =>