
Commit 73cbbb1

Merge branch 'master' into expand-nest-join

2 parents: 4b18418 + 22730ad

55 files changed, +1910 -1104 lines


core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   try {
     Snappy.getNativeLibraryVersion
   } catch {
-    case e: Error => throw new IllegalArgumentException
+    case e: Error => throw new IllegalArgumentException(e)
   }
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
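The fix simply passes the caught Error as the exception's cause, so the underlying snappy-java failure (typically an UnsatisfiedLinkError) stays visible in the stack trace. A minimal standalone sketch of the same wrap-with-cause pattern, with a hypothetical loadNativeLib helper standing in for Snappy.getNativeLibraryVersion:

object WrapWithCauseSketch {
  // Hypothetical stand-in for a call that can throw a java.lang.Error,
  // e.g. an UnsatisfiedLinkError when a native library is missing.
  private def loadNativeLib(): Unit =
    throw new UnsatisfiedLinkError("no native snappy in java.library.path")

  def main(args: Array[String]): Unit = {
    try {
      loadNativeLib()
    } catch {
      // Passing `e` chains it as the cause; a bare `new IllegalArgumentException`
      // would discard the original Error and its stack trace.
      case e: Error => throw new IllegalArgumentException(e)
    }
  }
}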

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 3 additions & 1 deletion
@@ -164,7 +164,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
 
   private def doStop(): Unit = {
     // Only perform cleanup if an external service is not serving our shuffle files.
-    if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
+    // Also blockManagerId could be null if block manager is not initialized properly.
+    if (!blockManager.externalShuffleServiceEnabled ||
+      (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {
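The extra null check matters because doStop() can run before the block manager has fully registered, in which case blockManagerId is still null and the old condition would throw a NullPointerException during shutdown. An illustrative sketch of the guarded condition, using made-up names rather than Spark's actual classes:

object CleanupGuardSketch {
  // Simplified stand-in for the real BlockManagerId.
  final case class BlockManagerId(isDriver: Boolean)

  def shouldCleanUp(externalShuffleServiceEnabled: Boolean, blockManagerId: BlockManagerId): Boolean =
    // Clean up when no external service owns the shuffle files, or when this is the
    // driver; a null id (block manager not fully initialized) must not cause an NPE.
    !externalShuffleServiceEnabled ||
      (blockManagerId != null && blockManagerId.isDriver)

  def main(args: Array[String]): Unit = {
    // Previously this case dereferenced a null blockManagerId and failed.
    assert(!shouldCleanUp(externalShuffleServiceEnabled = true, blockManagerId = null))
    assert(shouldCleanUp(externalShuffleServiceEnabled = true, BlockManagerId(isDriver = true)))
  }
}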

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 2 additions & 0 deletions
@@ -297,6 +297,8 @@ private[spark] class ExternalSorter[K, V, C](
     val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
     while (it.hasNext) {
       val partitionId = it.nextPartition()
+      require(partitionId >= 0 && partitionId < numPartitions,
+        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
       it.writeNext(writer)
       elementsPerPartition(partitionId) += 1
       objectsWritten += 1
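The added require turns an out-of-range partition id into an immediate IllegalArgumentException with a readable message, instead of a harder-to-diagnose failure later when the id is used as an array index. A small self-contained illustration of the same check (numPartitions here is just an example value):

object PartitionIdCheckSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4

    def checkPartitionId(partitionId: Int): Unit =
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")

    checkPartitionId(2)  // within range, passes silently
    // checkPartitionId(7) would throw:
    //   java.lang.IllegalArgumentException: requirement failed:
    //   partition Id: 7 should be in the range [0, 4)
  }
}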

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 224 additions & 227 deletions
Large diffs are not rendered by default.

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 45 additions & 23 deletions
@@ -119,23 +119,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
     val nums = sc.parallelize(1 to 2, 2)
     val sem = new Semaphore(0)
     ThreadingSuiteState.clear()
+    var throwable: Option[Throwable] = None
     for (i <- 0 until 2) {
       new Thread {
         override def run() {
-          val ans = nums.map(number => {
-            val running = ThreadingSuiteState.runningThreads
-            running.getAndIncrement()
-            val time = System.currentTimeMillis()
-            while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
-              Thread.sleep(100)
-            }
-            if (running.get() != 4) {
-              ThreadingSuiteState.failed.set(true)
-            }
-            number
-          }).collect()
-          assert(ans.toList === List(1, 2))
-          sem.release()
+          try {
+            val ans = nums.map(number => {
+              val running = ThreadingSuiteState.runningThreads
+              running.getAndIncrement()
+              val time = System.currentTimeMillis()
+              while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
+                Thread.sleep(100)
+              }
+              if (running.get() != 4) {
+                ThreadingSuiteState.failed.set(true)
+              }
+              number
+            }).collect()
+            assert(ans.toList === List(1, 2))
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
       }.start()
     }
@@ -145,18 +152,25 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
         ThreadingSuiteState.runningThreads.get() + "); failing test")
       fail("One or more threads didn't see runningThreads = 4")
     }
+    throwable.foreach { t => throw t }
   }
 
   test("set local properties in different thread") {
     sc = new SparkContext("local", "test")
     val sem = new Semaphore(0)
-
+    var throwable: Option[Throwable] = None
     val threads = (1 to 5).map { i =>
       new Thread() {
         override def run() {
-          sc.setLocalProperty("test", i.toString)
-          assert(sc.getLocalProperty("test") === i.toString)
-          sem.release()
+          try {
+            sc.setLocalProperty("test", i.toString)
+            assert(sc.getLocalProperty("test") === i.toString)
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
       }
     }
@@ -165,20 +179,27 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
 
     sem.acquire(5)
     assert(sc.getLocalProperty("test") === null)
+    throwable.foreach { t => throw t }
   }
 
   test("set and get local properties in parent-children thread") {
     sc = new SparkContext("local", "test")
     sc.setLocalProperty("test", "parent")
     val sem = new Semaphore(0)
-
+    var throwable: Option[Throwable] = None
     val threads = (1 to 5).map { i =>
       new Thread() {
         override def run() {
-          assert(sc.getLocalProperty("test") === "parent")
-          sc.setLocalProperty("test", i.toString)
-          assert(sc.getLocalProperty("test") === i.toString)
-          sem.release()
+          try {
+            assert(sc.getLocalProperty("test") === "parent")
+            sc.setLocalProperty("test", i.toString)
+            assert(sc.getLocalProperty("test") === i.toString)
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
      }
    }
@@ -188,6 +209,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
     sem.acquire(5)
     assert(sc.getLocalProperty("test") === "parent")
     assert(sc.getLocalProperty("Foo") === null)
+    throwable.foreach { t => throw t }
   }
 
   test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {

docs/ml-ensembles.md

Lines changed: 19 additions & 46 deletions
@@ -121,10 +121,9 @@ import org.apache.spark.ml.classification.RandomForestClassifier
 import org.apache.spark.ml.classification.RandomForestClassificationModel
 import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
 import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
-import org.apache.spark.mllib.util.MLUtils
 
 // Load and parse the data file, converting it to a DataFrame.
-val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 // Index labels, adding metadata to the label column.
 // Fit on whole dataset to include all labels in index.
@@ -193,14 +192,11 @@ import org.apache.spark.ml.classification.RandomForestClassifier;
 import org.apache.spark.ml.classification.RandomForestClassificationModel;
 import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
 import org.apache.spark.ml.feature.*;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.mllib.util.MLUtils;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 
 // Load and parse the data file, converting it to a DataFrame.
-RDD<LabeledPoint> rdd = MLUtils.loadLibSVMFile(sc.sc(), "data/mllib/sample_libsvm_data.txt");
-DataFrame data = jsql.createDataFrame(rdd, LabeledPoint.class);
+DataFrame data = sqlContext.read.format("libsvm")
+  .load("data/mllib/sample_libsvm_data.txt");
 
 // Index labels, adding metadata to the label column.
 // Fit on whole dataset to include all labels in index.
@@ -268,10 +264,9 @@ from pyspark.ml import Pipeline
 from pyspark.ml.classification import RandomForestClassifier
 from pyspark.ml.feature import StringIndexer, VectorIndexer
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
-from pyspark.mllib.util import MLUtils
 
 # Load and parse the data file, converting it to a DataFrame.
-data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 # Index labels, adding metadata to the label column.
 # Fit on whole dataset to include all labels in index.
@@ -327,10 +322,9 @@ import org.apache.spark.ml.regression.RandomForestRegressor
 import org.apache.spark.ml.regression.RandomForestRegressionModel
 import org.apache.spark.ml.feature.VectorIndexer
 import org.apache.spark.ml.evaluation.RegressionEvaluator
-import org.apache.spark.mllib.util.MLUtils
 
 // Load and parse the data file, converting it to a DataFrame.
-val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 // Automatically identify categorical features, and index them.
 // Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -387,14 +381,11 @@ import org.apache.spark.ml.feature.VectorIndexer;
 import org.apache.spark.ml.feature.VectorIndexerModel;
 import org.apache.spark.ml.regression.RandomForestRegressionModel;
 import org.apache.spark.ml.regression.RandomForestRegressor;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.mllib.util.MLUtils;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 
 // Load and parse the data file, converting it to a DataFrame.
-RDD<LabeledPoint> rdd = MLUtils.loadLibSVMFile(sc.sc(), "data/mllib/sample_libsvm_data.txt");
-DataFrame data = jsql.createDataFrame(rdd, LabeledPoint.class);
+DataFrame data = sqlContext.read.format("libsvm")
+  .load("data/mllib/sample_libsvm_data.txt");
 
 // Automatically identify categorical features, and index them.
 // Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -450,10 +441,9 @@ from pyspark.ml import Pipeline
 from pyspark.ml.regression import RandomForestRegressor
 from pyspark.ml.feature import VectorIndexer
 from pyspark.ml.evaluation import RegressionEvaluator
-from pyspark.mllib.util import MLUtils
 
 # Load and parse the data file, converting it to a DataFrame.
-data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 # Automatically identify categorical features, and index them.
 # Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -576,10 +566,9 @@ import org.apache.spark.ml.classification.GBTClassifier
 import org.apache.spark.ml.classification.GBTClassificationModel
 import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
 import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
-import org.apache.spark.mllib.util.MLUtils
 
 // Load and parse the data file, converting it to a DataFrame.
-val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 // Index labels, adding metadata to the label column.
 // Fit on whole dataset to include all labels in index.
@@ -648,14 +637,10 @@ import org.apache.spark.ml.classification.GBTClassifier;
 import org.apache.spark.ml.classification.GBTClassificationModel;
 import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
 import org.apache.spark.ml.feature.*;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.mllib.util.MLUtils;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 
 // Load and parse the data file, converting it to a DataFrame.
-RDD<LabeledPoint> rdd = MLUtils.loadLibSVMFile(sc.sc(), "data/mllib/sample_libsvm_data.txt");
-DataFrame data = jsql.createDataFrame(rdd, LabeledPoint.class);
+DataFrame data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt");
 
 // Index labels, adding metadata to the label column.
 // Fit on whole dataset to include all labels in index.
@@ -724,10 +709,9 @@ from pyspark.ml import Pipeline
 from pyspark.ml.classification import GBTClassifier
 from pyspark.ml.feature import StringIndexer, VectorIndexer
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
-from pyspark.mllib.util import MLUtils
 
 # Load and parse the data file, converting it to a DataFrame.
-data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 # Index labels, adding metadata to the label column.
 # Fit on whole dataset to include all labels in index.
@@ -783,10 +767,9 @@ import org.apache.spark.ml.regression.GBTRegressor
 import org.apache.spark.ml.regression.GBTRegressionModel
 import org.apache.spark.ml.feature.VectorIndexer
 import org.apache.spark.ml.evaluation.RegressionEvaluator
-import org.apache.spark.mllib.util.MLUtils
 
 // Load and parse the data file, converting it to a DataFrame.
-val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 // Automatically identify categorical features, and index them.
 // Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -844,14 +827,10 @@ import org.apache.spark.ml.feature.VectorIndexer;
 import org.apache.spark.ml.feature.VectorIndexerModel;
 import org.apache.spark.ml.regression.GBTRegressionModel;
 import org.apache.spark.ml.regression.GBTRegressor;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.mllib.util.MLUtils;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 
 // Load and parse the data file, converting it to a DataFrame.
-RDD<LabeledPoint> rdd = MLUtils.loadLibSVMFile(sc.sc(), "data/mllib/sample_libsvm_data.txt");
-DataFrame data = jsql.createDataFrame(rdd, LabeledPoint.class);
+DataFrame data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt");
 
 // Automatically identify categorical features, and index them.
 // Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -908,10 +887,9 @@ from pyspark.ml import Pipeline
 from pyspark.ml.regression import GBTRegressor
 from pyspark.ml.feature import VectorIndexer
 from pyspark.ml.evaluation import RegressionEvaluator
-from pyspark.mllib.util import MLUtils
 
 # Load and parse the data file, converting it to a DataFrame.
-data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
 
 # Automatically identify categorical features, and index them.
 # Set maxCategories so features with > 4 distinct values are treated as continuous.
@@ -970,15 +948,14 @@ Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.classifie
 {% highlight scala %}
 import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
 import org.apache.spark.mllib.evaluation.MulticlassMetrics
-import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.sql.{Row, SQLContext}
 
 val sqlContext = new SQLContext(sc)
 
 // parse data into dataframe
-val data = MLUtils.loadLibSVMFile(sc,
-  "data/mllib/sample_multiclass_classification_data.txt")
-val Array(train, test) = data.toDF().randomSplit(Array(0.7, 0.3))
+val data = sqlContext.read.format("libsvm")
+  .load("data/mllib/sample_multiclass_classification_data.txt")
+val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
 
 // instantiate multiclass learner and train
 val ovr = new OneVsRest().setClassifier(new LogisticRegression)
@@ -1016,20 +993,16 @@ import org.apache.spark.ml.classification.OneVsRest;
 import org.apache.spark.ml.classification.OneVsRestModel;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
 import org.apache.spark.mllib.linalg.Matrix;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.mllib.util.MLUtils;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 
 SparkConf conf = new SparkConf().setAppName("JavaOneVsRestExample");
 JavaSparkContext jsc = new JavaSparkContext(conf);
 SQLContext jsql = new SQLContext(jsc);
 
-RDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(),
-  "data/mllib/sample_multiclass_classification_data.txt");
+DataFrame dataFrame = sqlContext.read.format("libsvm")
+  .load("data/mllib/sample_multiclass_classification_data.txt");
 
-DataFrame dataFrame = jsql.createDataFrame(data, LabeledPoint.class);
 DataFrame[] splits = dataFrame.randomSplit(new double[] {0.7, 0.3}, 12345);
 DataFrame train = splits[0];
 DataFrame test = splits[1];
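All of the documentation hunks above make the same substitution: instead of MLUtils.loadLibSVMFile followed by toDF(), the examples load the data through the DataFrame-based "libsvm" data source, which yields label/features columns directly. A short Scala sketch of that reader pattern, assuming an existing SparkContext named sc and the sample file shipped with Spark:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// The "libsvm" source parses each line into a double label and a sparse features vector.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
data.printSchema()  // expected columns: label (double), features (vector)

// Same DataFrame API for splitting as in the OneVsRest example above.
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))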

0 commit comments
