From 002719c586934c5e40a5201fb8ce57268d7dde99 Mon Sep 17 00:00:00 2001 From: Scheller Date: Wed, 27 Jun 2018 13:17:24 -0700 Subject: [PATCH] Make hadoop consider wildcard host as datalocal This allows hadoop to treat "*" as data local. Remote filesystems can then increase performance by returning "*" as the block host when the filesystem is asked for the block location, which skips the retries otherwise made for data locality. --- .../v2/app/job/impl/TaskAttemptImpl.java | 7 +++-- .../v2/app/rm/RMContainerAllocator.java | 27 +++++++++++++++---- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 63e7456e86342..0e3960aab5777 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -162,6 +162,7 @@ public abstract class TaskAttemptImpl implements private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class); private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable? 
+ private static final String ANY = "*"; private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected final JobConf conf; @@ -683,7 +684,9 @@ public TaskAttemptImpl(TaskId taskId, int i, RackResolver.init(conf); this.dataLocalRacks = new HashSet(); for (String host : this.dataLocalHosts) { - this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation()); + if (!ANY.equals(host)) { + this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation()); + } } locality = Locality.OFF_SWITCH; @@ -1586,7 +1589,7 @@ private void computeRackAndLocality() { locality = Locality.OFF_SWITCH; if (dataLocalHosts.size() > 0) { String cHost = resolveHost(containerNodeId.getHost()); - if (dataLocalHosts.contains(cHost)) { + if (dataLocalHosts.contains(ANY) || dataLocalHosts.contains(cHost)) { locality = Locality.NODE_LOCAL; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 9d030b84e981b..3d09599c1f5ae 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -98,10 +98,11 @@ public class RMContainerAllocator extends RMContainerRequestor implements ContainerAllocator { static final Logger LOG = LoggerFactory.getLogger(RMContainerAllocator.class); - + + static final String ANY = "*"; public static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; - + static final Priority PRIORITY_FAST_FAIL_MAP; static final Priority PRIORITY_REDUCE; static final 
Priority PRIORITY_MAP; @@ -160,6 +161,8 @@ added to the pending and are ramped up (added to scheduled) based private int hostLocalAssigned = 0; private int rackLocalAssigned = 0; private int lastCompletedTasks = 0; + + private boolean isLocationIrrelevant = false; private boolean recalculateReduceSchedule = false; private Resource mapResourceRequest = Resources.none(); @@ -1121,6 +1124,10 @@ void addMap(ContainerRequestEvent event) { new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); for (String host : event.getHosts()) { LinkedList list = mapsHostMapping.get(host); + if (ANY.equals(host)) { + LOG.info("Location is irrelevant in map hosts"); + isLocationIrrelevant = true; + } if (list == null) { list = new LinkedList(); mapsHostMapping.put(host, list); @@ -1339,8 +1346,13 @@ else if (PRIORITY_MAP.equals(priority) || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) { LOG.info("Replacing MAP container " + allocated.getId()); // allocated container was for a map - String host = allocated.getNodeId().getHost(); - LinkedList list = mapsHostMapping.get(host); + LinkedList list; + if (isLocationIrrelevant) { + list = mapsHostMapping.get(ANY); + } else { + String host = allocated.getNodeId().getHost(); + list = mapsHostMapping.get(host); + } if (list != null && list.size() > 0) { TaskAttemptId tId = list.removeLast(); if (maps.containsKey(tId)) { @@ -1405,7 +1417,12 @@ private void assignMapsWithLocality(List allocatedContainers) { // "if (maps.containsKey(tId))" below should be almost always true. // hence this while loop would almost always have O(1) complexity String host = allocated.getNodeId().getHost(); - LinkedList list = mapsHostMapping.get(host); + LinkedList list; + if (isLocationIrrelevant){ + list = mapsHostMapping.get(ANY); + } else { + list = mapsHostMapping.get(host); + } while (list != null && list.size() > 0) { if (LOG.isDebugEnabled()) { LOG.debug("Host matched to the request list " + host);