From e4211137bdc72c3e94d7bce2944d108e5cb70b55 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 16 Sep 2017 10:37:26 +0800 Subject: [PATCH] Close Hive session state when finished. --- .../spark/sql/catalyst/catalog/ExternalCatalog.scala | 2 ++ .../scala/org/apache/spark/sql/SparkSession.scala | 6 ++++++ .../org/apache/spark/sql/internal/SharedState.scala | 8 ++++++++ .../java/org/apache/hive/service/cli/CLIService.java | 5 +++++ .../sql/hive/thriftserver/HiveThriftServer2.scala | 2 ++ .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 5 ++++- .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 12 ++++-------- .../apache/spark/sql/hive/HiveExternalCatalog.scala | 5 +++++ .../spark/sql/hive/HiveSessionStateBuilder.scala | 2 +- .../apache/spark/sql/hive/client/HiveClient.scala | 2 ++ .../spark/sql/hive/client/HiveClientImpl.scala | 4 ++++ .../org/apache/spark/sql/hive/test/TestHive.scala | 2 +- .../apache/spark/sql/hive/client/VersionsSuite.scala | 5 ----- 13 files changed, 44 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index d4c58db3708e3..469595462c8ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -360,4 +360,6 @@ abstract class ExternalCatalog event: ExternalCatalogEvent): Unit = { listener.onEvent(event) } + + def close(): Unit = { } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index d5ab53ad8fe29..26891df604ea3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -114,9 +114,12 @@ class SparkSession private( @InterfaceStability.Unstable @transient 
lazy val sharedState: SharedState = { + sharedStateInitialized = true existingSharedState.getOrElse(new SharedState(sparkContext)) } + var sharedStateInitialized: Boolean = false + /** * Initial options for session. This options are applied once when sessionState is created. */ @@ -706,6 +709,9 @@ class SparkSession private( */ def stop(): Unit = { sparkContext.stop() + if (sharedStateInitialized) { + sharedState.close() + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 7202f1222d10f..ce1ad4323b05e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -87,6 +87,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val listener: SQLListener = createListenerAndUI(sparkContext) + var externalCatalogInitialized: Boolean = false /** * A catalog that interacts with external systems. 
*/ @@ -115,6 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { } }) + externalCatalogInitialized = true externalCatalog } @@ -154,6 +156,12 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { } SparkSession.sqlListener.get() } + + def close(): Unit = { + if (externalCatalogInitialized) { + externalCatalog.close() + } + } } object SharedState extends Logging { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java index 791ddcbd2c5b6..dcd814dc9facb 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java @@ -126,6 +126,11 @@ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveExc ss.setIsHiveServerQuery(true); SessionState.start(ss); ss.applyAuthorizationPolicy(); + try { + ss.close(); + } catch (IOException e) { + LOG.error("Failed closing Hive session state.", e); + } } private void setupBlockedUdfs() { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 7442c987efc72..f7bcd93ca9554 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -109,6 +109,8 @@ object HiveThriftServer2 extends Logging { case e: Exception => logError("Error starting HiveThriftServer2", e) System.exit(-1) + } finally { + executionHive.close() } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 832a15d09599f..cf1963ada77f8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -124,7 +124,10 @@ private[hive] object SparkSQLCLIDriver extends Logging { SessionState.start(sessionState) // Clean up after we exit - ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() } + ShutdownHookManager.addShutdownHook { () => + sessionState.close() + SparkSQLEnv.stop() + } val remoteMode = isRemoteMode(sessionState) // "-h" option has been passed, so connect to Hive thrift server. diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 01c4eb131a564..e97db97cb0067 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -22,7 +22,7 @@ import java.io.PrintStream import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. 
*/ @@ -31,6 +31,7 @@ private[hive] object SparkSQLEnv extends Logging { var sqlContext: SQLContext = _ var sparkContext: SparkContext = _ + var sparkSession: SparkSession = _ def init() { if (sqlContext == null) { @@ -45,16 +46,10 @@ private[hive] object SparkSQLEnv extends Logging { sparkConf .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}")) - val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() + sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() sparkContext = sparkSession.sparkContext sqlContext = sparkSession.sqlContext - val metadataHive = sparkSession - .sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] - .client.newSession() - metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) - metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) - metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) } } @@ -65,6 +60,7 @@ private[hive] object SparkSQLEnv extends Logging { // Stop the SparkContext if (SparkSQLEnv.sparkContext != null) { sparkContext.stop() + sparkSession.close() sparkContext = null sqlContext = null } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 96dc983b0bfc6..17f04fdf9def2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1267,6 +1267,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.listFunctions(db, pattern) } + override def close(): Unit = { + super.close() + client.close() + } + } object HiveExternalCatalog { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 92cb4ef11c9e3..dc92ad3b0c1ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -42,7 +42,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session * Create a Hive aware resource loader. */ override protected lazy val resourceLoader: HiveSessionResourceLoader = { - val client: HiveClient = externalCatalog.client.newSession() + val client: HiveClient = externalCatalog.client new HiveSessionResourceLoader(session, client) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index ee3eb2ee8abe5..288340b897b90 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -278,4 +278,6 @@ private[hive] trait HiveClient { /** Used for testing only. Removes all metadata from this instance of Hive. */ def reset(): Unit + /** Close this client. 
*/ + def close(): Unit } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c4e48c9360db7..a078bd20c7f6a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -838,6 +838,10 @@ private[hive] class HiveClientImpl( client.dropDatabase(db, true, false, true) } } + + def close(): Unit = { + state.close() + } } private[hive] object HiveClientImpl { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 0f6a81b6f813b..09788327cba8b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -198,7 +198,7 @@ private[hive] class TestHiveSparkSession( new TestHiveSessionStateBuilder(this, parentSessionState).build() } - lazy val metadataHive: HiveClient = sharedState.externalCatalog.client.newSession() + lazy val metadataHive: HiveClient = sharedState.externalCatalog.client override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index edb9a9ffbaaf6..aa3be9dd1d034 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -578,11 +578,6 @@ class VersionsSuite extends SparkFunSuite with Logging { client.setError(new PrintStream(new ByteArrayOutputStream())) } - test(s"$version: newSession") { - val newClient = client.newSession() - assert(newClient != null) - } - test(s"$version: 
withHiveState and addJar") { val newClassPath = "." client.addJar(newClassPath)