From 0cb06c993f487d00fdb528af9ac5720ab80bfa8f Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Tue, 28 Jun 2016 13:36:41 -0700 Subject: [PATCH 01/12] [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID ## What changes were proposed in this pull request? Previously, the TaskLocation implementation would not allow for executor ids which include underscores. This tweaks the string split used to get the hostname and executor id, allowing for underscores in the executor id. This addresses the JIRA found here: https://issues.apache.org/jira/browse/SPARK-16148 This is moved over from a previous PR against branch-1.6: https://github.com/apache/spark/pull/13857 ## How was this patch tested? Ran existing unit tests for core and streaming. Manually ran a simple streaming job with an executor whose id contained underscores and confirmed that the job ran successfully. This is my original work and I license the work to the project under the project's open source license. Author: Tom Magrino Closes #13858 from tmagrino/fixtasklocation. (cherry picked from commit ae14f362355b131fcb3e3633da7bb14bdd2b6893) Signed-off-by: Shixiong Zhu --- .../org/apache/spark/scheduler/TaskLocation.scala | 14 +++++++------- .../spark/scheduler/TaskSetManagerSuite.scala | 2 ++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 1eb6c1614fc0..06b52935c696 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,18 +64,18 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the - * location is cached. + * These strings have the form executor_[hostname]_[executorid], [hostname], or + * hdfs_cache_[hostname], depending on whether the location is cached. 
*/ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str.startsWith(executorLocationTag)) { - val splits = str.split("_") - if (splits.length != 3) { - throw new IllegalArgumentException("Illegal executor location format: " + str) - } - new ExecutorCacheTaskLocation(splits(1), splits(2)) + val hostAndExecutorId = str.stripPrefix(executorLocationTag) + val splits = hostAndExecutorId.split("_", 2) + require(splits.length == 2, "Illegal executor location format: " + str) + val Array(host, executorId) = splits + new ExecutorCacheTaskLocation(host, executorId) } else { new HostTaskLocation(str) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ecc18fc6e15b..96a49202efcf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -784,6 +784,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(TaskLocation("host1") === HostTaskLocation("host1")) assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) + assert(TaskLocation("executor_some.host1_executor_task_3") === + ExecutorCacheTaskLocation("some.host1", "executor_task_3")) } def createTaskResult(id: Int): DirectTaskResult[Int] = { From 1ac830aca089e9f0b9b0bf367236ffc1184eae7e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 29 Jun 2016 13:11:56 -0700 Subject: [PATCH 02/12] [SPARK-16044][SQL] Backport input_file_name() for data source based on NewHadoopRDD to branch 1.6 ## What changes were proposed in this pull request? This PR backports https://github.com/apache/spark/pull/13759. (`SqlNewHadoopRDDState` was renamed to `InputFileNameHolder` and `spark` API does not exist in branch 1.6) ## How was this patch tested? Unit tests in `ColumnExpressionSuite`. Author: hyukjinkwon Closes #13806 from HyukjinKwon/backport-SPARK-16044. --- .../org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++ .../spark/sql/ColumnExpressionSuite.scala | 39 +++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index c8b4f300d316..46fe1ba93441 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -134,6 +134,12 @@ class NewHadoopRDD[K, V]( val inputMetrics = context.taskMetrics .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Sets the thread local variable for the file's name + split.serializableHadoopSplit.value match { + case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString) + case _ => SqlNewHadoopRDDState.unsetInputFileName() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { @@ -190,6 +196,7 @@ class NewHadoopRDD[K, V]( private def close() { if (reader != null) { + SqlNewHadoopRDDState.unsetInputFileName() // Close the reader and release it. 
Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 38c0eb589f96..52b3d60fef7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.scalatest.Matchers._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution.Project import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -591,15 +593,44 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("InputFileName") { + test("InputFileName - SqlNewHadoopRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName()) + val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name()) .head.getString(0) assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(inputFileName()).limit(1), Row("")) + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + + test("input_file_name - HadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val df = sparkContext.textFile(dir.getCanonicalPath).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + + test("input_file_name - NewHadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val rdd = sparkContext.newAPIHadoopFile( + dir.getCanonicalPath, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]) + val df = rdd.map(pair => pair._2.toString).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) } } From ccc7fa357099e0f621cfc02448ba20d3f6fabc14 Mon Sep 17 00:00:00 2001 From: Brian Uri Date: Thu, 30 Jun 2016 07:52:28 +0100 Subject: [PATCH 03/12] [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3. ## What changes were proposed in this pull request? - Adds 1.6.2 and 1.6.3 as supported Spark versions within the bundled spark-ec2 script. - Makes the default Spark version 1.6.3 to keep in sync with the upcoming release. - Does not touch the newer spark-ec2 scripts in the separate amplabs repository. ## How was this patch tested? 
- Manual script execution: export AWS_SECRET_ACCESS_KEY=_snip_ export AWS_ACCESS_KEY_ID=_snip_ $SPARK_HOME/ec2/spark-ec2 \ --key-pair=_snip_ \ --identity-file=_snip_ \ --region=us-east-1 \ --vpc-id=_snip_ \ --slaves=1 \ --instance-type=t1.micro \ --spark-version=1.6.2 \ --hadoop-major-version=yarn \ launch test-cluster - Result: Successful creation of a 1.6.2-based Spark cluster. This contribution is my original work and I license the work to the project under the project's open source license. Author: Brian Uri Closes #13947 from briuri/branch-1.6-bug-spark-16257. --- ec2/spark_ec2.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 76c09f04e3a2..b28b4c5e93ed 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -51,7 +51,7 @@ raw_input = input xrange = range -SPARK_EC2_VERSION = "1.6.1" +SPARK_EC2_VERSION = "1.6.3" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) VALID_SPARK_VERSIONS = set([ @@ -77,6 +77,8 @@ "1.5.2", "1.6.0", "1.6.1", + "1.6.2", + "1.6.3", ]) SPARK_TACHYON_MAP = { @@ -96,6 +98,8 @@ "1.5.2": "0.7.1", "1.6.0": "0.8.2", "1.6.1": "0.8.2", + "1.6.2": "0.8.2", + "1.6.3": "0.8.2", } DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION @@ -103,7 +107,7 @@ # Default location to get the spark-ec2 scripts (and ami-list) from DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2" -DEFAULT_SPARK_EC2_BRANCH = "branch-1.5" +DEFAULT_SPARK_EC2_BRANCH = "branch-1.6" def setup_external_libs(libs): From 83f86044879b3c6bbfb0f3075cba552070b064cf Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 1 Jul 2016 09:22:27 +0100 Subject: [PATCH 04/12] [SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails ## What changes were proposed in this pull request? Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation. ## How was this patch tested? Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here. Author: Sean Owen Closes #13973 from srowen/SPARK-16182. (cherry picked from commit 2075bf8ef6035fd7606bcf20dc2cd7d7b9cda446) Signed-off-by: Sean Owen --- .../scala/org/apache/spark/util/Utils.scala | 76 +++++++++++-------- .../org/apache/spark/util/UtilsSuite.scala | 2 +- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 36ab3ac99f11..427b3822301c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1732,50 +1732,66 @@ private[spark] object Utils extends Logging { } /** - * Terminates a process waiting for at most the specified duration. Returns whether - * the process terminated. + * Terminates a process waiting for at most the specified duration. + * + * @return the process exit value if it was successfully terminated, else None */ def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = { - try { - // Java8 added a new API which will more forcibly kill the process. Use that if available. 
- val destroyMethod = process.getClass().getMethod("destroyForcibly"); - destroyMethod.setAccessible(true) - destroyMethod.invoke(process) - } catch { - case NonFatal(e) => - if (!e.isInstanceOf[NoSuchMethodException]) { - logWarning("Exception when attempting to kill process", e) - } - process.destroy() - } + // Politely destroy first + process.destroy() + if (waitForProcess(process, timeoutMs)) { + // Successful exit Option(process.exitValue()) } else { - None + // Java 8 added a new API which will more forcibly kill the process. Use that if available. + try { + classOf[Process].getMethod("destroyForcibly").invoke(process) + } catch { + case _: NoSuchMethodException => return None // Not available; give up + case NonFatal(e) => logWarning("Exception when attempting to kill process", e) + } + // Wait, again, although this really should return almost immediately + if (waitForProcess(process, timeoutMs)) { + Option(process.exitValue()) + } else { + logWarning("Timed out waiting to forcibly kill process") + None + } } } /** * Wait for a process to terminate for at most the specified duration. - * Return whether the process actually terminated after the given timeout. + * + * @return whether the process actually terminated before the given timeout. */ def waitForProcess(process: Process, timeoutMs: Long): Boolean = { - var terminated = false - val startTime = System.currentTimeMillis - while (!terminated) { - try { - process.exitValue() - terminated = true - } catch { - case e: IllegalThreadStateException => - // Process not terminated yet - if (System.currentTimeMillis - startTime > timeoutMs) { - return false + try { + // Use Java 8 method if available + classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit]) + .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS) + .asInstanceOf[Boolean] + } catch { + case _: NoSuchMethodError => + // Otherwise implement it manually + var terminated = false + val startTime = System.currentTimeMillis + while (!terminated) { + try { + process.exitValue() + terminated = true + } catch { + case e: IllegalThreadStateException => + // Process not terminated yet + if (System.currentTimeMillis - startTime > timeoutMs) { + return false + } + Thread.sleep(100) } - Thread.sleep(100) - } + } + true } - true } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 13f85a7fb409..4c3f18c509f8 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -831,7 +831,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(terminated.isDefined) Utils.waitForProcess(process, 5000) val duration = System.currentTimeMillis() - start - assert(duration < 5000) + assert(duration < 6000) // add a little extra time to allow a force kill to finish assert(!pidExists(pid)) } finally { signal(pid, "SIGKILL") From 1026aba16554f6c5b5a6a3fdc2b9bdb7911a9fcc Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 1 Jul 2016 09:27:34 +0100 Subject: [PATCH 05/12] [SPARK-15761][MLLIB][PYSPARK] Load ipython when default python is Python3 ## What changes were proposed in this pull request? I would like to use IPython with Python 3.5. 
It is annoying when it fails with IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON when I have a version greater than 2.7 ## How was this patch tested It now works with IPython and Python3 Author: MechCoder Closes #13503 from MechCoder/spark-15761. (cherry picked from commit 66283ee0b25de2a5daaa21d50a05a7fadec1de77) Signed-off-by: Sean Owen --- bin/pyspark | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/pyspark b/bin/pyspark index 5eaa17d3c201..42af5971fe7b 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -54,9 +54,11 @@ elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}" fi +WORKS_WITH_IPYTHON=$($DEFAULT_PYTHON -c 'import sys; print(sys.version_info >= (2, 7, 0))') + # Determine the Python executable to use for the executors: if [[ -z "$PYSPARK_PYTHON" ]]; then - if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then + if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && ! WORKS_WITH_IPYTHON ]]; then echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2 exit 1 else From c25aa8fca64a1a83e909a8f9baddb7b2a3fdaec5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 5 Jul 2016 00:40:08 +0800 Subject: [PATCH 06/12] [SPARK-16329][SQL][BACKPORT-1.6] Star Expansion over Table Containing No Column #14040 #### What changes were proposed in this pull request? Star expansion over a table containing zero column does not work since 1.6. However, it works in Spark 1.5.1. This PR is to fix the issue in the master branch. For example, ```scala val rddNoCols = sqlContext.sparkContext.parallelize(1 to 10).map(_ => Row.empty) val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty)) dfNoCols.registerTempTable("temp_table_no_cols") sqlContext.sql("select * from temp_table_no_cols").show ``` Without the fix, users will get the following the exception: ``` java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199) ``` #### How was this patch tested? Tests are added Author: gatorsmile Closes #14042 from gatorsmile/starExpansionEmpty. --- .../sql/catalyst/analysis/unresolved.scala | 15 ++++----- .../org/apache/spark/sql/SQLQuerySuite.scala | 31 +++++++++++++++++++ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 4f89b462a6ce..390bd1fb592f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -180,23 +180,20 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + // If there is no table specified, use all input attributes. + if (target.isEmpty) return input.output - // First try to expand assuming it is table.*. - val expandedAttributes: Seq[Attribute] = target match { - // If there is no table specified, use all input attributes. - case None => input.output - // If there is a table, pick out attributes that are part of this table. 
- case Some(t) => if (t.size == 1) { - input.output.filter(_.qualifiers.exists(resolver(_, t.head))) + val expandedAttributes = + if (target.get.size == 1) { + // If there is a table, pick out attributes that are part of this table. + input.output.filter(_.qualifiers.exists(resolver(_, target.get.head))) } else { List() } - } if (expandedAttributes.nonEmpty) return expandedAttributes // Try to resolve it as a struct expansion. If there is a conflict and both are possible, // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - require(target.isDefined) val attribute = input.resolve(target.get, resolver) if (attribute.isDefined) { // This target resolved to an attribute in child. It must be a struct. Expand it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 2be834359089..6fec5809ddcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1950,6 +1950,37 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("Star Expansion - table with zero column") { + withTempTable("temp_table_no_cols") { + val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty) + val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty)) + dfNoCols.registerTempTable("temp_table_no_cols") + + // ResolvedStar + checkAnswer( + dfNoCols, + dfNoCols.select(dfNoCols.col("*"))) + + // UnresolvedStar + checkAnswer( + dfNoCols, + sql("SELECT * FROM temp_table_no_cols")) + checkAnswer( + dfNoCols, + dfNoCols.select($"*")) + + var e = intercept[AnalysisException] { + sql("SELECT a.* FROM temp_table_no_cols a") + }.getMessage + assert(e.contains("cannot resolve 'a.*' give input columns ''")) + + e = intercept[AnalysisException] { + dfNoCols.select($"b.*") + }.getMessage + assert(e.contains("cannot resolve 'b.*' give input columns ''")) + } + } + test("Common subexpression elimination") { // select from a table to prevent constant folding. val df = sql("SELECT a, b from testData2 limit 1") From 4fcb88843f5591619f85867e0407a49ed2a9756c Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Mon, 4 Jul 2016 21:16:17 +0100 Subject: [PATCH 07/12] [SPARK-16353][BUILD][DOC] Missing javadoc options for java unidoc Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-16353 ## What changes were proposed in this pull request? The javadoc options for the java unidoc generation are ignored when generating the java unidoc. For example, the generated `index.html` has the wrong HTML page title. This can be seen at http://spark.apache.org/docs/latest/api/java/index.html. I changed the relevant setting scope from `doc` to `(JavaUnidoc, unidoc)`. ## How was this patch tested? I ran `docs/jekyll build` and verified that the java unidoc `index.html` has the correct HTML page title. Author: Michael Allman Closes #14031 from mallman/spark-16353. 
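For readers unfamiliar with sbt scoping, a minimal sketch of why the one-line change in this patch matters. This is a hypothetical build fragment, not Spark's actual SparkBuild.scala; it assumes the sbt-unidoc plugin's `JavaUnidoc` configuration and `unidoc` task key are in scope, and the window title is a placeholder value.

```scala
// Sketch only: assumes sbt-unidoc's UnidocKeys (JavaUnidoc, unidoc) are imported,
// as they are in project/SparkBuild.scala. Title string is a placeholder.

// Scoped to the plain `doc` task: the aggregated Javadoc built by the
// `unidoc` task in the JavaUnidoc configuration never sees these options.
javacOptions in doc := Seq("-windowtitle", "Example JavaDoc")

// Scoped to the (JavaUnidoc, unidoc) pair: the options reach the javadoc
// invocation that produces api/java, so the page title takes effect.
javacOptions in (JavaUnidoc, unidoc) := Seq("-windowtitle", "Example JavaDoc")
```

The diff below applies exactly this scope change to `project/SparkBuild.scala`.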
(cherry picked from commit 7dbffcdd6dc76b8e8d6a9cd6eeb24323a6b740c3) Signed-off-by: Sean Owen --- project/SparkBuild.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b1dcaedcba75..afbf93c21ac6 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -569,8 +569,7 @@ object Unidoc { ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value) }, - // Javadoc options: create a window title, and group key packages on index page - javacOptions in doc := Seq( + javacOptions in (JavaUnidoc, unidoc) := Seq( "-windowtitle", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " JavaDoc", "-public", "-group", "Core Java API", packageList("api.java", "api.java.function"), From 76781950fd500ace0f939951fc7a94a58aca87c4 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 6 Jul 2016 12:27:17 +0100 Subject: [PATCH 08/12] [MINOR][BUILD] Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors ## What changes were proposed in this pull request? Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors ## How was this patch tested? Jenkins Author: Sean Owen Closes #14066 from srowen/Maven339Branch16. --- build/mvn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/mvn b/build/mvn index c2b142700aa4..85983861594b 100755 --- a/build/mvn +++ b/build/mvn @@ -69,7 +69,7 @@ install_app() { # Install maven under the build/ folder install_mvn() { - local MVN_VERSION="3.3.3" + local MVN_VERSION="3.3.9" install_app \ "https://www.apache.org/dyn/closer.lua?action=download&filename=/maven/maven-3/${MVN_VERSION}/binaries" \ From 2588776ad3d91e39300d61c8a12dc0803c28e866 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 6 Jul 2016 14:49:21 +0100 Subject: [PATCH 09/12] [MINOR][CORE][1.6-BACKPORT] Fix display wrong free memory size in the log ## What changes were proposed in this pull request? Free memory size displayed in the log is wrong (used memory), fix to make it correct. Backported to 1.6. ## How was this patch tested? N/A Author: jerryshao Closes #14043 from jerryshao/memory-log-fix-1.6-backport. --- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 17aae6e1bbdd..aed0da96d34c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -393,7 +393,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } val valuesOrBytes = if (deserialized) "values" else "bytes" logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + blockId, valuesOrBytes, Utils.bytesToString(size), + Utils.bytesToString(maxMemory - blocksMemoryUsed))) } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. From 45dda92214191310a56333a2085e2343eba170cd Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 Jul 2016 11:28:04 +0100 Subject: [PATCH 10/12] [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix ## What changes were proposed in this pull request? 
The following Java code because of type erasing: ```Java JavaRDD rows = jsc.parallelize(...); RowMatrix mat = new RowMatrix(rows.rdd()); QRDecomposition result = mat.tallSkinnyQR(true); ``` We should use retag to restore the type to prevent the following exception: ```Java java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.linalg.Vector; ``` ## How was this patch tested? Java unit test Author: Xusen Yin Closes #14051 from yinxusen/SPARK-16372. (cherry picked from commit 4c6f00d09c016dfc1d2de6e694dff219c9027fa0) Signed-off-by: Sean Owen --- .../mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala | 2 +- .../distributed/JavaRowMatrixSuite.java | 44 +++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 1714983d9828..a059e38c6bab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. */ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { - new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) + new RowMatrix(rows.rdd, numRows, numCols) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 52c0f19c645d..b941d1fb193e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them - val blockQRs = rows.glom().map { partRows => + val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java new file mode 100644 index 000000000000..c01af405491b --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed; + +import java.util.Arrays; + +import org.junit.Test; + +import org.apache.spark.SharedSparkSession; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.QRDecomposition; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; + +public class JavaRowMatrixSuite extends SharedSparkSession { + + @Test + public void rowMatrixQRDecomposition() { + Vector v1 = Vectors.dense(1.0, 10.0, 100.0); + Vector v2 = Vectors.dense(2.0, 20.0, 200.0); + Vector v3 = Vectors.dense(3.0, 30.0, 300.0); + + JavaRDD rows = jsc.parallelize(Arrays.asList(v1, v2, v3), 1); + RowMatrix mat = new RowMatrix(rows.rdd()); + + QRDecomposition result = mat.tallSkinnyQR(true); + } +} From bb92788f96426e57555ba5771e256c6425e0e75e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 7 Jul 2016 10:34:50 -0700 Subject: [PATCH 11/12] Revert "[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix" This reverts commit 45dda92214191310a56333a2085e2343eba170cd. --- .../mllib/api/python/PythonMLLibAPI.scala | 2 +- .../mllib/linalg/distributed/RowMatrix.scala | 2 +- .../distributed/JavaRowMatrixSuite.java | 44 ------------------- 3 files changed, 2 insertions(+), 46 deletions(-) delete mode 100644 mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index a059e38c6bab..1714983d9828 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. */ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { - new RowMatrix(rows.rdd, numRows, numCols) + new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index b941d1fb193e..52c0f19c645d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them - val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows => + val blockQRs = rows.glom().map { partRows => val bdm = BDM.zeros[Double](partRows.length, col) var i = 0 partRows.foreach { row => diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java deleted file mode 100644 index c01af405491b..000000000000 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg.distributed; - -import java.util.Arrays; - -import org.junit.Test; - -import org.apache.spark.SharedSparkSession; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.QRDecomposition; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; - -public class JavaRowMatrixSuite extends SharedSparkSession { - - @Test - public void rowMatrixQRDecomposition() { - Vector v1 = Vectors.dense(1.0, 10.0, 100.0); - Vector v2 = Vectors.dense(2.0, 20.0, 200.0); - Vector v3 = Vectors.dense(3.0, 30.0, 300.0); - - JavaRDD rows = jsc.parallelize(Arrays.asList(v1, v2, v3), 1); - RowMatrix mat = new RowMatrix(rows.rdd()); - - QRDecomposition result = mat.tallSkinnyQR(true); - } -} From 702178d1f1f02aaa6efa2de84f23d11be5a8e681 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 5 Jul 2016 16:55:22 -0700 Subject: [PATCH 12/12] [SPARK-16385][CORE] Catch correct exception when calling method via reflection. Using "Method.invoke" causes an exception to be thrown, not an error, so Utils.waitForProcess() was always throwing an exception when run on Java 7. Author: Marcelo Vanzin Closes #14056 from vanzin/SPARK-16385. (cherry picked from commit 59f9c1bd1adfea7069e769fb68351c228c37c8fc) Signed-off-by: Sean Owen --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 427b3822301c..6c19e580c733 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1773,7 +1773,7 @@ private[spark] object Utils extends Logging { .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS) .asInstanceOf[Boolean] } catch { - case _: NoSuchMethodError => + case _: NoSuchMethodException => // Otherwise implement it manually var terminated = false val startTime = System.currentTimeMillis