From 2669f57f37ab818cdf57e764a9b833742ab8dbcb Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 26 May 2014 06:46:55 +0000 Subject: [PATCH 01/21] [maven-release-plugin] prepare for next development iteration --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 4 ++-- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml | 2 +- 21 files changed, 22 insertions(+), 22 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index bed426138cef5..720ca77c13e0f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 08932bbb17a1d..85f6d9919d9c9 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 3e226419b58d4..47c2507c3c825 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 006757a1ed9ae..b7cbb1a8bf85e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 3ba984eee1aa4..b8fc07f7d3e3f 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index cb4dd47a0cfab..9eeb2e18aa989 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index b10916d4d637c..f4272cea8a183 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index f33b583c3cfbd..e4c2302635150 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index a137112563a81..8575c86d6ce16 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 86d893d5b9e9d..9a506cd658801 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 2a07d8a48c1ec..817f3fb18c538 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 3b5c7e81211c2..4c67de8b24b67 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ 
org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml
diff --git a/pom.xml b/pom.xml index 609e9ce8a4921..f04d7e436f7c6 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ @@ -40,7 +40,7 @@ scm:git:git@github.com:apache/spark.git scm:git:https://git-wip-us.apache.org/repos/asf/spark.git scm:git:git@github.com:apache/spark.git - v1.0.0-rc11 + HEAD
diff --git a/repl/pom.xml b/repl/pom.xml index debf0588916eb..4670e87e00935 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 903001de74cb1..d3f65de16f5f7 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml
diff --git a/sql/core/pom.xml b/sql/core/pom.xml index b7b7b587111ab..6afed6df7e48d 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index bd5e92b0a2ee6..0c5565707e00c 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../../pom.xml
diff --git a/streaming/pom.xml b/streaming/pom.xml index f48ff8f9ea196..c2bf2ed26fadc 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml
diff --git a/tools/pom.xml b/tools/pom.xml index 7e8f2d338533a..1339a702a0dba 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml
diff --git a/yarn/pom.xml b/yarn/pom.xml index 5b2fcea2ec491..983ce6f7e2555 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml index b12b591d72079..55687ebfe8c8d 100644 --- a/yarn/stable/pom.xml +++ b/yarn/stable/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 1.0.0 + 1.0.1-SNAPSHOT ../pom.xml

From 7a831636c4a9320f4b9022ed23936ad1d3866cf7 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 26 May 2014 00:17:20 -0700 Subject: [PATCH 02/21] [SPARK-1914] [SQL] Simplify CountFunction not to traverse to evaluate all child expressions.

`CountFunction` should count up only if the child's evaluated value is not null. Because it currently traverses and evaluates all of the child's sub-expressions, it counts a row whenever any of them is not null, even when the child expression itself evaluates to null.

Author: Takuya UESHIN

Closes #861 from ueshin/issues/SPARK-1914 and squashes the following commits:

3b37315 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-1914 2afa238 [Takuya UESHIN] Simplify CountFunction not to traverse to evaluate all child expressions.
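To make the behavioral change concrete, here is a minimal, self-contained sketch in plain Scala (not catalyst code; the rows and the `plus` helper are hypothetical stand-ins for evaluating a nullable expression such as `'a + 'b`):

```scala
object CountSemantics {
  def main(args: Array[String]): Unit = {
    // Two rows of nullable columns (a, b); None models SQL NULL.
    val rows: Seq[(Option[Int], Option[Int])] = Seq((Some(1), None), (Some(2), Some(3)))

    // a + b is null unless both inputs are non-null, like the SQL expression.
    def plus(r: (Option[Int], Option[Int])): Option[Int] =
      for (a <- r._1; b <- r._2) yield a + b

    // Old behavior: traverse the expression tree and count the row if *any*
    // evaluated sub-expression is non-null.
    val oldCount = rows.count(r => Seq(r._1, r._2).exists(_.isDefined))

    // Fixed behavior: evaluate the child once and count only if it is non-null.
    val newCount = rows.count(r => plus(r).isDefined)

    println(s"old COUNT(a + b) = $oldCount, fixed COUNT(a + b) = $newCount")
    // prints: old COUNT(a + b) = 2, fixed COUNT(a + b) = 1
  }
}
```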
(cherry picked from commit d6395d86f90d1c47c5b6ad17c618b56e00b7fc85) Signed-off-by: Reynold Xin --- .../apache/spark/sql/catalyst/expressions/aggregates.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/DslQuerySuite.scala | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 1bcd4e22766a9..79937b129aeae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -298,8 +298,8 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag var count: Long = _ override def update(input: Row): Unit = { - val evaluatedExpr = expr.map(_.eval(input)) - if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) { + val evaluatedExpr = expr.eval(input) + if (evaluatedExpr != null) { count += 1L } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 692569a73ffcf..8197e8a18d447 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -125,6 +125,11 @@ class DslQuerySuite extends QueryTest { Seq((1,0), (2, 1)) ) + checkAnswer( + testData3.groupBy('a)('a, Count('a + 'b)), + Seq((1,0), (2, 1)) + ) + checkAnswer( testData3.groupBy()(Count('a), Count('b), Count(1), CountDistinct('a :: Nil), CountDistinct('b :: Nil)), (2, 1, 2, 2, 1) :: Nil From f09cb8506c9f6a562b5749dcc7dacb701c016ad2 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 26 May 2014 14:34:58 -0700 Subject: [PATCH 03/21] SPARK-1925: Replace '&' with '&&' JIRA: https://issues.apache.org/jira/browse/SPARK-1925 Author: zsxwing Closes #879 from zsxwing/SPARK-1925 and squashes the following commits: 5cf5a6d [zsxwing] SPARK-1925: Replace '&' with '&&' (cherry picked from commit cb7fe5034826844f1b50fbe8b92646317b66f21c) Signed-off-by: Reynold Xin --- .../main/scala/org/apache/spark/mllib/tree/DecisionTree.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala index 0fe30a3e7040b..3b13e52a7b445 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala @@ -401,7 +401,7 @@ object DecisionTree extends Serializable with Logging { */ def isSampleValid(parentFilters: List[Filter], labeledPoint: LabeledPoint): Boolean = { // leaf - if ((level > 0) & (parentFilters.length == 0)) { + if ((level > 0) && (parentFilters.length == 0)) { return false } @@ -454,7 +454,7 @@ object DecisionTree extends Serializable with Logging { val bin = binForFeatures(mid) val lowThreshold = bin.lowSplit.threshold val highThreshold = bin.highSplit.threshold - if ((lowThreshold < feature) & (highThreshold >= feature)){ + if ((lowThreshold < feature) && (highThreshold >= feature)){ return mid } else if (lowThreshold >= feature) { From f268548df0b7315b97518914fcacd8c64cb39954 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 26 May 2014 16:10:22 -0700 Subject: [PATCH 04/21] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy 905173df57b90f90ebafb22e43f55164445330e6 introduced a 
bug in partitionBy where, after repartitioning the edges, it reuses the VertexRDD without updating the routing tables to reflect the new edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties. This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and calling it in `partitionBy`. Author: Ankur Dave Closes #885 from ankurdave/SPARK-1931 and squashes the following commits: 3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins 9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy (cherry picked from commit 56c771cb2d00a5843c391ae6561536ee46e535d4) Signed-off-by: Reynold Xin --- .../scala/org/apache/spark/graphx/VertexRDD.scala | 12 ++++++++++++ .../org/apache/spark/graphx/impl/GraphImpl.scala | 13 +++++++++---- .../scala/org/apache/spark/graphx/GraphSuite.scala | 10 ++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 8c62897037b6d..8b910fbc5a423 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag]( def reverseRoutingTables(): VertexRDD[VD] = this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse)) + /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */ + def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = { + val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get) + val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) { + (partIter, routingTableIter) => + val routingTable = + if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty + partIter.map(_.withRoutingTable(routingTable)) + } + new VertexRDD(vertexPartitions) + } + /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */ private[graphx] def shipVertexAttributes( shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 2f2d0e03fd7b5..1649b244d2881 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } val edgePartition = builder.toEdgePartition Iterator((pid, edgePartition)) - }, preservesPartitioning = true)) - GraphImpl.fromExistingRDDs(vertices, newEdges) + }, preservesPartitioning = true)).cache() + GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges) } override def reverse: Graph[VD, ED] = { @@ -277,7 +277,11 @@ object GraphImpl { GraphImpl(vertexRDD, edgeRDD) } - /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */ + /** + * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The + * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling + * `VertexRDD.withEdges` or an appropriate VertexRDD constructor. 
+ */ def apply[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = { @@ -290,7 +294,8 @@ object GraphImpl { /** * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the - * vertices. + * vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling + * `VertexRDD.withEdges` or an appropriate VertexRDD constructor. */ def fromExistingRDDs[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 7b9bac5d9c8ea..abc25d0671133 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { Iterator((part.srcIds ++ part.dstIds).toSet) }.collect assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound)) + + // Forming triplets view + val g = Graph( + sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))), + sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2)) + assert(g.triplets.collect.map(_.toTuple).toSet === + Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1))) + val gPart = g.partitionBy(EdgePartition2D) + assert(gPart.triplets.collect.map(_.toTuple).toSet === + Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1))) } } From 9bcd999252349fb132f3540ef6cb53f5e05e474f Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 26 May 2014 21:40:52 -0700 Subject: [PATCH 05/21] Updated dev Python scripts to make them PEP8 compliant. Author: Reynold Xin Closes #875 from rxin/pep8-dev-scripts and squashes the following commits: 04b084f [Reynold Xin] Made dev Python scripts PEP8 compliant. (cherry picked from commit 9ed37190f45fd9e6aa0f2c73b66d317732a53eb8) Signed-off-by: Reynold Xin --- dev/audit-release/audit_release.py | 225 ++++++------ dev/create-release/generate-changelist.py | 160 ++++----- dev/merge_spark_pr.py | 402 +++++++++++----------- 3 files changed, 408 insertions(+), 379 deletions(-) diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py index 8c7573b91f688..230e900ecd4de 100755 --- a/dev/audit-release/audit_release.py +++ b/dev/audit-release/audit_release.py @@ -30,18 +30,18 @@ import time import urllib2 -## Fill in release details here: +# Fill in release details here: RELEASE_URL = "http://people.apache.org/~pwendell/spark-1.0.0-rc1/" RELEASE_KEY = "9E4FE3AF" RELEASE_REPOSITORY = "https://repository.apache.org/content/repositories/orgapachespark-1006/" RELEASE_VERSION = "1.0.0" SCALA_VERSION = "2.10.4" SCALA_BINARY_VERSION = "2.10" -## +# LOG_FILE_NAME = "spark_audit_%s" % time.strftime("%h_%m_%Y_%I_%M_%S") LOG_FILE = open(LOG_FILE_NAME, 'w') -WORK_DIR = "/tmp/audit_%s" % int(time.time()) +WORK_DIR = "/tmp/audit_%s" % int(time.time()) MAVEN_CMD = "mvn" GPG_CMD = "gpg" @@ -50,54 +50,62 @@ # Track failures failures = [] + def clean_work_files(): - print "OK to delete scratch directory '%s'? (y/N): " % WORK_DIR - response = raw_input() - if response == "y": - shutil.rmtree(WORK_DIR) - print "Should I delete the log output file '%s'? (y/N): " % LOG_FILE_NAME - response = raw_input() - if response == "y": - os.unlink(LOG_FILE_NAME) + print "OK to delete scratch directory '%s'? 
(y/N): " % WORK_DIR + response = raw_input() + if response == "y": + shutil.rmtree(WORK_DIR) + print "Should I delete the log output file '%s'? (y/N): " % LOG_FILE_NAME + response = raw_input() + if response == "y": + os.unlink(LOG_FILE_NAME) + def run_cmd(cmd, exit_on_failure=True): - print >> LOG_FILE, "Running command: %s" % cmd - ret = subprocess.call(cmd, shell=True, stdout=LOG_FILE, stderr=LOG_FILE) - if ret != 0 and exit_on_failure: - print "Command failed: %s" % cmd - clean_work_files() - sys.exit(-1) - return ret + print >> LOG_FILE, "Running command: %s" % cmd + ret = subprocess.call(cmd, shell=True, stdout=LOG_FILE, stderr=LOG_FILE) + if ret != 0 and exit_on_failure: + print "Command failed: %s" % cmd + clean_work_files() + sys.exit(-1) + return ret + def run_cmd_with_output(cmd): - print >> sys.stderr, "Running command: %s" % cmd - return subprocess.check_output(cmd, shell=True, stderr=LOG_FILE) + print >> sys.stderr, "Running command: %s" % cmd + return subprocess.check_output(cmd, shell=True, stderr=LOG_FILE) + def test(bool, str): - if bool: - return passed(str) - failed(str) + if bool: + return passed(str) + failed(str) + def passed(str): - print "[PASSED] %s" % str + print "[PASSED] %s" % str + def failed(str): - failures.append(str) - print "[**FAILED**] %s" % str + failures.append(str) + print "[**FAILED**] %s" % str + def get_url(url): - return urllib2.urlopen(url).read() + return urllib2.urlopen(url).read() + original_dir = os.getcwd() -# For each of these modules, we'll test an 'empty' application in sbt and +# For each of these modules, we'll test an 'empty' application in sbt and # maven that links against them. This will catch issues with messed up # dependencies within those projects. modules = [ - "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl", - "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka", - "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq", - "spark-catalyst", "spark-sql", "spark-hive" + "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl", + "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka", + "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq", + "spark-catalyst", "spark-sql", "spark-hive" ] modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules) @@ -106,54 +114,57 @@ def get_url(url): cache_ivy_spark = "~/.ivy2/cache/org.apache.spark" local_maven_kafka = "~/.m2/repository/org/apache/kafka" local_maven_kafka = "~/.m2/repository/org/apache/spark" + + def ensure_path_not_present(x): - if os.path.exists(os.path.expanduser(x)): - print "Please remove %s, it can interfere with testing published artifacts." % x - sys.exit(-1) + if os.path.exists(os.path.expanduser(x)): + print "Please remove %s, it can interfere with testing published artifacts." 
% x + sys.exit(-1) + map(ensure_path_not_present, [local_ivy_spark, cache_ivy_spark, local_maven_kafka]) -# SBT build tests +# SBT build tests os.chdir("blank_sbt_build") os.environ["SPARK_VERSION"] = RELEASE_VERSION os.environ["SCALA_VERSION"] = SCALA_VERSION os.environ["SPARK_RELEASE_REPOSITORY"] = RELEASE_REPOSITORY os.environ["SPARK_AUDIT_MASTER"] = "local" for module in modules: - os.environ["SPARK_MODULE"] = module - ret = run_cmd("sbt clean update", exit_on_failure=False) - test(ret == 0, "sbt build against '%s' module" % module) + os.environ["SPARK_MODULE"] = module + ret = run_cmd("sbt clean update", exit_on_failure=False) + test(ret == 0, "sbt build against '%s' module" % module) os.chdir(original_dir) # SBT application tests for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]: - os.chdir(app) - ret = run_cmd("sbt clean run", exit_on_failure=False) - test(ret == 0, "sbt application (%s)" % app) - os.chdir(original_dir) + os.chdir(app) + ret = run_cmd("sbt clean run", exit_on_failure=False) + test(ret == 0, "sbt application (%s)" % app) + os.chdir(original_dir) # Maven build tests os.chdir("blank_maven_build") for module in modules: - cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" ' - '-Dspark.module="%s" clean compile' % - (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, module)) - ret = run_cmd(cmd, exit_on_failure=False) - test(ret == 0, "maven build against '%s' module" % module) + cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" ' + '-Dspark.module="%s" clean compile' % + (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, module)) + ret = run_cmd(cmd, exit_on_failure=False) + test(ret == 0, "maven build against '%s' module" % module) os.chdir(original_dir) os.chdir("maven_app_core") mvn_exec_cmd = ('%s --update-snapshots -Dspark.release.repository="%s" -Dspark.version="%s" ' '-Dscala.binary.version="%s" clean compile ' - 'exec:java -Dexec.mainClass="SimpleApp"' % - (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, SCALA_BINARY_VERSION)) + 'exec:java -Dexec.mainClass="SimpleApp"' % + (MAVEN_CMD, RELEASE_REPOSITORY, RELEASE_VERSION, SCALA_BINARY_VERSION)) ret = run_cmd(mvn_exec_cmd, exit_on_failure=False) test(ret == 0, "maven application (core)") os.chdir(original_dir) # Binary artifact tests if os.path.exists(WORK_DIR): - print "Working directory '%s' already exists" % WORK_DIR - sys.exit(-1) + print "Working directory '%s' already exists" % WORK_DIR + sys.exit(-1) os.mkdir(WORK_DIR) os.chdir(WORK_DIR) @@ -162,66 +173,66 @@ def ensure_path_not_present(x): artifacts = r.findall(index_page) for artifact in artifacts: - print "==== Verifying download integrity for artifact: %s ====" % artifact - - artifact_url = "%s/%s" % (RELEASE_URL, artifact) - run_cmd("wget %s" % artifact_url) - - key_file = "%s.asc" % artifact - run_cmd("wget %s/%s" % (RELEASE_URL, key_file)) - - run_cmd("wget %s%s" % (artifact_url, ".sha")) - - # Verify signature - run_cmd("%s --keyserver pgp.mit.edu --recv-key %s" % (GPG_CMD, RELEASE_KEY)) - run_cmd("%s %s" % (GPG_CMD, key_file)) - passed("Artifact signature verified.") - - # Verify md5 - my_md5 = run_cmd_with_output("%s --print-md MD5 %s" % (GPG_CMD, artifact)).strip() - release_md5 = get_url("%s.md5" % artifact_url).strip() - test(my_md5 == release_md5, "Artifact MD5 verified.") - - # Verify sha - my_sha = run_cmd_with_output("%s --print-md SHA512 %s" % (GPG_CMD, artifact)).strip() - release_sha = get_url("%s.sha" % artifact_url).strip() - 
test(my_sha == release_sha, "Artifact SHA verified.") - - # Verify Apache required files - dir_name = artifact.replace(".tgz", "") - run_cmd("tar xvzf %s" % artifact) - base_files = os.listdir(dir_name) - test("CHANGES.txt" in base_files, "Tarball contains CHANGES.txt file") - test("NOTICE" in base_files, "Tarball contains NOTICE file") - test("LICENSE" in base_files, "Tarball contains LICENSE file") - - os.chdir(WORK_DIR) - + print "==== Verifying download integrity for artifact: %s ====" % artifact + + artifact_url = "%s/%s" % (RELEASE_URL, artifact) + run_cmd("wget %s" % artifact_url) + + key_file = "%s.asc" % artifact + run_cmd("wget %s/%s" % (RELEASE_URL, key_file)) + + run_cmd("wget %s%s" % (artifact_url, ".sha")) + + # Verify signature + run_cmd("%s --keyserver pgp.mit.edu --recv-key %s" % (GPG_CMD, RELEASE_KEY)) + run_cmd("%s %s" % (GPG_CMD, key_file)) + passed("Artifact signature verified.") + + # Verify md5 + my_md5 = run_cmd_with_output("%s --print-md MD5 %s" % (GPG_CMD, artifact)).strip() + release_md5 = get_url("%s.md5" % artifact_url).strip() + test(my_md5 == release_md5, "Artifact MD5 verified.") + + # Verify sha + my_sha = run_cmd_with_output("%s --print-md SHA512 %s" % (GPG_CMD, artifact)).strip() + release_sha = get_url("%s.sha" % artifact_url).strip() + test(my_sha == release_sha, "Artifact SHA verified.") + + # Verify Apache required files + dir_name = artifact.replace(".tgz", "") + run_cmd("tar xvzf %s" % artifact) + base_files = os.listdir(dir_name) + test("CHANGES.txt" in base_files, "Tarball contains CHANGES.txt file") + test("NOTICE" in base_files, "Tarball contains NOTICE file") + test("LICENSE" in base_files, "Tarball contains LICENSE file") + + os.chdir(WORK_DIR) + for artifact in artifacts: - print "==== Verifying build and tests for artifact: %s ====" % artifact - os.chdir(os.path.join(WORK_DIR, dir_name)) - - os.environ["MAVEN_OPTS"] = "-Xmx3g -XX:MaxPermSize=1g -XX:ReservedCodeCacheSize=1g" - # Verify build - print "==> Running build" - run_cmd("sbt assembly") - passed("sbt build successful") - run_cmd("%s package -DskipTests" % MAVEN_CMD) - passed("Maven build successful") - - # Verify tests - print "==> Performing unit tests" - run_cmd("%s test" % MAVEN_CMD) - passed("Tests successful") - os.chdir(WORK_DIR) + print "==== Verifying build and tests for artifact: %s ====" % artifact + os.chdir(os.path.join(WORK_DIR, dir_name)) + + os.environ["MAVEN_OPTS"] = "-Xmx3g -XX:MaxPermSize=1g -XX:ReservedCodeCacheSize=1g" + # Verify build + print "==> Running build" + run_cmd("sbt assembly") + passed("sbt build successful") + run_cmd("%s package -DskipTests" % MAVEN_CMD) + passed("Maven build successful") + + # Verify tests + print "==> Performing unit tests" + run_cmd("%s test" % MAVEN_CMD) + passed("Tests successful") + os.chdir(WORK_DIR) clean_work_files() if len(failures) == 0: - print "ALL TESTS PASSED" + print "ALL TESTS PASSED" else: - print "SOME TESTS DID NOT PASS" - for f in failures: - print f + print "SOME TESTS DID NOT PASS" + for f in failures: + print f os.chdir(original_dir) diff --git a/dev/create-release/generate-changelist.py b/dev/create-release/generate-changelist.py index 13b744ec1b37e..de1b5d4ae1314 100755 --- a/dev/create-release/generate-changelist.py +++ b/dev/create-release/generate-changelist.py @@ -29,16 +29,16 @@ import subprocess import time import traceback - + SPARK_HOME = os.environ["SPARK_HOME"] NEW_RELEASE_VERSION = "1.0.0" PREV_RELEASE_GIT_TAG = "v0.9.1" - -CHANGELIST = "CHANGES.txt" + +CHANGELIST = "CHANGES.txt" OLD_CHANGELIST = 
"%s.old" % (CHANGELIST) NEW_CHANGELIST = "%s.new" % (CHANGELIST) TMP_CHANGELIST = "%s.tmp" % (CHANGELIST) - + # date before first PR in TLP Spark repo SPARK_REPO_CHANGE_DATE1 = time.strptime("2014-02-26", "%Y-%m-%d") # date after last PR in incubator Spark repo @@ -46,99 +46,103 @@ # Threshold PR number that differentiates PRs to TLP # and incubator repos SPARK_REPO_PR_NUM_THRESH = 200 - + LOG_FILE_NAME = "changes_%s" % time.strftime("%h_%m_%Y_%I_%M_%S") LOG_FILE = open(LOG_FILE_NAME, 'w') - + + def run_cmd(cmd): - try: - print >> LOG_FILE, "Running command: %s" % cmd - output = subprocess.check_output(cmd, shell=True, stderr=LOG_FILE) - print >> LOG_FILE, "Output: %s" % output - return output - except: - traceback.print_exc() - cleanup() - sys.exit(1) - + try: + print >> LOG_FILE, "Running command: %s" % cmd + output = subprocess.check_output(cmd, shell=True, stderr=LOG_FILE) + print >> LOG_FILE, "Output: %s" % output + return output + except: + traceback.print_exc() + cleanup() + sys.exit(1) + + def append_to_changelist(string): - with open(TMP_CHANGELIST, "a") as f: - print >> f, string - -def cleanup(ask = True): - if ask == True: - print "OK to delete temporary and log files? (y/N): " - response = raw_input() - if ask == False or (ask == True and response == "y"): - if os.path.isfile(TMP_CHANGELIST): - os.remove(TMP_CHANGELIST) - if os.path.isfile(OLD_CHANGELIST): - os.remove(OLD_CHANGELIST) - LOG_FILE.close() - os.remove(LOG_FILE_NAME) - + with open(TMP_CHANGELIST, "a") as f: + print >> f, string + + +def cleanup(ask=True): + if ask is True: + print "OK to delete temporary and log files? (y/N): " + response = raw_input() + if ask is False or (ask is True and response == "y"): + if os.path.isfile(TMP_CHANGELIST): + os.remove(TMP_CHANGELIST) + if os.path.isfile(OLD_CHANGELIST): + os.remove(OLD_CHANGELIST) + LOG_FILE.close() + os.remove(LOG_FILE_NAME) + + print "Generating new %s for Spark release %s" % (CHANGELIST, NEW_RELEASE_VERSION) os.chdir(SPARK_HOME) if os.path.isfile(TMP_CHANGELIST): - os.remove(TMP_CHANGELIST) + os.remove(TMP_CHANGELIST) if os.path.isfile(OLD_CHANGELIST): - os.remove(OLD_CHANGELIST) - + os.remove(OLD_CHANGELIST) + append_to_changelist("Spark Change Log") append_to_changelist("----------------") append_to_changelist("") append_to_changelist("Release %s" % NEW_RELEASE_VERSION) append_to_changelist("") - + print "Getting commits between tag %s and HEAD" % PREV_RELEASE_GIT_TAG hashes = run_cmd("git log %s..HEAD --pretty='%%h'" % PREV_RELEASE_GIT_TAG).split() - + print "Getting details of %s commits" % len(hashes) for h in hashes: - date = run_cmd("git log %s -1 --pretty='%%ad' --date=iso | head -1" % h).strip() - subject = run_cmd("git log %s -1 --pretty='%%s' | head -1" % h).strip() - body = run_cmd("git log %s -1 --pretty='%%b'" % h) - committer = run_cmd("git log %s -1 --pretty='%%cn <%%ce>' | head -1" % h).strip() - body_lines = body.split("\n") - - if "Merge pull" in subject: - ## Parse old format commit message - append_to_changelist(" %s %s" % (h, date)) - append_to_changelist(" %s" % subject) - append_to_changelist(" [%s]" % body_lines[0]) - append_to_changelist("") - - elif "maven-release" not in subject: - ## Parse new format commit message - # Get authors from commit message, committer otherwise - authors = [committer] - if "Author:" in body: - authors = [line.split(":")[1].strip() for line in body_lines if "Author:" in line] - - # Generate GitHub PR URL for easy access if possible - github_url = "" - if "Closes #" in body: - pr_num = 
[line.split()[1].lstrip("#") for line in body_lines if "Closes #" in line][0]
-      github_url = "github.com/apache/spark/pull/%s" % pr_num
-      day = time.strptime(date.split()[0], "%Y-%m-%d")
-      if day < SPARK_REPO_CHANGE_DATE1 or (day < SPARK_REPO_CHANGE_DATE2 and pr_num < SPARK_REPO_PR_NUM_THRESH):
-        github_url = "github.com/apache/incubator-spark/pull/%s" % pr_num
-
-    append_to_changelist("  %s" % subject)
-    append_to_changelist("  %s" % ', '.join(authors))
-    # for author in authors:
-    #   append_to_changelist("  %s" % author)
-    append_to_changelist("  %s" % date)
-    if len(github_url) > 0:
-      append_to_changelist("  Commit: %s, %s" % (h, github_url))
-    else:
-      append_to_changelist("  Commit: %s" % h)
-    append_to_changelist("")
-
+    date = run_cmd("git log %s -1 --pretty='%%ad' --date=iso | head -1" % h).strip()
+    subject = run_cmd("git log %s -1 --pretty='%%s' | head -1" % h).strip()
+    body = run_cmd("git log %s -1 --pretty='%%b'" % h)
+    committer = run_cmd("git log %s -1 --pretty='%%cn <%%ce>' | head -1" % h).strip()
+    body_lines = body.split("\n")
+
+    if "Merge pull" in subject:
+        # Parse old format commit message
+        append_to_changelist("  %s %s" % (h, date))
+        append_to_changelist("  %s" % subject)
+        append_to_changelist("  [%s]" % body_lines[0])
+        append_to_changelist("")
+
+    elif "maven-release" not in subject:
+        # Parse new format commit message
+        # Get authors from commit message, committer otherwise
+        authors = [committer]
+        if "Author:" in body:
+            authors = [line.split(":")[1].strip() for line in body_lines if "Author:" in line]
+
+        # Generate GitHub PR URL for easy access if possible
+        github_url = ""
+        if "Closes #" in body:
+            pr_num = [line.split()[1].lstrip("#") for line in body_lines if "Closes #" in line][0]
+            github_url = "github.com/apache/spark/pull/%s" % pr_num
+            day = time.strptime(date.split()[0], "%Y-%m-%d")
+            if day < SPARK_REPO_CHANGE_DATE1 or \
+                (day < SPARK_REPO_CHANGE_DATE2 and pr_num < SPARK_REPO_PR_NUM_THRESH):
+                github_url = "github.com/apache/incubator-spark/pull/%s" % pr_num
+
+        append_to_changelist("  %s" % subject)
+        append_to_changelist("  %s" % ', '.join(authors))
+        # for author in authors:
+        #   append_to_changelist("  %s" % author)
+        append_to_changelist("  %s" % date)
+        if len(github_url) > 0:
+            append_to_changelist("  Commit: %s, %s" % (h, github_url))
+        else:
+            append_to_changelist("  Commit: %s" % h)
+        append_to_changelist("")
+
# Append old change list
print "Appending changelist from tag %s" % PREV_RELEASE_GIT_TAG
run_cmd("git show %s:%s | tail -n +3 >> %s" % (PREV_RELEASE_GIT_TAG, CHANGELIST, TMP_CHANGELIST))
run_cmd("cp %s %s" % (TMP_CHANGELIST, NEW_CHANGELIST))
print "New change list generated as %s" % NEW_CHANGELIST
cleanup(False)
-
diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index 83618c8068d35..7f744d5589ef7 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -21,7 +21,7 @@ # usage: ./apache-pr-merge.py (see config env vars below) # # This utility assumes you already have local a Spark git folder and that you -# have added remotes corresponding to both (i) the github apache Spark +# have added remotes corresponding to both (i) the github apache Spark # mirror and (ii) the apache git repo.
import json @@ -33,10 +33,10 @@ import urllib2 try: - import jira.client - JIRA_IMPORTED=True + import jira.client + JIRA_IMPORTED = True except ImportError: - JIRA_IMPORTED=False + JIRA_IMPORTED = False # Location of your Spark git development area SPARK_HOME = os.environ.get("SPARK_HOME", "/home/patrick/Documents/spark") @@ -58,204 +58,217 @@ os.chdir(SPARK_HOME) + def get_json(url): - try: - return json.load(urllib2.urlopen(url)) - except urllib2.HTTPError as e: - print "Unable to fetch URL, exiting: %s" % url - sys.exit(-1) + try: + return json.load(urllib2.urlopen(url)) + except urllib2.HTTPError as e: + print "Unable to fetch URL, exiting: %s" % url + sys.exit(-1) + def fail(msg): - print msg - clean_up() - sys.exit(-1) + print msg + clean_up() + sys.exit(-1) + def run_cmd(cmd): - if isinstance(cmd, list): - return subprocess.check_output(cmd) - else: - return subprocess.check_output(cmd.split(" ")) + if isinstance(cmd, list): + return subprocess.check_output(cmd) + else: + return subprocess.check_output(cmd.split(" ")) + def continue_maybe(prompt): - result = raw_input("\n%s (y/n): " % prompt) - if result.lower() != "y": - fail("Okay, exiting") + result = raw_input("\n%s (y/n): " % prompt) + if result.lower() != "y": + fail("Okay, exiting") + original_head = run_cmd("git rev-parse HEAD")[:8] + def clean_up(): - print "Restoring head pointer to %s" % original_head - run_cmd("git checkout %s" % original_head) + print "Restoring head pointer to %s" % original_head + run_cmd("git checkout %s" % original_head) + + branches = run_cmd("git branch").replace(" ", "").split("\n") - branches = run_cmd("git branch").replace(" ", "").split("\n") + for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches): + print "Deleting local branch %s" % branch + run_cmd("git branch -D %s" % branch) - for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches): - print "Deleting local branch %s" % branch - run_cmd("git branch -D %s" % branch) # merge the requested PR and return the merge hash def merge_pr(pr_num, target_ref): - pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num) - target_branch_name = "%s_MERGE_PR_%s_%s" % (BRANCH_PREFIX, pr_num, target_ref.upper()) - run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name)) - run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name)) - run_cmd("git checkout %s" % target_branch_name) - - had_conflicts = False - try: - run_cmd(['git', 'merge', pr_branch_name, '--squash']) - except Exception as e: - msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e - continue_maybe(msg) - msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?" 
- continue_maybe(msg) - had_conflicts = True - - commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name, - '--pretty=format:%an <%ae>']).split("\n") - distinct_authors = sorted(set(commit_authors), key=lambda x: commit_authors.count(x), - reverse=True) - primary_author = distinct_authors[0] - commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name, - '--pretty=format:%h [%an] %s']).split("\n\n") - - merge_message_flags = [] - - for p in [title, body]: - merge_message_flags += ["-m", p] - - authors = "\n".join(["Author: %s" % a for a in distinct_authors]) - - merge_message_flags += ["-m", authors] + pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num) + target_branch_name = "%s_MERGE_PR_%s_%s" % (BRANCH_PREFIX, pr_num, target_ref.upper()) + run_cmd("git fetch %s pull/%s/head:%s" % (PR_REMOTE_NAME, pr_num, pr_branch_name)) + run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name)) + run_cmd("git checkout %s" % target_branch_name) + + had_conflicts = False + try: + run_cmd(['git', 'merge', pr_branch_name, '--squash']) + except Exception as e: + msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e + continue_maybe(msg) + msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?" + continue_maybe(msg) + had_conflicts = True + + commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name, + '--pretty=format:%an <%ae>']).split("\n") + distinct_authors = sorted(set(commit_authors), + key=lambda x: commit_authors.count(x), reverse=True) + primary_author = distinct_authors[0] + commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name, + '--pretty=format:%h [%an] %s']).split("\n\n") + + merge_message_flags = [] + + for p in [title, body]: + merge_message_flags += ["-m", p] + + authors = "\n".join(["Author: %s" % a for a in distinct_authors]) + + merge_message_flags += ["-m", authors] + + if had_conflicts: + committer_name = run_cmd("git config --get user.name").strip() + committer_email = run_cmd("git config --get user.email").strip() + message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % ( + committer_name, committer_email) + merge_message_flags += ["-m", message] + + # The string "Closes #%s" string is required for GitHub to correctly close the PR + merge_message_flags += [ + "-m", + "Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)] + for c in commits: + merge_message_flags += ["-m", c] + + run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags) + + continue_maybe("Merge complete (local ref %s). Push to %s?" % ( + target_branch_name, PUSH_REMOTE_NAME)) + + try: + run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref)) + except Exception as e: + clean_up() + fail("Exception while pushing: %s" % e) + + merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8] + clean_up() + print("Pull request #%s merged!" 
% pr_num) + print("Merge hash: %s" % merge_hash) + return merge_hash - if had_conflicts: - committer_name = run_cmd("git config --get user.name").strip() - committer_email = run_cmd("git config --get user.email").strip() - message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % ( - committer_name, committer_email) - merge_message_flags += ["-m", message] - # The string "Closes #%s" string is required for GitHub to correctly close the PR - merge_message_flags += ["-m", - "Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)] - for c in commits: - merge_message_flags += ["-m", c] +def cherry_pick(pr_num, merge_hash, default_branch): + pick_ref = raw_input("Enter a branch name [%s]: " % default_branch) + if pick_ref == "": + pick_ref = default_branch - run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags) + pick_branch_name = "%s_PICK_PR_%s_%s" % (BRANCH_PREFIX, pr_num, pick_ref.upper()) - continue_maybe("Merge complete (local ref %s). Push to %s?" % ( - target_branch_name, PUSH_REMOTE_NAME)) + run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name)) + run_cmd("git checkout %s" % pick_branch_name) + run_cmd("git cherry-pick -sx %s" % merge_hash) - try: - run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, target_branch_name, target_ref)) - except Exception as e: - clean_up() - fail("Exception while pushing: %s" % e) - - merge_hash = run_cmd("git rev-parse %s" % target_branch_name)[:8] - clean_up() - print("Pull request #%s merged!" % pr_num) - print("Merge hash: %s" % merge_hash) - return merge_hash + continue_maybe("Pick complete (local ref %s). Push to %s?" % ( + pick_branch_name, PUSH_REMOTE_NAME)) + try: + run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref)) + except Exception as e: + clean_up() + fail("Exception while pushing: %s" % e) -def cherry_pick(pr_num, merge_hash, default_branch): - pick_ref = raw_input("Enter a branch name [%s]: " % default_branch) - if pick_ref == "": - pick_ref = default_branch - - pick_branch_name = "%s_PICK_PR_%s_%s" % (BRANCH_PREFIX, pr_num, pick_ref.upper()) - - run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, pick_ref, pick_branch_name)) - run_cmd("git checkout %s" % pick_branch_name) - run_cmd("git cherry-pick -sx %s" % merge_hash) - - continue_maybe("Pick complete (local ref %s). Push to %s?" % ( - pick_branch_name, PUSH_REMOTE_NAME)) - - try: - run_cmd('git push %s %s:%s' % (PUSH_REMOTE_NAME, pick_branch_name, pick_ref)) - except Exception as e: + pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8] clean_up() - fail("Exception while pushing: %s" % e) - pick_hash = run_cmd("git rev-parse %s" % pick_branch_name)[:8] - clean_up() + print("Pull request #%s picked into %s!" % (pr_num, pick_ref)) + print("Pick hash: %s" % pick_hash) + return pick_ref - print("Pull request #%s picked into %s!" 
% (pr_num, pick_ref)) - print("Pick hash: %s" % pick_hash) - return pick_ref def fix_version_from_branch(branch, versions): - # Note: Assumes this is a sorted (newest->oldest) list of un-released versions - if branch == "master": - return versions[0] - else: - branch_ver = branch.replace("branch-", "") - return filter(lambda x: x.name.startswith(branch_ver), versions)[-1] + # Note: Assumes this is a sorted (newest->oldest) list of un-released versions + if branch == "master": + return versions[0] + else: + branch_ver = branch.replace("branch-", "") + return filter(lambda x: x.name.startswith(branch_ver), versions)[-1] + def resolve_jira(title, merge_branches, comment): - asf_jira = jira.client.JIRA({'server': JIRA_API_BASE}, - basic_auth=(JIRA_USERNAME, JIRA_PASSWORD)) - - default_jira_id = "" - search = re.findall("SPARK-[0-9]{4,5}", title) - if len(search) > 0: - default_jira_id = search[0] - - jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id) - if jira_id == "": - jira_id = default_jira_id - - try: - issue = asf_jira.issue(jira_id) - except Exception as e: - fail("ASF JIRA could not find %s\n%s" % (jira_id, e)) - - cur_status = issue.fields.status.name - cur_summary = issue.fields.summary - cur_assignee = issue.fields.assignee - if cur_assignee == None: - cur_assignee = "NOT ASSIGNED!!!" - else: - cur_assignee = cur_assignee.displayName - - if cur_status == "Resolved" or cur_status == "Closed": - fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status)) - print ("=== JIRA %s ===" % jira_id) - print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % ( - cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id)) - - versions = asf_jira.project_versions("SPARK") - versions = sorted(versions, key = lambda x: x.name, reverse=True) - versions = filter(lambda x: x.raw['released'] == False, versions) - - default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches) - for v in default_fix_versions: - # Handles the case where we have forked a release branch but not yet made the release. - # In this case, if the PR is committed to the master branch and the release branch, we - # only consider the release branch to be the fix version. E.g. it is not valid to have - # both 1.1.0 and 1.0.0 as fix versions. - (major, minor, patch) = v.split(".") - if patch == "0": - previous = "%s.%s.%s" % (major, int(minor) - 1, 0) - if previous in default_fix_versions: - default_fix_versions = filter(lambda x: x != v, default_fix_versions) - default_fix_versions = ",".join(default_fix_versions) - - fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions) - if fix_versions == "": - fix_versions = default_fix_versions - fix_versions = fix_versions.replace(" ", "").split(",") - - def get_version_json(version_str): - return filter(lambda v: v.name == version_str, versions)[0].raw - jira_fix_versions = map(lambda v: get_version_json(v), fix_versions) - - resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0] - asf_jira.transition_issue(jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment) - - print "Succesfully resolved %s with fixVersions=%s!" 
% (jira_id, fix_versions) + asf_jira = jira.client.JIRA({'server': JIRA_API_BASE}, + basic_auth=(JIRA_USERNAME, JIRA_PASSWORD)) + + default_jira_id = "" + search = re.findall("SPARK-[0-9]{4,5}", title) + if len(search) > 0: + default_jira_id = search[0] + + jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id) + if jira_id == "": + jira_id = default_jira_id + + try: + issue = asf_jira.issue(jira_id) + except Exception as e: + fail("ASF JIRA could not find %s\n%s" % (jira_id, e)) + + cur_status = issue.fields.status.name + cur_summary = issue.fields.summary + cur_assignee = issue.fields.assignee + if cur_assignee is None: + cur_assignee = "NOT ASSIGNED!!!" + else: + cur_assignee = cur_assignee.displayName + + if cur_status == "Resolved" or cur_status == "Closed": + fail("JIRA issue %s already has status '%s'" % (jira_id, cur_status)) + print ("=== JIRA %s ===" % jira_id) + print ("summary\t\t%s\nassignee\t%s\nstatus\t\t%s\nurl\t\t%s/%s\n" % ( + cur_summary, cur_assignee, cur_status, JIRA_BASE, jira_id)) + + versions = asf_jira.project_versions("SPARK") + versions = sorted(versions, key=lambda x: x.name, reverse=True) + versions = filter(lambda x: x.raw['released'] is False, versions) + + default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches) + for v in default_fix_versions: + # Handles the case where we have forked a release branch but not yet made the release. + # In this case, if the PR is committed to the master branch and the release branch, we + # only consider the release branch to be the fix version. E.g. it is not valid to have + # both 1.1.0 and 1.0.0 as fix versions. + (major, minor, patch) = v.split(".") + if patch == "0": + previous = "%s.%s.%s" % (major, int(minor) - 1, 0) + if previous in default_fix_versions: + default_fix_versions = filter(lambda x: x != v, default_fix_versions) + default_fix_versions = ",".join(default_fix_versions) + + fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % default_fix_versions) + if fix_versions == "": + fix_versions = default_fix_versions + fix_versions = fix_versions.replace(" ", "").split(",") + + def get_version_json(version_str): + return filter(lambda v: v.name == version_str, versions)[0].raw + + jira_fix_versions = map(lambda v: get_version_json(v), fix_versions) + + resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0] + asf_jira.transition_issue( + jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment) + + print "Succesfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions) + branches = get_json("%s/branches" % GITHUB_API_BASE) branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches]) @@ -273,28 +286,29 @@ def get_version_json(version_str): base_ref = pr["head"]["ref"] pr_repo_desc = "%s/%s" % (user_login, base_ref) -if pr["merged"] == True: - print "Pull request %s has already been merged, assuming you want to backport" % pr_num - merge_commit_desc = run_cmd(['git', 'log', '--merges', '--first-parent', - '--grep=pull request #%s' % pr_num, '--oneline']).split("\n")[0] - if merge_commit_desc == "": - fail("Couldn't find any merge commit for #%s, you may need to update HEAD." 
% pr_num) +if pr["merged"] is True: + print "Pull request %s has already been merged, assuming you want to backport" % pr_num + merge_commit_desc = run_cmd([ + 'git', 'log', '--merges', '--first-parent', + '--grep=pull request #%s' % pr_num, '--oneline']).split("\n")[0] + if merge_commit_desc == "": + fail("Couldn't find any merge commit for #%s, you may need to update HEAD." % pr_num) + + merge_hash = merge_commit_desc[:7] + message = merge_commit_desc[8:] - merge_hash = merge_commit_desc[:7] - message = merge_commit_desc[8:] - - print "Found: %s" % message - maybe_cherry_pick(pr_num, merge_hash, latest_branch) - sys.exit(0) + print "Found: %s" % message + maybe_cherry_pick(pr_num, merge_hash, latest_branch) + sys.exit(0) if not bool(pr["mergeable"]): - msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \ - "Continue? (experts only!)" - continue_maybe(msg) + msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \ + "Continue? (experts only!)" + continue_maybe(msg) print ("\n=== Pull Request #%s ===" % pr_num) print ("title\t%s\nsource\t%s\ntarget\t%s\nurl\t%s" % ( - title, pr_repo_desc, target_ref, url)) + title, pr_repo_desc, target_ref, url)) continue_maybe("Proceed with merging pull request #%s?" % pr_num) merged_refs = [target_ref] @@ -303,12 +317,12 @@ def get_version_json(version_str): pick_prompt = "Would you like to pick %s into another branch?" % merge_hash while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y": - merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)] + merged_refs = merged_refs + [cherry_pick(pr_num, merge_hash, latest_branch)] if JIRA_IMPORTED: - continue_maybe("Would you like to update an associated JIRA?") - jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num) - resolve_jira(title, merged_refs, jira_comment) + continue_maybe("Would you like to update an associated JIRA?") + jira_comment = "Issue resolved by pull request %s\n[%s/%s]" % (pr_num, GITHUB_BASE, pr_num) + resolve_jira(title, merged_refs, jira_comment) else: - print "Could not find jira-python library. Run 'sudo pip install jira-python' to install." - print "Exiting without trying to close the associated JIRA." + print "Could not find jira-python library. Run 'sudo pip install jira-python' to install." + print "Exiting without trying to close the associated JIRA." From fcb375026d3b4b75a5a32e1e9b4df9dcdfc889ae Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 26 May 2014 22:05:23 -0700 Subject: [PATCH 06/21] SPARK-1933: Throw a more meaningful exception when a directory is passed to addJar/addFile. https://issues.apache.org/jira/browse/SPARK-1933 Author: Reynold Xin Closes #888 from rxin/addfile and squashes the following commits: 8c402a3 [Reynold Xin] Updated comment. ff6c162 [Reynold Xin] SPARK-1933: Throw a more meaningful exception when a directory is passed to addJar/addFile. 
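A standalone sketch of the guard this patch adds (illustrative only, not the actual `HttpFileServer` code; the temp-directory usage in `main` is hypothetical): fail fast with a clear message instead of letting the later copy step surface a confusing `FileNotFoundException`:

```scala
import java.io.File

object AddFileGuard {
  def addFileToDir(file: File, dir: File): String = {
    // Reject directories up front so callers get a meaningful error instead
    // of a misleading "No such file or directory" from the copy below.
    if (file.isDirectory) {
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    // The real implementation copies `file` into `dir` here (Guava Files.copy).
    new File(dir, file.getName).getPath
  }

  def main(args: Array[String]): Unit = {
    val tmpDir = new File(System.getProperty("java.io.tmpdir"))
    try addFileToDir(tmpDir, tmpDir)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```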
(cherry picked from commit 90e281b55aecbfbe4431ac582311d5790fe7aad3) Signed-off-by: Reynold Xin --- core/src/main/scala/org/apache/spark/HttpFileServer.scala | 7 +++++++ core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++--- 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index a6e300d345786..0e3750fdde415 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -59,6 +59,13 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo } def addFileToDir(file: File, dir: File) : String = { + // Check whether the file is a directory. If it is, throw a more meaningful exception. + // If we don't catch this, Guava throws a very confusing error message: + // java.io.FileNotFoundException: [file] (No such file or directory) + // even though the directory ([file]) exists. + if (file.isDirectory) { + throw new IllegalArgumentException(s"$file cannot be a directory.") + } Files.copy(file, new File(dir, file.getName)) dir + "/" + file.getName }

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 49737fa4be56b..03ceff8bf1fb0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -794,7 +794,7 @@ class SparkContext(config: SparkConf) extends Logging { addedFiles(key) = System.currentTimeMillis // Fetch the file locally in case a job is executed using DAGScheduler.runLocally(). - Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager) + Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager) logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) postEnvironmentUpdate() @@ -932,13 +932,12 @@ class SparkContext(config: SparkConf) extends Logging { try { env.httpFileServer.addJar(new File(fileName)) } catch { - case e: Exception => { + case e: Exception => // For now just log an error but allow to go through so spark examples work. // The spark examples don't really need the jar distributed since its also // the app jar. logError("Error adding jar (" + e + "), was the --addJars option used?") null - } } } else { env.httpFileServer.addJar(new File(uri.getPath))

From 214f90ee7910fada1dc58ecc95be0a83a1356a2d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 26 May 2014 23:17:39 -0700 Subject: [PATCH 07/21] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection. There are two places that call `replicate(blockId, bytesAfterPut, level)`: * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`. As they run in different `Executor`s, this is a race condition that may leave the memory referenced by `cachedPeers` in an inconsistent state even when `cachedPeers != null`. The race condition on `onReceiveCallback` is that it is set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`.
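A standalone sketch of the visibility hazard being fixed (illustrative only, not `BlockManager` code; the thread setup is hypothetical). Without `@volatile`, the reader thread may never observe the write, or may observe the reference before its contents are safely published:

```scala
object VolatileSketch {
  // The fix: mark the field @volatile so a write in one thread is
  // guaranteed visible to reads in other threads.
  @volatile private var cachedPeers: Seq[String] = null

  def main(args: Array[String]): Unit = {
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        // Without @volatile this loop is allowed to spin forever on a stale null.
        while (cachedPeers == null) { Thread.`yield`() }
        println("reader saw: " + cachedPeers.mkString(", "))
      }
    })
    reader.start()
    cachedPeers = Seq("peer-1", "peer-2")  // safely published via the volatile write
    reader.join()
  }
}
```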
Author: zsxwing

Closes #887 from zsxwing/SPARK-1932 and squashes the following commits:

524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

(cherry picked from commit 549830b0db2c8b069391224f3a73bb0d7f397f71) Signed-off-by: Aaron Davidson --- .../scala/org/apache/spark/network/ConnectionManager.scala | 3 ++- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index dcbbc1853186b..5dd5fd0047c0d 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, implicit val futureExecContext = ExecutionContext.fromExecutor( Utils.newDaemonCachedThreadPool("Connection manager future execution context")) - private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null + @volatile + private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null private val authEnabled = securityManager.isAuthenticationEnabled()

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6534095811907..6e450081dcb11 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -772,7 +772,7 @@ private[spark] class BlockManager( /** * Replicate block to another node.
*/ - var cachedPeers: Seq[BlockManagerId] = null + @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) From 30be37ca7e7dec92cf48692e1a707608871b4314 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Tue, 27 May 2014 11:53:38 -0700 Subject: [PATCH 08/21] bugfix worker DriverStateChanged state should match DriverState.FAILED bugfix worker DriverStateChanged state should match DriverState.FAILED Author: lianhuiwang Closes #864 from lianhuiwang/master and squashes the following commits: 480ce94 [lianhuiwang] address aarondav comments f2b5970 [lianhuiwang] bugfix worker DriverStateChanged state should match DriverState.FAILED --- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b6747977eb87..100de26170a50 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -317,10 +317,14 @@ private[spark] class Worker( state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") + case DriverState.FAILED => + logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") + case _ => + logDebug(s"Driver $driverId changed state to $state") } masterLock.synchronized { master ! DriverStateChanged(driverId, state, exception) From f5399631ca8246e85f3b73d5f60ba1cdaab04ead Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 27 May 2014 14:53:57 -0700 Subject: [PATCH 09/21] [SPARK-1926] [SQL] Nullability of Max/Min/First should be true. Nullability of `Max`/`Min`/`First` should be `true` because they return `null` if there are no rows. Author: Takuya UESHIN Closes #881 from ueshin/issues/SPARK-1926 and squashes the following commits: 322610f [Takuya UESHIN] Fix nullability of Min/Max/First. 
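As an aside on the semantics (a hedged sketch in plain Scala, with `Option` standing in for SQL null; not code from the patch): an aggregate such as `MIN` yields null over zero rows even when its child column never does, so its nullability cannot be inherited from the child.

```scala
// MIN over zero rows has no value to return, so the aggregate must be
// declared nullable even when the child expression itself is non-nullable.
def min(rows: Seq[Int]): Option[Int] = rows.reduceOption(_ min _)

assert(min(Seq(3, 1, 2)) == Some(1)) // non-empty input yields a value
assert(min(Seq.empty) == None)       // empty input yields "null"
```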
(cherry picked from commit d1375a2bff846f2c4274e14545924646852895f9) Signed-off-by: Reynold Xin --- .../apache/spark/sql/catalyst/expressions/aggregates.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 79937b129aeae..b49a4614eacab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -90,7 +90,7 @@ abstract class AggregateFunction case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references - override def nullable = child.nullable + override def nullable = true override def dataType = child.dataType override def toString = s"MIN($child)" @@ -120,7 +120,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references - override def nullable = child.nullable + override def nullable = true override def dataType = child.dataType override def toString = s"MAX($child)" @@ -257,7 +257,7 @@ case class SumDistinct(child: Expression) case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references - override def nullable = child.nullable + override def nullable = true override def dataType = child.dataType override def toString = s"FIRST($child)" From 50e234ba510acac0f75c080b1b1ea681a3a28449 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 27 May 2014 14:55:23 -0700 Subject: [PATCH 10/21] [SPARK-1915] [SQL] AverageFunction should not count if the evaluated value is null. Average values differ depending on whether the calculation is done partially or not, because `AverageFunction` (in the non-partial calculation) counts a row even if the evaluated value is null. Author: Takuya UESHIN Closes #862 from ueshin/issues/SPARK-1915 and squashes the following commits: b1ff3c0 [Takuya UESHIN] Modify AverageFunction not to count if the evaluated value is null.
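To spell out the corrected semantics (a hedged sketch in plain Scala with `Option` standing in for SQL null; not code from the patch): nulls must contribute neither to the sum nor to the count, otherwise the divisor is inflated.

```scala
// AVG over (1.0, null, 3.0) must be (1.0 + 3.0) / 2 = 2.0, not (1.0 + 3.0) / 3.
def average(values: Seq[Option[Double]]): Option[Double] = {
  val present = values.flatten // drop nulls before summing and counting
  if (present.isEmpty) None else Some(present.sum / present.size)
}

assert(average(Seq(Some(1.0), None, Some(3.0))) == Some(2.0))
assert(average(Seq.empty) == None) // no rows at all: the average is null
```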
(cherry picked from commit 3b0babad1f0856ee16f9d58e1ead30779a4a6310) Signed-off-by: Reynold Xin --- .../spark/sql/catalyst/expressions/aggregates.scala | 9 ++++++--- .../scala/org/apache/spark/sql/DslQuerySuite.scala | 10 ++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index b49a4614eacab..c902433688943 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -281,14 +281,17 @@ case class AverageFunction(expr: Expression, base: AggregateExpression) private val sum = MutableLiteral(zero.eval(EmptyRow)) private val sumAsDouble = Cast(sum, DoubleType) - private val addFunction = Add(sum, Coalesce(Seq(expr, zero))) + private def addFunction(value: Any) = Add(sum, Literal(value)) override def eval(input: Row): Any = sumAsDouble.eval(EmptyRow).asInstanceOf[Double] / count.toDouble override def update(input: Row): Unit = { - count += 1 - sum.update(addFunction, input) + val evaluatedExpr = expr.eval(input) + if (evaluatedExpr != null) { + count += 1 + sum.update(addFunction(evaluatedExpr), input) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 8197e8a18d447..fb599e1e01e73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -115,6 +115,16 @@ class DslQuerySuite extends QueryTest { 2.0) } + test("null average") { + checkAnswer( + testData3.groupBy()(Average('b)), + 2.0) + + checkAnswer( + testData3.groupBy()(Average('b), CountDistinct('b :: Nil)), + (2.0, 1) :: Nil) + } + test("count") { assert(testData2.count() === testData2.map(_ => 1).count()) } From 5d6382566719043cf07adde36ffe76abb576e7da Mon Sep 17 00:00:00 2001 From: LY Lai Date: Tue, 27 May 2014 16:08:38 -0700 Subject: [PATCH 11/21] [SQL] SPARK-1922 Allow underscore in column name of a struct field https://issues.apache.org/jira/browse/SPARK-1922 . 
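The whole change is a widening of the field-name pattern to admit underscores. A standalone sketch (the two patterns are taken from the diff below; the matching helper is illustrative, not the parser itself):

```scala
import scala.util.matching.Regex

val before: Regex = "[a-zA-Z0-9]*".r  // old pattern: rejects names like "b_1"
val after: Regex = "[a-zA-Z0-9_]*".r  // new pattern: accepts them

// Require the pattern to match the entire field name, as the parser does.
def fullyMatches(r: Regex, s: String): Boolean = r.pattern.matcher(s).matches()

assert(!fullyMatches(before, "b_1"))
assert(fullyMatches(after, "b_1"))
```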
Author: LY Lai Closes #873 from lyuanlai/master and squashes the following commits: 2253263 [LY Lai] Allow underscore in struct field column name (cherry picked from commit 068256745052b0aa947dd8c16b1f1d73d8e4631e) Signed-off-by: Reynold Xin --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 636c4f7b93190..9f74e0334f727 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -201,7 +201,7 @@ object HiveMetastoreTypes extends RegexParsers { } protected lazy val structField: Parser[StructField] = - "[a-zA-Z0-9]*".r ~ ":" ~ dataType ^^ { + "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala new file mode 100644 index 0000000000000..4a64b5f5eb1b4 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.types.{DataType, StructType} + +class HiveMetastoreCatalogSuite extends FunSuite { + + test("struct field should accept underscore in sub-column name") { + val metastr = "struct<a: int, b_1: string, c: string>" + + val datatype = HiveMetastoreTypes.toDataType(metastr) + assert(datatype.isInstanceOf[StructType]) + } +} From 24a1cac4ef10cb77ed39a385f4a9e76c39afeb1d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 27 May 2014 22:17:50 -0700 Subject: [PATCH 12/21] [SPARK-1938] [SQL] ApproxCountDistinctMergeFunction should return Int value. `ApproxCountDistinctMergeFunction` should return `Int` value because the `dataType` of `ApproxCountDistinct` is `IntegerType`. Author: Takuya UESHIN Closes #893 from ueshin/issues/SPARK-1938 and squashes the following commits: 3970e88 [Takuya UESHIN] Remove a superfluous line. 5ad7ec1 [Takuya UESHIN] Make dataType for each of CountDistinct, ApproxCountDistinctMerge and ApproxCountDistinct LongType. cbe7c71 [Takuya UESHIN] Revert a change. fc3ac0f [Takuya UESHIN] Fix evaluated value type of ApproxCountDistinctMergeFunction to Int.
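A short note on why `LongType` won out (a hedged sketch; the arithmetic below is plain Scala, not code from the patch): distinct counts can exceed `Int.MaxValue`, and evaluating them as `Int` would silently wrap.

```scala
// A count just past Int.MaxValue survives as a Long but wraps as an Int,
// which is why the count expressions now declare LongType and
// CountDistinctFunction.eval widens with .toLong.
val count: Long = Int.MaxValue.toLong + 1
assert(count == 2147483648L)       // preserved as a Long
assert(count.toInt == -2147483648) // forced into an Int, it wraps negative
```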
(cherry picked from commit 9df86835b60ce587c8b9bd4ad7410eebf59a179d) Signed-off-by: Reynold Xin --- .../spark/sql/catalyst/expressions/aggregates.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index c902433688943..01947273b6ccc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -82,7 +82,6 @@ abstract class AggregateFunction override def dataType = base.dataType def update(input: Row): Unit - override def eval(input: Row): Any // Do we really need this? override def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray) @@ -166,7 +165,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi override def children = expressions override def references = expressions.flatMap(_.references).toSet override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})" override def newInstance() = new CountDistinctFunction(expressions, this) } @@ -184,7 +183,7 @@ case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double) extends AggregateExpression with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def newInstance() = new ApproxCountDistinctMergeFunction(child, this, relativeSD) } @@ -193,7 +192,7 @@ case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def asPartial: SplitEvaluation = { @@ -394,7 +393,7 @@ case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpressio } } - override def eval(input: Row): Any = seen.size + override def eval(input: Row): Any = seen.size.toLong } case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { From 3669bb8e6d7d0c2a3aefd52fbd0edd7bf6fb1bdb Mon Sep 17 00:00:00 2001 From: jmu Date: Tue, 27 May 2014 22:41:47 -0700 Subject: [PATCH 13/21] Fix doc about NetworkWordCount/JavaNetworkWordCount usage of spark streaming Usage: NetworkWordCount <master> <hostname> <port> --> Usage: NetworkWordCount <hostname> <port> Usage: JavaNetworkWordCount <master> <hostname> <port> --> Usage: JavaNetworkWordCount <hostname> <port> Author: jmu Closes #826 from jmu/master and squashes the following commits: 9fb7980 [jmu] Merge branch 'master' of https://github.com/jmu/spark b9a6b02 [jmu] Fix doc for NetworkWordCount/JavaNetworkWordCount Usage: NetworkWordCount <master> <hostname> <port> --> Usage: NetworkWordCount <hostname> <port> (cherry picked from commit 82eadc3b07d3f00eebd30811f981016e68cf60bf) Signed-off-by: Patrick Wendell --- docs/streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 0c125eb693a8e..972b660262d14 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -234,12
+234,12 @@ Then, in a different terminal, you can start the example by using
{% highlight bash %} -$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999 +$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 {% endhighlight %}
{% highlight bash %} -$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999 +$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999 {% endhighlight %}
@@ -268,7 +268,7 @@ hello world {% highlight bash %} # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount -$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999 +$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms From 032493e1229cab2c84e9287a8fca6ff0bc3154c8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 28 May 2014 15:49:54 -0700 Subject: [PATCH 14/21] Organize configuration docs This PR improves and organizes the config option page and makes a few other changes to config docs. See a preview here: http://people.apache.org/~pwendell/config-improvements/configuration.html The biggest changes are: 1. The configs for the standalone master/workers were moved to the standalone page and out of the general config doc. 2. SPARK_LOCAL_DIRS was missing from the standalone docs. 3. Expanded discussion of injecting configs with spark-submit, including an example. 4. Config options were organized into the following categories: - Runtime Environment - Shuffle Behavior - Spark UI - Compression and Serialization - Execution Behavior - Networking - Scheduling - Security - Spark Streaming Author: Patrick Wendell Closes #880 from pwendell/config-cleanup and squashes the following commits: 93f56c3 [Patrick Wendell] Feedback from Matei 6f66efc [Patrick Wendell] More feedback 16ae776 [Patrick Wendell] Adding back header section d9c264f [Patrick Wendell] Small fix e0c1728 [Patrick Wendell] Response to Matei's review 27d57db [Patrick Wendell] Reverting changes to index.html (covered in #896) e230ef9 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into config-cleanup a374369 [Patrick Wendell] Line wrapping fixes fdff7fc [Patrick Wendell] Merge remote-tracking branch 'apache/master' into config-cleanup 3289ea4 [Patrick Wendell] Pulling in changes from #856 106ee31 [Patrick Wendell] Small link fix f7e79bc [Patrick Wendell] Re-organizing config options. 54b184d [Patrick Wendell] Adding standalone configs to the standalone page 592e94a [Patrick Wendell] Stash 29b5446 [Patrick Wendell] Better discussion of spark-submit in configuration docs 2d719ef [Patrick Wendell] Small fix 4af9e07 [Patrick Wendell] Adding SPARK_LOCAL_DIRS docs 204b248 [Patrick Wendell] Small fixes (cherry picked from commit 7801d44fd3bcf4d82e6db12574cc42bef15bf0e1) Signed-off-by: Patrick Wendell --- docs/configuration.md | 815 ++++++++++++++++++--------------------- docs/quick-start.md | 8 +- docs/spark-standalone.md | 167 +++++++- 3 files changed, 554 insertions(+), 436 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index e5d955f23fe32..b6e7fd34eae68 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2,19 +2,25 @@ layout: global title: Spark Configuration --- - * This will become a table of contents (this text will be scraped). {:toc} -Spark provides several locations to configure the system: +Spark provides three locations to configure the system: + +* [Spark properties](#spark-properties) control most application parameters and can be set by passing + a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java + system properties. +* [Environment variables](#environment-variables) can be used to set per-machine settings, such as + the IP address, through the `conf/spark-env.sh` script on each node. +* [Logging](#configuring-logging) can be configured through `log4j.properties`. 
# Spark Properties Spark properties control most application settings and are configured separately for each -application. The preferred way is to set them through -[SparkConf](api/scala/index.html#org.apache.spark.SparkConf) and passing it as an argument to your -SparkContext. SparkConf allows you to configure most of the common properties to initialize a -cluster (e.g. master URL and application name), as well as arbitrary key-value pairs through the +application. These properties can be set directly on a +[SparkConf](api/scala/index.html#org.apache.spark.SparkConf) and passed as an argument to your +SparkContext. SparkConf allows you to configure some of the common properties +(e.g. master URL and application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could initialize an application as follows: {% highlight scala %} @@ -25,22 +31,37 @@ val conf = new SparkConf() val sc = new SparkContext(conf) {% endhighlight %} -## Loading Default Configurations +## Dynamically Loading Spark Properties +In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For +instance, if you'd like to run the same application with different masters or different +amounts of memory. Spark allows you to simply create an empty conf: -In the case of `spark-shell`, a SparkContext has already been created for you, so you cannot control -the configuration properties through SparkConf. However, you can still set configuration properties -through a default configuration file. By default, `spark-shell` (and more generally `spark-submit`) -will read configuration options from `conf/spark-defaults.conf`, in which each line consists of a -key and a value separated by whitespace. For example, +{% highlight scala %} +val sc = new SparkContext(new SparkConf()) +{% endhighlight %} + +Then, you can supply configuration values at runtime: +{% highlight bash %} +./bin/spark-submit --name "My fancy app" --master local[4] myApp.jar +{% endhighlight %} + +The Spark shell and [`spark-submit`](cluster-overview.html#launching-applications-with-spark-submit) +tool support two ways to load configurations dynamically. The first are command line options, +such as `--master`, as shown above. Running `./bin/spark-submit --help` will show the entire list +of options. + +`bin/spark-submit` will also read configuration options from `conf/spark-defaults.conf`, in which +each line consists of a key and a value separated by whitespace. For example: spark.master spark://5.6.7.8:7077 spark.executor.memory 512m spark.eventLog.enabled true spark.serializer org.apache.spark.serializer.KryoSerializer -Any values specified in the file will be passed on to the application, and merged with those -specified through SparkConf. If the same configuration property exists in both `spark-defaults.conf` -and SparkConf, then the latter will take precedence as it is the most application-specific. +Any values specified as flags or in the properties file will be passed on to the application +and merged with those specified through SparkConf. Properties set directly on the SparkConf +take highest precedence, then flags passed to `spark-submit` or `spark-shell`, then options +in the `spark-defaults.conf` file. ## Viewing Spark Properties @@ -49,19 +70,34 @@ This is a useful place to check to make sure that your properties have been set that only values explicitly specified through either `spark-defaults.conf` or SparkConf will appear. 
For all other configuration properties, you can assume the default value is used. -## All Configuration Properties +## Available Properties -Most of the properties that control internal settings have reasonable default values. However, -there are at least five properties that you will commonly want to control: +Most of the properties that control internal settings have reasonable default values. Some +of the most common options to set are: + + + + + + + + + + @@ -69,10 +105,12 @@ there are at least five properties that you will commonly want to control: @@ -81,7 +119,8 @@ there are at least five properties that you will commonly want to control: @@ -94,138 +133,151 @@ there are at least five properties that you will commonly want to control: comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or - LOCAL_DIRS (YARN) envrionment variables set by the cluster manager. + LOCAL_DIRS (YARN) environment variables set by the cluster manager. - - + +
Property NameDefaultMeaning
spark.app.name(none) + The name of your application. This will appear in the UI and in log data. +
spark.master(none) + The cluster manager to connect to. See the list of + allowed master URL's. +
spark.executor.memory 512m - Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. - 512m, 2g). + Amount of memory to use per executor process, in the same format as JVM memory strings + (e.g. 512m, 2g).
org.apache.spark.serializer.
JavaSerializer
Class to use for serializing objects that will be sent over the network or need to be cached - in serialized form. The default of Java serialization works with any Serializable Java object but is - quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer - and configuring Kryo serialization when speed is necessary. Can be any subclass of - org.apache.spark.Serializer. + in serialized form. The default of Java serialization works with any Serializable Java object + but is quite slow, so we recommend using + org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization + when speed is necessary. Can be any subclass of + + org.apache.spark.Serializer.
If you use Kryo serialization, set this class to register your custom classes with Kryo. It should be set to a class that extends - KryoRegistrator. + + KryoRegistrator. See the tuning guide for more details.
spark.cores.max(not set)spark.logConffalse - When running on a standalone deploy cluster or a - Mesos cluster in "coarse-grained" - sharing mode, the maximum amount of CPU cores to request for the application from - across the cluster (not from each machine). If not set, the default will be - spark.deploy.defaultCores on Spark's standalone cluster manager, or - infinite (all available cores) on Mesos. + Logs the effective SparkConf as INFO when a SparkContext is started.
- Apart from these, the following properties are also available, and may be useful in some situations: +#### Runtime Environment - + + + + + + - - + + - - + + - - + + +
Property NameDefaultMeaning
spark.default.parallelismspark.executor.memory512m -
    -
  • Local mode: number of cores on the local machine
  • -
  • Mesos fine grained mode: 8
  • -
  • Others: total number of cores on all executor nodes or 2, whichever is larger
  • -
+ Amount of memory to use per executor process, in the same format as JVM memory strings + (e.g. 512m, 2g).
spark.executor.extraJavaOptions(none) - Default number of tasks to use across the cluster for distributed shuffle operations - (groupByKey, reduceByKey, etc) when not set by user. + A string of extra JVM options to pass to executors. For instance, GC settings or other + logging. Note that it is illegal to set Spark properties or heap size settings with this + option. Spark properties should be set using a SparkConf object or the + spark-defaults.conf file used with the spark-submit script. Heap size settings can be set + with spark.executor.memory.
spark.storage.memoryFraction0.6spark.executor.extraClassPath(none) - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase - it if you configure your own old generation size. + Extra classpath entries to append to the classpath of executors. This exists primarily + for backwards-compatibility with older versions of Spark. Users typically should not need + to set this option.
spark.shuffle.memoryFraction0.3spark.executor.extraLibraryPath(none) - Fraction of Java heap to use for aggregation and cogroups during shuffles, if - spark.shuffle.spill is true. At any given time, the collective size of - all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will - begin to spill to disk. If spills are often, consider increasing this value at the expense of - spark.storage.memoryFraction. + Set a special library path to use when launching executor JVM's.
spark.storage.memoryMapThreshold8192spark.files.userClassPathFirstfalse - Size of a block, in bytes, above which Spark memory maps when reading a block from disk. - This prevents Spark from memory mapping very small blocks. In general, memory - mapping has high overhead for blocks close to or below the page size of the operating system. + (Experimental) Whether to give user-added jars precedence over Spark's own jars when + loading classes in Executors. This feature can be used to mitigate conflicts between + Spark's dependencies and user dependencies. It is currently an experimental feature.
+ +#### Shuffle Behavior + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + + +
Property NameDefaultMeaning
spark.tachyonStore.baseDirSystem.getProperty("java.io.tmpdir")spark.shuffle.consolidateFilesfalse - Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by - spark.tachyonStore.url. It can also be a comma-separated list of multiple - directories on Tachyon file system. + If set to "true", consolidates intermediate files created during a shuffle. Creating fewer + files can improve filesystem performance for shuffles with large numbers of reduce tasks. It + is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option + might degrade performance on machines with many (>8) cores due to filesystem limitations.
spark.tachyonStore.urltachyon://localhost:19998spark.shuffle.spilltrue - The URL of the underlying Tachyon file system in the TachyonStore. + If set to "true", limits the amount of memory used during reduces by spilling data out to disk. + This spilling threshold is specified by spark.shuffle.memoryFraction.
spark.mesos.coarsefalsespark.shuffle.spill.compresstrue - If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark - acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. - This gives lower-latency scheduling for short queries, but leaves resources in use for the whole - duration of the Spark job. + Whether to compress data spilled during shuffles. Compression will use + spark.io.compression.codec.
spark.ui.port4040spark.shuffle.memoryFraction0.3 - Port for your application's dashboard, which shows memory and workload data + Fraction of Java heap to use for aggregation and cogroups during shuffles, if + spark.shuffle.spill is true. At any given time, the collective size of + all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will + begin to spill to disk. If spills are often, consider increasing this value at the expense of + spark.storage.memoryFraction.
spark.ui.retainedStages1000spark.shuffle.compresstrue - How many stages the Spark UI remembers before garbage collecting. + Whether to compress map output files. Generally a good idea. Compression will use + spark.io.compression.codec.
spark.ui.filtersNonespark.shuffle.file.buffer.kb100 - Comma separated list of filter class names to apply to the Spark web ui. The filter should be a - standard javax servlet Filter. Parameters to each filter can also be specified by setting a - java system property of spark.<class name of filter>.params='param1=value1,param2=value2' - (e.g. -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing') + Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers + reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
spark.ui.acls.enablefalsespark.reducer.maxMbInFlight48 - Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has - access permissions to view the web ui. See spark.ui.view.acls for more details. - Also note this requires the user to be known, if the user comes across as null no checks - are done. Filters can be used to authenticate and set the user. + Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since + each output requires us to create a buffer to receive it, this represents a fixed memory + overhead per reduce task, so keep it small unless you have a large amount of memory. +
+ +#### Spark UI + + + + + + - - + + @@ -236,19 +288,35 @@ Apart from these, the following properties are also available, and may be useful - - + + - - + + + + + + + +
Property NameDefaultMeaning
spark.ui.port4040 + Port for your application's dashboard, which shows memory and workload data
spark.ui.view.aclsEmptyspark.ui.retainedStages1000 - Comma separated list of users that have view access to the spark web ui. By default only the - user that started the Spark job has view access. + How many stages the Spark UI remembers before garbage collecting.
spark.shuffle.compresstruespark.eventLog.enabledfalse - Whether to compress map output files. Generally a good idea. + Whether to log Spark events, useful for reconstructing the Web UI after the application has + finished.
spark.shuffle.spill.compresstruespark.eventLog.compressfalse - Whether to compress data spilled during shuffles. + Whether to compress logged events, if spark.eventLog.enabled is true.
spark.eventLog.dirfile:///tmp/spark-events + Base directory in which Spark events are logged, if spark.eventLog.enabled is true. + Within this base directory, Spark creates a sub-directory for each application, and logs the + events specific to the application in this directory. Users may want to set this to + and HDFS directory so that history files can be read by the history server. +
+ +#### Compression and Serialization + + @@ -260,59 +328,46 @@ Apart from these, the following properties are also available, and may be useful - - - - - - - - - - - - + + - - + + @@ -329,21 +384,29 @@ Apart from these, the following properties are also available, and may be useful +
Property NameDefaultMeaning
spark.broadcast.compress truespark.rdd.compress false - Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). - Can save substantial space at the cost of some extra CPU time. + Whether to compress serialized RDD partitions (e.g. for + StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the cost of some + extra CPU time.
spark.io.compression.codec org.apache.spark.io.
LZFCompressionCodec
- The codec used to compress internal data such as RDD partitions and shuffle outputs. By default, - Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec and - org.apache.spark.io.SnappyCompressionCodec. + The codec used to compress internal data such as RDD partitions and shuffle outputs. + By default, Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec + and org.apache.spark.io.SnappyCompressionCodec. Of these two choices, + Snappy offers faster compression and decompression, while LZF offers a better compression + ratio.
spark.io.compression.snappy.block.size 32768 - Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is - used. -
spark.scheduler.modeFIFO - The scheduling mode between - jobs submitted to the same SparkContext. Can be set to FAIR - to use fair sharing instead of queueing jobs one after another. Useful for - multi-user services. -
spark.scheduler.revive.interval1000 - The interval length for the scheduler to revive the worker resource offers to run tasks. (in - milliseconds) + Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec + is used.
spark.reducer.maxMbInFlight48spark.closure.serializerorg.apache.spark.serializer.
JavaSerializer
- Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since - each output requires us to create a buffer to receive it, this represents a fixed memory - overhead per reduce task, so keep it small unless you have a large amount of memory. + Serializer class to use for closures. Currently only the Java serializer is supported.
spark.closure.serializerorg.apache.spark.serializer.
JavaSerializer
spark.serializer.objectStreamReset10000 - Serializer class to use for closures. Currently only the Java serializer is supported. + When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches + objects to prevent writing redundant data, however that stops garbage collection of those + objects. By calling 'reset' you flush that info from the serializer, and allow old + objects to be collected. To turn off this periodic reset set it to a value <= 0. + By default it will reset the serializer every 10,000 objects.
spark.kryoserializer.buffer.mb 2 - Maximum object size to allow within Kryo (the library needs to create a buffer at least as large - as the largest single object you'll serialize). Increase this if you get a "buffer limit + Maximum object size to allow within Kryo (the library needs to create a buffer at least as + large as the largest single object you'll serialize). Increase this if you get a "buffer limit exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker.
+ +#### Execution Behavior + + - - + + @@ -354,73 +417,92 @@ Apart from these, the following properties are also available, and may be useful - - + + - - + + - - + + + + + + + + + + + + - - + + - - + + - - + + +
Property NameDefaultMeaning
spark.serializer.objectStreamReset10000spark.default.parallelism - When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches - objects to prevent writing redundant data, however that stops garbage collection of those - objects. By calling 'reset' you flush that info from the serializer, and allow old - objects to be collected. To turn off this periodic reset set it to a value <= 0. - By default it will reset the serializer every 10,000 objects. +
    +
  • Local mode: number of cores on the local machine
  • +
  • Mesos fine grained mode: 8
  • +
  • Others: total number of cores on all executor nodes or 2, whichever is larger
  • +
+
+ Default number of tasks to use across the cluster for distributed shuffle operations + (groupByKey, reduceByKey, etc) when not set by user.
spark.locality.wait3000spark.broadcast.blockSize4096 - Number of milliseconds to wait to launch a data-local task before giving up and launching it - on a less-local node. The same wait will be used to step through multiple locality levels - (process-local, node-local, rack-local and then any). It is also possible to customize the - waiting time for each level by setting spark.locality.wait.node, etc. - You should increase this setting if your tasks are long and see poor locality, but the - default usually works well. + Size of each piece of a block in kilobytes for TorrentBroadcastFactory. + Too large a value decreases parallelism during broadcast (makes it slower); however, if it is + too small, BlockManager might take a performance hit.
spark.locality.wait.processspark.locality.waitspark.files.overwritefalse - Customize the locality wait for process locality. This affects tasks that attempt to access - cached data in a particular executor process. + Whether to overwrite files added through SparkContext.addFile() when the target file exists and + its contents do not match those of the source.
spark.locality.wait.nodespark.locality.waitspark.files.fetchTimeoutfalse + Communication timeout to use when fetching files added through SparkContext.addFile() from + the driver. +
spark.storage.memoryFraction0.6 + Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" + generation of objects in the JVM, which by default is given 0.6 of the heap, but you can + increase it if you configure your own old generation size. +
spark.tachyonStore.baseDirSystem.getProperty("java.io.tmpdir") - Customize the locality wait for node locality. For example, you can set this to 0 to skip - node locality and search immediately for rack locality (if your cluster has rack information). + Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by + spark.tachyonStore.url. It can also be a comma-separated list of multiple + directories on Tachyon file system.
spark.locality.wait.rackspark.locality.waitspark.storage.memoryMapThreshold8192 - Customize the locality wait for rack locality. + Size of a block, in bytes, above which Spark memory maps when reading a block from disk. + This prevents Spark from memory mapping very small blocks. In general, memory + mapping has high overhead for blocks close to or below the page size of the operating system.
spark.worker.timeout60spark.tachyonStore.urltachyon://localhost:19998 - Number of seconds after which the standalone deploy master considers a worker lost if it - receives no heartbeats. + The URL of the underlying Tachyon file system in the TachyonStore.
spark.worker.cleanup.enabledfalsespark.cleaner.ttl(infinite) - Enable periodic cleanup of worker / application directories. Note that this only affects - standalone mode, as YARN works differently. Applications directories are cleaned up regardless - of whether the application is still running. + Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks + generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be + forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in + case of Spark Streaming applications). Note that any RDD that persists in memory for more than + this duration will be cleared as well.
+ +#### Networking + + - - + + - - + + @@ -454,8 +536,8 @@ Apart from these, the following properties are also available, and may be useful This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart beat pause in seconds for akka. This can be used to control sensitivity to gc pauses. Tune this in - combination of `spark.akka.heartbeat.interval` and `spark.akka.failure-detector.threshold` if - you need to. + combination of `spark.akka.heartbeat.interval` and `spark.akka.failure-detector.threshold` + if you need to. @@ -476,55 +558,23 @@ Apart from these, the following properties are also available, and may be useful enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and - `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure - detector can be, a sensistive failure detector can help evict rogue executors really quick. - However this is usually not the case as gc pauses and network lags are expected in a real spark - cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes - leading to flooding the network with those. - - - - - - - - - - - - - - - - - - - - - +
Property NameDefaultMeaning
spark.worker.cleanup.interval1800 (30 minutes)spark.driver.host(local hostname) - Controls the interval, in seconds, at which the worker cleans up old application work dirs - on the local machine. + Hostname or IP address for the driver to listen on.
spark.worker.cleanup.appDataTtl7 * 24 * 3600 (7 days)spark.driver.port(random) - The number of seconds to retain application work directories on each worker. This is a Time To - Live and should depend on the amount of available disk space you have. Application logs and - jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up - disk space, especially if you run jobs very frequently. + Port for the driver to listen on.
spark.driver.host(local hostname) - Hostname or IP address for the driver to listen on. -
spark.driver.port(random) - Port for the driver to listen on. -
spark.cleaner.ttl(infinite) - Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks - generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be - forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in - case of Spark Streaming applications). Note that any RDD that persists in memory for more than - this duration will be cleared as well. -
spark.streaming.blockInterval200 - Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced - into blocks of data before storing them in Spark. + `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using + failure detector can be, a sensistive failure detector can help evict rogue executors really + quick. However this is usually not the case as gc pauses and network lags are expected in a + real Spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats + between nodes leading to flooding the network with those.
+ +#### Scheduling + + - - + + @@ -536,39 +586,36 @@ Apart from these, the following properties are also available, and may be useful - - - - - - - - + + - - + + - - + + @@ -601,91 +648,59 @@ Apart from these, the following properties are also available, and may be useful - - - - - - - - - - - - - - - - - - - - - - + + - - + + - - + + - - + + - - + + +
Property NameDefaultMeaning
spark.streaming.unpersisttruespark.task.cpus1 - Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from - Spark's memory. The raw input data received by Spark Streaming is also automatically cleared. - Setting this to false will allow the raw data and persisted RDDs to be accessible outside the - streaming application as they will not be cleared automatically. But it comes at the cost of - higher memory usage in Spark. + Number of cores to allocate for each task.
spark.broadcast.blockSize4096 - Size of each piece of a block in kilobytes for TorrentBroadcastFactory. - Too large a value decreases parallelism during broadcast (makes it slower); however, if it is - too small, BlockManager might take a performance hit. -
spark.shuffle.consolidateFilesfalsespark.scheduler.modeFIFO - If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files - can improve filesystem performance for shuffles with large numbers of reduce tasks. It is - recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might - degrade performance on machines with many (>8) cores due to filesystem limitations. + The scheduling mode between + jobs submitted to the same SparkContext. Can be set to FAIR + to use fair sharing instead of queueing jobs one after another. Useful for + multi-user services.
spark.shuffle.file.buffer.kb100spark.cores.max(not set) - Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers - reduce the number of disk seeks and system calls made in creating intermediate shuffle files. + When running on a standalone deploy cluster or a + Mesos cluster in "coarse-grained" + sharing mode, the maximum amount of CPU cores to request for the application from + across the cluster (not from each machine). If not set, the default will be + spark.deploy.defaultCores on Spark's standalone cluster manager, or + infinite (all available cores) on Mesos.
spark.shuffle.spilltruespark.mesos.coarsefalse - If set to "true", limits the amount of memory used during reduces by spilling data out to disk. - This spilling threshold is specified by spark.shuffle.memoryFraction. + If set to "true", runs over Mesos clusters in + "coarse-grained" sharing mode, + where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per + Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use + for the whole duration of the Spark job.
spark.logConffalse - Whether to log the supplied SparkConf as INFO at start of spark context. -
spark.eventLog.enabledfalse - Whether to log spark events, useful for reconstructing the Web UI after the application has - finished. -
spark.eventLog.compressfalse - Whether to compress logged events, if spark.eventLog.enabled is true. -
spark.eventLog.dirfile:///tmp/spark-events - Base directory in which spark events are logged, if spark.eventLog.enabled is true. - Within this base directory, Spark creates a sub-directory for each application, and logs the - events specific to the application in this directory. -
spark.deploy.spreadOuttruespark.locality.wait3000 - Whether the standalone cluster manager should spread applications out across nodes or try to - consolidate them onto as few nodes as possible. Spreading out is usually better for data - locality in HDFS, but consolidating is more efficient for compute-intensive workloads.
- Note: this setting needs to be configured in the standalone cluster master, not in - individual applications; you can set it through SPARK_MASTER_OPTS in - spark-env.sh. + Number of milliseconds to wait to launch a data-local task before giving up and launching it + on a less-local node. The same wait will be used to step through multiple locality levels + (process-local, node-local, rack-local and then any). It is also possible to customize the + waiting time for each level by setting spark.locality.wait.node, etc. + You should increase this setting if your tasks are long and see poor locality, but the + default usually works well.
spark.deploy.defaultCores(infinite)spark.locality.wait.processspark.locality.wait - Default number of cores to give to applications in Spark's standalone mode if they don't set - spark.cores.max. If not set, applications always get all available cores unless - they configure spark.cores.max themselves. Set this lower on a shared cluster to - prevent users from grabbing the whole cluster by default.
Note: this setting needs - to be configured in the standalone cluster master, not in individual applications; you can set - it through SPARK_MASTER_OPTS in spark-env.sh. + Customize the locality wait for process locality. This affects tasks that attempt to access + cached data in a particular executor process.
spark.files.overwritefalsespark.locality.wait.nodespark.locality.wait - Whether to overwrite files added through SparkContext.addFile() when the target file exists and - its contents do not match those of the source. + Customize the locality wait for node locality. For example, you can set this to 0 to skip + node locality and search immediately for rack locality (if your cluster has rack information).
spark.files.fetchTimeoutfalsespark.locality.wait.rackspark.locality.wait - Communication timeout to use when fetching files added through SparkContext.addFile() from - the driver. + Customize the locality wait for rack locality.
spark.files.userClassPathFirstfalsespark.scheduler.revive.interval1000 - (Experimental) Whether to give user-added jars precedence over Spark's own jars when - loading classes in Executors. This feature can be used to mitigate conflicts between - Spark's dependencies and user dependencies. It is currently an experimental feature. + The interval length for the scheduler to revive the worker resource offers to run tasks. + (in milliseconds)
+ +#### Security + + @@ -693,7 +708,7 @@ Apart from these, the following properties are also available, and may be useful @@ -705,42 +720,71 @@ Apart from these, the following properties are also available, and may be useful - - + + - - + + - - + + +
Property NameDefaultMeaning
spark.authenticate false - Whether spark authenticates its internal connections. See spark.authenticate.secret - if not running on Yarn. + Whether Spark authenticates its internal connections. See + spark.authenticate.secret if not running on YARN.
None Set the secret key used for Spark to authenticate between components. This needs to be set if - not running on Yarn and authentication is enabled. + not running on YARN and authentication is enabled.
spark.task.cpus1spark.ui.filtersNone - Number of cores to allocate for each task. + Comma separated list of filter class names to apply to the Spark web UI. The filter should be a + standard + javax servlet Filter. Parameters to each filter can also be specified by setting a + java system property of:
+ spark.<class name of filter>.params='param1=value1,param2=value2'
+ For example:
+ -Dspark.ui.filters=com.test.filter1
+ -Dspark.com.test.filter1.params='param1=foo,param2=testing'
spark.executor.extraJavaOptions(none)spark.ui.acls.enablefalse - A string of extra JVM options to pass to executors. For instance, GC settings or other - logging. Note that it is illegal to set Spark properties or heap size settings with this - option. Spark properties should be set using a SparkConf object or the - spark-defaults.conf file used with the spark-submit script. Heap size settings can be set - with spark.executor.memory. + Whether Spark web ui acls should are enabled. If enabled, this checks to see if the user has + access permissions to view the web ui. See spark.ui.view.acls for more details. + Also note this requires the user to be known, if the user comes across as null no checks + are done. Filters can be used to authenticate and set the user.
spark.executor.extraClassPath(none)spark.ui.view.aclsEmpty - Extra classpath entries to append to the classpath of executors. This exists primarily - for backwards-compatibility with older versions of Spark. Users typically should not need - to set this option. + Comma separated list of users that have view access to the Spark web ui. By default only the + user that started the Spark job has view access.
+ +#### Spark Streaming + + - - + + + + + + + -
Property NameDefaultMeaning
spark.executor.extraLibraryPath(none)spark.streaming.blockInterval200 - Set a special library path to use when launching executor JVM's. + Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced + into blocks of data before storing them in Spark. +
spark.streaming.unpersisttrue + Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from + Spark's memory. The raw input data received by Spark Streaming is also automatically cleared. + Setting this to false will allow the raw data and persisted RDDs to be accessible outside the + streaming application as they will not be cleared automatically. But it comes at the cost of + higher memory usage in Spark.
+#### Cluster Managers (YARN, Mesos, Standalone) +Each cluster manager in Spark has additional configuration options. Configurations +can be found on the pages for each mode: + + * [YARN](running-on-yarn.html#configuration) + * [Mesos](running-on-mesos.html) + * [Standalone Mode](spark-standalone.html#cluster-launch-scripts) + # Environment Variables Certain Spark settings can be configured through environment variables, which are read from the @@ -774,104 +818,15 @@ The following variables can be set in `spark-env.sh`: -In addition to the above, there are also options for setting up the Spark [standalone cluster -scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores to use on each -machine and maximum memory. +In addition to the above, there are also options for setting up the Spark +[standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores +to use on each machine and maximum memory. -Since `spark-env.sh` is a shell script, some of these can be set programmatically -- for example, -you might compute `SPARK_LOCAL_IP` by looking up the IP of a specific network interface. +Since `spark-env.sh` is a shell script, some of these can be set programmatically -- for example, you might +compute `SPARK_LOCAL_IP` by looking up the IP of a specific network interface. # Configuring Logging Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a `log4j.properties` file in the `conf` directory. One way to start is to copy the existing `log4j.properties.template` located there. - -# Configuring Ports for Network Security - -Spark makes heavy use of the network, and some environments have strict requirements for using tight -firewall settings. Below are the primary ports that Spark uses for its communication and how to -configure those ports. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
FromToDefault PortPurposeConfiguration - SettingNotes
BrowserStandalone Cluster Master8080Web UImaster.ui.portJetty-based
BrowserWorker8081Web UIworker.ui.portJetty-based
BrowserDriver4040Web UIspark.ui.portJetty-based
BrowserHistory Server18080Web UIspark.history.ui.portJetty-based
ApplicationStandalone Cluster Master7077Submit job to clusterspark.driver.portAkka-based. Set to "0" to choose a port randomly
WorkerStandalone Cluster Master7077Join clusterspark.driver.portAkka-based. Set to "0" to choose a port randomly
ApplicationWorker(random)Join clusterSPARK_WORKER_PORT (standalone cluster)Akka-based
Driver and other WorkersWorker(random) -
    -
  • File server for file and jars
  • -
  • Http Broadcast
  • -
  • Class file server (Spark Shell only)
  • -
-
NoneJetty-based. Each of these services starts on a random port that cannot be configured
diff --git a/docs/quick-start.md b/docs/quick-start.md index 33a0df1036424..20e17ebf703fc 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -252,11 +252,11 @@ we initialize a SparkContext as part of the program. We pass the SparkContext constructor a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object which contains information about our -application. We also call sc.addJar to make sure that when our application is launched in cluster -mode, the jar file containing it will be shipped automatically to worker nodes. +application. -This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` -which explains that Spark is a dependency. This file also adds a repository that Spark depends on: +Our application depends on the Spark API, so we'll also include an sbt configuration file, +`simple.sbt` which explains that Spark is a dependency. This file also adds a repository that +Spark depends on: {% highlight scala %} name := "Simple Project" diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index eb3211b6b0e4e..dca80a9a69614 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -93,7 +93,15 @@ You can optionally configure the cluster further by setting environment variable SPARK_MASTER_OPTS - Configuration properties that apply only to the master in the form "-Dx=y" (default: none). + Configuration properties that apply only to the master in the form "-Dx=y" (default: none). See below for a list of possible options. + + + SPARK_LOCAL_DIRS + + Directory to use for "scratch" space in Spark, including map output files and RDDs that get + stored on disk. This should be on a fast, local disk in your system. It can also be a + comma-separated list of multiple directories on different disks. + SPARK_WORKER_CORES @@ -126,7 +134,7 @@ You can optionally configure the cluster further by setting environment variable SPARK_WORKER_OPTS - Configuration properties that apply only to the worker in the form "-Dx=y" (default: none). + Configuration properties that apply only to the worker in the form "-Dx=y" (default: none). See below for a list of possible options. SPARK_DAEMON_MEMORY @@ -144,6 +152,73 @@ You can optionally configure the cluster further by setting environment variable **Note:** The launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand. +SPARK_MASTER_OPTS supports the following system properties: + + + + + + + + + + + + + + + + + + +
+  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>spark.deploy.spreadOut</code></td>
+    <td>true</td>
+    <td>
+      Whether the standalone cluster manager should spread applications out across nodes or try
+      to consolidate them onto as few nodes as possible. Spreading out is usually better for
+      data locality in HDFS, but consolidating is more efficient for compute-intensive workloads.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.deploy.defaultCores</code></td>
+    <td>(infinite)</td>
+    <td>
+      Default number of cores to give to applications in Spark's standalone mode if they don't
+      set <code>spark.cores.max</code>. If not set, applications always get all available
+      cores unless they configure <code>spark.cores.max</code> themselves.
+      Set this lower on a shared cluster to prevent users from grabbing
+      the whole cluster by default.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.worker.timeout</code></td>
+    <td>60</td>
+    <td>
+      Number of seconds after which the standalone deploy master considers a worker lost if it
+      receives no heartbeats.
+    </td>
+  </tr>
+</table>
+
+SPARK_WORKER_OPTS supports the following system properties:
+
+<table class="table">
+  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>spark.worker.cleanup.enabled</code></td>
+    <td>false</td>
+    <td>
+      Enable periodic cleanup of worker / application directories. Note that this only affects
+      standalone mode, as YARN works differently. Application directories are cleaned up regardless
+      of whether the application is still running.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.worker.cleanup.interval</code></td>
+    <td>1800 (30 minutes)</td>
+    <td>
+      Controls the interval, in seconds, at which the worker cleans up old application work dirs
+      on the local machine.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.worker.cleanup.appDataTtl</code></td>
+    <td>7 * 24 * 3600 (7 days)</td>
+    <td>
+      The number of seconds to retain application work directories on each worker. This is a Time To Live
+      and should depend on the amount of available disk space you have. Application logs and jars are
+      downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space,
+      especially if you run jobs very frequently.
+    </td>
+  </tr>
+</table>
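`spark.deploy.defaultCores` above only applies when an application does not cap itself; the application-side knob is `spark.cores.max`. A minimal sketch of a job that sets its own cap, with a placeholder master URL:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Cap this job at 4 cores so spark.deploy.defaultCores never has to
// protect a shared standalone cluster from it.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077") // placeholder master URL
  .setAppName("CappedCoresExample")
  .set("spark.cores.max", "4")
val sc = new SparkContext(conf)
```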
+
 # Connecting an Application to the Cluster
 
 To run an application on the Spark cluster, simply pass the `spark://IP:PORT` URL of the master as to the [`SparkContext`
@@ -212,6 +287,94 @@ In addition, detailed log output for each job is also written to the work direct
 
 You can run Spark alongside your existing Hadoop cluster by just launching it as a separate service on the same machines. To access Hadoop data from Spark, just use a hdfs:// URL (typically `hdfs://<namenode>:9000/path`, but you can find the right URL on your Hadoop Namenode's web UI). Alternatively, you can set up a separate cluster for Spark, and still have it access HDFS over the network; this will be slower than disk-local access, but may not be a concern if you are still running in the same local area network (e.g. you place a few Spark machines on each rack that you have Hadoop on).
 
+# Configuring Ports for Network Security
+
+Spark makes heavy use of the network, and some environments have strict requirements for using tight
+firewall settings. Below are the primary ports that Spark uses for its communication and how to
+configure those ports.
+
+<table class="table">
+  <tr><th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
+    Setting</th><th>Notes</th></tr>
+  <tr><td>Browser</td><td>Standalone Cluster Master</td><td>8080</td><td>Web UI</td>
+    <td><code>master.ui.port</code></td><td>Jetty-based</td></tr>
+  <tr><td>Browser</td><td>Driver</td><td>4040</td><td>Web UI</td>
+    <td><code>spark.ui.port</code></td><td>Jetty-based</td></tr>
+  <tr><td>Browser</td><td>History Server</td><td>18080</td><td>Web UI</td>
+    <td><code>spark.history.ui.port</code></td><td>Jetty-based</td></tr>
+  <tr><td>Browser</td><td>Worker</td><td>8081</td><td>Web UI</td>
+    <td><code>worker.ui.port</code></td><td>Jetty-based</td></tr>
+  <tr><td>Application</td><td>Standalone Cluster Master</td><td>7077</td><td>Submit job to cluster</td>
+    <td><code>spark.driver.port</code></td><td>Akka-based. Set to "0" to choose a port randomly</td></tr>
+  <tr><td>Worker</td><td>Standalone Cluster Master</td><td>7077</td><td>Join cluster</td>
+    <td><code>spark.driver.port</code></td><td>Akka-based. Set to "0" to choose a port randomly</td></tr>
+  <tr><td>Application</td><td>Worker</td><td>(random)</td><td>Join cluster</td>
+    <td><code>SPARK_WORKER_PORT</code> (standalone cluster)</td><td>Akka-based</td></tr>
+  <tr><td>Driver and other Workers</td><td>Worker</td><td>(random)</td>
+    <td>
+      <ul>
+        <li>File server for file and jars</li>
+        <li>Http Broadcast</li>
+        <li>Class file server (Spark Shell only)</li>
+      </ul>
+    </td>
+    <td>None</td>
+    <td>Jetty-based. Each of these services starts on a random port that cannot be configured</td>
+  </tr>
+</table>
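When writing firewall rules against the table above, it is worth verifying reachability from the machine that will submit jobs. A small self-contained probe using only the Scala/Java standard library; the host and port list are placeholders:

```scala
import java.net.{InetSocketAddress, Socket}

// True if a TCP connection to host:port succeeds within timeoutMs.
def reachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

// Placeholder host; 7077 is cluster submission, 8080 the master web UI.
Seq(7077, 8080).foreach { port =>
  println(s"master-host:$port reachable = " + reachable("master-host", port))
}
```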
+
 # High Availability
 
 By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below.

From 0b769b73fb7ae314325857138a2d3138ed157908 Mon Sep 17 00:00:00 2001
From: David Lemieux
Date: Wed, 28 May 2014 15:50:35 -0700
Subject: [PATCH 15/21] Spark 1916

The changes could be ported back to 0.9 as well.
Changing in.read to in.readFully to read the whole input stream rather
than the first 1020 bytes.
This should be OK considering that Flume caps the body size to 32K by default.

Author: David Lemieux

Closes #865 from lemieud/SPARK-1916 and squashes the following commits:

a265673 [David Lemieux] Updated SparkFlumeEvent to read the whole stream rather than the first X bytes.
---
 .../org/apache/spark/streaming/flume/FlumeInputDStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index df7605fe579f8..5be33f1d5c428 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -63,7 +63,7 @@ class SparkFlumeEvent() extends Externalizable {
   def readExternal(in: ObjectInput) {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
-    in.read(bodyBuff)
+    in.readFully(bodyBuff)
 
     val numHeaders = in.readInt()
     val headers = new java.util.HashMap[CharSequence, CharSequence]

From 386fd83b2cacfbf136155261bcea7a6d861bdd8b Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 28 May 2014 15:57:05 -0700
Subject: [PATCH 16/21] [SPARK-1712]: TaskDescription instance is too big
 causes Spark to hang

Author: witgo

Closes #694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

(cherry picked from commit 4dbb27b0cf4eb67c92aad2c1158616312f5a54e6)
Signed-off-by: Matei Zaharia
---
 .../CoarseGrainedExecutorBackend.scala        |  9 ++--
 .../cluster/CoarseGrainedClusterMessage.scala |  2 +-
 .../CoarseGrainedSchedulerBackend.scala       | 27 ++++++++++--
 .../CoarseGrainedSchedulerBackendSuite.scala  | 43 +++++++++++++++++++
 4 files changed, 73 insertions(+), 8 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 84aec65b7765d..2279d77c91c89 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index ddbc74e82ac49..ca74069ef885c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a6d6b3d26a3c6..e47a060683a2d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..efef9d26dadca
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}

From 7179180b7e4a73021d6d715a90877bef0637da49 Mon Sep 17 00:00:00 2001
From: Jyotiska NK
Date: Wed, 28 May 2014 23:08:39 -0700
Subject: [PATCH 17/21] Added doctest and method description in context.py

Added doctest for method textFile and description for methods
_initialize_context and _ensure_initialized in context.py

Author: Jyotiska NK

Closes #187 from jyotiska/pyspark_context and squashes the following commits:

356f945 [Jyotiska NK] Added doctest for textFile method in context.py
5b23686 [Jyotiska NK] Updated context.py with method descriptions

(cherry picked from commit 9cff1dd25abc5e848720d853172ed42e35376fd0)
Signed-off-by: Matei Zaharia
---
 python/pyspark/context.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 27b440d73bdc3..56746cb7aab3d 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -173,12 +173,18 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
-    # Initialize SparkContext in function to allow subclass specific initialization
     def _initialize_context(self, jconf):
+        """
+        Initialize SparkContext in function to allow subclass specific initialization
+        """
         return self._jvm.JavaSparkContext(jconf)
 
     @classmethod
     def _ensure_initialized(cls, instance=None, gateway=None):
+        """
+        Checks whether a SparkContext is initialized or not.
+        Throws error if a SparkContext is already running.
+        """
         with SparkContext._lock:
             if not SparkContext._gateway:
                 SparkContext._gateway = gateway or launch_gateway()
@@ -270,6 +276,13 @@ def textFile(self, name, minPartitions=None):
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
+
+        >>> path = os.path.join(tempdir, "sample-text.txt")
+        >>> with open(path, "w") as testFile:
+        ...    testFile.write("Hello world!")
+        >>> textFile = sc.textFile(path)
+        >>> textFile.collect()
+        [u'Hello world!']
         """
         minPartitions = minPartitions or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minPartitions), self,

From 8bb93909f41562df4ea74dc64583c2b9cfd1a2b4 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 29 May 2014 09:07:39 -0700
Subject: [PATCH 18/21] SPARK-1935: Explicitly add commons-codec 1.5 as a
 dependency.
Author: Yin Huai

Closes #889 from yhuai/SPARK-1935 and squashes the following commits:

7d50ef1 [Yin Huai] Explicitly add commons-codec 1.5 as a dependency.

(cherry picked from commit 60b89fe6b09ff896a30d74204876da883e307de7)
Signed-off-by: Patrick Wendell
---
 pom.xml                  | 5 +++++
 project/SparkBuild.scala | 1 +
 2 files changed, 6 insertions(+)

diff --git a/pom.xml b/pom.xml
index f04d7e436f7c6..2d42eb6ea4fa0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -239,6 +239,11 @@
         <artifactId>commons-lang3</artifactId>
         <version>3.3.2</version>
       </dependency>
+      <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.5</version>
+      </dependency>
       <dependency>
         <groupId>com.google.code.findbugs</groupId>
         <artifactId>jsr305</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ae8a6afa7646f..0c67c492d9471 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -348,6 +348,7 @@ object SparkBuild extends Build {
        "org.apache.mesos" % "mesos" % "0.18.1" classifier("shaded-protobuf") exclude("com.google.protobuf", "protobuf-java"),
        "commons-net" % "commons-net" % "2.2",
        "net.java.dev.jets3t" % "jets3t" % jets3tVersion excludeAll(excludeCommonsLogging),
+       "commons-codec" % "commons-codec" % "1.5", // Prevent jets3t from including the older version of commons-codec
        "org.apache.derby" % "derby" % "10.4.2.0" % "test",
        "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm),
        "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeJBossNetty),

From 0f56aadc8d1c34463cee2e234b6250145d866cd7 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 29 May 2014 15:24:03 -0700
Subject: [PATCH 19/21] [SPARK-1368][SQL] Optimized HiveTableScan

JIRA issue: [SPARK-1368](https://issues.apache.org/jira/browse/SPARK-1368)

This PR introduces two major updates:

- Replaced FP style code with `while` loop and reusable `GenericMutableRow` object in critical path of `HiveTableScan`.
- Using `ColumnProjectionUtils` to help optimize RCFile and ORC column pruning.

My quick micro benchmark suggests these two optimizations made the optimized version 2x and 2.5x faster when scanning CSV table and RCFile table respectively:

```
Original:

[info] CSV: 27676 ms, RCFile: 26415 ms
[info] CSV: 27703 ms, RCFile: 26029 ms
[info] CSV: 27511 ms, RCFile: 25962 ms

Optimized:

[info] CSV: 13820 ms, RCFile: 10402 ms
[info] CSV: 14158 ms, RCFile: 10691 ms
[info] CSV: 13606 ms, RCFile: 10346 ms
```

The micro benchmark loads a 609MB CSV file (structurally similar to the `src` test table) into a normal Hive table with `LazySimpleSerDe` and an RCFile table, then scans these tables respectively.
Preparation code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveTableScanPrepare extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("drop table scan_csv")
  hql("drop table scan_rcfile")

  hql("""create table scan_csv (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |  with serdeproperties ('field.delim'=',')
      """.stripMargin)

  hql(s"""load data local inpath "${args(0)}" into table scan_csv""")

  hql("""create table scan_rcfile (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
        |stored as
        |  inputformat 'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
        |  outputformat 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
      """.stripMargin)

  hql(
    """
      |from scan_csv
      |insert overwrite table scan_rcfile
      |select scan_csv.key, scan_csv.value
    """.stripMargin)
}
```

Benchmark code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveTableScanBenchmark extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  val scanCsv = hql("select key from scan_csv")
  val scanRcfile = hql("select key from scan_rcfile")

  val csvDuration = benchmark(scanCsv.count())
  val rcfileDuration = benchmark(scanRcfile.count())

  println(s"CSV: $csvDuration ms, RCFile: $rcfileDuration ms")

  def benchmark(f: => Unit) = {
    val begin = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    end - begin
  }
}
```

@marmbrus Please help review, thanks!

Author: Cheng Lian

Closes #758 from liancheng/fastHiveTableScan and squashes the following commits:

4241a19 [Cheng Lian] Distinguishes sorted and possibly not sorted operations more accurately in HiveComparisonTest
cf640d8 [Cheng Lian] More HiveTableScan optimisations:
bf0e7dc [Cheng Lian] Added SortedOperation pattern to match *some* definitely sorted operations and avoid some sorting cost in HiveComparisonTest.
6d1c642 [Cheng Lian] Using ColumnProjectionUtils to optimise RCFile and ORC column pruning
eb62fd3 [Cheng Lian] [SPARK-1368] Optimized HiveTableScan

(cherry picked from commit 8f7141fbc015addb314e1d5801085587b5cbb171)
Signed-off-by: Michael Armbrust
---
 .../spark/sql/execution/Aggregate.scala       |  2 +-
 .../apache/spark/sql/hive/hiveOperators.scala | 97 ++++++++++++++++---
 .../hive/execution/HiveComparisonTest.scala   | 25 ++---
 3 files changed, 96 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 36b3b956da96c..604914e547790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -116,7 +116,7 @@ case class Aggregate(
    */
   @transient
   private[this] lazy val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap
+    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
 
   /**
    * Substituted version of aggregateExpressions expressions which are used to compute final
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 96faebc5a8687..f141139ef46a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
 import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
-import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 
@@ -37,6 +40,7 @@
 import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
 import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.util.MutablePair
 
 /* Implicits */
 import scala.collection.JavaConversions._
@@ -94,7 +98,7 @@ case class HiveTableScan(
       (_: Any, partitionKeys: Array[String]) => {
         val value = partitionKeys(ordinal)
         val dataType = relation.partitionKeys(ordinal).dataType
-        castFromString(value, dataType)
+        unwrapHiveData(castFromString(value, dataType))
       }
     } else {
       val ref = objectInspector.getAllStructFieldRefs
@@ -102,16 +106,55 @@
         .getOrElse(sys.error(s"Can't find attribute $a"))
       (row: Any, _: Array[String]) => {
         val data = objectInspector.getStructFieldData(row, ref)
-        unwrapData(data, ref.getFieldObjectInspector)
+        unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
       }
     }
   }
 }
 
+  private def unwrapHiveData(value: Any) = value match {
+    case maybeNull: String if maybeNull.toLowerCase == "null" => null
+    case varchar: HiveVarchar => varchar.getValue
+    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
+    case other => other
+  }
+
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
+  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+    // Specifies IDs and internal names of columns to be scanned.
+    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
+    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
+
+    if (attributes.size == relation.output.size) {
+      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
+    } else {
+      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
+    }
+
+    ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        relation.tableDesc.getDeserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+  }
+
+  addColumnMetadataToConf(sc.hiveconf)
+
   @transient
   def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
@@ -143,20 +186,42 @@
   }
 
   def execute() = {
-    inputRdd.map { row =>
-      val values = row match {
-        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
-          attributeFunctions.map(_(deserializedRow, partitionKeys))
-        case deserializedRow: AnyRef =>
-          attributeFunctions.map(_(deserializedRow, Array.empty))
+    inputRdd.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val mutableRow = new GenericMutableRow(attributes.length)
+        val mutablePair = new MutablePair[Any, Array[String]]()
+        val buffered = iterator.buffered
+
+        // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
+        // matching are avoided intentionally.
+        val rowsAndPartitionKeys = buffered.head match {
+          // With partition keys
+          case _: Array[Any] =>
+            buffered.map { case array: Array[Any] =>
+              val deserializedRow = array(0)
+              val partitionKeys = array(1).asInstanceOf[Array[String]]
+              mutablePair.update(deserializedRow, partitionKeys)
+            }
+
+          // Without partition keys
+          case _ =>
+            val emptyPartitionKeys = Array.empty[String]
+            buffered.map { deserializedRow =>
+              mutablePair.update(deserializedRow, emptyPartitionKeys)
+            }
+        }
+
+        rowsAndPartitionKeys.map { pair =>
+          var i = 0
+          while (i < attributes.length) {
+            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
+            i += 1
+          }
+          mutableRow: Row
+        }
+      }
       }
-      buildRow(values.map {
-        case n: String if n.toLowerCase == "null" => null
-        case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
-        case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
-          BigDecimal(decimal.bigDecimalValue)
-        case other => other
-      })
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index edff38b901073..1b5a132f9665d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -19,11 +19,12 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io._
 
+import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
+
 import org.apache.spark.sql.Logging
-import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.Sort
-import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -128,17 +129,19 @@ abstract class HiveComparisonTest
   protected def prepareAnswer(
     hiveQuery: TestHive.type#HiveQLQueryExecution,
     answer: Seq[String]): Seq[String] = {
+
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
+      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
+      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+    }
+
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
-      case _ =>
-        // TODO: Really we only care about the final total ordering here...
-        val isOrdered = hiveQuery.executedPlan.collect {
-          case s @ Sort(_, global, _) if global => s
-        }.nonEmpty
-        // If the query results aren't sorted, then sort them to ensure deterministic answers.
-        if (!isOrdered) answer.sorted else answer
+      case plan if isSorted(plan) => answer
+      case _ => answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
@@ -161,7 +164,7 @@
     "minFileSize"
   )
   protected def nonDeterministicLine(line: String) =
-    nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_)
+    nonDeterministicLineIndicators.exists(line contains _)
 
   /**
    * Removes non-deterministic paths from `str` so cached answers will compare correctly.
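The core of the `HiveTableScan` change above is an allocation-avoidance pattern: one mutable row per partition, filled in a `while` loop, instead of a fresh object per record. The sketch below isolates that pattern outside Spark; `MutableRow` here is a stand-in defined for the example, not Spark's `GenericMutableRow`:

```scala
// Reuse one mutable buffer per partition rather than allocating a row
// object for every record; the caller must consume each row before
// advancing the iterator, just as in HiveTableScan.execute() above.
final class MutableRow(n: Int) {
  private val values = new Array[Any](n)
  def update(i: Int, v: Any): Unit = values(i) = v
  def apply(i: Int): Any = values(i)
}

def project(rows: Iterator[Array[Any]], fieldCount: Int): Iterator[MutableRow] = {
  if (rows.isEmpty) {
    Iterator.empty
  } else {
    val reused = new MutableRow(fieldCount)
    rows.map { raw =>
      var i = 0
      while (i < fieldCount) { // while loop: no per-record closure or tuple allocation
        reused(i) = raw(i)
        i += 1
      }
      reused
    }
  }
}
```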
From 80721fb451abff8fafbffb4a6a9c97183502f1e2 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 30 May 2014 00:34:33 -0700
Subject: [PATCH 20/21] [SPARK-1566] consolidate programming guide, and
 general doc updates

This is a fairly large PR to clean up and update the docs for 1.0. The major changes are:

* A unified programming guide for all languages replaces language-specific ones and shows language-specific info in tabs
* New programming guide sections on key-value pairs, unit testing, input formats beyond text, migrating from 0.9, and passing functions to Spark
* Spark-submit guide moved to a separate page and expanded slightly
* Various cleanups of the menu system, security docs, and others
* Updated look of title bar to differentiate the docs from previous Spark versions

You can find the updated docs at http://people.apache.org/~matei/1.0-docs/_site/ and in particular http://people.apache.org/~matei/1.0-docs/_site/programming-guide.html.

Author: Matei Zaharia

Closes #896 from mateiz/1.0-docs and squashes the following commits:

03e6853 [Matei Zaharia] Some tweaks to configuration and YARN docs
0779508 [Matei Zaharia] tweak
ef671d4 [Matei Zaharia] Keep frames in JavaDoc links, and other small tweaks
1bf4112 [Matei Zaharia] Review comments
4414f88 [Matei Zaharia] tweaks
d04e979 [Matei Zaharia] Fix some old links to Java guide
a34ed33 [Matei Zaharia] tweak
541bb3b [Matei Zaharia] miscellaneous changes
fcefdec [Matei Zaharia] Moved submitting apps to separate doc
61d72b4 [Matei Zaharia] stuff
181f217 [Matei Zaharia] migration guide, remove old language guides
e11a0da [Matei Zaharia] Add more API functions
6a030a9 [Matei Zaharia] tweaks
8db0ae3 [Matei Zaharia] Added key-value pairs section
318d2c9 [Matei Zaharia] tweaks
1c81477 [Matei Zaharia] New section on basics and function syntax
e38f559 [Matei Zaharia] Actually added programming guide to Git
a33d6fe [Matei Zaharia] First pass at updating programming guide to support all languages, plus other tweaks throughout
3b6a876 [Matei Zaharia] More CSS tweaks
01ec8bf [Matei Zaharia] More CSS tweaks
e6d252e [Matei Zaharia] Change color of doc title bar to differentiate from 0.9.0

(cherry picked from commit c8bf4131bc2a2e147e977159fc90e94b85738830)
Signed-off-by: Patrick Wendell
---
 docs/_layouts/global.html                |   18 +-
 docs/bagel-programming-guide.md          |    2 +-
 docs/building-with-maven.md              |   90 +-
 docs/cluster-overview.md                 |  108 +-
 docs/configuration.md                    |   11 +-
 docs/css/bootstrap.min.css               |    2 +-
 docs/graphx-programming-guide.md         |    8 +-
 docs/hadoop-third-party-distributions.md |    2 +-
 docs/index.md                            |   79 +-
 docs/java-programming-guide.md           |  215 +---
 docs/js/api-docs.js                      |   23 +-
 docs/js/main.js                          |   21 +
 docs/mllib-guide.md                      |   10 +-
 docs/mllib-optimization.md               |    2 +-
 docs/monitoring.md                       |    2 +-
 docs/programming-guide.md                | 1294 ++++++++++++++++++++++
 docs/python-programming-guide.md         |  168 +--
 docs/quick-start.md                      |   39 +-
 docs/running-on-mesos.md                 |    7 +-
 docs/running-on-yarn.md                  |   91 +-
 docs/scala-programming-guide.md          |  445 +------
 docs/security.md                         |   18 +-
 docs/spark-standalone.md                 |    4 +-
 docs/sql-programming-guide.md            |   29 +-
 docs/streaming-programming-guide.md      |   42 +-
 docs/submitting-applications.md          |  153 +++
 docs/tuning.md                           |    6 +-
 27 files changed, 1767 insertions(+), 1122 deletions(-)
 create mode 100644 docs/programming-guide.md
 create mode 100644 docs/submitting-applications.md

diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index fb808129bb65d..4ba20e590f2c2 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -9,6 +9,11 @@
         <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
+        {% if page.redirect %}
+          <meta http-equiv="refresh" content="0; url={{ page.redirect }}">
+          <link rel="canonical" href="{{ page.redirect }}" />
+        {% endif %}
+