From e3a9b0896db7371252a66cc6b84bd7db921268c1 Mon Sep 17 00:00:00 2001 From: cenyuhai <261810726@qq.com> Date: Thu, 1 Dec 2016 18:26:43 +0800 Subject: [PATCH 1/5] set statement state to error after user canceled job --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index aeabd6a15881d..a2ae3a51afd9a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -241,6 +241,8 @@ private[hive] class SparkExecuteStatementOperation( dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { case e: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) if (getStatus().getState() == OperationState.CANCELED) { return } else { From 196ab66af1e73454b8b926386654e8498f2d5ce9 Mon Sep 17 00:00:00 2001 From: cenyuhai <261810726@qq.com> Date: Fri, 2 Dec 2016 13:54:23 +0800 Subject: [PATCH 2/5] change statement state by the current state --- .../thriftserver/SparkExecuteStatementOperation.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index a2ae3a51afd9a..ea8e44ed18cd2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -254,9 +254,17 @@ private[hive] class SparkExecuteStatementOperation( case e: Throwable => val currentState = getStatus().getState() logError(s"Error executing query, currentState $currentState, ", e) - setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) + getStatus().getState() match { + case OperationState.INITIALIZED => + setState(OperationState.CLOSED) + case OperationState.CANCELED => + case OperationState.FINISHED => + case _ => + setState(OperationState.ERROR) + } + throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) From ddb41289d1fe49df322e83bcd56c806468f2d9a5 Mon Sep 17 00:00:00 2001 From: cenyuhai <261810726@qq.com> Date: Tue, 6 Dec 2016 16:28:07 +0800 Subject: [PATCH 3/5] fix bug when transition status from "CLOSED" to "ERROR" --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index ea8e44ed18cd2..f4dc16c24a5ca 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -261,6 +261,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.CLOSED) case OperationState.CANCELED => case OperationState.FINISHED => + case OperationState.CLOSED => case _ => setState(OperationState.ERROR) } From 501d121cd38580e1e909fd7e2a7ccc19ca8ad928 Mon Sep 17 00:00:00 2001 From: cenyuhai <261810726@qq.com> Date: Tue, 6 Dec 2016 17:30:56 +0800 Subject: [PATCH 4/5] fix bug when transition status from "CLOSED" to "FINISHED" --- .../SparkExecuteStatementOperation.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f4dc16c24a5ca..695f6b28aa0bd 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -262,14 +262,24 @@ private[hive] class SparkExecuteStatementOperation( case OperationState.CANCELED => case OperationState.FINISHED => case OperationState.CLOSED => + case OperationState.ERROR => case _ => setState(OperationState.ERROR) } throw new HiveSQLException(e.toString) } - setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) + getStatus().getState() match { + case OperationState.INITIALIZED => + setState(OperationState.CLOSED) + case OperationState.CANCELED => + case OperationState.FINISHED => + case OperationState.CLOSED => + case OperationState.ERROR => + case _ => + setState(OperationState.FINISHED) + } } override def cancel(): Unit = { From 40894a10c4b59ff573382c7e4564cd850e132b18 Mon Sep 17 00:00:00 2001 From: cenyuhai <261810726@qq.com> Date: Tue, 6 Dec 2016 19:33:31 +0800 Subject: [PATCH 5/5] fix error when job is canceled, but job is still running --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 695f6b28aa0bd..4ad12bf9fb0ac 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -256,6 +256,9 @@ private[hive] class SparkExecuteStatementOperation( logError(s"Error executing query, currentState $currentState, ", e) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) + if (statementId != null) { + sqlContext.sparkContext.cancelJobGroup(statementId) + } getStatus().getState() match { case OperationState.INITIALIZED => setState(OperationState.CLOSED)