@@ -827,6 +827,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.retry.max.attempts";
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
3;
public static final String DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY =
"dfs.storage.policy.satisfier.move.task.retry.max.attempts";
public static final int DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT =
3;
public static final String DFS_STORAGE_DEFAULT_POLICY =
"dfs.storage.default.policy";
public static final HdfsConstants.StoragePolicy
@@ -101,7 +101,7 @@ public BlockDispatcher(int sockTimeout, int ioFileBuffSize,
*/
public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) throws IOException {
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
@@ -149,14 +149,6 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
LOG.debug("Pinned block can't be moved, so skipping block:{}",
blkMovingInfo.getBlock(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
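
Note on the change above: with the local catch block removed and throws IOException added to the signature, moveBlock no longer converts every I/O failure into DN_BLK_STORAGE_MOVEMENT_FAILURE on its own; the caller now decides whether to retry. A minimal sketch of the caller-side contract, using types and names that appear in this diff (illustrative only, not the exact merged handler code, which follows in the next file):

    // Sketch: a caller-side wrapper that owns the retry policy around
    // BlockDispatcher#moveBlock(). Types come from the HDFS code in this diff.
    private BlockMovementStatus moveWithRetry(BlockDispatcher blkDispatcher,
        BlockMovingInfo blkMovingInfo, SaslDataTransferClient saslClient,
        ExtendedBlock eb, DataEncryptionKeyFactory km,
        Token<BlockTokenIdentifier> accessToken, int maxRetry) {
      for (int retry = 0; retry <= maxRetry; retry++) {
        try {
          return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
              new Socket(), km, accessToken);
        } catch (IOException e) {
          // log and fall through to the next attempt
        }
      }
      return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
    }
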
@@ -80,11 +80,15 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private Daemon movementTrackerThread;
private final SPSService service;
private final BlockDispatcher blkDispatcher;
private final int maxRetry;

public ExternalSPSBlockMoveTaskHandler(Configuration conf,
NameNodeConnector nnc, SPSService spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
maxRetry = conf.getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
this.nnc = nnc;
@@ -151,7 +155,7 @@ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
// during block movement assignment logic. In the internal movement,
// remaining space is bookkeeping at the DatanodeDescriptor, please refer
// IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and
// updating via the funcation call -
// updating via the function call -
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
@@ -195,21 +199,25 @@ private BlockMovementStatus moveBlock() {

final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken;
try {
accessToken = km.getAccessToken(eb,
new StorageType[]{blkMovingInfo.getTargetStorageType()},
new String[0]);
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
int retry = 0;
while (retry <= maxRetry) {
try {
ExternalSPSFaultInjector.getInstance().mockAnException(retry);
accessToken = km.getAccessToken(eb,
new StorageType[]{blkMovingInfo.getTargetStorageType()},
new String[0]);
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
new Socket(), km, accessToken);
} catch (IOException e) {
LOG.warn(
"Failed to move block:{} from src:{} to dest:{} to satisfy "
+ "storageType:{}, retry: {}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), retry, e);
retry++;
}
}
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
new Socket(), km, accessToken);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
}
}

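
The heart of the handler change above is a bounded retry loop wrapped around the access-token fetch and the dispatch call, with ExternalSPSFaultInjector.getInstance().mockAnException(retry) acting as a test-only seam. A compact, self-contained sketch of that control flow (all names here are hypothetical stand-ins with no HDFS dependencies; the real loop also re-fetches the access token on every attempt):

    // Illustrative only; not part of the patch.
    import java.io.IOException;

    final class BoundedRetrySketch {
      private static final int MAX_RETRY = 3;

      // In production this hook does nothing; tests supply a throwing one.
      interface FaultHook {
        void maybeFail(int attempt) throws IOException;
      }

      static String moveWithRetry(FaultHook hook) {
        int retry = 0;
        while (retry <= MAX_RETRY) {      // at most MAX_RETRY + 1 attempts
          try {
            hook.maybeFail(retry);        // test seam, like mockAnException
            return "SUCCESS";             // stands in for the real dispatch
          } catch (IOException e) {
            retry++;                      // warn and try again
          }
        }
        return "FAILURE";                 // retries exhausted
      }

      public static void main(String[] args) {
        // Fail the first three attempts; the fourth succeeds.
        System.out.println(moveWithRetry(attempt -> {
          if (attempt < 3) {
            throw new IOException("injected failure " + attempt);
          }
        }));                              // prints SUCCESS
      }
    }
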
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.sps;

import org.apache.hadoop.classification.VisibleForTesting;

import java.io.IOException;

/**
* Used to inject certain faults for testing.
*/
public class ExternalSPSFaultInjector {
@VisibleForTesting
private static ExternalSPSFaultInjector instance =
new ExternalSPSFaultInjector();

@VisibleForTesting
public static ExternalSPSFaultInjector getInstance() {
return instance;
}

@VisibleForTesting
public static void setInstance(ExternalSPSFaultInjector instance) {
ExternalSPSFaultInjector.instance = instance;
}

@VisibleForTesting
public void mockAnException(int retry) throws IOException {
}
}
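
This new class follows the usual Hadoop fault-injector pattern: the production path calls mockAnException(retry), which is a no-op here, and a test replaces the singleton with a subclass that throws. A minimal sketch of that swap, using only names defined in this diff (the test change further down does essentially the same thing):

    // Sketch of a test-side override; mirrors the TestExternalStoragePolicySatisfier
    // change below.
    ExternalSPSFaultInjector.setInstance(new ExternalSPSFaultInjector() {
      @Override
      public void mockAnException(int retry) throws IOException {
        if (retry < 3) {                  // fail the first three attempts
          throw new IOException("injected I/O failure");
        }
      }
    });
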
@@ -5507,6 +5507,14 @@
</description>
</property>

<property>
<name>dfs.storage.policy.satisfier.move.task.retry.max.attempts</name>
<value>3</value>
<description>
Maximum number of retry attempts for a block move task issued to satisfy the block storage policy.
</description>
</property>

<property>
<name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name>
<value>300000</value>
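
Operators or tests that want more (or fewer) attempts can override the new key; a hedged example, shown as a programmatic Configuration override rather than an hdfs-site.xml entry (the value 5 is arbitrary):

    // Illustrative override of the new setting (org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.hdfs.HdfsConfiguration); 5 is just an example value.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY,
        5);
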
@@ -33,6 +33,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@@ -129,6 +130,14 @@ public class TestExternalStoragePolicySatisfier {
private static final int DEFAULT_BLOCK_SIZE = 1024;
private static final Logger LOG =
LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
private final ExternalSPSFaultInjector injector = new ExternalSPSFaultInjector() {
@Override
public void mockAnException(int retry) throws IOException {
if (retry < DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT) {
throw new IOException("IO exception");
}
}
};

@Before
public void setUp() {
@@ -469,6 +478,20 @@ public void testInfiniteStartWhenAnotherSPSRunning()
}
}

@Test(timeout = 300000)
public void testWhenStoragePolicySetToCOLDWithException()
throws Exception {

try {
createCluster();
// Mock an IOException 3 times; the move tasks should eventually succeed.
ExternalSPSFaultInjector.setInstance(injector);
doTestWhenStoragePolicySetToCOLD();
} finally {
shutdownCluster();
}
}

private void doTestWhenStoragePolicySetToCOLD() throws Exception {
// Change policy to COLD
dfs.setStoragePolicy(new Path(FILE), COLD);