Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>jvm-profiler</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-profiler_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>bigtop-dist</id>
<!-- This profile uses the assembly plugin to create a special "dist" package for BigTop
Expand Down
119 changes: 119 additions & 0 deletions connector/profiler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Spark JVM Profiler Plugin

## Build

To build
```
./build/mvn clean package -DskipTests -Pjvm-profiler
```

## Executor Code Profiling

The spark profiler module enables code profiling of executors in cluster mode based on the the [async profiler](https://github.com/async-profiler/async-profiler/blob/v2.10/README.md), a low overhead sampling profiler. This allows a Spark application to capture CPU and memory profiles for application running on a cluster which can later be analyzed for performance issues. The profiler captures [Java Flight Recorder (jfr)](https://access.redhat.com/documentation/es-es/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview) files for each executor; these can be read by many tools including Java Mission Control and Intellij.

The profiler writes the jfr files to the executor's working directory in the executor's local file system and the files can grow to be large so it is advisable that the executor machines have adequate storage. The profiler can be configured to copy the jfr files to a hdfs location before the executor shuts down.

Code profiling is currently only supported for
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question. Why not Windows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because async-profiler requires specific POSIX signals capabilities which Windows implements differently. So async-profiler doesn't support windows. More here: async-profiler/async-profiler#188


* Linux (x64)
* Linux (arm 64)
* Linux (musl, x64)
* MacOS

To get maximum profiling information set the following jvm options for the executor :

```
-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the trailing space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

```

For more information on async_profiler see the [Async Profiler Manual](https://krzysztofslusarski.github.io/2022/12/12/async-manual.html)


To enable code profiling, first enable the code profiling plugin via

```
spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin
```

Then enable the profiling in the configuration.


### Code profiling configuration

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.executor.profiling.enabled</code></td>
<td><code>false</code></td>
<td>
If true, will enable code profiling
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.dfsDir</code></td>
<td>(none)</td>
<td>
An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
If no <i>dfsDir</i> is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.localDir</code></td>
<td><code>.</code> i.e. the executor's working dir</td>
<td>
The local directory in the executor container to write the jfr files to. If not specified the file will be written to the executor's working directory. Users should ensure there is sufficient disk space available on the system as running out of space may result in corrupt jfr file and even cause jobs to fail on systems like K8s.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.options</code></td>
<td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
<td>
Options to pass to the profiler. Detailed options are documented in the comments here:
<a href="https://github.com/async-profiler/async-profiler/blob/32601bccd9e49adda9510a2ed79d142ac6ef0ff9/src/arguments.cpp#L52">Profiler arguments</a>.
Note that the options to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.fraction</code></td>
<td>0.10</td>
<td>
The fraction of executors on which to enable code profiling. The executors to be profiled are picked at random.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.writeInterval</code></td>
<td>30</td>
<td>
Time interval, in seconds, after which the profiler output will be synced to dfs.
</td>
<td>4.0.0</td>
</tr>
</table>

### Kubernetes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏🏾

On Kubernetes, spark will try to shut down the executor pods while the profiler files are still being saved. To prevent this set
```
spark.kubernetes.executor.deleteOnTermination=false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the trailing spaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

```

### Example
```
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
-c spark.executor.extraJavaOptions="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer" \
-c spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
-c spark.executor.profiling.enabled=true \
-c spark.executor.profiling.dfsDir=s3a://my-bucket/spark/profiles/ \
-c spark.executor.profiling.options=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \
-c spark.executor.profiling.fraction=0.10 \
-c spark.kubernetes.executor.deleteOnTermination=false \
<application-jar> \
[application-arguments]
```
50 changes: 50 additions & 0 deletions connector/profiler/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.13</artifactId>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-profiler_2.13</artifactId>
<properties>
<sbt.project.name>profiler</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Profiler</name>
<url>https://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- async-profiler loader contains async_profiler binaries for multiple platforms -->
<dependency>
<groupId>me.bechberger</groupId>
<artifactId>ap-loader-all</artifactId>
<version>2.9-7</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor.profiler

import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
import java.net.URI
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils


/**
* A class that enables the async JVM code profiler
*/
private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {

private var running = false
private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"

private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private var outputStream: FSDataOutputStream = _
private var inputStream: InputStream = _
private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
private var threadpool: ScheduledExecutorService = _
private var writing: Boolean = false

val profiler: Option[AsyncProfiler] = {
Option(
if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
)
}

def start(): Unit = {
if (!running) {
try {
profiler.foreach(p => {
p.execute(startcmd)
logInfo("Executor JVM profiling started.")
running = true
startWriting()
})
} catch {
case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e)
}
}
}

/** Stops the profiling and saves output to dfs location. */
def stop(): Unit = {
if (running) {
profiler.foreach(p => {
p.execute(stopcmd)
logInfo("JVM profiler stopped")
running = false
finishWriting()
})
}
}

private def startWriting(): Unit = {
if (profilerDfsDir.isDefined) {
val applicationId = try {
conf.getAppId
} catch {
case _: NoSuchElementException => "local-" + System.currentTimeMillis
}
val config = SparkHadoopUtil.get.newConfiguration(conf)
val appName = conf.get("spark.app.name").replace(" ", "-")
val profilerOutputDirname = profilerDfsDir.get

val profileOutputFile =
s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
val fs = FileSystem.get(new URI(profileOutputFile), config);
val filenamePath = new Path(profileOutputFile)
outputStream = fs.create(filenamePath)
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true)
}
logInfo(s"Copying executor profiling file to $profileOutputFile")
inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
threadpool.scheduleWithFixedDelay(
new Runnable() {
override def run(): Unit = writeChunk(false)
},
writeInterval,
writeInterval,
TimeUnit.SECONDS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation of the above four lines looks weird to me. Could you revise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Hopefully this is better now.

writing = true
} catch {
case e: Exception =>
logError("Failed to start JVM profiler", e)
if (threadpool != null) {
threadpool.shutdownNow()
}
if (inputStream != null) {
inputStream.close()
}
if (outputStream != null) {
outputStream.close()
}
}
}
}

