diff --git a/connector/profiler/README.md b/connector/profiler/README.md
index 1326fd55df09..4d97b15eb96a 100644
--- a/connector/profiler/README.md
+++ b/connector/profiler/README.md
@@ -16,7 +16,7 @@ The profiler writes the jfr files to the executor's working directory in the exe
 Code profiling is currently only supported for
 
 * Linux (x64)
-* Linux (arm 64)
+* Linux (arm64)
 * Linux (musl, x64)
 * MacOS
 
@@ -54,7 +54,7 @@ Then enable the profiling in the configuration.
 spark.executor.profiling.dfsDir
 (none)
- An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/application_id/profile-appname-exec-executor_id.jfr
+ An HDFS compatible path to which the profiler's output files are copied. The output files will be written as dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr
 If no dfsDir is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
 4.0.0
@@ -72,7 +72,7 @@ Then enable the profiling in the configuration.
 event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
 Options to pass to the profiler. Detailed options are documented in the comments here:
- Profiler arguments.
+ Profiler arguments. Note that the options to start, stop, specify output format, and output file do not have to be specified.
 4.0.0
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 20b6db5221fa..94e5b46c6588 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -17,17 +17,17 @@
 package org.apache.spark.executor.profiler
 
 import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
-import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 
 /**
@@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   private var running = false
   private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
-  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
 
-  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val appId = try {
+    conf.getAppId
+  } catch {
+    case _: NoSuchElementException => "local-" + System.currentTimeMillis
+  }
+  private val appAttemptId = conf.getOption("spark.app.attempt.id")
+  private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+  private val profileFile = s"profile-exec-$executorId.jfr"
+
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
+  private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
 
   private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
   private var outputStream: FSDataOutputStream = _
   private var inputStream: InputStream = _
@@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     }
   }
 
+  private def requireProfilerBaseDirAsDirectory(fs: FileSystem, profilerDfsDir: String): Unit = {
+    if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
+      throw new IllegalArgumentException(
+        s"Profiler DFS base directory $profilerDfsDir is not a directory.")
+    }
+  }
+
   private def startWriting(): Unit = {
-    if (profilerDfsDir.isDefined) {
-      val applicationId = try {
-        conf.getAppId
-      } catch {
-        case _: NoSuchElementException => "local-" + System.currentTimeMillis
+    profilerDfsDirOpt.foreach { profilerDfsDir =>
+      val profilerDirForApp = s"$profilerDfsDir/$baseName"
+      val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
+
+      requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
+
+      val profilerDirForAppPath = new Path(profilerDirForApp)
+      if (!fs.exists(profilerDirForAppPath)) {
+        // SPARK-30860: use the class method to avoid the umask causing permission issues
+        FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
       }
-      val config = SparkHadoopUtil.get.newConfiguration(conf)
-      val appName = conf.get("spark.app.name").replace(" ", "-")
-      val profilerOutputDirname = profilerDfsDir.get
-
-      val profileOutputFile =
-        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
-      val fs = FileSystem.get(new URI(profileOutputFile), config);
-      val filenamePath = new Path(profileOutputFile)
-      outputStream = fs.create(filenamePath)
+
+      outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
       try {
-        if (fs.exists(filenamePath)) {
-          fs.delete(filenamePath, true)
-        }
         logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
-        inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        inputStream = new BufferedInputStream(
+          new FileInputStream(s"$profilerLocalDir/$profileFile"))
         threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
         threadpool.scheduleWithFixedDelay(
           new Runnable() {
@@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     } catch {
       case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
       case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
-        logError("Some profiler output not written." +
-          " Exception occurred in profiler native code: ", e)
+        logError("Some profiler output not written. " +
+          "Exception occurred in profiler native code: ", e)
      case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
     }
   }
 
   private def finishWriting(): Unit = {
-    if (profilerDfsDir.isDefined && writing) {
+    if (profilerDfsDirOpt.isDefined && writing) {
       try {
         // shutdown background writer
         threadpool.shutdown()
@@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
       } catch {
         case _: InterruptedException => Thread.currentThread().interrupt()
         case e: IOException =>
-          logWarning("Some profiling output not written." +
-            "Exception occurred while completing profiler output", e)
+          logWarning("Some profiling output not written. " +
+            "Exception occurred while completing profiler output: ", e)
       }
       writing = false
     }
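Reviewer note: for anyone who wants to try the new output layout outside of an executor, the minimal sketch below illustrates the pattern the patch relies on, under stated assumptions. It creates the per-application directory under the DFS base dir with 770 permissions via the `FileSystem.mkdirs(FileSystem, Path, FsPermission)` static helper (the SPARK-30860 umask workaround) and the per-executor `profile-exec-<id>.jfr` file with 660 via `FileSystem.create(FileSystem, Path, FsPermission)`. The `dfsDir`, application id, and executor id values are made-up placeholders, and the attempt-id suffix that `Utils.nameForAppAndAttempt` would add on YARN is intentionally not reproduced here.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

object ProfilerOutputLayoutSketch {
  // Hypothetical values; in the patch these come from SparkConf and the executor.
  val dfsDir = "hdfs://namenode:8020/spark-profiles" // spark.executor.profiling.dfsDir
  val appId = "application_1700000000000_0001"       // conf.getAppId (or "local-<timestamp>")
  val executorId = "1"

  def main(args: Array[String]): Unit = {
    val dirPermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
    val filePermissions = new FsPermission(Integer.parseInt("660", 8).toShort)

    val fs = FileSystem.get(new Path(dfsDir).toUri, new Configuration())

    // dfsDir/{{APP_ID}} -- created with explicit permissions so the umask does not apply.
    val appDir = new Path(s"$dfsDir/$appId")
    if (!fs.exists(appDir)) {
      FileSystem.mkdirs(fs, appDir, dirPermissions)
    }

    // dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr -- created with file permissions.
    val profileFile = new Path(s"$appDir/profile-exec-$executorId.jfr")
    val out = FileSystem.create(fs, profileFile, filePermissions)
    try {
      out.writeBytes("jfr bytes would be streamed here in chunks")
    } finally {
      out.close()
    }
  }
}
```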