From 2ebd2b10c6c2e7dd20e3d36af17d02bf6cdcde83 Mon Sep 17 00:00:00 2001
From: liumihust <2270566005@qq.com>
Date: Thu, 27 Jul 2017 09:21:16 +0800
Subject: [PATCH] Update DiskBalancer.java

---
 .../hdfs/server/datanode/DiskBalancer.java   | 109 ++++++++++++++----
 1 file changed, 89 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 6b2cd52f9176b..e3fcf487a49e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -43,11 +43,7 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -84,14 +80,20 @@ public class DiskBalancer {
   private final BlockMover blockMover;
   private final ReentrantLock lock;
   private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
+  private final List<VolumePair> workToSubmitList;
+  private final Set<String> workingVolumeUuids;
+
   private boolean isDiskBalancerEnabled = false;
   private ExecutorService scheduler;
+  private ExecutorService executorService;
   private Future future;
   private String planID;
   private String planFile;
   private DiskBalancerWorkStatus.Result currentResult;
   private long bandwidth;
+
+  private final static int PARALLEL_COPY = 3;
+
   /**
    * Constructs a Disk Balancer object. This object takes care of reading a
    * NodePlan and executing it against a set of volumes.
@@ -106,9 +108,13 @@ public DiskBalancer(String dataNodeUUID,
     this.blockMover = blockMover;
     this.dataset = this.blockMover.getDataset();
     this.dataNodeUUID = dataNodeUUID;
-    scheduler = Executors.newSingleThreadExecutor();
+    scheduler = Executors.newSingleThreadExecutor(); // thread that submits copy work
+    executorService = Executors.newFixedThreadPool(PARALLEL_COPY); // thread pool that performs the copies
     lock = new ReentrantLock();
-    workMap = new ConcurrentHashMap<>();
+    workMap = new ConcurrentHashMap<>(); // maps a VolumePair to its work item
+    workToSubmitList = new ArrayList<>(); // work items waiting to be submitted
+    workingVolumeUuids = Collections.synchronizedSet(new HashSet<String>()); // volumes with a copy in progress
+
     this.planID = "";  // to keep protobuf happy.
     this.planFile = "";  // to keep protobuf happy.
     this.isDiskBalancerEnabled = conf.getBoolean(
@@ -132,6 +138,7 @@ public void shutdown() {
         this.currentResult = Result.PLAN_CANCELLED;
         this.blockMover.setExitFlag();
         scheduler.shutdown();
+        executorService.shutdown();
         needShutdown = true;
       }
     } finally {
@@ -152,11 +159,19 @@ private void shutdownExecutor() {
       if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
         scheduler.shutdownNow();
         if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
-          LOG.error("Disk Balancer : Scheduler did not terminate.");
+          LOG.error("Disk Balancer : Submit Scheduler did not terminate.");
+        }
+      }
+
+      if (!executorService.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+        if (!executorService.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
+          LOG.error("Disk Balancer : Copy Scheduler did not terminate.");
         }
       }
     } catch (InterruptedException ex) {
       scheduler.shutdownNow();
+      executorService.shutdown();
       Thread.currentThread().interrupt();
     }
   }
@@ -274,6 +289,7 @@ public void cancelPlan(String planID) throws DiskBalancerException {
         this.currentResult = Result.PLAN_CANCELLED;
         this.blockMover.setExitFlag();
         scheduler.shutdown();
+        executorService.shutdown();
         needShutdown = true;
       }
     } finally {
@@ -518,26 +534,70 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
    */
   private void executePlan() {
     Preconditions.checkState(lock.isHeldByCurrentThread());
-    this.blockMover.setRunnable();
     if (this.scheduler.isShutdown()) {
       this.scheduler = Executors.newSingleThreadExecutor();
     }
 
     this.future = scheduler.submit(new Runnable() {
       @Override
+      // submit the queued work items round-robin, using a volume in at most one copy at a time
       public void run() {
-        Thread.currentThread().setName("DiskBalancerThread");
-        LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
-            planFile, planID);
-        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-            workMap.entrySet()) {
-          blockMover.setRunnable();
-          blockMover.copyBlocks(entry.getKey(), entry.getValue());
+        blockMover.setRunnable();
+        while (workToSubmitList.size() > 0) {
+          for (int i = 0; i < workToSubmitList.size(); ) {
+            String source = workToSubmitList.get(i).getSourceVolUuid();
+            String destination = workToSubmitList.get(i).getDestVolUuid();
+            if (!workingVolumeUuids.contains(source)
+                && !workingVolumeUuids.contains(destination)) {
+              migrate(workToSubmitList.get(i), workMap.get(workToSubmitList.get(i)));
+              workingVolumeUuids.add(source);
+              workingVolumeUuids.add(destination);
+              workToSubmitList.remove(i);
+            } else {
+              i++;
+            }
+          }
         }
+        // wait for the submitted work items to finish
+        while (workingVolumeUuids.size() != 0) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage());
+          }
+        }
+        executorService.shutdown();
+        /* nothing left in the submit list and nothing still copying,
+         * so the copy migration is complete
+         */
       }
     });
   }
 
+  /**
+   * Starts migrating data between the given volume pair.
+   */
+  private void migrate(VolumePair volumePair, DiskBalancerWorkItem workItem) {
+    executorService.submit(new CopyRunnable(volumePair, workItem));
+  }
+
+  /**
+   * Runnable that copies the blocks for a single volume pair.
+   */
+  class CopyRunnable implements Runnable {
+    VolumePair volumePair;
+    DiskBalancerWorkItem workItem;
+    public CopyRunnable(VolumePair volumePair, DiskBalancerWorkItem workItem) {
+      this.volumePair = volumePair;
+      this.workItem = workItem;
+    }
+    public void run() {
+      blockMover.copyBlocks(volumePair, workItem);
+      workingVolumeUuids.remove(volumePair.getSourceVolUuid());
+      workingVolumeUuids.remove(volumePair.getDestVolUuid());
+    }
+  }
+
   /**
    * Insert work items to work map.
    * @param volumePair - VolumePair
@@ -559,6 +619,9 @@ private void createWorkPlan(final VolumePair volumePair, Step step)
     if (workMap.containsKey(volumePair)) {
       bytesToMove += workMap.get(volumePair).getBytesToCopy();
     }
+    if (!workToSubmitList.contains(volumePair)) {
+      workToSubmitList.add(volumePair);
+    }
 
     DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
 
@@ -893,6 +956,9 @@ private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
     try {
       ExtendedBlock block = iter.nextBlock();
 
+      if (block == null) {
+        return null;
+      }
       // A valid block is a finalized block, we iterate until we get
       // finalized blocks
       if (!this.dataset.isValidBlock(block)) {
@@ -1049,8 +1115,8 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
                 "blocks.", source.getBaseURI(), dest.getBaseURI(),
                 item.getBytesCopied(), item.getBlocksCopied());
-            this.setExitFlag();
-            continue;
+            //this.setExitFlag();
+            break;
           }
 
           ExtendedBlock block = getNextBlock(poolIters, item);
@@ -1058,14 +1124,17 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
           if (block == null) {
             LOG.error("No source blocks, exiting the copy. Source: {}, " +
                 "Dest:{}", source.getBaseURI(), dest.getBaseURI());
-            this.setExitFlag();
-            continue;
+            //this.setExitFlag();
+            break;
           }
 
+          /*
           // check if someone told us exit, treat this as an interruption
           // point
           // for the thread, since both getNextBlock and moveBlocAcrossVolume
           // can take some time.
+          */
+
           if (!shouldRun()) {
             continue;
           }
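
The standalone sketch below (not part of the patch) restates the scheduling policy the patch introduces: a fixed pool of PARALLEL_COPY copy threads fed by a submitter that round-robins over the pending volume pairs, dispatches a pair only when neither its source nor its destination volume is already part of a running copy, and then waits for the working set to drain. The class and method names, the stand-in VolumePair type, and the sleep intervals here are illustrative assumptions, not the DiskBalancer API.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelCopySketch {

  private static final int PARALLEL_COPY = 3;

  /** Minimal stand-in for DiskBalancer.VolumePair. */
  static final class VolumePair {
    final String sourceVolUuid;
    final String destVolUuid;
    VolumePair(String sourceVolUuid, String destVolUuid) {
      this.sourceVolUuid = sourceVolUuid;
      this.destVolUuid = destVolUuid;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final List<VolumePair> workToSubmit = new ArrayList<VolumePair>();
    workToSubmit.add(new VolumePair("disk1", "disk2"));
    workToSubmit.add(new VolumePair("disk1", "disk3")); // conflicts with the first pair on disk1
    workToSubmit.add(new VolumePair("disk4", "disk5")); // independent, can start immediately
    final Set<String> workingVolumes =
        Collections.synchronizedSet(new HashSet<String>());
    ExecutorService copyPool = Executors.newFixedThreadPool(PARALLEL_COPY);

    // Round-robin over the pending pairs, dispatching a pair only when neither
    // of its volumes is already busy with another copy.
    while (!workToSubmit.isEmpty()) {
      for (Iterator<VolumePair> it = workToSubmit.iterator(); it.hasNext(); ) {
        final VolumePair pair = it.next();
        if (!workingVolumes.contains(pair.sourceVolUuid)
            && !workingVolumes.contains(pair.destVolUuid)) {
          workingVolumes.add(pair.sourceVolUuid);
          workingVolumes.add(pair.destVolUuid);
          it.remove();
          copyPool.submit(new Runnable() {
            @Override
            public void run() {
              try {
                Thread.sleep(200); // stand-in for blockMover.copyBlocks(pair, item)
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                // releasing the volumes lets a conflicting pair be submitted
                workingVolumes.remove(pair.sourceVolUuid);
                workingVolumes.remove(pair.destVolUuid);
              }
            }
          });
        }
      }
      Thread.sleep(50); // avoid a hot spin while every pending pair conflicts
    }

    // Wait for the in-flight copies to drain before shutting the pool down,
    // mirroring the patch's wait on workingVolumeUuids.
    while (!workingVolumes.isEmpty()) {
      Thread.sleep(50);
    }
    copyPool.shutdown();
    copyPool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println("all volume pairs copied");
  }
}

The invariant this sketch maintains is the same one the patch enforces with workToSubmitList and workingVolumeUuids: at most PARALLEL_COPY pairs copy concurrently, and no volume serves as the source or destination of two copies at the same time.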