diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index e0ac2b3e0f4b..620a6fe2f9d7 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -550,7 +550,7 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
         countFailures = false, force = false)
     }
 
@@ -563,9 +563,9 @@ private[spark] class ExecutorAllocationManager(
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved)
+      executorMonitor.executorsKilled(executorsRemoved.toSeq)
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
-      executorsRemoved
+      executorsRemoved.toSeq
     } else {
       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
         s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 18cd5de4cfad..32251df6f4bb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }
 
-    splitsByAddress.iterator
+    splitsByAddress.mapValues(_.toSeq).iterator
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 40915e3904f7..dbd89d646ae5 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -173,15 +173,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     this
   }
 
-  /**
-   * Set multiple parameters together
-   */
-  @deprecated("Use setAll(Iterable) instead", "3.0.0")
-  def setAll(settings: Traversable[(String, String)]): SparkConf = {
-    settings.foreach { case (k, v) => set(k, v) }
-    this
-  }
-
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
     if (settings.putIfAbsent(key, value) == null) {
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 08a58a029528..db4b74bb89f0 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -111,7 +111,7 @@ private[spark] class TaskContextImpl(
     if (failed) return
     failed = true
     failure = error
-    invokeListeners(onFailureCallbacks, "TaskFailureListener", Option(error)) {
+    invokeListeners(onFailureCallbacks.toSeq, "TaskFailureListener", Option(error)) {
       _.onTaskFailure(this, error)
     }
   }
@@ -120,7 +120,7 @@ private[spark] class TaskContextImpl(
   private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
     if (completed) return
     completed = true
-    invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
+    invokeListeners(onCompleteCallbacks.toSeq, "TaskCompletionListener", error) {
      _.onTaskCompletion(this)
    }
  }
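Most of the edits above follow a single pattern: on Scala 2.13, `scala.Seq` is an alias for `scala.collection.immutable.Seq`, so a `mutable.ArrayBuffer` or `mutable.Set` no longer satisfies a declared `Seq` result type and an explicit `.toSeq` copy is required; the already-deprecated `setAll(Traversable)` overload is dropped outright because `Traversable` itself is deprecated in 2.13. A minimal, self-contained sketch of the compile-time difference (the object and values below are illustrative, not Spark code):

```scala
import scala.collection.mutable.ArrayBuffer

object SeqAliasSketch {
  // On 2.13, Seq[String] here means scala.collection.immutable.Seq[String].
  def removedExecutors(): Seq[String] = {
    val buf = ArrayBuffer("exec-1", "exec-2")
    // Returning `buf` directly compiles on 2.12 (where Seq meant collection.Seq)
    // but not on 2.13; `.toSeq` copies into an immutable Seq and works on both.
    buf.toSeq
  }

  def main(args: Array[String]): Unit =
    println(removedExecutors())   // on 2.13: List(exec-1, exec-2)
}
```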
@@ -142,7 +142,7 @@ private[spark] class TaskContextImpl(
       }
     }
     if (errorMsgs.nonEmpty) {
-      throw new TaskCompletionListenerException(errorMsgs, error)
+      throw new TaskCompletionListenerException(errorMsgs.toSeq, error)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e4140f659d97..15cb01a17328 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -256,7 +256,7 @@ object JavaRDD {
     } catch {
       case eof: EOFException => // No-op
     }
-    JavaRDD.fromRDD(sc.parallelize(objs, parallelism))
+    JavaRDD.fromRDD(sc.parallelize(objs.toSeq, parallelism))
   } finally {
     din.close()
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1ca526274266..89b33945dfb0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -265,14 +265,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String]): JavaRDD[String] = {
-    rdd.pipe(command.asScala)
+    rdd.pipe(command.asScala.toSeq)
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala)
+    rdd.pipe(command.asScala.toSeq, env.asScala)
   }
 
   /**
@@ -282,7 +282,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       env: JMap[String, String],
       separateWorkingDir: Boolean,
       bufferSize: Int): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+    rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize)
   }
 
   /**
@@ -293,7 +293,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       separateWorkingDir: Boolean,
       bufferSize: Int,
       encoding: String): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
+    rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize,
+      encoding)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 149def29b8fb..347f59fe8f77 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -133,7 +133,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
-    sc.parallelize(list.asScala, numSlices)
+    sc.parallelize(list.asScala.toSeq, numSlices)
   }
 
   /** Get an RDD that has no partitions or elements. */
@@ -152,7 +152,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
     : JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[V] = fakeClassTag
-    JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
+    JavaPairRDD.fromRDD(sc.parallelize(list.asScala.toSeq, numSlices))
   }
 
   /** Distribute a local Scala collection to form an RDD.
    */
@@ -161,7 +161,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
-    JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices))
+    JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()).toSeq, numSlices))
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 726cff6703dc..86a1ac31c084 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -163,7 +163,7 @@ private[spark] object PythonRDD extends Logging {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
-      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala.toSeq)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
     serveIterator(flattenedPartition.iterator,
       s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 490b48719b6b..527d0d6d3a48 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -54,7 +54,7 @@ private[spark] object PythonUtils {
    * Convert list of T into seq of T (for calling API with varargs)
    */
   def toSeq[T](vs: JList[T]): Seq[T] = {
-    vs.asScala
+    vs.asScala.toSeq
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 6ff68b694f8f..ab389f99b11a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -205,7 +205,7 @@ private object FaultToleranceTest extends App with Logging {
 
   private def addWorkers(num: Int): Unit = {
     logInfo(s">>>>> ADD WORKERS $num <<<<<")
-    val masterUrls = getMasterUrls(masters)
+    val masterUrls = getMasterUrls(masters.toSeq)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
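The Java- and Python-facing wrappers above show the second recurring pattern: `asScala` on a `java.util.List` yields a `mutable.Buffer`, which 2.13 no longer accepts where an immutable `Seq` is expected, hence the appended `.toSeq` (`PythonUtils.toSeq` in the hunk above is the canonical case). A small sketch of that conversion, with illustrative data:

```scala
import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConverters._

object AsScalaSketch {
  // Same shape as the patched PythonUtils.toSeq: expose a Java list as a Scala Seq.
  def toSeq[T](vs: JList[T]): Seq[T] = vs.asScala.toSeq

  def main(args: Array[String]): Unit = {
    val jlist: JList[String] = JArrays.asList("a", "b", "c")
    // asScala alone is a mutable.Buffer wrapper over the Java list;
    // on 2.13, .toSeq copies it into an immutable Seq.
    println(toSeq(jlist))   // on 2.13: List(a, b, c)
  }
}
```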
@@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
     System.setProperty(config.DRIVER_PORT.key, "0")
-    sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
+    sc = new SparkContext(getMasterUrls(masters.toSeq), "fault-tolerance", containerSparkHome)
   }
 
   private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
@@ -279,7 +279,7 @@ private object FaultToleranceTest extends App with Logging {
     var liveWorkerIPs: Seq[String] = List()
 
     def stateValid(): Boolean = {
-      (workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
+      workers.map(_.ip).forall(liveWorkerIPs.contains) &&
         numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 574ce60b19b4..7ad92da4e055 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -69,7 +69,7 @@ object PythonRunner {
     pathElements ++= formattedPyFiles
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
-    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+    val pythonPath = PythonUtils.mergePythonPaths(pathElements.toSeq: _*)
 
     // Launch Python process
     val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1271a3dbfc3f..6d38a1d28146 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -820,7 +820,7 @@ private[spark] class SparkSubmit extends Logging {
     }
     sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
 
-    (childArgs, childClasspath, sparkConf, childMainClass)
+    (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
   }
 
   private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 8eae445b439d..ded816b992db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -52,7 +52,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {
     zk.getChildren.forPath(workingDir).asScala
-      .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
+      .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]).toSeq
   }
 
   override def close(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 1648ba516d9b..89ef051e5854 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient {
   private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
     env.filterKeys { k =>
       (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
-    }
+    }.toMap
   }
 
   private[spark] def supportsRestClient(master: String): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index f7423f1fc3f1..8240bd6d2f43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -61,7 +61,7 @@ object CommandUtils extends Logging {
     // SPARK-698: do not call the run.cmd script, as process.destroy()
     // fails to kill a process tree on Windows
     val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
-    cmd.asScala ++ Seq(command.mainClass) ++ command.arguments
+    (cmd.asScala ++ Seq(command.mainClass) ++ command.arguments).toSeq
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 53ec7b3a88f3..4f9c497fc3d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -201,7 +201,7 @@ private[deploy] class DriverRunner(
       CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
+      val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq)
         .mkString("\"", "\" \"", "\"")
       val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
       Files.append(header, stderr, StandardCharsets.UTF_8)
@@ -262,6 +262,6 @@ private[deploy] trait ProcessBuilderLike {
 private[deploy] object ProcessBuilderLike {
   def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
     override def start(): Process = processBuilder.start()
-    override def command: Seq[String] = processBuilder.command().asScala
+    override def command: Seq[String] = processBuilder.command().asScala.toSeq
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2a5528bbe89c..e4fcae13a2f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -158,7 +158,7 @@ private[deploy] class ExecutorRunner(
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
-      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
+      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala.toSeq)
         .mkString("\"", "\" \"", "\"")
       logInfo(s"Launch command: $redactedCommand")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6625457749f6..e072d7919450 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -140,13 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
   }
 
   def extractAttributes: Map[String, String] = {
     val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
     sys.env.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
+      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap
   }
 
   override def receive: PartialFunction[Any, Unit] = {
@@ -304,8 +304,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt, resourceProfile)
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
     System.exit(0)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c8b1afeebac0..bc0f0c0a7b70 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -606,7 +606,8 @@ private[spark] class Executor(
         // Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
         // without requiring a copy.
         val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks))
+        val serializedTK = ser.serialize(
+          TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case _: InterruptedException | NonFatal(_) if
@@ -616,7 +617,8 @@ private[spark] class Executor(
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
         val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
+        val serializedTK = ser.serialize(
+          TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@@ -661,13 +663,13 @@ private[spark] class Executor(
         val serializedTaskEndReason = {
           try {
             val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
-              .withMetricPeaks(metricPeaks)
+              .withMetricPeaks(metricPeaks.toSeq)
             ser.serialize(ef)
           } catch {
             case _: NotSerializableException =>
              // t is not serializable so just send the stacktrace
              val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
-                .withMetricPeaks(metricPeaks)
+                .withMetricPeaks(metricPeaks.toSeq)
              ser.serialize(ef)
          }
        }
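The `.toMap` additions above (`RestSubmissionClient.filterSystemEnvironment`, `extractLogUrls`, `extractAttributes`) address a related 2.13 change: `filterKeys` and `mapValues` on a `Map` now return a lazy `MapView`, so the result must be materialised with `.toMap` wherever a strict `Map` is declared. Roughly, with illustrative names and data rather than Spark's:

```scala
object MapViewSketch {
  // Keep only SPARK_-prefixed variables, returned as a strict Map.
  def filterSparkEnv(env: Map[String, String]): Map[String, String] =
    // On 2.13, filterKeys returns a MapView; .toMap materialises it so the
    // declared Map[String, String] return type still compiles.
    env.filterKeys(_.startsWith("SPARK_")).toMap

  def main(args: Array[String]): Unit = {
    val env = Map("SPARK_HOME" -> "/opt/spark", "PATH" -> "/usr/bin")
    println(filterSparkEnv(env))   // Map(SPARK_HOME -> /opt/spark)
  }
}
```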
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1470a23884bb..43742a4d46cb 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -123,7 +123,7 @@ class TaskMetrics private[spark] () extends Serializable {
   def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
     // This is called on driver. All accumulator updates have a fixed value. So it's safe to use
     // `asScala` which accesses the internal values using `java.util.Iterator`.
-    _updatedBlockStatuses.value.asScala
+    _updatedBlockStatuses.value.asScala.toSeq
   }
 
   // Setters and increment-ers
@@ -199,7 +199,7 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
     if (tempShuffleReadMetrics.nonEmpty) {
-      shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
+      shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics.toSeq)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 57dcbe501c6d..48f816f649d3 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -156,7 +156,7 @@ private[spark] class MetricsSystem private (
   }
 
   def getSourcesByName(sourceName: String): Seq[Source] =
-    sources.filter(_.sourceName == sourceName)
+    sources.filter(_.sourceName == sourceName).toSeq
 
   def registerSource(source: Source): Unit = {
     sources += source
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1e39e1085687..f280c220a2c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -934,7 +934,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       for (pair <- it if pair._1 == key) {
         buf += pair._2
       }
-      buf
+      buf.toSeq
     } : Seq[V]
     val res = self.context.runJob(self, process, Array(index))
     res(0)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 9f8019b80a4d..324cba5b4de4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -133,12 +133,11 @@ private object ParallelCollectionRDD {
           // If the range is inclusive, use inclusive range for the last slice
           if (r.isInclusive && index == numSlices - 1) {
             new Range.Inclusive(r.start + start * r.step, r.end, r.step)
-          }
-          else {
-            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+          } else {
+            new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
           }
         }.toSeq.asInstanceOf[Seq[Seq[T]]]
-      case nr: NumericRange[_] =>
+      case nr: NumericRange[T] =>
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
         var r = nr
@@ -147,7 +146,7 @@ private object ParallelCollectionRDD {
           slices += r.take(sliceSize).asInstanceOf[Seq[T]]
           r = r.drop(sliceSize)
         }
-        slices
+        slices.toSeq
       case _ =>
         val array = seq.toArray // To prevent O(n^2) operations for List etc
         positions(array.length, numSlices).map { case (start, end) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 3b11e82dab19..5dd8cb8440be 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -238,7 +238,7 @@ private object PipedRDD {
     while(tok.hasMoreElements) {
       buf += tok.nextToken()
     }
-    buf
+    buf.toSeq
   }
 
   val STDIN_WRITER_THREAD_PREFIX = "stdin writer for"
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 63fa3c2487c3..0a9302344370 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -98,7 +98,7 @@ class UnionRDD[T: ClassTag](
       deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
       pos += rdd.partitions.length
     }
-    deps
+    deps.toSeq
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 1dbdc3d81e44..f56ea69f6cec 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -319,12 +319,13 @@ object ResourceProfile extends Logging {
 
   private[spark] def getCustomTaskResources(
       rp: ResourceProfile): Map[String, TaskResourceRequest] = {
-    rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS))
+    rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
   }
 
   private[spark] def getCustomExecutorResources(
       rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
-    rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k))
+    rp.executorResources.
+      filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
   }
 
   /*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 95b0096cade3..f13f1eaeeaa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -232,7 +232,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
 
   // For testing only.
   private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
-    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
+    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }.toSeq
   }
 
   // For testing only.
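One change above is more than a mechanical `.toSeq`: in `ParallelCollectionRDD`, the non-final slices were built with `new Range(start, end, step)` (exclusive), which no longer compiles against 2.13's sealed `Range`, so the patch builds an equivalent `Range.Inclusive` ending at `r.start + (end - 1) * r.step`, i.e. the last element of the old exclusive range. A quick equivalence check with made-up example values, using the public `Range` factories rather than the constructors:

```scala
object RangeSliceSketch {
  def main(args: Array[String]): Unit = {
    val r = 0 until 20 by 3      // range being sliced (example values)
    val (start, end) = (2, 5)    // slice covers positions [start, end) of r

    // Old form: exclusive range over the slice's values.
    val exclusive = Range(r.start + start * r.step, r.start + end * r.step, r.step)
    // New form: inclusive range ending at the slice's last element.
    val inclusive =
      Range.inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)

    println(exclusive.toList)   // List(6, 9, 12)
    println(inclusive.toList)   // List(6, 9, 12)
    assert(exclusive == inclusive)
  }
}
```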
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index bc1431835e25..6112d8ef051e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -69,7 +69,7 @@ object SplitInfo {
     for (host <- mapredSplit.getLocations) {
       retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit)
     }
-    retval
+    retval.toSeq
   }
 
   def toSplitInfo(inputFormatClazz: Class[_], path: String,
@@ -79,6 +79,6 @@ object SplitInfo {
     for (host <- mapreduceSplit.getLocations) {
       retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit)
     }
-    retval
+    retval.toSeq
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
index ca48775e77f2..be881481bf4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -47,19 +47,19 @@ class StatsReportListener extends SparkListener with Logging {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     implicit val sc = stageCompleted
     this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
-    showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics)
+    showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq)
 
     // Shuffle write
     showBytesDistribution("shuffle bytes written:",
-      (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics)
+      (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics.toSeq)
 
     // Fetch & I/O
     showMillisDistribution("fetch wait time:",
-      (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics)
+      (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics.toSeq)
     showBytesDistribution("remote bytes read:",
-      (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics)
+      (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics.toSeq)
     showBytesDistribution("task result size:",
-      (_, metric) => metric.resultSize, taskInfoMetrics)
+      (_, metric) => metric.resultSize, taskInfoMetrics.toSeq)
 
     // Runtime breakdown
     val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index b6df216d537e..11d969e1aba9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -71,7 +71,7 @@ private[spark] class DirectTaskResult[T](
       for (i <- 0 until numUpdates) {
         _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]]
       }
-      accumUpdates = _accumUpdates
+      accumUpdates = _accumUpdates.toSeq
     }
 
     val numMetrics = in.readInt
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2c37fec27176..45cb5e534220 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -137,7 +137,7 @@ private[spark] class TaskSchedulerImpl(
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
-    executorIdToRunningTaskIds.toMap.mapValues(_.size)
+    executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
@@ -719,7 +719,7 @@ private[spark] class TaskSchedulerImpl(
     if (tasks.nonEmpty) {
       hasLaunchedTask = true
     }
-    return tasks
+    return tasks.map(_.toSeq)
   }
 
   private def createUnschedulableTaskSetAbortTimer(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 219a0e799cc7..95d901f29297 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -367,7 +367,7 @@ private[storage] class BlockInfoManager extends Logging {
 
     notifyAll()
 
-    blocksWithReleasedLocks
+    blocksWithReleasedLocks.toSeq
   }
 
   /** Returns the number of locks held by the given task. Used only for testing. */
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f2113947f6bf..57dc7ea63e94 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -97,7 +97,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
       }
     }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
-      if (files != null) files else Seq.empty
+      if (files != null) files.toSeq else Seq.empty
     }
   }
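`TaskSchedulerImpl` above hits the same view-versus-strict issue: `mapValues` yields a lazy `MapView` on 2.13, so `runningTasksByExecutors` gains a trailing `.toMap`, and `resourceOffers` now returns `tasks.map(_.toSeq)` so the nested mutable buffers are exposed as immutable `Seq`s. A compact sketch of the `mapValues` case, adapted from the hunk above with illustrative data and without the locking:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object MapValuesSketch {
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
  executorIdToRunningTaskIds("exec-1") = HashSet(1L, 2L)
  executorIdToRunningTaskIds("exec-2") = HashSet(3L)

  // Snapshot of running-task counts per executor, as a strict immutable Map.
  def runningTasksByExecutors: Map[String, Int] =
    // toMap copies the mutable map; mapValues is a MapView on 2.13, so a
    // second toMap materialises it to match the declared return type.
    executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap

  def main(args: Array[String]): Unit =
    println(runningTasksByExecutors)   // e.g. Map(exec-1 -> 2, exec-2 -> 1)
}
```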
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 5efbc0703f72..a2843da0561e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -368,25 +368,25 @@ final class ShuffleBlockFetcherIterator(
       collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
     val iterator = blockInfos.iterator
     var curRequestSize = 0L
-    var curBlocks = new ArrayBuffer[FetchBlockInfo]
+    var curBlocks = Seq.empty[FetchBlockInfo]
     while (iterator.hasNext) {
       val (blockId, size, mapIndex) = iterator.next()
       assertPositiveBlockSize(blockId, size)
-      curBlocks += FetchBlockInfo(blockId, size, mapIndex)
+      curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
       curRequestSize += size
       // For batch fetch, the actual block in flight should count for merged block.
       val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
       if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
         curBlocks = createFetchRequests(curBlocks, address, isLast = false,
-          collectedRemoteRequests).to[ArrayBuffer]
+          collectedRemoteRequests)
         curRequestSize = curBlocks.map(_.size).sum
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
       curBlocks = createFetchRequests(curBlocks, address, isLast = true,
-        collectedRemoteRequests).to[ArrayBuffer]
+        collectedRemoteRequests)
       curRequestSize = curBlocks.map(_.size).sum
     }
   }
@@ -928,7 +928,7 @@ object ShuffleBlockFetcherIterator {
       } else {
         blocks
       }
-      result
+      result.toSeq
    }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 087a22d6c614..a070cc9c7b39 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -443,7 +443,7 @@ private[spark] object UIUtils extends Logging {
       case None =>