Commit debe072

Merge remote-tracking branch 'upstream/master' into dt-spark-3160
2 parents: 5c4ac33 + ce59725

42 files changed: +502 −142 lines


core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
 
   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse(""))
+    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
 
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 9 additions & 3 deletions
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
+  private[spark] val ui: Option[SparkUI] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new SparkUI(this))
+    } else {
+      // For tests, do not enable the UI
+      None
+    }
+  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.stop()
+    ui.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
    val dagSchedulerCopy = dagScheduler
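
With this change the UI is created only when spark.ui.enabled is true (the default), and consumers go through the Option. Because the flag is an ordinary SparkConf entry, it can also be set from a PySpark application, whose SparkConf is forwarded to the JVM SparkContext; a minimal sketch, where only the spark.ui.enabled property comes from this diff and the rest is illustrative:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("no-ui-test")
        .set("spark.ui.enabled", "false"))   # the driver skips creating a SparkUI
sc = SparkContext(conf=conf)
# ... run the job; no web UI port is bound ...
sc.stop()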

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(
 
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: " + driverFilePath)
     logInfo("Writing Akka address: " + driverUrl)
-    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+    logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(sc.ui.appUIAddress)
+    temp.writeUTF(appUIAddress)
     temp.close()
 
     // "Atomic" rename

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 3 additions & 1 deletion
@@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      appUIAddress, eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

core/src/test/scala/org/apache/spark/ui/UISuite.scala

Lines changed: 31 additions & 13 deletions
@@ -36,11 +36,25 @@ import scala.xml.Node
 
 class UISuite extends FunSuite {
 
+  /**
+   * Create a test SparkContext with the SparkUI enabled.
+   * It is safe to `get` the SparkUI directly from the SparkContext returned here.
+   */
+  private def newSparkContext(): SparkContext = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val sc = new SparkContext(conf)
+    assert(sc.ui.isDefined)
+    sc
+  }
+
   ignore("basic ui visibility") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase.contains("stages"))
         assert(html.toLowerCase.contains("storage"))
@@ -51,7 +65,7 @@ class UISuite extends FunSuite {
   }
 
   ignore("visibility at localhost:4040") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         val html = Source.fromURL("http://localhost:4040").mkString
@@ -61,8 +75,8 @@
   }
 
   ignore("attaching a new tab") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val sparkUI = sc.ui
+    withSpark(newSparkContext()) { sc =>
+      val sparkUI = sc.ui.get
 
       val newTab = new WebUITab(sparkUI, "foo") {
         attachPage(new WebUIPage("") {
@@ -73,7 +87,7 @@
       }
       sparkUI.attachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sparkUI.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
 
         // check whether new page exists
@@ -87,7 +101,7 @@
      }
 
      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-       val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+       val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
        // check whether new page exists
        assert(html.contains("magic"))
      }
@@ -129,16 +143,20 @@
   }
 
   test("verify appUIAddress contains the scheme") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val uiAddress = sc.ui.appUIAddress
-      assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val uiAddress = ui.appUIAddress
+      val uiHostPort = ui.appUIHostPort
+      assert(uiAddress.equals("http://" + uiHostPort))
    }
  }
 
  test("verify appUIAddress contains the port") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val splitUIAddress = sc.ui.appUIAddress.split(':')
-      assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val splitUIAddress = ui.appUIAddress.split(':')
+      val boundPort = ui.boundPort
+      assert(splitUIAddress(2).toInt == boundPort)
    }
  }
}

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -899,7 +899,7 @@
              <java.awt.headless>true</java.awt.headless>
              <spark.test.home>${session.executionRootDirectory}</spark.test.home>
              <spark.testing>1</spark.testing>
-              <spark.ui.port>0</spark.ui.port>
+              <spark.ui.enabled>false</spark.ui.enabled>
            </systemProperties>
          </configuration>
          <executions>

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion
@@ -337,7 +337,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.ports.maxRetries=100",
-    javaOptions in Test += "-Dspark.ui.port=0",
+    javaOptions in Test += "-Dspark.ui.enabled=false",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,

python/pyspark/context.py

Lines changed: 12 additions & 4 deletions
@@ -331,12 +331,16 @@ def pickleFile(self, name, minPartitions=None):
         return RDD(self._jsc.objectFile(name, minPartitions), self,
                    BatchedSerializer(PickleSerializer()))
 
-    def textFile(self, name, minPartitions=None):
+    def textFile(self, name, minPartitions=None, use_unicode=True):
         """
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -346,16 +350,20 @@ def textFile(self, name, minPartitions=None):
         """
         minPartitions = minPartitions or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minPartitions), self,
-                   UTF8Deserializer())
+                   UTF8Deserializer(use_unicode))
 
-    def wholeTextFiles(self, path, minPartitions=None):
+    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         """
         Read a directory of text files from HDFS, a local file system
         (available on all nodes), or any Hadoop-supported file system
         URI. Each file is read as a single record and returned in a
         key-value pair, where the key is the path of each file, the
         value is the content of each file.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         For example, if you have the following files::
 
           hdfs://a-hdfs-path/part-00000
@@ -386,7 +394,7 @@ def wholeTextFiles(self, path, minPartitions=None):
         """
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
-                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
+                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
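
The new flag is opt-out: by default textFile and wholeTextFiles still decode to unicode, while use_unicode=False keeps the raw utf-8 byte strings. A minimal usage sketch, assuming an already-running SparkContext named sc; the file path and contents are illustrative, not part of this commit:

import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "sample-text.txt")
with open(path, "w") as f:
    f.write("Hello world!")

as_unicode = sc.textFile(path).first()                   # default: decoded to unicode
as_bytes = sc.textFile(path, use_unicode=False).first()  # kept as a utf-8 encoded str
assert as_unicode == u"Hello world!"
assert as_bytes == "Hello world!"                        # plain byte string, no decode step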

python/pyspark/serializers.py

Lines changed: 11 additions & 7 deletions
@@ -429,18 +429,22 @@ class UTF8Deserializer(Serializer):
     Deserializes streams written by String.getBytes.
     """
 
+    def __init__(self, use_unicode=False):
+        self.use_unicode = use_unicode
+
     def loads(self, stream):
         length = read_int(stream)
-        return stream.read(length).decode('utf8')
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
 
     def load_stream(self, stream):
-        while True:
-            try:
+        try:
+            while True:
                 yield self.loads(stream)
-            except struct.error:
-                return
-            except EOFError:
-                return
+        except struct.error:
+            return
+        except EOFError:
+            return
 
 
 def read_long(stream):
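
For reference, a small sketch of the deserializer's two modes. It assumes the patched serializers.py above and that the stream framing is a 4-byte big-endian length prefix, matching read_int in this module:

import io
import struct
from pyspark.serializers import UTF8Deserializer

payload = u"caf\xe9".encode("utf-8")               # utf-8 bytes for "café"
framed = struct.pack("!i", len(payload)) + payload # length prefix, then the bytes

# use_unicode=True decodes to unicode; use_unicode=False returns the raw bytes.
assert UTF8Deserializer(use_unicode=True).loads(io.BytesIO(framed)) == u"caf\xe9"
assert UTF8Deserializer(use_unicode=False).loads(io.BytesIO(framed)) == payload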
