@@ -550,7 +550,7 @@ private[spark] class ExecutorAllocationManager(
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}

@@ -563,9 +563,9 @@ private[spark] class ExecutorAllocationManager(

// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
executorMonitor.executorsKilled(executorsRemoved)
executorMonitor.executorsKilled(executorsRemoved.toSeq)
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
executorsRemoved
executorsRemoved.toSeq
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
@@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging {
}
}

splitsByAddress.iterator
splitsByAddress.mapValues(_.toSeq).iterator
}
}
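Here mapValues(_.toSeq) does double duty: each mutable value buffer is copied to an immutable Seq, and on 2.13 mapValues itself only yields a lazy MapView, which is fine because the result is immediately consumed as an iterator. A rough sketch with made-up data standing in for splitsByAddress:

    import scala.collection.mutable

    // Made-up stand-in for splitsByAddress: one mutable buffer of block sizes per address.
    val splitsByAddress = mutable.Map(
      "host-a" -> mutable.ListBuffer(1L, 2L),
      "host-b" -> mutable.ListBuffer(3L))

    // On 2.13, mapValues on a view is the non-deprecated spelling; each value is
    // copied to an immutable Seq as the iterator is consumed.
    val it: Iterator[(String, Seq[Long])] =
      splitsByAddress.view.mapValues(_.toSeq).iterator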
9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -173,15 +173,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
this
}

/**
* Set multiple parameters together
*/
@deprecated("Use setAll(Iterable) instead", "3.0.0")
def setAll(settings: Traversable[(String, String)]): SparkConf = {
Member: @srowen, BTW, it might be best to file a JIRA as a reminder to keep this API if we can't make Scala 2.13 in Spark 3.1. I believe it is legitimate and inevitable to remove this because of Scala 2.13, but it might be problematic if we can't make it in Spark 3.1 and have a release out only with Scala 2.12.

Member Author: Yeah, if the whole thing doesn't make it for 3.1, I'd leave this method in 3.1.

settings.foreach { case (k, v) => set(k, v) }
this
}
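For reference, the overload that survives is setAll(settings: Iterable[(String, String)]). On 2.13, Traversable is only a deprecated alias for Iterable, so the two signatures would collide there; that is why this one has to go, and why the thread above worries about a 2.12-only 3.1 release. A rough usage sketch of the remaining method:

    import org.apache.spark.SparkConf

    // Callers that used the Traversable overload can pass the same collections
    // to setAll(Iterable); nothing else about the call site changes.
    val conf = new SparkConf(loadDefaults = false)
      .setAll(Seq("spark.app.name" -> "demo", "spark.master" -> "local[2]"))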

/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
if (settings.putIfAbsent(key, value) == null) {
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -111,7 +111,7 @@ private[spark] class TaskContextImpl(
if (failed) return
failed = true
failure = error
invokeListeners(onFailureCallbacks, "TaskFailureListener", Option(error)) {
invokeListeners(onFailureCallbacks.toSeq, "TaskFailureListener", Option(error)) {
_.onTaskFailure(this, error)
}
}
@@ -120,7 +120,7 @@
private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
if (completed) return
completed = true
invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
invokeListeners(onCompleteCallbacks.toSeq, "TaskCompletionListener", error) {
_.onTaskCompletion(this)
}
}
@@ -142,7 +142,7 @@
}
}
if (errorMsgs.nonEmpty) {
throw new TaskCompletionListenerException(errorMsgs, error)
throw new TaskCompletionListenerException(errorMsgs.toSeq, error)
}
}

@@ -256,7 +256,7 @@ object JavaRDD {
} catch {
case eof: EOFException => // No-op
}
JavaRDD.fromRDD(sc.parallelize(objs, parallelism))
JavaRDD.fromRDD(sc.parallelize(objs.toSeq, parallelism))
} finally {
din.close()
}
@@ -265,14 +265,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String]): JavaRDD[String] = {
rdd.pipe(command.asScala)
rdd.pipe(command.asScala.toSeq)
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala)
rdd.pipe(command.asScala.toSeq, env.asScala)
}

/**
@@ -282,7 +282,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
env: JMap[String, String],
separateWorkingDir: Boolean,
bufferSize: Int): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize)
}

/**
@@ -293,7 +293,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
separateWorkingDir: Boolean,
bufferSize: Int,
encoding: String): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize,
encoding)
}

/**
@@ -133,7 +133,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
sc.parallelize(list.asScala, numSlices)
sc.parallelize(list.asScala.toSeq, numSlices)
}

/** Get an RDD that has no partitions or elements. */
@@ -152,7 +152,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
: JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[V] = fakeClassTag
JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
JavaPairRDD.fromRDD(sc.parallelize(list.asScala.toSeq, numSlices))
}

/** Distribute a local Scala collection to form an RDD. */
@@ -161,7 +161,7 @@

/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices))
JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()).toSeq, numSlices))

/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
@@ -163,7 +163,7 @@ private[spark] object PythonRDD extends Logging {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala.toSeq)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")
@@ -54,7 +54,7 @@ private[spark] object PythonUtils {
* Convert list of T into seq of T (for calling API with varargs)
*/
def toSeq[T](vs: JList[T]): Seq[T] = {
vs.asScala
vs.asScala.toSeq
}
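The same Java-interop variant shows up in many of the files above: asScala only wraps the java.util.List in a mutable Buffer, which 2.13 no longer accepts where a Seq is expected, so a defensive copy is appended. A small self-contained sketch, independent of the Spark classes around it:

    import java.util.Arrays
    import scala.collection.JavaConverters._  // scala.jdk.CollectionConverters on 2.13

    val jList: java.util.List[String] = Arrays.asList("a", "b", "c")

    // asScala is only a wrapper over the Java list; toSeq copies it into an
    // immutable Seq so it can be returned where scala.Seq is expected on 2.13.
    val s: Seq[String] = jList.asScala.toSeq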

/**
@@ -205,7 +205,7 @@ private object FaultToleranceTest extends App with Logging {

private def addWorkers(num: Int): Unit = {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
val masterUrls = getMasterUrls(masters.toSeq)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}

@@ -216,7 +216,7 @@
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty(config.DRIVER_PORT.key, "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
sc = new SparkContext(getMasterUrls(masters.toSeq), "fault-tolerance", containerSparkHome)
}

private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
@@ -279,7 +279,7 @@ private object FaultToleranceTest extends App with Logging {
var liveWorkerIPs: Seq[String] = List()

def stateValid(): Boolean = {
(workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
workers.map(_.ip).forall(liveWorkerIPs.contains) &&
Contributor: Nit: what about using diff here? As far as I can see, diff is not deprecated: https://www.scala-lang.org/api/current/scala/collection/Seq.html#diff[B%3E:A](that:scala.collection.Seq[B]):C

Suggested change:
workers.map(_.ip).forall(liveWorkerIPs.contains) &&
workers.map(_.ip).diff(liveWorkerIPs).isEmpty &&

Member Author: diff would work too, I think. It has multiset semantics, and I thought it was not necessary here. I went for what I thought was simpler, but I am not 100% sure.

numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
}
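The multiset point in the reply above can be checked with plain values (not the actual worker/IP types):

    // diff removes one occurrence per matching element, so duplicates survive;
    // forall/contains only asks whether each element appears at all.
    val byDiff     = Seq(1, 1, 2).diff(Seq(1, 2)).isEmpty      // false: one of the duplicate 1s is left over
    val byContains = Seq(1, 1, 2).forall(Seq(1, 2).contains)   // true: every element appears somewhere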

@@ -69,7 +69,7 @@ object PythonRunner {
pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
val pythonPath = PythonUtils.mergePythonPaths(pathElements.toSeq: _*)

// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
@@ -820,7 +820,7 @@ private[spark] class SparkSubmit extends Logging {
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)

(childArgs, childClasspath, sparkConf, childMainClass)
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}

private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
@@ -52,7 +52,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer

override def read[T: ClassTag](prefix: String): Seq[T] = {
zk.getChildren.forPath(workingDir).asScala
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]).toSeq
}

override def close(): Unit = {
@@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient {
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filterKeys { k =>
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}
}.toMap
}
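The .toMap here, like the ones in CoarseGrainedExecutorBackend further down, is needed because filterKeys and mapValues return a lazy MapView rather than a Map on 2.13 (calling them directly, as the patch does, still works there but is deprecated). A sketch with fabricated environment values; the real code reads sys.env:

    val env = Map("SPARK_HOME" -> "/opt/spark", "PATH" -> "/usr/bin", "MESOS_DIR" -> "/tmp")

    // On 2.13, filterKeys yields a MapView, so the result has to be materialized
    // with .toMap before it can be returned as a Map[String, String].
    val kept: Map[String, String] =
      env.view.filterKeys(k => k.startsWith("SPARK_") || k.startsWith("MESOS_")).toMap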

private[spark] def supportsRestClient(master: String): Boolean = {
@@ -61,7 +61,7 @@ object CommandUtils extends Logging {
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
cmd.asScala ++ Seq(command.mainClass) ++ command.arguments
(cmd.asScala ++ Seq(command.mainClass) ++ command.arguments).toSeq
}

/**
@@ -201,7 +201,7 @@ private[deploy] class DriverRunner(
CommandUtils.redirectStream(process.getInputStream, stdout)

val stderr = new File(baseDir, "stderr")
val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
@@ -262,6 +262,6 @@ private[deploy] trait ProcessBuilderLike {
private[deploy] object ProcessBuilderLike {
def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
override def start(): Process = processBuilder.start()
override def command: Seq[String] = processBuilder.command().asScala
override def command: Seq[String] = processBuilder.command().asScala.toSeq
}
}
@@ -158,7 +158,7 @@ private[deploy] class ExecutorRunner(
val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $redactedCommand")

@@ -140,13 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
}

def extractAttributes: Map[String, String] = {
val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap
}

override def receive: PartialFunction[Any, Unit] = {
@@ -304,8 +304,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -606,7 +606,8 @@ private[spark] class Executor(
// Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
// without requiring a copy.
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks))
val serializedTK = ser.serialize(
TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

case _: InterruptedException | NonFatal(_) if
@@ -616,7 +617,8 @@

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
val serializedTK = ser.serialize(
TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@@ -661,13 +663,13 @@
val serializedTaskEndReason = {
try {
val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
.withMetricPeaks(metricPeaks)
.withMetricPeaks(metricPeaks.toSeq)
ser.serialize(ef)
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
.withMetricPeaks(metricPeaks)
.withMetricPeaks(metricPeaks.toSeq)
ser.serialize(ef)
}
}
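The .toSeq on metricPeaks slightly undercuts the comment earlier in this hunk about avoiding a copy: on 2.12 a WrappedArray is itself a scala.Seq, but on 2.13 WrappedArray is an alias for mutable.ArraySeq, which is not an immutable Seq, so toSeq materializes an immutable copy. A small sketch of just that conversion, with made-up values:

    import scala.collection.mutable.WrappedArray

    val peaks: Array[Long] = Array(1L, 2L, 3L)    // made-up metric peaks
    val wrapped = WrappedArray.make[Long](peaks)  // no copy: wraps the array

    // On 2.12 `wrapped` already is a scala.Seq; on 2.13 the extra toSeq builds
    // an immutable Seq (a small copy) so it can be stored in TaskKilled/ExceptionFailure.
    val asImmutable: Seq[Long] = wrapped.toSeq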
@@ -123,7 +123,7 @@ class TaskMetrics private[spark] () extends Serializable {
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
// This is called on driver. All accumulator updates have a fixed value. So it's safe to use
// `asScala` which accesses the internal values using `java.util.Iterator`.
_updatedBlockStatuses.value.asScala
_updatedBlockStatuses.value.asScala.toSeq
}

// Setters and increment-ers
@@ -199,7 +199,7 @@ class TaskMetrics private[spark] () extends Serializable {
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics.toSeq)
}
}

@@ -156,7 +156,7 @@ private[spark] class MetricsSystem private (
}

def getSourcesByName(sourceName: String): Seq[Source] =
sources.filter(_.sourceName == sourceName)
sources.filter(_.sourceName == sourceName).toSeq

def registerSource(source: Source): Unit = {
sources += source
@@ -934,7 +934,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
buf.toSeq
} : Seq[V]
val res = self.context.runJob(self, process, Array(index))
res(0)
@@ -133,12 +133,11 @@ private object ParallelCollectionRDD {
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
} else {
new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
Member Author: For previous reviewers: I fixed a bug from my initial change here. The inclusive end is not 1 less than the exclusive end, but one r.step less.
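A quick worked check of that fix with a toy range (not Spark's slicing code): for an exclusive slice [start, end) over a stepped range r, the matching inclusive upper bound is r.start + (end - 1) * r.step, one whole step below the exclusive bound rather than 1 below it.

    val r = 0 until 20 by 3                 // 0, 3, 6, 9, 12, 15, 18
    val (start, end) = (2, 5)               // slice indices into r

    // Mirrors the patched branch: the inclusive end sits one r.step below the
    // exclusive end, so the slice is exactly r(2), r(3), r(4).
    val slice = Range.inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
    assert(slice == Seq(6, 9, 12))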

}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
case nr: NumericRange[T] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
Expand All @@ -147,7 +146,7 @@ private object ParallelCollectionRDD {
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
slices.toSeq
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -238,7 +238,7 @@ private object PipedRDD {
while(tok.hasMoreElements) {
buf += tok.nextToken()
}
buf
buf.toSeq
}

val STDIN_WRITER_THREAD_PREFIX = "stdin writer for"