From c39d0a5eabc1f505117e711c17a48b089b266483 Mon Sep 17 00:00:00 2001
From: scwf
Date: Mon, 6 Oct 2014 17:45:41 +0800
Subject: [PATCH 1/3] fix HiveThriftServer2Suite

---
 .../thriftserver/HiveThriftServer2Suite.scala | 101 +++++++++++++-----
 1 file changed, 73 insertions(+), 28 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 38977ff16209..63f58bf28a4a 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Future, Promise}
+import scala.io.Source
 import scala.sys.process.{Process, ProcessLogger}
 
 import java.io.File
@@ -30,19 +31,19 @@ import java.util.concurrent.TimeoutException
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.util._
 
 /**
  * Tests for the HiveThriftServer2 using JDBC.
  */
-class HiveThriftServer2Suite extends FunSuite with Logging {
+class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
   private val listeningHost = "localhost"
-  private val listeningPort = {
+  private val listeningPort = {
     // Let the system to choose a random available port to avoid collision with other parallel
     // builds.
     val socket = new ServerSocket(0)
@@ -54,10 +55,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   private val warehousePath = getTempFilePath("warehouse")
   private val metastorePath = getTempFilePath("metastore")
   private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+  val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
+  val user = System.getProperty("user.name")
 
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
+  override def beforeAll(): Unit = {
+    val timeout: FiniteDuration = 30.seconds
     val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
-
     val command =
       s"""$serverScript
          |  --master local
@@ -70,37 +73,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
     val serverStarted = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val startString =
+      "starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to "
+    val maxTries = 30
 
     def captureOutput(source: String)(line: String) {
       buffer += s"$source> $line"
-      if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+      if (line.contains(startString)) {
+        val logFile = new File(line.substring(startString.length))
+        var tryNum = 0
+        // This is a hack to wait logFile is ready
+        Thread.sleep(5000)
+
+        // logFile may have not finished, try every second
+        while (!logFile.exists() || (!fileToString(logFile).contains(
+          "ThriftBinaryCLIService listening on") && tryNum < maxTries)) {
+          Thread.sleep(1000)
+        }
+        if (fileToString(logFile).contains("ThriftBinaryCLIService listening on")) {
+          serverStarted.success(())
+        } else {
+          throw new TimeoutException()
+        }
       }
     }
-
     val process = Process(command).run(
       ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
 
     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+      logInfo(s"Start Spark SQL Thrift server process exit value: $exitValue")
     }
 
-    val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
-    val user = System.getProperty("user.name")
-
     try {
       Await.result(serverStarted.future, timeout)
-
-      val connection = DriverManager.getConnection(jdbcUri, user, "")
-      val statement = connection.createStatement()
-
-      try {
-        f(statement)
-      } finally {
-        statement.close()
-        connection.close()
-      }
     } catch {
       case cause: Exception =>
         cause match {
@@ -123,14 +129,45 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
            |=========================================
          """.stripMargin, cause)
     } finally {
-      warehousePath.delete()
-      metastorePath.delete()
       process.destroy()
     }
   }
 
+  override def afterAll() {
+    warehousePath.delete()
+    metastorePath.delete()
+    stopThriftserver
+  }
+
+  def stopThriftserver: Unit = {
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+    val builder = new ProcessBuilder(stopScript)
+    val process = builder.start()
+    new Thread("read stderr") {
+      override def run() {
+        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
+          System.err.println(line)
+        }
+      }
+    }.start()
+    val output = new StringBuffer
+    val stdoutThread = new Thread("read stdout") {
+      override def run() {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
+          output.append(line)
+        }
+      }
+    }
+    stdoutThread.start()
+    val exitValue = process.waitFor()
+    logInfo(s"Stop Spark SQL Thrift server process exit value: $exitValue")
+  }
+
   test("Test JDBC query execution") {
-    startThriftServerWithin() { statement =>
+    val connection = DriverManager.getConnection(jdbcUri, user, "")
+    val statement = connection.createStatement()
+
+    try {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
@@ -146,11 +183,17 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         resultSet.next()
         resultSet.getInt(1)
       }
+    } finally {
+      statement.close()
+      connection.close()
     }
   }
 
   test("SPARK-3004 regression: result set containing NULL") {
-    startThriftServerWithin() { statement =>
+    val connection = DriverManager.getConnection(jdbcUri, user, "")
+    val statement = connection.createStatement()
+
+    try {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource(
           "data/files/small_kv_with_null.txt")
@@ -169,8 +212,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         assert(resultSet.getInt(1) === 0)
         assert(resultSet.wasNull())
       }
-
       assert(!resultSet.next())
+    } finally {
+      statement.close()
+      connection.close()
     }
   }
 }

From 0081a508f147a2b7bd7065149b8d3da308ba3d37 Mon Sep 17 00:00:00 2001
From: scwf
Date: Mon, 6 Oct 2014 17:51:14 +0800
Subject: [PATCH 2/3] fix code format

---
 .../spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 63f58bf28a4a..b85b78cde9b2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -82,9 +82,8 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
       if (line.contains(startString)) {
         val logFile = new File(line.substring(startString.length))
         var tryNum = 0
-        // This is a hack to wait logFile is ready
+        // This is a hack to wait logFile ready
         Thread.sleep(5000)
-
         // logFile may have not finished, try every second
         while (!logFile.exists() || (!fileToString(logFile).contains(
           "ThriftBinaryCLIService listening on") && tryNum < maxTries)) {
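Note on the wait loop above: through PATCH 2/3 nothing inside the while body ever increments tryNum, so the `tryNum < maxTries` bound can never trip and a server that never logs "ThriftBinaryCLIService listening on" would spin forever; PATCH 3/3 below adds the missing increment. A minimal self-contained sketch of the bounded wait, where `fileToString` is a hypothetical stand-in for the helper imported from org.apache.spark.sql.catalyst.util:

import java.io.File
import java.util.concurrent.TimeoutException
import scala.io.Source

object LogMarkerWait {
  // Hypothetical stand-in for org.apache.spark.sql.catalyst.util.fileToString.
  private def fileToString(file: File): String = {
    val source = Source.fromFile(file)
    try source.mkString finally source.close()
  }

  // Polls logFile once per second until marker shows up, giving up after maxTries.
  def waitForMarker(logFile: File, marker: String, maxTries: Int = 30): Unit = {
    var tryNum = 0
    while (tryNum < maxTries &&
        (!logFile.exists() || !fileToString(logFile).contains(marker))) {
      tryNum += 1 // the increment PATCH 3/3 adds; without it the loop never gives up
      Thread.sleep(1000)
    }
    if (!logFile.exists() || !fileToString(logFile).contains(marker)) {
      throw new TimeoutException(s"'$marker' never appeared in $logFile")
    }
  }
}

Usage would be e.g. waitForMarker(new File(logPath), "ThriftBinaryCLIService listening on").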
From 48979f6e6d3384b8f4b900d700d11db79e18284a Mon Sep 17 00:00:00 2001
From: scwf
Date: Tue, 7 Oct 2014 00:36:14 +0800
Subject: [PATCH 3/3] println the process output

---
 .../thriftserver/HiveThriftServer2Suite.scala | 24 +++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index b85b78cde9b2..95fa8fe6380f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.sql.catalyst.util._
 
 /**
@@ -58,7 +58,7 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
   val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
   val user = System.getProperty("user.name")
 
-  override def beforeAll(): Unit = {
+  override def beforeAll() = {
     val timeout: FiniteDuration = 30.seconds
     val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
     val command =
@@ -87,8 +87,10 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
         // logFile may have not finished, try every second
         while (!logFile.exists() || (!fileToString(logFile).contains(
           "ThriftBinaryCLIService listening on") && tryNum < maxTries)) {
+          tryNum = tryNum + 1
           Thread.sleep(1000)
         }
+        println(fileToString(logFile))
         if (fileToString(logFile).contains("ThriftBinaryCLIService listening on")) {
           serverStarted.success(())
         } else {
@@ -101,7 +103,8 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
 
     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Start Spark SQL Thrift server process exit value: $exitValue")
+      serverStarted.tryFailure(
+        new SparkException(s"Spark SQL Thrift server process exit value: $exitValue"))
     }
 
     try {
@@ -127,6 +130,7 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
            |End HiveThriftServer2Suite failure output
            |=========================================
          """.stripMargin, cause)
+        throw cause
     } finally {
       process.destroy()
     }
@@ -138,26 +142,24 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
     stopThriftserver
   }
 
-  def stopThriftserver: Unit = {
+  def stopThriftserver = {
     val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
     val builder = new ProcessBuilder(stopScript)
     val process = builder.start()
     new Thread("read stderr") {
       override def run() {
         for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          System.err.println(line)
+          println(line)
         }
       }
     }.start()
-    val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout") {
+    new Thread("read stdout") {
       override def run() {
         for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          output.append(line)
+          println(line)
         }
       }
-    }
-    stdoutThread.start()
+    }.start()
     val exitValue = process.waitFor()
     logInfo(s"Stop Spark SQL Thrift server process exit value: $exitValue")
   }
@@ -180,6 +182,7 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
       assertResult(5, "Row count mismatch") {
         val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
         resultSet.next()
+//        println(s"#######${resultSet.getInt(1)}")
         resultSet.getInt(1)
       }
     } finally {
@@ -208,6 +211,7 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Loggi
 
       (0 until 5).foreach { _ =>
         resultSet.next()
+//        println(s"#######${resultSet.getInt(1)}")
        assert(resultSet.getInt(1) === 0)
        assert(resultSet.wasNull())
      }
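A closing note on the start-up handshake: the most consequential change in PATCH 3/3 is inside the Future that blocks on process.exitValue(). Instead of merely logging the exit value, it now calls serverStarted.tryFailure(...), so Await.result(serverStarted.future, timeout) aborts the moment the launcher script dies rather than sleeping out the full 30-second timeout. A minimal sketch of that race between the readiness signal and process death, with a plain Thread.sleep standing in for the real blocking calls:

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StartupRace extends App {
  val serverStarted = Promise[Unit]()

  // Exit watcher: the real suite blocks on process.exitValue() here. If the
  // process dies first, failing the promise wakes up Await.result immediately.
  Future {
    Thread.sleep(500) // pretend the server process died after 500 ms
    serverStarted.tryFailure(
      new RuntimeException("server process exited before becoming ready"))
  }

  // A log watcher would call serverStarted.trySuccess(()) on seeing the
  // "ThriftBinaryCLIService listening on" marker; omitted so failure wins.

  try {
    Await.result(serverStarted.future, 30.seconds) // throws after ~500 ms, not 30 s
  } catch {
    case e: Exception => println(s"startup failed fast: ${e.getMessage}")
  }
}

The try* variants matter because both sides may fire: whichever completes the promise first wins, and the loser's call is a no-op instead of an IllegalStateException.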