From 9cd3c4371e5d1c5eadaeb7665218e3d353653af6 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 30 Oct 2023 14:21:51 +0800
Subject: [PATCH 1/4] [SPARK-45688][SPARK-45693][CORE] Clean up the deprecated
 API usage related to `MapOps` & Fix `method += in trait Growable is
 deprecated`

---
 .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +-
 .../scala/org/apache/spark/deploy/worker/CommandUtils.scala  | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 883ca62ae22b4..1e4d39aab9b37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -727,7 +727,7 @@ private[spark] class SparkSubmit extends Logging {
       if (opt.value != null &&
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+        if (opt.clOption != null) { childArgs.addOne(opt.clOption).addOne(opt.value) }
         if (opt.confKey != null) {
           if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
             sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index c04214de4ddc6..af8f7277a59e8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -81,14 +81,15 @@ object CommandUtils extends Logging {
     var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
       val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++
         env.get(libraryPathName)
-      command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
+      command.environment.concat(Map(libraryPathName -> libraryPaths.mkString(File.pathSeparator)))
     } else {
       command.environment
     }

     // set auth secret to env variable if needed
     if (securityMgr.isAuthenticationEnabled()) {
-      newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey())
+      newEnvironment = newEnvironment.concat(
+        Map(SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey()))
     }
     // set SSL env variables if needed
     newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords
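Context for the series: Scala 2.13 deprecates the symbolic `+`, `-` and `--`
methods on `scala.collection.Map`/mutable maps (in favor of `concat`/`++` and
`filterNot`) and the varargs overload `Growable.+=(elem1, elem2, elems*)` (in
favor of `++=` or chained single-element `+=`). A minimal standalone sketch of
the deprecated forms and their replacements — illustrative names, not Spark
code:

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    val args = ArrayBuffer[String]()
    // Deprecated varargs form: args += ("--class", "Main")
    // Single-element += returns the buffer itself, so calls chain:
    args += "--class" += "Main"
    args ++= Seq("--verbose")            // ++= (addAll) also works

    val m = mutable.Map("k1" -> "v1")
    // Deprecated on mutable maps: m + (("k0", "v0")) and m - "k0"
    val m2 = m ++ Map("k0" -> "v0")      // non-mutating replacement for +
    val m3 = m2.filterNot(_._1 == "k0")  // non-mutating replacement for -
    m.update("k0", "v0")                 // or mutate the map in place
    m.remove("k0")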
From d4ebf7d89e6a2aca27458448637b5bcd21235d02 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 30 Oct 2023 16:16:52 +0800
Subject: [PATCH 2/4] [SPARK-45688][SPARK-45693][CORE] Clean up the deprecated
 API usage related to `MapOps` & Fix `method += in trait Growable is
 deprecated`

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 28 +++++++++----------
 .../spark/deploy/worker/CommandUtils.scala    |  6 ++--
 .../apache/spark/util/JsonProtocolSuite.scala |  2 +-
 .../spark/util/TimeStampedHashMapSuite.scala  | 10 +++----
 .../cluster/YarnClientSchedulerBackend.scala  |  2 +-
 .../statsEstimation/JoinEstimation.scala      | 13 ++++-----
 .../v2/DescribeNamespaceExec.scala            |  6 ++--
 .../v2/V2SessionCatalogSuite.scala            |  5 ++--
 8 files changed, 35 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1e4d39aab9b37..21e3405aa1dd3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -727,7 +727,7 @@ private[spark] class SparkSubmit extends Logging {
       if (opt.value != null &&
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) { childArgs.addOne(opt.clOption).addOne(opt.value) }
+        if (opt.clOption != null) { childArgs += opt.clOption += opt.value }
         if (opt.confKey != null) {
           if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
             sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
@@ -759,15 +759,15 @@ private[spark] class SparkSubmit extends Logging {
     if (args.isStandaloneCluster) {
       if (args.useRest) {
         childMainClass = REST_CLUSTER_SUBMIT_CLASS
-        childArgs += (args.primaryResource, args.mainClass)
+        childArgs += args.primaryResource += args.mainClass
       } else {
         // In legacy standalone cluster mode, use Client as a wrapper around the user class
         childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
         if (args.supervise) { childArgs += "--supervise" }
-        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
-        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
+        Option(args.driverMemory).foreach { m => childArgs += "--memory" += m }
+        Option(args.driverCores).foreach { c => childArgs += "--cores" += c }
         childArgs += "launch"
-        childArgs += (args.master, args.primaryResource, args.mainClass)
+        childArgs += args.master += args.primaryResource += args.mainClass
       }
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
@@ -789,20 +789,20 @@ private[spark] class SparkSubmit extends Logging {
     if (isYarnCluster) {
       childMainClass = YARN_CLUSTER_SUBMIT_CLASS
       if (args.isPython) {
-        childArgs += ("--primary-py-file", args.primaryResource)
-        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+        childArgs += "--primary-py-file" += args.primaryResource
+        childArgs += "--class" += "org.apache.spark.deploy.PythonRunner"
       } else if (args.isR) {
         val mainFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-r-file", mainFile)
-        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
+        childArgs += "--primary-r-file" += mainFile
+        childArgs += "--class" += "org.apache.spark.deploy.RRunner"
       } else {
         if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
-          childArgs += ("--jar", args.primaryResource)
+          childArgs += "--jar" += args.primaryResource
        }
-        childArgs += ("--class", args.mainClass)
+        childArgs += "--class" += args.mainClass
       }
       if (args.childArgs != null) {
-        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
+        args.childArgs.foreach { arg => childArgs += "--arg" += arg }
       }
     }
@@ -825,12 +825,12 @@ private[spark] class SparkSubmit extends Logging {
       }
       if (args.childArgs != null) {
         args.childArgs.foreach { arg =>
-          childArgs += ("--arg", arg)
+          childArgs += "--arg" += arg
         }
       }
       // Pass the proxyUser to the k8s app so it is possible to add it to the driver args
       if (args.proxyUser != null) {
-        childArgs += ("--proxy-user", args.proxyUser)
+        childArgs += "--proxy-user" += args.proxyUser
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index af8f7277a59e8..d1190ca46c2a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -81,15 +81,15 @@ object CommandUtils extends Logging {
     var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
       val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++
         env.get(libraryPathName)
-      command.environment.concat(Map(libraryPathName -> libraryPaths.mkString(File.pathSeparator)))
+      command.environment ++ Map(libraryPathName -> libraryPaths.mkString(File.pathSeparator))
     } else {
       command.environment
     }

     // set auth secret to env variable if needed
     if (securityMgr.isAuthenticationEnabled()) {
-      newEnvironment = newEnvironment.concat(
-        Map(SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey()))
+      newEnvironment = newEnvironment ++
+        Map(SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey())
     }
     // set SSL env variables if needed
     newEnvironment ++= securityMgr.getEnvironmentForSslRpcPasswords
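On the CommandUtils hunk above: `++` is the symbolic alias of `concat`, and
because `newEnvironment` is a `var` holding a map, the later `++=` is
reassignment sugar rather than in-place mutation. A small sketch of the
distinction, on plain collections (illustrative only):

    // On a `var` bound to an immutable Map, ++= desugars to reassignment:
    var envMap = Map("A" -> "1")
    envMap ++= Map("B" -> "2")  // envMap = envMap ++ Map("B" -> "2")

    // On a mutable collection, ++= is Growable#addAll and mutates in place:
    val buf = scala.collection.mutable.ArrayBuffer("a")
    buf ++= Seq("b", "c")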
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3defd4b1a7d90..948bc8889bcd1 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -626,7 +626,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val expectedEvent: SparkListenerEnvironmentUpdate = {
       val e = JsonProtocol.environmentUpdateFromJson(environmentUpdateJsonString)
       e.copy(environmentDetails =
-        e.environmentDetails + ("Metrics Properties" -> Seq.empty[(String, String)]))
+        e.environmentDetails ++ Map("Metrics Properties" -> Seq.empty[(String, String)]))
     }
     val oldEnvironmentUpdateJson = environmentUpdateJsonString
       .removeField("Metrics Properties")
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index 1644540946839..3554a0f8e2c23 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -118,7 +118,7 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
     assert(testMap2.iterator.toSeq.head === (("k1", "v1")))

     // +
-    val testMap3 = testMap2 + (("k0", "v0"))
+    val testMap3 = testMap2 ++ Map("k0" -> "v0")
     assert(testMap3.size === 2)
     assert(testMap3.get("k1").isDefined)
     assert(testMap3("k1") === "v1")
@@ -126,10 +126,10 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
     assert(testMap3("k0") === "v0")

     // -
-    val testMap4 = testMap3 - "k0"
-    assert(testMap4.size === 1)
-    assert(testMap4.get("k1").isDefined)
-    assert(testMap4("k1") === "v1")
+    testMap3.remove("k0")
+    assert(testMap3.size === 1)
+    assert(testMap3.get("k1").isDefined)
+    assert(testMap3("k1") === "v1")
   }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 717c620f5c341..af41d30c2cdb8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -53,7 +53,7 @@ private[spark] class YarnClientSchedulerBackend(
     sc.ui.foreach { ui => conf.set(DRIVER_APP_UI_ADDRESS, ui.webUrl) }

     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf += "--arg" += hostport

     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray)
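Note on the TimeStampedHashMapSuite hunk above: the old `testMap3 - "k0"`
built a new map, while the replacement `remove` mutates the receiver and
returns the removed value, which is why the assertions now re-check
`testMap3` itself. Sketched on a plain mutable map (illustrative only):

    import scala.collection.mutable

    val m = mutable.Map("k0" -> "v0", "k1" -> "v1")
    val removed = m.remove("k0")   // mutates m, returns Option of the old value
    assert(removed.contains("v0"))
    assert(m.size == 1 && m("k1") == "v1")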
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index c6e76df1b31ad..cb7ab6c6cb55b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -206,13 +206,12 @@ case class JoinEstimation(join: Join) extends Logging {
           case _ =>
             computeByNdv(leftKey, rightKey, newMin, newMax)
         }
-        keyStatsAfterJoin += (
-          // Histograms are propagated as unchanged. During future estimation, they should be
-          // truncated by the updated max/min. In this way, only pointers of the histograms are
-          // propagated and thus reduce memory consumption.
-          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
-          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
-        )
+        // Histograms are propagated as unchanged. During future estimation, they should be
+        // truncated by the updated max/min. In this way, only pointers of the histograms are
+        // propagated and thus reduce memory consumption.
+        keyStatsAfterJoin +=
+          (leftKey -> joinStat.copy(histogram = leftKeyStat.histogram)) +=
+          (rightKey -> joinStat.copy(histogram = rightKeyStat.histogram))
         // Return cardinality estimated from the most selective join keys.
         if (card < joinCard) joinCard = card
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
index 125952566d7e8..b89fa24f4edbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
@@ -46,12 +46,12 @@ case class DescribeNamespaceExec(
     }

     if (isExtended) {
-      val properties = metadata.asScala -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
+      CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach(metadata.remove(_))
       val propertiesStr =
-        if (properties.isEmpty) {
+        if (metadata.isEmpty) {
           ""
         } else {
-          conf.redactOptions(properties.toMap).toSeq.sortBy(_._1).mkString("(", ", ", ")")
+          conf.redactOptions(metadata.asScala.toMap).toSeq.sortBy(_._1).mkString("(", ", ", ")")
         }
       rows += toCatalystRow("Properties", propertiesStr)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index c43658eacabc2..e93a3950ae05a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -825,9 +825,8 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
       expected: scala.collection.Map[String, String],
       actual: scala.collection.Map[String, String]): Unit = {
     // remove location and comment that are automatically added by HMS unless they are expected
-    val toRemove =
-      CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains)
-    assert(expected -- toRemove === actual)
+    val toRemove = CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.toSet
+    assert(expected.filter(e => !toRemove.contains(e._1)) === actual)
   }

   test("listNamespaces: basic behavior") {
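The follow-up patch below swaps `filter` with a negated predicate for
`filterNot`; the two are equivalent, the latter just reads without the inline
`!`. A tiny sketch with illustrative values:

    val props = Map("location" -> "/tmp/db", "owner" -> "alice")
    val reserved = Set("location", "comment")
    // Identical results; both keep only the "owner" entry:
    assert(props.filter(e => !reserved.contains(e._1)) ==
      props.filterNot(e => reserved.contains(e._1)))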
From 2700b98c4569730dd4746e0522720c7e59c7ded4 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 30 Oct 2023 16:34:33 +0800
Subject: [PATCH 3/4] filterNot

---
 .../sql/execution/datasources/v2/V2SessionCatalogSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index e93a3950ae05a..3d299c8bf08ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -826,7 +826,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
       actual: scala.collection.Map[String, String]): Unit = {
     // remove location and comment that are automatically added by HMS unless they are expected
     val toRemove = CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.toSet
-    assert(expected.filter(e => !toRemove.contains(e._1)) === actual)
+    assert(expected.filterNot(e => toRemove.contains(e._1)) === actual)
   }

   test("listNamespaces: basic behavior") {

From 52da31c51fa37d00ac715c609f960504d553f62a Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 30 Oct 2023 21:48:36 +0800
Subject: [PATCH 4/4] fix map remove bug

---
 .../execution/datasources/v2/DescribeNamespaceExec.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
index b89fa24f4edbb..766200e046339 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala
@@ -46,12 +46,13 @@ case class DescribeNamespaceExec(
     }

     if (isExtended) {
-      CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach(metadata.remove(_))
+      val properties = metadata.asScala.filterNot(
+        m => CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.toSet.contains(m._1))
       val propertiesStr =
-        if (metadata.isEmpty) {
+        if (properties.isEmpty) {
           ""
         } else {
-          conf.redactOptions(metadata.asScala.toMap).toSeq.sortBy(_._1).mkString("(", ", ", ")")
+          conf.redactOptions(properties.toMap).toSeq.sortBy(_._1).mkString("(", ", ", ")")
         }
       rows += toCatalystRow("Properties", propertiesStr)
     }
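Why PATCH 4/4 was needed: in PATCH 2/4, DescribeNamespaceExec called `remove`
on `metadata`, the `java.util.Map` handed back by the catalog, so reserved
properties were destructively dropped from catalog-owned state (and the call
could throw UnsupportedOperationException on an unmodifiable map). Filtering
the `asScala` view instead builds an independent Scala map and leaves the
original untouched. A minimal sketch of the pitfall with a plain HashMap
(illustrative values, not the catalog API):

    import scala.jdk.CollectionConverters._

    val metadata = new java.util.HashMap[String, String]()
    metadata.put("location", "/warehouse/db")
    metadata.put("owner", "alice")

    val reserved = Set("location", "comment")
    // filterNot copies the surviving entries into a new Scala map...
    val properties = metadata.asScala.filterNot(m => reserved.contains(m._1))
    assert(properties.keySet == Set("owner"))
    // ...while the Java map, which the caller may still rely on, is intact:
    assert(metadata.size == 2)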