[SPARK-50783] Canonicalize JVM profiler results file name and layout on DFS #49440
Changes from all commits
@@ -17,17 +17,17 @@
 package org.apache.spark.executor.profiler
 
 import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
-import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 
 /**
@@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   private var running = false
   private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
-  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
 
-  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
-  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val appId = try {
+    conf.getAppId
+  } catch {
+    case _: NoSuchElementException => "local-" + System.currentTimeMillis
+  }
+  private val appAttemptId = conf.getOption("spark.app.attempt.id")
+  private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+  private val profileFile = s"profile-exec-$executorId.jfr"
+
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
+  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"
+
+  private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
+  private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
   private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
   private var outputStream: FSDataOutputStream = _
   private var inputStream: InputStream = _
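
Net effect of these new fields: every run writes under one per-application directory named by app ID and attempt ID, and the file name carries only the executor ID. Below is a standalone sketch of the resulting DFS layout (illustrative only; the local nameForAppAndAttempt is a stand-in, assuming the Spark helper joins the app ID and attempt ID with an underscore):

// Illustration only (not part of the patch): mirrors the naming scheme above.
// Assumes Utils.nameForAppAndAttempt joins the app ID and attempt ID with "_".
object ProfilerLayoutExample {
  def nameForAppAndAttempt(appId: String, attemptId: Option[String]): String =
    attemptId.map(a => s"${appId}_$a").getOrElse(appId)

  def main(args: Array[String]): Unit = {
    val dfsDir = "hdfs://nn:8020/spark-profiles"   // hypothetical base directory
    val baseName = nameForAppAndAttempt("application_1700000000000_0042", Some("1"))
    val profileFile = "profile-exec-7.jfr"         // executor ID 7
    println(s"$dfsDir/$baseName/$profileFile")
    // prints: hdfs://nn:8020/spark-profiles/application_1700000000000_0042_1/profile-exec-7.jfr
  }
}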
@@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     }
   }
 
+  private def requireProfilerBaseDirAsDirectory(fs: FileSystem, profilerDfsDir: String): Unit = {
+    if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
+      throw new IllegalArgumentException(
+        s"Profiler DFS base directory $profilerDfsDir is not a directory.")
+    }
+  }
+
   private def startWriting(): Unit = {
-    if (profilerDfsDir.isDefined) {
-      val applicationId = try {
-        conf.getAppId
-      } catch {
-        case _: NoSuchElementException => "local-" + System.currentTimeMillis
-      }
-      val config = SparkHadoopUtil.get.newConfiguration(conf)
-      val appName = conf.get("spark.app.name").replace(" ", "-")
-      val profilerOutputDirname = profilerDfsDir.get
-
-      val profileOutputFile =
-        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
-      val fs = FileSystem.get(new URI(profileOutputFile), config);
-      val filenamePath = new Path(profileOutputFile)
-      outputStream = fs.create(filenamePath)
+    profilerDfsDirOpt.foreach { profilerDfsDir =>
+      val profilerDirForApp = s"$profilerDfsDir/$baseName"
+      val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
+
+      requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
+
+      val profilerDirForAppPath = new Path(profilerDirForApp)
+      if (!fs.exists(profilerDirForAppPath)) {
+        // SPARK-30860: use the class method to avoid the umask causing permission issues
+        FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
+      }
+
+      outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
       try {
-        if (fs.exists(filenamePath)) {
-          fs.delete(filenamePath, true)
-        }
         logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
-        inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        inputStream = new BufferedInputStream(
+          new FileInputStream(s"$profilerLocalDir/$profileFile"))
         threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
         threadpool.scheduleWithFixedDelay(
           new Runnable() {
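
The SPARK-30860 comment above is why this hunk uses the static FileSystem.mkdirs and FileSystem.create helpers rather than the instance methods: the instance methods mask the requested permission with the process umask, while the static helpers create the path and then call setPermission explicitly. A minimal sketch of the difference (paths hypothetical, assuming the default fs.permissions.umask-mode of 022):

// Sketch (not part of the patch): why the static helpers are used.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

object UmaskExample {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val dir = new Path("/tmp/profiler-demo")   // hypothetical path
    val perm = new FsPermission(Integer.parseInt("770", 8).toShort)

    // Instance method: the requested permission is ANDed with the complement of
    // the umask, so 770 under the default umask 022 ends up as 750.
    fs.mkdirs(dir, perm)

    // Static helper: creates the directory, then calls setPermission explicitly,
    // so it really gets 770 regardless of the umask (SPARK-30860).
    FileSystem.mkdirs(fs, dir, perm)
  }
}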
@@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     } catch {
       case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
       case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
-        logError("Some profiler output not written." +
-          " Exception occurred in profiler native code: ", e)
+        logError("Some profiler output not written. " +
+          "Exception occurred in profiler native code: ", e)
       case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
     }
   }
 
   private def finishWriting(): Unit = {
-    if (profilerDfsDir.isDefined && writing) {
+    if (profilerDfsDirOpt.isDefined && writing) {
       try {
         // shutdown background writer
         threadpool.shutdown()
@@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
       } catch {
         case _: InterruptedException => Thread.currentThread().interrupt()
         case e: IOException =>
-          logWarning("Some profiling output not written." +
-            "Exception occurred while completing profiler output", e)
+          logWarning("Some profiling output not written. " +
+            "Exception occurred while completing profiler output: ", e)
       }
       writing = false
     }
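
For context on what finishWriting tears down: startWriting schedules a fixed-delay task that streams whatever the profiler has appended to the local .jfr file up to DFS in UPLOAD_SIZE chunks, and shutdown is followed by one final drain. A self-contained sketch of that pattern, with plain local files standing in for the DFS stream (all paths and intervals hypothetical):

// Sketch (not part of the patch): the background-writer pattern used by the
// profiler, with local files standing in for the DFS output stream.
import java.io.{BufferedInputStream, FileInputStream, FileOutputStream}
import java.util.concurrent.{Executors, TimeUnit}

object BackgroundWriterExample {
  def main(args: Array[String]): Unit = {
    val in = new BufferedInputStream(new FileInputStream("/tmp/profile-exec-0.jfr"))
    val out = new FileOutputStream("/tmp/profile-exec-0.copy.jfr")
    val buffer = new Array[Byte](8 * 1024 * 1024) // mirrors UPLOAD_SIZE

    // Copy everything currently available; a local FileInputStream can keep
    // returning new bytes on later reads even after a -1 (EOF) read.
    def drain(): Unit = {
      var len = in.read(buffer)
      while (len > 0) { out.write(buffer, 0, len); len = in.read(buffer) }
    }

    val pool = Executors.newSingleThreadScheduledExecutor()
    // Every tick, ship whatever the profiler appended since the last tick;
    // 30s stands in for EXECUTOR_PROFILING_WRITE_INTERVAL.
    pool.scheduleWithFixedDelay(() => drain(), 0, 30, TimeUnit.SECONDS)

    Thread.sleep(90 * 1000) // profiling happens here
    // finishWriting(): stop the writer, then do one final drain and close.
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    drain()
    in.close(); out.close()
  }
}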
Prefer using a tag for consistency.