", actorSystem, master, serializer, 1200, conf, securityMgr)
+ store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ // Access rdd_1_0 to ensure it's not least recently used.
+ assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+ // According to the same-RDD rule, rdd_1_0 should be replaced here.
+ store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ // rdd_1_0 should have been replaced, even though it's not the least recently used.
+ assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
+ assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
+ assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
new file mode 100644
index 0000000000000..bcf138b5ee6d0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{SharedSparkContext, SparkConf, LocalSparkContext, SparkContext}
+
+
+class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
+ /* Tests the ability of Spark to deal with user-provided iterators from flatMap
+ * calls that may generate more data than available memory. For any
+ * memory-based persistence Spark will unroll the iterator into an ArrayBuffer
+ * for caching; however, when the user specifies DISK_ONLY persistence,
+ * the iterator will be fed directly to the serializer and written to disk.
+ *
+ * This also tests the ObjectOutputStream reset rate. When serializing using the
+ * Java serialization system, the serializer caches objects to prevent writing redundant
+ * data, but that also prevents those objects from being garbage collected. Calling
+ * 'reset' flushes that info from the serializer and allows the old objects to be GC'd.
+ */
+ test("Flatmap Iterator to Disk") {
+ val sconf = new SparkConf().setMaster("local").setAppName("iterator_to_disk_test")
+ sc = new SparkContext(sconf)
+ val expand_size = 100
+ val data = sc.parallelize((1 to 5).toSeq).
+ flatMap( x => Stream.range(0, expand_size))
+ val persisted = data.persist(StorageLevel.DISK_ONLY)
+ assert(persisted.count()===500)
+ assert(persisted.filter(_==1).count()===5)
+ }
+
+ test("Flatmap Iterator to Memory") {
+ val sconf = new SparkConf().setMaster("local").setAppName("iterator_to_memory_test")
+ sc = new SparkContext(sconf)
+ val expand_size = 100
+ val data = sc.parallelize((1 to 5).toSeq).
+ flatMap(x => Stream.range(0, expand_size))
+ val persisted = data.persist(StorageLevel.MEMORY_ONLY)
+ assert(persisted.count()===500)
+ assert(persisted.filter(_==1).count()===5)
+ }
+
+ test("Serializer Reset") {
+ val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test")
+ .set("spark.serializer.objectStreamReset", "10")
+ sc = new SparkContext(sconf)
+ val expand_size = 500
+ val data = sc.parallelize(Seq(1,2)).
+ flatMap(x => Stream.range(1, expand_size).
+ map(y => "%d: string test %d".format(y,x)))
+ val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
+ assert(persisted.filter(_.startsWith("1:")).count()===2)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 20ebb1897e6ba..30415814adbba 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -24,6 +24,8 @@ import scala.util.{Failure, Success, Try}
import org.eclipse.jetty.server.Server
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
+
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 4040
@@ -34,15 +36,17 @@ class UISuite extends FunSuite {
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
- val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
- val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
+ val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
+ new SparkConf)
+ val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
+ new SparkConf)
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}
test("jetty binds to port 0 correctly") {
- val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq())
+ val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf)
assert(jettyServer.getState === "STARTED")
assert(boundPort != 0)
Try {new ServerSocket(boundPort)} match {
diff --git a/docker/README.md b/docker/README.md
index bf59e77d111f9..40ba9c3065946 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
===========
Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
-as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
\ No newline at end of file
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd
index a5b1303c2ebdb..5a7da3446f6d2 100755
--- a/docker/spark-test/master/default_cmd
+++ b/docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd
index ab6336f70c1c6..31b06cb0eb047 100755
--- a/docker/spark-test/worker/default_cmd
+++ b/docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index a982c4dbac7d4..d3bc34e68b240 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -56,7 +56,7 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o
The ScalaTest plugin also supports running only a specific test suite as follows:
- $ mvn -Dhadoop.version=... -Dsuites=spark.repl.ReplSuite test
+ $ mvn -Dhadoop.version=... -Dsuites=org.apache.spark.repl.ReplSuite test
## Continuous Compilation ##
diff --git a/docs/configuration.md b/docs/configuration.md
index dc5553f3da770..a006224d5080c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -147,6 +147,34 @@ Apart from these, the following properties are also available, and may be useful
How many stages the Spark UI remembers before garbage collecting.
+
+ | spark.ui.filters |
+ None |
+
+ Comma separated list of filter class names to apply to the Spark web ui. The filter should be a
+ standard javax servlet Filter. Parameters to each filter can also be specified by setting a
+ java system property of spark.<class name of filter>.params='param1=value1,param2=value2'
+ (e.g. -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing')
+ |
+
+
+ | spark.ui.acls.enable |
+ false |
+
+ Whether Spark web UI ACLs are enabled. If enabled, this checks whether the user has
+ permission to view the web UI. See spark.ui.view.acls for more details.
+ Also note this requires the user to be known; if the user comes across as null, no checks
+ are done. Filters can be used to authenticate and set the user.
+ |
+
+
+ | spark.ui.view.acls |
+ Empty |
+
+ Comma separated list of users that have view access to the spark web ui. By default only the
+ user that started the Spark job has view access.
+ |
+
| spark.shuffle.compress |
true |
@@ -244,6 +272,17 @@ Apart from these, the following properties are also available, and may be useful
exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker.
+
+ | spark.serializer.objectStreamReset |
+ 10000 |
+
+ When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
+ objects to prevent writing redundant data, but that prevents garbage collection of those
+ objects. By calling 'reset' you flush that info from the serializer, allowing old
+ objects to be collected. To turn off this periodic reset, set the value to <= 0.
+ By default the serializer is reset every 10,000 objects.
+ |
+
| spark.broadcast.factory |
org.apache.spark.broadcast. HttpBroadcastFactory |
@@ -476,7 +515,7 @@ Apart from these, the following properties are also available, and may be useful
the whole cluster by default.
Note: this setting needs to be configured in the standalone cluster master, not in individual
applications; you can set it through SPARK_JAVA_OPTS in spark-env.sh.
-
+
| spark.files.overwrite |
@@ -485,6 +524,38 @@ Apart from these, the following properties are also available, and may be useful
Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
+
+ | spark.files.fetchTimeout |
+ false |
+
+ Communication timeout to use when fetching files added through SparkContext.addFile() from
+ the driver.
+ |
+
+
+ | spark.authenticate |
+ false |
+
+ Whether Spark authenticates its internal connections. See spark.authenticate.secret if not
+ running on YARN.
+ |
+
+
+ | spark.authenticate.secret |
+ None |
+
+ Set the secret key used for Spark to authenticate between components. This needs to be set if
+ not running on Yarn and authentication is enabled.
+ |
+
+
+ | spark.core.connection.auth.wait.timeout |
+ 30 |
+
+ Number of seconds for the connection to wait for authentication to occur before timing
+ out and giving up.
+ |
+
## Viewing Spark Properties
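
As a rough illustration of the new properties documented above (not part of the patch), a minimal SparkConf sketch; the keys are the ones described in the entries above, while every value here is a placeholder:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; the keys come from the configuration.md entries above.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("secured-app")
  .set("spark.ui.filters", "com.test.filter1")                        // servlet filter applied to the web UI
  .set("spark.com.test.filter1.params", "param1=foo,param2=testing")  // per-filter parameters
  .set("spark.ui.acls.enable", "true")                                // enforce view ACLs on the UI
  .set("spark.ui.view.acls", "alice,bob")                             // hypothetical extra viewers
  .set("spark.serializer.objectStreamReset", "100")                   // reset JavaSerializer every 100 objects
val sc = new SparkContext(conf)
```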
diff --git a/docs/index.md b/docs/index.md
index 4eb297df39144..c4f4d79edbc6c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -103,6 +103,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui
* [Configuration](configuration.html): customize Spark via its configuration system
* [Tuning Guide](tuning.html): best practices to optimize performance and memory use
+* [Security](security.html): Spark security support
* [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware
* [Job Scheduling](job-scheduling.html): scheduling resources across and within Spark applications
* [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ee1d892a3b630..b17929542c531 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,7 +29,7 @@ If you want to test out the YARN deployment mode, you can use the current Spark
# Configuration
-Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those. These are configs that are specific to SPARK on YARN.
+Most of the configs are the same for Spark on YARN as for other deployment modes. See the Configuration page for more information on those. These are configs that are specific to Spark on YARN.
Environment variables:
@@ -41,28 +41,30 @@ System Properties:
* `spark.yarn.submit.file.replication`, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* `spark.yarn.preserve.staging.files`, set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
* `spark.yarn.scheduler.heartbeat.interval-ms`, the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
-* `spark.yarn.max.worker.failures`, the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.
+* `spark.yarn.max.worker.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2, with a minimum of 3.
# Launching Spark on YARN
-Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
-This would be used to connect to the cluster, write to the dfs and submit jobs to the resource manager.
+Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.
+These configs are used to connect to the cluster, write to the dfs, and connect to the YARN ResourceManager.
-There are two scheduler mode that can be used to launch spark application on YARN.
+There are two scheduler modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
-## Launch spark application by YARN Client with yarn-standalone mode.
+Unlike in Spark standalone and Mesos mode, in which the master's address is specified in the "master" parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Thus, the master parameter is simply "yarn-client" or "yarn-cluster".
-The command to launch the YARN Client is as follows:
+## Launching a Spark application with yarn-cluster mode.
+
+The command to launch the Spark application on the cluster is as follows:
SPARK_JAR= ./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar \
--class \
--args \
- --num-workers \
+ --num-workers \
--master-class
--master-memory \
- --worker-memory \
- --worker-cores \
+ --worker-memory \
+ --worker-cores \
--name \
--queue \
--addJars \
@@ -82,35 +84,30 @@ For example:
./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
- --args yarn-standalone \
+ --args yarn-cluster \
--num-workers 3 \
--master-memory 4g \
--worker-memory 2g \
--worker-cores 1
- # Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
- # (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
- $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
- Pi is roughly 3.13794
-
-The above starts a YARN Client programs which start the default Application Master. Then SparkPi will be run as a child thread of Application Master, YARN Client will periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.
+The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.
-With this mode, your application is actually run on the remote machine where the Application Master is run upon. Thus application that involve local interaction will not work well, e.g. spark-shell.
+Because the application is run on a remote machine where the Application Master is running, applications that involve local interaction, such as spark-shell, will not work.
-## Launch spark application with yarn-client mode.
+## Launching a Spark application with yarn-client mode.
-With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR.
+With yarn-client mode, the application will be launched locally, just like running an application or spark-shell on Local / Mesos / Standalone client mode. The launch method is also the same, just make sure to specify the master URL as "yarn-client". You also need to export the env value for SPARK_JAR.
Configuration in yarn-client mode:
-In order to tune worker core/number/memory etc. You need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
+In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the Spark configuration file (./conf/spark-env.sh). The following is the list of options.
-* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2)
-* `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1).
-* `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_WORKER_INSTANCES`, Number of executors to start (Default: 2)
+* `SPARK_WORKER_CORES`, Number of cores per executor (Default: 1).
+* `SPARK_WORKER_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
-* `SPARK_YARN_QUEUE`, The hadoop queue to use for allocation requests (Default: 'default')
+* `SPARK_YARN_QUEUE`, The YARN queue to use for allocation requests (Default: 'default')
* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
* `SPARK_YARN_DIST_ARCHIVES`, Comma separated list of archives to be distributed with the job.
@@ -125,13 +122,23 @@ or
MASTER=yarn-client ./bin/spark-shell
+## Viewing logs
+
+In YARN terminology, executors and application masters run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the "yarn logs" command.
+
+ yarn logs -applicationId <app ID>
+
+will print out the contents of all log files from all containers from the given application.
+
+When log aggregation isn't turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID.
+
# Building Spark for Hadoop/YARN 2.2.x
-See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using the Maven process.
+See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using Maven.
-# Important Notes
+# Important notes
- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
-- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
-- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
+- The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
+- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt, and your application should use the name as appSees.txt to reference it when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
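
Following the yarn-client description above, a minimal sketch of what a driver program looks like in that mode (assuming SPARK_JAR and HADOOP_CONF_DIR/YARN_CONF_DIR are exported as described; the app name and job are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// In yarn-client mode the ResourceManager address comes from the Hadoop config,
// so only the master string changes compared to local/standalone/Mesos.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("YarnClientExample")  // hypothetical application name
val sc = new SparkContext(conf)

// A trivial job, just to show the driver runs in the local client process.
println(sc.parallelize(1 to 1000).map(_ * 2).count())
```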
diff --git a/docs/security.md b/docs/security.md
new file mode 100644
index 0000000000000..9e4218fbcfe7d
--- /dev/null
+++ b/docs/security.md
@@ -0,0 +1,18 @@
+---
+layout: global
+title: Spark Security
+---
+
+Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate.
+
+The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI.
+
+For Spark on YARN deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the ACL controls can be used to control which users can view the Spark UI.
+
+For other types of Spark deployments, the Spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Masters/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the ACL controls can be used to control which users can view the Spark UI.
+
+IMPORTANT NOTE: The NettyBlockFetcherIterator is not secured, so do not use Netty for the shuffle if running with authentication on.
+
+See [Spark Configuration](configuration.html) for more details on the security configs.
+
+See org.apache.spark.SecurityManager for implementation details about security.
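
For a non-YARN deployment, a minimal sketch of the shared-secret setup described above (the secret value is a placeholder and must be identical on every node):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Shared-secret authentication per docs/security.md above; on YARN the secret
// is generated and distributed automatically instead.
val conf = new SparkConf()
  .setAppName("authenticated-app")
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "not-a-real-secret")  // placeholder, set the same value cluster-wide
val sc = new SparkContext(conf)
```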
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 2a56cf07d0cfc..f9904d45013f6 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -539,7 +539,7 @@ common ones are as follows.
updateStateByKey(func) |
Return a new "state" DStream where the state for each key is updated by applying the
given function on the previous state of the key and the new values for the key. This can be
- used to maintain arbitrary state data for each ket. |
+ used to maintain arbitrary state data for each key.
| |
diff --git a/docs/tuning.md b/docs/tuning.md
index 26ff1325bb59c..093df3187a789 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -163,7 +163,7 @@ their work directories), *not* on your driver program.
**Cache Size Tuning**
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
+By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
cache RDDs. This means that 40% of memory is available for any objects created during task execution.
In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
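
A hedged sketch of adjusting the cache sizing discussed above, assuming the spark.storage.memoryFraction setting governs the 60% default (values are illustrative):

```scala
import org.apache.spark.SparkConf

// Assumption: spark.storage.memoryFraction is the knob behind the 60% cache default.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")            // executor heap size
  .set("spark.storage.memoryFraction", "0.5")    // leave more headroom for objects created by tasks
```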
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 25e85381896b0..d8840c94ac17c 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -398,15 +398,13 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
if any((master_nodes, slave_nodes)):
print ("Found %d master(s), %d slaves" %
(len(master_nodes), len(slave_nodes)))
- if (master_nodes != [] and slave_nodes != []) or not die_on_error:
+ if master_nodes != [] or not die_on_error:
return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
- print "ERROR: Could not find master in group " + cluster_name + "-master"
- elif master_nodes != [] and slave_nodes == []:
- print "ERROR: Could not find slaves in group " + cluster_name + "-slaves"
+ print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
else:
- print "ERROR: Could not find any existing cluster"
+ print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1)
@@ -680,6 +678,9 @@ def real_main():
opts.zone = random.choice(conn.get_all_zones()).name
if action == "launch":
+ if opts.slaves <= 0:
+ print >> sys.stderr, "ERROR: You have to start at least 1 slave"
+ sys.exit(1)
if opts.resume:
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
diff --git a/examples/pom.xml b/examples/pom.xml
index 3aba343f4cf50..9f0e2d0b875b8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
new file mode 100644
index 0000000000000..ee283ce6abac2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+import scala.collection.immutable.Map
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+/*
+ Need to create following keyspace and column family in cassandra before running this example
+ Start CQL shell using ./bin/cqlsh and execute following commands
+ CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+ use retail;
+ CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
+ CREATE TABLE ordercf (user_id text,
+ time timestamp,
+ prod_id text,
+ quantity int,
+ PRIMARY KEY (user_id, time));
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+ INSERT INTO ordercf (user_id,
+ time,
+ prod_id,
+ quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+*/
+
+/**
+ * This example demonstrates how to read and write to cassandra column family created using CQL3
+ * using Spark.
+ * Parameters :
+ * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
+ *
+ */
+object CassandraCQLTest {
+
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0),
+ "CQLTestApp",
+ System.getenv("SPARK_HOME"),
+ SparkContext.jarOfClass(this.getClass))
+ val cHost: String = args(1)
+ val cPort: String = args(2)
+ val KeySpace = "retail"
+ val InputColumnFamily = "ordercf"
+ val OutputColumnFamily = "salecount"
+
+ val job = new Job()
+ job.setInputFormatClass(classOf[CqlPagingInputFormat])
+ ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
+ ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
+ ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
+ ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
+
+ /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
+
+ /** An UPDATE writes one or more columns to a record in a Cassandra column family */
+ val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+ CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
+
+ job.setOutputFormatClass(classOf[CqlOutputFormat])
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
+ ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
+ val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
+ classOf[CqlPagingInputFormat],
+ classOf[java.util.Map[String,ByteBuffer]],
+ classOf[java.util.Map[String,ByteBuffer]])
+
+ println("Count: " + casRdd.count)
+ val productSaleRDD = casRdd.map {
+ case (key, value) => {
+ (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
+ }
+ }
+ val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
+ aggregatedRDD.collect().foreach {
+ case (productId, saleCount) => println(productId + ":" + saleCount)
+ }
+
+ val casoutputCF = aggregatedRDD.map {
+ case (productId, saleCount) => {
+ val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
+ val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
+ var outColFamVal = new ListBuffer[ByteBuffer]
+ outColFamVal += ByteBufferUtil.bytes(saleCount)
+ val outVal: java.util.List[ByteBuffer] = outColFamVal
+ (outKey, outVal)
+ }
+ }
+
+ casoutputCF.saveAsNewAPIHadoopFile(
+ KeySpace,
+ classOf[java.util.Map[String, ByteBuffer]],
+ classOf[java.util.List[ByteBuffer]],
+ classOf[CqlOutputFormat],
+ job.getConfiguration()
+ )
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 3d7b390724e77..62d3a52615584 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -23,7 +23,7 @@ import scala.util.Random
import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
@@ -112,8 +112,9 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
-
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = new SparkConf)._1
+ val conf = new SparkConf
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
+ securityManager = new SecurityManager(conf))._1
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 8783aea3e4a5b..f21963531574b 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index 79dc38f9844a0..343e1fabd823f 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 37bb4fad64f68..398b9f4fbaa7d 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index 65ec0e26da881..77e957f404645 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 5b54dd27efb44..894a7c2641e39 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index d1528e2f07cf2..014a7335f85cc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -23,8 +23,8 @@ import scala.collection.mutable.HashSet
import org.apache.spark.util.Utils
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
-import org.objectweb.asm.Opcodes._
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
/**
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 760a2a85d5ffa..9b65cb4b4ce3f 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/pom.xml b/pom.xml
index c59fada5cd4a0..f0c877dcfe7b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,21 @@
+
+ org.eclipse.jetty
+ jetty-util
+ 7.6.8.v20121106
+
+
+ org.eclipse.jetty
+ jetty-security
+ 7.6.8.v20121106
+
+
+ org.eclipse.jetty
+ jetty-plus
+ 7.6.8.v20121106
+
org.eclipse.jetty
jetty-server
@@ -206,11 +221,6 @@
snappy-java
1.0.5
-
- org.ow2.asm
- asm
- 4.0
-
com.clearspring.analytics
stream
@@ -230,11 +240,31 @@
com.twitter
chill_${scala.binary.version}
0.3.1
+
+
+ org.ow2.asm
+ asm
+
+
+ org.ow2.asm
+ asm-commons
+
+
com.twitter
chill-java
0.3.1
+
+
+ org.ow2.asm
+ asm
+
+
+ org.ow2.asm
+ asm-commons
+
+
${akka.group}
@@ -295,6 +325,11 @@
mesos
${mesos.version}
+
+ commons-net
+ commons-net
+ 2.2
+
io.netty
netty-all
@@ -415,6 +450,10 @@
asm
asm
+
+ org.ow2.asm
+ asm
+
org.jboss.netty
netty
@@ -454,6 +493,10 @@
asm
asm
+
+ org.ow2.asm
+ asm
+
org.jboss.netty
netty
@@ -469,6 +512,10 @@
asm
asm
+
+ org.ow2.asm
+ asm
+
org.jboss.netty
netty
@@ -485,6 +532,10 @@
asm
asm
+
+ org.ow2.asm
+ asm
+
org.jboss.netty
netty
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa1784897566b..8fa220c413291 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -226,6 +226,9 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-util" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-plus" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
@@ -254,7 +257,8 @@ object SparkBuild extends Build {
val slf4jVersion = "1.7.5"
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
- val excludeAsm = ExclusionRule(organization = "asm")
+ val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
+ val excludeOldAsm = ExclusionRule(organization = "asm")
val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap")
@@ -277,7 +281,6 @@ object SparkBuild extends Build {
"commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"com.ning" % "compress-lzf" % "1.0.0",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
- "org.ow2.asm" % "asm" % "4.0",
"org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
"org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
"org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test",
@@ -285,17 +288,18 @@ object SparkBuild extends Build {
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.13.0",
+ "commons-net" % "commons-net" % "2.2",
"net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging),
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J),
+ "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm),
"org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty),
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
- "com.twitter" %% "chill" % "0.3.1",
- "com.twitter" % "chill-java" % "0.3.1",
+ "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm),
+ "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm),
"com.clearspring.analytics" % "stream" % "2.5.1"
),
libraryDependencies ++= maybeAvro
@@ -316,7 +320,7 @@ object SparkBuild extends Build {
name := "spark-examples",
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % "0.1.11",
- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging),
+ "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
"org.apache.cassandra" % "cassandra-all" % "1.2.6"
exclude("com.google.guava", "guava")
exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
@@ -393,10 +397,10 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeNetty, excludeAsm)
+ "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm),
+ "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm),
+ "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm),
+ "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeOldAsm)
)
)
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 914f2e05a402a..32bc044a93221 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -19,3 +19,4 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
+addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0")
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 93faa2e3857ed..c9f42d3aacb58 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -372,6 +372,37 @@ def _getJavaStorageLevel(self, storageLevel):
return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
storageLevel.deserialized, storageLevel.replication)
+ def setJobGroup(self, groupId, description):
+ """
+ Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+ different value or cleared.
+
+ Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+ Application programmers can use this method to group all those jobs together and give a
+ group description. Once set, the Spark web UI will associate such jobs with this group.
+ """
+ self._jsc.setJobGroup(groupId, description)
+
+ def setLocalProperty(self, key, value):
+ """
+ Set a local property that affects jobs submitted from this thread, such as the
+ Spark fair scheduler pool.
+ """
+ self._jsc.setLocalProperty(key, value)
+
+ def getLocalProperty(self, key):
+ """
+ Get a local property set in this thread, or null if it is missing. See
+ L{setLocalProperty}
+ """
+ return self._jsc.getLocalProperty(key)
+
+ def sparkUser(self):
+ """
+ Get SPARK_USER for user who is running SparkContext.
+ """
+ return self._jsc.sc().sparkUser()
+
def _test():
import atexit
import doctest
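
The PySpark methods added above forward to the JVM SparkContext (via self._jsc); for reference, the equivalent calls on a Scala SparkContext `sc` look roughly like this (group ID, description, and pool name are illustrative):

```scala
// Given an existing SparkContext `sc`:
sc.setJobGroup("nightly-etl", "Nightly ETL jobs")          // tag subsequent jobs from this thread
sc.setLocalProperty("spark.scheduler.pool", "production")  // e.g. a fair-scheduler pool
val pool = sc.getLocalProperty("spark.scheduler.pool")     // null if not set
val user = sc.sparkUser                                    // SPARK_USER running this context
```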
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index c15add5237507..6a16756e0576d 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,7 +29,7 @@
def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
- # proper classpath and SPARK_MEM settings from spark-env.sh
+ # proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index be23f87f5ed2d..e72f57d9d1ab0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -95,6 +95,13 @@ def __init__(self, jrdd, ctx, jrdd_deserializer):
self.is_checkpointed = False
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
+ self._id = jrdd.id()
+
+ def id(self):
+ """
+ A unique ID for this RDD (within its SparkContext).
+ """
+ return self._id
def __repr__(self):
return self._jrdd.toString()
@@ -319,6 +326,23 @@ def union(self, other):
return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
self.ctx.serializer)
+ def intersection(self, other):
+ """
+ Return the intersection of this RDD and another one. The output will not
+ contain any duplicate elements, even if the input RDDs did.
+
+ Note that this method performs a shuffle internally.
+
+ >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
+ >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
+ >>> rdd1.intersection(rdd2).collect()
+ [1, 2, 3]
+ """
+ return self.map(lambda v: (v, None)) \
+ .cogroup(other.map(lambda v: (v, None))) \
+ .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
+ .keys()
+
def _reserialize(self):
if self._jrdd_deserializer == self.ctx.serializer:
return self
diff --git a/repl/pom.xml b/repl/pom.xml
index aa01a1760285a..fc49c8b811316 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -37,10 +37,10 @@
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index e3bcf7f30ac8d..ee972887feda6 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -18,14 +18,18 @@
package org.apache.spark.repl
import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader, URLEncoder}
+import java.net.{URI, URL, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.objectweb.asm._
-import org.objectweb.asm.Opcodes._
+import org.apache.spark.SparkEnv
+import org.apache.spark.util.Utils
+
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
/**
@@ -53,7 +57,13 @@ extends ClassLoader(parent) {
if (fileSystem != null) {
fileSystem.open(new Path(directory, pathInDirectory))
} else {
- new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+ val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+ val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
+ newuri.toURL().openStream()
+ } else {
+ new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ }
}
}
val bytes = readAndTransformClass(name, inputStream)
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index f52ebe4a159f1..9b1da195002c2 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -881,6 +881,8 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
})
def process(settings: Settings): Boolean = savingContextLoader {
+ if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+
this.settings = settings
createInterpreter()
@@ -939,16 +941,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
- val master = this.master match {
- case Some(m) => m
- case None => {
- val prop = System.getenv("MASTER")
- if (prop != null) prop else "local"
- }
- }
val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
val conf = new SparkConf()
- .setMaster(master)
+ .setMaster(getMaster())
.setAppName("Spark shell")
.setJars(jars)
.set("spark.repl.class.uri", intp.classServer.uri)
@@ -963,6 +958,17 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
sparkContext
}
+ private def getMaster(): String = {
+ val master = this.master match {
+ case Some(m) => m
+ case None => {
+ val prop = System.getenv("MASTER")
+ if (prop != null) prop else "local"
+ }
+ }
+ master
+ }
+
/** process command-line arguments and do as they request */
def process(args: Array[String]): Boolean = {
val command = new SparkCommandLine(args.toList, msg => echo(msg))
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 1d73d0b6993a8..90a96ad38381e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -36,7 +36,7 @@ import scala.tools.reflect.StdRuntimeTags._
import scala.util.control.ControlThrowable
import util.stackTraceString
-import org.apache.spark.{HttpServer, SparkConf, Logging}
+import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
import org.apache.spark.util.Utils
// /** directory to save .class files to */
@@ -83,15 +83,17 @@ import org.apache.spark.util.Utils
* @author Moez A. Abdel-Gawad
* @author Lex Spoon
*/
- class SparkIMain(initialSettings: Settings, val out: JPrintWriter) extends SparkImports with Logging {
+ class SparkIMain(initialSettings: Settings, val out: JPrintWriter)
+ extends SparkImports with Logging {
imain =>
- val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
+ val conf = new SparkConf()
+ val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
/** Local directory to save .class files too */
val outputDir = {
val tmp = System.getProperty("java.io.tmpdir")
- val rootDir = new SparkConf().get("spark.repl.classdir", tmp)
+ val rootDir = conf.get("spark.repl.classdir", tmp)
Utils.createTempDir(rootDir)
}
if (SPARK_DEBUG_REPL) {
@@ -99,7 +101,8 @@ import org.apache.spark.util.Utils
}
val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
- val classServer = new HttpServer(outputDir) /** Jetty server that will serve our classes to worker nodes */
+ val classServer = new HttpServer(outputDir,
+ new SecurityManager(conf)) /** Jetty server that will serve our classes to worker nodes */
private var currentSettings: Settings = initialSettings
var printResults = true // whether to print result lines
var totalSilence = false // whether to print anything
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 00a6b41013e5f..64e40a88206be 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -105,7 +105,7 @@ get_mem_opts () {
local mem=${1:-2048}
local perm=$(( $mem / 4 ))
(( $perm > 256 )) || perm=256
- (( $perm < 1024 )) || perm=1024
+ (( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))
echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 91d6a1375a18c..2343e381e6f7c 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -37,10 +37,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/tools/pom.xml b/tools/pom.xml
index b8dd255d40ac4..11433e596f5b0 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -36,10 +36,10 @@
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
index bfe12ecec0c09..d0aeaceb0d23c 100644
--- a/yarn/alpha/pom.xml
+++ b/yarn/alpha/pom.xml
@@ -30,10 +30,10 @@
a Hadoop 0.23.X issue -->
yarn-alpha
-
- org.apache.avro
- avro
-
+
+ org.apache.avro
+ avro
+
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e045b9f0248f6..bb574f415293a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // ignore result.
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option?
-
- // Compute number of threads for akka
- //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- //if (minimumMemory > 0) {
- // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- // if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- // }
- //}
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // setup AmIpFilter for the SparkUI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+
+ // Call this to force generation of secret so it gets populated into the
+ // hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts : Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
+ params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 138c27910b0b0..b735d01df8097 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fe37168e5a7ba..1f894a677d169 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -129,12 +129,12 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
System.err.println(
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required in yarn-standalone mode)\n" +
+ " --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
" --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d6c12a9f5952d..4c6e1dcd6dac3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,11 +17,13 @@
package org.apache.spark.deploy.yarn
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* Contains util methods to interact with Hadoop from spark.
@@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
+
+ override def getCurrentUserCredentials(): Credentials = {
+ UserGroupInformation.getCurrentUser().getCredentials()
+ }
+
+ override def addCurrentUserCredentials(creds: Credentials) {
+ UserGroupInformation.getCurrentUser().addCredentials(creds)
+ }
+
+ override def addSecretKeyToUserCredentials(key: String, secret: String) {
+ val creds = new Credentials()
+ creds.addSecretKey(new Text(key), secret.getBytes())
+ addCurrentUserCredentials(creds)
+ }
+
+ override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+ val credentials = getCurrentUserCredentials()
+ if (credentials != null) credentials.getSecretKey(new Text(key)) else null
+ }
+
}
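The four overrides added to YarnSparkHadoopUtil are thin wrappers over the Hadoop Credentials API. As a quick illustration of the underlying round trip, here is a sketch with a made-up key name that uses only the Credentials class rather than the YarnSparkHadoopUtil methods themselves.

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials

// Sketch only: shows the Credentials calls that back addSecretKeyToUserCredentials
// and getSecretKeyFromUserCredentials, without going through the UGI.
object SecretKeySketch {
  def main(args: Array[String]): Unit = {
    val key = "sparkCookie"                              // hypothetical key name
    val creds = new Credentials()
    // Store the secret under a Text key, as addSecretKeyToUserCredentials does.
    creds.addSecretKey(new Text(key), "not-a-real-secret".getBytes())
    // Read the raw bytes back, as getSecretKeyFromUserCredentials does.
    val secret: Array[Byte] = creds.getSecretKey(new Text(key))
    println(if (secret != null) new String(secret) else "<no secret>")
  }
}
```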
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index 9d68603251d1c..e7915d12aef63 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -30,10 +30,10 @@
         a Hadoop 0.23.X issue -->
       <id>yarn-alpha</id>
       <dependencies>
-       <dependency>
-         <groupId>org.apache.avro</groupId>
-         <artifactId>avro</artifactId>
-       </dependency>
+        <dependency>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dd117d5810949..b48a2d50db5ef 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -37,8 +36,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
amClient.init(yarnConf)
amClient.start()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // Set up the AmIpFilter for the Spark UI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+ // Call this to force generation of the secret so it gets populated into the
+ // Hadoop UGI. This has to happen before startUserClass, which does a doAs,
+ // so that the credentials are passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // Add the AmIpFilter that YARN requires to properly secure the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts: Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
@@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val schedulerInterval =
sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
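The comment above about forcing `new SecurityManager(sparkConf)` before startUserClass is the key ordering constraint in this hunk: the secret has to be sitting in the current UGI before anything runs under a doAs, so the credentials can be passed on to the worker containers. A hedged sketch of that ordering using only Hadoop classes follows; the key name and secret are made up, and SecurityManager itself is left out.

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Sketch only: demonstrates why the secret must be added to the UGI before
// the doAs that launches the user class.
object DoAsOrderingSketch {
  private val secretKey = new Text("sparkSecret")        // hypothetical key name

  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.getCurrentUser()

    // Step 1: populate the secret first (what creating the SecurityManager
    // early is meant to trigger in the ApplicationMaster).
    val creds = new Credentials()
    creds.addSecretKey(secretKey, "generated-before-doAs".getBytes())
    ugi.addCredentials(creds)

    // Step 2: code running under doAs (standing in for the user class) sees
    // the credentials that were attached to the UGI beforehand.
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val secret =
          UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(secretKey)
        println(if (secret != null) new String(secret) else "<secret missing>")
      }
    })
  }
}
```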
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 40600f38e5e73..f1c1fea0b5895 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var amClient: AMRMClient[ContainerRequest] = _
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val interval = math.min(timeoutInterval / 2, schedulerInterval)
reporterThread = launchReporterThread(interval)
+
// Wait for the reporter thread to Finish.
reporterThread.join()