Conversation

@parthchandra (Contributor)

What changes were proposed in this pull request?

This adds support for the async-profiler to Spark.

Why are the changes needed?

Profiling JVM applications on a cluster is cumbersome, and saving the profiler's output can be complicated, especially if the cluster is on K8s, where the executor pods are removed and any files saved to the local file system become inaccessible. This feature makes it simple to turn profiling on and off, bundles the jar/binaries needed for profiling, and makes it simple to save output to an HDFS location.

Does this PR introduce any user-facing change?

This PR introduces three new configuration parameters. These are described in the documentation.

How was this patch tested?

Tested manually on EKS.

Was this patch authored or co-authored using generative AI tooling?

No

@HyukjinKwon (Member)

How do you use it? It would be great to include an example, how to run it, etc.

@parthchandra (Contributor Author)

> How do you use it? It would be great to include an example, how to run it, etc.

There's a README: connector/profiler/README.md. I can add more details if you think this is not enough.

@parthchandra (Contributor Author)

There's a whole slew of errors like the following while building documentation:

```
[error] /__w/spark/spark/Loading source file /__w/spark/spark/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java...
[error] Loading source file /__w/spark/spark/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java...
[error] Loading source file /__w/spark/spark/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java...
[error] Loading source file /__w/spark/spark/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java...
[error] Loading source file /__w/spark/spark/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java...
```

How does one fix this?

@dongjoon-hyun (Member) left a comment

Thank you for making a PR, @parthchandra.


To build:
```
./build/mvn clean package -P code-profiler
```
Member

Please remove the trailing spaces.

Member

In addition, please add -DskipTests in order to be more copy-and-paste friendly.

Contributor Author

Done

Contributor

In Spark, it is more customary to write this as `-Pcode-profiler`.

assembly/pom.xml Outdated
</dependencies>
</profile>
<profile>
<id>code-profiler</id>
Member

We need to avoid future confusion with the Python profiler, SPARK-40281 (spark.python.profile.memory).

Shall we rename this from code-profiler to jvm-profiler (or java-profiler)?

Contributor Author

Sure


## Executor Code Profiling

The Spark profiler module enables code profiling of executors in cluster mode based on the [async-profiler](https://github.com/async-profiler/async-profiler/blob/master/README.md), a low-overhead sampling profiler. This allows a Spark application to capture CPU and memory profiles for applications running on a cluster, which can later be analyzed for performance issues. The profiler captures [Java Flight Recorder (JFR)](https://developers.redhat.com/blog/2020/08/25/get-started-with-jdk-flight-recorder-in-openjdk-8u#) files for each executor; these can be read by many tools, including Java Mission Control and IntelliJ.
Member

Let's be specific about what we are depending on.

- https://github.com/async-profiler/async-profiler/blob/master/README.md
+ https://github.com/async-profiler/async-profiler/blob/v2.10/README.md

Member

The Java 8 reference link looks inappropriate because Apache Spark 4.0.0 dropped all Java versions below 17.
Do you think we can use a Java 17+ link, @parthchandra?

Contributor Author

Replaced with a link that references JDK 17.


The profiler writes the JFR files to the executor's working directory on the executor's local file system, and the files can grow large, so it is advisable that the executor machines have adequate storage. The profiler can be configured to copy the JFR files to an HDFS location before the executor shuts down.

Code profiling is currently only supported for
Member

Just a question. Why not Windows?

Contributor Author

Because async-profiler requires specific POSIX signal capabilities, which Windows implements differently, async-profiler doesn't support Windows. More here: async-profiler/async-profiler#188

To get maximum profiling information set the following jvm options for the executor -

```
-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer
```
Member

Let's remove the trailing space.

Contributor Author

Done

* Linux (musl, x64)
* MacOS

To get maximum profiling information set the following jvm options for the executor -
Member

`-` -> `:`

<td><code>spark.executor.profiling.enabled</code></td>
<td>
<code>false</code>
</td>
Member

Let's unify the style. Like line 74, a one-liner (<td><code>false</code></td>) is better.

Contributor Author

Done

</tr>
<tr>
<td><code>spark.executor.profiling.outputDir</code></td>
<td></td>
Member

Please use (none).

<td>(none)</td>

Contributor Author

Done

<td><code>spark.executor.profiling.outputDir</code></td>
<td></td>
<td>
An hdfs compatible path to which the profiler's output files are copied. The output files will be written as <i>outputDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
Member

hdfs -> HDFS

Contributor Author

Done

<td></td>
<td>
An hdfs compatible path to which the profiler's output files are copied. The output files will be written as <i>outputDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
If no outputDir is specified then the files are not copied over.
Member

Please add a warning about Out-Of-Disk situation because K8s is very strict about the disk usage unlike YARN or Standalone clusters.

Contributor Author

Running out of space in the DFS will not affect the job; however, the JFR file may be corrupted. Added the warning.
Also added a warning for localDir, where running out of space in the local file system may cause the job to fail on K8s.

<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.outputDir</code></td>
Member

Since this is remote HDFS-compatible location, let's follow our convention like the following.

<td><code>spark.driver.log.dfsDir</code></td>

In short, please rename outputDir to dfsDir.

- spark.executor.profiling.outputDir
+ spark.executor.profiling.dfsDir

Contributor Author

Done

</tr>
</table>

### Kubernetes
Member

Thank you for adding this.

Contributor Author

🙏🏾

### Kubernetes
On Kubernetes, Spark will try to shut down the executor pods while the profiler files are still being saved. To prevent this, set
```
spark.kubernetes.executor.deleteOnTermination=false
```
Member

Remove the trailing spaces.

Contributor Author

Done


/**
* A class that enables the async code profiler
*
Member

nit. Remove the empty line.

Contributor Author

Done

* A class that enables the async code profiler
*
*/
private[spark] class ExecutorCodeProfiler(conf: SparkConf, executorId: String) extends Logging {
Member

Spark executors can have Java, Python, and R runtimes. Given that, "Code" is a little vague as a term.

  • I'd like to propose renaming ExecutorCodeProfiler to ExecutorJVMProfiler.
  • Otherwise, at least, please document clearly that this is a JVM-only feature.

Contributor Author

Renamed

private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"

private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private val WRITE_INTERVAL = 30 // seconds
Member

Why is this a magic number instead of a configuration?

Contributor Author

I felt there were already too many configuration parameters, and I found this to be a good value for real use cases.
Making this configurable (see the sketch below).
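For reference, such a setting would follow Spark's standard ConfigBuilder pattern. A minimal sketch, assuming a config name like `spark.executor.profiling.writeInterval`; the name and default in the merged code may differ:

```
// Sketch only: assumes org.apache.spark.internal.config.ConfigBuilder and
// java.util.concurrent.TimeUnit are in scope in the module's config object.
private[profiler] val EXECUTOR_PROFILING_WRITE_INTERVAL =
  ConfigBuilder("spark.executor.profiling.writeInterval")
    .doc("Time interval, in seconds, after which the profiler output is " +
      "synced to the DFS location.")
    .version("4.0.0")
    .timeConf(TimeUnit.SECONDS)
    .checkValue(_ >= 0, "Write interval must be non-negative")
    .createWithDefaultString("30s")
```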

private var writing: Boolean = false

val profiler: AsyncProfiler = if (enableProfiler) {
if (AsyncProfilerLoader.isSupported) {
Member

Could you give me a link for this method, please?
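(For context: `AsyncProfilerLoader` comes from the ap-loader project, https://github.com/jvm-profiling-tools/ap-loader, which repackages the async-profiler binaries for supported platforms. A minimal sketch of the conditional-load pattern, assuming ap-loader's `isSupported()`/`load()` helpers; the merged code may differ:)

```
import one.profiler.{AsyncProfiler, AsyncProfilerLoader}

// Sketch: load the bundled async-profiler native library only when the
// current OS/architecture is supported; otherwise leave profiling off.
val profiler: Option[AsyncProfiler] =
  if (AsyncProfilerLoader.isSupported) {
    // Extracts the matching native library and returns the profiler handle.
    Some(AsyncProfilerLoader.load())
  } else {
    None // a real implementation would log a warning instead of failing
  }
```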


/**
* Spark plugin to do code profiling of executors
*
Member

nit. Remove this line.

Contributor Author

Done

}

override def shutdown(): Unit = {

Member

nit. Remove this line.

Contributor Author

Done

}
}


Member

nit. Remove the above two lines.

Contributor Author

Done

.checkValue(v => v >= 0.0 && v < 1.0,
"Fraction of executors to profile must be in [0,1)")
.createWithDefault(0.1)

@dongjoon-hyun (Member) commented on Nov 28, 2023

If you don't mind, shall we try to move this into a separate package, like the Kafka module under connector?

```
package object kafka010 { // scalastyle:ignore
  // ^^ scalastyle:ignore is for ignoring warnings about digits in package name
  type PartitionOffsetMap = Map[TopicPartition, Long]

  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .version("2.2.1")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
```

Member

For YARN and K8s, we have Config.scala.

Contributor Author

Done. We don't need any YARN- or Kubernetes-specific configuration.
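(For reference, spark.executor.profiling.fraction, validated in the hunk above, lets each executor self-select for profiling so that only roughly that fraction of executors pays the overhead. A hypothetical sketch of the gating logic; the config entry names here are illustrative, not the exact merged code:)

```
import scala.util.Random

// Hypothetical sketch: an executor profiles itself only if profiling is
// enabled and it falls within the sampled fraction of executors.
val enabled  = conf.get(EXECUTOR_PROFILING_ENABLED)   // spark.executor.profiling.enabled
val fraction = conf.get(EXECUTOR_PROFILING_FRACTION)  // default 0.1
val enableProfiler = enabled && Random.nextDouble() < fraction
```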

@dongjoon-hyun (Member) left a comment

It's a great addition, @parthchandra. Thank you so much! I finished my first review with a few comments. The rest of the design looks good to me.

@mridulm (Contributor)

mridulm commented Dec 20, 2023

If the async profiler does not allow us to map a native thread to its Java thread (please validate this), we cannot map stack traces to the corresponding task threads, which limits the usability of this integration in Spark.
If this limitation exists, we should explore alternatives that support it and are easy to integrate (honest-profiler supports it but is not easy to integrate, IIRC).

Simply dumping per-executor flamegraphs or stack traces has limited utility (and can be done today) unless we have a path to integrating with SPARK-45151, IMO.

Thoughts, @dongjoon-hyun?

@parthchandra (Contributor Author)

parthchandra commented Dec 20, 2023

> If the async profiler does not allow us to map a native thread to its Java thread (please validate this), we cannot map stack traces to the corresponding task threads, which limits the usability of this integration in Spark.

AsyncGetCallTrace is used precisely to map calls in the native thread to calls in the Java thread.
Not sure exactly what you are looking for here. Are you looking to profile individual tasks? It certainly can be done, but it would require some changes similar to SPARK-45151, and some additional work if you want the profile available through the UI. Or are you looking to enhance SPARK-45151 and get a stack trace that includes native calls? This is a little harder via async-profiler, since there is no API to get a snapshot.
Note that a profile needs to be collected over a period of time, so it is different from getting a snapshot as SPARK-45151 is doing.

> Simply dumping per-executor flamegraphs or stack traces has limited utility (and can be done today).

I would suggest that this PR makes it trivially simple to profile, with no setup required. On K8s, with ephemeral storage, it is not a simple task to dump a profile to disk and get it off the pod before the pod is destroyed (that was, in fact, the original motivation behind doing this).

@mridulm (Contributor)

mridulm commented Dec 20, 2023

> AsyncGetCallTrace is used precisely to map calls in the native thread to calls in the Java thread. Not sure exactly what you are looking for here. Are you looking to profile individual tasks? It certainly can be done, but it would require some changes similar to SPARK-45151, and some additional work if you want the profile available through the UI. Or are you looking to enhance SPARK-45151 and get a stack trace that includes native calls? This is a little harder via async-profiler, since there is no API to get a snapshot. Note that a profile needs to be collected over a period of time, so it is different from getting a snapshot as SPARK-45151 is doing.

There is a difference between native thread IDs and Java thread IDs.
Given the async profiler output, can we map it to the corresponding task (given the task's Java thread ID)?
My understanding is currently no, but if I am missing something, do let me know.

Assuming no, this means the stack traces generated are for all threads in the executor JVM, and so do not allow us to get stack traces and/or flamegraphs for a particular task, tasks of a stage, etc.

If yes, this would be very useful - and will allow for future evolution as part of SPARK-44893 [1].

> Simply dumping per-executor flamegraphs or stack traces has limited utility (and can be done today).

> I would suggest that this PR makes it trivially simple to profile, with no setup required. On K8s, with ephemeral storage, it is not a simple task to dump a profile to disk and get it off the pod before the pod is destroyed (that was, in fact, the original motivation behind doing this).

I am not seeing a lot of value in including this in Apache Spark itself; the plugin API is public, and users can leverage it to do precisely what the PR is proposing.
On the other hand, if the PR integrates well with SPARK-44893 [1] and/or there is a path to leveraging it in that work, it would be more useful.

I am not -1 on this, @dongjoon-hyun, but I am not seeing a lot of value in it: will let you make the call (also because I am on vacation and don't have my desktop handy to investigate in detail :) ).

[1] This is the JIRA I was trying to paste, but GitHub mobile messed it up and it ended up referencing a subtask!

@parthchandra (Contributor Author)

> There is a difference between native thread IDs and Java thread IDs. Given the async profiler output, can we map it to the corresponding task (given the task's Java thread ID)? My understanding is currently no, but if I am missing something, do let me know.

Yes, we can map the stack traces to the Java thread. Here's how it looks (this is IntelliJ's profiler window):

[Screenshot: IntelliJ profiler window showing async-profiler output grouped by Java thread]

> Assuming no, this means the stack traces generated are for all threads in the executor JVM, and so do not allow us to get stack traces and/or flamegraphs for a particular task, tasks of a stage, etc.

We can get individual threads and even filter to profile a single thread. This PR specifically profiles every thread in the executor.

> If yes, this would be very useful - and will allow for future evolution as part of SPARK-44893 [1].

Ah, this JIRA makes it clearer. We can leverage async-profiler to provide the features not yet implemented in SPARK-45151. The current implementation uses a simple snapshot of the task stack traces, which can be enhanced by using async-profiler to get accurate profiling.

> I am not seeing a lot of value in including this in Apache Spark itself; the plugin API is public, and users can leverage it to do precisely what the PR is proposing. On the other hand, if the PR integrates well with SPARK-44893 [1] and/or there is a path to leveraging it in that work, it would be more useful.

I think we can certainly leverage this work. This PR by itself does not have the APIs needed to enhance SPARK-45151. It would probably need to be a separate PR, because it may need changes to the UI implementation. We can either get a flamegraph (covering a period of time for a task) or collapsed call traces from which a flamegraph can be produced, and the choice will affect the UI.

> I am not -1 on this, @dongjoon-hyun, but I am not seeing a lot of value in it: will let you make the call (also because I am on vacation and don't have my desktop handy to investigate in detail :) ).

🙏🏾

@mridulm (Contributor)

mridulm commented Dec 21, 2023

That sounds promising!
What is unclear to me is how we are going to do the mapping without something that ends up introducing safepoint bias (essentially, the cost of this operation)...
For example, if the native-to-Java thread mapping requires mxbean.getThreadInfo and/or similar approaches, it becomes fairly expensive.

Essentially, what I am trying to make sure of is: given (native-thread-id -> timestamp -> stack_dumps+)*, can we identify the native-thread -> java-thread-id mapping?
If yes, we can build the java-thread-id -> task-id mapping in Spark, and essentially get to (task-id -> stack_dumps+)* for all (most?) tasks.

When we built Safari, this is what ended up being extremely powerful for understanding application performance - per-task stack dumps, correlated across all tasks for a stage: allowing us to understand what the stack dump for a particular stage is, what the difference between 'expensive' tasks in a stage vs the average task is, etc., while ignoring most of the non-task thread dumps in an executor (unless explicitly required).
At that time at least, async-profiler did not provide a way to 'cheaply' do this, and so I ended up enhancing honest-profiler to support it (unfortunately, honest-profiler does not publish to Maven, so using it is currently not a viable option).
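(One cheap way to build the java-thread-id -> task-id side of that mapping, without mxbean.getThreadInfo, would be to record it in the task runner itself. A hypothetical sketch; TaskThreadRegistry is an illustrative name, not Spark API:)

```
import scala.collection.concurrent.TrieMap

// Hypothetical sketch: record which JVM thread runs which task so that
// profiler samples keyed by thread id can later be re-keyed by task id.
object TaskThreadRegistry {
  private val threadToTask = TrieMap.empty[Long, Long]

  def onTaskStart(taskId: Long): Unit =
    threadToTask.put(Thread.currentThread().getId, taskId)

  def onTaskEnd(): Unit =
    threadToTask.remove(Thread.currentThread().getId)

  def taskFor(threadId: Long): Option[Long] = threadToTask.get(threadId)
}
```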

@parthchandra (Contributor Author)

parthchandra commented Dec 21, 2023

That was pretty cool stuff you did in Safari!
I think we may not have to do too much work ourselves.
The way I see it, async-profiler is already doing the mapping of Java threads and stack traces for us (and we know that both async-profiler and honest-profiler avoid the safepoint bias problem, so this is as good as it gets). In addition, there is a filter API to filter on one or more threads, so async-profiler collects events only for the given thread(s). The API for filtering takes a java.lang.Thread as input.
The way I see it potentially working is: when a user asks to profile a task, we start profiling only the task's thread, similar to the way a task stack trace is done today. Then we ship over the collected data and display it.
I'll have to play around with this, though. There might be some gotchas in profiling multiple threads simultaneously, and/or some APIs might be private.
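(If the filter API behaves as described, per-task profiling could look roughly like the sketch below. This is untested and hedged: it assumes async-profiler's `filter` start option and its filterThread(Thread, boolean) Java API, and the exact option string may differ:)

```
import one.profiler.AsyncProfiler

// Hedged sketch: restrict profiling to the current task's thread.
def profileTask(profiler: AsyncProfiler, outFile: String)(taskBody: => Unit): Unit = {
  // Start with thread filtering enabled; no thread is sampled until added.
  profiler.execute(s"start,event=cpu,filter,file=$outFile")
  profiler.filterThread(Thread.currentThread(), true) // sample this thread only
  try {
    taskBody
  } finally {
    profiler.filterThread(Thread.currentThread(), false)
    profiler.execute(s"stop,file=$outFile")
  }
}
```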

@dongjoon-hyun (Member)

Sorry for being away, @mridulm and @parthchandra. I've been traveling in South Korea since 14th December. I'll catch up on the discussion and revisit this PR in January. Thank you!

running = true
startWriting()
}
)
Member

nit. Let's merge line 71 into 70.

Contributor Author

Done

}
)
} catch {
case e@(_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
Member

nit. Add proper spaces?

- case e@(
+ case e @ (

Contributor Author

Done

threadpool.scheduleWithFixedDelay(new Runnable() {
override def run(): Unit = writeChunk(false)
}, writeInterval, writeInterval,
TimeUnit.SECONDS)
Member

The indentation of the above four lines looks weird to me. Could you revise?

Contributor Author

Done. Hopefully this is better now.

}
} catch {
case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
case e@(_: IllegalArgumentException | _: IllegalStateException) =>
Member

ditto. e@( -> e @ (

Contributor Author

Done

case e@(_: IllegalArgumentException | _: IllegalStateException) =>
logError("Some profiler output not written." +
" Exception occurred in profiler native code: ", e)
case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
Member

I'm wondering what we can get in this case because the flag writing is still true. Do we need to keep writing because we need to invoke finishWriting? However, it seems that we cannot invoke inputStream.close() and outputStream.close() eventually.

Contributor Author

If we get a failure, we can still keep writing, because only the portion (chunk) of the data being written may be lost. The output file would still be valid.
finishWriting is eventually called to shut down the thread and the streams cleanly.
We could potentially stop profiling if we get any of these exceptions, but I feel that would perhaps be too drastic.

Member

Got it. Since we have the error log, it might be okay for now.

outputStream.close()
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case e: IOException =>
Member

Where does this come from when writeChunk swallows all exceptions with case e: Exception?

Contributor Author

The exceptions may come from threadpool.shutdown or awaitTermination, which may throw InterruptedException, or from {input,output}Stream.close, which may throw an IOException.
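(For context, the shutdown path being discussed might look roughly like the sketch below; the names mirror the snippets quoted in this review, but this is an illustration, not the exact merged code:)

```
// Hypothetical sketch of finishWriting(): flush the final chunk, stop the
// scheduled writer, and close both streams. The InterruptedException and
// IOException cases above come from shutdown/awaitTermination and close().
// Assumes java.io.IOException and java.util.concurrent.TimeUnit imports.
private def finishWriting(): Unit = {
  if (writing) {
    try {
      writeChunk(true) // flush whatever remains of the local jfr file
      threadpool.shutdown()
      threadpool.awaitTermination(30, TimeUnit.SECONDS)
      inputStream.close()
      outputStream.close()
    } catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
      case e: IOException =>
        logError("Exception occurred while closing profiler streams: ", e)
    }
    writing = false
  }
}
```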

@dongjoon-hyun (Member)

dongjoon-hyun commented Jan 12, 2024

Thank you all. Especially, thanks to @mridulm for the many ideas and advice during this PR review.

  • Yes, as you mentioned, honest-profiler has unfortunately been abandoned for about two years. We should not depend on it.
  • Like Safari (2019), there have been many profiler approaches outside the community, because this capability is valuable. However, IIUC, they are not easily available to Apache Spark users these days.
  • I believe the value of this PR is that it provides Apache Spark 4.0.0 users a foundation for easy, pre-defined profiling, with additional controls like spark.executor.profiling.fraction.
    • To be clear, we don't recommend users enable this for all executors of all jobs.
  • Lastly, the plugin approach of this PR is non-intrusive and has no maintenance cost because it's based on the standard Spark plugin API. As for the proposed integrated and detailed analysis, I believe we can achieve it in the future, since we all agreed on the value of that integrated analysis.

I hope we can merge this as a part of Apache Spark 4.0.0.

@dongjoon-hyun (Member) left a comment

+1, LGTM. (Pending CIs)

@dongjoon-hyun (Member)

Merged to master for Apache Spark 4.0.0.
Thank you, @parthchandra and all!

@parthchandra (Contributor Author)

Thank you @dongjoon-hyun @mridulm

RexXiong pushed a commit to apache/celeborn that referenced this pull request Mar 25, 2024
…c-profiler

### What changes were proposed in this pull request?

Introduce JVM profiling (`JVMProfier`) in the Celeborn Worker using async-profiler to capture CPU and memory profiles.

### Why are the changes needed?

[async-profiler](https://github.com/async-profiler) is a sampling profiler for any JDK based on the HotSpot JVM that does not suffer from the safepoint bias problem. It has low overhead and doesn't rely on JVMTI. It avoids safepoint bias by using the `AsyncGetCallTrace` API provided by the HotSpot JVM to profile Java code paths, and Linux's perf_events to profile native code paths. It features HotSpot-specific APIs to collect stack traces and to track memory allocations.
The feature introduces a profiler plugin that does not add any overhead unless enabled and can be configured to accept profiler arguments as a configuration parameter. It supports turning profiling on/off and includes the jar/binaries needed for profiling.

Backport [[SPARK-46094] Support Executor JVM Profiling](apache/spark#44021).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Worker cluster test.

Closes #2409 from SteNicholas/CELEBORN-1299.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
dongjoon-hyun added a commit that referenced this pull request May 4, 2024
… `jvm-profiler` modules

### What changes were proposed in this pull request?

This PR aims to fix `dev/scalastyle` to check the `hadoop-cloud` and `jvm-profiler` modules.
The detected scalastyle issues are also fixed.

### Why are the changes needed?

To prevent future scalastyle issues.

A Scala style violation was introduced here, but we missed it because we didn't check all optional modules.
- #46022

The `jvm-profiler` module was newly added in Apache Spark 4.0.0, but we missed adding it to `dev/scalastyle`. Note that there were no Scala style issues in that module at that time.
- #44021

The `hadoop-cloud` module was added in Apache Spark 2.3.0.
- #17834

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with newly revised `dev/scalastyle`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46376 from dongjoon-hyun/SPARK-48127.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
sinaiamonkar-sai pushed a commit to sinaiamonkar-sai/spark that referenced this pull request May 5, 2024
… `jvm-profiler` modules

dongjoon-hyun pushed a commit that referenced this pull request Jan 14, 2025
…on DFS

### What changes were proposed in this pull request?

This PR canonicalizes the DFS result files of the JVM profiler added in SPARK-46094 to
```
dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr
```
which largely follows the event log file name pattern and layout.

### Why are the changes needed?

According to #44021 (comment), we can integrate the profiling results with the Spark UI (both live and history) in the future, so it's good to follow the event log file name pattern and layout as much as possible.

### Does this PR introduce _any_ user-facing change?

No, it's an unreleased feature.

### How was this patch tested?

```
$ bin/spark-submit run-example \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
  --conf spark.executor.profiling.enabled=true \
  --conf spark.executor.profiling.dfsDir=hdfs:///spark-profiling \
  --conf spark.executor.profiling.fraction=1 \
  SparkPi 100000
```

```
hadoopspark-dev1:~/spark$ hadoop fs -ls /spark-profiling/
Found 1 items
drwxrwx---   - hadoop supergroup          0 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1
```
```
hadoopspark-dev1:~/spark$ hadoop fs -ls /spark-profiling/application_1736320707252_0023_1
Found 48 items
-rw-rw----   3 hadoop supergroup    5255028 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-1.jfr
-rw-rw----   3 hadoop supergroup    3840775 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-10.jfr
-rw-rw----   3 hadoop supergroup    3889002 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-11.jfr
-rw-rw----   3 hadoop supergroup    3570697 2025-01-13 10:29 /spark-profiling/application_1736320707252_0023_1/profile-exec-12.jfr
...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49440 from pan3793/SPARK-50783.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
.createWithDefault(false)

private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
ConfigBuilder("spark.executor.profiling.dfsDir")
Member

@dongjoon-hyun, I'm also investigating adding profiling support for the driver. Should I fork all the configurations, or simply treat the driver as a special executor and reuse these configurations?

Member

Well, @pan3793, could you use a new JIRA for your new goal, please?

Although I understand why you added a comment here, it doesn't look like good practice to me. Why add a driver-related suggestion to an unrelated executor-focused PR like Spark Executor JVM Profiling?
