Skip to content

Commit 7844933

Browse files
committed
pass the user parameters to metaDataHive
1 parent 2811265 commit 7844933

File tree

6 files changed

+35
-16
lines changed

6 files changed

+35
-16
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
263263

264264
}
265265

266+
266267
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
267268
private val sessionState = SessionState.get().asInstanceOf[CliSessionState]
268269

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.util.{Arrays, ArrayList => JArrayList, List => JList}
21-
import org.apache.log4j.LogManager
2221
import org.apache.spark.sql.AnalysisException
2322

2423
import scala.collection.JavaConverters._

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import java.io.PrintStream
2121

2222
import scala.collection.JavaConverters._
2323

24+
import org.apache.hadoop.hive.ql.session.SessionState
25+
2426
import org.apache.spark.scheduler.StatsReportListener
25-
import org.apache.spark.sql.hive.HiveContext
27+
import org.apache.spark.sql.hive.{UserInput, HiveContext}
2628
import org.apache.spark.{Logging, SparkConf, SparkContext}
2729
import org.apache.spark.util.Utils
2830

@@ -55,7 +57,10 @@ private[hive] object SparkSQLEnv extends Logging {
5557

5658
sparkContext = new SparkContext(sparkConf)
5759
sparkContext.addSparkListener(new StatsReportListener())
58-
hiveContext = new HiveContext(sparkContext)
60+
61+
val sessionState = SessionState.get()
62+
hiveContext = new HiveContext(sparkContext,
63+
Some(UserInput(sessionState.getIsSilent, sessionState.getIsVerbose)))
5964

6065
hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
6166
hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ import org.apache.spark.unsafe.types.UTF8String
5656
import org.apache.spark.util.Utils
5757
import org.apache.spark.{Logging, SparkContext}
5858

59+
/**
60+
* Use to encapsulate the user input parameters from spark-sql CLI.
61+
*/
62+
private[hive] case class UserInput (isSilent: Boolean, isVerbose: Boolean)
5963

6064
/**
6165
* This is the HiveQL Dialect, this dialect is strongly bind with HiveContext
@@ -93,13 +97,19 @@ class HiveContext private[hive](
9397
listener: SQLListener,
9498
@transient private val execHive: ClientWrapper,
9599
@transient private val metaHive: ClientInterface,
96-
isRootContext: Boolean)
100+
isRootContext: Boolean,
101+
userInput: Option[UserInput] = None)
97102
extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
98103
self =>
99104

100105
def this(sc: SparkContext) = {
101-
this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
106+
this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true, None)
107+
}
108+
109+
def this(sc: SparkContext, userInput: Option[UserInput]) = {
110+
this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true, userInput)
102111
}
112+
103113
def this(sc: JavaSparkContext) = this(sc.sc)
104114

105115
import org.apache.spark.sql.hive.HiveContext._
@@ -215,7 +225,7 @@ class HiveContext private[hive](
215225
config = newTemporaryConfiguration(useInMemoryDerby = true),
216226
isolationOn = false,
217227
baseClassLoader = Utils.getContextOrSparkClassLoader)
218-
loader.createClient().asInstanceOf[ClientWrapper]
228+
loader.createClient(userInput).asInstanceOf[ClientWrapper]
219229
}
220230

221231
/**
@@ -324,7 +334,7 @@ class HiveContext private[hive](
324334
barrierPrefixes = hiveMetastoreBarrierPrefixes,
325335
sharedPrefixes = hiveMetastoreSharedPrefixes)
326336
}
327-
isolatedLoader.createClient()
337+
isolatedLoader.createClient(userInput)
328338
}
329339

330340
protected[sql] override def parseSql(sql: String): LogicalPlan = {
@@ -656,7 +666,6 @@ class HiveContext private[hive](
656666
}
657667
}
658668

659-
660669
private[hive] object HiveContext {
661670
/** The version of hive used internally by Spark SQL. */
662671
val hiveExecutionVersion: String = "1.2.1"

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.apache.hadoop.security.UserGroupInformation
3636
import org.apache.hadoop.util.VersionInfo
3737

3838
import org.apache.spark.{SparkConf, SparkException, Logging}
39+
import org.apache.spark.sql.hive.UserInput
3940
import org.apache.spark.sql.catalyst.expressions.Expression
4041
import org.apache.spark.sql.execution.QueryExecutionException
4142
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -61,10 +62,10 @@ private[hive] class ClientWrapper(
6162
override val version: HiveVersion,
6263
config: Map[String, String],
6364
initClassLoader: ClassLoader,
64-
val clientLoader: IsolatedClientLoader)
65+
val clientLoader: IsolatedClientLoader,
66+
userInput: Option[UserInput])
6567
extends ClientInterface
6668
with Logging {
67-
6869
overrideHadoopShims()
6970

7071
// !! HACK ALERT !!
@@ -194,6 +195,10 @@ private[hive] class ClientWrapper(
194195
SessionState.start(state)
195196
state.out = new PrintStream(outputBuffer, true, "UTF-8")
196197
state.err = new PrintStream(outputBuffer, true, "UTF-8")
198+
userInput.foreach { input =>
199+
state.setIsSilent(input.isSilent)
200+
state.setIsVerbose(input.isVerbose)
201+
}
197202
state
198203
} finally {
199204
Thread.currentThread().setContextClassLoader(original)
@@ -494,7 +499,7 @@ private[hive] class ClientWrapper(
494499
results
495500

496501
case _ =>
497-
if (state.out != null) {
502+
if (state.out != null && !state.getIsSilent) {
498503
// scalastyle:off println
499504
state.out.println(tokens(0) + " " + cmd_1)
500505
// scalastyle:on println
@@ -582,7 +587,7 @@ private[hive] class ClientWrapper(
582587
}
583588

584589
def newSession(): ClientWrapper = {
585-
clientLoader.createClient().asInstanceOf[ClientWrapper]
590+
clientLoader.createClient(userInput).asInstanceOf[ClientWrapper]
586591
}
587592

588593
def reset(): Unit = withHiveState {

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.commons.io.{FileUtils, IOUtils}
3030
import org.apache.spark.Logging
3131
import org.apache.spark.deploy.SparkSubmitUtils
3232
import org.apache.spark.sql.catalyst.util.quietly
33-
import org.apache.spark.sql.hive.HiveContext
33+
import org.apache.spark.sql.hive.{UserInput, HiveContext}
3434
import org.apache.spark.util.{MutableURLClassLoader, Utils}
3535

3636
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader(
233233
}
234234

235235
/** The isolated client interface to Hive. */
236-
private[hive] def createClient(): ClientInterface = {
236+
private[hive] def createClient(userInput: Option[UserInput] = None): ClientInterface = {
237237
if (!isolationOn) {
238-
return new ClientWrapper(version, config, baseClassLoader, this)
238+
return new ClientWrapper(version, config, baseClassLoader, this, userInput)
239239
}
240240
// Pre-reflective instantiation setup.
241241
logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +246,7 @@ private[hive] class IsolatedClientLoader(
246246
classLoader
247247
.loadClass(classOf[ClientWrapper].getName)
248248
.getConstructors.head
249-
.newInstance(version, config, classLoader, this)
249+
.newInstance(version, config, classLoader, this, userInput)
250250
.asInstanceOf[ClientInterface]
251251
} catch {
252252
case e: InvocationTargetException =>

0 commit comments

Comments
 (0)