diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 6d0a506fa94d..6b7e5ee61141 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hive.service.server.HiveServer2 import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.scheduler._ import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState @@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener( kvstore: ElementTrackingStore, sparkConf: SparkConf, server: Option[HiveServer2], - live: Boolean = true) extends SparkListener { + live: Boolean = true) extends SparkListener with Logging { private val sessionList = new ConcurrentHashMap[String, LiveSessionData]() private val executionList = new ConcurrentHashMap[String, LiveExecutionData]() @@ -131,60 +132,83 @@ private[thriftserver] class HiveThriftServer2Listener( updateLiveStore(session) } - private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = { - val session = sessionList.get(e.sessionId) - session.finishTimestamp = e.finishTime - updateStoreWithTriggerEnabled(session) - sessionList.remove(e.sessionId) - } + private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = + Option(sessionList.get(e.sessionId)) match { + case Some(sessionData) => + sessionData.finishTimestamp = e.finishTime + updateStoreWithTriggerEnabled(sessionData) + sessionList.remove(e.sessionId) + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + } private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { - val info = getOrCreateExecution( + val executionData = getOrCreateExecution( e.id, e.statement, e.sessionId, e.startTime, e.userName) - info.state = ExecutionState.STARTED - executionList.put(e.id, info) - sessionList.get(e.sessionId).totalExecution += 1 - executionList.get(e.id).groupId = e.groupId - updateLiveStore(executionList.get(e.id)) - updateLiveStore(sessionList.get(e.sessionId)) + executionData.state = ExecutionState.STARTED + executionList.put(e.id, executionData) + executionData.groupId = e.groupId + updateLiveStore(executionData) + + Option(sessionList.get(e.sessionId)) match { + case Some(sessionData) => + sessionData.totalExecution += 1 + updateLiveStore(sessionData) + case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." + + s"Regardless, the operation has been registered.") + } } - private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = { - executionList.get(e.id).executePlan = e.executionPlan - executionList.get(e.id).state = ExecutionState.COMPILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.executePlan = e.executionPlan + executionData.state = ExecutionState.COMPILED + updateLiveStore(executionData) + case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") + } - private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.CANCELED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.CANCELED + updateLiveStore(executionData) + case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + } - private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).detail = e.errorMsg - executionList.get(e.id).state = ExecutionState.FAILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.detail = e.errorMsg + executionData.state = ExecutionState.FAILED + updateLiveStore(executionData) + case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") + } - private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.FINISHED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.FINISHED + updateLiveStore(executionData) + case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") + } - private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = { - executionList.get(e.id).closeTimestamp = e.closeTime - executionList.get(e.id).state = ExecutionState.CLOSED - updateStoreWithTriggerEnabled(executionList.get(e.id)) - executionList.remove(e.id) - } + private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = + Option(executionList.get(e.id)) match { + case Some(executionData) => + executionData.closeTimestamp = e.closeTime + executionData.state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionData) + executionList.remove(e.id) + case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") + } // Update both live and history stores. Trigger is enabled by default, hence // it will cleanup the entity which exceeds the threshold. diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala new file mode 100644 index 000000000000..05d540d782e3 --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.thriftserver + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hive.service.cli.OperationHandle +import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager} +import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager} +import org.mockito.Mockito.{mock, verify, when} +import org.mockito.invocation.InvocationOnMock + +import org.apache.spark.SparkFunSuite + +class HiveSessionImplSuite extends SparkFunSuite { + private var session: HiveSessionImpl = _ + private var operationManager: OperationManager = _ + + override def beforeAll() { + super.beforeAll() + + session = new HiveSessionImpl( + ThriftserverShimUtils.testedProtocolVersions.head, + "", + "", + new HiveConf(), + "" + ) + val sessionManager = mock(classOf[SessionManager]) + session.setSessionManager(sessionManager) + operationManager = mock(classOf[OperationManager]) + session.setOperationManager(operationManager) + when(operationManager.newGetCatalogsOperation(session)).thenAnswer( + (_: InvocationOnMock) => { + val operation = mock(classOf[GetCatalogsOperation]) + when(operation.getHandle).thenReturn(mock(classOf[OperationHandle])) + operation + } + ) + + session.open(Map.empty[String, String].asJava) + } + + test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") { + val operationHandle1 = session.getCatalogs + val operationHandle2 = session.getCatalogs + + when(operationManager.closeOperation(operationHandle1)) + .thenThrow(classOf[NullPointerException]) + when(operationManager.closeOperation(operationHandle2)) + .thenThrow(classOf[NullPointerException]) + + session.close() + + verify(operationManager).closeOperation(operationHandle1) + verify(operationManager).closeOperation(operationHandle2) + } +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index 075032fa5d09..9a9f574153a0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -140,6 +140,23 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(listener.noLiveData()) } + test("SPARK-31387 - listener update methods should not throw exception with unknown input") { + val (statusStore: HiveThriftServer2AppStatusStore, listener: HiveThriftServer2Listener) = + createAppStatusStore(true) + + val unknownSession = "unknown_session" + val unknownOperation = "unknown_operation" + listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession, + "stmt", "groupId", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query")) + listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation, + "msg", "trace", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0)) + } + private def createProperties: Properties = { val properties = new Properties() properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId") diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 745f385e87f7..e3fb54d9f47e 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -636,7 +636,11 @@ public void close() throws HiveSQLException { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. @@ -674,11 +678,15 @@ private void cleanupPipeoutFile() { File[] fileAry = new File(lScratchDir).listFiles( (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout")); - for (File file : fileAry) { - try { - FileUtils.forceDelete(file); - } catch (Exception e) { - LOG.error("Failed to cleanup pipeout file: " + file, e); + if (fileAry == null) { + LOG.error("Unable to access pipeout files in " + lScratchDir); + } else { + for (File file : fileAry) { + try { + FileUtils.forceDelete(file); + } catch (Exception e) { + LOG.error("Failed to cleanup pipeout file: " + file, e); + } } } } diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 14e9c4704c97..1b3e8fe6bfb9 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -650,7 +650,11 @@ public void close() throws HiveSQLException { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. @@ -688,11 +692,15 @@ private void cleanupPipeoutFile() { File[] fileAry = new File(lScratchDir).listFiles( (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout")); - for (File file : fileAry) { - try { - FileUtils.forceDelete(file); - } catch (Exception e) { - LOG.error("Failed to cleanup pipeout file: " + file, e); + if (fileAry == null) { + LOG.error("Unable to access pipeout files in " + lScratchDir); + } else { + for (File file : fileAry) { + try { + FileUtils.forceDelete(file); + } catch (Exception e) { + LOG.error("Failed to cleanup pipeout file: " + file, e); + } } } }