
Commit 7ee6737

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into rest
2 parents e2f7f5f + 0a95085 · commit 7ee6737

42 files changed, +831 -166 lines changed


core/src/main/resources/org/apache/spark/log4j-defaults.properties

Lines changed: 0 additions & 1 deletion
@@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
-log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -1986,7 +1986,7 @@ object SparkContext extends Logging {
       case "yarn-client" =>
         val scheduler = try {
           val clazz =
-            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+            Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
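For context, the renamed class is loaded reflectively when an application starts with the yarn-client master. A minimal sketch of that entry point, assuming a configured YARN cluster (the app name below is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Creating a SparkContext with master "yarn-client" is what drives the
// Class.forName lookup shown in the diff above.
val conf = new SparkConf().setMaster("yarn-client").setAppName("yarn-scheduler-demo")
val sc = new SparkContext(conf)
// ... run jobs ...
sc.stop()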

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 8 additions & 5 deletions
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
       envVars += ("SPARK_REUSE_WORKER" -> "1")
     }
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+    // Whether is the worker released into idle pool
+    @volatile var released = false

     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)

-    var complete_cleanly = false
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
       writerThread.join()
-      if (reuse_worker && complete_cleanly) {
-        env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-      } else {
+      if (!reuse_worker || !released) {
         try {
           worker.close()
         } catch {
@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
           }
+          // Check whether the worker is ready to be re-used.
          if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-           complete_cleanly = true
+           if (reuse_worker) {
+             env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+             released = true
+           }
          }
          null
        }
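The release/close logic above only applies when Python worker reuse is enabled. As a hedged sketch, reuse is typically switched on through Spark configuration; the key below is an assumption and does not appear in this diff:

import org.apache.spark.SparkConf

// Assumed configuration key that controls the reuse_worker flag referenced above.
val conf = new SparkConf()
  .setAppName("python-worker-reuse-demo")
  .set("spark.python.worker.reuse", "true")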

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -149,7 +149,7 @@ class SparkContextSchedulerCreationSuite
   }

   test("yarn-client") {
-    testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+    testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler")
   }

   def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Gradient boosted Trees classification and regression using MLlib.
+"""
+
+import sys
+
+from pyspark.context import SparkContext
+from pyspark.mllib.tree import GradientBoostedTrees
+from pyspark.mllib.util import MLUtils
+
+
+def testClassification(trainingData, testData):
+    # Train a GradientBoostedTrees model.
+    #  Empty categoricalFeaturesInfo indicates all features are continuous.
+    model = GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={},
+                                                 numIterations=30, maxDepth=4)
+    # Evaluate model on test instances and compute test error
+    predictions = model.predict(testData.map(lambda x: x.features))
+    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
+    testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
+        / float(testData.count())
+    print('Test Error = ' + str(testErr))
+    print('Learned classification ensemble model:')
+    print(model.toDebugString())
+
+
+def testRegression(trainingData, testData):
+    # Train a GradientBoostedTrees model.
+    #  Empty categoricalFeaturesInfo indicates all features are continuous.
+    model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo={},
+                                                numIterations=30, maxDepth=4)
+    # Evaluate model on test instances and compute test error
+    predictions = model.predict(testData.map(lambda x: x.features))
+    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
+    testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
+        / float(testData.count())
+    print('Test Mean Squared Error = ' + str(testMSE))
+    print('Learned regression ensemble model:')
+    print(model.toDebugString())
+
+
+if __name__ == "__main__":
+    if len(sys.argv) > 1:
+        print >> sys.stderr, "Usage: gradient_boosted_trees"
+        exit(1)
+    sc = SparkContext(appName="PythonGradientBoostedTrees")
+
+    # Load and parse the data file into an RDD of LabeledPoint.
+    data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
+    # Split the data into training and test sets (30% held out for testing)
+    (trainingData, testData) = data.randomSplit([0.7, 0.3])
+
+    print('\nRunning example of classification using GradientBoostedTrees\n')
+    testClassification(trainingData, testData)
+
+    print('\nRunning example of regression using GradientBoostedTrees\n')
+    testRegression(trainingData, testData)
+
+    sc.stop()

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 33 additions & 3 deletions
@@ -41,10 +41,11 @@ import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
 import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.stat.test.ChiSqTestResult
-import org.apache.spark.mllib.tree.{RandomForest, DecisionTree}
-import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
+import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree}
+import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy}
 import org.apache.spark.mllib.tree.impurity._
-import org.apache.spark.mllib.tree.model.{RandomForestModel, DecisionTreeModel}
+import org.apache.spark.mllib.tree.loss.Losses
+import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -532,6 +533,35 @@ class PythonMLLibAPI extends Serializable {
     }
   }

+  /**
+   * Java stub for Python mllib GradientBoostedTrees.train().
+   * This stub returns a handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on exit;
+   * see the Py4J documentation.
+   */
+  def trainGradientBoostedTreesModel(
+      data: JavaRDD[LabeledPoint],
+      algoStr: String,
+      categoricalFeaturesInfo: JMap[Int, Int],
+      lossStr: String,
+      numIterations: Int,
+      learningRate: Double,
+      maxDepth: Int): GradientBoostedTreesModel = {
+    val boostingStrategy = BoostingStrategy.defaultParams(algoStr)
+    boostingStrategy.setLoss(Losses.fromString(lossStr))
+    boostingStrategy.setNumIterations(numIterations)
+    boostingStrategy.setLearningRate(learningRate)
+    boostingStrategy.treeStrategy.setMaxDepth(maxDepth)
+    boostingStrategy.treeStrategy.categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap
+
+    val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK)
+    try {
+      GradientBoostedTrees.train(cached, boostingStrategy)
+    } finally {
+      cached.unpersist(blocking = false)
+    }
+  }
+
   /**
    * Java stub for mllib Statistics.colStats(X: RDD[Vector]).
    * TODO figure out return type.
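On the Scala side, the training path this stub wires up for Python can be sketched directly. A minimal, hedged example assuming an existing RDD[LabeledPoint] named trainingData; the loss name "logLoss" is an assumption, not taken from this diff:

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.Losses

// Configure boosting the same way the stub above does.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.setLoss(Losses.fromString("logLoss"))   // assumed loss name
boostingStrategy.setNumIterations(30)
boostingStrategy.setLearningRate(0.1)
boostingStrategy.treeStrategy.setMaxDepth(4)

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)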

mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala

Lines changed: 7 additions & 0 deletions
@@ -290,6 +290,13 @@ class Word2Vec extends Serializable with Logging {

     val newSentences = sentences.repartition(numPartitions).cache()
     val initRandom = new XORShiftRandom(seed)
+
+    if (vocabSize.toLong * vectorSize * 8 >= Int.MaxValue) {
+      throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
+        " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
+        "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue/8`.")
+    }
+
     val syn0Global =
       Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
     val syn1Global = new Array[Float](vocabSize * vectorSize)
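To see when the new guard fires, here is a small hedged calculation; the vocabulary and vector sizes below are illustrative assumptions:

// Each of syn0Global and syn1Global holds vocabSize * vectorSize Floats, so the
// guard keeps that product below Int.MaxValue / 8.
val vocabSize = 1000000                        // illustrative vocabulary size after minCount filtering
val vectorSize = 300                           // illustrative embedding dimension
val product = vocabSize.toLong * vectorSize    // 300,000,000
val wouldThrow = product * 8 >= Int.MaxValue   // 2.4e9 >= ~2.15e9, so true: the exception is thrown
println(s"vocabSize*vectorSize = $product, guard triggers: $wouldThrow")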

mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ sealed trait Vector extends Serializable {
         result = 31 * result + (bits ^ (bits >>> 32)).toInt
       }
     }
-    return result
+    result
   }

   /**

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala

Lines changed: 9 additions & 0 deletions
@@ -232,6 +232,15 @@ class BlockMatrix(
     new DenseMatrix(m, n, values)
   }

+  /** Transpose this `BlockMatrix`. Returns a new `BlockMatrix` instance sharing the
+    * same underlying data. Is a lazy operation. */
+  def transpose: BlockMatrix = {
+    val transposedBlocks = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
+      ((blockColIndex, blockRowIndex), mat.transpose)
+    }
+    new BlockMatrix(transposedBlocks, colsPerBlock, rowsPerBlock, nCols, nRows)
+  }
+
   /** Collects data and assembles a local dense breeze matrix (for test only). */
   private[mllib] def toBreeze(): BDM[Double] = {
     val localMat = toLocalMatrix()
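A small hedged usage sketch of the new transpose method, assuming a live SparkContext sc and the three-argument BlockMatrix constructor that infers dimensions (block contents and sizes are illustrative):

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// A 2 x 4 matrix stored as two 2 x 2 blocks.
val blocks = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
val mat = new BlockMatrix(blocks, 2, 2)

// transpose swaps block indices and transposes each local block; no job runs yet.
val matT = mat.transpose
assert(matT.numRows() == mat.numCols() && matT.numCols() == mat.numRows())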

mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala

Lines changed: 2 additions & 2 deletions
@@ -29,8 +29,8 @@ object Algo extends Enumeration {
   val Classification, Regression = Value

   private[mllib] def fromString(name: String): Algo = name match {
-    case "classification" => Classification
-    case "regression" => Regression
+    case "classification" | "Classification" => Classification
+    case "regression" | "Regression" => Regression
     case _ => throw new IllegalArgumentException(s"Did not recognize Algo name: $name")
   }
 }
