From 9428c1ef6bb1acde2625a6634a144a88ad300857 Mon Sep 17 00:00:00 2001 From: Zhenzhao Wang Date: Wed, 12 Aug 2020 21:29:58 -0700 Subject: [PATCH 1/2] YARN-10398. Fix the bug to make sure only application master upload resource to yarn shared cache manager. --- .../mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../v2/app/job/impl/TestJobImpl.java | 23 ++++++++++ .../java/org/apache/hadoop/mapreduce/Job.java | 45 +++++++++++-------- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 8ee097fe243e1..0e2604681db42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1425,7 +1425,8 @@ public static String escapeString(String data) { * be set up to false. In that way, the NMs that host the task containers * won't try to upload the resources to shared cache. 
*/ - private static void cleanupSharedCacheUploadPolicies(Configuration conf) { + @VisibleForTesting + static void cleanupSharedCacheUploadPolicies(Configuration conf) { Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap()); Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 122fb9bf3d040..5f378e4f9c3fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -39,6 +39,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -1001,6 +1002,28 @@ public void testJobPriorityUpdate() throws Exception { Assert.assertEquals(updatedPriority, jobPriority); } + @Test + public void testCleanupSharedCacheUploadPolicies() { + Configuration config = new Configuration(); + Map archivePolicies = new HashMap<>(); + archivePolicies.put("archive1", true); + archivePolicies.put("archive2", true); + Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies); + Map filePolicies = new HashMap<>(); + filePolicies.put("file1", true); + filePolicies.put("jar1", true); + Job.setFileSharedCacheUploadPolicies(config, filePolicies); + Assert.assertEquals( + 2, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 2, 
Job.getFileSharedCacheUploadPolicies(config).size()); + JobImpl.cleanupSharedCacheUploadPolicies(config); + Assert.assertEquals( + 0, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 0, Job.getFileSharedCacheUploadPolicies(config).size()); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = SystemClock.getInstance(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 31e2057e8dfbf..2a6e77401ec16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1450,26 +1450,33 @@ public static void setArchiveSharedCacheUploadPolicies(Configuration conf, */ private static void setSharedCacheUploadPolicies(Configuration conf, Map policies, boolean areFiles) { - if (policies != null) { - StringBuilder sb = new StringBuilder(); - Iterator> it = policies.entrySet().iterator(); - Map.Entry e; - if (it.hasNext()) { - e = it.next(); - sb.append(e.getKey() + DELIM + e.getValue()); - } else { - // policies is an empty map, just skip setting the parameter - return; - } - while (it.hasNext()) { - e = it.next(); - sb.append("," + e.getKey() + DELIM + e.getValue()); - } - String confParam = - areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES - : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; - conf.set(confParam, sb.toString()); + String confParam = areFiles ? 
+ MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES : + MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + conf.set(confParam, populateSharedCacheUploadPolicies(policies)); + } + + private static String populateSharedCacheUploadPolicies( + Map policies) { + // If policies are an empty map or null, we will set EMPTY_STRING. + // In other words, cleaning up existing policies. This is useful when we + // try to clean up shared cache upload policies for non-application + // master tasks. See YARN-10398 for details. + if (policies == null || policies.size() == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + Iterator> it = policies.entrySet().iterator(); + Map.Entry e; + if (it.hasNext()) { + e = it.next(); + sb.append(e.getKey() + DELIM + e.getValue()); } + while (it.hasNext()) { + e = it.next(); + sb.append("," + e.getKey() + DELIM + e.getValue()); + } + return sb.toString(); } /** From b356f239e3559b5bf41a950a33e903b553f8c38d Mon Sep 17 00:00:00 2001 From: Zhenzhao Wang Date: Mon, 14 Sep 2020 23:52:58 -0700 Subject: [PATCH 2/2] Address comments and rebase code --- .../java/org/apache/hadoop/mapreduce/Job.java | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 2a6e77401ec16..9a998dacd9820 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1453,30 +1453,18 @@ private static void setSharedCacheUploadPolicies(Configuration conf, String confParam = areFiles ? 
MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; - conf.set(confParam, populateSharedCacheUploadPolicies(policies)); - } - - private static String populateSharedCacheUploadPolicies( - Map policies) { - // If policies are an empty map or null, we will set EMPTY_STRING. - // In other words, cleaning up existing policies. This is useful when we - // try to clean up shared cache upload policies for non-application - // master tasks. See YARN-10398 for details. + // If no policy is provided, we will reset the config by setting an empty + // string value. In other words, cleaning up existing policies. This is + // useful when we try to clean up shared cache upload policies for + // non-application master tasks. See MAPREDUCE-7294 for details. if (policies == null || policies.size() == 0) { - return ""; + conf.set(confParam, ""); + return; } StringBuilder sb = new StringBuilder(); - Iterator> it = policies.entrySet().iterator(); - Map.Entry e; - if (it.hasNext()) { - e = it.next(); - sb.append(e.getKey() + DELIM + e.getValue()); - } - while (it.hasNext()) { - e = it.next(); - sb.append("," + e.getKey() + DELIM + e.getValue()); - } - return sb.toString(); + policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(",")); + sb.deleteCharAt(sb.length() - 1); + conf.set(confParam, sb.toString()); } /**