Skip to content

Commit 2ddd418

Browse files
fenzhuGitHub Enterprise
authored andcommitted
[CARMEL-6632] Fix the running time of download statement in query log (#1275)
1 parent 9e899c1 commit 2ddd418

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,6 @@ private[hive] class SparkDownloadDataOperation(
143143
if (getStatus.getState eq OperationState.FINISHED) {
144144
HiveThriftServer2.eventManager.onStatementFinish(statementId)
145145
}
146-
HiveThriftServer2.listener.onQueryExit(
147-
SparkListenerThriftServerQueryExit(
148-
statementId,
149-
QueryLogObjectList(Option(result).map(_.queryExecution)),
150-
QueryLogExtInfo(
151-
finalQueryStateId,
152-
failDetails,
153-
System.currentTimeMillis(),
154-
false, totalDataSize)))
155146
logInfo(s"CLOSING $statementId")
156147
cleanup(OperationState.CLOSED)
157148
sqlContext.sparkContext.clearJobGroup()
@@ -324,6 +315,16 @@ private[hive] class SparkDownloadDataOperation(
324315
setOperationException(exception)
325316
} finally {
326317
TaskSchedulerImpl.activeSqls.decrementAndGet()
318+
logInfo("Post data to query audit log for download operation")
319+
HiveThriftServer2.listener.onQueryExit(
320+
SparkListenerThriftServerQueryExit(
321+
statementId,
322+
QueryLogObjectList(Option(result).map(_.queryExecution)),
323+
QueryLogExtInfo(
324+
finalQueryStateId,
325+
failDetails,
326+
System.currentTimeMillis(),
327+
false, totalDataSize)))
327328
}
328329
}
329330

@@ -609,21 +610,17 @@ private[hive] class SparkDownloadDataOperation(
609610
}
610611

611612
override def cancel(): Unit = {
612-
failDetails = s"Canceling operation, stack trace:\n" +
613-
Thread.currentThread().getStackTrace.mkString("\n")
614-
if (statementId != null) {
615-
HiveThriftServer2.eventManager.onStatementCanceled(statementId, failDetails)
613+
synchronized {
614+
if (!getStatus.getState.isTerminal) {
615+
logInfo(s"Cancel download with $statementId")
616+
failDetails = s"Canceling operation, stack trace:\n" +
617+
Thread.currentThread().getStackTrace.mkString("\n")
618+
if (statementId != null) {
619+
HiveThriftServer2.eventManager.onStatementCanceled(statementId, failDetails)
620+
}
621+
cleanup(OperationState.CANCELED)
622+
}
616623
}
617-
HiveThriftServer2.listener.onQueryExit(
618-
SparkListenerThriftServerQueryExit(
619-
statementId,
620-
QueryLogObjectList(Option(result).map(_.queryExecution)),
621-
QueryLogExtInfo(
622-
ExecutionState.CANCELED.id,
623-
failDetails,
624-
System.currentTimeMillis(),
625-
false, totalDataSize)))
626-
cleanup(OperationState.CANCELED)
627624
}
628625

629626
private def withRetry[T](f: => T): T = {

0 commit comments

Comments
 (0)