From 3c2d53bb8e88d9aeb691066f1b0b93437f6c928c Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Fri, 10 Jan 2025 02:57:24 +0800
Subject: [PATCH 1/2] [SPARK-50783][CORE] Canonicalize JVM profiler results
 file name and layout on DFS

---
 connector/profiler/README.md                   |  6 +-
 .../profiler/ExecutorJVMProfiler.scala         | 77 +++++++++++--------
 2 files changed, 50 insertions(+), 33 deletions(-)

diff --git a/connector/profiler/README.md b/connector/profiler/README.md
index 1326fd55df09..79359be591a8 100644
--- a/connector/profiler/README.md
+++ b/connector/profiler/README.md
@@ -16,7 +16,7 @@ The profiler writes the jfr files to the executor's working directory in the exe
 Code profiling is currently only supported for

 * Linux (x64)
-* Linux (arm 64)
+* Linux (arm64)
 * Linux (musl, x64)
 * MacOS

@@ -54,7 +54,7 @@ Then enable the profiling in the configuration.
       spark.executor.profiling.dfsDir
       (none)
-      An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/application_id/profile-appname-exec-executor_id.jfr
+ An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/{{APP_ID}}/profile-{{APP_ID}}-exec-{{EXECUTOR_ID}}.jfr
      If no dfsDir is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
       4.0.0
@@ -72,7 +72,7 @@ Then enable the profiling in the configuration.
       event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
       Options to pass to the profiler. Detailed options are documented in the comments here:
-      Profiler arguments.
+      Profiler arguments. Note that the options to start, stop, specify the output format, and set the output file do not have to be specified.
       4.0.0

diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 20b6db5221fa..0f0185e6a7f0 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -17,17 +17,17 @@ package org.apache.spark.executor.profiler

 import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
-import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

 import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
@@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   private var running = false
   private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
-  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)

-  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val appId = try {
+    conf.getAppId
+  } catch {
+    case _: NoSuchElementException => "local-" + System.currentTimeMillis
+  }
+  private val appAttemptId = conf.getOption("spark.app.attempt.id")
+  private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+  private val profileFile = s"profile-$baseName-exec-$executorId.jfr"
+
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
+  private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)

   private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
   private var outputStream: FSDataOutputStream = _
   private var inputStream: InputStream = _
@@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     }
   }

+  private def requireProfilerBaseDirAsDirectory(fs: FileSystem, profilerDfsDir: String): Unit = {
+    if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
+      throw new IllegalArgumentException(
+        s"Profiler DFS base directory $profilerDfsDir is not a directory.")
+    }
+  }
+
   private def startWriting(): Unit = {
-    if (profilerDfsDir.isDefined) {
-      val applicationId = try {
-        conf.getAppId
-      } catch {
-        case _: NoSuchElementException => "local-" + System.currentTimeMillis
+    profilerDfsDirOpt.foreach { profilerDfsDir =>
+      val profilerDirForApp = s"$profilerDfsDir/$baseName"
+      val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
+
+      requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
+
+      val profilerDirForAppPath = new Path(profilerDirForApp)
+      if (!fs.exists(profilerDirForAppPath)) {
+        // SPARK-30860: use the class method to avoid the umask causing permission issues
+        FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
       }
-      val config = SparkHadoopUtil.get.newConfiguration(conf)
-      val appName = conf.get("spark.app.name").replace(" ", "-")
-      val profilerOutputDirname = profilerDfsDir.get
-
-      val profileOutputFile =
-        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
-      val fs = FileSystem.get(new URI(profileOutputFile), config);
-      val filenamePath = new Path(profileOutputFile)
-      outputStream = fs.create(filenamePath)
+
+      outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
       try {
-        if (fs.exists(filenamePath)) {
-          fs.delete(filenamePath, true)
-        }
         logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
-        inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        inputStream = new BufferedInputStream(
+          new FileInputStream(s"$profilerLocalDir/$profileFile"))
         threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
         threadpool.scheduleWithFixedDelay(
           new Runnable() {
@@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     } catch {
       case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
       case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
-        logError("Some profiler output not written." +
-          " Exception occurred in profiler native code: ", e)
+        logError("Some profiler output not written. " +
+          "Exception occurred in profiler native code: ", e)
       case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
     }
   }

   private def finishWriting(): Unit = {
-    if (profilerDfsDir.isDefined && writing) {
+    if (profilerDfsDirOpt.isDefined && writing) {
       try {
         // shutdown background writer
         threadpool.shutdown()
@@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     } catch {
       case _: InterruptedException => Thread.currentThread().interrupt()
       case e: IOException =>
-        logWarning("Some profiling output not written." +
-          "Exception occurred while completing profiler output", e)
+        logWarning("Some profiling output not written. " +
" + + "Exception occurred while completing profiler output: ", e) } writing = false } From fee48dd02b5cb229667bd450de485c683ecbb42b Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 13 Jan 2025 10:32:46 +0800 Subject: [PATCH 2/2] address comments --- connector/profiler/README.md | 2 +- .../apache/spark/executor/profiler/ExecutorJVMProfiler.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/profiler/README.md b/connector/profiler/README.md index 79359be591a8..4d97b15eb96a 100644 --- a/connector/profiler/README.md +++ b/connector/profiler/README.md @@ -54,7 +54,7 @@ Then enable the profiling in the configuration. spark.executor.profiling.dfsDir (none) - An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/{{APP_ID}}/profile-{{APP_ID}}-exec-{{EXECUTOR_ID}}.jfr
+ An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr
      If no dfsDir is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
       4.0.0

diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 0f0185e6a7f0..94e5b46c6588 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -49,7 +49,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   }
   private val appAttemptId = conf.getOption("spark.app.attempt.id")
   private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
-  private val profileFile = s"profile-$baseName-exec-$executorId.jfr"
+  private val profileFile = s"profile-exec-$executorId.jfr"

   private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
   private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
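
---

Reviewer note, not part of the patches: a minimal sketch of how an application might exercise the new layout end to end. The spark.executor.profiling.* keys are the ones documented in connector/profiler/README.md above; the app name, DFS path, and app id below are hypothetical placeholders, and the profiler jar still has to be deployed and enabled as the README describes.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object ProfilerLayoutDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("profiler demo") // placeholder name
          .set("spark.executor.profiling.enabled", "true")
          .set("spark.executor.profiling.dfsDir", "hdfs:///spark-profiles") // placeholder path
          .set("spark.executor.profiling.options",
            "event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")

        val spark = SparkSession.builder().config(conf).getOrCreate()
        // Do enough work for the wall-clock profile to be interesting.
        spark.range(100000000L).selectExpr("sum(id)").show()
        spark.stop()

        // With both patches applied, executor 1 of a hypothetical app id
        // application_1736481600000_0001 uploads its profile to:
        //   hdfs:///spark-profiles/application_1736481600000_0001/profile-exec-1.jfr
        // The old code instead wrote dfsDir/<app id>/profile-profiler-demo-exec-1.jfr,
        // embedding the app name (spaces replaced by dashes) in the file name.
      }
    }

Deriving appId, baseName, and profileFile once in the constructor is what keeps the local file, the start/stop/dump/resume commands, and the DFS upload path consistent with each other.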