diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
new file mode 100644
index 000000000000..19639420b8b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.fasterxml.jackson.core.JsonParseException
+import org.json4s.{DefaultFormats, MappingException}
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Discovers resources (GPUs/FPGAs/etc). It currently only supports resources that have
+ * addresses.
+ * This class finds resources by running and parsing the output of the user-specified script
+ * from the config spark.{driver/executor}.resource.{resourceType}.discoveryScript.
+ * The output of the script is expected to be JSON in the format of the
+ * ResourceInformation class.
+ *
+ * For example: {"name": "gpu", "addresses": ["0","1"]}
+ */
+private[spark] object ResourceDiscoverer extends Logging {
+
+  private implicit val formats = DefaultFormats
+
+  def findResources(sparkConf: SparkConf, isDriver: Boolean): Map[String, ResourceInformation] = {
+    val prefix = if (isDriver) {
+      SPARK_DRIVER_RESOURCE_PREFIX
+    } else {
+      SPARK_EXECUTOR_RESOURCE_PREFIX
+    }
+    // Get the unique resource types by taking the first component of each config key,
+    // e.g. for resourceType.count, grab the resourceType part.
+    val resourceNames = sparkConf.getAllWithPrefix(prefix).map { case (k, _) =>
+      k.split('.').head
+    }.toSet
+    resourceNames.map { rName => {
+      val rInfo = getResourceInfoForType(sparkConf, prefix, rName)
+      (rName -> rInfo)
+    }}.toMap
+  }
+
+  private def getResourceInfoForType(
+      sparkConf: SparkConf,
+      prefix: String,
+      resourceType: String): ResourceInformation = {
+    val discoveryConf = prefix + resourceType + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX
+    val script = sparkConf.getOption(discoveryConf)
+    val result = if (script.nonEmpty) {
+      val scriptFile = new File(script.get)
+      // Check that the script exists, then try to execute it.
+      if (scriptFile.exists()) {
+        try {
+          val output = executeAndGetOutput(Seq(script.get), new File("."))
+          val parsedJson = parse(output)
+          val name = (parsedJson \ "name").extract[String]
+          val addresses = (parsedJson \ "addresses").extract[Array[String]].toArray
+          new ResourceInformation(name, addresses)
+        } catch {
+          case e @ (_: SparkException | _: MappingException | _: JsonParseException) =>
+            throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
+              s" for $resourceType", e)
+        }
+      } else {
+        throw new SparkException(s"Resource script: $scriptFile to discover $resourceType" +
+          s" doesn't exist!")
+      }
+    } else {
+      throw new SparkException(s"User is expecting to use $resourceType resources but " +
+        s"didn't specify a script via conf: $discoveryConf, to find them!")
+    }
+    result
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
new file mode 100644
index 000000000000..6a5b725ac21d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
+ * The array of addresses is resource specific and it is up to the user to interpret the addresses.
+ * + * One example is GPUs, where the addresses would be the indices of the GPUs + * + * @param name the name of the resource + * @param addresses an array of strings describing the addresses of the resource + */ +@Evolving +class ResourceInformation( + val name: String, + val addresses: Array[String]) extends Serializable { + + override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]" +} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f57b731bedee..af01e0b23dad 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -17,6 +17,7 @@ package org.apache.spark.executor +import java.io.{BufferedInputStream, FileInputStream} import java.net.URL import java.nio.ByteBuffer import java.util.Locale @@ -26,12 +27,18 @@ import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal +import com.fasterxml.jackson.databind.exc.MismatchedInputException +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JArray +import org.json4s.MappingException +import org.json4s.jackson.JsonMethods._ + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.EXECUTOR_ID +import org.apache.spark.internal.config._ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -45,9 +52,12 @@ private[spark] class CoarseGrainedExecutorBackend( hostname: String, cores: Int, userClassPath: Seq[URL], - env: SparkEnv) + env: SparkEnv, + resourcesFile: Option[String]) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + private implicit val formats = DefaultFormats + private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -58,11 +68,12 @@ private[spark] class CoarseGrainedExecutorBackend( override def onStart() { logInfo("Connecting to driver: " + driverUrl) + val resources = parseOrFindResources(resourcesFile) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, - extractAttributes)) + extractAttributes, resources)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => @@ -72,6 +83,97 @@ private[spark] class CoarseGrainedExecutorBackend( }(ThreadUtils.sameThread) } + // Check that the actual resources discovered will satisfy the user specified + // requirements and that they match the configs specified by the user to catch + // mismatches between what the user requested and what resource manager gave or + // what the discovery script found. 
+ private def checkResourcesMeetRequirements( + resourceConfigPrefix: String, + reqResourcesAndCounts: Array[(String, String)], + actualResources: Map[String, ResourceInformation]): Unit = { + + reqResourcesAndCounts.foreach { case (rName, reqCount) => + if (actualResources.contains(rName)) { + val resourceInfo = actualResources(rName) + + if (resourceInfo.addresses.size < reqCount.toLong) { + throw new SparkException(s"Resource: $rName with addresses: " + + s"${resourceInfo.addresses.mkString(",")} doesn't meet the " + + s"requirements of needing $reqCount of them") + } + // also make sure the resource count on start matches the + // resource configs specified by user + val userCountConfigName = + resourceConfigPrefix + rName + SPARK_RESOURCE_COUNT_POSTFIX + val userConfigCount = env.conf.getOption(userCountConfigName). + getOrElse(throw new SparkException(s"Resource: $rName not specified " + + s"via config: $userCountConfigName, but required, " + + "please fix your configuration")) + + if (userConfigCount.toLong > resourceInfo.addresses.size) { + throw new SparkException(s"Resource: $rName, with addresses: " + + s"${resourceInfo.addresses.mkString(",")} " + + s"is less than what the user requested for count: $userConfigCount, " + + s"via $userCountConfigName") + } + } else { + throw new SparkException(s"Executor resource config missing required task resource: $rName") + } + } + } + + // visible for testing + def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = { + // only parse the resources if a task requires them + val taskResourceConfigs = env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX) + val resourceInfo = if (taskResourceConfigs.nonEmpty) { + val execResources = resourcesFile.map { resourceFileStr => { + val source = new BufferedInputStream(new FileInputStream(resourceFileStr)) + val resourceMap = try { + val parsedJson = parse(source).asInstanceOf[JArray].arr + parsedJson.map { json => + val name = (json \ "name").extract[String] + val addresses = (json \ "addresses").extract[Array[String]] + new ResourceInformation(name, addresses) + }.map(x => (x.name -> x)).toMap + } catch { + case e @ (_: MappingException | _: MismatchedInputException) => + throw new SparkException( + s"Exception parsing the resources in $resourceFileStr", e) + } finally { + source.close() + } + resourceMap + }}.getOrElse(ResourceDiscoverer.findResources(env.conf, isDriver = false)) + + if (execResources.isEmpty) { + throw new SparkException("User specified resources per task via: " + + s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources available on the executor.") + } + // get just the map of resource name to count + val resourcesAndCounts = taskResourceConfigs. + withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}. 
+ map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)} + + checkResourcesMeetRequirements(SPARK_EXECUTOR_RESOURCE_PREFIX, resourcesAndCounts, + execResources) + + logInfo("===============================================================================") + logInfo(s"Executor $executorId Resources:") + execResources.foreach { case (k, v) => logInfo(s"$k -> $v") } + logInfo("===============================================================================") + + execResources + } else { + if (resourcesFile.nonEmpty) { + logWarning(s"A resources file was specified but the application is not configured " + + s"to use any resources, see the configs with prefix: ${SPARK_TASK_RESOURCE_PREFIX}") + } + Map.empty[String, ResourceInformation] + } + resourceInfo + } + def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) @@ -189,13 +291,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { cores: Int, appId: String, workerUrl: Option[String], - userClassPath: mutable.ListBuffer[URL]) + userClassPath: mutable.ListBuffer[URL], + resourcesFile: Option[String]) def main(args: Array[String]): Unit = { val createFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.hostname, arguments.cores, arguments.userClassPath, env) + arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -257,6 +360,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var executorId: String = null var hostname: String = null var cores: Int = 0 + var resourcesFile: Option[String] = None var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() @@ -276,6 +380,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--cores") :: value :: tail => cores = value.toInt argv = tail + case ("--resourcesFile") :: value :: tail => + resourcesFile = Some(value) + argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail @@ -301,7 +408,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl, - userClassPath) + userClassPath, resourcesFile) } private def printUsageAndExit(classNameForEntry: String): Unit = { @@ -315,6 +422,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | --executor-id | --hostname | --cores + | --resourcesFile | --app-id | --worker-url | --user-class-path diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8e59ce710170..0aed1af023f8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -30,6 +30,13 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_ package object config { + private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource." + private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource." + private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource." 
+ + private[spark] val SPARK_RESOURCE_COUNT_POSTFIX = ".count" + private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX = ".discoveryScript" + private[spark] val DRIVER_CLASS_PATH = ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index afb48a31754f..89425e702677 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer +import org.apache.spark.ResourceInformation import org.apache.spark.TaskState.TaskState import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorLossReason @@ -64,7 +65,8 @@ private[spark] object CoarseGrainedClusterMessages { hostname: String, cores: Int, logUrls: Map[String, String], - attributes: Map[String, String]) + attributes: Map[String, String], + resources: Map[String, ResourceInformation]) extends CoarseGrainedClusterMessage case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4830d0e6f800..f7cf212d0bfe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -185,7 +185,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) => + case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, + attributes, resources) => if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 061aeb366cf5..dc4f4b4c66d9 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -174,9 +174,11 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty, + Map.empty)) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty, + Map.empty)) heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git 
a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala new file mode 100644 index 000000000000..77f8a0bae742 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.{Files => JavaFiles} +import java.nio.file.attribute.PosixFilePermission._ +import java.util.EnumSet + +import com.google.common.io.Files + +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +class ResourceDiscovererSuite extends SparkFunSuite + with LocalSparkContext { + + def mockDiscoveryScript(file: File, result: String): String = { + Files.write(s"echo $result", file, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + file.getPath() + } + + test("Resource discoverer no resources") { + val sparkconf = new SparkConf + val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false) + assert(resources.size === 0) + assert(resources.get("gpu").isEmpty, + "Should have a gpus entry that is empty") + } + + test("Resource discoverer multiple gpus") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuFile = new File(dir, "gpuDiscoverScript") + val scriptPath = mockDiscoveryScript(gpuFile, + """'{"name": "gpu","addresses":["0", "1"]}'""") + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath) + val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") + } + } + + test("Resource discoverer no addresses errors") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuFile = new File(dir, "gpuDiscoverScript") + val scriptPath = mockDiscoveryScript(gpuFile, + """'{"name": "gpu"}'""") + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath) + val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes") + } + } + + test("Resource 
discoverer multiple resource types") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuFile = new File(dir, "gpuDiscoverScript") + val gpuDiscovery = mockDiscoveryScript(gpuFile, + """'{"name": "gpu", "addresses": ["0", "1"]}'""") + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery) + + val fpgaFile = new File(dir, "fpgaDiscoverScript") + val fpgaDiscovery = mockDiscoveryScript(fpgaFile, + """'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""") + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery) + + val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false) + assert(resources.size === 2) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") + + val fpgaValue = resources.get("fpga") + assert(fpgaValue.nonEmpty, "Should have a gpu entry") + assert(fpgaValue.get.name == "fpga", "name should be fpga") + assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes") + assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep, + "should have f1,f2,f3 entries") + } + } + + test("Resource discoverer multiple gpus on driver") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuFile = new File(dir, "gpuDiscoverScript") + val gpuDiscovery = mockDiscoveryScript(gpuFile, + """'{"name": "gpu", "addresses": ["0", "1"]}'""") + sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery) + sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, "boguspath") + // make sure it reads from correct config, here it should use driver + val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = true) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") + } + } + + test("Resource discoverer script returns invalid format") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuFile = new File(dir, "gpuDiscoverScript") + val gpuDiscovery = mockDiscoveryScript(gpuFile, + """'{"addresses": ["0", "1"]}'""") + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery) + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, isDriver = false) + }.getMessage() + + assert(error.contains("Error running the resource discovery")) + } + } + + test("Resource discoverer script doesn't exist") { + val sparkconf = new SparkConf + withTempDir { dir => + val file1 = new File(dir, "bogusfilepath") + try { + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, isDriver = false) + }.getMessage() + + assert(error.contains("doesn't exist")) + } finally { + JavaFiles.deleteIfExists(file1.toPath()) + } + } + } + + 
test("gpu's specified but not discovery script") { + val sparkconf = new SparkConf + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, isDriver = false) + }.getMessage() + + assert(error.contains("User is expecting to use")) + } + +} diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 1ed2a1a2aeb4..39daf51ee613 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -497,7 +497,8 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty) + val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty, + Map.empty) // Get "localhost" on a blacklist. val taskScheduler = mock(classOf[TaskSchedulerImpl]) @@ -621,7 +622,8 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty) + val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty, + Map.empty) val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askSync[Boolean](message) } diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala new file mode 100644 index 000000000000..e59d1b0e047b --- /dev/null +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + + +import java.io.{File, PrintWriter} +import java.net.URL +import java.nio.charset.StandardCharsets +import java.nio.file.{Files => JavaFiles} +import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} +import java.util.EnumSet + +import com.google.common.io.Files +import org.json4s.JsonAST.{JArray, JObject, JString} +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods.{compact, render} +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.Utils + +class CoarseGrainedExecutorBackendSuite extends SparkFunSuite + with LocalSparkContext with MockitoSugar { + + private def writeFileWithJson(dir: File, strToWrite: JArray): String = { + val f1 = File.createTempFile("test-resource-parser1", "", dir) + JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes()) + f1.getPath() + } + + test("parsing no resources") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + withTempDir { tmpDir => + val testResourceArgs: JObject = ("" -> "") + val ja = JArray(List(testResourceArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + var error = intercept[SparkException] { + val parsedResources = backend.parseOrFindResources(Some(f1)) + }.getMessage() + + assert(error.contains("Exception parsing the resources in"), + s"Calling with no resources didn't error as expected, error: $error") + } + } + + test("parsing one resources") { + val conf = new SparkConf + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + withTempDir { tmpDir => + val testResourceArgs = + ("name" -> "gpu") ~ + ("addresses" -> Seq("0", "1")) + val ja = JArray(List(testResourceArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + val parsedResources = backend.parseOrFindResources(Some(f1)) + + assert(parsedResources.size === 1) + assert(parsedResources.get("gpu").nonEmpty) + assert(parsedResources.get("gpu").get.name === "gpu") + assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep) + } + } + + test("parsing multiple resources") { + val conf = new SparkConf + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3") + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + // we don't really use this, just need it to get at the parser function + val backend = new 
CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + withTempDir { tmpDir => + val gpuArgs = + ("name" -> "gpu") ~ + ("addresses" -> Seq("0", "1")) + val fpgaArgs = + ("name" -> "fpga") ~ + ("addresses" -> Seq("f1", "f2", "f3")) + val ja = JArray(List(gpuArgs, fpgaArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + val parsedResources = backend.parseOrFindResources(Some(f1)) + + assert(parsedResources.size === 2) + assert(parsedResources.get("gpu").nonEmpty) + assert(parsedResources.get("gpu").get.name === "gpu") + assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep) + assert(parsedResources.get("fpga").nonEmpty) + assert(parsedResources.get("fpga").get.name === "fpga") + assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep) + } + } + + test("error checking parsing resources and executor and task configs") { + val conf = new SparkConf + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + // not enough gpu's on the executor + withTempDir { tmpDir => + val gpuArgs = + ("name" -> "gpu") ~ + ("addresses" -> Seq("0")) + val ja = JArray(List(gpuArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + + var error = intercept[SparkException] { + val parsedResources = backend.parseOrFindResources(Some(f1)) + }.getMessage() + + assert(error.contains("doesn't meet the requirements of needing")) + } + + // missing resource on the executor + withTempDir { tmpDir => + val gpuArgs = + ("name" -> "fpga") ~ + ("addresses" -> Seq("0")) + val ja = JArray(List(gpuArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + + var error = intercept[SparkException] { + val parsedResources = backend.parseOrFindResources(Some(f1)) + }.getMessage() + + assert(error.contains("Executor resource config missing required task resource")) + } + } + + test("executor resource found less than required") { + val conf = new SparkConf + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "4") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "1") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + // executor resources < required + withTempDir { tmpDir => + val gpuArgs = + ("name" -> "gpu") ~ + ("addresses" -> Seq("0", "1")) + val ja = JArray(List(gpuArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + + var error = intercept[SparkException] { + val parsedResources = backend.parseOrFindResources(Some(f1)) + }.getMessage() + + assert(error.contains("is less than what the user requested for count")) + } + } + + test("parsing resources task configs with missing executor config") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + // we don't really use this, just need it to get at the parser function + val backend = 
new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + withTempDir { tmpDir => + val gpuArgs = + ("name" -> "gpu") ~ + ("addresses" -> Seq("0", "1")) + val ja = JArray(List(gpuArgs)) + val f1 = writeFileWithJson(tmpDir, ja) + + var error = intercept[SparkException] { + val parsedResources = backend.parseOrFindResources(Some(f1)) + }.getMessage() + + assert(error.contains("Resource: gpu not specified via config: " + + "spark.executor.resource.gpu.count, but required, please " + + "fix your configuration")) + } + } + + test("use discoverer") { + val conf = new SparkConf + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3") + conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3") + assume(!(Utils.isWindows)) + withTempDir { dir => + val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga") + Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""", + fpgaDiscovery, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath()) + + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + val parsedResources = backend.parseOrFindResources(None) + + assert(parsedResources.size === 1) + assert(parsedResources.get("fpga").nonEmpty) + assert(parsedResources.get("fpga").get.name === "fpga") + assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep) + } + } + + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = { + val mockEnv = mock[SparkEnv] + val mockRpcEnv = mock[RpcEnv] + when(mockEnv.conf).thenReturn(conf) + when(mockEnv.serializer).thenReturn(serializer) + when(mockEnv.closureSerializer).thenReturn(serializer) + when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) + SparkEnv.set(mockEnv) + mockEnv + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 33f48b858a73..4858d38cad40 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -164,11 +164,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo sc.addSparkListener(listener) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes)) + RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes)) + RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes)) + RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) assert(executorAddedCount === 3) diff 
--git a/docs/configuration.md b/docs/configuration.md index da2d279e2837..d0b2699a5dc7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -221,6 +221,25 @@ of the most common options to set are: This option is currently supported on YARN and Kubernetes. + + spark.executor.resource.{resourceType}.count + 0 + + The number of a particular resource type to use per executor process. + If this is used, you must also specify the + spark.executor.resource.{resourceType}.discoveryScript + for the executor to find the resource on startup. + + + + spark.executor.resource.{resourceType}.discoveryScript + None + + A script for the executor to run to discover a particular resource type. This should + write to STDOUT a JSON string in the format of the ResourceInformation class. This has a + name and an array of addresses. + + spark.extraListeners (none) @@ -1793,6 +1812,15 @@ Apart from these, the following properties are also available, and may be useful Number of cores to allocate for each task. + + spark.task.resource.{resourceType}.count + 1 + + Number of a particular resource type to allocate for each task. If this is specified + you must also provide the executor config spark.executor.resource.{resourceType}.count + and any corresponding discovery configs so that your executors are created with that resource type. + + spark.task.maxFailures 4 diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 37c0f5f45075..34b5f6cd19bb 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -692,7 +692,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite val mockEndpointRef = mock[RpcEndpointRef] val mockAddress = mock[RpcAddress] val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty, - Map.empty) + Map.empty, Map.empty) backend.driverEndpoint.askSync[Boolean](message) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index 53e99d992db8..380da361e5fc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -37,7 +37,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend( hostname: String, cores: Int, userClassPath: Seq[URL], - env: SparkEnv) + env: SparkEnv, + resourcesFile: Option[String]) extends CoarseGrainedExecutorBackend( rpcEnv, driverUrl, @@ -45,7 +46,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend( hostname, cores, userClassPath, - env) with Logging { + env, + resourcesFile) with Logging { private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf) @@ -66,7 +68,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging { val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => new YarnCoarseGrainedExecutorBackend(rpcEnv, 
arguments.driverUrl, arguments.executorId, - arguments.hostname, arguments.cores, arguments.userClassPath, env) + arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile) } val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$"))
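
The three config namespaces introduced by this patch (spark.executor.resource.*, spark.task.resource.* and, on the driver side, spark.driver.resource.*) are easiest to see wired together from an application's point of view. The sketch below is illustrative only: the app name, the "gpu" resource counts, and the script path are assumptions, not part of the patch.

    import org.apache.spark.SparkConf

    // Minimal sketch of the new resource configs; the script path is an assumed location.
    val conf = new SparkConf()
      .setAppName("gpu-example")
      // Executor side: each executor should come up with 2 GPUs, found via the script.
      .set("spark.executor.resource.gpu.count", "2")
      .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpus.sh")
      // Task side: each task needs 1 GPU; this must be satisfiable by the executor count above.
      .set("spark.task.resource.gpu.count", "1")

On executor startup, CoarseGrainedExecutorBackend.parseOrFindResources checks that the discovered addresses cover both the task requirement and the executor count config, and fails fast with a SparkException otherwise.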
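The discovery script named by spark.{driver/executor}.resource.{resourceType}.discoveryScript must print a single JSON object in the ResourceInformation format, e.g. {"name": "gpu", "addresses": ["0", "1"]}. A minimal sketch of producing such a script, modeled on the mockDiscoveryScript helper in the new ResourceDiscovererSuite; the helper name and the getGpus.sh file name are assumptions for illustration:

    import java.io.File
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files => JavaFiles}
    import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
    import java.util.EnumSet

    import com.google.common.io.Files

    // Hypothetical helper: writes an executable script whose stdout is the JSON that
    // ResourceDiscoverer.findResources parses via executeAndGetOutput.
    def writeMockGpuScript(dir: File): String = {
      val script = new File(dir, "getGpus.sh")
      Files.write("""echo '{"name": "gpu", "addresses": ["0", "1"]}'""", script, StandardCharsets.UTF_8)
      JavaFiles.setPosixFilePermissions(script.toPath(),
        EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
      script.getPath()
    }

With such a script configured, findResources would return a map like Map("gpu" -> ResourceInformation("gpu", Array("0", "1"))).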
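When a cluster manager hands the executor a resources file via the new --resourcesFile argument, parseOrFindResources expects the file to contain a JSON array of ResourceInformation entries. A sketch of building that payload with the same json4s DSL the new CoarseGrainedExecutorBackendSuite uses; the addresses shown are illustrative:

    import org.json4s.JsonAST.JArray
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    // One entry per resource type, matching what writeFileWithJson produces in the test suite.
    val gpuEntry = ("name" -> "gpu") ~ ("addresses" -> Seq("0", "1"))
    val fpgaEntry = ("name" -> "fpga") ~ ("addresses" -> Seq("f1", "f2", "f3"))
    val resourcesFileJson = compact(render(JArray(List(gpuEntry, fpgaEntry))))
    // resourcesFileJson: [{"name":"gpu","addresses":["0","1"]},{"name":"fpga","addresses":["f1","f2","f3"]}]

If no such file is passed, the backend falls back to running the configured discovery script through ResourceDiscoverer.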