From 8b9f4a8c77141c8d8e47c909fda644d568f9452f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 14 Jul 2020 15:30:26 -0500 Subject: [PATCH 1/3] Update rest of default modules (Hive, ML, etc) for Scala 2.13 compilation --- .../examples/ml/JavaTokenizerExample.java | 4 +-- .../apache/spark/examples/SparkKMeans.scala | 7 ++++- .../spark/sql/avro/SchemaConverters.scala | 4 +-- .../sql/kafka010/KafkaOffsetReader.scala | 2 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 4 +-- .../scala/org/apache/spark/ml/Estimator.scala | 3 +- .../scala/org/apache/spark/ml/Predictor.scala | 4 ++- .../spark/ml/classification/Classifier.scala | 4 ++- .../ProbabilisticClassifier.scala | 4 ++- .../spark/ml/clustering/GaussianMixture.scala | 28 +++++++++---------- .../org/apache/spark/ml/feature/LSH.scala | 3 +- .../spark/ml/feature/RobustScaler.scala | 4 +-- .../apache/spark/ml/feature/Selector.scala | 3 +- .../apache/spark/ml/feature/Word2Vec.scala | 2 +- .../org/apache/spark/ml/param/params.scala | 2 +- .../spark/ml/regression/Regressor.scala | 4 ++- .../mllib/api/python/PythonMLLibAPI.scala | 8 +++--- .../mllib/clustering/BisectingKMeans.scala | 2 +- .../mllib/clustering/GaussianMixture.scala | 10 +++---- .../apache/spark/mllib/fpm/PrefixSpan.scala | 2 +- .../apache/spark/mllib/rdd/SlidingRDD.scala | 2 +- .../spark/mllib/tree/impurity/Entropy.scala | 2 +- .../spark/mllib/tree/impurity/Gini.scala | 2 +- .../spark/mllib/tree/impurity/Variance.scala | 2 +- .../spark/mllib/util/NumericParser.scala | 8 +++--- .../ml/clustering/BisectingKMeansSuite.scala | 4 +-- .../spark/ml/clustering/KMeansSuite.scala | 12 ++++---- .../evaluation/ClusteringEvaluatorSuite.scala | 2 +- .../spark/ml/feature/NormalizerSuite.scala | 12 ++++---- .../spark/ml/recommendation/ALSSuite.scala | 12 ++++---- .../spark/sql/hive/HiveExternalCatalog.scala | 8 +++--- .../spark/sql/hive/HiveInspectors.scala | 4 +-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +-- .../org/apache/spark/sql/hive/HiveUtils.scala | 4 +-- .../sql/hive/client/HiveClientImpl.scala | 24 ++++++++-------- .../spark/sql/hive/client/HiveShim.scala | 10 +++---- .../sql/hive/execution/HiveOptions.scala | 2 +- .../hive/execution/HiveTableScanExec.scala | 2 +- .../org/apache/spark/sql/hive/hiveUDFs.scala | 4 +-- .../sql/hive/HiveShowCreateTableSuite.scala | 2 +- .../spark/sql/hive/StatisticsSuite.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 42 files changed, 122 insertions(+), 108 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java index a0979aa2d24e..3b5d8e6d555e 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; -import scala.collection.mutable.WrappedArray; +import scala.collection.mutable.Seq; import org.apache.spark.ml.feature.RegexTokenizer; import org.apache.spark.ml.feature.Tokenizer; @@ -69,7 +69,7 @@ public static void main(String[] args) { .setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false); spark.udf().register( - "countTokens", (WrappedArray words) -> words.size(), DataTypes.IntegerType); + "countTokens", (Seq words) -> words.size(), DataTypes.IntegerType); Dataset tokenized = tokenizer.transform(sentenceDataFrame); tokenized.select("sentence", "words") diff --git 
a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index ec9b44ce6e3b..5982fb67ee19 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -82,7 +82,7 @@ object SparkKMeans { while(tempDist > convergeDist) { val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - val pointStats = closest.reduceByKey{case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2)} + val pointStats = closest.reduceByKey(mergeResults) val newPoints = pointStats.map {pair => (pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap() @@ -102,5 +102,10 @@ object SparkKMeans { kPoints.foreach(println) spark.stop() } + + private def mergeResults(a: (Vector[Double], Int), + b: (Vector[Double], Int)): (Vector[Double], Int) = { + (a._1 + b._1, a._2 + b._2) + } } // scalastyle:on println diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 3947d327dfac..75690bb7722e 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -85,7 +85,7 @@ object SchemaConverters { StructField(f.name, schemaType.dataType, schemaType.nullable) } - SchemaType(StructType(fields), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false) case ARRAY => val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames) @@ -126,7 +126,7 @@ object SchemaConverters { StructField(s"member$i", schemaType.dataType, nullable = true) } - SchemaType(StructType(fields), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false) } case other => throw new IncompatibleSchemaException(s"Unsupported type $other") diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 216e74a85c2a..5ab786267495 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReader( } }) } - incorrectOffsets + incorrectOffsets.toSeq } // Retry to fetch latest offsets when detecting incorrect offsets. 
We don't use diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index bdad214a9134..ee31652eaf1f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1540,8 +1540,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { makeSureGetOffsetCalled, Execute { q => // wait to reach the last offset in every partition - q.awaitOffset( - 0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis) + q.awaitOffset(0, + KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis) }, CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), StopStream, diff --git a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala index 8815eb29bc86..da3ec435ae3d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml import scala.annotation.varargs +import scala.reflect.ClassTag import org.apache.spark.annotation.Since import org.apache.spark.ml.param.{ParamMap, ParamPair} @@ -26,7 +27,7 @@ import org.apache.spark.sql.Dataset /** * Abstract class for estimators that fit models to data. */ -abstract class Estimator[M <: Model[M]] extends PipelineStage { +abstract class Estimator[M <: Model[M] : ClassTag] extends PipelineStage { /** * Fits a single model to the input data with optional parameters. diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index e0b128e36981..7f981e5d554b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml +import scala.reflect.ClassTag + import org.apache.spark.annotation.Since import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.functions.checkNonNegativeWeight @@ -115,7 +117,7 @@ private[ml] trait PredictorParams extends Params abstract class Predictor[ FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], - M <: PredictionModel[FeaturesType, M]] + M <: PredictionModel[FeaturesType, M] : ClassTag] extends Estimator[M] with PredictorParams { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 233e8e5bcdc8..687c5c11e7c9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.classification +import scala.reflect.ClassTag + import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} @@ -73,7 +75,7 @@ private[spark] trait ClassifierParams abstract class Classifier[ FeaturesType, E <: Classifier[FeaturesType, E, M], - M <: ClassificationModel[FeaturesType, M]] + M <: ClassificationModel[FeaturesType, M] : ClassTag] extends Predictor[FeaturesType, E, M] with ClassifierParams { /** @group setParam */ diff 
--git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala index 1caaeccd7b0d..1c35ab84af00 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.classification +import scala.reflect.ClassTag + import org.apache.spark.annotation.Since import org.apache.spark.ml.linalg.{DenseVector, Vector, VectorUDT} import org.apache.spark.ml.param.ParamMap @@ -51,7 +53,7 @@ private[ml] trait ProbabilisticClassifierParams abstract class ProbabilisticClassifier[ FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], - M <: ProbabilisticClassificationModel[FeaturesType, M]] + M <: ProbabilisticClassificationModel[FeaturesType, M] : ClassTag] extends Classifier[FeaturesType, E, M] with ProbabilisticClassifierParams { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 18fd220b4ca9..90845021fc07 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -492,12 +492,7 @@ class GaussianMixture @Since("2.0.0") ( (i, (agg.means(i), agg.covs(i), agg.weights(i), ws)) } } else Iterator.empty - }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) => - // update the weights, means and covariances for i-th distributions - BLAS.axpy(1.0, mean2, mean1) - BLAS.axpy(1.0, cov2, cov1) - (mean1, cov1, w1 + w2, ws1 + ws2) - }.mapValues { case (mean, cov, w, ws) => + }.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) => // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws) @@ -560,12 +555,7 @@ class GaussianMixture @Since("2.0.0") ( agg.meanIter.zip(agg.covIter).zipWithIndex .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) } } else Iterator.empty - }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) => - // update the weights, means and covariances for i-th distributions - BLAS.axpy(1.0, mean2, mean1) - BLAS.axpy(1.0, cov2, cov1) - (mean1, cov1, w1 + w2, ws1 + ws2) - }.mapValues { case (mean, cov, w, ws) => + }.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) => // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws) @@ -624,8 +614,8 @@ class GaussianMixture @Since("2.0.0") ( val gaussians = Array.tabulate(numClusters) { i => val start = i * numSamples val end = start + numSamples - val sampleSlice = samples.view(start, end) - val weightSlice = sampleWeights.view(start, end) + val sampleSlice = samples.view.slice(start, end) + val weightSlice = sampleWeights.view.slice(start, end) val localWeightSum = weightSlice.sum weights(i) = localWeightSum / weightSum @@ -691,6 +681,16 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] { new DenseMatrix(n, n, symmetricValues) } + private def mergeWeightsMeans( + a: (DenseVector, DenseVector, Double, Double), + b: (DenseVector, DenseVector, Double, Double)): 
(DenseVector, DenseVector, Double, Double) = + { + // update the weights, means and covariances for i-th distributions + BLAS.axpy(1.0, b._1, a._1) + BLAS.axpy(1.0, b._2, a._2) + (a._1, a._2, a._3 + b._3, a._4 + b._4) + } + /** * Update the weight, mean and covariance of gaussian distribution. * diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 6d5c7c50dbac..999042215113 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -17,6 +17,7 @@ package org.apache.spark.ml.feature +import scala.reflect.ClassTag import scala.util.Random import org.apache.spark.ml.{Estimator, Model} @@ -324,7 +325,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * (2) Wang, Jingdong et al. "Hashing for similarity search: A survey." arXiv preprint * arXiv:1408.2927 (2014). */ -private[ml] abstract class LSH[T <: LSHModel[T]] +private[ml] abstract class LSH[T <: LSHModel[T] : ClassTag] extends Estimator[T] with LSHParams with DefaultParamsWritable { self: Estimator[T] => diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala index bd9be779fedb..72ab3dbc3101 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala @@ -201,7 +201,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] { } Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress)) } else Iterator.empty - }.reduceByKey { case (s1, s2) => s1.merge(s2) } + }.reduceByKey { (s1, s2) => s1.merge(s2) } } else { val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2) vectors.mapPartitionsWithIndex { case (pid, iter) => @@ -214,7 +214,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] { seqOp = (s, v) => s.insert(v), combOp = (s1, s2) => s1.compress.merge(s2.compress) ).map { case ((_, i), s) => (i, s) - }.reduceByKey { case (s1, s2) => s1.compress.merge(s2.compress) } + }.reduceByKey { (s1, s2) => s1.compress.merge(s2.compress) } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala index 627133968d14..98f38f23171e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.feature import scala.collection.mutable.ArrayBuilder +import scala.reflect.ClassTag import org.apache.spark.annotation.Since import org.apache.spark.ml._ @@ -155,7 +156,7 @@ private[feature] trait SelectorParams extends Params * By default, the selection method is `numTopFeatures`, with the default number of top features * set to 50. 
*/ -private[ml] abstract class Selector[T <: SelectorModel[T]] +private[ml] abstract class Selector[T <: SelectorModel[T] : ClassTag] extends Estimator[T] with SelectorParams with DefaultParamsWritable { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index bbfcbfbe038e..db2665fa2e4a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -291,7 +291,7 @@ class Word2VecModel private[ml] ( val outputSchema = transformSchema(dataset.schema, logging = true) val vectors = wordVectors.getVectors .mapValues(vv => Vectors.dense(vv.map(_.toDouble))) - .map(identity) // mapValues doesn't return a serializable map (SI-7005) + .map(identity).toMap // mapValues doesn't return a serializable map (SI-7005) val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors) val d = $(vectorSize) val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 53ca35ccd007..f12c1f995b7d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -937,7 +937,7 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any]) /** Put param pairs with a `java.util.List` of values for Python. */ private[ml] def put(paramPairs: JList[ParamPair[_]]): this.type = { - put(paramPairs.asScala: _*) + put(paramPairs.asScala.toSeq: _*) } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala index c28dac6850e8..80c0b93bae4a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.regression +import scala.reflect.ClassTag + import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} @@ -30,7 +32,7 @@ import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} abstract class Regressor[ FeaturesType, Learner <: Regressor[FeaturesType, Learner, M], - M <: RegressionModel[FeaturesType, M]] + M <: RegressionModel[FeaturesType, M] : ClassTag] extends Predictor[FeaturesType, Learner, M] with PredictorParams { // TODO: defaultEvaluator (follow-up PR) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 259ecb3a1762..68f6ed4281de 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1223,28 +1223,28 @@ private[python] class PythonMLLibAPI extends Serializable { * Python-friendly version of [[MLUtils.convertVectorColumnsToML()]]. 
*/ def convertVectorColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { - MLUtils.convertVectorColumnsToML(dataset, cols.asScala: _*) + MLUtils.convertVectorColumnsToML(dataset, cols.asScala.toSeq: _*) } /** * Python-friendly version of [[MLUtils.convertVectorColumnsFromML()]] */ def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { - MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*) + MLUtils.convertVectorColumnsFromML(dataset, cols.asScala.toSeq: _*) } /** * Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]]. */ def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { - MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*) + MLUtils.convertMatrixColumnsToML(dataset, cols.asScala.toSeq: _*) } /** * Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]] */ def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { - MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*) + MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala.toSeq: _*) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 7c12697be95c..99c6e8b3e079 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -225,7 +225,7 @@ class BisectingKMeans private ( divisibleIndices.contains(parentIndex(index)) } newClusters = summarize(d, newAssignments, dMeasure) - newClusterCenters = newClusters.mapValues(_.center).map(identity) + newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap } if (preIndices != null) { preIndices.unpersist() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index 4d98ba41bbb7..d5a788261454 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.clustering -import scala.collection.mutable.IndexedSeq - import breeze.linalg.{diag, DenseMatrix => BreezeMatrix, DenseVector => BDV, Vector => BV} import org.apache.spark.annotation.Since @@ -189,8 +187,8 @@ class GaussianMixture private ( case None => val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed) (Array.fill(k)(1.0 / k), Array.tabulate(k) { i => - val slice = samples.view(i * nSamples, (i + 1) * nSamples) - new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) + val slice = samples.view.slice(i * nSamples, (i + 1) * nSamples) + new MultivariateGaussian(vectorMean(slice.toSeq), initCovariance(slice.toSeq)) }) } @@ -259,7 +257,7 @@ class GaussianMixture private ( } /** Average of dense breeze vectors */ - private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = { + private def vectorMean(x: Seq[BV[Double]]): BDV[Double] = { val v = BDV.zeros[Double](x(0).length) x.foreach(xi => v += xi) v / x.length.toDouble @@ -269,7 +267,7 @@ class GaussianMixture private ( * Construct matrix where diagonal entries are element-wise * variance of input vectors (computes biased variance) */ - private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = { + private def initCovariance(x: 
Seq[BV[Double]]): BreezeMatrix[Double] = { val mu = vectorMean(x) val ss = BDV.zeros[Double](x(0).length) x.foreach { xi => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index ac2b576f4ac4..de3209c34bf0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -335,7 +335,7 @@ object PrefixSpan extends Logging { largePrefixes = newLargePrefixes } - var freqPatterns = sc.parallelize(localFreqPatterns, 1) + var freqPatterns = sc.parallelize(localFreqPatterns.toSeq, 1) val numSmallPrefixes = smallPrefixes.size logInfo(s"number of small prefixes for local processing: $numSmallPrefixes") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index 365b2a06110f..c669ced61d2f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -97,7 +97,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int } if (sizes(i) + tail.length >= offset + windowSize) { partitions += - new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail, offset) + new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq, offset) partitionIndex += 1 } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala index 6e2732f7ae7a..c3bda9978631 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala @@ -112,7 +112,7 @@ private[spark] class EntropyAggregator(numClasses: Int) * @param offset Start index of stats for this (node, feature, bin). */ def getCalculator(allStats: Array[Double], offset: Int): EntropyCalculator = { - new EntropyCalculator(allStats.view(offset, offset + statsSize - 1).toArray, + new EntropyCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray, allStats(offset + statsSize - 1).toLong) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala index 5983118c0575..70163b56408a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala @@ -107,7 +107,7 @@ private[spark] class GiniAggregator(numClasses: Int) * @param offset Start index of stats for this (node, feature, bin). */ def getCalculator(allStats: Array[Double], offset: Int): GiniCalculator = { - new GiniCalculator(allStats.view(offset, offset + statsSize - 1).toArray, + new GiniCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray, allStats(offset + statsSize - 1).toLong) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala index f5b2f8d514c7..7143fd07d733 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala @@ -95,7 +95,7 @@ private[spark] class VarianceAggregator() * @param offset Start index of stats for this (node, feature, bin). 
*/ def getCalculator(allStats: Array[Double], offset: Int): VarianceCalculator = { - new VarianceCalculator(allStats.view(offset, offset + statsSize - 1).toArray, + new VarianceCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray, allStats(offset + statsSize - 1).toLong) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 2c613348c2d9..01f508d7a6de 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -85,10 +85,10 @@ private[mllib] object NumericParser { while (parsing && tokenizer.hasMoreTokens()) { token = tokenizer.nextToken() if (token == "(") { - items.append(parseTuple(tokenizer)) + items ++= parseTuple(tokenizer) allowComma = true } else if (token == "[") { - items.append(parseArray(tokenizer)) + items ++= parseArray(tokenizer) allowComma = true } else if (token == ",") { if (allowComma) { @@ -102,14 +102,14 @@ private[mllib] object NumericParser { // ignore whitespaces between delim chars, e.g. ", [" } else { // expecting a number - items.append(parseDouble(token)) + items += parseDouble(token) allowComma = true } } if (parsing) { throw new SparkException(s"A tuple must end with ')'.") } - items + items.toSeq } private def parseDouble(s: String): Double = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index debd0dd65d0c..04b20d1e58dd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -219,7 +219,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest { model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0) - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(1.0, 1.0), 2.0), (Vectors.dense(10.0, 10.0), 2.0), (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 2.0), (Vectors.dense(-1.0, 1.0), 2.0), (Vectors.dense(-100.0, 90.0), 2.0)))) @@ -286,7 +286,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest { model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0) - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(1.0, 1.0), 1.0), (Vectors.dense(10.0, 10.0), 2.0), (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 3.0), (Vectors.dense(-1.0, 1.0), 3.0), (Vectors.dense(-100.0, 90.0), 4.0)))) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 584594436267..61f4359d99ea 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -255,7 +255,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes } test("compare with weightCol and without weightCol") { - val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( Vectors.dense(1.0, 1.0), Vectors.dense(10.0, 10.0), Vectors.dense(10.0, 10.0), Vectors.dense(1.0, 0.5), @@ -285,7 +285,7 @@ 
class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0) - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(1.0, 1.0), 1.0), (Vectors.dense(10.0, 10.0), 2.0), (Vectors.dense(1.0, 0.5), 1.0), @@ -322,7 +322,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes test("Two centers with weightCol") { // use the same weight for all samples. - val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(0.0, 0.0), 2.0), (Vectors.dense(0.0, 0.1), 2.0), (Vectors.dense(0.1, 0.0), 2.0), @@ -366,7 +366,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes assert(model1.clusterCenters(1) === model1_center2) // use different weight - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(0.0, 0.0), 1.0), (Vectors.dense(0.0, 0.1), 2.0), (Vectors.dense(0.1, 0.0), 3.0), @@ -412,7 +412,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes test("Four centers with weightCol") { // no weight - val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( Vectors.dense(0.1, 0.1), Vectors.dense(5.0, 0.2), Vectors.dense(10.0, 0.0), @@ -444,7 +444,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0) // use same weight, should have the same result as no weight - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (Vectors.dense(0.1, 0.1), 2.0), (Vectors.dense(5.0, 0.2), 2.0), (Vectors.dense(10.0, 0.0), 2.0), diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index d4c620adc2e3..06f2cb2b9788 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -189,7 +189,7 @@ class ClusteringEvaluatorSuite } test("single-element clusters with weight") { - val singleItemClusters = spark.createDataFrame(spark.sparkContext.parallelize(Array( + val singleItemClusters = spark.createDataFrame(spark.sparkContext.parallelize(Seq( (0.0, Vectors.dense(5.1, 3.5, 1.4, 0.2), 6.0), (1.0, Vectors.dense(7.0, 3.2, 4.7, 1.4), 0.25), (2.0, Vectors.dense(6.3, 3.3, 6.0, 2.5), 9.99)))).toDF("label", "features", "weight") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala index d97df0050d74..1c602cd7d9a4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala @@ -29,14 +29,14 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ - @transient var data: Array[Vector] = _ - @transient var l1Normalized: Array[Vector] = _ - @transient var l2Normalized: Array[Vector] = _ + @transient var data: Seq[Vector] = _ + @transient var l1Normalized: 
Seq[Vector] = _ + @transient var l2Normalized: Seq[Vector] = _ override def beforeAll(): Unit = { super.beforeAll() - data = Array( + data = Seq( Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), Vectors.dense(0.0, 0.0, 0.0), Vectors.dense(0.6, -1.1, -3.0), @@ -44,7 +44,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest { Vectors.sparse(3, Seq((0, 5.7), (1, 0.72), (2, 2.7))), Vectors.sparse(3, Seq()) ) - l1Normalized = Array( + l1Normalized = Seq( Vectors.sparse(3, Seq((0, -0.465116279), (1, 0.53488372))), Vectors.dense(0.0, 0.0, 0.0), Vectors.dense(0.12765957, -0.23404255, -0.63829787), @@ -52,7 +52,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest { Vectors.dense(0.625, 0.07894737, 0.29605263), Vectors.sparse(3, Seq()) ) - l2Normalized = Array( + l2Normalized = Seq( Vectors.sparse(3, Seq((0, -0.65617871), (1, 0.75460552))), Vectors.dense(0.0, 0.0, 0.0), Vectors.dense(0.184549876, -0.3383414, -0.922749378), diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9029fc96b36a..28275eb06cf0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -307,7 +307,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { } logInfo(s"Generated an explicit feedback dataset with ${training.size} ratings for training " + s"and ${test.size} for test.") - (sc.parallelize(training, 2), sc.parallelize(test, 2)) + (sc.parallelize(training.toSeq, 2), sc.parallelize(test.toSeq, 2)) } /** @@ -810,7 +810,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { val topItems = model.recommendForAllUsers(k) assert(topItems.count() == numUsers) assert(topItems.columns.contains("user")) - checkRecommendations(topItems, expectedUpToN, "item") + checkRecommendations(topItems, expectedUpToN.toMap, "item") } } @@ -831,7 +831,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { val topUsers = getALSModel.recommendForAllItems(k) assert(topUsers.count() == numItems) assert(topUsers.columns.contains("item")) - checkRecommendations(topUsers, expectedUpToN, "user") + checkRecommendations(topUsers, expectedUpToN.toMap, "user") } } @@ -853,7 +853,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { val topItems = model.recommendForUserSubset(userSubset, k) assert(topItems.count() == numUsersSubset) assert(topItems.columns.contains("user")) - checkRecommendations(topItems, expectedUpToN, "item") + checkRecommendations(topItems, expectedUpToN.toMap, "item") } } @@ -875,7 +875,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { val topUsers = model.recommendForItemSubset(itemSubset, k) assert(topUsers.count() == numItemsSubset) assert(topUsers.columns.contains("item")) - checkRecommendations(topUsers, expectedUpToN, "user") + checkRecommendations(topUsers, expectedUpToN.toMap, "user") } } @@ -1211,6 +1211,6 @@ object ALSSuite extends Logging { } logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " + s"and ${test.size} for test.") - (sc.parallelize(training, 2), sc.parallelize(test, 2)) + (sc.parallelize(training.toSeq, 2), sc.parallelize(test.toSeq, 2)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 
571c25e356c0..f01a03996821 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -833,8 +833,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat updateLocationInStorageProps(table, newPath = None).copy( locationUri = tableLocation.map(CatalogUtils.stringToURI(_))) } - val storageWithoutHiveGeneratedProperties = storageWithLocation.copy( - properties = storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_))) + val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties = + storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap) val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) val schemaFromTableProps = getSchemaFromTableProperties(table) @@ -848,7 +848,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat partitionColumnNames = partColumnNames, bucketSpec = getBucketSpecFromTableProperties(table), tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG), - properties = table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_))) + properties = table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap) } override def tableExists(db: String, table: String): Boolean = withClient { @@ -1125,7 +1125,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val colStats = new mutable.HashMap[String, CatalogColumnStat] val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map { case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v - } + }.toMap // Find all the column names by matching the KEY_VERSION properties for them. colStatsProps.keys.filter { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 16e901434024..19aa5935a09d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -694,7 +694,7 @@ private[hive] trait HiveInspectors { } data: Any => { if (data != null) { - InternalRow.fromSeq(unwrappers.map(_(data))) + InternalRow.fromSeq(unwrappers.map(_(data)).toSeq) } else { null } @@ -872,7 +872,7 @@ private[hive] trait HiveInspectors { StructType(s.getAllStructFieldRefs.asScala.map(f => types.StructField( f.getFieldName, inspectorToDataType(f.getFieldObjectInspector), nullable = true) - )) + ).toSeq) case l: ListObjectInspector => ArrayType(inspectorToDataType(l.getListElementObjectInspector)) case m: MapObjectInspector => MapType( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2981e391c043..a89243c331c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -131,12 +131,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Consider table and storage properties. For properties existing in both sides, storage // properties will supersede table properties. 
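Illustrative sketch, separate from the diff: several of the surrounding Hive catalog hunks add .toMap because, under Scala 2.13, Map#filterKeys and Map#mapValues return a lazy MapView rather than a strict Map, and the call sites here assign the result to fields declared as Map[String, String], which a view no longer satisfies. A minimal standalone example of the pattern:

    object MapViewSketch {
      def main(args: Array[String]): Unit = {
        val props = Map("k1" -> "v1", "transient_lastDdlTime" -> "123")
        // In 2.13 this is a lazy MapView (and filterKeys is deprecated);
        // .toMap materializes a strict immutable Map again.
        val strict: Map[String, String] =
          props.filterKeys(!_.startsWith("transient_")).toMap
        println(strict)   // Map(k1 -> v1)
      }
    }
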
if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ + val options = relation.tableMeta.properties.filterKeys(isParquetProperty).toMap ++ relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") } else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ + val options = relation.tableMeta.properties.filterKeys(isOrcProperty).toMap ++ relation.tableMeta.storage.properties if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { convertToLogicalRelation( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 04caf57efdc7..62ff2db2ecb3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -408,7 +408,7 @@ private[spark] object HiveUtils extends Logging { logWarning(s"Hive jar path '$path' does not exist.") Nil } else { - files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")) + files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).toSeq } case path => new File(path) :: Nil @@ -505,7 +505,7 @@ private[spark] object HiveUtils extends Logging { // partition columns are part of the schema val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn) - table.copy(schema = StructType(dataCols ++ partCols)) + table.copy(schema = StructType((dataCols ++ partCols).toSeq)) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6ad5e9d3c908..3f70387a3b05 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -390,7 +390,7 @@ private[hive] class HiveClientImpl( } override def listDatabases(pattern: String): Seq[String] = withHiveState { - client.getDatabasesByPattern(pattern).asScala + client.getDatabasesByPattern(pattern).asScala.toSeq } private def getRawTableOption(dbName: String, tableName: String): Option[HiveTable] = { @@ -400,7 +400,7 @@ private[hive] class HiveClientImpl( private def getRawTablesByName(dbName: String, tableNames: Seq[String]): Seq[HiveTable] = { try { msClient.getTableObjectsByName(dbName, tableNames.asJava).asScala - .map(extraFixesForNonView).map(new HiveTable(_)) + .map(extraFixesForNonView).map(new HiveTable(_)).toSeq } catch { case ex: Exception => throw new HiveException(s"Unable to fetch tables of db $dbName", ex); @@ -434,7 +434,7 @@ private[hive] class HiveClientImpl( throw new SparkException( s"${ex.getMessage}, db: ${h.getDbName}, table: ${h.getTableName}", ex) } - val schema = StructType(cols ++ partCols) + val schema = StructType((cols ++ partCols).toSeq) val bucketSpec = if (h.getNumBuckets > 0) { val sortColumnOrders = h.getSortCols.asScala @@ -450,7 +450,7 @@ private[hive] class HiveClientImpl( } else { Seq.empty } - Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames)) + Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala.toSeq, sortColumnNames.toSeq)) } else { None } @@ -502,7 +502,7 @@ private[hive] 
class HiveClientImpl( throw new AnalysisException(s"Hive $tableTypeStr is not supported.") }, schema = schema, - partitionColumnNames = partCols.map(_.name), + partitionColumnNames = partCols.map(_.name).toSeq, // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information // in table properties. This means, if we have bucket spec in both hive metastore and @@ -539,7 +539,7 @@ private[hive] class HiveClientImpl( // that created by older versions of Spark. viewOriginalText = Option(h.getViewOriginalText), viewText = Option(h.getViewExpandedText), - unsupportedFeatures = unsupportedFeatures, + unsupportedFeatures = unsupportedFeatures.toSeq, ignoredProperties = ignoredProperties.toMap) } @@ -638,7 +638,7 @@ private[hive] class HiveClientImpl( shim.dropPartition(client, db, table, partition, !retainData, purge) } catch { case e: Exception => - val remainingParts = matchingParts.toBuffer -- droppedParts + val remainingParts = matchingParts.toBuffer --= droppedParts logError( s""" |====================== @@ -708,7 +708,7 @@ private[hive] class HiveClientImpl( assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid") client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1) } - hivePartitionNames.asScala.sorted + hivePartitionNames.asScala.sorted.toSeq } override def getPartitionOption( @@ -735,7 +735,7 @@ private[hive] class HiveClientImpl( } val parts = client.getPartitions(hiveTable, partSpec.asJava).asScala.map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) - parts + parts.toSeq } override def getPartitionsByFilter( @@ -748,11 +748,11 @@ private[hive] class HiveClientImpl( } override def listTables(dbName: String): Seq[String] = withHiveState { - client.getAllTables(dbName).asScala + client.getAllTables(dbName).asScala.toSeq } override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState { - client.getTablesByPattern(dbName, pattern).asScala + client.getTablesByPattern(dbName, pattern).asScala.toSeq } override def listTablesByType( @@ -766,7 +766,7 @@ private[hive] class HiveClientImpl( case _: UnsupportedOperationException => // Fallback to filter logic if getTablesByType not supported. 
val tableNames = client.getTablesByPattern(dbName, pattern).asScala - val tables = getTablesByName(dbName, tableNames).filter(_.tableType == tableType) + val tables = getTablesByName(dbName, tableNames.toSeq).filter(_.tableType == tableType) tables.map(_.identifier.table) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 8df43b785759..8ff7a1abd2d6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -363,7 +363,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[String]() getDriverResultsMethod.invoke(driver, res) - res.asScala + res.asScala.toSeq } override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { @@ -600,7 +600,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri()) } - CatalogFunction(name, hf.getClassName, resources) + CatalogFunction(name, hf.getClassName, resources.toSeq) } override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = { @@ -623,7 +623,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = { - hive.getFunctions(db, pattern).asScala + hive.getFunctions(db, pattern).asScala.toSeq } /** @@ -843,7 +843,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { case s: String => s case a: Array[Object] => a(0).asInstanceOf[String] } - } + }.toSeq } override def getDatabaseOwnerName(db: Database): String = { @@ -1252,7 +1252,7 @@ private[client] class Shim_v2_3 extends Shim_v2_1 { pattern: String, tableType: TableType): Seq[String] = { getTablesByTypeMethod.invoke(hive, dbName, pattern, tableType) - .asInstanceOf[JList[String]].asScala + .asInstanceOf[JList[String]].asScala.toSeq } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala index 802ddafdbee4..7b51618772ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala @@ -87,7 +87,7 @@ class HiveOptions(@transient private val parameters: CaseInsensitiveMap[String]) def serdeProperties: Map[String, String] = parameters.filterKeys { k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT)) - }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v } + }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap } object HiveOptions { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 4dccacef337e..41820b0135f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -156,7 +156,7 @@ case class HiveTableScanExec( // Only partitioned values are needed here, since the predicate has already been bound to // partition key attribute references. 
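A quick sketch of the recurring .toSeq additions in these Hive client hunks (illustration only, hypothetical method name): in Scala 2.13, scala.Seq is an alias for scala.collection.immutable.Seq, while .asScala on a Java list yields a mutable Buffer, so methods declared to return Seq[String] need an explicit .toSeq:

    import java.util.{Arrays => JArrays, List => JList}
    import scala.jdk.CollectionConverters._   // 2.13-only import, used here for a self-contained example

    object AsScalaToSeqSketch {
      // Without .toSeq this no longer compiles on 2.13: Buffer is not an immutable.Seq.
      def listNames(raw: JList[String]): Seq[String] = raw.asScala.toSeq

      def main(args: Array[String]): Unit =
        println(listNames(JArrays.asList("t1", "t2")))   // List(t1, t2)
    }
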
- val row = InternalRow.fromSeq(castedValues) + val row = InternalRow.fromSeq(castedValues.toSeq) shouldKeep.eval(row).asInstanceOf[Boolean] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 05d608a2016a..8ad5cb70d248 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -224,7 +224,7 @@ private[hive] case class HiveGenericUDTF( override lazy val elementSchema = StructType(outputInspector.getAllStructFieldRefs.asScala.map { field => StructField(field.getFieldName, inspectorToDataType(field.getFieldObjectInspector), nullable = true) - }) + }.toSeq) @transient private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray @@ -257,7 +257,7 @@ private[hive] case class HiveGenericUDTF( def collectRows(): Seq[InternalRow] = { val toCollect = collected collected = new ArrayBuffer[InternalRow] - toCollect + toCollect.toSeq } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala index cfcf70c0e79f..446923ad2320 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShowCreateTableSuite.scala @@ -279,7 +279,7 @@ class HiveShowCreateTableSuite extends ShowCreateTableSuite with TestHiveSinglet table.copy( createTime = 0L, lastAccessTime = 0L, - properties = table.properties.filterKeys(!nondeterministicProps.contains(_)), + properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap, stats = None, ignoredProperties = Map.empty, storage = table.storage.copy(properties = Map.empty), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index c1eab63ec073..be6d02330229 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -911,7 +911,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto */ private def getStatsProperties(tableName: String): Map[String, String] = { val hTable = hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName) - hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) + hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap } test("change stats after insert command for hive table") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index f95251a66971..fbd1fc1ea98d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -128,7 +128,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA createTime = 0L, lastAccessTime = 0L, owner = "", - properties = table.properties.filterKeys(!nondeterministicProps.contains(_)), + properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap, // View texts are checked separately viewText = None ) From bc74297f72cf51c773b6abfe6dcd19c691f3dfac Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 14 Jul 2020 
20:10:16 -0500 Subject: [PATCH 2/3] Try to fix ML ClassTag issue --- mllib/src/main/scala/org/apache/spark/ml/Estimator.scala | 5 ++--- mllib/src/main/scala/org/apache/spark/ml/Predictor.scala | 4 +--- .../org/apache/spark/ml/classification/Classifier.scala | 4 +--- .../spark/ml/classification/ProbabilisticClassifier.scala | 4 +--- mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala | 3 +-- .../main/scala/org/apache/spark/ml/feature/Selector.scala | 3 +-- .../scala/org/apache/spark/ml/regression/Regressor.scala | 4 +--- 7 files changed, 8 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala index da3ec435ae3d..3a02e2be6fe0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Estimator.scala @@ -18,7 +18,6 @@ package org.apache.spark.ml import scala.annotation.varargs -import scala.reflect.ClassTag import org.apache.spark.annotation.Since import org.apache.spark.ml.param.{ParamMap, ParamPair} @@ -27,7 +26,7 @@ import org.apache.spark.sql.Dataset /** * Abstract class for estimators that fit models to data. */ -abstract class Estimator[M <: Model[M] : ClassTag] extends PipelineStage { +abstract class Estimator[M <: Model[M]] extends PipelineStage { /** * Fits a single model to the input data with optional parameters. @@ -77,7 +76,7 @@ abstract class Estimator[M <: Model[M] : ClassTag] extends PipelineStage { * @return fitted models, matching the input parameter maps */ @Since("2.0.0") - def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = { + def fit(dataset: Dataset[_], paramMaps: Seq[ParamMap]): Seq[M] = { paramMaps.map(fit(dataset, _)) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index 7f981e5d554b..e0b128e36981 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -17,8 +17,6 @@ package org.apache.spark.ml -import scala.reflect.ClassTag - import org.apache.spark.annotation.Since import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.functions.checkNonNegativeWeight @@ -117,7 +115,7 @@ private[ml] trait PredictorParams extends Params abstract class Predictor[ FeaturesType, Learner <: Predictor[FeaturesType, Learner, M], - M <: PredictionModel[FeaturesType, M] : ClassTag] + M <: PredictionModel[FeaturesType, M]] extends Estimator[M] with PredictorParams { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 687c5c11e7c9..233e8e5bcdc8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -17,8 +17,6 @@ package org.apache.spark.ml.classification -import scala.reflect.ClassTag - import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} @@ -75,7 +73,7 @@ private[spark] trait ClassifierParams abstract class Classifier[ FeaturesType, E <: Classifier[FeaturesType, E, M], - M <: ClassificationModel[FeaturesType, M] : ClassTag] + M <: ClassificationModel[FeaturesType, M]] extends Predictor[FeaturesType, E, M] with ClassifierParams { /** @group setParam */ diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala index 1c35ab84af00..1caaeccd7b0d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala @@ -17,8 +17,6 @@ package org.apache.spark.ml.classification -import scala.reflect.ClassTag - import org.apache.spark.annotation.Since import org.apache.spark.ml.linalg.{DenseVector, Vector, VectorUDT} import org.apache.spark.ml.param.ParamMap @@ -53,7 +51,7 @@ private[ml] trait ProbabilisticClassifierParams abstract class ProbabilisticClassifier[ FeaturesType, E <: ProbabilisticClassifier[FeaturesType, E, M], - M <: ProbabilisticClassificationModel[FeaturesType, M] : ClassTag] + M <: ProbabilisticClassificationModel[FeaturesType, M]] extends Classifier[FeaturesType, E, M] with ProbabilisticClassifierParams { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 999042215113..6d5c7c50dbac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -17,7 +17,6 @@ package org.apache.spark.ml.feature -import scala.reflect.ClassTag import scala.util.Random import org.apache.spark.ml.{Estimator, Model} @@ -325,7 +324,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * (2) Wang, Jingdong et al. "Hashing for similarity search: A survey." arXiv preprint * arXiv:1408.2927 (2014). */ -private[ml] abstract class LSH[T <: LSHModel[T] : ClassTag] +private[ml] abstract class LSH[T <: LSHModel[T]] extends Estimator[T] with LSHParams with DefaultParamsWritable { self: Estimator[T] => diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala index 98f38f23171e..627133968d14 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Selector.scala @@ -18,7 +18,6 @@ package org.apache.spark.ml.feature import scala.collection.mutable.ArrayBuilder -import scala.reflect.ClassTag import org.apache.spark.annotation.Since import org.apache.spark.ml._ @@ -156,7 +155,7 @@ private[feature] trait SelectorParams extends Params * By default, the selection method is `numTopFeatures`, with the default number of top features * set to 50. 
*/ -private[ml] abstract class Selector[T <: SelectorModel[T] : ClassTag] +private[ml] abstract class Selector[T <: SelectorModel[T]] extends Estimator[T] with SelectorParams with DefaultParamsWritable { /** @group setParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala index 80c0b93bae4a..c28dac6850e8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala @@ -17,8 +17,6 @@ package org.apache.spark.ml.regression -import scala.reflect.ClassTag - import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} @@ -32,7 +30,7 @@ import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} abstract class Regressor[ FeaturesType, Learner <: Regressor[FeaturesType, Learner, M], - M <: RegressionModel[FeaturesType, M] : ClassTag] + M <: RegressionModel[FeaturesType, M]] extends Predictor[FeaturesType, Learner, M] with PredictorParams { // TODO: defaultEvaluator (follow-up PR) From 6390b6c46f5bf35e0c92b140bfbe12f98c35cd8f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 15 Jul 2020 11:52:25 -0500 Subject: [PATCH 3/3] Fix NumericParser --- .../main/scala/org/apache/spark/examples/SparkKMeans.scala | 5 +++-- .../scala/org/apache/spark/mllib/util/NumericParser.scala | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 5982fb67ee19..cf03e0203f77 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -103,8 +103,9 @@ object SparkKMeans { spark.stop() } - private def mergeResults(a: (Vector[Double], Int), - b: (Vector[Double], Int)): (Vector[Double], Int) = { + private def mergeResults( + a: (Vector[Double], Int), + b: (Vector[Double], Int)): (Vector[Double], Int) = { (a._1 + b._1, a._2 + b._2) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 01f508d7a6de..959e54e4c716 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -85,10 +85,10 @@ private[mllib] object NumericParser { while (parsing && tokenizer.hasMoreTokens()) { token = tokenizer.nextToken() if (token == "(") { - items ++= parseTuple(tokenizer) + items += parseTuple(tokenizer) allowComma = true } else if (token == "[") { - items ++= parseArray(tokenizer) + items += parseArray(tokenizer) allowComma = true } else if (token == ",") { if (allowComma) {
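Patch 2 backs out the ClassTag context bounds added in patch 1 and instead changes the paramMaps overload of Estimator.fit to take Seq[ParamMap]. A minimal sketch (hypothetical names, not Spark API) of the 2.13 issue this sidesteps: Array.map must allocate a new array, so it needs a ClassTag for the result element type, whereas Seq.map builds a Seq and needs none:

    object ClassTagSketch {
      // Under 2.13 this does not compile without a ClassTag[M]:
      //   def fitAll[M](maps: Array[Int], build: Int => M): Array[M] = maps.map(build)
      // Taking and returning Seq avoids the bound entirely:
      def fitAll[M](maps: Seq[Int], build: Int => M): Seq[M] = maps.map(build)

      def main(args: Array[String]): Unit =
        println(fitAll(Seq(1, 2, 3), i => s"model-$i"))   // List(model-1, model-2, model-3)
    }
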
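Patch 3 also switches the ++= calls introduced in patch 1 back to single-element += appends in NumericParser. The buffer there collects heterogeneous parse results (doubles, arrays, nested tuples), so each parsed sub-structure must go in as one element; ++= would splice its contents instead. A small self-contained illustration of the difference:

    import scala.collection.mutable.ArrayBuffer

    object AppendSketch {
      def main(args: Array[String]): Unit = {
        val items = ArrayBuffer.empty[Any]
        items += Seq(1.0, 2.0)    // appended as one element
        items ++= Seq(3.0, 4.0)   // elements spliced in individually
        println(items)            // ArrayBuffer(List(1.0, 2.0), 3.0, 4.0)
      }
    }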