Skip to content

Commit d25a324

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
2 parents e4a93ac + 9476148 commit d25a324

File tree

24 files changed

+335
-91
lines changed

24 files changed

+335
-91
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,6 @@ private[worker] class Worker(
324324
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
325325
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
326326

327-
case Heartbeat =>
328-
logInfo(s"Received heartbeat from driver ${sender.path}")
329-
330327
case RegisterWorkerFailed(message) =>
331328
if (!registered) {
332329
logError("Worker registration failed: " + message)

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ private[spark] abstract class WebUI(
9494
}
9595

9696
/** Detach a handler from this UI. */
97-
protected def detachHandler(handler: ServletContextHandler) {
97+
def detachHandler(handler: ServletContextHandler) {
9898
handlers -= handler
9999
serverInfo.foreach { info =>
100100
info.rootHandler.removeHandler(handler)

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.mockito.MockitoAnnotations;
3636
import org.mockito.invocation.InvocationOnMock;
3737
import org.mockito.stubbing.Answer;
38+
import org.xerial.snappy.buffer.CachedBufferAllocator;
3839
import static org.hamcrest.MatcherAssert.assertThat;
3940
import static org.hamcrest.Matchers.greaterThan;
4041
import static org.hamcrest.Matchers.lessThan;
@@ -96,6 +97,13 @@ public OutputStream apply(OutputStream stream) {
9697
@After
9798
public void tearDown() {
9899
Utils.deleteRecursively(tempDir);
100+
// This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
101+
// suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
102+
// preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
103+
// needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
104+
synchronized (CachedBufferAllocator.class) {
105+
CachedBufferAllocator.queueTable.clear();
106+
}
99107
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
100108
if (leakedMemory != 0) {
101109
fail("Test leaked " + leakedMemory + " bytes of managed memory");

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable {
345345
* Returns a list containing weights, mean and covariance of each mixture component.
346346
*/
347347
def trainGaussianMixture(
348-
data: JavaRDD[Vector],
349-
k: Int,
350-
convergenceTol: Double,
348+
data: JavaRDD[Vector],
349+
k: Int,
350+
convergenceTol: Double,
351351
maxIterations: Int,
352-
seed: java.lang.Long): JList[Object] = {
352+
seed: java.lang.Long,
353+
initialModelWeights: java.util.ArrayList[Double],
354+
initialModelMu: java.util.ArrayList[Vector],
355+
initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
353356
val gmmAlg = new GaussianMixture()
354357
.setK(k)
355358
.setConvergenceTol(convergenceTol)
356359
.setMaxIterations(maxIterations)
357360

361+
if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
362+
val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
363+
case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
364+
}
365+
val initialModel = new GaussianMixtureModel(
366+
initialModelWeights.asScala.toArray, gaussians.toArray)
367+
gmmAlg.setInitialModel(initialModel)
368+
}
369+
358370
if (seed != null) gmmAlg.setSeed(seed)
359371

360372
try {
361373
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
362374
var wt = ArrayBuffer.empty[Double]
363-
var mu = ArrayBuffer.empty[Vector]
375+
var mu = ArrayBuffer.empty[Vector]
364376
var sigma = ArrayBuffer.empty[Matrix]
365377
for (i <- 0 until model.k) {
366378
wt += model.weights(i)
367379
mu += model.gaussians(i).mu
368380
sigma += model.gaussians(i).sigma
369-
}
381+
}
370382
List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
371383
} finally {
372384
data.rdd.unpersist(blocking = false)

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
3838
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
3939
* the respective mean and covariance for each Gaussian distribution i=1..k.
4040
*
41-
* @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is
42-
* the weight for Gaussian i, and weight.sum == 1
43-
* @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i
44-
* @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the
45-
* covariance matrix for Gaussian i
41+
* @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
42+
* the weight for Gaussian i, and weights.sum == 1
43+
* @param gaussians Array of MultivariateGaussian where gaussians(i) represents
44+
* the Multivariate Gaussian (Normal) Distribution for Gaussian i
4645
*/
4746
@Experimental
4847
class GaussianMixtureModel(

python/pyspark/mllib/clustering.py

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ class GaussianMixtureModel(object):
142142

143143
"""A clustering model derived from the Gaussian Mixture Model method.
144144
145+
>>> from pyspark.mllib.linalg import Vectors, DenseMatrix
145146
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
146147
... 0.9,0.8,0.75,0.935,
147148
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
@@ -154,24 +155,51 @@ class GaussianMixtureModel(object):
154155
True
155156
>>> labels[4]==labels[5]
156157
True
157-
>>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220,
158-
... -5.2211, -5.0602, 4.7118,
159-
... 6.8989, 3.4592, 4.6322,
160-
... 5.7048, 4.6567, 5.5026,
161-
... 4.5605, 5.2043, 6.2734]).reshape(5, 3))
158+
>>> data = array([-5.1971, -2.5359, -3.8220,
159+
... -5.2211, -5.0602, 4.7118,
160+
... 6.8989, 3.4592, 4.6322,
161+
... 5.7048, 4.6567, 5.5026,
162+
... 4.5605, 5.2043, 6.2734])
163+
>>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
162164
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
163165
... maxIterations=150, seed=10)
164166
>>> labels = model.predict(clusterdata_2).collect()
165167
>>> labels[0]==labels[1]==labels[2]
166168
True
167169
>>> labels[3]==labels[4]
168170
True
171+
>>> clusterdata_3 = sc.parallelize(data.reshape(15, 1))
172+
>>> im = GaussianMixtureModel([0.5, 0.5],
173+
... [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1, [1.0])),
174+
... MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1, [1.0]))])
175+
>>> model = GaussianMixture.train(clusterdata_3, 2, initialModel=im)
169176
"""
170177

171178
def __init__(self, weights, gaussians):
172-
self.weights = weights
173-
self.gaussians = gaussians
174-
self.k = len(self.weights)
179+
self._weights = weights
180+
self._gaussians = gaussians
181+
self._k = len(self._weights)
182+
183+
@property
184+
def weights(self):
185+
"""
186+
Weights for each Gaussian distribution in the mixture, where weights[i] is
187+
the weight for Gaussian i, and weights.sum == 1.
188+
"""
189+
return self._weights
190+
191+
@property
192+
def gaussians(self):
193+
"""
194+
Array of MultivariateGaussian where gaussians[i] represents
195+
the Multivariate Gaussian (Normal) Distribution for Gaussian i.
196+
"""
197+
return self._gaussians
198+
199+
@property
200+
def k(self):
201+
"""Number of gaussians in mixture."""
202+
return self._k
175203

176204
def predict(self, x):
177205
"""
@@ -193,9 +221,9 @@ def predictSoft(self, x):
193221
:return: membership_matrix. RDD of array of double values.
194222
"""
195223
if isinstance(x, RDD):
196-
means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
224+
means, sigmas = zip(*[(g.mu, g.sigma) for g in self._gaussians])
197225
membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
198-
_convert_to_vector(self.weights), means, sigmas)
226+
_convert_to_vector(self._weights), means, sigmas)
199227
return membership_matrix.map(lambda x: pyarray.array('d', x))
200228

201229

@@ -208,13 +236,24 @@ class GaussianMixture(object):
208236
:param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3
209237
:param maxIterations: Number of iterations. Defaults to 100
210238
:param seed: Random Seed
239+
:param initialModel: GaussianMixtureModel for initializing learning
211240
"""
212241
@classmethod
213-
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
242+
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
214243
"""Train a Gaussian Mixture clustering model."""
215-
weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
216-
rdd.map(_convert_to_vector), k,
217-
convergenceTol, maxIterations, seed)
244+
initialModelWeights = None
245+
initialModelMu = None
246+
initialModelSigma = None
247+
if initialModel is not None:
248+
if initialModel.k != k:
249+
raise Exception("Mismatched cluster count, initialModel.k = %s, however k = %s"
250+
% (initialModel.k, k))
251+
initialModelWeights = initialModel.weights
252+
initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
253+
initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
254+
weight, mu, sigma = callMLlibFunc("trainGaussianMixture", rdd.map(_convert_to_vector), k,
255+
convergenceTol, maxIterations, seed, initialModelWeights,
256+
initialModelMu, initialModelSigma)
218257
mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
219258
return GaussianMixtureModel(weight, mvg_obj)
220259

python/pyspark/sql/dataframe.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,13 +1511,19 @@ def inSet(self, *cols):
15111511
isNull = _unary_op("isNull", "True if the current expression is null.")
15121512
isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
15131513

1514-
def alias(self, alias):
1515-
"""Return a alias for this column
1514+
def alias(self, *alias):
1515+
"""Returns this column aliased with a new name or names (in the case of expressions that
1516+
return more than one column, such as explode).
15161517
15171518
>>> df.select(df.age.alias("age2")).collect()
15181519
[Row(age2=2), Row(age2=5)]
15191520
"""
1520-
return Column(getattr(self._jc, "as")(alias))
1521+
1522+
if len(alias) == 1:
1523+
return Column(getattr(self._jc, "as")(alias[0]))
1524+
else:
1525+
sc = SparkContext._active_spark_context
1526+
return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
15211527

15221528
@ignore_unicode_prefix
15231529
def cast(self, dataType):

python/pyspark/sql/functions.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,26 @@ def approxCountDistinct(col, rsd=None):
169169
return Column(jc)
170170

171171

172+
def explode(col):
173+
"""Returns a new row for each element in the given array or map.
174+
175+
>>> from pyspark.sql import Row
176+
>>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
177+
>>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
178+
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
179+
180+
>>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
181+
+---+-----+
182+
|key|value|
183+
+---+-----+
184+
| a| b|
185+
+---+-----+
186+
"""
187+
sc = SparkContext._active_spark_context
188+
jc = sc._jvm.functions.explode(_to_java_column(col))
189+
return Column(jc)
190+
191+
172192
def coalesce(*cols):
173193
"""Returns the first column that is not null.
174194

python/pyspark/sql/tests.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ def tearDownClass(cls):
117117
ReusedPySparkTestCase.tearDownClass()
118118
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
119119

120+
def test_explode(self):
121+
from pyspark.sql.functions import explode
122+
d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})]
123+
rdd = self.sc.parallelize(d)
124+
data = self.sqlCtx.createDataFrame(rdd)
125+
126+
result = data.select(explode(data.intlist).alias("a")).select("a").collect()
127+
self.assertEqual(result[0][0], 1)
128+
self.assertEqual(result[1][0], 2)
129+
self.assertEqual(result[2][0], 3)
130+
131+
result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
132+
self.assertEqual(result[0][0], "a")
133+
self.assertEqual(result[0][1], "b")
134+
120135
def test_udf_with_callable(self):
121136
d = [Row(number=i, squared=i**2) for i in range(10)]
122137
rdd = self.sc.parallelize(d)

0 commit comments

Comments
 (0)