File: AbstractSparkSQLDriver.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}

private[hive] abstract class AbstractSparkSQLDriver(
@@ -58,13 +59,23 @@ private[hive] abstract class AbstractSparkSQLDriver(
      hiveResponse = execution.stringResult()
      tableSchema = getResultSetSchema(execution)
      new CommandProcessorResponse(0)
    } catch {
      case cause: Throwable =>
        logError(s"Failed in [$command]", cause)
        new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null)
    }
  }

  def runWrapper(command: String): CommandProcessorResponseWrapper = try {
    val result = run(command)
    new CommandProcessorResponseWrapper(result, null)
  } catch {
    case ae: AnalysisException =>
      logDebug(s"Failed in [$command]", ae)
      new CommandProcessorResponseWrapper(new CommandProcessorResponse(1,
        ExceptionUtils.getStackTrace(ae), null), ae)
    case cause: Throwable =>
      logError(s"Failed in [$command]", cause)
      new CommandProcessorResponseWrapper(new CommandProcessorResponse(1,
        ExceptionUtils.getStackTrace(cause), null), cause)
  }

  override def close(): Int = {
    hiveResponse = null
    tableSchema = null
@@ -79,3 +90,7 @@ private[hive] abstract class AbstractSparkSQLDriver(
    tableSchema = null
  }
}

private[hive] case class CommandProcessorResponseWrapper(
    rc: CommandProcessorResponse,
    cause: Throwable)
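Aside: the wrapper exists so that callers can branch on the underlying cause instead of re-parsing the error string. A minimal caller-side sketch follows (illustrative only, not part of this patch; `respond` is a hypothetical helper and assumes the types above are in scope):

    // Print a concise message for analysis errors, the full error text otherwise.
    def respond(driver: AbstractSparkSQLDriver, command: String): Int = {
      val wrapper = driver.runWrapper(command)
      if (wrapper.rc.getResponseCode != 0) {
        wrapper.cause match {
          // An AnalysisException carries a user-facing message; elide the stack trace.
          case e: AnalysisException => Console.err.println(s"Error in query: ${e.getMessage}")
          // Anything else is unexpected; surface the full error text.
          case _ => Console.err.println(wrapper.rc.getErrorMessage())
        }
      }
      wrapper.rc.getResponseCode
    }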
File: SparkSQLCLIDriver.scala
@@ -17,13 +17,12 @@

package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConversions._

import java.io._
import java.util.{ArrayList => JArrayList}

Reviewer comment (Contributor): no new line

import scala.collection.JavaConversions._

import jline.{ConsoleReader, History}
import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
@@ -32,13 +31,14 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor}
import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor, SetProcessor}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.thrift.transport.TSocket

import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.{HiveContext, HiveShim}
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.ShutdownHookManager

private[hive] object SparkSQLCLIDriver {
  private var prompt = "spark-sql"
@@ -276,19 +276,23 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {

          driver.init()
          val out = sessionState.out
          val err = sessionState.err
          val start: Long = System.currentTimeMillis()
          if (sessionState.getIsVerbose) {
            out.println(cmd)
          }
          val rc = driver.run(cmd)
          val rcWrapper = driver.runWrapper(cmd)
          val end = System.currentTimeMillis()
          val timeTaken: Double = (end - start) / 1000.0

          ret = rc.getResponseCode
          ret = rcWrapper.rc.getResponseCode
          if (ret != 0) {
            console.printError(rc.getErrorMessage())
            driver.close()
            return ret
            // For an analysis exception, only the error message is printed to the console.
            rcWrapper.cause match {
              case e: AnalysisException =>
                err.println(s"""Error in query: ${e.getMessage}""")
              case _ => err.println(rcWrapper.rc.getErrorMessage())
            }
          }

          val res = new JArrayList[String]()
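For contrast, a small sketch of the two renderings the new branch chooses between (commons-lang3 is already imported in this module; the sample exception below is made up):

    import org.apache.commons.lang3.exception.ExceptionUtils

    val cause = new RuntimeException("no such table nonexistent_table")
    // Old behavior for every failure: a multi-line stack trace.
    val verbose: String = ExceptionUtils.getStackTrace(cause)
    // New behavior for analysis errors: just the one-line message.
    val concise: String = s"Error in query: ${cause.getMessage}"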
File: CliSuite.scala
@@ -48,9 +48,21 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
    metastorePath.delete()
  }

  /**
   * Run a CLI operation and expect all the queries and expected answers to be returned.
   * @param timeout maximum time for the commands to complete
   * @param extraArgs any extra arguments
   * @param errorResponses a sequence of strings whose presence in the stdout of the forked process
   *                       is taken as an immediate error condition. That is: if a line containing
   *                       one of these strings is found, fail the test immediately.
   *                       The default value is `Seq("Error:")`
   *
   * @param queriesAndExpectedAnswers one or more tuples of query + answer
   */
  def runCliWithin(
      timeout: FiniteDuration,
      extraArgs: Seq[String] = Seq.empty)(
      extraArgs: Seq[String] = Seq.empty,
      errorResponses: Seq[String] = Seq("Error:"))(
      queriesAndExpectedAnswers: (String, String)*): Unit = {

    val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
@@ -82,6 +94,14 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
          foundAllExpectedAnswers.trySuccess(())
        }
      }
      else {
        errorResponses.foreach { r =>
          if (line.contains(r)) {
            foundAllExpectedAnswers.tryFailure(
              new RuntimeException(s"Failed with error line '$line'"))
          }
        }

Reviewer comment (Contributor): indent is off due to newline

      }
    }

    // Searching expected output line from both stdout and stderr of the CLI process
@@ -184,4 +204,12 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-> "OK"
)
}

test("SPARK-11188 Analysis error reporting") {
runCliWithin(timeout = 2.minute,
errorResponses = Seq("AnalysisException"))(
"select * from nonexistent_table;"
-> "Error in query: no such table nonexistent_table;"
)
}
}
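For illustration, a hypothetical additional use of the new `errorResponses` parameter: a test can fail fast on any marker string in the CLI output instead of waiting out the timeout (the test name, query, table, and marker below are made up):

    test("fail fast on fatal driver output") {
      runCliWithin(timeout = 2.minutes,
        errorResponses = Seq("OutOfMemoryError"))(
        "select count(*) from huge_table;"
          -> "OK"
      )
    }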