Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class ResourceProfile(
executorResources.get(ResourceProfile.PYSPARK_MEM).map(_.amount)
}

private[spark] def getOverheadMemory: Option[Long] = {
executorResources.get(ResourceProfile.OVERHEAD_MEM).map(_.amount)
}

private[spark] def getExecutorOffHeap: Option[Long] = {
executorResources.get(ResourceProfile.OFFHEAP_MEM).map(_.amount)
}

private[spark] def getExecutorMemory: Option[Long] = {
executorResources.get(ResourceProfile.MEMORY).map(_.amount)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
assert(rprof.executorResources.size === 3,
"Executor resources should contain cores, heap and offheap memory by default")
assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
assert(rprof.getExecutorCores.get === 1,
"Executor resources should have 1 core")
assert(rprof.getExecutorCores.get === 1,
"Executor resources should have 1 core")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
assert(rprof.getExecutorMemory.get === 1024,
"Executor resources should have 1024 memory")
assert(rprof.executorResources.get(ResourceProfile.PYSPARK_MEM) == None,
assert(rprof.getPySparkMemory == None,
"pyspark memory empty if not specified")
assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
assert(rprof.getOverheadMemory == None,
"overhead memory empty if not specified")
assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 0,
assert(rprof.getExecutorOffHeap.get === 0,
"Executor resources should have 0 offheap memory")
assert(rprof.taskResources.size === 1,
"Task resources should just contain cpus by default")
assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
assert(rprof.getTaskCpus.get === 1,
"Task resources should have 1 cpu")
assert(rprof.getTaskCpus.get === 1,
"Task resources should have 1 cpu")
Expand Down Expand Up @@ -121,17 +121,15 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
assert(execResources.size === 6, s"Executor resources should contain cores, pyspark " +
s"memory, memory overhead, memory, offHeap memory and gpu $execResources")
assert(execResources.contains("gpu"), "Executor resources should have gpu")
assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
"Executor resources should have 4 core")
assert(rprof.getExecutorCores.get === 4,
"Executor resources should have 4 core")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
assert(rprof.getExecutorMemory.get === 4096,
"Executor resources should have 1024 memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount == 2048,
assert(rprof.getPySparkMemory.get == 2048,
"pyspark memory empty if not specified")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
assert(rprof.getOverheadMemory.get == 1024,
"overhead memory empty if not specified")
assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 3,
assert(rprof.getExecutorOffHeap.get == 3,
"Executor resources should have 3 offHeap memory")
assert(rprof.taskResources.size === 2,
"Task resources should just contain cpus and gpu")
Expand Down Expand Up @@ -238,24 +236,24 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
}

test("Create ResourceProfile") {
val rprof = new ResourceProfileBuilder()
val rprofBuilder = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
rprof.require(taskReq).require(eReq)
rprofBuilder.require(taskReq).require(eReq)

assert(rprof.executorResources.size === 1)
assert(rprof.executorResources.contains("gpu"),
assert(rprofBuilder.executorResources.size === 1)
assert(rprofBuilder.executorResources.contains("gpu"),
"Executor resources should have gpu")
assert(rprof.executorResources.get("gpu").get.vendor === "nvidia",
assert(rprofBuilder.executorResources.get("gpu").get.vendor === "nvidia",
"gpu vendor should be nvidia")
assert(rprof.executorResources.get("gpu").get.discoveryScript === "myscript",
assert(rprofBuilder.executorResources.get("gpu").get.discoveryScript === "myscript",
"discoveryScript should be myscript")
assert(rprof.executorResources.get("gpu").get.amount === 2,
assert(rprofBuilder.executorResources.get("gpu").get.amount === 2,
"gpu amount should be 2")

assert(rprof.taskResources.size === 1, "Should have 1 task resource")
assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
assert(rprof.taskResources.get("gpu").get.amount === 1,
assert(rprofBuilder.taskResources.size === 1, "Should have 1 task resource")
assert(rprofBuilder.taskResources.contains("gpu"), "Task resources should have gpu")
assert(rprofBuilder.taskResources.get("gpu").get.amount === 1,
"Task resources should have 1 gpu")

val ereqs = new ExecutorResourceRequests()
Expand All @@ -264,19 +262,20 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
val treqs = new TaskResourceRequests()
treqs.cpus(1)

rprof.require(treqs)
rprof.require(ereqs)
rprofBuilder.require(treqs)
rprofBuilder.require(ereqs)
val rprof = rprofBuilder.build()

assert(rprof.executorResources.size === 6)
assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
assert(rprof.getExecutorCores.get === 2,
"Executor resources should have 2 cores")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
assert(rprof.getExecutorMemory.get === 4096,
"Executor resources should have 4096 memory")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
assert(rprof.getOverheadMemory.get === 2048,
"Executor resources should have 2048 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
assert(rprof.getPySparkMemory.get === 1024,
"Executor resources should have 1024 pyspark memory")
assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 3072,
assert(rprof.getExecutorOffHeap.get === 3072,
"Executor resources should have 3072 offHeap memory")

assert(rprof.taskResources.size === 2)
Expand Down Expand Up @@ -320,19 +319,20 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
}

test("Test ExecutorResourceRequests memory helpers") {
val rprof = new ResourceProfileBuilder()
val rprofBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.memory("4g")
ereqs.memoryOverhead("2000m").pysparkMemory("512000k").offHeapMemory("1g")
rprof.require(ereqs)
rprofBuilder.require(ereqs)
val rprof = rprofBuilder.build()

assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
assert(rprof.getExecutorMemory.get === 4096,
"Executor resources should have 4096 memory")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
assert(rprof.getOverheadMemory.get === 2000,
"Executor resources should have 2000 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
assert(rprof.getPySparkMemory.get === 500,
"Executor resources should have 512 pyspark memory")
assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 1024,
assert(rprof.getExecutorOffHeap.get === 1024,
"Executor resources should have 1024 offHeap memory")
}

Expand Down