
Commit d249484

fix merge conflict

2 parents: 264adb8 + 5196eff

59 files changed: +1613 -341 lines

Large commits have some content hidden by default; only a subset of the 59 changed files is shown below.


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions

@@ -1159,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
     withScope {
       assertNotStopped()
-      val kc = kcf()
-      val vc = vcf()
+      val kc = clean(kcf)()
+      val vc = clean(vcf)()
       val format = classOf[SequenceFileInputFormat[Writable, Writable]]
       val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
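
This routes the `WritableConverter` factories through the closure cleaner before invoking them, the same treatment other user-supplied closures receive. A minimal usage sketch of the affected `sequenceFile` API, assuming a live `SparkContext` named `sc` and a hypothetical SequenceFile path:

```scala
// Hypothetical path; any SequenceFile[IntWritable, Text] produced by Spark or Hadoop would do.
val pairs = sc.sequenceFile[Int, String]("/tmp/example-seqfile")

// The implicit () => WritableConverter factories backing this call are now run through
// clean() before being invoked, matching how other user-supplied closures are handled.
pairs.take(5).foreach(println)
```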

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 2 deletions

@@ -296,6 +296,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
+    val cleanedF = self.sparkContext.clean(func)
 
     if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -305,15 +306,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val map = new JHashMap[K, V]
       iter.foreach { pair =>
         val old = map.get(pair._1)
-        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { pair =>
         val old = m1.get(pair._1)
-        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
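
`reduceByKeyLocally` invokes the user function both inside map tasks and when merging per-partition maps, so it is now cleaned once up front like the closures in other RDD operations. A quick usage sketch, assuming a live `SparkContext` named `sc`:

```scala
// Unlike reduceByKey, reduceByKeyLocally returns a Map on the driver rather than an RDD.
val counts: scala.collection.Map[String, Int] =
  sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
    .reduceByKeyLocally(_ + _)

// counts == Map("a" -> 3, "b" -> 1)
```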

core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.Logging
 
-private[serializer] object SerializationDebugger extends Logging {
+private[spark] object SerializationDebugger extends Logging {
 
   /**
    * Improve the given NotSerializableException with the serialization path leading from the given

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 15 additions & 0 deletions

@@ -26,6 +26,7 @@
 import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.HashMultiset;
 import com.google.common.io.ByteStreams;
 import org.junit.After;
@@ -252,6 +253,20 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
     createWriter(false).stop(false);
   }
 
+  @Test
+  public void writeEmptyIterator() throws Exception {
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+    writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+    final Option<MapStatus> mapStatus = writer.stop(true);
+    assertTrue(mapStatus.isDefined());
+    assertTrue(mergedOutputFile.exists());
+    assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
+    assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
+    assertEquals(0, taskMetrics.diskBytesSpilled());
+    assertEquals(0, taskMetrics.memoryBytesSpilled());
+  }
+
   @Test
   public void writeWithoutSpilling() throws Exception {
     // In this example, each partition should have exactly one record:

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 4 additions & 0 deletions

@@ -112,6 +112,7 @@ class ClosureCleanerSuite extends FunSuite {
     expectCorrectException { TestUserClosuresActuallyCleaned.testAggregateByKey(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testFoldByKey(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKey(pairRdd) }
+    expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKeyLocally(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testMapValues(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapValues(pairRdd) }
     expectCorrectException { TestUserClosuresActuallyCleaned.testForeachAsync(rdd) }
@@ -315,6 +316,9 @@ private object TestUserClosuresActuallyCleaned {
   }
   def testFoldByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.foldByKey(0) { case (_, _) => return; 1 } }
   def testReduceByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.reduceByKey { case (_, _) => return; 1 } }
+  def testReduceByKeyLocally(rdd: RDD[(Int, Int)]): Unit = {
+    rdd.reduceByKeyLocally { case (_, _) => return; 1 }
+  }
   def testMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.mapValues { _ => return; 1 } }
   def testFlatMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.flatMapValues { _ => return; Seq() } }

docs/ml-features.md

Lines changed: 95 additions & 0 deletions

@@ -440,5 +440,100 @@ for expanded in polyDF.select("polyFeatures").take(3):
 </div>
 </div>
 
+## OneHotEncoder
+
+[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
+
+val df = sqlContext.createDataFrame(Seq(
+  (0, "a"),
+  (1, "b"),
+  (2, "c"),
+  (3, "a"),
+  (4, "a"),
+  (5, "c")
+)).toDF("id", "category")
+
+val indexer = new StringIndexer()
+  .setInputCol("category")
+  .setOutputCol("categoryIndex")
+  .fit(df)
+val indexed = indexer.transform(df)
+
+val encoder = new OneHotEncoder().setInputCol("categoryIndex").
+  setOutputCol("categoryVec")
+val encoded = encoder.transform(indexed)
+encoded.select("id", "categoryVec").foreach(println)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.ml.feature.OneHotEncoder;
+import org.apache.spark.ml.feature.StringIndexer;
+import org.apache.spark.ml.feature.StringIndexerModel;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
+  RowFactory.create(0, "a"),
+  RowFactory.create(1, "b"),
+  RowFactory.create(2, "c"),
+  RowFactory.create(3, "a"),
+  RowFactory.create(4, "a"),
+  RowFactory.create(5, "c")
+));
+StructType schema = new StructType(new StructField[]{
+  new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
+  new StructField("category", DataTypes.StringType, false, Metadata.empty())
+});
+DataFrame df = sqlContext.createDataFrame(jrdd, schema);
+StringIndexerModel indexer = new StringIndexer()
+  .setInputCol("category")
+  .setOutputCol("categoryIndex")
+  .fit(df);
+DataFrame indexed = indexer.transform(df);
+
+OneHotEncoder encoder = new OneHotEncoder()
+  .setInputCol("categoryIndex")
+  .setOutputCol("categoryVec");
+DataFrame encoded = encoder.transform(indexed);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.ml.feature import OneHotEncoder, StringIndexer
+
+df = sqlContext.createDataFrame([
+  (0, "a"),
+  (1, "b"),
+  (2, "c"),
+  (3, "a"),
+  (4, "a"),
+  (5, "c")
+], ["id", "category"])
+
+stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
+model = stringIndexer.fit(df)
+indexed = model.transform(df)
+encoder = OneHotEncoder(includeFirst=False, inputCol="categoryIndex", outputCol="categoryVec")
+encoded = encoder.transform(indexed)
+{% endhighlight %}
+</div>
+</div>
+
 # Feature Selectors
 

docs/mllib-data-types.md

Lines changed: 64 additions & 64 deletions

@@ -296,70 +296,6 @@ backed by an RDD of its entries.
 The underlying RDDs of a distributed matrix must be deterministic, because we cache the matrix size.
 In general the use of non-deterministic RDDs can lead to errors.
 
-### BlockMatrix
-
-A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
-a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
-the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
-`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
-`BlockMatrix` also has a helper function `validate` which can be used to check whether the
-`BlockMatrix` is set up properly.
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-
-A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
-most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
-`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
-Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
-
-{% highlight scala %}
-import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
-
-val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
-// Create a CoordinateMatrix from an RDD[MatrixEntry].
-val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
-// Transform the CoordinateMatrix to a BlockMatrix
-val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
-
-// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
-// Nothing happens if it is valid.
-matA.validate()
-
-// Calculate A^T A.
-val ata = matA.transpose.multiply(matA)
-{% endhighlight %}
-</div>
-
-<div data-lang="java" markdown="1">
-
-A [`BlockMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/BlockMatrix.html) can be
-most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
-`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
-Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
-
-{% highlight java %}
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
-import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
-import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
-
-JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
-// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
-CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
-// Transform the CoordinateMatrix to a BlockMatrix
-BlockMatrix matA = coordMat.toBlockMatrix().cache();
-
-// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
-// Nothing happens if it is valid.
-matA.validate();
-
-// Calculate A^T A.
-BlockMatrix ata = matA.transpose().multiply(matA);
-{% endhighlight %}
-</div>
-</div>
-
 ### RowMatrix
 
 A `RowMatrix` is a row-oriented distributed matrix without meaningful row indices, backed by an RDD
@@ -530,3 +466,67 @@ IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
 {% endhighlight %}
 </div>
 </div>
+
+### BlockMatrix
+
+A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
+a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
+the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
+`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
+`BlockMatrix` also has a helper function `validate` which can be used to check whether the
+`BlockMatrix` is set up properly.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
+most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
+`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
+Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
+
+{% highlight scala %}
+import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
+
+val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
+// Create a CoordinateMatrix from an RDD[MatrixEntry].
+val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
+// Transform the CoordinateMatrix to a BlockMatrix
+val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
+
+// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
+// Nothing happens if it is valid.
+matA.validate()
+
+// Calculate A^T A.
+val ata = matA.transpose.multiply(matA)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+
+A [`BlockMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/BlockMatrix.html) can be
+most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
+`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
+Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
+
+{% highlight java %}
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+
+JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
+// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
+CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
+// Transform the CoordinateMatrix to a BlockMatrix
+BlockMatrix matA = coordMat.toBlockMatrix().cache();
+
+// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
+// Nothing happens if it is valid.
+matA.validate();
+
+// Calculate A^T A.
+BlockMatrix ata = matA.transpose().multiply(matA);
+{% endhighlight %}
+</div>
+</div>

docs/running-on-yarn.md

Lines changed: 14 additions & 1 deletion

@@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 </tr>
 <tr>
   <td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
-  <td>5000</td>
+  <td>3000</td>
   <td>
     The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
+    The value is capped at half the value of YARN's configuration for the expiry interval
+    (<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
+  <td>200ms</td>
+  <td>
+    The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
+    when there are pending container allocation requests. It should be no larger than
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will doubled on
+    successive eager heartbeats if pending containers still exist, until
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
   </td>
 </tr>
 <tr>
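
Both properties are ordinary Spark configuration keys, so they can be set like any other setting; a minimal sketch using `SparkConf` (the values here simply restate the defaults documented above):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Restating the documented defaults; adjust for a real deployment.
val conf = new SparkConf()
  .setAppName("yarn-heartbeat-example")
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")

val sc = new SparkContext(conf)
```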

ec2/spark_ec2.py

Lines changed: 5 additions & 1 deletion

@@ -864,7 +864,11 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
         for i in cluster_instances:
             i.update()
 
-        statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])
+        max_batch = 100
+        statuses = []
+        for j in xrange(0, len(cluster_instances), max_batch):
+            batch = [i.id for i in cluster_instances[j:j + max_batch]]
+            statuses.extend(conn.get_all_instance_status(instance_ids=batch))
 
         if cluster_state == 'ssh-ready':
             if all(i.state == 'running' for i in cluster_instances) and \
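
The rewrite works around an apparent limit on how many instance IDs a single `get_all_instance_status` call accepts by querying in chunks of at most 100 and concatenating the results. The same chunk-and-merge pattern, sketched generically in Scala (`describeStatus` is a hypothetical stand-in for any API with a per-call limit):

```scala
// Generic sketch of the batching pattern used in spark_ec2.py: split the id list into
// groups of at most maxBatch, call the size-limited API once per group, and flatten.
def statusesInBatches[A, B](ids: Seq[A], maxBatch: Int = 100)
                           (describeStatus: Seq[A] => Seq[B]): Seq[B] =
  ids.grouped(maxBatch).flatMap(describeStatus).toSeq
```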

mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala

Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ private[shared] object SharedParamsCodeGen {
       isValid = "ParamValidators.inRange(0, 1)"),
     ParamDesc[String]("inputCol", "input column name"),
     ParamDesc[Array[String]]("inputCols", "input column names"),
-    ParamDesc[String]("outputCol", "output column name"),
+    ParamDesc[String]("outputCol", "output column name", Some("uid + \"__output\"")),
     ParamDesc[Int]("checkpointInterval", "checkpoint interval (>= 1)",
       isValid = "ParamValidators.gtEq(1)"),
     ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", Some("true")),
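
The generated `HasOutputCol` trait now gives `outputCol` a default derived from the stage's `uid`. A hedged sketch of the expected effect, inferred from the one-line change above rather than from the generated code:

```scala
import org.apache.spark.ml.feature.OneHotEncoder

// With the generated default in place, getOutputCol should fall back to uid + "__output"
// instead of throwing for an unset param (behavior inferred from the ParamDesc change).
val encoder = new OneHotEncoder().setInputCol("categoryIndex")
println(encoder.getOutputCol)  // expected to look like "<uid>__output"
```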
