Commit efa5e18

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into rest
2 parents: e42c131 + d743732

104 files changed, +4388 / -2015 lines


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 6 additions & 10 deletions
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
-    : Option[() => Long] = {
+  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics()
       val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
@@ -156,10 +155,9 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
-    : Option[() => Long] = {
+  private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics()
       val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
       val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesWritten = f()
@@ -172,10 +170,8 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+  private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
+    val stats = FileSystem.getAllStatistics()
     stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
   }

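For context, the refactored callbacks are meant to be captured once before any Hadoop I/O happens on the thread and polled afterwards; here is a minimal caller-side sketch (not part of the commit), assuming the usual SparkHadoopUtil.get accessor:

    // Capture the callback before reading: it snapshots a baseline internally and
    // later returns the bytes read on this thread since it was created.
    val bytesReadCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

    // ... perform Hadoop reads on this thread ...

    // None is returned on Hadoop versions without per-thread FS statistics (pre-2.5).
    val bytesRead: Long = bytesReadCallback.map(f => f()).getOrElse(0L)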
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 12 additions & 12 deletions
@@ -292,6 +292,18 @@ object SparkSubmit {
       }
     }
 
+    // Add the application jar automatically so the user doesn't have to call sc.addJar
+    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+    // For python files, the primary resource is already distributed as a regular file
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    if (!isYarnCluster && !args.isPython) {
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
+      if (isUserJar(args.primaryResource)) {
+        jars = jars ++ Seq(args.primaryResource)
+      }
+      sysProps.put("spark.jars", jars.mkString(","))
+    }
+
     // In standalone-cluster mode, use Client as a wrapper around the user class
     // Note that we won't actually launch this class if we're using the stable REST protocol
     if (args.isStandaloneCluster && !args.isRestEnabled) {
@@ -306,18 +318,6 @@ object SparkSubmit {
       }
     }
 
-    // Add the application jar automatically so the user doesn't have to call sc.addJar
-    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
-    // For python files, the primary resource is already distributed as a regular file
-    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
-    if (!isYarnCluster && !args.isPython) {
-      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
-      if (isUserJar(args.primaryResource)) {
-        jars = jars ++ Seq(args.primaryResource)
-      }
-      sysProps.put("spark.jars", jars.mkString(","))
-    }
-
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"

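The relocated block only merges the primary resource into spark.jars; a standalone sketch of that merge follows (mergeJars is a hypothetical helper for illustration, not part of SparkSubmit):

    // Hypothetical helper mirroring the moved block: append the user's primary jar
    // to whatever is already listed in spark.jars, keeping the comma-separated form.
    def mergeJars(existing: Option[String], primaryResource: String, isUserJar: Boolean): String = {
      val jars = existing.map(_.split(",").toSeq).getOrElse(Seq.empty)
      val all = if (isUserJar) jars :+ primaryResource else jars
      all.mkString(",")
    }

    // Example: mergeJars(Some("a.jar,b.jar"), "app.jar", isUserJar = true) returns "a.jar,b.jar,app.jar"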
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 6 additions & 7 deletions
@@ -74,13 +74,12 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
           <ul class="unstyled">
             <li><strong>URL:</strong> {state.uri}</li>
             {
-              state.stableUri
-                .map { uri =>
-                  <li>
-                    <strong>Stable URL:</strong> {uri}
-                    <span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
-                  </li> }
-                .getOrElse { Seq.empty }
+              state.stableUri.map { uri =>
+                <li>
+                  <strong>Stable URL:</strong> {uri}
+                  <span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
+                </li>
+              }.getOrElse { Seq.empty }
             }
             <li><strong>Workers:</strong> {state.workers.size}</li>
             <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 0 additions & 1 deletion
@@ -19,7 +19,6 @@ package org.apache.spark.executor
 
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 
 import scala.collection.mutable.ArrayBuffer

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 7 additions & 5 deletions
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.JobID
 import org.apache.hadoop.mapred.TaskAttemptID
 import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -218,13 +219,13 @@ class HadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
         split.inputSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
           case _ => None
         }
-      )
+      }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
       var reader: RecordReader[K, V] = null
@@ -254,7 +255,8 @@ class HadoopRDD[K, V](
         reader.close()
         if (bytesReadCallback.isDefined) {
           inputMetrics.updateBytesRead()
-        } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+        } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
+            split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {

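The new case alternative (_: FileSplit | _: CombineFileSplit) simply widens the set of split types for which byte-level read metrics are attempted; a small illustrative match over Hadoop's old-API split classes, with supportsBytesReadMetrics as a hypothetical helper:

    import org.apache.hadoop.mapred.{FileSplit, InputSplit}
    import org.apache.hadoop.mapred.lib.CombineFileSplit

    // Hypothetical helper using the same type-alternative pattern as the diff above:
    // returns whether per-thread FS statistics should be consulted for this split.
    def supportsBytesReadMetrics(split: InputSplit): Boolean = split match {
      case _: FileSplit | _: CombineFileSplit => true
      case _ => false
    }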
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 8 additions & 7 deletions
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
@@ -34,7 +34,7 @@ import org.apache.spark.Logging
 import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
@@ -114,13 +114,13 @@ class NewHadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
         split.serializableHadoopSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
           case _ => None
         }
-      )
+      }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
@@ -163,7 +163,8 @@ class NewHadoopRDD[K, V](
         reader.close()
         if (bytesReadCallback.isDefined) {
           inputMetrics.updateBytesRead()
-        } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+        } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 4 additions & 7 deletions
@@ -990,7 +990,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
@@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // around by taking a mod. We expect that no task will be attempted 2 billion times.
     val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
     writer.setup(context.stageId, context.partitionId, taskAttemptId)
     writer.open()
@@ -1086,11 +1086,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.commitJob()
     }
 
-  private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
-    : (OutputMetrics, Option[() => Long]) = {
-    val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
-      .map(new Path(_))
-      .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+  private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
+    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
     val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
     if (bytesWrittenCallback.isDefined) {
       context.taskMetrics.outputMetrics = Some(outputMetrics)

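On the write side the pattern is symmetric: capture the thread-local callback before any records are written, and only register output metrics when it is available. A minimal sketch following the code above (the later update of bytes written is elided):

    // Capture the "bytes written" callback once, before opening the Hadoop writer.
    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
    if (bytesWrittenCallback.isDefined) {
      // Only register output metrics when the Hadoop version exposes thread statistics.
      context.taskMetrics.outputMetrics = Some(outputMetrics)
    }
    // ... write records, then poll bytesWrittenCallback to update outputMetrics ...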
core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala

Lines changed: 12 additions & 17 deletions
@@ -53,7 +53,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
     val resultsFile = File.createTempFile("test-submit", ".txt")
     val numbers = Seq(1, 2, 3)
     val size = 500
-    val driverId = submitApp(resultsFile, numbers, size)
+    val driverId = submitApplication(resultsFile, numbers, size)
     waitUntilFinished(driverId)
     validateResult(resultsFile, numbers, size)
   }
@@ -68,7 +68,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
     val resultsFile = File.createTempFile("test-kill", ".txt")
     val numbers = Seq(1, 2, 3)
     val size = 500
-    val driverId = submitApp(resultsFile, numbers, size)
+    val driverId = submitApplication(resultsFile, numbers, size)
     val killResponse = client.killDriver(masterRestUrl, driverId)
     val killSuccess = killResponse.getFieldNotNull(KillDriverResponseField.SUCCESS)
     waitUntilFinished(driverId)
@@ -90,6 +90,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
   /**
    * Start a local cluster containing one Master and a few Workers.
    * Do not use org.apache.spark.deploy.LocalCluster here because we want the REST URL.
+   * Return the Master's REST URL to which applications should be submitted.
    */
   private def startLocalCluster(): String = {
     val conf = new SparkConf(false)
@@ -111,10 +112,8 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
     masterRestUrl
   }
 
-  /**
-   * Submit an application through the stable gateway and return the corresponding driver ID.
-   */
-  private def submitApp(resultsFile: File, numbers: Seq[Int], size: Int): String = {
+  /** Submit the StandaloneRestApp and return the corresponding driver ID. */
+  private def submitApplication(resultsFile: File, numbers: Seq[Int], size: Int): String = {
     val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) ++ Seq(size.toString)
     val commandLineArgs = Array(
       "--deploy-mode", "cluster",
@@ -129,10 +128,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
     submitResponse.getFieldNotNull(SubmitDriverResponseField.DRIVER_ID)
   }
 
-  /**
-   * Wait until the given driver has finished running,
-   * up to the specified maximum number of seconds.
-   */
+  /** Wait until the given driver has finished running up to the specified timeout. */
   private def waitUntilFinished(driverId: String, maxSeconds: Int = 10): Unit = {
     var finished = false
     val expireTime = System.currentTimeMillis + maxSeconds * 1000
@@ -189,11 +185,11 @@ private object StandaloneRestProtocolSuite {
 
   /**
    * Return a list of class files compiled for StandaloneRestApp.
-   * This includes all the anonymous classes used in StandaloneRestApp#main.
+   * This includes all the anonymous classes used in the application.
   */
   private def getClassFiles: Seq[File] = {
-    val clazz = StandaloneRestApp.getClass
     val className = Utils.getFormattedClassName(StandaloneRestApp)
+    val clazz = StandaloneRestApp.getClass
     val basePath = clazz.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     val baseDir = new File(basePath + "/" + pathPrefix)
     baseDir.listFiles().filter(_.getName.contains(className))
@@ -202,22 +198,21 @@ private object StandaloneRestProtocolSuite {
 
 /**
  * Sample application to be submitted to the cluster using the stable gateway.
- * All relevant classes will be packaged into a jar dynamically and submitted to the cluster.
+ * All relevant classes will be packaged into a jar at run time.
  */
 object StandaloneRestApp {
   // Usage: [path to results file] [num1] [num2] [num3] [rddSize]
   // The first line of the results file should be (num1 + num2 + num3)
   // The second line should be (rddSize / 2) + 1
   def main(args: Array[String]) {
-    assert(args.size == 5)
+    assert(args.size == 5, s"Expected exactly 5 arguments: ${args.mkString(",")}")
     val resultFile = new File(args(0))
     val writer = new PrintWriter(resultFile)
     try {
-      val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
-      val rddSize = args(4).toInt
       val conf = new SparkConf()
       val sc = new SparkContext(conf)
-      val secondLine = sc.parallelize(1 to rddSize)
+      val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
+      val secondLine = sc.parallelize(1 to args(4).toInt)
        .map { i => (i / 2, i) }
        .reduceByKey(_ + _)
        .count()

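As a quick sanity check of the expected output for the suite's parameters (numbers = Seq(1, 2, 3), size = 500):

    // First line: sum of the three numbers.
    val firstLine = 1 + 2 + 3                                   // 6
    // Second line: count of distinct keys i / 2 for i in 1 to 500,
    // i.e. keys 0 through 250, matching the (rddSize / 2) + 1 comment above.
    val secondLine = (1 to 500).map(i => i / 2).distinct.size   // 251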