From 08493d40373b4d198611c1ddde09fa5404e08f17 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 1 Jul 2020 12:33:04 -0500
Subject: [PATCH 1/7] First pass at getting Core compiling for 2.13.

---
 .../apache/spark/ExecutorAllocationManager.scala | 6 +++---
 .../scala/org/apache/spark/MapOutputTracker.scala | 2 +-
 .../main/scala/org/apache/spark/SparkConf.scala | 9 ---------
 .../scala/org/apache/spark/TaskContextImpl.scala | 6 +++---
 .../scala/org/apache/spark/api/java/JavaRDD.scala | 2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala | 9 +++++----
 .../apache/spark/api/java/JavaSparkContext.scala | 6 +++---
 .../org/apache/spark/api/python/PythonRDD.scala | 2 +-
 .../org/apache/spark/api/python/PythonUtils.scala | 2 +-
 .../apache/spark/deploy/FaultToleranceTest.scala | 6 +++---
 .../org/apache/spark/deploy/PythonRunner.scala | 2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala | 2 +-
 .../deploy/master/ZooKeeperPersistenceEngine.scala | 2 +-
 .../spark/deploy/rest/RestSubmissionClient.scala | 2 +-
 .../apache/spark/deploy/worker/CommandUtils.scala | 2 +-
 .../apache/spark/deploy/worker/DriverRunner.scala | 4 ++--
 .../spark/deploy/worker/ExecutorRunner.scala | 2 +-
 .../executor/CoarseGrainedExecutorBackend.scala | 8 ++++----
 .../scala/org/apache/spark/executor/Executor.scala | 10 ++++++----
 .../org/apache/spark/executor/TaskMetrics.scala | 4 ++--
 .../org/apache/spark/metrics/MetricsSystem.scala | 2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 2 +-
 .../apache/spark/rdd/ParallelCollectionRDD.scala | 6 +++---
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
 .../main/scala/org/apache/spark/rdd/UnionRDD.scala | 2 +-
 .../apache/spark/resource/ResourceProfile.scala | 5 +++--
 .../apache/spark/scheduler/LiveListenerBus.scala | 2 +-
 .../org/apache/spark/scheduler/SplitInfo.scala | 4 ++--
 .../spark/scheduler/StatsReportListener.scala | 10 +++++-----
 .../org/apache/spark/scheduler/TaskResult.scala | 2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++--
 .../apache/spark/storage/BlockInfoManager.scala | 2 +-
 .../apache/spark/storage/DiskBlockManager.scala | 2 +-
 .../storage/ShuffleBlockFetcherIterator.scala | 10 +++++-----
 .../main/scala/org/apache/spark/ui/UIUtils.scala | 2 +-
 .../src/main/scala/org/apache/spark/ui/WebUI.scala | 6 +++---
 .../org/apache/spark/ui/jobs/AllJobsPage.scala | 8 ++++----
 .../scala/org/apache/spark/ui/jobs/JobPage.scala | 14 +++++++-------
 .../apache/spark/ui/scope/RDDOperationGraph.scala | 6 +++---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 3 ++-
 .../main/scala/org/apache/spark/util/Utils.scala | 2 +-
 .../spark/util/collection/ExternalSorter.scala | 2 +-
 .../scala/org/apache/spark/AccumulatorSuite.scala | 2 +-
 .../org/apache/spark/ContextCleanerSuite.scala | 8 ++++----
 .../org/apache/spark/HeartbeatReceiverSuite.scala | 4 +++-
 .../apache/spark/InternalAccumulatorSuite.scala | 2 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 2 +-
 .../org/apache/spark/deploy/IvyTestUtils.scala | 2 +-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 ++--
 .../scheduler/EventLoggingListenerSuite.scala | 2 +-
 .../scheduler/ExecutorResourceInfoSuite.scala | 12 ++++++------
 .../spark/scheduler/SparkListenerSuite.scala | 2 +-
 .../spark/scheduler/TaskResultGetterSuite.scala | 2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala | 2 +-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 2 +-
 55 files changed, 115 insertions(+), 117 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index e0ac2b3e0f4b..620a6fe2f9d7 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -550,7 +550,7 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false, + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, countFailures = false, force = false) } @@ -563,9 +563,9 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved) + executorMonitor.executorsKilled(executorsRemoved.toSeq) logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") - executorsRemoved + executorsRemoved.toSeq } else { logWarning(s"Unable to reach the cluster manager to kill executor/s " + s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!") diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 18cd5de4cfad..f981bcbea020 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging { } } - splitsByAddress.iterator + splitsByAddress.view.mapValues(_.toSeq).iterator } } diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 40915e3904f7..dbd89d646ae5 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -173,15 +173,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria this } - /** - * Set multiple parameters together - */ - @deprecated("Use setAll(Iterable) instead", "3.0.0") - def setAll(settings: Traversable[(String, String)]): SparkConf = { - settings.foreach { case (k, v) => set(k, v) } - this - } - /** Set a parameter if it isn't already configured */ def setIfMissing(key: String, value: String): SparkConf = { if (settings.putIfAbsent(key, value) == null) { diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 08a58a029528..db4b74bb89f0 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -111,7 +111,7 @@ private[spark] class TaskContextImpl( if (failed) return failed = true failure = error - invokeListeners(onFailureCallbacks, "TaskFailureListener", Option(error)) { + invokeListeners(onFailureCallbacks.toSeq, "TaskFailureListener", Option(error)) { _.onTaskFailure(this, error) } } @@ -120,7 +120,7 @@ private[spark] class TaskContextImpl( private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized { if (completed) return completed = true - invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) { + invokeListeners(onCompleteCallbacks.toSeq, "TaskCompletionListener", error) { _.onTaskCompletion(this) } } @@ -142,7 +142,7 @@ private[spark] class TaskContextImpl( } } if 
(errorMsgs.nonEmpty) { - throw new TaskCompletionListenerException(errorMsgs, error) + throw new TaskCompletionListenerException(errorMsgs.toSeq, error) } } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index e4140f659d97..15cb01a17328 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -256,7 +256,7 @@ object JavaRDD { } catch { case eof: EOFException => // No-op } - JavaRDD.fromRDD(sc.parallelize(objs, parallelism)) + JavaRDD.fromRDD(sc.parallelize(objs.toSeq, parallelism)) } finally { din.close() } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 1ca526274266..89b33945dfb0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -265,14 +265,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD created by piping elements to a forked external process. */ def pipe(command: JList[String]): JavaRDD[String] = { - rdd.pipe(command.asScala) + rdd.pipe(command.asScala.toSeq) } /** * Return an RDD created by piping elements to a forked external process. */ def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = { - rdd.pipe(command.asScala, env.asScala) + rdd.pipe(command.asScala.toSeq, env.asScala) } /** @@ -282,7 +282,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { env: JMap[String, String], separateWorkingDir: Boolean, bufferSize: Int): JavaRDD[String] = { - rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize) + rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize) } /** @@ -293,7 +293,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { separateWorkingDir: Boolean, bufferSize: Int, encoding: String): JavaRDD[String] = { - rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding) + rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize, + encoding) } /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 149def29b8fb..347f59fe8f77 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -133,7 +133,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag - sc.parallelize(list.asScala, numSlices) + sc.parallelize(list.asScala.toSeq, numSlices) } /** Get an RDD that has no partitions or elements. */ @@ -152,7 +152,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { : JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[V] = fakeClassTag - JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices)) + JavaPairRDD.fromRDD(sc.parallelize(list.asScala.toSeq, numSlices)) } /** Distribute a local Scala collection to form an RDD. 
*/ @@ -161,7 +161,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD = - JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices)) + JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()).toSeq, numSlices)) /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD = diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 726cff6703dc..86a1ac31c084 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -163,7 +163,7 @@ private[spark] object PythonRDD extends Logging { type ByteArray = Array[Byte] type UnrolledPartition = Array[ByteArray] val allPartitions: Array[UnrolledPartition] = - sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala) + sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala.toSeq) val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*) serveIterator(flattenedPartition.iterator, s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}") diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 490b48719b6b..527d0d6d3a48 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -54,7 +54,7 @@ private[spark] object PythonUtils { * Convert list of T into seq of T (for calling API with varargs) */ def toSeq[T](vs: JList[T]): Seq[T] = { - vs.asScala + vs.asScala.toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 6ff68b694f8f..ab389f99b11a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -205,7 +205,7 @@ private object FaultToleranceTest extends App with Logging { private def addWorkers(num: Int): Unit = { logInfo(s">>>>> ADD WORKERS $num <<<<<") - val masterUrls = getMasterUrls(masters) + val masterUrls = getMasterUrls(masters.toSeq) (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) } } @@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging { // Counter-hack: Because of a hack in SparkEnv#create() that changes this // property, we need to reset it. 
System.setProperty(config.DRIVER_PORT.key, "0") - sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome) + sc = new SparkContext(getMasterUrls(masters.toSeq), "fault-tolerance", containerSparkHome) } private def getMasterUrls(masters: Seq[TestMasterInfo]): String = { @@ -279,7 +279,7 @@ private object FaultToleranceTest extends App with Logging { var liveWorkerIPs: Seq[String] = List() def stateValid(): Boolean = { - (workers.map(_.ip) -- liveWorkerIPs).isEmpty && + workers.map(_.ip).forall(liveWorkerIPs.contains) && numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1 } diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 574ce60b19b4..7ad92da4e055 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -69,7 +69,7 @@ object PythonRunner { pathElements ++= formattedPyFiles pathElements += PythonUtils.sparkPythonPath pathElements += sys.env.getOrElse("PYTHONPATH", "") - val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) + val pythonPath = PythonUtils.mergePythonPaths(pathElements.toSeq: _*) // Launch Python process val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1271a3dbfc3f..6d38a1d28146 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -820,7 +820,7 @@ private[spark] class SparkSubmit extends Logging { } sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq) - (childArgs, childClasspath, sparkConf, childMainClass) + (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass) } private def renameResourcesToLocalFS(resources: String, localResources: String): String = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 8eae445b439d..ded816b992db 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -52,7 +52,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer override def read[T: ClassTag](prefix: String): Seq[T] = { zk.getChildren.forPath(workingDir).asScala - .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]) + .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]).toSeq } override def close(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 1648ba516d9b..89ef051e5854 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient { private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { env.filterKeys { k => (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_") - } + }.toMap } private[spark] def supportsRestClient(master: String): Boolean = { diff 
--git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index f7423f1fc3f1..8240bd6d2f43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -61,7 +61,7 @@ object CommandUtils extends Logging { // SPARK-698: do not call the run.cmd script, as process.destroy() // fails to kill a process tree on Windows val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand() - cmd.asScala ++ Seq(command.mainClass) ++ command.arguments + (cmd.asScala ++ Seq(command.mainClass) ++ command.arguments).toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 53ec7b3a88f3..4f9c497fc3d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -201,7 +201,7 @@ private[deploy] class DriverRunner( CommandUtils.redirectStream(process.getInputStream, stdout) val stderr = new File(baseDir, "stderr") - val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala) + val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq) .mkString("\"", "\" \"", "\"") val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40) Files.append(header, stderr, StandardCharsets.UTF_8) @@ -262,6 +262,6 @@ private[deploy] trait ProcessBuilderLike { private[deploy] object ProcessBuilderLike { def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike { override def start(): Process = processBuilder.start() - override def command: Seq[String] = processBuilder.command().asScala + override def command: Seq[String] = processBuilder.command().asScala.toSeq } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2a5528bbe89c..e4fcae13a2f8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -158,7 +158,7 @@ private[deploy] class ExecutorRunner( val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf), memory, sparkHome.getAbsolutePath, substituteVariables) val command = builder.command() - val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala) + val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala.toSeq) .mkString("\"", "\" \"", "\"") logInfo(s"Launch command: $redactedCommand") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 6625457749f6..e072d7919450 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -140,13 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend( def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) - .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)) + .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap } def extractAttributes: Map[String, 
String] = { val prefix = "SPARK_EXECUTOR_ATTRIBUTE_" sys.env.filterKeys(_.startsWith(prefix)) - .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)) + .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap } override def receive: PartialFunction[Any, Unit] = { @@ -304,8 +304,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env, - arguments.resourcesFileOpt, resourceProfile) + arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq, + env, arguments.resourcesFileOpt, resourceProfile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c8b1afeebac0..bc0f0c0a7b70 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -606,7 +606,8 @@ private[spark] class Executor( // Here and below, put task metric peaks in a WrappedArray to expose them as a Seq // without requiring a copy. val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId)) - val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks)) + val serializedTK = ser.serialize( + TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq)) execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) case _: InterruptedException | NonFatal(_) if @@ -616,7 +617,8 @@ private[spark] class Executor( val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId)) - val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks)) + val serializedTK = ser.serialize( + TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq)) execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) => @@ -661,13 +663,13 @@ private[spark] class Executor( val serializedTaskEndReason = { try { val ef = new ExceptionFailure(t, accUpdates).withAccums(accums) - .withMetricPeaks(metricPeaks) + .withMetricPeaks(metricPeaks.toSeq) ser.serialize(ef) } catch { case _: NotSerializableException => // t is not serializable so just send the stacktrace val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums) - .withMetricPeaks(metricPeaks) + .withMetricPeaks(metricPeaks.toSeq) ser.serialize(ef) } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 1470a23884bb..43742a4d46cb 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -123,7 +123,7 @@ class TaskMetrics private[spark] () extends Serializable { def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = { // This is called on driver. All accumulator updates have a fixed value. So it's safe to use // `asScala` which accesses the internal values using `java.util.Iterator`. 
- _updatedBlockStatuses.value.asScala + _updatedBlockStatuses.value.asScala.toSeq } // Setters and increment-ers @@ -199,7 +199,7 @@ class TaskMetrics private[spark] () extends Serializable { */ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { if (tempShuffleReadMetrics.nonEmpty) { - shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) + shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics.toSeq) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 57dcbe501c6d..48f816f649d3 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -156,7 +156,7 @@ private[spark] class MetricsSystem private ( } def getSourcesByName(sourceName: String): Seq[Source] = - sources.filter(_.sourceName == sourceName) + sources.filter(_.sourceName == sourceName).toSeq def registerSource(source: Source): Unit = { sources += source diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1e39e1085687..f280c220a2c8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -934,7 +934,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) for (pair <- it if pair._1 == key) { buf += pair._2 } - buf + buf.toSeq } : Seq[V] val res = self.context.runJob(self, process, Array(index)) res(0) diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 9f8019b80a4d..9d136f85c980 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -135,10 +135,10 @@ private object ParallelCollectionRDD { new Range.Inclusive(r.start + start * r.step, r.end, r.step) } else { - new Range(r.start + start * r.step, r.start + end * r.step, r.step) + new Range.Exclusive(r.start + start * r.step, r.start + end * r.step, r.step) } }.toSeq.asInstanceOf[Seq[Seq[T]]] - case nr: NumericRange[_] => + case nr: NumericRange[T] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) var r = nr @@ -147,7 +147,7 @@ private object ParallelCollectionRDD { slices += r.take(sliceSize).asInstanceOf[Seq[T]] r = r.drop(sliceSize) } - slices + slices.toSeq case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map { case (start, end) => diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 3b11e82dab19..5dd8cb8440be 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -238,7 +238,7 @@ private object PipedRDD { while(tok.hasMoreElements) { buf += tok.nextToken() } - buf + buf.toSeq } val STDIN_WRITER_THREAD_PREFIX = "stdin writer for" diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 63fa3c2487c3..0a9302344370 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -98,7 +98,7 @@ class UnionRDD[T: ClassTag]( deps += new RangeDependency(rdd, 0, pos, 
rdd.partitions.length) pos += rdd.partitions.length } - deps + deps.toSeq } override def compute(s: Partition, context: TaskContext): Iterator[T] = { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 1dbdc3d81e44..f56ea69f6cec 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -319,12 +319,13 @@ object ResourceProfile extends Logging { private[spark] def getCustomTaskResources( rp: ResourceProfile): Map[String, TaskResourceRequest] = { - rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)) + rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap } private[spark] def getCustomExecutorResources( rp: ResourceProfile): Map[String, ExecutorResourceRequest] = { - rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)) + rp.executorResources. + filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap } /* diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 95b0096cade3..f13f1eaeeaa4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -232,7 +232,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { // For testing only. private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { - queues.asScala.flatMap { queue => queue.findListenersByClass[T]() } + queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }.toSeq } // For testing only. 
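Editorial note, not part of the patch: most hunks in this first commit are one of two mechanical Scala 2.13 fixes, appending .toSeq where a mutable ArrayBuffer or a Java collection wrapped with .asScala is returned as a Seq, and appending .toMap after filterKeys/mapValues (a second sketch after PATCH 2/7 covers that case). The .toSeq copies are needed because scala.Seq is an alias for scala.collection.immutable.Seq on 2.13, so mutable collections no longer satisfy a declared Seq result type. The sketch below is illustrative only; the object and method names are invented, and it assumes a plain Scala 2.13 build rather than Spark's cross-built sources.

import java.util.{Arrays => JArrays, List => JList}

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

object ToSeqSketch {
  // On 2.13, scala.Seq means scala.collection.immutable.Seq, so returning a
  // mutable buffer where a Seq is declared needs an explicit .toSeq
  // (which also compiles on 2.12).
  def collected(n: Int): Seq[Int] = {
    val buf = new ArrayBuffer[Int]
    (1 to n).foreach(buf += _)
    buf.toSeq
  }

  // Java collections converted with .asScala come back as mutable Buffers,
  // so the same .toSeq is needed at that boundary too.
  def fromJava(values: JList[String]): Seq[String] = values.asScala.toSeq

  def main(args: Array[String]): Unit = {
    println(collected(3))                        // List(1, 2, 3)
    println(fromJava(JArrays.asList("a", "b")))  // List(a, b)
  }
}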
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala index bc1431835e25..6112d8ef051e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala @@ -69,7 +69,7 @@ object SplitInfo { for (host <- mapredSplit.getLocations) { retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit) } - retval + retval.toSeq } def toSplitInfo(inputFormatClazz: Class[_], path: String, @@ -79,6 +79,6 @@ object SplitInfo { for (host <- mapreduceSplit.getLocations) { retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit) } - retval + retval.toSeq } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala index ca48775e77f2..be881481bf4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala @@ -47,19 +47,19 @@ class StatsReportListener extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { implicit val sc = stageCompleted this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}") - showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics) + showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq) // Shuffle write showBytesDistribution("shuffle bytes written:", - (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics) + (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics.toSeq) // Fetch & I/O showMillisDistribution("fetch wait time:", - (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics) + (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics.toSeq) showBytesDistribution("remote bytes read:", - (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics) + (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics.toSeq) showBytesDistribution("task result size:", - (_, metric) => metric.resultSize, taskInfoMetrics) + (_, metric) => metric.resultSize, taskInfoMetrics.toSeq) // Runtime breakdown val runtimePcts = taskInfoMetrics.map { case (info, metrics) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index b6df216d537e..11d969e1aba9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -71,7 +71,7 @@ private[spark] class DirectTaskResult[T]( for (i <- 0 until numUpdates) { _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]] } - accumUpdates = _accumUpdates + accumUpdates = _accumUpdates.toSeq } val numMetrics = in.readInt diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2c37fec27176..45cb5e534220 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -137,7 +137,7 @@ private[spark] class TaskSchedulerImpl( private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] def runningTasksByExecutors: Map[String, 
Int] = synchronized { - executorIdToRunningTaskIds.toMap.mapValues(_.size) + executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap } // The set of executors we have on each host; this is used to compute hostsAlive, which @@ -719,7 +719,7 @@ private[spark] class TaskSchedulerImpl( if (tasks.nonEmpty) { hasLaunchedTask = true } - return tasks + return tasks.map(_.toSeq) } private def createUnschedulableTaskSetAbortTimer( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 219a0e799cc7..95d901f29297 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -367,7 +367,7 @@ private[storage] class BlockInfoManager extends Logging { notifyAll() - blocksWithReleasedLocks + blocksWithReleasedLocks.toSeq } /** Returns the number of locks held by the given task. Used only for testing. */ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index f2113947f6bf..57dc7ea63e94 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -97,7 +97,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } }.filter(_ != null).flatMap { dir => val files = dir.listFiles() - if (files != null) files else Seq.empty + if (files != null) files.toSeq else Seq.empty } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 5efbc0703f72..a2843da0561e 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -368,25 +368,25 @@ final class ShuffleBlockFetcherIterator( collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = { val iterator = blockInfos.iterator var curRequestSize = 0L - var curBlocks = new ArrayBuffer[FetchBlockInfo] + var curBlocks = Seq.empty[FetchBlockInfo] while (iterator.hasNext) { val (blockId, size, mapIndex) = iterator.next() assertPositiveBlockSize(blockId, size) - curBlocks += FetchBlockInfo(blockId, size, mapIndex) + curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex)) curRequestSize += size // For batch fetch, the actual block in flight should count for merged block. 
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { curBlocks = createFetchRequests(curBlocks, address, isLast = false, - collectedRemoteRequests).to[ArrayBuffer] + collectedRemoteRequests) curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { curBlocks = createFetchRequests(curBlocks, address, isLast = true, - collectedRemoteRequests).to[ArrayBuffer] + collectedRemoteRequests) curRequestSize = curBlocks.map(_.size).sum } } @@ -928,7 +928,7 @@ object ShuffleBlockFetcherIterator { } else { blocks } - result + result.toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 087a22d6c614..a070cc9c7b39 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -443,7 +443,7 @@ private[spark] object UIUtils extends Logging { case None => {getHeaderContent(x._1)} } - } + }.toSeq } {headerRow} diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 9faa3dcf2cdf..a4e87704927c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -58,11 +58,11 @@ private[spark] abstract class WebUI( private val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath - def getTabs: Seq[WebUITab] = tabs - def getHandlers: Seq[ServletContextHandler] = handlers + def getTabs: Seq[WebUITab] = tabs.toSeq + def getHandlers: Seq[ServletContextHandler] = handlers.toSeq def getDelegatingHandlers: Seq[DelegatingServletContextHandler] = { - handlers.map(new DelegatingServletContextHandler(_)) + handlers.map(new DelegatingServletContextHandler(_)).toSeq } /** Attaches a tab to this UI, along with all of its attached pages. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 066512d159d0..4e76ea289ede 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -259,11 +259,11 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We } val activeJobsTable = - jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled) + jobsTable(request, "active", "activeJob", activeJobs.toSeq, killEnabled = parent.killEnabled) val completedJobsTable = - jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false) + jobsTable(request, "completed", "completedJob", completedJobs.toSeq, killEnabled = false) val failedJobsTable = - jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false) + jobsTable(request, "failed", "failedJob", failedJobs.toSeq, killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty @@ -330,7 +330,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We var content = summary - content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs, + content ++= makeTimeline((activeJobs ++ completedJobs ++ failedJobs).toSeq, store.executorList(false), startTime) if (shouldShowActiveJobs) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 542dc39eee4f..bba5e3dda6c4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -288,20 +288,20 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP } val activeStagesTable = - new StageTableBase(store, request, activeStages, "active", "activeStage", parent.basePath, - basePath, parent.isFairScheduler, + new StageTableBase(store, request, activeStages.toSeq, "active", "activeStage", + parent.basePath, basePath, parent.isFairScheduler, killEnabled = parent.killEnabled, isFailedStage = false) val pendingOrSkippedStagesTable = - new StageTableBase(store, request, pendingOrSkippedStages, pendingOrSkippedTableId, + new StageTableBase(store, request, pendingOrSkippedStages.toSeq, pendingOrSkippedTableId, "pendingStage", parent.basePath, basePath, parent.isFairScheduler, killEnabled = false, isFailedStage = false) val completedStagesTable = - new StageTableBase(store, request, completedStages, "completed", "completedStage", + new StageTableBase(store, request, completedStages.toSeq, "completed", "completedStage", parent.basePath, basePath, parent.isFairScheduler, killEnabled = false, isFailedStage = false) val failedStagesTable = - new StageTableBase(store, request, failedStages, "failed", "failedStage", parent.basePath, - basePath, parent.isFairScheduler, + new StageTableBase(store, request, failedStages.toSeq, "failed", "failedStage", + parent.basePath, basePath, parent.isFairScheduler, killEnabled = false, isFailedStage = true) val shouldShowActiveStages = activeStages.nonEmpty @@ -391,7 +391,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP var content = summary val appStartTime = store.applicationInfo().attempts.head.startTime.getTime() - content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, + content ++= makeTimeline((activeStages ++ completedStages ++ 
failedStages).toSeq, store.executorList(false), appStartTime) val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match { diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 842ee7aaf49b..f8d9279c2404 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -81,11 +81,11 @@ private[spark] class RDDOperationCluster( /** Return all the nodes which are cached. */ def getCachedNodes: Seq[RDDOperationNode] = { - _childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes) + (_childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes)).toSeq } def getBarrierClusters: Seq[RDDOperationCluster] = { - _childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters) + (_childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters)).toSeq } def canEqual(other: Any): Boolean = other.isInstanceOf[RDDOperationCluster] @@ -210,7 +210,7 @@ private[spark] object RDDOperationGraph extends Logging { } } - RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster) + RDDOperationGraph(internalEdges.toSeq, outgoingEdges.toSeq, incomingEdges.toSeq, rootCluster) } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ced3f9d15720..4d8277fc9f26 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1210,7 +1210,8 @@ private[spark] object JsonProtocol { case Some(id) => id.extract[Int] case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID } - new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources, resourceProfileId) + new ExecutorInfo(executorHost, totalCores, logUrls, attributes.toMap, resources.toMap, + resourceProfileId) } def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9636fe88c77c..4e3de5bbe560 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1716,7 +1716,7 @@ private[spark] object Utils extends Logging { if (inWord || inDoubleQuote || inSingleQuote) { endWord() } - buf + buf.toSeq } /* Calculates 'x' modulo 'mod', takes to consideration sign of x, diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index cc97bbfa7201..dc39170ecf38 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -659,7 +659,7 @@ private[spark] class ExternalSorter[K, V, C]( } } else { // Merge spilled and in-memory data - merge(spills, destructiveIterator( + merge(spills.toSeq, destructiveIterator( collection.partitionedDestructiveSortedIterator(comparator))) } } diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index a75cf3f0381d..d701cb65460a 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -157,7 +157,7 
@@ private class SaveInfoListener extends SparkListener { def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.values.flatten.toSeq def getCompletedTaskInfos(stageId: StageId, stageAttemptId: StageAttemptId): Seq[TaskInfo] = - completedTaskInfos.getOrElse((stageId, stageAttemptId), Seq.empty[TaskInfo]) + completedTaskInfos.getOrElse((stageId, stageAttemptId), Seq.empty[TaskInfo]).toSeq /** * If `jobCompletionCallback` is set, block until the next call has finished. diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 92ed24408384..7a95ea0fa321 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -291,14 +291,14 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val shuffleIds = 0 until sc.newShuffleId val broadcastIds = broadcastBuffer.map(_.id) - val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq) runGC() intercept[Exception] { preGCTester.assertCleanup()(timeout(1.second)) } // Test that GC triggers the cleanup of all variables after the dereferencing them - val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq) broadcastBuffer.clear() rddBuffer.clear() runGC() @@ -331,14 +331,14 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val shuffleIds = 0 until sc.newShuffleId val broadcastIds = broadcastBuffer.map(_.id) - val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq) runGC() intercept[Exception] { preGCTester.assertCleanup()(timeout(1.second)) } // Test that GC triggers the cleanup of all variables after the dereferencing them - val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq) broadcastBuffer.clear() rddBuffer.clear() runGC() diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 312691302b06..f7bcc4fc8d35 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -261,7 +261,7 @@ class HeartbeatReceiverSuite // We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend, // so exclude it from the map. See SPARK-10800. heartbeatReceiver.invokePrivate(_executorLastSeen()). 
- filterKeys(_ != SparkContext.DRIVER_IDENTIFIER) + filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap } } @@ -286,6 +286,8 @@ private class FakeSchedulerBackend( clusterManagerEndpoint: RpcEndpointRef, resourceProfileManager: ResourceProfileManager) extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + def this() = this(null, null, null, null) protected override def doRequestTotalExecutors( resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = { diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 5399d868f46f..f2b81e5153ae 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -220,7 +220,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { super.registerAccumulatorForCleanup(a) } - def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toArray + def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toSeq } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 9e39271bdf9e..3d6690cb8534 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -182,7 +182,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2) val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2) val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)) - .map(p => (p._1, p._2.map(_.toArray))) + .map(p => (p._1, p._2.map(_.toSeq))) .collectAsMap() assert(results(1)(0).length === 3) diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala index 42b8cde65039..b986be03e965 100644 --- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala @@ -317,7 +317,7 @@ private[deploy] object IvyTestUtils { val rFiles = createRFiles(root, className, artifact.groupId) allFiles.append(rFiles: _*) } - val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR) + val jarFile = packJar(jarPath, artifact, allFiles.toSeq, useIvyLayout, withR) assert(jarFile.exists(), "Problem creating Jar file") val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout) assert(descriptor.exists(), "Problem creating Pom file") diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 18154d861a73..c8a3149c07d7 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -657,7 +657,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("top with predefined ordering") { val nums = Array.range(1, 100000) - val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) + val ints = sc.makeRDD(scala.util.Random.shuffle(nums).toSeq, 2) val topK = ints.top(5) assert(topK.size === 5) assert(topK === nums.reverse.take(5)) @@ -1098,7 +1098,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { override def getPartitions: Array[Partition] = Array(new Partition { override def index: Int = 0 }) - override def 
getDependencies: Seq[Dependency[_]] = mutableDependencies + override def getDependencies: Seq[Dependency[_]] = mutableDependencies.toSeq def addDependency(dep: Dependency[_]): Unit = { mutableDependencies += dep } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 7c23e4449f46..915035e9eb71 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -325,7 +325,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L) def max(a: Array[Long], b: Array[Long]): Array[Long] = - (a, b).zipped.map(Math.max) + (a, b).zipped.map(Math.max).toArray // calculated metric peaks per stage per executor // metrics sent during stage 0 for each executor diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index 388d4e25a06c..e392ff53e02c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -26,7 +26,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Track Executor Resource information") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) @@ -43,7 +43,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow acquire address that is not available") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) // Acquire some addresses. info.acquire(Seq("0", "1")) assert(!info.availableAddrs.contains("1")) @@ -56,7 +56,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow acquire address that doesn't exist") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) assert(!info.availableAddrs.contains("4")) // Acquire an address that doesn't exist val e = intercept[SparkException] { @@ -67,7 +67,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow release address that is not assigned") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) // Acquire addresses info.acquire(Array("0", "1")) assert(!info.assignedAddrs.contains("2")) @@ -80,7 +80,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { test("Don't allow release address that doesn't exist") { // Init Executor Resource. 
- val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) assert(!info.assignedAddrs.contains("4")) // Release an address that doesn't exist val e = intercept[SparkException] { @@ -93,7 +93,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) val addresses = ArrayBuffer("0", "1", "2", "3") slotSeq.foreach { slots => - val info = new ExecutorResourceInfo(GPU, addresses, slots) + val info = new ExecutorResourceInfo(GPU, addresses.toSeq, slots) for (_ <- 0 until slots) { addresses.foreach(addr => info.acquire(Seq(addr))) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index d4e8d63b54e5..270b2c606ad0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -621,7 +621,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = { - stageInfos(stage.stageInfo) = taskInfoMetrics + stageInfos(stage.stageInfo) = taskInfoMetrics.toSeq taskInfoMetrics = mutable.Buffer.empty } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 2efe6da5e986..ea44a2d948ca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -103,7 +103,7 @@ private class MyTaskResultGetter(env: SparkEnv, scheduler: TaskSchedulerImpl) // DirectTaskResults that we receive from the executors private val _taskResults = new ArrayBuffer[DirectTaskResult[_]] - def taskResults: Seq[DirectTaskResult[_]] = _taskResults + def taskResults: Seq[DirectTaskResult[_]] = _taskResults.toSeq override def enqueueSuccessfulTask(tsm: TaskSetManager, tid: Long, data: ByteBuffer): Unit = { // work on a copy since the super class still needs to use the buffer diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index e4aad58d2506..759e68219c2d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1670,7 +1670,7 @@ class TaskSetManagerSuite for (i <- 0 to 99) { locations += Seq(TaskLocation("host" + i)) } - val taskSet = FakeTask.createTaskSet(100, locations: _*) + val taskSet = FakeTask.createTaskSet(100, locations.toSeq: _*) val clock = new ManualClock // make sure we only do one rack resolution call, for the entire batch of hosts, as this // can be expensive. 
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 43917a5b83bb..bf1379ceb89a 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1047,7 +1047,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
     )

-    val transfer = createMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)))
+    val transfer = createMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)).toMap)

     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
       (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq))

From 11f319837ae2e97701020be01757b9f9a32f94db Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 1 Jul 2020 13:02:28 -0500
Subject: [PATCH 2/7] Whitespace

---
 .../test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 2 +-
 .../apache/spark/sql/execution/streaming/StreamProgress.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index f7bcc4fc8d35..a2e70b23a3e5 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -286,7 +286,7 @@ private class FakeSchedulerBackend(
     clusterManagerEndpoint: RpcEndpointRef,
     resourceProfileManager: ResourceProfileManager)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
-
+
   def this() = this(null, null, null, null)

   protected override def doRequestTotalExecutors(
diff --git a/sql/core/src/main/scala-2.13/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala-2.13/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 0aa29640899c..6aa1b46cbb94 100644
--- a/sql/core/src/main/scala-2.13/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala-2.13/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkD
  */
 class StreamProgress(
     val baseMap: immutable.Map[SparkDataStream, OffsetV2] =
-      new immutable.HashMap[SparkDataStream, OffsetV2])
+      new immutable.HashMap[SparkDataStream, OffsetV2])
   extends scala.collection.immutable.Map[SparkDataStream, OffsetV2] {

   // Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.
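Reviewer note: the `.toMap` added in the ShuffleBlockFetcherIteratorSuite hunk above addresses a different 2.13 change: `Map.mapValues` now returns a lazy `MapView` (and is deprecated in favour of `.view.mapValues`), so call sites that need a strict `Map` must materialise it. A minimal sketch, with hypothetical names rather than the suite's mock helpers:

```scala
object MapValuesExample {
  // On 2.13, mapValues is lazy; .toMap forces it so the function runs once per key
  // and the result satisfies a Map-typed parameter.
  def sizes(blocks: Map[String, Array[Byte]]): Map[String, Int] =
    blocks.view.mapValues(_.length).toMap  // .view also avoids the 2.13 deprecation warning

  def main(args: Array[String]): Unit = {
    val blocks = Map("shuffle_0_1_0" -> Array[Byte](1, 2, 3), "shuffle_0_2_0" -> Array[Byte](4))
    println(sizes(blocks))  // Map(shuffle_0_1_0 -> 3, shuffle_0_2_0 -> 1)
  }
}
```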
From c261d4b338978da51a90ccf6fee216412a625ba2 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 1 Jul 2020 14:20:41 -0500
Subject: [PATCH 3/7] Fix Range.Exclusive

---
 .../main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 9d136f85c980..24311382c696 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -135,7 +135,7 @@ private object ParallelCollectionRDD {
           new Range.Inclusive(r.start + start * r.step, r.end, r.step)
         }
         else {
-          new Range.Exclusive(r.start + start * r.step, r.start + end * r.step, r.step)
+          new Range.Inclusive(r.start + start * r.step, r.start + end * r.step - 1, r.step)
         }
       }.toSeq.asInstanceOf[Seq[Seq[T]]]
     case nr: NumericRange[T] =>

From b0a6be9291387eadbd12f1cb4e7f21b68a6eae32 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 1 Jul 2020 18:06:10 -0500
Subject: [PATCH 4/7] Fix MapOutputTracker

---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f981bcbea020..32251df6f4bb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }

-    splitsByAddress.view.mapValues(_.toSeq).iterator
+    splitsByAddress.mapValues(_.toSeq).iterator
   }
 }

From ab62b32ab15330f09aea419c6076585d252054b2 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 2 Jul 2020 10:08:11 -0500
Subject: [PATCH 5/7] Fix RDDSuite

---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c8a3149c07d7..1f4e784723b4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -656,8 +656,8 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   }

   test("top with predefined ordering") {
-    val nums = Array.range(1, 100000)
-    val ints = sc.makeRDD(scala.util.Random.shuffle(nums).toSeq, 2)
+    val nums = Seq.range(1, 100000)
+    val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
     val topK = ints.top(5)
     assert(topK.size === 5)
     assert(topK === nums.reverse.take(5))
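Reviewer note on the RDDSuite fix just above: building the test input with `Seq.range` keeps the whole pipeline in immutable `Seq`s, so `Random.shuffle` returns a `Seq` directly and no `.toSeq` bridging from `Array` is needed. A small sketch of the pattern, not Spark code:

```scala
import scala.util.Random

object ShuffleRangeExample {
  def main(args: Array[String]): Unit = {
    val nums = Seq.range(1, 100)           // 1, 2, ..., 99 as an immutable Seq
    val shuffled = Random.shuffle(nums)    // still an immutable Seq[Int]
    val top5 = shuffled.sorted(Ordering[Int].reverse).take(5)
    println(top5 == nums.reverse.take(5))  // true: 99, 98, 97, 96, 95
  }
}
```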
From 89d19c60ace7fd2f74aa34e2ac8fe79c20883b41 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 6 Jul 2020 18:18:44 -0500
Subject: [PATCH 6/7] Fix parallel slice RDD

---
 .../scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 5 ++---
 .../org/apache/spark/rdd/ParallelCollectionSplitSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 24311382c696..324cba5b4de4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -133,9 +133,8 @@ private object ParallelCollectionRDD {
         // If the range is inclusive, use inclusive range for the last slice
         if (r.isInclusive && index == numSlices - 1) {
           new Range.Inclusive(r.start + start * r.step, r.end, r.step)
-        }
-        else {
-          new Range.Inclusive(r.start + start * r.step, r.start + end * r.step - 1, r.step)
+        } else {
+          new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
         }
       }.toSeq.asInstanceOf[Seq[Seq[T]]]
     case nr: NumericRange[T] =>
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 10f4bbcf7f48..879107350bb5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -140,7 +140,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
       assert(slices(i).isInstanceOf[Range])
       val range = slices(i).asInstanceOf[Range]
       assert(range.start === i * (N / 40), "slice " + i + " start")
-      assert(range.end === (i + 1) * (N / 40), "slice " + i + " end")
+      assert(range.last === (i + 1) * (N / 40) - 1, "slice " + i + " end")
       assert(range.step === 1, "slice " + i + " step")
     }
   }

From 8f5af5f20db18ce0ea1fa02bc854f710a9a71dff Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sat, 11 Jul 2020 16:29:15 -0500
Subject: [PATCH 7/7] Update to 2.13.3

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 08ca13bfe9d3..4bee11cd7590 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3159,7 +3159,7 @@
       <id>scala-2.13</id>
-        <scala.version>2.13.1</scala.version>
+        <scala.version>2.13.3</scala.version>
         <scala.binary.version>2.13</scala.binary.version>
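Reviewer note on the slicing arithmetic in patches 3 and 6 above: an exclusive slice [start, end) of a stepped `Range` is rebuilt as an inclusive range ending at the element with index `end - 1`, i.e. `r.start + (end - 1) * r.step`. The interim `r.start + end * r.step - 1` endpoint from patch 3 moves back by one raw unit rather than one step; that happens to yield the same elements for positive steps, but includes one element too many when the step is negative, which appears to be what patch 6 corrects. A quick check, not Spark code, that illustrates the difference:

```scala
object RangeSliceCheck {
  def main(args: Array[String]): Unit = {
    val r = 30 until 0 by -3                // 30, 27, 24, ..., 3
    val expected = r.slice(2, 5)            // elements at indices 2..4: 24, 21, 18

    val byIndex = Range.inclusive(r.start + 2 * r.step, r.start + (5 - 1) * r.step, r.step)
    val byRawMinusOne = Range.inclusive(r.start + 2 * r.step, r.start + 5 * r.step - 1, r.step)

    println(byIndex == expected)        // true  -> 24, 21, 18
    println(byRawMinusOne == expected)  // false -> 24, 21, 18, 15 (one element too many)
  }
}
```

The companion test change follows from the same rewrite: the slices are now built as inclusive ranges, so the suite asserts on `range.last` (the final element) instead of the old exclusive `end` bound.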