
Commit 3ad4863

srowen authored and dongjoon-hyun committed
[SPARK-29292][SPARK-30010][CORE] Let core compile for Scala 2.13
### What changes were proposed in this pull request?

The purpose of this PR is to partly resolve SPARK-29292, and fully resolve SPARK-30010, which should allow Spark to compile against Scala 2.13 in Spark Core and up through GraphX (not SQL, Streaming, etc.). Note that we are not trying to determine here whether this makes Spark work on 2.13 yet, just compile, as a prerequisite for assessing test outcomes. However, of course, we need to ensure that the change does not break 2.12.

The changes are, in the main, adding .toSeq and .toMap calls where mutable collections / maps are returned as Seq / Map, which are immutable by default in Scala 2.13. The theory is that this is a no-op for Scala 2.12 (these calls return the collection itself) and required for 2.13. There are a few non-trivial changes highlighted below. In particular, to get Core to compile, we need to resolve SPARK-30010, which removes a deprecated SparkConf method.

### Why are the changes needed?

Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.

### Does this PR introduce _any_ user-facing change?

Yes: removal of the deprecated SparkConf.setAll overload, which isn't legal in Scala 2.13 anymore.

### How was this patch tested?

Existing tests. (2.13 was not _tested_; this is about getting it to compile without breaking 2.12.)

Closes #28971 from srowen/SPARK-29292.1.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
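For readers unfamiliar with the 2.13 collection change, here is a minimal sketch of the class of compile error the .toSeq additions address. This is not code from the commit; the helper name and values are made up for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// In Scala 2.13, scala.Seq aliases scala.collection.immutable.Seq, so a mutable
// buffer no longer conforms to a Seq-typed result. Calling .toSeq returns the
// buffer itself in 2.12 and produces an immutable copy in 2.13.
def collectIds(n: Int): Seq[String] = {
  val buf = new ArrayBuffer[String]()
  (1 to n).foreach(i => buf += s"exec-$i")
  buf.toSeq  // without .toSeq this compiles on 2.12 but not on 2.13
}
```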
1 parent: ceaa392 · commit: 3ad4863


58 files changed: +119 / -122 lines


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 3 additions & 3 deletions
@@ -550,7 +550,7 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
         countFailures = false, force = false)
     }

@@ -563,9 +563,9 @@ private[spark] class ExecutorAllocationManager(

     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved)
+      executorMonitor.executorsKilled(executorsRemoved.toSeq)
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
-      executorsRemoved
+      executorsRemoved.toSeq
     } else {
       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
         s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }

-    splitsByAddress.iterator
+    splitsByAddress.mapValues(_.toSeq).iterator
   }
 }

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 0 additions & 9 deletions
@@ -173,15 +173,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
     this
   }

-  /**
-   * Set multiple parameters together
-   */
-  @deprecated("Use setAll(Iterable) instead", "3.0.0")
-  def setAll(settings: Traversable[(String, String)]): SparkConf = {
-    settings.foreach { case (k, v) => set(k, v) }
-    this
-  }
-
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
     if (settings.putIfAbsent(key, value) == null) {
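Callers of the removed Traversable overload can use the surviving setAll(Iterable) overload, which accepts the same arguments under both 2.12 and 2.13. A minimal usage sketch (hypothetical keys and values, not from this commit):

```scala
import org.apache.spark.SparkConf

// A Map[String, String] is an Iterable[(String, String)], so it satisfies the
// remaining setAll overload on both Scala 2.12 and 2.13.
val conf = new SparkConf(loadDefaults = false)
  .setAll(Map("spark.app.name" -> "demo", "spark.master" -> "local[2]"))
```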

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 3 additions & 3 deletions
@@ -111,7 +111,7 @@ private[spark] class TaskContextImpl(
     if (failed) return
     failed = true
     failure = error
-    invokeListeners(onFailureCallbacks, "TaskFailureListener", Option(error)) {
+    invokeListeners(onFailureCallbacks.toSeq, "TaskFailureListener", Option(error)) {
       _.onTaskFailure(this, error)
     }
   }
@@ -120,7 +120,7 @@ private[spark] class TaskContextImpl(
   private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
     if (completed) return
     completed = true
-    invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
+    invokeListeners(onCompleteCallbacks.toSeq, "TaskCompletionListener", error) {
       _.onTaskCompletion(this)
     }
   }
@@ -142,7 +142,7 @@ private[spark] class TaskContextImpl(
       }
     }
     if (errorMsgs.nonEmpty) {
-      throw new TaskCompletionListenerException(errorMsgs, error)
+      throw new TaskCompletionListenerException(errorMsgs.toSeq, error)
     }
   }

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -256,7 +256,7 @@ object JavaRDD {
       } catch {
         case eof: EOFException => // No-op
       }
-      JavaRDD.fromRDD(sc.parallelize(objs, parallelism))
+      JavaRDD.fromRDD(sc.parallelize(objs.toSeq, parallelism))
     } finally {
       din.close()
     }

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 5 additions & 4 deletions
@@ -265,14 +265,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String]): JavaRDD[String] = {
-    rdd.pipe(command.asScala)
+    rdd.pipe(command.asScala.toSeq)
   }

   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala)
+    rdd.pipe(command.asScala.toSeq, env.asScala)
   }

   /**
@@ -282,7 +282,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       env: JMap[String, String],
       separateWorkingDir: Boolean,
       bufferSize: Int): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+    rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize)
   }

   /**
@@ -293,7 +293,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       separateWorkingDir: Boolean,
       bufferSize: Int,
       encoding: String): JavaRDD[String] = {
-    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
+    rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize,
+      encoding)
   }

   /**

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 3 additions & 3 deletions
@@ -133,7 +133,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
-    sc.parallelize(list.asScala, numSlices)
+    sc.parallelize(list.asScala.toSeq, numSlices)
   }

   /** Get an RDD that has no partitions or elements. */
@@ -152,7 +152,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
       : JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[V] = fakeClassTag
-    JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
+    JavaPairRDD.fromRDD(sc.parallelize(list.asScala.toSeq, numSlices))
   }

   /** Distribute a local Scala collection to form an RDD. */
@@ -161,7 +161,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {

   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
-    JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices))
+    JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()).toSeq, numSlices))

   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -163,7 +163,7 @@ private[spark] object PythonRDD extends Logging {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
-      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala.toSeq)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
     serveIterator(flattenedPartition.iterator,
       s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ private[spark] object PythonUtils {
    * Convert list of T into seq of T (for calling API with varargs)
    */
   def toSeq[T](vs: JList[T]): Seq[T] = {
-    vs.asScala
+    vs.asScala.toSeq
   }

   /**

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 3 additions & 3 deletions
@@ -205,7 +205,7 @@ private object FaultToleranceTest extends App with Logging {

   private def addWorkers(num: Int): Unit = {
     logInfo(s">>>>> ADD WORKERS $num <<<<<")
-    val masterUrls = getMasterUrls(masters)
+    val masterUrls = getMasterUrls(masters.toSeq)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }

@@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
     System.setProperty(config.DRIVER_PORT.key, "0")
-    sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
+    sc = new SparkContext(getMasterUrls(masters.toSeq), "fault-tolerance", containerSparkHome)
   }

   private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
@@ -279,7 +279,7 @@ private object FaultToleranceTest extends App with Logging {
     var liveWorkerIPs: Seq[String] = List()

     def stateValid(): Boolean = {
-      (workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
+      workers.map(_.ip).forall(liveWorkerIPs.contains) &&
         numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
     }
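The stateValid rewrite is the one change in this file that is not a plain .toSeq: the old expression relied on a `--` collection operation that does not carry over cleanly to 2.13, so the check is restated as an explicit membership test. A small sketch (made-up IP values, not from this commit) showing the two phrasings agree:

```scala
// Hypothetical worker/live IPs; both forms answer "is every worker IP currently live?"
val workerIPs = Seq("10.0.0.1", "10.0.0.2")
val liveWorkerIPs = Seq("10.0.0.1", "10.0.0.2", "10.0.0.3")

// New form, valid on both 2.12 and 2.13:
val allLive = workerIPs.forall(liveWorkerIPs.contains)

// Equivalent set-difference phrasing using diff, which also exists on both versions:
val allLiveViaDiff = workerIPs.diff(liveWorkerIPs).isEmpty

assert(allLive == allLiveViaDiff)
```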
