From c0b108b0f97220890a15fde80a403000bf3fd09b Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Wed, 22 Jun 2016 11:08:23 -0700 Subject: [PATCH 1/4] Fixed split command in TaskLocation to allow for _ in executor id. Also updated the comment to indicate the string format which includes the executor id. --- .../scala/org/apache/spark/scheduler/TaskLocation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 1eb6c1614fc0..ff48773ae875 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,14 +64,14 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the - * location is cached. + * These strings have the form [executorLocationTag][hostname][executorid], [hostname], or hdfs_cache_[hostname], + * depending on whether the location is cached. */ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str.startsWith(executorLocationTag)) { - val splits = str.split("_") + val splits = str.split("_", 3) if (splits.length != 3) { throw new IllegalArgumentException("Illegal executor location format: " + str) } From c2f0a2c52b8763b62aca5ba8f3b8701039d565ee Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Wed, 22 Jun 2016 11:20:39 -0700 Subject: [PATCH 2/4] Code style fix. --- .../main/scala/org/apache/spark/scheduler/TaskLocation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index ff48773ae875..6d9a1cc2c99e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,8 +64,8 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [executorLocationTag][hostname][executorid], [hostname], or hdfs_cache_[hostname], - * depending on whether the location is cached. + * These strings have the form [executorLocationTag][hostname][executorid], [hostname], or + * hdfs_cache_[hostname], depending on whether the location is cached. */ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) From 0aed80a6721f83c57b74d93dd225d29cf6e8e937 Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Tue, 28 Jun 2016 01:01:00 -0700 Subject: [PATCH 3/4] Refactored code and added test case. In response to @srowen and @zsxwing's comments. --- .../org/apache/spark/scheduler/TaskLocation.scala | 11 +++++++---- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 2 ++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 6d9a1cc2c99e..e54a562a5634 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,18 +64,21 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [executorLocationTag][hostname][executorid], [hostname], or + * These strings have the form executor_[hostname]_[executorid], [hostname], or * hdfs_cache_[hostname], depending on whether the location is cached. */ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str.startsWith(executorLocationTag)) { - val splits = str.split("_", 3) - if (splits.length != 3) { + val hostAndExecutorId = str.stripPrefix(executorLocationTag) + val splits = hostAndExecutorId.split("_", 2) + if (splits.length != 2) { throw new IllegalArgumentException("Illegal executor location format: " + str) } - new ExecutorCacheTaskLocation(splits(1), splits(2)) + val host = splits(0) + val executorId = splits(1) + new ExecutorCacheTaskLocation(host, executorId) } else { new HostTaskLocation(str) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1d7c8f4a6185..19d4250675e0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -789,6 +789,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(TaskLocation("host1") === HostTaskLocation("host1")) assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) + assert(TaskLocation("executor_some.host1_executor_task_3") === + ExecutorCacheTaskLocation("some.host1", "executor_task_3")) } test("Kill other task attempts when one attempt belonging to the same task succeeds") { From ee4708d498655ef8b156a97d388501bb5bf83463 Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Tue, 28 Jun 2016 01:35:54 -0700 Subject: [PATCH 4/4] Tweak to use require and array deconstruction for cleaner code. --- .../scala/org/apache/spark/scheduler/TaskLocation.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index e54a562a5634..06b52935c696 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -73,11 +73,8 @@ private[spark] object TaskLocation { if (str.startsWith(executorLocationTag)) { val hostAndExecutorId = str.stripPrefix(executorLocationTag) val splits = hostAndExecutorId.split("_", 2) - if (splits.length != 2) { - throw new IllegalArgumentException("Illegal executor location format: " + str) - } - val host = splits(0) - val executorId = splits(1) + require(splits.length == 2, "Illegal executor location format: " + str) + val Array(host, executorId) = splits new ExecutorCacheTaskLocation(host, executorId) } else { new HostTaskLocation(str)