Skip to content

Commit 746b2f9

Browse files
fenzhuGitHub Enterprise
authored andcommitted
[HADP-56069] Support writing spark-monitor events to EventLog file in Hermes (apache#654)
1 parent 34c2f4a commit 746b2f9

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SessionAwareEventLoggingListener.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,31 @@ private[spark] class SessionAwareEventLoggingListener(
393393
sessionId)
394394
case s: SparkListenerQueryPlanSignature =>
395395
logEvent(event, flushLogger = true, s.sessionId)
396+
case s: SparkListenerEvent if s.getClass.getSimpleName.startsWith("SparkMonitor") =>
397+
try {
398+
val sessionIdField = s.getClass.getDeclaredField("sessionId")
399+
sessionIdField.setAccessible(true)
400+
val sessionId = sessionIdField.get(s).asInstanceOf[String]
401+
logInfo(s.getClass.getCanonicalName + ": sessionId = " + sessionId)
402+
if (sessionId != null && sessionId.nonEmpty) {
403+
logEvent(event, flushLogger = true, sessionId)
404+
} else {
405+
val executionIdField = s.getClass.getDeclaredField("executionId")
406+
executionIdField.setAccessible(true)
407+
val executionId = executionIdField.get(s).asInstanceOf[Long]
408+
if (executionIdToSessionId.containsKey(String.valueOf(executionId))) {
409+
val sessionId = executionIdToSessionId.get(String.valueOf(executionId))
410+
logInfo(s"Flush ${s.getClass.getCanonicalName}: executionId = " +
411+
s"$executionId, sessionId = $sessionId")
412+
logEvent(event, flushLogger = true, sessionId)
413+
} else {
414+
logError(s"Execution $executionId in ${s.getClass.getCanonicalName} " +
415+
s"doesn't belong to any session")
416+
}
417+
}
418+
} catch {
419+
case e: Exception => logError(s"Fail to persist ${s.getClass.getCanonicalName}: $e")
420+
}
396421
case _ =>
397422
}
398423
}

0 commit comments

Comments
 (0)