Merged
Changes from all commits (28 commits)
dd3b545
[SPARK-13309][SQL] Fix type inference issue with CSV data
Feb 29, 2016
d81a713
[SPARK-13545][MLLIB][PYSPARK] Make MLlib LogisticRegressionWithLBFGS'…
yanboliang Feb 29, 2016
99fe899
[SPARK-12994][CORE] It is not necessary to create ExecutorAllocationM…
zjffdu Feb 29, 2016
236e3c8
[SPARK-12633][PYSPARK] [DOC] PySpark regression parameter desc to con…
vijaykiran Feb 29, 2016
2f91f5a
[SPARK-13481] Desc order of appID by default for history server page.
Feb 29, 2016
ac5c635
[SPARK-13506][MLLIB] Fix the wrong parameter in R code comment in Ass…
zhengruifeng Feb 29, 2016
916fc34
[SPARK-13540][SQL] Supports using nested classes within Scala objects…
liancheng Feb 29, 2016
02aa499
[SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single…
HyukjinKwon Feb 29, 2016
bc65f60
[SPARK-13544][SQL] Rewrite/Propagate Constraints for Aliases in Aggre…
gatorsmile Feb 29, 2016
17a253c
[SPARK-13522][CORE] Executor should kill itself when it's unable to h…
zsxwing Feb 29, 2016
644dbb6
[SPARK-13522][CORE] Fix the exit log place for heartbeat
zsxwing Feb 29, 2016
4bd697d
[SPARK-13123][SQL] Implement whole state codegen for sort
sameeragarwal Feb 29, 2016
c7fccb5
[SPARK-13478][YARN] Use real user when fetching delegation tokens.
Feb 29, 2016
0a4b620
[SPARK-13551][MLLIB] Fix wrong comment and remove meanless lines in m…
zhengruifeng Mar 1, 2016
3c5f5e3
[SPARK-13550][ML] Add java example for ml.clustering.BisectingKMeans
zhengruifeng Mar 1, 2016
12a2a57
[SPARK-13592][WINDOWS] fix path of spark-submit2.cmd in spark-submit.cmd
tsudukim Mar 1, 2016
c43899a
[SPARK-13511] [SQL] Add wholestage codegen for limit
viirya Mar 1, 2016
5ed48dd
[SPARK-12811][ML] Estimator for Generalized Linear Models(GLMs)
yanboliang Mar 1, 2016
c37bbb3
Closes #11320
mengxr Mar 1, 2016
c27ba0d
[SPARK-13582] [SQL] defer dictionary decoding in parquet reader
Mar 1, 2016
b0ee7d4
[SPARK-13548][BUILD] Move tags and unsafe modules into common
rxin Mar 1, 2016
a640c5b
[SPARK-13598] [SQL] remove LeftSemiJoinBNL
Mar 2, 2016
e42724b
[SPARK-13167][SQL] Include rows with null values for partition column…
sureshthalamati Mar 2, 2016
9495c40
[SPARK-13008][ML][PYTHON] Put one alg per line in pyspark.ml all lists
jkbradley Mar 2, 2016
b4d096d
[BUILD][MINOR] Fix SBT build error with network-yarn module
jerryshao Mar 2, 2016
366f26d
[MINOR][STREAMING] Replace deprecated `apply` with `create` in example.
dongjoon-hyun Mar 2, 2016
75e618d
Fix run-tests.py typos
wjur Mar 2, 2016
d8afd45
[SPARK-13515] Make FormatNumber work irrespective of locale.
Mar 2, 2016
2 changes: 1 addition & 1 deletion bin/spark-submit.cmd
@@ -20,4 +20,4 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C spark-submit2.cmd %*
cmd /V /E /C "%~dp0spark-submit2.cmd" %*
File renamed without changes.
2 changes: 1 addition & 1 deletion tags/pom.xml → common/tags/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion unsafe/pom.xml → common/unsafe/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
core/src/main/resources/org/apache/spark/ui/static/historypage.js
@@ -149,7 +149,8 @@ $(document).ready(function() {
{name: 'seventh'},
{name: 'eighth'},
],
"autoWidth": false
"autoWidth": false,
"order": [[ 0, "desc" ]]
};

var rowGroupConf = {
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

def isLocal: Boolean = (master == "local" || master.startsWith("local["))
def isLocal: Boolean = Utils.isLocalMaster(_conf)

/**
* @return true if context is stopped or in the midst of stopping.
@@ -526,10 +526,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -255,6 +255,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}

if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
}
}

private def validateKillArguments(): Unit = {
@@ -517,6 +521,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| --proxy-user NAME User to impersonate when submitting the application.
| This argument does not work with --principal / --keytab.
|
| --help, -h Show this help message and exit
| --verbose, -v Print additional debug output
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -114,6 +114,19 @@ private[spark] class Executor(
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

/**
* When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
* times, it should kill itself. The default value is 60. This means we will keep retrying to send
* heartbeats for about 10 minutes, because the heartbeat interval is 10s.
*/
private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)

/**
* Count the number of consecutive heartbeat failures. It should only be accessed in the heartbeat
* thread. Each successful heartbeat will reset it to 0.
*/
private var heartbeatFailures = 0

startDriverHeartbeater()

def launchTask(
@@ -461,8 +474,16 @@
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
heartbeatFailures = 0
} catch {
case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
case NonFatal(e) =>
logWarning("Issue communicating with driver in heartbeater", e)
heartbeatFailures += 1
if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
logError(s"Exit as unable to send heartbeats to driver " +
s"more than $HEARTBEAT_MAX_FAILURES times")
System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
}
}
}

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -39,6 +39,12 @@ object ExecutorExitCode {
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55

/**
* Executor is unable to send heartbeats to the driver more than
* "spark.executor.heartbeat.maxFailures" times.
*/
val HEARTBEAT_FAILURE = 56

def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@
// TODO: replace external block store with concrete implementation name
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
"ExternalBlockStore failed to create a local temporary directory."
case HEARTBEAT_FAILURE =>
"Unable to send heartbeats to driver."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
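For context on the two hunks above: the retry ceiling is read from spark.executor.heartbeat.maxFailures (default 60), and exceeding it makes the executor exit with the new HEARTBEAT_FAILURE code (56). Below is a minimal Scala sketch, not part of this patch, showing how one might tighten that ceiling; the object and app names are hypothetical, and it assumes the default 10s spark.executor.heartbeatInterval mentioned in the comment above.

import org.apache.spark.SparkConf

// Hypothetical driver-side configuration sketch; only the two configuration keys below
// come from Spark itself (spark.executor.heartbeat.maxFailures is introduced by this patch).
object HeartbeatTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("heartbeat-tuning-sketch")              // hypothetical app name
      .set("spark.executor.heartbeatInterval", "10s")     // existing setting; 10s is the default
      .set("spark.executor.heartbeat.maxFailures", "30")  // exit after ~30 * 10s = ~5 minutes of failures
    println(conf.get("spark.executor.heartbeat.maxFailures"))
  }
}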
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2195,15 +2195,30 @@ private[spark] object Utils extends Logging {
isInDirectory(parent, child.getParentFile)
}


/**
* @return whether the master set in the given conf is local mode
*/
def isLocalMaster(conf: SparkConf): Boolean = {
val master = conf.get("spark.master", "")
master == "local" || master.startsWith("local[")
}

/**
* Return whether dynamic allocation is enabled in the given conf.
* Dynamic allocation and explicitly setting the number of executors are inherently
* incompatible. In environments where dynamic allocation is turned on by default,
* the latter should override the former (SPARK-9092).
*/
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
conf.getInt("spark.executor.instances", 0) == 0
val numExecutor = conf.getInt("spark.executor.instances", 0)
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
if (numExecutor != 0 && dynamicAllocationEnabled) {
logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}
numExecutor == 0 && dynamicAllocationEnabled &&
(!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
}

def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -722,6 +722,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("isDynamicAllocationEnabled") {
val conf = new SparkConf()
conf.set("spark.master", "yarn-client")
assert(Utils.isDynamicAllocationEnabled(conf) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "false")) === false)
@@ -731,6 +732,8 @@
conf.set("spark.executor.instances", "1")) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.executor.instances", "0")) === true)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
}

test("encodeFileNameToURIRawPath") {
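An application-level reading of the Utils change and its test above, as a sketch under assumptions rather than part of the patch (the object name, app name, and toy job are made up): dynamic allocation now counts as enabled only when the flag is on, spark.executor.instances is unset or 0, and the master is non-local unless spark.dynamicAllocation.testing is set, so a typical YARN setup looks like this.

import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-sketch")         // hypothetical app name
      .setMaster("yarn-client")                        // non-local master, as the new check requires
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")    // external shuffle service, needed with dynamic allocation
    // Deliberately not setting spark.executor.instances: setting it would disable
    // dynamic allocation and log the warning now emitted by Utils.isDynamicAllocationEnabled.
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 1000).map(_ * 2).count())  // toy job
    } finally {
      sc.stop()
    }
  }
}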
2 changes: 1 addition & 1 deletion dev/run-tests.py
@@ -488,7 +488,7 @@ def main():
if which("R"):
run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")])
else:
print("Can't install SparkR as R is was not found in PATH")
print("Cannot install SparkR as R was not found in PATH")

if os.environ.get("AMPLAB_JENKINS"):
# if we're on the Amplab Jenkins build servers setup variables
2 changes: 1 addition & 1 deletion dev/sparktestsupport/modules.py
@@ -477,7 +477,7 @@ def __hash__(self):
],
sbt_test_goals=[
"yarn/test",
"common/network-yarn/test",
"network-yarn/test",
],
test_tags=[
"org.apache.spark.tags.ExtendedYarnTest"
examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
// $example on$
import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off$


/**
* An example demonstrating bisecting k-means clustering.
*/
public class JavaBisectingKMeansExample {

public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaBisectingKMeansExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext jsql = new SQLContext(jsc);

// $example on$
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
RowFactory.create(Vectors.dense(0.1, 0.1, 0.1)),
RowFactory.create(Vectors.dense(0.3, 0.3, 0.25)),
RowFactory.create(Vectors.dense(0.1, 0.1, -0.1)),
RowFactory.create(Vectors.dense(20.3, 20.1, 19.9)),
RowFactory.create(Vectors.dense(20.2, 20.1, 19.7)),
RowFactory.create(Vectors.dense(18.9, 20.0, 19.7))
));

StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

DataFrame dataset = jsql.createDataFrame(data, schema);

BisectingKMeans bkm = new BisectingKMeans().setK(2);
BisectingKMeansModel model = bkm.fit(dataset);

System.out.println("Compute Cost: " + model.computeCost(dataset));

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
// $example off$

jsc.stop();
}
}
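For comparison, a rough Scala counterpart to the new Java example above; this is a sketch only, not part of this patch. The object name and the Tuple1-based DataFrame construction are my own choices, while the toy vectors and k=2 mirror the Java code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

object BisectingKMeansScalaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BisectingKMeansScalaSketch"))
    val sqlContext = new SQLContext(sc)

    // Same toy vectors as the Java example, wrapped in Tuple1 so the single
    // column can be named "features".
    val data = sc.parallelize(Seq(
      Vectors.dense(0.1, 0.1, 0.1), Vectors.dense(0.3, 0.3, 0.25),
      Vectors.dense(0.1, 0.1, -0.1), Vectors.dense(20.3, 20.1, 19.9),
      Vectors.dense(20.2, 20.1, 19.7), Vectors.dense(18.9, 20.0, 19.7)
    )).map(Tuple1.apply)
    val dataset = sqlContext.createDataFrame(data).toDF("features")

    val model = new BisectingKMeans().setK(2).fit(dataset)
    println(s"Compute Cost: ${model.computeCost(dataset)}")
    model.clusterCenters.zipWithIndex.foreach { case (center, i) =>
      println(s"Cluster Center $i: $center")
    }

    sc.stop()
  }
}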
examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
@@ -33,7 +33,7 @@
// $example off$

/**
* Java example for graph clustering using power iteration clustering (PIC).
* Java example for bisecting k-means clustering.
*/
public class JavaBisectingKMeansExample {
public static void main(String[] args) {
@@ -54,9 +54,7 @@ public static void main(String[] args) {
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));
for (Vector center: model.clusterCenters()) {
System.out.println("");
}

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -62,7 +62,7 @@ object TwitterAlgebirdHLL {
var userSet: Set[Long] = Set()

val approxUsers = users.mapPartitions(ids => {
ids.map(id => hll(id))
ids.map(id => hll.create(id))
}).reduce(_ + _)

val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -156,6 +156,12 @@ private[ml] class WeightedLeastSquares(

private[ml] object WeightedLeastSquares {

/**
* In order to take the normal equation approach efficiently, [[WeightedLeastSquares]]
* only supports a number of features no greater than 4096.
*/
val MAX_NUM_FEATURES: Int = 4096

/**
* Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]].
*/
@@ -174,8 +180,8 @@ private[ml] object WeightedLeastSquares {
private var aaSum: DenseVector = _

private def init(k: Int): Unit = {
require(k <= 4096, "In order to take the normal equation approach efficiently, " +
s"we set the max number of features to 4096 but got $k.")
require(k <= MAX_NUM_FEATURES, "In order to take the normal equation approach efficiently, " +
s"we set the max number of features to $MAX_NUM_FEATURES but got $k.")
this.k = k
triK = k * (k + 1) / 2
count = 0L
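A back-of-the-envelope sketch (illustrative only, not from the patch) of why the cap is 4096 features: the aggregator above stores the packed upper triangle of the k-by-k normal-equation matrix, triK = k * (k + 1) / 2 entries, so memory grows quadratically with the number of features.

// Standalone arithmetic sketch; the 4096 constant and the triK formula mirror the diff above.
object WlsFeatureLimitSketch {
  def main(args: Array[String]): Unit = {
    val maxNumFeatures = 4096
    val triK = maxNumFeatures.toLong * (maxNumFeatures + 1) / 2          // packed upper-triangle size
    val approxMiB = triK * 8 / 1024.0 / 1024.0                           // 8 bytes per double
    println(s"packed Gram entries at k=4096: $triK")                     // 8390656
    println(f"approximate memory for one aaSum buffer: $approxMiB%.1f MiB")  // roughly 64 MiB
  }
}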