From d5c946bd8171cd0430bac9627a5db31909accabc Mon Sep 17 00:00:00 2001 From: Ali Smesseim Date: Tue, 12 May 2020 09:14:34 -0700 Subject: [PATCH 1/5] [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener ### What changes were proposed in this pull request? The update methods in HiveThriftServer2Listener now check whether the session/operation ID passed to them actually exists in `sessionList` and `executionList`, respectively. This prevents NullPointerExceptions when the operation or session ID is unknown; instead, a warning is written to the log. Also, in HiveSessionImpl.close(), we now catch any exception thrown by `operationManager.closeOperation`, so that a failure to close one operation does not prevent the remaining operations from being closed. (A minimal sketch of both patterns follows this first patch.) ### Why are the changes needed? The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this interferes with the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException. In HiveSessionImpl.close(), if an exception is thrown while closing an operation, the remaining operations are never closed. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests Closes #28155 from alismess-db/hive-thriftserver-listener-update-safer. Authored-by: Ali Smesseim Signed-off-by: gatorsmile (cherry picked from commit 6994c64efd5770a8fd33220cbcaddc1d96fed886) Signed-off-by: Ali Smesseim --- .../ui/HiveThriftServer2Listener.scala | 120 +++++++++++------- .../thriftserver/HiveSessionImplSuite.scala | 73 +++++++++++ .../ui/HiveThriftServer2ListenerSuite.scala | 16 +++ .../service/cli/session/HiveSessionImpl.java | 6 +- .../service/cli/session/HiveSessionImpl.java | 6 +- 5 files changed, 170 insertions(+), 51 deletions(-) create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 6d0a506fa94d..20a8f2c833c2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hive.service.server.HiveServer2 import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.scheduler._ import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState @@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener( kvstore: ElementTrackingStore, sparkConf: SparkConf, server: Option[HiveServer2], - live: Boolean = true) extends SparkListener { + live: Boolean = true) extends SparkListener with Logging { private val sessionList = new ConcurrentHashMap[String, LiveSessionData]() private val executionList = new ConcurrentHashMap[String, LiveExecutionData]() @@ -131,60 +132,81 @@ private[thriftserver] class HiveThriftServer2Listener( updateLiveStore(session) } - private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = { - val session = sessionList.get(e.sessionId)
- session.finishTimestamp = e.finishTime - updateStoreWithTriggerEnabled(session) - sessionList.remove(e.sessionId) - } + private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = + Option(sessionList.get(e.sessionId)) match { + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + case Some(sessionData) => + val session = sessionData + session.finishTimestamp = e.finishTime + updateStoreWithTriggerEnabled(session) + sessionList.remove(e.sessionId) + } - private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { - val info = getOrCreateExecution( - e.id, - e.statement, - e.sessionId, - e.startTime, - e.userName) - - info.state = ExecutionState.STARTED - executionList.put(e.id, info) - sessionList.get(e.sessionId).totalExecution += 1 - executionList.get(e.id).groupId = e.groupId - updateLiveStore(executionList.get(e.id)) - updateLiveStore(sessionList.get(e.sessionId)) - } + private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = + Option(sessionList.get(e.sessionId)) match { + case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") + case Some(sessionData) => + val info = getOrCreateExecution( + e.id, + e.statement, + e.sessionId, + e.startTime, + e.userName) + + info.state = ExecutionState.STARTED + executionList.put(e.id, info) + sessionData.totalExecution += 1 + executionList.get(e.id).groupId = e.groupId + updateLiveStore(executionList.get(e.id)) + updateLiveStore(sessionData) + } - private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = { - executionList.get(e.id).executePlan = e.executionPlan - executionList.get(e.id).state = ExecutionState.COMPILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.executePlan = e.executionPlan + executionData.state = ExecutionState.COMPILED + updateLiveStore(executionData) + } - private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.CANCELED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.CANCELED + updateLiveStore(executionData) + } - private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).detail = e.errorMsg - executionList.get(e.id).state = ExecutionState.FAILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.detail = e.errorMsg + executionData.state = ExecutionState.FAILED + updateLiveStore(executionData) + } - private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.FINISHED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.FINISHED + updateLiveStore(executionData) + } - private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = { - executionList.get(e.id).closeTimestamp = e.closeTime - executionList.get(e.id).state = ExecutionState.CLOSED - updateStoreWithTriggerEnabled(executionList.get(e.id)) - executionList.remove(e.id) - } + private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.closeTimestamp = e.closeTime + executionData.state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionData) + executionList.remove(e.id) + } // Update both live and history stores. Trigger is enabled by default, hence // it will cleanup the entity which exceeds the threshold. diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala new file mode 100644 index 000000000000..05d540d782e3 --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.thriftserver + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hive.service.cli.OperationHandle +import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager} +import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager} +import org.mockito.Mockito.{mock, verify, when} +import org.mockito.invocation.InvocationOnMock + +import org.apache.spark.SparkFunSuite + +class HiveSessionImplSuite extends SparkFunSuite { + private var session: HiveSessionImpl = _ + private var operationManager: OperationManager = _ + + override def beforeAll() { + super.beforeAll() + + session = new HiveSessionImpl( + ThriftserverShimUtils.testedProtocolVersions.head, + "", + "", + new HiveConf(), + "" + ) + val sessionManager = mock(classOf[SessionManager]) + session.setSessionManager(sessionManager) + operationManager = mock(classOf[OperationManager]) + session.setOperationManager(operationManager) + when(operationManager.newGetCatalogsOperation(session)).thenAnswer( + (_: InvocationOnMock) => { + val operation = mock(classOf[GetCatalogsOperation]) + when(operation.getHandle).thenReturn(mock(classOf[OperationHandle])) + operation + } + ) + + session.open(Map.empty[String, String].asJava) + } + + test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") { + val operationHandle1 = session.getCatalogs + val operationHandle2 = session.getCatalogs + + when(operationManager.closeOperation(operationHandle1)) + .thenThrow(classOf[NullPointerException]) + when(operationManager.closeOperation(operationHandle2)) + .thenThrow(classOf[NullPointerException]) + + session.close() + + verify(operationManager).closeOperation(operationHandle1) + verify(operationManager).closeOperation(operationHandle2) + } +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index 075032fa5d09..ea2523dfb581 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -140,6 +140,22 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(listener.noLiveData()) } + test("SPARK-31387 - listener update methods should not throw exception with unknown input") { + val (statusStore: HiveThriftServer2AppStatusStore, + listener: HiveThriftServer2Listener) = createAppStatusStore(true) + val unknownSession = "unknown_session" + val unknownOperation = "unknown_operation" + listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession, + "stmt", "groupId", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query")) + listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation, + "msg", "trace", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0)) + } + private def createProperties: Properties = { val properties = 
new Properties() properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId") diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 745f385e87f7..3e2c3def48d8 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -636,7 +636,11 @@ public void close() throws HiveSQLException { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 14e9c4704c97..5cdae00460e3 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -650,7 +650,11 @@ public void close() throws HiveSQLException { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. 
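The two defensive patterns introduced in this first patch can be summarized in a short, self-contained sketch. The names below (GuardedUpdateSketch, SessionData, closeAll) are hypothetical stand-ins for the real LiveSessionData and operation-handle types; this is a minimal illustration of the idea under those assumptions, not the actual listener or HiveSessionImpl code.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.util.control.NonFatal

// Hypothetical, simplified session record -- not the real LiveSessionData class.
final case class SessionData(id: String, var finishTime: Long = 0L)

object GuardedUpdateSketch {
  private val sessions = new ConcurrentHashMap[String, SessionData]()

  // Pattern 1 (listener update methods): look the entity up once and only touch it
  // when the id is known; an unknown id is logged instead of dereferencing null.
  def onSessionClosed(sessionId: String, finishTime: Long): Unit =
    Option(sessions.get(sessionId)) match {
      case Some(session) =>
        session.finishTime = finishTime
        sessions.remove(sessionId)
      case None =>
        println(s"onSessionClosed called with unknown session id: $sessionId")
    }

  // Pattern 2 (HiveSessionImpl.close()): a failure while closing one resource
  // must not stop the remaining resources from being closed.
  def closeAll[A](handles: Seq[A])(close: A => Unit): Unit =
    handles.foreach { handle =>
      try close(handle)
      catch {
        case NonFatal(e) => println(s"Exception is thrown closing operation $handle: $e")
      }
    }
}
```

The same Option-based guard is applied to every update method of HiveThriftServer2Listener, and the same per-element try/catch is applied to both copies of HiveSessionImpl.close() (v1.2 and v2.3).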
From cae817aebb773a4e766544d237d3b4068a881b02 Mon Sep 17 00:00:00 2001 From: Ali Smesseim Date: Mon, 18 May 2020 12:09:54 +0200 Subject: [PATCH 2/5] Order None pattern after Some --- .../ui/HiveThriftServer2Listener.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 20a8f2c833c2..b972d8dfbcbe 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -134,17 +134,15 @@ private[thriftserver] class HiveThriftServer2Listener( private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = Option(sessionList.get(e.sessionId)) match { - case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") case Some(sessionData) => - val session = sessionData - session.finishTimestamp = e.finishTime - updateStoreWithTriggerEnabled(session) + sessionData.finishTimestamp = e.finishTime + updateStoreWithTriggerEnabled(sessionData) sessionList.remove(e.sessionId) + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") } private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = Option(sessionList.get(e.sessionId)) match { - case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") case Some(sessionData) => val info = getOrCreateExecution( e.id, @@ -159,53 +157,54 @@ private[thriftserver] class HiveThriftServer2Listener( executionList.get(e.id).groupId = e.groupId updateLiveStore(executionList.get(e.id)) updateLiveStore(sessionData) + case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") } private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = Option(executionList.get(e.id)) match { - case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") case Some(executionData) => executionData.executePlan = e.executionPlan executionData.state = ExecutionState.COMPILED updateLiveStore(executionData) + case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") } private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = Option(executionList.get(e.id)) match { - case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") case Some(executionData) => executionData.finishTimestamp = e.finishTime executionData.state = ExecutionState.CANCELED updateLiveStore(executionData) + case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") } private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = Option(executionList.get(e.id)) match { - case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") case Some(executionData) => executionData.finishTimestamp = e.finishTime executionData.detail = e.errorMsg executionData.state = ExecutionState.FAILED updateLiveStore(executionData) + case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") } private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = Option(executionList.get(e.id)) match { - 
case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") case Some(executionData) => executionData.finishTimestamp = e.finishTime executionData.state = ExecutionState.FINISHED updateLiveStore(executionData) + case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") } private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = Option(executionList.get(e.id)) match { - case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") case Some(executionData) => executionData.closeTimestamp = e.closeTime executionData.state = ExecutionState.CLOSED updateStoreWithTriggerEnabled(executionData) executionList.remove(e.id) + case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") } // Update both live and history stores. Trigger is enabled by default, hence From aa82589fa69ec147fe62e1a287677b3f00e8f96d Mon Sep 17 00:00:00 2001 From: Ali Smesseim Date: Mon, 18 May 2020 12:13:50 +0200 Subject: [PATCH 3/5] Add failsafe in case of missing pipeout dir --- .../hive/service/cli/session/HiveSessionImpl.java | 14 +++++++++----- .../hive/service/cli/session/HiveSessionImpl.java | 14 +++++++++----- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 3e2c3def48d8..e3fb54d9f47e 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -678,11 +678,15 @@ private void cleanupPipeoutFile() { File[] fileAry = new File(lScratchDir).listFiles( (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout")); - for (File file : fileAry) { - try { - FileUtils.forceDelete(file); - } catch (Exception e) { - LOG.error("Failed to cleanup pipeout file: " + file, e); + if (fileAry == null) { + LOG.error("Unable to access pipeout files in " + lScratchDir); + } else { + for (File file : fileAry) { + try { + FileUtils.forceDelete(file); + } catch (Exception e) { + LOG.error("Failed to cleanup pipeout file: " + file, e); + } } } } diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 5cdae00460e3..1b3e8fe6bfb9 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -692,11 +692,15 @@ private void cleanupPipeoutFile() { File[] fileAry = new File(lScratchDir).listFiles( (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout")); - for (File file : fileAry) { - try { - FileUtils.forceDelete(file); - } catch (Exception e) { - LOG.error("Failed to cleanup pipeout file: " + file, e); + if (fileAry == null) { + LOG.error("Unable to access pipeout files in " + lScratchDir); + } else { + for (File file : fileAry) { + try { + FileUtils.forceDelete(file); + } catch (Exception e) { + LOG.error("Failed to cleanup pipeout file: " + file, e); + } } } } From f3831df272b92bd15c63828ab2987d14982e9ac8 Mon Sep 17 00:00:00 2001 From: Ali Smesseim Date: Wed, 20 May 2020 12:44:31 +0200 Subject: [PATCH 
4/5] Fix indentation --- .../thriftserver/ui/HiveThriftServer2ListenerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index ea2523dfb581..9a9f574153a0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -141,8 +141,9 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-31387 - listener update methods should not throw exception with unknown input") { - val (statusStore: HiveThriftServer2AppStatusStore, - listener: HiveThriftServer2Listener) = createAppStatusStore(true) + val (statusStore: HiveThriftServer2AppStatusStore, listener: HiveThriftServer2Listener) = + createAppStatusStore(true) + val unknownSession = "unknown_session" val unknownOperation = "unknown_operation" listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0)) From ac3a8815f1bbc42dfad84086a589f529d9801bff Mon Sep 17 00:00:00 2001 From: Ali Smesseim Date: Wed, 20 May 2020 12:45:34 +0200 Subject: [PATCH 5/5] Separate onOperationStart into execution creation and guarded session update --- .../ui/HiveThriftServer2Listener.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index b972d8dfbcbe..6b7e5ee61141 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -141,24 +141,27 @@ private[thriftserver] class HiveThriftServer2Listener( case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") } - private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = + private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { + val executionData = getOrCreateExecution( + e.id, + e.statement, + e.sessionId, + e.startTime, + e.userName) + + executionData.state = ExecutionState.STARTED + executionList.put(e.id, executionData) + executionData.groupId = e.groupId + updateLiveStore(executionData) + Option(sessionList.get(e.sessionId)) match { case Some(sessionData) => - val info = getOrCreateExecution( - e.id, - e.statement, - e.sessionId, - e.startTime, - e.userName) - - info.state = ExecutionState.STARTED - executionList.put(e.id, info) sessionData.totalExecution += 1 - executionList.get(e.id).groupId = e.groupId - updateLiveStore(executionList.get(e.id)) updateLiveStore(sessionData) - case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") + case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." + + s"Regardless, the operation has been registered.") } + } private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = Option(executionList.get(e.id)) match {
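The final patch splits onOperationStart so that the operation is always registered in executionList, and only the per-session bookkeeping depends on the session being known. A minimal sketch of that split, again with hypothetical ExecutionData/SessionData stand-ins rather than the real LiveExecutionData/LiveSessionData classes:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified records -- not the real listener data classes.
final case class ExecutionData(id: String, var groupId: String = "")
final case class SessionData(id: String, var totalExecution: Int = 0)

object OperationStartSketch {
  private val executions = new ConcurrentHashMap[String, ExecutionData]()
  private val sessions = new ConcurrentHashMap[String, SessionData]()

  def onOperationStart(id: String, sessionId: String, groupId: String): Unit = {
    // Register the execution unconditionally, so that later operation events
    // (parsed, canceled, error, finish, closed) still find an entry for this id
    // even when the owning session was never recorded.
    executions.put(id, ExecutionData(id, groupId))

    // Only the session-side update is guarded by the lookup.
    Option(sessions.get(sessionId)) match {
      case Some(session) => session.totalExecution += 1
      case None => println(s"onOperationStart called with unknown session id: $sessionId. " +
        "Regardless, the operation has been registered.")
    }
  }
}
```

Registering the execution first keeps the listener's state consistent with the warning message added in the patch: the operation is tracked even though its session is unknown.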