Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -84,14 +80,20 @@ public class DiskBalancer {
private final BlockMover blockMover;
private final ReentrantLock lock;
private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
private final List<VolumePair> workToSubmitList;
private final Set<String> workingVolumeUuids;

private boolean isDiskBalancerEnabled = false;
private ExecutorService scheduler;
private ExecutorService executorService;
private Future future;
private String planID;
private String planFile;
private DiskBalancerWorkStatus.Result currentResult;
private long bandwidth;

private final static int PARALLEL_COPY = 3;

/**
* Constructs a Disk Balancer object. This object takes care of reading a
* NodePlan and executing it against a set of volumes.
Expand All @@ -106,9 +108,13 @@ public DiskBalancer(String dataNodeUUID,
this.blockMover = blockMover;
this.dataset = this.blockMover.getDataset();
this.dataNodeUUID = dataNodeUUID;
scheduler = Executors.newSingleThreadExecutor();
scheduler = Executors.newSingleThreadExecutor();//submit thread
executorService = Executors.newFixedThreadPool(PARALLEL_COPY);//copy threadPool
lock = new ReentrantLock();
workMap = new ConcurrentHashMap<>();
workMap = new ConcurrentHashMap<>();//volumePair to WorkItem
workToSubmitList = new ArrayList<>();//works to submit
workingVolumeUuids = Collections.synchronizedSet(new HashSet<String>());//volumes in working status

this.planID = ""; // to keep protobuf happy.
this.planFile = ""; // to keep protobuf happy.
this.isDiskBalancerEnabled = conf.getBoolean(
Expand All @@ -132,6 +138,7 @@ public void shutdown() {
this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
scheduler.shutdown();
executorService.shutdown();
needShutdown = true;
}
} finally {
Expand All @@ -152,11 +159,19 @@ private void shutdownExecutor() {
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Scheduler did not terminate.");
LOG.error("Disk Balancer : Submit Scheduler did not terminate.");
}
}

if (!executorService.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Copy Scheduler did not terminate.");
}
}
} catch (InterruptedException ex) {
scheduler.shutdownNow();
executorService.shutdown();
Thread.currentThread().interrupt();
}
}
Expand Down Expand Up @@ -274,6 +289,7 @@ public void cancelPlan(String planID) throws DiskBalancerException {
this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
scheduler.shutdown();
executorService.shutdown();
needShutdown = true;
}
} finally {
Expand Down Expand Up @@ -518,26 +534,70 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
*/
private void executePlan() {
Preconditions.checkState(lock.isHeldByCurrentThread());
this.blockMover.setRunnable();
if (this.scheduler.isShutdown()) {
this.scheduler = Executors.newSingleThreadExecutor();
}

this.future = scheduler.submit(new Runnable() {
@Override
//submit works in submitList in round-robin
public void run() {
Thread.currentThread().setName("DiskBalancerThread");
LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
planFile, planID);
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
blockMover.setRunnable();
blockMover.copyBlocks(entry.getKey(), entry.getValue());
blockMover.setRunnable();
while(workToSubmitList.size()>0){
for(int i = 0; i < workToSubmitList.size(); ){
String source = workToSubmitList.get(i).getSourceVolUuid();
String destination = workToSubmitList.get(i).getDestVolUuid();
if(!workingVolumeUuids.contains(source) && !workingVolumeUuids.contains(destination)){
migrate(workToSubmitList.get(i), workMap.get(workToSubmitList.get(i)));
workingVolumeUuids.add(source);
workingVolumeUuids.add(destination);
workToSubmitList.remove(i);
}else{
i++;
}
}
}
//wait the submitted works to finish
while(workingVolumeUuids.size() != 0){
try{
Thread.sleep(1000);
}catch (InterruptedException e){
LOG.error(e.getMessage());
}

}
executorService.shutdown();
/*no works in the submit list and no works in working list,
means the end of the copy migration
*/
}
});
}

/**
* Starts migrate the data between the volume pair
*/
private void migrate(VolumePair volumePair, DiskBalancerWorkItem workItem){
executorService.submit(new CopyRunnable(volumePair,workItem));
}

/**
* Copy Runnable
*/
class CopyRunnable implements Runnable{
VolumePair volumePair;
DiskBalancerWorkItem workItem;
public CopyRunnable(VolumePair volumePair, DiskBalancerWorkItem workItem){
this.volumePair = volumePair;
this.workItem = workItem;
}
public void run(){
blockMover.copyBlocks(volumePair, workItem);
workingVolumeUuids.remove(volumePair.getSourceVolUuid());
workingVolumeUuids.remove(volumePair.getDestVolUuid());
}
}

/**
* Insert work items to work map.
* @param volumePair - VolumePair
Expand All @@ -559,6 +619,9 @@ private void createWorkPlan(final VolumePair volumePair, Step step)
if (workMap.containsKey(volumePair)) {
bytesToMove += workMap.get(volumePair).getBytesToCopy();
}
if(!workToSubmitList.contains(volumePair)){
workToSubmitList.add(volumePair);
}

DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);

Expand Down Expand Up @@ -893,6 +956,9 @@ private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
try {
ExtendedBlock block = iter.nextBlock();

if(block == null){
return null;
}
// A valid block is a finalized block, we iterate until we get
// finalized blocks
if (!this.dataset.isValidBlock(block)) {
Expand Down Expand Up @@ -1049,23 +1115,26 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
"blocks.",
source.getBaseURI(), dest.getBaseURI(),
item.getBytesCopied(), item.getBlocksCopied());
this.setExitFlag();
continue;
//this.setExitFlag();
break;
}

ExtendedBlock block = getNextBlock(poolIters, item);
// we are not able to find any blocks to copy.
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"Dest:{}", source.getBaseURI(), dest.getBaseURI());
this.setExitFlag();
continue;
//this.setExitFlag();
break;
}

/*
// check if someone told us exit, treat this as an interruption
// point
// for the thread, since both getNextBlock and moveBlocAcrossVolume
// can take some time.
*/

if (!shouldRun()) {
continue;
}
Expand Down