Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.Logging
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.sql.catalyst.util.getTempFilePath

class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined

def runCliWithin(
timeout: FiniteDuration,
extraArgs: Seq[String] = Seq.empty)(
Expand Down Expand Up @@ -61,6 +63,9 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
val buffer = new ArrayBuffer[String]()

def captureOutput(source: String)(line: String) {
if (verbose) {
logInfo(s"$source> $line")
}
buffer += s"$source> $line"
if (line.contains(expectedAnswers(next.get()))) {
if (next.incrementAndGet() == expectedAnswers.size) {
Expand All @@ -75,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

Future {
val exitValue = process.exitValue()
logInfo(s"Spark SQL CLI process exit value: $exitValue")
foundAllExpectedAnswers.tryFailure(
new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
}

try {
Expand Down Expand Up @@ -118,7 +124,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
-> "Time taken: ",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test"
"DROP TABLE hive_test;"
-> "Time taken: "
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
import org.scalatest.FunSuite

import org.apache.spark.Logging
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.sql.catalyst.util.getTempFilePath

/**
Expand All @@ -41,22 +41,22 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)

private val listeningHost = "localhost"
private val listeningPort = {
// Let the system to choose a random available port to avoid collision with other parallel
// builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

private val warehousePath = getTempFilePath("warehouse")
private val metastorePath = getTempFilePath("metastore")
private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined

def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
val warehousePath = getTempFilePath("warehouse")
val metastorePath = getTempFilePath("metastore")
val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
val listeningHost = "localhost"
val listeningPort = {
// Let the system to choose a random available port to avoid collision with other parallel
// builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

val command =
s"""$serverScript
Expand All @@ -68,13 +68,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
""".stripMargin.split("\\s+").toSeq

val serverStarted = Promise[Unit]()
val serverRunning = Promise[Unit]()
val buffer = new ArrayBuffer[String]()

def captureOutput(source: String)(line: String) {
if (verbose) {
logInfo(s"$source> $line")
}
buffer += s"$source> $line"
if (line.contains("ThriftBinaryCLIService listening on")) {
serverStarted.success(())
serverRunning.success(())
}
}

Expand All @@ -83,14 +86,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

Future {
val exitValue = process.exitValue()
logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
// Stop waiting for server start in case the Thrift server exits prematurely.
serverRunning.tryFailure(
new SparkException(s"Spark SQL Thrift server process exit value: $exitValue"))
}

val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
val user = System.getProperty("user.name")

try {
Await.result(serverStarted.future, timeout)
Await.result(serverRunning.future, timeout)

val connection = DriverManager.getConnection(jdbcUri, user, "")
val statement = connection.createStatement()
Expand Down Expand Up @@ -122,6 +127,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|End HiveThriftServer2Suite failure output
|=========================================
""".stripMargin, cause)
throw cause
} finally {
warehousePath.delete()
metastorePath.delete()
Expand Down