From 0db4d60a317af2a2f434fc237a62eca03b67a9df Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Sat, 26 Aug 2023 19:55:03 -0700 Subject: [PATCH 1/4] [SPARK-46094][CONNECT] Add support for code profiling executors This adds support for the async profiler to Spark. Profiling of JVM applications on a cluster is cumbersome, and it can be complicated to save the output of the profiler, especially if the cluster is on K8s, where the executor pods are removed and any files saved to the local file system become inaccessible. This feature makes it simple to turn profiling on/off, includes the jar/binaries needed for profiling, and makes it simple to save output to an HDFS location. This PR introduces three new configuration parameters. These are described in the documentation. --- assembly/pom.xml | 10 + connector/profiler/README.md | 86 +++++++++ connector/profiler/pom.xml | 50 +++++ .../spark/executor/ExecutorCodeProfiler.scala | 173 ++++++++++++++++++ .../executor/ExecutorProfilerPlugin.scala | 75 ++++++++ .../spark/internal/config/package.scala | 38 ++++ pom.xml | 7 + 7 files changed, 439 insertions(+) create mode 100644 connector/profiler/README.md create mode 100644 connector/profiler/pom.xml create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala diff --git a/assembly/pom.xml b/assembly/pom.xml index 48fe8f46341e..4d3b1b43447c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -204,6 +204,16 @@ + + code-profiler + + + org.apache.spark + spark-profiler_${scala.binary.version} + ${project.version} + + + bigtop-dist + + + 4.0.0 + + org.apache.spark + spark-parent_2.13 + 4.0.0-SNAPSHOT + ../../pom.xml + + + spark-profiler_2.13 + + profiler + + jar + Spark Profiler + https://spark.apache.org/ + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + provided + + + + me.bechberger + ap-loader-all + 2.9-7 + + + diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala new file mode 100644 index 000000000000..f688830eef6e --- /dev/null +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.executor + +import java.io.{BufferedInputStream, FileInputStream, InputStream} +import java.net.URI +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import one.profiler.{AsyncProfiler, AsyncProfilerLoader} +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.ThreadUtils + + +/** + * A class that enables the async code profiler + * + */ +private[spark] class ExecutorCodeProfiler(conf: SparkConf, executorId: String) extends Logging { + + private var running = false + private val enableProfiler = conf.get(EXECUTOR_CODE_PROFILING_ENABLED) + private val profilerOptions = conf.get(EXECUTOR_CODE_PROFILING_OPTIONS) + private val profilerOutputDir = conf.get(EXECUTOR_CODE_PROFILING_OUTPUT_DIR) + private val profilerLocalDir = conf.get(EXECUTOR_CODE_PROFILING_LOCAL_DIR) + + private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr" + private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr" + private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr" + private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr" + + private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB + private val WRITE_INTERVAL = 30 // seconds + private var outputStream: FSDataOutputStream = _ + private var inputStream: InputStream = _ + private val dataBuffer = new Array[Byte](UPLOAD_SIZE) + private var threadpool: ScheduledExecutorService = _ + private var writing: Boolean = false + + val profiler: AsyncProfiler = if (enableProfiler) { + if (AsyncProfilerLoader.isSupported) { + AsyncProfilerLoader.load() + } else { + logWarning("Executor code profiling is enabled but is not supported for this platform") + null + } + } else { + null + } + + def start(): Unit = { + if (profiler != null && !running) { + logInfo("Executor code profiling starting.") + try { + profiler.execute(startcmd) + } catch { + case e: Exception => + logWarning("Executor code profiling aborted due to exception: ", e) + return + } + logInfo("Executor code profiling started.") + running = true + } + startWriting() + } + + /** Stops the profiling and saves output to hdfs location. 
*/ + def stop(): Unit = { + if (profiler != null && running) { + profiler.execute(stopcmd) + logInfo("Code profiler stopped") + running = false + finishWriting() + } + } + + private def startWriting(): Unit = { + if (profilerOutputDir.isDefined) { + val applicationId = conf.getAppId + val config = SparkHadoopUtil.get.newConfiguration(conf) + val appName = conf.get("spark.app.name"); + val profilerOutputDirname = profilerOutputDir.get + val profileOutputFile = + s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr" + val fs = FileSystem.get(new URI(profileOutputFile), config); + val filenamePath = new Path(profileOutputFile) + outputStream = fs.create(filenamePath) + try { + if (fs.exists(filenamePath)) { + fs.delete(filenamePath, true) + } + logInfo(s"Copying executor profiling file to $profileOutputFile") + inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr")) + threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") + threadpool.scheduleWithFixedDelay(new Runnable() { + override def run(): Unit = writeChunk() + }, WRITE_INTERVAL, WRITE_INTERVAL, + TimeUnit.SECONDS) + writing = true + } catch { + case e: Exception => + logError("Failed to start code profiler", e) + if (threadpool != null) { + threadpool.shutdownNow() + } + if (inputStream != null) { + inputStream.close() + } + if (outputStream != null) { + outputStream.close() + } + } + } + } + + private def writeChunk(): Unit = { + if (!writing) { + return + } + try { + // stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss + // the events while the file is being dumped, but that is the only way to make sure that + // the chunk of data we are copying to dfs is in a consistent state. + profiler.execute(stopcmd) + profiler.execute(dumpcmd) + var remaining = inputStream.available() + profiler.execute(resumecmd) + while (remaining > 0) { + val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE)) + outputStream.write(dataBuffer, 0, read) + remaining -= read + } + } catch { + case e: Exception => logError("Exception occurred while writing profiler output", e) + } + } + + private def finishWriting(): Unit = { + if (profilerOutputDir.isDefined && writing) { + try { + // shutdown background writer + threadpool.shutdown() + threadpool.awaitTermination(30, TimeUnit.SECONDS) + // flush remaining data + writeChunk() + inputStream.close() + outputStream.close() + } catch { + case e: Exception => + logError("Exception in completing profiler output", e) + } + writing = false + } + } +} diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala new file mode 100644 index 000000000000..56c4e529c72b --- /dev/null +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import java.util.{Map => JMap} + +import scala.jdk.CollectionConverters._ +import scala.util.Random + +import org.apache.spark.SparkConf +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EXECUTOR_CODE_PROFILING_ENABLED, EXECUTOR_CODE_PROFILING_FRACTION} + + +/** + * Spark plugin to do code profiling of executors + * + */ +class ExecutorProfilerPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = null + + // No-op + override def executorPlugin(): ExecutorPlugin = new CodeProfilerExecutorPlugin +} + +class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { + + private var sparkConf: SparkConf = _ + private var pluginCtx: PluginContext = _ + private var profiler: ExecutorCodeProfiler = _ + private var codeProfilingEnabled: Boolean = _ + private var codeProfilingFraction: Double = _ + private val rand: Random = new Random(System.currentTimeMillis()) + + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + pluginCtx = ctx + sparkConf = ctx.conf() + codeProfilingEnabled = sparkConf.get(EXECUTOR_CODE_PROFILING_ENABLED) + codeProfilingFraction = sparkConf.get(EXECUTOR_CODE_PROFILING_FRACTION) + + if (codeProfilingEnabled) { + if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { + logInfo(s"Executor id ${pluginCtx.executorID()} selected for code profiling") + profiler = new ExecutorCodeProfiler(sparkConf, pluginCtx.executorID()) + profiler.start() + } + } + Map.empty[String, String].asJava + } + + override def shutdown(): Unit = { + + logInfo("Executor code profiler shutting down") + if (profiler != null) { + profiler.stop() + } + } + + +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b2bf30863a91..ddf58c4d47b1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -799,6 +799,44 @@ package object config { .intConf .createOptional + private[spark] val EXECUTOR_CODE_PROFILING_ENABLED = + ConfigBuilder("spark.executor.profiling.enabled") + .doc("Turn on code profiling via async_profiler in executors.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_CODE_PROFILING_OUTPUT_DIR = + ConfigBuilder("spark.executor.profiling.outputDir") + .doc("HDFS compatible file-system path to where the profiler will write output jfr files.") + .version("4.0.0") + .stringConf + .createOptional + + private[spark] val EXECUTOR_CODE_PROFILING_LOCAL_DIR = + ConfigBuilder("spark.executor.profiling.localDir") + .doc("Local file system path on executor where profiler output is saved. 
Defaults to the " + "working directory of the executor process.") + .version("4.0.0") + .stringConf + .createWithDefault(".") + + private[spark] val EXECUTOR_CODE_PROFILING_OPTIONS = + ConfigBuilder("spark.executor.profiling.options") + .doc("Options to pass on to the async profiler.") + .version("4.0.0") + .stringConf + .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s") + + private[spark] val EXECUTOR_CODE_PROFILING_FRACTION = + ConfigBuilder("spark.executor.profiling.fraction") + .doc("Fraction of executors to profile") + .version("4.0.0") + .doubleConf + .checkValue(v => v >= 0.0 && v < 1.0, + "Fraction of executors to profile must be in [0,1)") + .createWithDefault(0.1) + private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles") .internal() .version("2.2.1") diff --git a/pom.xml b/pom.xml index c65794a25c99..0673aa0629ef 100644 --- a/pom.xml +++ b/pom.xml @@ -3657,6 +3657,13 @@ + + code-profiler + + connector/profiler + + + test-java-home From da9e12d03b2dea98641bf18f8ef693f8446a3453 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 30 Nov 2023 10:58:54 -0800 Subject: [PATCH 2/4] Address review comments --- assembly/pom.xml | 2 +- connector/profiler/README.md | 61 +++++++++++------ .../ExecutorJVMProfiler.scala} | 10 ++- .../ExecutorProfilerPlugin.scala | 11 +-- .../spark/executor/profiler/package.scala | 68 +++++++++++++++++++ .../spark/internal/config/package.scala | 38 ----------- pom.xml | 2 +- 7 files changed, 119 insertions(+), 73 deletions(-) rename connector/profiler/src/main/scala/org/apache/spark/executor/{ExecutorCodeProfiler.scala => profiler/ExecutorJVMProfiler.scala} (95%) rename connector/profiler/src/main/scala/org/apache/spark/executor/{ => profiler}/ExecutorProfilerPlugin.scala (89%) create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala diff --git a/assembly/pom.xml b/assembly/pom.xml index 4d3b1b43447c..77ff87c17f52 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -205,7 +205,7 @@ - code-profiler + jvm-profiler org.apache.spark diff --git a/connector/profiler/README.md b/connector/profiler/README.md index 993edd8583e8..18332748c26a 100644 --- a/connector/profiler/README.md +++ b/connector/profiler/README.md @@ -1,15 +1,15 @@ -# Spark Code Profiler Plugin +# Spark JVM Profiler Plugin ## Build -To build -``` - ./build/mvn clean package -P code-profiler +To build +``` + ./build/mvn clean package -DskipTests -Pjvm-profiler ``` ## Executor Code Profiling -The spark profiler module enables code profiling of executors in cluster mode based on the the [async profiler](https://github.com/async-profiler/async-profiler/blob/master/README.md), a low overhead sampling profiler. This allows a Spark application to capture CPU and memory profiles for application running on a cluster which can later be analyzed for performance issues. The profiler captures [Java Flight Recorder (jfr)](https://developers.redhat.com/blog/2020/08/25/get-started-with-jdk-flight-recorder-in-openjdk-8u#) files for each executor; these can be read by many tools including Java Mission Control and Intellij. +The Spark profiler module enables code profiling of executors in cluster mode based on the [async profiler](https://github.com/async-profiler/async-profiler/blob/v2.10/README.md), a low overhead sampling profiler. This allows a Spark application to capture CPU and memory profiles for applications running on a cluster which can later be analyzed for performance issues.
The profiler captures [Java Flight Recorder (jfr)](https://access.redhat.com/documentation/es-es/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview) files for each executor; these can be read by many tools including Java Mission Control and Intellij. The profiler writes the jfr files to the executor's working directory in the executor's local file system and the files can grow to be large so it is advisable that the executor machines have adequate storage. The profiler can be configured to copy the jfr files to a hdfs location before the executor shuts down. @@ -20,7 +20,7 @@ Code profiling is currently only supported for * Linux (musl, x64) * MacOS -To get maximum profiling information set the following jvm options for the executor - +To get maximum profiling information set the following jvm options for the executor : ``` -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer @@ -32,7 +32,7 @@ For more information on async_profiler see the [Async Profiler Manual](https://k To enable code profiling, first enable the code profiling plugin via ``` -spark.plugins=org.apache.spark.executor.ExecutorProfilerPlugin +spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin ``` Then enable the profiling in the configuration. @@ -44,20 +44,18 @@ Then enable the profiling in the configuration. Property NameDefaultMeaningSince Version spark.executor.profiling.enabled + false - false - - - If true, will enable code profiling + If true, will enable code profiling 4.0.0 - spark.executor.profiling.outputDir - + spark.executor.profiling.dfsDir + (none) - An hdfs compatible path to which the profiler's output files are copied. The output files will be written as outputDir/application_id/profile-appname-exec-executor_id.jfr
- If no outputDir is specified then the files are not copied over. + An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/application_id/profile-appname-exec-executor_id.jfr
+ If no dfsDir is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files. 4.0.0 @@ -65,7 +63,7 @@ Then enable the profiling in the configuration. spark.executor.profiling.localDir . i.e. the executor's working dir - The local directory in the executor container to write the jfr files to. If not specified the file will be written to the executor's working directory. + The local directory in the executor container to write the jfr files to. If not specified the file will be written to the executor's working directory. Users should ensure there is sufficient disk space available on the system as running out of space may result in corrupt jfr file and even cause jobs to fail on systems like K8s. 4.0.0 @@ -73,14 +71,39 @@ Then enable the profiling in the configuration. spark.executor.profiling.fraction 0.10 - The fraction of executors on which to enable code profiling. The executors to be profiled are picked at random. + The fraction of executors on which to enable code profiling. The executors to be profiled are picked at random. + + 4.0.0 + + + spark.executor.profiling.writeInterval + 30 + + Time interval, in seconds, after which the profiler output will be synced to dfs. 4.0.0 ### Kubernetes -On Kubernetes, spark will try to shut down the executor pods while the profiler files are still being saved. To prevent this set +On Kubernetes, spark will try to shut down the executor pods while the profiler files are still being saved. To prevent this set ``` spark.kubernetes.executor.deleteOnTermination=false -``` \ No newline at end of file +``` + +### Example +``` +./bin/spark-submit \ + --class \ + --master \ + --deploy-mode \ + -c spark.executor.extraJavaOptions="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer" \ + -c spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \ + -c spark.executor.profiling.enabled=true \ + -c spark.executor.profiling.outputDir=s3a://my-bucket/spark/profiles/ \ + -c spark.executor.profiling.options=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \ + -c spark.executor.profiling.fraction=0.10 \ + -c spark.kubernetes.executor.deleteOnTermination=false \ + \ + [application-arguments] +``` diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala similarity index 95% rename from connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala rename to connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala index f688830eef6e..680d6e9149c5 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorCodeProfiler.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.executor +package org.apache.spark.executor.profiler import java.io.{BufferedInputStream, FileInputStream, InputStream} import java.net.URI @@ -26,21 +26,20 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ import org.apache.spark.util.ThreadUtils /** * A class that enables the async code profiler - * */ -private[spark] class ExecutorCodeProfiler(conf: SparkConf, executorId: String) extends Logging { +private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging { private var running = false private val enableProfiler = conf.get(EXECUTOR_CODE_PROFILING_ENABLED) private val profilerOptions = conf.get(EXECUTOR_CODE_PROFILING_OPTIONS) private val profilerOutputDir = conf.get(EXECUTOR_CODE_PROFILING_OUTPUT_DIR) private val profilerLocalDir = conf.get(EXECUTOR_CODE_PROFILING_LOCAL_DIR) + private val writeInterval = conf.get(EXECUTOR_CODE_PROFILING_WRITE_INTERVAL) private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr" private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr" @@ -48,7 +47,6 @@ private[spark] class ExecutorCodeProfiler(conf: SparkConf, executorId: String) e private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr" private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB - private val WRITE_INTERVAL = 30 // seconds private var outputStream: FSDataOutputStream = _ private var inputStream: InputStream = _ private val dataBuffer = new Array[Byte](UPLOAD_SIZE) @@ -112,7 +110,7 @@ private[spark] class ExecutorCodeProfiler(conf: SparkConf, executorId: String) e threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") threadpool.scheduleWithFixedDelay(new Runnable() { override def run(): Unit = writeChunk() - }, WRITE_INTERVAL, WRITE_INTERVAL, + }, writeInterval, writeInterval, TimeUnit.SECONDS) writing = true } catch { diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala similarity index 89% rename from connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala rename to connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala index 56c4e529c72b..31b8676c77b6 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/ExecutorProfilerPlugin.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.executor +package org.apache.spark.executor.profiler import java.util.{Map => JMap} @@ -24,12 +24,10 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{EXECUTOR_CODE_PROFILING_ENABLED, EXECUTOR_CODE_PROFILING_FRACTION} /** * Spark plugin to do code profiling of executors - * */ class ExecutorProfilerPlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = null @@ -42,7 +40,7 @@ class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { private var sparkConf: SparkConf = _ private var pluginCtx: PluginContext = _ - private var profiler: ExecutorCodeProfiler = _ + private var profiler: ExecutorJVMProfiler = _ private var codeProfilingEnabled: Boolean = _ private var codeProfilingFraction: Double = _ private val rand: Random = new Random(System.currentTimeMillis()) @@ -56,7 +54,7 @@ class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { if (codeProfilingEnabled) { if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { logInfo(s"Executor id ${pluginCtx.executorID()} selected for code profiling") - profiler = new ExecutorCodeProfiler(sparkConf, pluginCtx.executorID()) + profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID()) profiler.start() } } @@ -64,12 +62,9 @@ class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { - logInfo("Executor code profiler shutting down") if (profiler != null) { profiler.stop() } } - - } diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala new file mode 100644 index 000000000000..088fb4acb8c7 --- /dev/null +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.executor + +import org.apache.spark.internal.config.ConfigBuilder + +package object profiler { + + private[profiler] val EXECUTOR_CODE_PROFILING_ENABLED = + ConfigBuilder("spark.executor.profiling.enabled") + .doc("Turn on code profiling via async_profiler in executors.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + private[profiler] val EXECUTOR_CODE_PROFILING_OUTPUT_DIR = + ConfigBuilder("spark.executor.profiling.dfsDir") + .doc("HDFS compatible file-system path to where the profiler will write output jfr files.") + .version("4.0.0") + .stringConf + .createOptional + + private[profiler] val EXECUTOR_CODE_PROFILING_LOCAL_DIR = + ConfigBuilder("spark.executor.profiling.localDir") + .doc("Local file system path on executor where profiler output is saved. Defaults to the " + + "working directory of the executor process.") + .version("4.0.0") + .stringConf + .createWithDefault(".") + + private[profiler] val EXECUTOR_CODE_PROFILING_OPTIONS = + ConfigBuilder("spark.executor.profiling.options") + .doc("Options to pass on to the async profiler.") + .version("4.0.0") + .stringConf + .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s") + + private[profiler] val EXECUTOR_CODE_PROFILING_FRACTION = + ConfigBuilder("spark.executor.profiling.fraction") + .doc("Fraction of executors to profile") + .version("4.0.0") + .doubleConf + .checkValue(v => v >= 0.0 && v < 1.0, + "Fraction of executors to profile must be in [0,1)") + .createWithDefault(0.1) + + private[profiler] val EXECUTOR_CODE_PROFILING_WRITE_INTERVAL = + ConfigBuilder("spark.executor.profiling.writeInterval") + .doc("Time interval in seconds after which the profiler output will be synced to dfs") + .version("4.0.0") + .intConf + .createWithDefault(30) + +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ddf58c4d47b1..b2bf30863a91 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -799,44 +799,6 @@ package object config { .intConf .createOptional - private[spark] val EXECUTOR_CODE_PROFILING_ENABLED = - ConfigBuilder("spark.executor.profiling.enabled") - .doc("Turn on code profiling via async_profiler in executors.") - .version("4.0.0") - .booleanConf - .createWithDefault(false) - - private[spark] val EXECUTOR_CODE_PROFILING_OUTPUT_DIR = - ConfigBuilder("spark.executor.profiling.outputDir") - .doc("HDFS compatible file-system path to where the profiler will write output jfr files.") - .version("4.0.0") - .stringConf - .createOptional - - private[spark] val EXECUTOR_CODE_PROFILING_LOCAL_DIR = - ConfigBuilder("spark.executor.profiling.localDir") - .doc("Local file system path on executor where profiler output is saved. 
Defaults to the " + - "working directory of the executor process.") - .version("4.0.0") - .stringConf - .createWithDefault(".") - - private[spark] val EXECUTOR_CODE_PROFILING_OPTIONS = - ConfigBuilder("spark.executor.profiling.options") - .doc("Options to pass on to the async profiler.") - .version("4.0.0") - .stringConf - .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s") - - private[spark] val EXECUTOR_CODE_PROFILING_FRACTION = - ConfigBuilder("spark.executor.profiling.fraction") - .doc("Fraction of executors to profile") - .version("4.0.0") - .doubleConf - .checkValue(v => v >= 0.0 && v < 1.0, - "Fraction of executors to profile must be in [0,1)") - .createWithDefault(0.1) - private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles") .internal() .version("2.2.1") diff --git a/pom.xml b/pom.xml index 0673aa0629ef..a9b42e19985a 100644 --- a/pom.xml +++ b/pom.xml @@ -3658,7 +3658,7 @@
- code-profiler + jvm-profiler connector/profiler From ae98818d9fd854beddb452136cba60e1fa83830a Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 18 Dec 2023 14:48:44 -0800 Subject: [PATCH 3/4] Address more review comments. --- connector/profiler/README.md | 12 +- .../profiler/ExecutorJVMProfiler.scala | 104 ++++++++++-------- .../profiler/ExecutorProfilerPlugin.scala | 15 ++- .../spark/executor/profiler/package.scala | 23 ++-- 4 files changed, 89 insertions(+), 65 deletions(-) diff --git a/connector/profiler/README.md b/connector/profiler/README.md index 18332748c26a..3512dadb0791 100644 --- a/connector/profiler/README.md +++ b/connector/profiler/README.md @@ -67,6 +67,16 @@ Then enable the profiling in the configuration. 4.0.0 + + spark.executor.profiling.options + event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s + + Options to pass to the profiler. Detailed options are documented in the comments here: + Profiler arguments. + Note that the options to start, stop, specify output format, and output file do not have to be specified. + + 4.0.0 + spark.executor.profiling.fraction 0.10 @@ -100,7 +110,7 @@ On Kubernetes, spark will try to shut down the executor pods while the profiler -c spark.executor.extraJavaOptions="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer" \ -c spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \ -c spark.executor.profiling.enabled=true \ - -c spark.executor.profiling.outputDir=s3a://my-bucket/spark/profiles/ \ + -c spark.executor.profiling.dfsDir=s3a://my-bucket/spark/profiles/ \ -c spark.executor.profiling.options=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \ -c spark.executor.profiling.fraction=0.10 \ -c spark.kubernetes.executor.deleteOnTermination=false \ diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala index 680d6e9149c5..baa459cfe474 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.executor.profiler -import java.io.{BufferedInputStream, FileInputStream, InputStream} +import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException} import java.net.URI import java.util.concurrent.{ScheduledExecutorService, TimeUnit} @@ -30,16 +30,16 @@ import org.apache.spark.util.ThreadUtils /** - * A class that enables the async code profiler + * A class that enables the async JVM code profiler */ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging { private var running = false - private val enableProfiler = conf.get(EXECUTOR_CODE_PROFILING_ENABLED) - private val profilerOptions = conf.get(EXECUTOR_CODE_PROFILING_OPTIONS) - private val profilerOutputDir = conf.get(EXECUTOR_CODE_PROFILING_OUTPUT_DIR) - private val profilerLocalDir = conf.get(EXECUTOR_CODE_PROFILING_LOCAL_DIR) - private val writeInterval = conf.get(EXECUTOR_CODE_PROFILING_WRITE_INTERVAL) + private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED) + private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS) + private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR) + private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR) + private val writeInterval = 
conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL) private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr" private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr" @@ -53,49 +53,53 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex private var threadpool: ScheduledExecutorService = _ private var writing: Boolean = false - val profiler: AsyncProfiler = if (enableProfiler) { - if (AsyncProfilerLoader.isSupported) { - AsyncProfilerLoader.load() - } else { - logWarning("Executor code profiling is enabled but is not supported for this platform") - null - } - } else { - null + val profiler: Option[AsyncProfiler] = { + Option( + if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null + ) } def start(): Unit = { - if (profiler != null && !running) { - logInfo("Executor code profiling starting.") + if (!running) { try { - profiler.execute(startcmd) + profiler.foreach(p => { + p.execute(startcmd) + logInfo("Executor JVM profiling started.") + running = true + startWriting() + } + ) } catch { - case e: Exception => - logWarning("Executor code profiling aborted due to exception: ", e) - return + case e@(_: IllegalArgumentException | _: IllegalStateException | _: IOException) => + logError("JVM profiling aborted. Exception occurred in profiler native code: ", e) + case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e) } - logInfo("Executor code profiling started.") - running = true } - startWriting() } - /** Stops the profiling and saves output to hdfs location. */ + /** Stops the profiling and saves output to dfs location. */ def stop(): Unit = { - if (profiler != null && running) { - profiler.execute(stopcmd) - logInfo("Code profiler stopped") - running = false - finishWriting() + if (running) { + profiler.foreach(p => { + p.execute(stopcmd) + logInfo("JVM profiler stopped") + running = false + finishWriting() + }) } } private def startWriting(): Unit = { - if (profilerOutputDir.isDefined) { - val applicationId = conf.getAppId + if (profilerDfsDir.isDefined) { + val applicationId = try { + conf.getAppId + } catch { + case _: NoSuchElementException => "local-" + System.currentTimeMillis + } val config = SparkHadoopUtil.get.newConfiguration(conf) - val appName = conf.get("spark.app.name"); - val profilerOutputDirname = profilerOutputDir.get + val appName = conf.get("spark.app.name").replace(" ", "-") + val profilerOutputDirname = profilerDfsDir.get + val profileOutputFile = s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr" val fs = FileSystem.get(new URI(profileOutputFile), config); @@ -109,13 +113,13 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr")) threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") threadpool.scheduleWithFixedDelay(new Runnable() { - override def run(): Unit = writeChunk() + override def run(): Unit = writeChunk(false) }, writeInterval, writeInterval, TimeUnit.SECONDS) writing = true } catch { case e: Exception => - logError("Failed to start code profiler", e) + logError("Failed to start JVM profiler", e) if (threadpool != null) { threadpool.shutdownNow() } @@ -129,7 +133,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex } } - private def writeChunk(): Unit = { + private def writeChunk(lastChunk: Boolean): Unit = 
{ if (!writing) { return } @@ -137,33 +141,41 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex // stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss // the events while the file is being dumped, but that is the only way to make sure that // the chunk of data we are copying to dfs is in a consistent state. - profiler.execute(stopcmd) - profiler.execute(dumpcmd) + profiler.get.execute(stopcmd) + profiler.get.execute(dumpcmd) var remaining = inputStream.available() - profiler.execute(resumecmd) + if (!lastChunk) { + profiler.get.execute(resumecmd) + } while (remaining > 0) { val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE)) outputStream.write(dataBuffer, 0, read) remaining -= read } } catch { - case e: Exception => logError("Exception occurred while writing profiler output", e) + case e: IOException => logError("Exception occurred while writing some profiler output: ", e) + case e@(_: IllegalArgumentException | _: IllegalStateException) => + logError("Some profiler output not written." + + " Exception occurred in profiler native code: ", e) + case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e) } } private def finishWriting(): Unit = { - if (profilerOutputDir.isDefined && writing) { + if (profilerDfsDir.isDefined && writing) { try { // shutdown background writer threadpool.shutdown() threadpool.awaitTermination(30, TimeUnit.SECONDS) // flush remaining data - writeChunk() + writeChunk(true) inputStream.close() outputStream.close() } catch { - case e: Exception => - logError("Exception in completing profiler output", e) + case _: InterruptedException => Thread.currentThread().interrupt() + case e: IOException => + logWarning("Some profiling output not written." 
+ + "Exception occurred while completing profiler output", e) } writing = false } diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala index 31b8676c77b6..e144092cdecd 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala @@ -27,16 +27,16 @@ import org.apache.spark.internal.Logging /** - * Spark plugin to do code profiling of executors + * Spark plugin to do JVM code profiling of executors */ class ExecutorProfilerPlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = null // No-op - override def executorPlugin(): ExecutorPlugin = new CodeProfilerExecutorPlugin + override def executorPlugin(): ExecutorPlugin = new JVMProfilerExecutorPlugin } -class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { +class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging { private var sparkConf: SparkConf = _ private var pluginCtx: PluginContext = _ @@ -48,12 +48,11 @@ class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { pluginCtx = ctx sparkConf = ctx.conf() - codeProfilingEnabled = sparkConf.get(EXECUTOR_CODE_PROFILING_ENABLED) - codeProfilingFraction = sparkConf.get(EXECUTOR_CODE_PROFILING_FRACTION) - + codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED) if (codeProfilingEnabled) { + codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION) if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { - logInfo(s"Executor id ${pluginCtx.executorID()} selected for code profiling") + logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling") profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID()) profiler.start() } @@ -62,7 +61,7 @@ class CodeProfilerExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { - logInfo("Executor code profiler shutting down") + logInfo("Executor JVM profiler shutting down") if (profiler != null) { profiler.stop() } diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala index 088fb4acb8c7..f9adec2d4be9 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala @@ -16,25 +16,27 @@ */ package org.apache.spark.executor +import java.util.concurrent.TimeUnit + import org.apache.spark.internal.config.ConfigBuilder package object profiler { - private[profiler] val EXECUTOR_CODE_PROFILING_ENABLED = + private[profiler] val EXECUTOR_PROFILING_ENABLED = ConfigBuilder("spark.executor.profiling.enabled") .doc("Turn on code profiling via async_profiler in executors.") .version("4.0.0") .booleanConf .createWithDefault(false) - private[profiler] val EXECUTOR_CODE_PROFILING_OUTPUT_DIR = + private[profiler] val EXECUTOR_PROFILING_DFS_DIR = ConfigBuilder("spark.executor.profiling.dfsDir") - .doc("HDFS compatible file-system path to where the profiler will write output jfr files.") + .doc("HDFS compatible file-system path to where the profiler will write output jfr files.") .version("4.0.0") .stringConf 
.createOptional - private[profiler] val EXECUTOR_CODE_PROFILING_LOCAL_DIR = + private[profiler] val EXECUTOR_PROFILING_LOCAL_DIR = ConfigBuilder("spark.executor.profiling.localDir") .doc("Local file system path on executor where profiler output is saved. Defaults to the " + "working directory of the executor process.") @@ -42,27 +44,28 @@ package object profiler { .stringConf .createWithDefault(".") - private[profiler] val EXECUTOR_CODE_PROFILING_OPTIONS = + private[profiler] val EXECUTOR_PROFILING_OPTIONS = ConfigBuilder("spark.executor.profiling.options") .doc("Options to pass on to the async profiler.") .version("4.0.0") .stringConf .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s") - private[profiler] val EXECUTOR_CODE_PROFILING_FRACTION = + private[profiler] val EXECUTOR_PROFILING_FRACTION = ConfigBuilder("spark.executor.profiling.fraction") .doc("Fraction of executors to profile") .version("4.0.0") .doubleConf - .checkValue(v => v >= 0.0 && v < 1.0, - "Fraction of executors to profile must be in [0,1)") + .checkValue(v => v >= 0.0 && v <= 1.0, + "Fraction of executors to profile must be in [0,1]") .createWithDefault(0.1) - private[profiler] val EXECUTOR_CODE_PROFILING_WRITE_INTERVAL = + private[profiler] val EXECUTOR_PROFILING_WRITE_INTERVAL = ConfigBuilder("spark.executor.profiling.writeInterval") .doc("Time interval in seconds after which the profiler output will be synced to dfs") .version("4.0.0") - .intConf + .timeConf(TimeUnit.SECONDS) + .checkValue(_ >= 0, "Write interval should be non-negative") .createWithDefault(30) } From 7240e31f537bdf4bee9752928dbb088d75b92e10 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Fri, 12 Jan 2024 16:51:45 -0800 Subject: [PATCH 4/4] Address more review comments --- .../executor/profiler/ExecutorJVMProfiler.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala index baa459cfe474..a5d5b2a1e98d 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala @@ -67,10 +67,9 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex logInfo("Executor JVM profiling started.") running = true startWriting() - } - ) + }) } catch { - case e@(_: IllegalArgumentException | _: IllegalStateException | _: IOException) => + case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) => logError("JVM profiling aborted. 
Exception occurred in profiler native code: ", e) case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e) } @@ -112,9 +111,12 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex logInfo(s"Copying executor profiling file to $profileOutputFile") inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr")) threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") - threadpool.scheduleWithFixedDelay(new Runnable() { - override def run(): Unit = writeChunk(false) - }, writeInterval, writeInterval, + threadpool.scheduleWithFixedDelay( + new Runnable() { + override def run(): Unit = writeChunk(false) + }, + writeInterval, + writeInterval, TimeUnit.SECONDS) writing = true } catch { @@ -154,7 +156,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex } } catch { case e: IOException => logError("Exception occurred while writing some profiler output: ", e) - case e@(_: IllegalArgumentException | _: IllegalStateException) => + case e @ (_: IllegalArgumentException | _: IllegalStateException) => logError("Some profiler output not written." + " Exception occurred in profiler native code: ", e) case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
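For reference, the following is a minimal sketch (not part of the patch series) of enabling the profiler from application code instead of `spark-submit` flags, using the configuration keys and plugin class introduced by these patches. The application name, the `s3a://` output path, and the 0.5 fraction are placeholder values chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ProfiledApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("profiled-app") // placeholder name; it becomes part of the jfr file name
      // Load the executor profiler plugin added by this patch series.
      .config("spark.plugins", "org.apache.spark.executor.profiler.ExecutorProfilerPlugin")
      .config("spark.executor.profiling.enabled", "true")
      // Placeholder DFS location; each selected executor copies its profile.jfr here.
      .config("spark.executor.profiling.dfsDir", "s3a://my-bucket/spark/profiles/")
      // Profile roughly half of the executors (the default is 0.10).
      .config("spark.executor.profiling.fraction", "0.5")
      .getOrCreate()

    // Any workload; the profiler runs for the lifetime of the selected executors.
    spark.range(0, 100000000L).selectExpr("sum(id)").show()

    spark.stop()
  }
}
```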