diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala index ad6c805c20b2e..4e8e1f2499eb2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala @@ -40,8 +40,7 @@ private[spark] trait HadoopConfBootstrap { private[spark] class HadoopConfBootstrapImpl( hadoopConfConfigMapName: String, - hadoopConfigFiles: Seq[File], - hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging { + hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging { override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) : PodWithMainContainer = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfUtils.scala new file mode 100644 index 0000000000000..090b44e95408d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfUtils.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +private[spark] object HadoopConfUtils { + + def getHadoopConfFiles(path: String) : Seq[File] = { + val dir = new File(path) + if (dir.isDirectory) { + dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }.toSeq + } else { + Seq.empty[File] + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala index 06de3fc6c74b9..a3764769f4dcf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps import java.io.File import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, OptionRequirements} +import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, OptionRequirements} import org.apache.spark.deploy.k8s.HadoopConfSparkUserBootstrapImpl import org.apache.spark.deploy.k8s.config._ import org.apache.spark.internal.Logging @@ -43,7 +43,7 @@ private[spark] class HadoopStepsOrchestrator( submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) private val maybeRenewerPrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL) - private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir) + private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir) private val hadoopUGI = new HadoopUGIUtilImpl logInfo(s"Hadoop Conf directory: $hadoopConfDir") @@ -70,8 +70,7 @@ private[spark] class HadoopStepsOrchestrator( def getHadoopSteps(): Seq[HadoopConfigurationStep] = { val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl( hadoopConfigMapName, - hadoopConfigurationFiles, - hadoopUGI) + hadoopConfigurationFiles) val hadoopConfMounterStep = new HadoopConfMounterStep( hadoopConfigMapName, hadoopConfigurationFiles, @@ -95,13 +94,4 @@ private[spark] class HadoopStepsOrchestrator( } Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq } - - private def getHadoopConfFiles(path: String) : Seq[File] = { - val dir = new File(path) - if (dir.isDirectory) { - dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }.toSeq - } else { - Seq.empty[File] - } - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index bd90766d07002..7ff0b03c37ecc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -21,7 +21,7 @@ import java.io.File import io.fabric8.kubernetes.client.Config import org.apache.spark.SparkContext -import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} @@ -87,14 +87,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf) } - val hadoopUtil = new HadoopUGIUtilImpl val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap => val hadoopConfigurations = maybeHadoopConfDir.map( - conf_dir => getHadoopConfFiles(conf_dir)).getOrElse(Array.empty[File]) + conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File]) new HadoopConfBootstrapImpl( hadoopConfigMap, - hadoopConfigurations, - hadoopUtil) + hadoopConfigurations) } val kerberosBootstrap = @@ -109,6 +107,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Utils.getCurrentUserName() ) } } + val hadoopUtil = new HadoopUGIUtilImpl val hadoopUserBootstrap = if (hadoopBootStrap.isDefined && kerberosBootstrap.isEmpty) { Some(new HadoopConfSparkUserBootstrapImpl(hadoopUtil)) @@ -202,13 +201,4 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) } - - private def getHadoopConfFiles(path: String) : Array[File] = { - val dir = new File(path) - if (dir.isDirectory) { - dir.listFiles.flatMap { file => Some(file).filter(_.isFile) } - } else { - Array.empty[File] - } - } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala index 03ada4090a56a..8113a965ecd5a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala @@ -23,12 +23,11 @@ import scala.collection.JavaConverters._ import com.google.common.io.Files import io.fabric8.kubernetes.api.model._ -import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.when +import org.mockito.MockitoAnnotations import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, PodWithMainContainer} import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.util.Utils @@ -38,19 +37,14 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeA private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE) private val SPARK_USER_VALUE = "sparkUser" - @Mock - private var hadoopUtil: HadoopUGIUtilImpl = _ - before { MockitoAnnotations.initMocks(this) - when(hadoopUtil.getShortUserName).thenReturn(SPARK_USER_VALUE) } test("Test of bootstrapping hadoop_conf_dir files") { val hadoopConfStep = new HadoopConfBootstrapImpl( CONFIG_MAP_NAME, - HADOOP_FILES, - hadoopUtil) + HADOOP_FILES) val expectedKeyPaths = Seq( new KeyToPathBuilder() .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 47493c827ddb5..be2004b6eca13 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -351,8 +351,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef val hadoopFiles = Seq(hadoopFile) val hadoopBootsrap = new HadoopConfBootstrapImpl( hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles, - hadoopUGI = hadoopUGI) + hadoopConfigFiles = hadoopFiles) val factory = new ExecutorPodFactoryImpl( conf, @@ -388,8 +387,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef val hadoopFiles = Seq(hadoopFile) val hadoopBootstrap = new HadoopConfBootstrapImpl( hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles, - hadoopUGI = hadoopUGI) + hadoopConfigFiles = hadoopFiles) val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI) val factory = new ExecutorPodFactoryImpl( @@ -427,8 +425,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef val hadoopFiles = Seq(hadoopFile) val hadoopBootstrap = new HadoopConfBootstrapImpl( hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles, - hadoopUGI = hadoopUGI) + hadoopConfigFiles = hadoopFiles) val secretName = "secret-test" val secretItemKey = "item-test" val userName = "sparkUser"