12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1037,7 +1037,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def getCallSite(): CallSite = {
     Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, long = "")
+      case Some(callSite) => CallSite(callSite, longForm = "")
       case None => Utils.getCallSite
     }
   }
@@ -1059,11 +1059,12 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }

@@ -1144,11 +1145,12 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }

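Context for the `getCallSite()` hunk above: when a caller sets the `externalCallSite` local property, only a short description is available, so the long form is left empty. A minimal sketch of that override path (illustrative only; the local master and the `myJob at MyApp.scala:42` string are assumptions, not part of this diff):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ExternalCallSiteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("callsite-demo"))
    // With the property unset, Utils.getCallSite derives both forms from the stack.
    // Once set, getCallSite() returns CallSite(callSite, longForm = ""), so the
    // job log reads "Starting job: myJob at MyApp.scala:42".
    sc.setLocalProperty("externalCallSite", "myJob at MyApp.scala:42")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}
```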
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1223,7 +1223,7 @@ abstract class RDD[T: ClassTag](

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = Utils.getCallSite
-  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
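The `Option(creationSite)` guard is what keeps `getCreationSite` null-safe: `creationSite` is `@transient`, so it comes back `null` on a deserialized RDD. A self-contained sketch of the pattern (the `CallSite` here is a local stand-in, not Spark's):

```scala
case class CallSite(shortForm: String, longForm: String)

class Holder(@transient val creationSite: CallSite) extends Serializable {
  // Option(null) is None, so a missing call site yields "" instead of an NPE.
  def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
}

object Holder extends App {
  println(new Holder(CallSite("textFile at App.scala:10", "")).getCreationSite)
  println(new Holder(null).getCreationSite) // prints an empty line, no NPE
}
```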
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -455,7 +455,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite.short)
+        logInfo("Failed to run " + callSite.shortForm)
         throw exception
     }
   }
@@ -679,7 +679,7 @@ class DAGScheduler(
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite.short, partitions.length, allowLocal))
+      job.jobId, callSite.shortForm, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
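For reference, the `Got job ...` line uses `String.format`-style interpolation, so the renamed field lands in the log like this (values are made up for illustration):

```scala
object JobLogSketch extends App {
  val jobId = 0
  val shortForm = "count at MyApp.scala:17" // hypothetical callSite.shortForm
  println("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
    jobId, shortForm, 4, false))
  // => Got job 0 (count at MyApp.scala:17) with 4 output partitions (allowLocal=false)
}
```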
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -108,8 +108,8 @@ private[spark] class Stage(

   def attemptId: Int = nextAttemptId

-  val name = callSite.short
-  val details = callSite.long
+  val name = callSite.shortForm
+  val details = callSite.longForm

   override def toString = "Stage " + id
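These two fields are what the web UI surfaces: `shortForm` becomes the stage name and `longForm` the expandable details. A stripped-down stand-in (the real `Stage` is `private[spark]` and takes many more constructor parameters):

```scala
case class CallSite(shortForm: String, longForm: String)

class StageSketch(val id: Int, callSite: CallSite) {
  val name = callSite.shortForm   // e.g. "count at MyApp.scala:17"
  val details = callSite.longForm // e.g. a multi-line stack trace
  override def toString = "Stage " + id
}
```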
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,7 +44,7 @@ import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /** CallSite represents a place in user code. It can have a short and a long form. */
-private[spark] case class CallSite(short: String, long: String)
+private[spark] case class CallSite(shortForm: String, longForm: String)

 /**
  * Various utility methods used by Spark.
@@ -848,8 +848,8 @@ private[spark] object Utils extends Logging {
     }
     val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
     CallSite(
-      short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
-      long = callStack.take(callStackDepth).mkString("\n"))
+      shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      longForm = callStack.take(callStackDepth).mkString("\n"))
   }

   /** Return a string containing part of a file from byte 'start' to 'end'. */
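To make the renamed fields concrete, here is a self-contained approximation of how `getCallSite` assembles the two forms. The frame selection is simplified to "take frame 2" and `lastSparkMethod` is hard-coded; Spark's real logic walks the stack to find the boundary between its own frames and user code.

```scala
case class CallSite(shortForm: String, longForm: String)

object CallSiteSketch {
  def getCallSite: CallSite = {
    val stack = Thread.currentThread.getStackTrace
    // Frame 0 is getStackTrace, frame 1 is this method; treat frame 2 as the
    // first "user" frame. This stands in for Spark's real boundary detection.
    val firstUserFrame = if (stack.length > 2) stack(2) else stack.last
    val lastSparkMethod = "getCallSite" // placeholder for the last Spark-side method
    // Long-form depth is tunable via the same property the diff reads.
    val depth = System.getProperty("spark.callstack.depth", "20").toInt
    CallSite(
      shortForm = "%s at %s:%s".format(
        lastSparkMethod, firstUserFrame.getFileName, firstUserFrame.getLineNumber),
      longForm = stack.take(depth).mkString("\n"))
  }

  def main(args: Array[String]): Unit = {
    println(getCallSite.shortForm) // e.g. "getCallSite at CallSiteSketch.scala:21"
  }
}
```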
core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -70,7 +70,7 @@ package object testPackage extends Assertions {
   def runCallSiteTest(sc: SparkContext) {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val rddCreationSite = rdd.getCreationSite
-    val curCallSite = sc.getCallSite().short // note: 2 lines after definition of "rdd"
+    val curCallSite = sc.getCallSite().shortForm // note: 2 lines after definition of "rdd"

     val rddCreationLine = rddCreationSite match {
       case CALL_SITE_REGEX(func, file, line) => {
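The test then parses the short form back apart with `CALL_SITE_REGEX`, which is defined above this excerpt. A sketch of that round trip, assuming a regex shaped like the `%s at %s:%s` format:

```scala
object RegexSketch extends App {
  // Assumed shape; the suite's actual CALL_SITE_REGEX sits outside this hunk.
  val CALL_SITE_REGEX = "(.+) at (.+):([0-9]+)".r

  "makeRDD at SparkContextInfoSuite.scala:70" match {
    case CALL_SITE_REGEX(func, file, line) =>
      println((func, file, line.toInt)) // (makeRDD,SparkContextInfoSuite.scala,70)
    case _ =>
      println("no match")
  }
}
```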