org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,6 +89,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ReconstructionSkipReason.SourceUnavailableDetail;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -2180,6 +2181,8 @@ int computeReconstructionWorkForBlocks(
for (BlockReconstructionWork rw : reconWork) {
final DatanodeStorageInfo[] targets = rw.getTargets();
if (targets == null || targets.length == 0) {
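// Record, for debug logging, why this block could not be scheduled.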
ReconstructionSkipReason.genReasonWithDetail(rw.getBlock(), null,
ReconstructionSkipReason.TARGET_UNAVAILABLE);
rw.resetTargets();
continue;
}
@@ -2188,6 +2191,10 @@
if (validateReconstructionWork(rw)) {
scheduledWork++;
} else {
  ReconstructionSkipReason.genReasonWithDetail(rw.getBlock(), null,
      ReconstructionSkipReason.VALIDATION_FAILED);
}
}
}
} finally {
@@ -2570,7 +2577,6 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
liveBlockIndices.clear();
final boolean isStriped = block.isStriped();
DatanodeDescriptor decommissionedSrc = null;

BitSet liveBitSet = null;
BitSet decommissioningBitSet = null;
if (isStriped) {
@@ -2595,13 +2601,19 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
// do not select the replica if it is corrupt or excess
if (state == StoredReplicaState.CORRUPT ||
state == StoredReplicaState.EXCESS) {
ReconstructionSkipReason.genReasonWithDetail(block, storage,
ReconstructionSkipReason.SOURCE_UNAVAILABLE,
SourceUnavailableDetail.CORRUPT_OR_EXCESS);
continue;
}

// Never use maintenance node not suitable for read
// or unknown state replicas.
if (state == null
|| state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
ReconstructionSkipReason.genReasonWithDetail(block, storage,
ReconstructionSkipReason.SOURCE_UNAVAILABLE,
SourceUnavailableDetail.MAINTENANCE_NOT_FOR_READ);
continue;
}

@@ -2613,6 +2625,9 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
ThreadLocalRandom.current().nextBoolean()) {
decommissionedSrc = node;
}
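// A decommissioned replica cannot serve as a normal source, though one is
// remembered above as a last-resort source; record why this storage was skipped.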
ReconstructionSkipReason.genReasonWithDetail(block, storage,
ReconstructionSkipReason.SOURCE_UNAVAILABLE,
SourceUnavailableDetail.DECOMMISSIONED);
continue;
}

@@ -2637,6 +2652,9 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
}
ReconstructionSkipReason.genReasonWithDetail(block, storage,
ReconstructionSkipReason.SOURCE_UNAVAILABLE,
SourceUnavailableDetail.REPLICATION_SOFT_LIMIT);
continue; // already reached replication limit
}

@@ -2648,6 +2666,9 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
}
ReconstructionSkipReason.genReasonWithDetail(block, storage,
ReconstructionSkipReason.SOURCE_UNAVAILABLE,
SourceUnavailableDetail.REPLICATION_HARD_LIMIT);
continue;
}

@@ -5406,9 +5427,13 @@ int computeDatanodeWork() {
* this.blocksReplWorkMultiplier;
final int nodesToProcess = (int) Math.ceil(numlive
* this.blocksInvalidateWorkPct);

if (LOG.isDebugEnabled()) {
  ReconstructionSkipReason.start();
}
int workFound = this.computeBlockReconstructionWork(blocksToProcess);

if (LOG.isDebugEnabled()) {
  // summary() returns the accumulated skip reasons and clears the
  // per-thread map; without logging its return value here, nothing
  // would ever be emitted.
  LOG.debug(ReconstructionSkipReason.summary());
}
// Update counters
namesystem.writeLock();
try {

org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java

@@ -160,7 +160,7 @@ public void startDecommission(DatanodeDescriptor node) {
monitor.startTrackingNode(node);
}
} else {
-    LOG.trace("startDecommission: Node {} in {}, nothing to do.",
+    LOG.info("startDecommission: Node {} in {}, nothing to do.",
         node, node.getAdminState());
}
}

org/apache/hadoop/hdfs/server/blockmanagement/ReconstructionSkipReason.java (new file)

@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;


import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * When scheduling ReconstructionWork for a low-redundancy block, scheduling
 * can fail for three high-level reasons:
 * 1. No source node is available.
 * 2. No target node is available.
 * 3. The ReconstructionWork is built, but validation fails.
 * These three cases make up ReconstructionSkipReason.
 * - The detailed reasons for "no source node is available" are captured in
 *   the SourceUnavailableDetail enum.
 * - The detailed reasons for "no target node is available" are already
 *   covered by NodeNotChosenReason in BlockPlacementPolicyDefault.
 */
public enum ReconstructionSkipReason {
SOURCE_UNAVAILABLE("source node or storage unavailable"),
TARGET_UNAVAILABLE("cannot find available target host"),
VALIDATION_FAILED("validation for reconstruction work failed");

enum SourceUnavailableDetail {
CORRUPT_OR_EXCESS("stored replica state is corrupt or excess"),
MAINTENANCE_NOT_FOR_READ("stored replica is maintenance not for read"),
DECOMMISSIONED("replica is already decommissioned"),
REPLICATION_SOFT_LIMIT("replica already reached replication soft limit"),
REPLICATION_HARD_LIMIT("replica already reached replication hard limit");
private final String text;

SourceUnavailableDetail(final String logText) {
text = logText;
}

@Override
public String toString() {
return text;
}
}

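// Reuse BlockManager's logger: all call sites are gated on this logger's
// debug level, so these messages follow BlockManager's log configuration.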
public static final Logger LOG = LoggerFactory.getLogger(
BlockManager.class);

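// Per-thread map from block to its accumulated skip reasons. ThreadLocal
// isolation keeps concurrent callers from interleaving each other's messages.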
private static final ThreadLocal<HashMap<BlockInfo, StringBuilder>>
blockNotChosenReasonMap = ThreadLocal
.withInitial(() -> new HashMap<BlockInfo, StringBuilder>());

private final String text;

ReconstructionSkipReason(final String logText) {
text = logText;
}

@Override
public String toString() {
return text;
}

public static void start() {
blockNotChosenReasonMap.get().clear();
}

public static void genReasonWithDetail(BlockInfo block, DatanodeStorageInfo storage,
ReconstructionSkipReason reason) {
if (LOG.isDebugEnabled()) {
genReasonImpl(block, storage, reason, null);
}
}

public static void genReasonWithDetail(BlockInfo block, DatanodeStorageInfo storage,
ReconstructionSkipReason reason, SourceUnavailableDetail reasonDetails) {
if (LOG.isDebugEnabled()) {
genReasonImpl(block, storage, reason, reasonDetails);
}
}

@VisibleForTesting
static void genReasonImpl(BlockInfo block, DatanodeStorageInfo storage,
    ReconstructionSkipReason reason, SourceUnavailableDetail reasonDetails) {
  // Build the skip-reason message for this block; it is emitted later by summary().
  HashMap<BlockInfo, StringBuilder> blockReason = blockNotChosenReasonMap.get();
  StringBuilder reasonForBlock = blockReason.computeIfAbsent(block,
      b -> new StringBuilder()
          .append("Block ")
          .append(b)
          .append(" is not scheduled for reconstruction since: ["));
  reasonForBlock.append("\n").append(reason);
  if (storage != null) {
    reasonForBlock.append(" on node ").append(storage);
  }
  if (reasonDetails != null) {
    reasonForBlock.append(". Detail : [").append(reasonDetails).append("]");
  }
}

@VisibleForTesting
static String summary() {
StringBuilder finalReasonForAllBlocks = new StringBuilder();
for (Map.Entry<BlockInfo, StringBuilder> blockReason
    : blockNotChosenReasonMap.get().entrySet()) {
blockReason.getValue().append("\n]");
finalReasonForAllBlocks.append(blockReason.getValue());
}
blockNotChosenReasonMap.get().clear();
return finalReasonForAllBlocks.toString();
}
}
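
Taken together, the intended lifecycle is start() -> genReasonWithDetail(...) -> summary(), all on the same thread, and only while debug logging is enabled for BlockManager. Below is a minimal sketch of a caller, not part of the patch itself: `block` and `storage` are assumed to be a BlockInfo and DatanodeStorageInfo obtained elsewhere, and the code must live in the org.apache.hadoop.hdfs.server.blockmanagement package since SourceUnavailableDetail is package-private.

// Hypothetical caller; `block` and `storage` come from elsewhere.
ReconstructionSkipReason.start();  // clear this thread's accumulated reasons
ReconstructionSkipReason.genReasonWithDetail(block, storage,
    ReconstructionSkipReason.SOURCE_UNAVAILABLE,
    ReconstructionSkipReason.SourceUnavailableDetail.CORRUPT_OR_EXCESS);
ReconstructionSkipReason.genReasonWithDetail(block, null,
    ReconstructionSkipReason.TARGET_UNAVAILABLE);  // no storage involved
// summary() returns the accumulated text and clears the map. The output
// would look roughly like:
//   Block blk_... is not scheduled for reconstruction since: [
//   source node or storage unavailable on node .... Detail : [stored replica state is corrupt or excess]
//   cannot find available target host
//   ]
LOG.debug(ReconstructionSkipReason.summary());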

org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -118,14 +118,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

+import static org.apache.hadoop.hdfs.server.blockmanagement.ReconstructionSkipReason.SourceUnavailableDetail.CORRUPT_OR_EXCESS;
+import static org.apache.hadoop.hdfs.server.blockmanagement.ReconstructionSkipReason.SourceUnavailableDetail.DECOMMISSIONED;
+import static org.apache.hadoop.hdfs.server.blockmanagement.ReconstructionSkipReason.SOURCE_UNAVAILABLE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
-import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -2329,4 +2327,49 @@ public void delayDeleteReplica() {
DataNodeFaultInjector.set(oldInjector);
}
}

/**
 * Test that the reason output works as expected even in a multi-threaded
 * environment.
 * @throws InterruptedException if interrupted while joining worker threads.
 */
@Test(timeout = 360000)
public void testReconstructionSkipReason() throws InterruptedException {
  final AtomicBoolean failure = new AtomicBoolean(false);
  int threadNum = 5;
  Thread[] threads = new Thread[threadNum];
  for (int i = 0; i < threadNum; i++) {
    final int index = i;
    threads[i] = new Thread(() -> {
      try {
        String storageID0 = "storageID_0_" + index;
        String storageID1 = "storageID_1_" + index;
        DatanodeStorageInfo sourceStorage0 = BlockManagerTestUtil
            .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
                new DatanodeStorage(storageID0));
        DatanodeStorageInfo sourceStorage1 = BlockManagerTestUtil
            .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
                new DatanodeStorage(storageID1));
        BlockInfo newBlk = new BlockInfoContiguous(new Block(index), (short) index);
        ReconstructionSkipReason.start();
        ReconstructionSkipReason.genReasonImpl(newBlk, sourceStorage0,
            SOURCE_UNAVAILABLE, CORRUPT_OR_EXCESS);
        ReconstructionSkipReason.genReasonImpl(newBlk, sourceStorage1,
            SOURCE_UNAVAILABLE, DECOMMISSIONED);
        String summary = ReconstructionSkipReason.summary();
        LOG.info("Reason for " + newBlk + " in storage " + storageID0
            + " and storage " + storageID1 + " is: " + summary);
        assertEquals("after summary, the reason should be cleared", "",
            ReconstructionSkipReason.summary());
        assertTrue("reason content should be correct",
            summary.contains(SOURCE_UNAVAILABLE.toString()));
        assertTrue("reason should contain block ID " + newBlk,
            summary.contains(newBlk.toString()));
        assertTrue("reason should contain storage " + sourceStorage0,
            summary.contains(sourceStorage0.toString()));
        assertTrue("reason should contain storage " + sourceStorage1,
            summary.contains(sourceStorage1.toString()));
      } catch (Throwable e) {
        e.printStackTrace();
        failure.set(true);
      }
    });
  }
  // Start every thread before joining any of them, so the per-thread reason
  // maps are actually exercised concurrently.
  for (int i = 0; i < threadNum; i++) {
    threads[i].start();
  }
  for (int i = 0; i < threadNum; i++) {
    threads[i].join();
  }
  assertFalse("Test case testReconstructionSkipReason has an error. "
      + "Check the log for details.", failure.get());
}
}