@@ -23,7 +23,7 @@
import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.WrappedArray;
import scala.collection.mutable.Seq;
Member Author:
WrappedArray is gone in 2.13; this should be an equivalent superclass
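For context, a minimal sketch of why Seq works on both versions (my own illustration, not code from this PR; it assumes the 2.13 build passes array columns to UDFs as mutable.ArraySeq, the counterpart of 2.12's WrappedArray):

```scala
import scala.collection.mutable

// Both carriers (WrappedArray on 2.12, ArraySeq on 2.13) are subtypes of
// scala.collection.mutable.Seq, so a UDF parameter typed as Seq only relies on
// the shared interface (here, just size()).
val words: mutable.Seq[String] = mutable.ArraySeq("Hi", "I", "heard", "about", "Spark")
val count: Int = words.size  // 5
```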


import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
@@ -69,7 +69,7 @@ public static void main(String[] args) {
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
"countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
@@ -82,7 +82,7 @@ object SparkKMeans {
while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))

val pointStats = closest.reduceByKey{case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2)}
val pointStats = closest.reduceByKey(mergeResults)
Member Author:
Not quite sure why, but a few calls to reduceByKey didn't like the existing syntax in 2.13 (the compiler reports "missing parameter type for expanded function"), so I had to break the merge out into an explicitly typed method.
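A self-contained illustration of the workaround pattern (made-up RDD and types, not the example's code; the PR's actual fix is the typed mergeResults method added further down in this file):

```scala
import org.apache.spark.SparkContext

// The explicitly typed merge function gives the compiler the parameter types
// that the pattern-matching closure no longer provides here on 2.13.
def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) =
  (a._1 + b._1, a._2 + b._2)

def sumAndCount(sc: SparkContext): Unit = {
  val pairs = sc.parallelize(Seq(("a", (1.0, 1)), ("a", (2.0, 1)), ("b", (3.0, 1))))
  // pairs.reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }  // the shape that 2.13 rejects
  pairs.reduceByKey(merge).collect().foreach(println)
}
```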


val newPoints = pointStats.map {pair =>
(pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap()
@@ -102,5 +102,11 @@ object SparkKMeans {
kPoints.foreach(println)
spark.stop()
}

private def mergeResults(
a: (Vector[Double], Int),
b: (Vector[Double], Int)): (Vector[Double], Int) = {
(a._1 + b._1, a._2 + b._2)
}
}
// scalastyle:on println
@@ -85,7 +85,7 @@ object SchemaConverters {
StructField(f.name, schemaType.dataType, schemaType.nullable)
}

SchemaType(StructType(fields), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false)

case ARRAY =>
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
@@ -126,7 +126,7 @@ object SchemaConverters {
StructField(s"member$i", schemaType.dataType, nullable = true)
}

SchemaType(StructType(fields), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false)
}

case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
@@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReader(
}
})
}
incorrectOffsets
incorrectOffsets.toSeq
}

// Retry to fetch latest offsets when detecting incorrect offsets. We don't use
@@ -1540,8 +1540,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
makeSureGetOffsetCalled,
Execute { q =>
// wait to reach the last offset in every partition
q.awaitOffset(
0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis)
q.awaitOffset(0,
KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis)
},
CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
StopStream,
2 changes: 1 addition & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -76,7 +76,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
* @return fitted models, matching the input parameter maps
*/
@Since("2.0.0")
def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
def fit(dataset: Dataset[_], paramMaps: Seq[ParamMap]): Seq[M] = {
Member Author:
Yeah, this fixes the weird compile error (Arrays combined with generic types are handled more strictly in Scala 2.13), though I don't directly see what it has to do with the type M. Still, this is an API change, so I think MiMa will fail and I'll need another workaround for that. This is an obscure method that isn't even called by tests, AFAICT, so I'm not sure it even has coverage.
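For what it's worth, a sketch of how a caller sees the change (hypothetical caller code, not from this PR): source compatibility should mostly hold because an Array argument is implicitly adapted to Seq at the call site, while the erased method signature still changes, which is what MiMa would flag.

```scala
import org.apache.spark.ml.param.ParamMap

// A caller that built its grid as an Array still compiles against a Seq[ParamMap]
// parameter: Scala wraps the array on 2.12 (WrappedArray) and copies it into an
// immutable Seq on 2.13.
def describeGrid(paramMaps: Seq[ParamMap]): String =
  s"${paramMaps.size} candidate ParamMaps"

val grid: Array[ParamMap] = Array(ParamMap.empty, ParamMap.empty)
val summary = describeGrid(grid)  // Array adapted to Seq here
```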

paramMaps.map(fit(dataset, _))
}

@@ -492,12 +492,7 @@ class GaussianMixture @Since("2.0.0") (
(i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
}
} else Iterator.empty
}.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, mean2, mean1)
BLAS.axpy(1.0, cov2, cov1)
(mean1, cov1, w1 + w2, ws1 + ws2)
}.mapValues { case (mean, cov, w, ws) =>
}.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -560,12 +555,7 @@ class GaussianMixture @Since("2.0.0") (
agg.meanIter.zip(agg.covIter).zipWithIndex
.map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
} else Iterator.empty
}.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, mean2, mean1)
BLAS.axpy(1.0, cov2, cov1)
(mean1, cov1, w1 + w2, ws1 + ws2)
}.mapValues { case (mean, cov, w, ws) =>
}.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -624,8 +614,8 @@ class GaussianMixture @Since("2.0.0") (
val gaussians = Array.tabulate(numClusters) { i =>
val start = i * numSamples
val end = start + numSamples
val sampleSlice = samples.view(start, end)
val weightSlice = sampleWeights.view(start, end)
val sampleSlice = samples.view.slice(start, end)
val weightSlice = sampleWeights.view.slice(start, end)
val localWeightSum = weightSlice.sum
weights(i) = localWeightSum / weightSum

@@ -691,6 +681,16 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
new DenseMatrix(n, n, symmetricValues)
}

private def mergeWeightsMeans(
a: (DenseVector, DenseVector, Double, Double),
b: (DenseVector, DenseVector, Double, Double)): (DenseVector, DenseVector, Double, Double) =
{
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, b._1, a._1)
BLAS.axpy(1.0, b._2, a._2)
(a._1, a._2, a._3 + b._3, a._4 + b._4)
}

/**
* Update the weight, mean and covariance of gaussian distribution.
*
@@ -201,7 +201,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
}
Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
} else Iterator.empty
}.reduceByKey { case (s1, s2) => s1.merge(s2) }
}.reduceByKey { (s1, s2) => s1.merge(s2) }
} else {
val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
vectors.mapPartitionsWithIndex { case (pid, iter) =>
@@ -214,7 +214,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
seqOp = (s, v) => s.insert(v),
combOp = (s1, s2) => s1.compress.merge(s2.compress)
).map { case ((_, i), s) => (i, s)
}.reduceByKey { case (s1, s2) => s1.compress.merge(s2.compress) }
}.reduceByKey { (s1, s2) => s1.compress.merge(s2.compress) }
}
}

@@ -291,7 +291,7 @@ class Word2VecModel private[ml] (
val outputSchema = transformSchema(dataset.schema, logging = true)
val vectors = wordVectors.getVectors
.mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
.map(identity) // mapValues doesn't return a serializable map (SI-7005)
.map(identity).toMap // mapValues doesn't return a serializable map (SI-7005)
val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
val d = $(vectorSize)
val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray)
@@ -937,7 +937,7 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any])

/** Put param pairs with a `java.util.List` of values for Python. */
private[ml] def put(paramPairs: JList[ParamPair[_]]): this.type = {
put(paramPairs.asScala: _*)
put(paramPairs.asScala.toSeq: _*)
}

/**
@@ -1223,28 +1223,28 @@ private[python] class PythonMLLibAPI extends Serializable {
* Python-friendly version of [[MLUtils.convertVectorColumnsToML()]].
*/
def convertVectorColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertVectorColumnsToML(dataset, cols.asScala: _*)
MLUtils.convertVectorColumnsToML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertVectorColumnsFromML()]]
*/
def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*)
MLUtils.convertVectorColumnsFromML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]].
*/
def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*)
MLUtils.convertMatrixColumnsToML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]]
*/
def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*)
MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala.toSeq: _*)
}
}

@@ -225,7 +225,7 @@ class BisectingKMeans private (
divisibleIndices.contains(parentIndex(index))
}
newClusters = summarize(d, newAssignments, dMeasure)
newClusterCenters = newClusters.mapValues(_.center).map(identity)
newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap
}
if (preIndices != null) {
preIndices.unpersist()
@@ -17,8 +17,6 @@

package org.apache.spark.mllib.clustering

import scala.collection.mutable.IndexedSeq

import breeze.linalg.{diag, DenseMatrix => BreezeMatrix, DenseVector => BDV, Vector => BV}

import org.apache.spark.annotation.Since
@@ -189,8 +187,8 @@ class GaussianMixture private (
case None =>
val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
val slice = samples.view.slice(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice.toSeq), initCovariance(slice.toSeq))
})
}

@@ -259,7 +257,7 @@ class GaussianMixture private (
}

/** Average of dense breeze vectors */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
private def vectorMean(x: Seq[BV[Double]]): BDV[Double] = {
val v = BDV.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.toDouble
@@ -269,7 +267,7 @@
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
private def initCovariance(x: Seq[BV[Double]]): BreezeMatrix[Double] = {
val mu = vectorMean(x)
val ss = BDV.zeros[Double](x(0).length)
x.foreach { xi =>
@@ -335,7 +335,7 @@ object PrefixSpan extends Logging {
largePrefixes = newLargePrefixes
}

var freqPatterns = sc.parallelize(localFreqPatterns, 1)
var freqPatterns = sc.parallelize(localFreqPatterns.toSeq, 1)

val numSmallPrefixes = smallPrefixes.size
logInfo(s"number of small prefixes for local processing: $numSmallPrefixes")
@@ -97,7 +97,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
}
if (sizes(i) + tail.length >= offset + windowSize) {
partitions +=
new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail, offset)
new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq, offset)
partitionIndex += 1
}
}
@@ -112,7 +112,7 @@ private[spark] class EntropyAggregator(numClasses: Int)
* @param offset Start index of stats for this (node, feature, bin).
*/
def getCalculator(allStats: Array[Double], offset: Int): EntropyCalculator = {
new EntropyCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
new EntropyCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
allStats(offset + statsSize - 1).toLong)
}
}
@@ -107,7 +107,7 @@ private[spark] class GiniAggregator(numClasses: Int)
* @param offset Start index of stats for this (node, feature, bin).
*/
def getCalculator(allStats: Array[Double], offset: Int): GiniCalculator = {
new GiniCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
new GiniCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
allStats(offset + statsSize - 1).toLong)
}
}
@@ -95,7 +95,7 @@ private[spark] class VarianceAggregator()
* @param offset Start index of stats for this (node, feature, bin).
*/
def getCalculator(allStats: Array[Double], offset: Int): VarianceCalculator = {
new VarianceCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
new VarianceCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
allStats(offset + statsSize - 1).toLong)
}
}
@@ -85,10 +85,10 @@ private[mllib] object NumericParser {
while (parsing && tokenizer.hasMoreTokens()) {
token = tokenizer.nextToken()
if (token == "(") {
items.append(parseTuple(tokenizer))
items += parseTuple(tokenizer)
allowComma = true
} else if (token == "[") {
items.append(parseArray(tokenizer))
items += parseArray(tokenizer)
allowComma = true
} else if (token == ",") {
if (allowComma) {
@@ -102,14 +102,14 @@
// ignore whitespaces between delim chars, e.g. ", ["
} else {
// expecting a number
items.append(parseDouble(token))
items += parseDouble(token)
allowComma = true
}
}
if (parsing) {
throw new SparkException(s"A tuple must end with ')'.")
}
items
items.toSeq
}

private def parseDouble(s: String): Double = {
@@ -219,7 +219,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {

model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)

val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
(Vectors.dense(1.0, 1.0), 2.0), (Vectors.dense(10.0, 10.0), 2.0),
(Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 2.0),
(Vectors.dense(-1.0, 1.0), 2.0), (Vectors.dense(-100.0, 90.0), 2.0))))
@@ -286,7 +286,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {

model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)

val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
(Vectors.dense(1.0, 1.0), 1.0), (Vectors.dense(10.0, 10.0), 2.0),
(Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 3.0),
(Vectors.dense(-1.0, 1.0), 3.0), (Vectors.dense(-100.0, 90.0), 4.0))))