From 982589e9ba3de81e2af162f7af209016a75ad3a3 Mon Sep 17 00:00:00 2001 From: tom lee Date: Sat, 19 Feb 2022 21:07:24 +0800 Subject: [PATCH 1/2] HDFS-16460. [SPS]: Handle failure retries for moving tasks --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++ .../server/common/sps/BlockDispatcher.java | 10 +---- .../sps/ExternalSPSBlockMoveTaskHandler.java | 38 ++++++++++------- .../server/sps/ExternalSPSFaultInjector.java | 41 +++++++++++++++++++ .../src/main/resources/hdfs-default.xml | 8 ++++ .../TestExternalStoragePolicySatisfier.java | 23 +++++++++++ 6 files changed, 100 insertions(+), 24 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6216f6e7a1ded..cf1755cd9f9b8 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -827,6 +827,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.storage.policy.satisfier.retry.max.attempts"; public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT = 3; + public static final String DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY = + "dfs.storage.policy.satisfier.move.task.retry.max.attempts"; + public static final int DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT = + 3; public static final String DFS_STORAGE_DEFAULT_POLICY = "dfs.storage.default.policy"; public static final HdfsConstants.StoragePolicy diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java index f87fcaef054c0..f7756c74851a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java @@ -101,7 +101,7 @@ public BlockDispatcher(int sockTimeout, int ioFileBuffSize, */ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo, SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock, - DataEncryptionKeyFactory km, Token accessToken) { + DataEncryptionKeyFactory km, Token accessToken) throws IOException { LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " + "storageType, sourceStoragetype:{} and destinStoragetype:{}", blkMovingInfo.getBlock(), blkMovingInfo.getSource(), @@ -149,14 +149,6 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo, LOG.debug("Pinned block can't be moved, so skipping block:{}", blkMovingInfo.getBlock(), e); return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; - } catch (IOException e) { - // TODO: handle failure retries - LOG.warn( - "Failed to move block:{} from src:{} to destin:{} to satisfy " - + "storageType:{}", - blkMovingInfo.getBlock(), blkMovingInfo.getSource(), - blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 64dec8bbc5c3c..ec3837424cc20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -80,11 +80,15 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { private Daemon movementTrackerThread; private final SPSService service; private final BlockDispatcher blkDispatcher; + private final int maxRetry; public ExternalSPSBlockMoveTaskHandler(Configuration conf, NameNodeConnector nnc, SPSService spsService) { int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + maxRetry = conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); mCompletionServ = new ExecutorCompletionService<>(moveExecutor); this.nnc = nnc; @@ -151,7 +155,7 @@ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { // during block movement assignment logic. 
In the internal movement, // remaining space is bookkeeping at the DatanodeDescriptor, please refer // IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and - // updating via the funcation call - + // updating via the function call - // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); @@ -195,21 +199,25 @@ private BlockMovementStatus moveBlock() { final KeyManager km = nnc.getKeyManager(); Token accessToken; - try { - accessToken = km.getAccessToken(eb, - new StorageType[]{blkMovingInfo.getTargetStorageType()}, - new String[0]); - } catch (IOException e) { - // TODO: handle failure retries - LOG.warn( - "Failed to move block:{} from src:{} to destin:{} to satisfy " - + "storageType:{}", - blkMovingInfo.getBlock(), blkMovingInfo.getSource(), - blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; + int retry = 0; + while (retry <= maxRetry) { + try { + ExternalSPSFaultInjector.getInstance().mockAnException(retry); + accessToken = km.getAccessToken(eb, + new StorageType[]{blkMovingInfo.getTargetStorageType()}, + new String[0]); + return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb, + new Socket(), km, accessToken); + } catch (IOException e) { + LOG.warn( + "Failed to move block:{} from src:{} to dest:{} to satisfy " + + "storageType:{}, retry: {}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), retry, e); + retry++; + } } - return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb, - new Socket(), km, accessToken); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java new file mode 100644 index 0000000000000..971a0009e8816 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.sps; + +import org.apache.hadoop.classification.VisibleForTesting; + +import java.io.IOException; + +/** + * Used to inject certain faults for testing. 
+ */ +public class ExternalSPSFaultInjector { + @VisibleForTesting + public static ExternalSPSFaultInjector instance = + new ExternalSPSFaultInjector(); + + @VisibleForTesting + public static ExternalSPSFaultInjector getInstance() { + return instance; + } + + @VisibleForTesting + public void mockAnException(int retry) throws IOException { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d49800c34849e..6f9b0fde43cba 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5507,6 +5507,14 @@ </description> </property> +<property> + <name>dfs.storage.policy.satisfier.move.task.retry.max.attempts</name> + <value>3</value> + <description> + Max retries for moving task to satisfy the block storage policy. + </description> +</property> + <property> <name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name> <value>300000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 361d61d54e258..f8f3938833c6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -33,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; @@ -129,6 +130,14 @@ public class TestExternalStoragePolicySatisfier { private static final int DEFAULT_BLOCK_SIZE = 1024; private static final Logger LOG = LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class); + private final ExternalSPSFaultInjector injector = new ExternalSPSFaultInjector() { + @Override + public void mockAnException(int retry) throws IOException { + if (retry < DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT) { + throw new IOException("IO exception"); + } + } + }; @Before public void setUp() { @@ -469,6 +478,20 @@ public void testInfiniteStartWhenAnotherSPSRunning() } } + @Test(timeout = 300000) + public void testWhenStoragePolicySetToCOLDWithException() + throws Exception { + + try { + createCluster(); + // Mock an IOException 3 times, and moving tasks should succeed finally. 
+ ExternalSPSFaultInjector.instance = injector; + doTestWhenStoragePolicySetToCOLD(); + } finally { + shutdownCluster(); + } + } + private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD dfs.setStoragePolicy(new Path(FILE), COLD); From 65573245fcce49a42b41a62adb1d8f917034bc35 Mon Sep 17 00:00:00 2001 From: tom lee Date: Sat, 2 Apr 2022 13:15:00 +0800 Subject: [PATCH 2/2] fix checkstyles --- .../hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java | 7 ++++++- .../server/sps/TestExternalStoragePolicySatisfier.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java index 971a0009e8816..5ddf1ee3c0f6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java @@ -27,7 +27,7 @@ */ public class ExternalSPSFaultInjector { @VisibleForTesting - public static ExternalSPSFaultInjector instance = + private static ExternalSPSFaultInjector instance = new ExternalSPSFaultInjector(); @VisibleForTesting @@ -35,6 +35,11 @@ public static ExternalSPSFaultInjector getInstance() { return instance; } + @VisibleForTesting + public static void setInstance(ExternalSPSFaultInjector instance) { + ExternalSPSFaultInjector.instance = instance; + } + @VisibleForTesting public void mockAnException(int retry) throws IOException { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index f8f3938833c6e..1308add077ced 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -485,7 +485,7 @@ public void testWhenStoragePolicySetToCOLDWithException() try { createCluster(); // Mock an IOException 3 times, and moving tasks should succeed finally. - ExternalSPSFaultInjector.instance = injector; + ExternalSPSFaultInjector.setInstance(injector); doTestWhenStoragePolicySetToCOLD(); } finally { shutdownCluster();