From 3cd6fab15c721641bcc6590bd80ab8306f2d4137 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 22 Oct 2014 14:41:13 +0800
Subject: [PATCH 1/4] Fixes SPARK-4037

---
 .../hive/thriftserver/HiveThriftServer2.scala | 17 +----------------
 .../sql/hive/thriftserver/SparkSQLEnv.scala   | 18 +++++++-----------
 .../thriftserver/HiveThriftServer2Suite.scala | 10 ++++++----
 3 files changed, 14 insertions(+), 31 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 3d468d804622c..bd4e99492b395 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
 
@@ -51,24 +48,12 @@ object HiveThriftServer2 extends Logging {
 
   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }
 
-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)
 
     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -80,7 +65,7 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2136a2ea63543..50425863518c3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import scala.collection.JavaConversions._
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
         .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
 
       sparkContext.addSparkListener(new StatsReportListener())
+      hiveContext = new HiveContext(sparkContext)
 
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = {
-          val state = SessionState.get()
-          setConf(state.getConf.getAllProperties)
-          state
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
         }
-        @transient override lazy val hiveconf = sessionState.getConf
       }
     }
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index e3b4e45a3d68c..c60e8fa5b1259 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -150,10 +150,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-      val queries = Seq(
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
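
The test change above drives the Thrift server with a single SQL script rather than a Seq of strings. Since JDBC's Statement.execute runs one statement at a time, the script is split on ';', each fragment is trimmed, and empty fragments are dropped. Below is a minimal, self-contained sketch of that splitting idiom; the object name and script contents are illustrative, not part of the patch:

    object SplitStatements extends App {
      // Hypothetical script, mirroring the shape used in HiveThriftServer2Suite.
      val script =
        """SET spark.sql.shuffle.partitions=3;
          |CREATE TABLE test(key INT, val STRING);
          |CACHE TABLE test;
        """.stripMargin

      // Split on ';', trim whitespace, drop empty trailing fragments.
      val statements = script.split(";").map(_.trim).filter(_.nonEmpty)

      statements.foreach(println)
    }

Note that splitting on ';' is only safe here because none of the test statements embeds a semicolon inside a string literal.
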
From 49b1c5b961d405dfed0da89fda84cfe9d989e4dd Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 22 Oct 2014 16:21:05 +0800
Subject: [PATCH 2/4] Reuses existing started SessionState if any

---
 .../apache/spark/sql/hive/HiveContext.scala   | 43 +++++++++++++------
 1 file changed, 29 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091d48a89..1dd8581b62220 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. Reuse the existing started SessionState if any.
+   * 2. When the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts the newly created `SessionState` instance. Any existing `SessionState`
+        // instance returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
@@ -288,8 +296,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
     val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
 
+    // Makes sure the session represented by the `sessionState` field is activated. This implies
+    // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+    // session isolation under multi-user scenarios (e.g. HiveThriftServer2).
+    // TODO Fix session isolation
+    SessionState.start(sessionState)
+
     proc match {
       case driver: Driver =>
+        driver.init()
         val results = HiveShim.createDriverResultsArray
         val response: CommandProcessorResponse = driver.run(cmd)
         // Throw an exception if there is an error in query processing.
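
The heart of this patch is a reuse-or-create pattern: if some other component (such as HiveThriftServer2) has already created and started a SessionState, HiveContext adopts it; only when none exists does it create and start a fresh one. Deriving hiveconf and sessionState from one tuple-valued lazy val also guarantees the two are initialized together. Below is a standalone sketch of the pattern, with a toy Session/Registry pair standing in for Hive's SessionState and its thread-local registry; all names are illustrative:

    object ReuseOrCreate extends App {
      final class Session(val id: String)

      // Stand-in for SessionState's static registry.
      object Registry {
        private val current = new ThreadLocal[Session]
        def get(): Session = current.get()           // null when nothing was started
        def start(s: Session): Unit = current.set(s)
      }

      // Reuse the already started session if any; otherwise create, start,
      // and use a fresh one (the shape of the lazy val in the patch).
      lazy val session: Session =
        Option(Registry.get()).getOrElse {
          val fresh = new Session("fresh")
          Registry.start(fresh)                      // only a newly created session is started
          fresh
        }

      Registry.start(new Session("pre-existing"))
      println(session.id)                            // prints "pre-existing": the existing session wins
    }
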
From a28fef53869dceecbbb75c3e9cd9e2a48b3e8fc1 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 29 Oct 2014 07:16:17 +0800
Subject: [PATCH 3/4] Avoid starting HiveContext.sessionState multiple times

---
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1dd8581b62220..1522c35b66b95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -300,7 +300,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
     // session isolation under multi-user scenarios (e.g. HiveThriftServer2).
     // TODO Fix session isolation
-    SessionState.start(sessionState)
+    if (SessionState.get() != sessionState) {
+      SessionState.start(sessionState)
+    }
 
     proc match {
       case driver: Driver =>
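
The guard added here makes session activation idempotent: SessionState.start is now invoked only when the desired session is not already the current one, so repeated runHive calls stop restarting the same session. A tiny sketch of the same guard follows, with a mutable field standing in for Hive's current-session registry; names are illustrative:

    object IdempotentStart extends App {
      final class Session
      private var current: Session = null
      private var starts = 0

      // Mirrors `if (SessionState.get() != sessionState) SessionState.start(...)`.
      def ensureStarted(wanted: Session): Unit =
        if (current != wanted) {   // reference comparison, as with SessionState
          starts += 1              // stands in for SessionState.start's side effects
          current = wanted
        }

      val s = new Session
      ensureStarted(s)
      ensureStarted(s)             // second call is a no-op
      println(starts)              // 1
    }
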
From 8446675f8a82a0e873ea2068e085b843a8107f5c Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 29 Oct 2014 07:30:26 +0800
Subject: [PATCH 4/4] Removes redundant Driver initialization

---
 .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1522c35b66b95..ff8fa44194d98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -306,7 +306,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     proc match {
       case driver: Driver =>
-        driver.init()
         val results = HiveShim.createDriverResultsArray
         val response: CommandProcessorResponse = driver.run(cmd)
         // Throw an exception if there is an error in query processing.
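
With the redundant driver.init() call removed, the dispatch in runHive reduces to: obtain a command processor for the statement, run it, and fail fast on a non-zero response code. A condensed, self-contained sketch of that run-then-check idiom follows; the Response class is a stand-in for Hive's CommandProcessorResponse, and the toy run method is purely illustrative:

    object RunAndCheck extends App {
      final case class Response(code: Int, errorMessage: String = "")

      // Toy runner standing in for Driver.run: fails for commands containing "bad".
      def run(cmd: String): Response =
        if (cmd.contains("bad")) Response(1, s"FAILED: $cmd") else Response(0)

      def runChecked(cmd: String): Unit = {
        val response = run(cmd)
        // Throw an exception if there is an error in query processing,
        // matching the comment in the patched code above.
        if (response.code != 0) sys.error(response.errorMessage)
      }

      runChecked("SELECT 1")       // succeeds silently
      // runChecked("bad query")   // would throw a RuntimeException
    }
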