Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,6 @@ abstract class ExternalCatalog
event: ExternalCatalogEvent): Unit = {
listener.onEvent(event)
}

def close(): Unit = { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ class SparkSession private(
@InterfaceStability.Unstable
@transient
lazy val sharedState: SharedState = {
sharedStateInitialized = true
existingSharedState.getOrElse(new SharedState(sparkContext))
}

var sharedStateInitialized: Boolean = false

/**
* Initial options for session. This options are applied once when sessionState is created.
*/
Expand Down Expand Up @@ -706,6 +709,9 @@ class SparkSession private(
*/
def stop(): Unit = {
sparkContext.stop()
if (sharedStateInitialized) {
sharedState.close()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
val listener: SQLListener = createListenerAndUI(sparkContext)

var externalCatalogInitialized: Boolean = false
/**
* A catalog that interacts with external systems.
*/
Expand Down Expand Up @@ -115,6 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
})

externalCatalogInitialized = true
externalCatalog
}

Expand Down Expand Up @@ -154,6 +156,12 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
SparkSession.sqlListener.get()
}

def close(): Unit = {
if (externalCatalogInitialized) {
externalCatalog.close()
}
}
}

object SharedState extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveExc
ss.setIsHiveServerQuery(true);
SessionState.start(ss);
ss.applyAuthorizationPolicy();
try {
ss.close();
} catch (IOException e) {
LOG.error("Failed closing Hive session state.", e);
}
}

private void setupBlockedUdfs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ object HiveThriftServer2 extends Logging {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
} finally {
executionHive.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ private[hive] object SparkSQLCLIDriver extends Logging {
SessionState.start(sessionState)

// Clean up after we exit
ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
ShutdownHookManager.addShutdownHook { () =>
sessionState.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it matters, but keep in mind this will hold a reference to sessionState then until the JVM exits

SparkSQLEnv.stop()
}

val remoteMode = isRemoteMode(sessionState)
// "-h" option has been passed, so connect to Hive thrift server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.PrintStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.Utils

/** A singleton object for the master program. The slaves should not access this. */
Expand All @@ -31,6 +31,7 @@ private[hive] object SparkSQLEnv extends Logging {

var sqlContext: SQLContext = _
var sparkContext: SparkContext = _
var sparkSession: SparkSession = _

def init() {
if (sqlContext == null) {
Expand All @@ -45,16 +46,10 @@ private[hive] object SparkSQLEnv extends Logging {
sparkConf
.setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))

val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
sparkContext = sparkSession.sparkContext
sqlContext = sparkSession.sqlContext

val metadataHive = sparkSession
.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
.client.newSession()
metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
}
}
Expand All @@ -65,6 +60,7 @@ private[hive] object SparkSQLEnv extends Logging {
// Stop the SparkContext
if (SparkSQLEnv.sparkContext != null) {
sparkContext.stop()
sparkSession.close()
sparkContext = null
sqlContext = null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.listFunctions(db, pattern)
}

override def close(): Unit = {
super.close()
client.close()
}

}

object HiveExternalCatalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
* Create a Hive aware resource loader.
*/
override protected lazy val resourceLoader: HiveSessionResourceLoader = {
val client: HiveClient = externalCatalog.client.newSession()
val client: HiveClient = externalCatalog.client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newSession is to isolate SparkSession in Spark ThriftServer, You must judge whether is thriftserver or not.

new HiveSessionResourceLoader(session, client)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,6 @@ private[hive] trait HiveClient {
/** Used for testing only. Removes all metadata from this instance of Hive. */
def reset(): Unit

/** Close this client. */
def close(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,10 @@ private[hive] class HiveClientImpl(
client.dropDatabase(db, true, false, true)
}
}

def close(): Unit = {
state.close()
}
}

private[hive] object HiveClientImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private[hive] class TestHiveSparkSession(
new TestHiveSessionStateBuilder(this, parentSessionState).build()
}

lazy val metadataHive: HiveClient = sharedState.externalCatalog.client.newSession()
lazy val metadataHive: HiveClient = sharedState.externalCatalog.client

override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.setError(new PrintStream(new ByteArrayOutputStream()))
}

test(s"$version: newSession") {
val newClient = client.newSession()
assert(newClient != null)
}

test(s"$version: withHiveState and addJar") {
val newClassPath = "."
client.addJar(newClassPath)
Expand Down