From 34b7680dfd915449087b8f5541c682b02df5dfbc Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Wed, 20 Dec 2017 09:58:37 +0800 Subject: [PATCH 1/4] [SPARK-22837][SQL]Session timeout checker does not work in SessionManager. --- .../apache/hive/service/cli/session/SessionManager.java | 9 +++++++++ .../sql/hive/thriftserver/SparkSQLSessionManager.scala | 2 ++ 2 files changed, 11 insertions(+) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index 859f9c8b449e5..cab126089041f 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -148,6 +148,15 @@ public synchronized void start() { } } + private void initSessionTimeoutCheckerConfig() { + checkInterval = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + sessionTimeout = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + checkOperation = HiveConf.getBoolVar(hiveConf, + ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); + } + private void startTimeoutChecker() { final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds Runnable timeoutChecker = new Runnable() { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 48c0ebef3e0ce..28c6054e5815d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -42,6 +42,8 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) + invoke(classOf[SessionManager], this, "initSessionTimeoutCheckerConfig") + // Create operation log root directory, if operation logging is enabled if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { invoke(classOf[SessionManager], this, "initOperationLogRootDir") From 79fc11ad58136f3f254b68d8fa3b2d8d1dd1942d Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Wed, 17 Jan 2018 11:26:21 +0800 Subject: [PATCH 2/4] fix code review --- .../service/cli/session/SessionManager.java | 45 ++++--------------- .../thriftserver/SparkSQLSessionManager.scala | 17 +------ 2 files changed, 9 insertions(+), 53 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index cab126089041f..bc96a545e8f8b 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -23,11 +23,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -79,35 +75,19 @@ public synchronized void init(HiveConf hiveConf) { initOperationLogRootDir(); } createBackgroundOperationPool(); - addService(operationManager); - super.init(hiveConf); - } - - private void createBackgroundOperationPool() { - int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); - int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); - long keepAliveTime = HiveConf.getTimeVar( - hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); - LOG.info( - "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); - - // Create a thread pool with #poolSize threads - // Threads terminate when they are idle for more than the keepAliveTime - // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize - String threadPoolName = "HiveServer2-Background-Pool"; - backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(poolQueueSize), - new ThreadFactoryWithGarbageCleanup(threadPoolName)); - backgroundOperationPool.allowCoreThreadTimeOut(true); - checkInterval = HiveConf.getTimeVar( hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); sessionTimeout = HiveConf.getTimeVar( hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); checkOperation = HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); + addService(operationManager); + } + + private void createBackgroundOperationPool() { + int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + backgroundOperationPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(backgroundPoolSize); + LOG.info("HiveServer2: Async execution pool size: " + backgroundPoolSize); } private void initOperationLogRootDir() { @@ -148,15 +128,6 @@ public synchronized void start() { } } - private void initSessionTimeoutCheckerConfig() { - checkInterval = HiveConf.getTimeVar( - hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); - sessionTimeout = HiveConf.getTimeVar( - hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); - checkOperation = HiveConf.getBoolVar(hiveConf, - ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); - } - private void startTimeoutChecker() { final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds Runnable timeoutChecker = new Runnable() { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 28c6054e5815d..c768d4955082d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -40,23 +40,8 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: private lazy val sparkSqlOperationManager = new SparkSQLOperationManager() override def init(hiveConf: HiveConf) { - setSuperField(this, "hiveConf", hiveConf) - - invoke(classOf[SessionManager], this, "initSessionTimeoutCheckerConfig") - - // Create operation log root directory, if operation logging is enabled - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - invoke(classOf[SessionManager], this, "initOperationLogRootDir") - } - - val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) - setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) - getAncestorField[Log](this, 3, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - setSuperField(this, "operationManager", sparkSqlOperationManager) - addService(sparkSqlOperationManager) - + super.init(hiveConf) initCompositeService(hiveConf) } From 5b3c06f07f017c9a7b1dc51a4461fbf1c63ac350 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Fri, 19 Jan 2018 11:03:48 +0800 Subject: [PATCH 3/4] Keep createBackgroundOperationPool unchanged. Looks like the hive thread pool is more flexible than the spark one. --- .../service/cli/session/SessionManager.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index bc96a545e8f8b..4b98ad042674d 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -75,19 +75,34 @@ public synchronized void init(HiveConf hiveConf) { initOperationLogRootDir(); } createBackgroundOperationPool(); + addService(operationManager); + } + + private void createBackgroundOperationPool() { + int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); + int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); + LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); + long keepAliveTime = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); + LOG.info( + "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); + + // Create a thread pool with #poolSize threads + // Threads terminate when they are idle for more than the keepAliveTime + // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize + String threadPoolName = "HiveServer2-Background-Pool"; + backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(poolQueueSize), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); + backgroundOperationPool.allowCoreThreadTimeOut(true); + checkInterval = HiveConf.getTimeVar( hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); sessionTimeout = HiveConf.getTimeVar( hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); checkOperation = HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); - addService(operationManager); - } - - private void createBackgroundOperationPool() { - int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - backgroundOperationPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(backgroundPoolSize); - LOG.info("HiveServer2: Async execution pool size: " + backgroundPoolSize); } private void initOperationLogRootDir() { From 883deb2309a7581178c8de8b53d92ba309f6b4b9 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Tue, 23 Jan 2018 12:06:23 +0800 Subject: [PATCH 4/4] fix code review --- .../apache/hive/service/cli/session/SessionManager.java | 7 ++++++- .../sql/hive/thriftserver/SparkSQLSessionManager.scala | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index 4b98ad042674d..859f9c8b449e5 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -23,7 +23,11 @@ import java.util.ArrayList; import java.util.Date; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -76,6 +80,7 @@ public synchronized void init(HiveConf hiveConf) { } createBackgroundOperationPool(); addService(operationManager); + super.init(hiveConf); } private void createBackgroundOperationPool() { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index c768d4955082d..2958b771f3648 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -42,7 +42,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: override def init(hiveConf: HiveConf) { setSuperField(this, "operationManager", sparkSqlOperationManager) super.init(hiveConf) - initCompositeService(hiveConf) } override def openSession(