diff --git a/assembly/pom.xml b/assembly/pom.xml
index 48fe8f46341e..77ff87c17f52 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -204,6 +204,16 @@
+    <profile>
+      <id>jvm-profiler</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-profiler_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
       <id>bigtop-dist</id>
diff --git a/connector/profiler/pom.xml b/connector/profiler/pom.xml
new file mode 100644
--- /dev/null
+++ b/connector/profiler/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.13</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-profiler_2.13</artifactId>
+  <properties>
+    <sbt.project.name>profiler</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Profiler</name>
+  <url>https://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>me.bechberger</groupId>
+      <artifactId>ap-loader-all</artifactId>
+      <version>2.9-7</version>
+    </dependency>
+  </dependencies>
+</project>
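The only third-party dependency the new module pulls in is `me.bechberger:ap-loader-all`, which bundles async-profiler's native binaries for the supported platforms and exposes them through the `one.profiler` API used throughout this PR. A minimal standalone sketch of that API (the output path and sleep are illustrative, not part of this change):

```scala
import one.profiler.{AsyncProfiler, AsyncProfilerLoader}

object ApLoaderSmokeTest {
  def main(args: Array[String]): Unit = {
    if (AsyncProfilerLoader.isSupported) {
      // Extracts the bundled native library for this OS/arch and attaches it to this JVM.
      val profiler: AsyncProfiler = AsyncProfilerLoader.load()
      profiler.execute("start,event=wall,file=/tmp/profile.jfr") // illustrative path
      Thread.sleep(5000) // profile this JVM for five seconds
      profiler.execute("stop,file=/tmp/profile.jfr")
    } else {
      Console.err.println("async-profiler does not support this platform")
    }
  }
}
```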
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
new file mode 100644
index 000000000000..a5d5b2a1e98d
--- /dev/null
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor.profiler
+
+import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
+import java.net.URI
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * Enables JVM code profiling of an executor via async-profiler.
+ */
+private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {
+
+  private var running = false
+  private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
+  private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
+  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
+  private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
+
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+
+  private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
+  private var outputStream: FSDataOutputStream = _
+  private var inputStream: InputStream = _
+  private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
+  private var threadpool: ScheduledExecutorService = _
+  private var writing: Boolean = false
+
+  val profiler: Option[AsyncProfiler] = {
+    Option(
+      if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
+    )
+  }
+
+  def start(): Unit = {
+    if (!running) {
+      try {
+        profiler.foreach(p => {
+          p.execute(startcmd)
+          logInfo("Executor JVM profiling started.")
+          running = true
+          startWriting()
+        })
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
+          logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
+        case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e)
+      }
+    }
+  }
+
+  /** Stops the profiling and saves the output to the DFS location. */
+  def stop(): Unit = {
+    if (running) {
+      profiler.foreach(p => {
+        p.execute(stopcmd)
+        logInfo("JVM profiler stopped")
+        running = false
+        finishWriting()
+      })
+    }
+  }
+
+  private def startWriting(): Unit = {
+    if (profilerDfsDir.isDefined) {
+      val applicationId = try {
+        conf.getAppId
+      } catch {
+        case _: NoSuchElementException => "local-" + System.currentTimeMillis
+      }
+      val config = SparkHadoopUtil.get.newConfiguration(conf)
+      val appName = conf.get("spark.app.name").replace(" ", "-")
+      val profilerOutputDirname = profilerDfsDir.get
+
+      val profileOutputFile =
+        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
+      val fs = FileSystem.get(new URI(profileOutputFile), config)
+      val filenamePath = new Path(profileOutputFile)
+      outputStream = fs.create(filenamePath)
+      try {
+        if (fs.exists(filenamePath)) {
+          fs.delete(filenamePath, true)
+        }
+        logInfo(s"Copying executor profiling file to $profileOutputFile")
+        inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
+        threadpool.scheduleWithFixedDelay(
+          new Runnable() {
+            override def run(): Unit = writeChunk(false)
+          },
+          writeInterval,
+          writeInterval,
+          TimeUnit.SECONDS)
+        writing = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to start JVM profiler", e)
+          if (threadpool != null) {
+            threadpool.shutdownNow()
+          }
+          if (inputStream != null) {
+            inputStream.close()
+          }
+          if (outputStream != null) {
+            outputStream.close()
+          }
+      }
+    }
+  }
+
+  private def writeChunk(lastChunk: Boolean): Unit = {
+    if (!writing) {
+      return
+    }
+    try {
+      // stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
+      // the events while the file is being dumped, but that is the only way to make sure that
+      // the chunk of data we are copying to dfs is in a consistent state.
+      profiler.get.execute(stopcmd)
+      profiler.get.execute(dumpcmd)
+      var remaining = inputStream.available()
+      if (!lastChunk) {
+        profiler.get.execute(resumecmd)
+      }
+      while (remaining > 0) {
+        val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE))
+        outputStream.write(dataBuffer, 0, read)
+        remaining -= read
+      }
+    } catch {
+      case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
+      case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
+        logError("Some profiler output not written." +
+          " Exception occurred in profiler native code: ", e)
+      case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
+    }
+  }
+
+  private def finishWriting(): Unit = {
+    if (profilerDfsDir.isDefined && writing) {
+      try {
+        // shutdown background writer
+        threadpool.shutdown()
+        threadpool.awaitTermination(30, TimeUnit.SECONDS)
+        // flush remaining data
+        writeChunk(true)
+        inputStream.close()
+        outputStream.close()
+      } catch {
+        case _: InterruptedException => Thread.currentThread().interrupt()
+        case e: IOException =>
+          logWarning("Some profiling output not written. " +
+            "Exception occurred while completing profiler output.", e)
+      }
+      writing = false
+    }
+  }
+}
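The profiler periodically pauses, dumps, and resumes async-profiler so that each chunk copied to DFS is a consistent JFR file; the result lands at `<dfsDir>/<appId>/profile-<appName>-exec-<executorId>.jfr`. A hedged sketch (not part of this PR) for summarizing such a file locally with the standard `jdk.jfr` consumer API:

```scala
import java.nio.file.Paths
import scala.jdk.CollectionConverters._

import jdk.jfr.consumer.RecordingFile

object JfrSummary {
  def main(args: Array[String]): Unit = {
    // Read every event in the recording; wall/cpu samples show up as jdk.ExecutionSample.
    val events = RecordingFile.readAllEvents(Paths.get(args(0))).asScala
    events.groupBy(_.getEventType.getName)
      .view.mapValues(_.size).toSeq
      .sortBy(-_._2)
      .take(10)
      .foreach { case (name, count) => println(f"$count%8d  $name") }
  }
}
```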
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
new file mode 100644
index 000000000000..e144092cdecd
--- /dev/null
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor.profiler
+
+import java.util.{Map => JMap}
+
+import scala.jdk.CollectionConverters._
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Spark plugin that enables JVM code profiling of executors.
+ */
+class ExecutorProfilerPlugin extends SparkPlugin {
+  // No-op: profiling is executor-side only, so there is no driver plugin.
+  override def driverPlugin(): DriverPlugin = null
+
+  override def executorPlugin(): ExecutorPlugin = new JVMProfilerExecutorPlugin
+}
+
+class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging {
+
+  private var sparkConf: SparkConf = _
+  private var pluginCtx: PluginContext = _
+  private var profiler: ExecutorJVMProfiler = _
+  private var codeProfilingEnabled: Boolean = _
+  private var codeProfilingFraction: Double = _
+  private val rand: Random = new Random(System.currentTimeMillis())
+
+  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+    pluginCtx = ctx
+    sparkConf = ctx.conf()
+    codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
+    if (codeProfilingEnabled) {
+      codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
+      if (rand.nextInt(100) * 0.01 < codeProfilingFraction) {
+        logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling")
+        profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
+        profiler.start()
+      }
+    }
+    Map.empty[String, String].asJava
+  }
+
+  override def shutdown(): Unit = {
+    logInfo("Executor JVM profiler shutting down")
+    if (profiler != null) {
+      profiler.stop()
+    }
+  }
+}
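Each executor draws independently in `init`, so with `spark.executor.profiling.fraction = f` roughly a fraction `f` of the executors end up profiled. A sketch of wiring the plugin into an application via `spark.plugins` (the same settings can be passed as `--conf` flags to `spark-submit`; the DFS path is illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.executor.profiler.ExecutorProfilerPlugin")
  .set("spark.executor.profiling.enabled", "true")
  .set("spark.executor.profiling.dfsDir", "hdfs:///spark/profiles") // illustrative path
  .set("spark.executor.profiling.fraction", "0.25") // profile ~25% of executors
```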
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala
new file mode 100644
index 000000000000..f9adec2d4be9
--- /dev/null
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object profiler {
+
+  private[profiler] val EXECUTOR_PROFILING_ENABLED =
+    ConfigBuilder("spark.executor.profiling.enabled")
+      .doc("Turn on code profiling via async-profiler in executors.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
+    ConfigBuilder("spark.executor.profiling.dfsDir")
+      .doc("HDFS-compatible file system path where the profiler will write the output .jfr files.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
+  private[profiler] val EXECUTOR_PROFILING_LOCAL_DIR =
+    ConfigBuilder("spark.executor.profiling.localDir")
+      .doc("Local file system path on the executor where profiler output is saved. Defaults to " +
+        "the working directory of the executor process.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault(".")
+
+  private[profiler] val EXECUTOR_PROFILING_OPTIONS =
+    ConfigBuilder("spark.executor.profiling.options")
+      .doc("Options to pass to async-profiler.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")
+
+  private[profiler] val EXECUTOR_PROFILING_FRACTION =
+    ConfigBuilder("spark.executor.profiling.fraction")
+      .doc("Fraction of executors on which to enable code profiling.")
+      .version("4.0.0")
+      .doubleConf
+      .checkValue(v => v >= 0.0 && v <= 1.0,
+        "Fraction of executors to profile must be in [0,1].")
+      .createWithDefault(0.1)
+
+  private[profiler] val EXECUTOR_PROFILING_WRITE_INTERVAL =
+    ConfigBuilder("spark.executor.profiling.writeInterval")
+      .doc("Time interval in seconds after which the profiler output will be synced to DFS.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "Write interval should be non-negative.")
+      .createWithDefault(30)
+
+}
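The options string is spliced verbatim into the command that `ExecutorJVMProfiler` builds (`s"start,$profilerOptions,file=..."`), so any async-profiler option can go there. A sketch of switching from the default wall-clock sampling to CPU sampling and syncing chunks to DFS every minute (values illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.profiling.options", "event=cpu,interval=10ms,chunktime=300s")
  .set("spark.executor.profiling.writeInterval", "60s") // timeConf accepts suffixed durations
```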
diff --git a/pom.xml b/pom.xml
index c65794a25c99..a9b42e19985a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3657,6 +3657,13 @@
+    <profile>
+      <id>jvm-profiler</id>
+      <modules>
+        <module>connector/profiler</module>
+      </modules>
+    </profile>
+
       <id>test-java-home</id>