private def writeChunk(lastChunk: Boolean): Unit = {
if (!writing) {
return
}
try {
// stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
// the events while the file is being dumped, but that is the only way to make sure that
// the chunk of data we are copying to dfs is in a consistent state.
profiler.get.execute(stopcmd)
profiler.get.execute(dumpcmd)
var remaining = inputStream.available()
if (!lastChunk) {
profiler.get.execute(resumecmd)
}
while (remaining > 0) {
val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE))
outputStream.write(dataBuffer, 0, read)
remaining -= read
}
} catch {
case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
logError("Some profiler output not written." +
" Exception occurred in profiler native code: ", e)
case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering what we can get in this case because the flag writing is still true. Do we need to keep writing because we need to invoke finishWriting? However, it seems that we cannot invoke inputStream.close() and outputStream.close() eventually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get a failure, we can still keep writing because only the portion (chunk) of the data being written may be lost. The output file would still be valid.
finishWriting is eventually called to shutdown the thread and the streams cleanly.
We could potentially stop profiling if we get any of these exceptions, but I feel that it is perhaps too drastic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Since we have the error log, it might be okay for now.

}
}

private def finishWriting(): Unit = {
if (profilerDfsDir.isDefined && writing) {
try {
// shutdown background writer
threadpool.shutdown()
threadpool.awaitTermination(30, TimeUnit.SECONDS)
// flush remaining data
writeChunk(true)
inputStream.close()
outputStream.close()
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case e: IOException =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does this come from when writeChunk swallows all exceptions with case e: Exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exceptions may come from threadpool.shutdown, awaitTermination which may throw InterruptedException or from {input,output}Stream.close which may get an IOException

logWarning("Some profiling output not written." +
"Exception occurred while completing profiler output", e)
}
writing = false
}
}
}
Loading