Skip to content

Commit 5412fbf

Browse files
authored
HDFS-16460. [SPS]: Handle failure retries for moving tasks (#4001)
1 parent 807a428 commit 5412fbf

File tree

6 files changed

+105
-24
lines changed

6 files changed

+105
-24
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
827827
"dfs.storage.policy.satisfier.retry.max.attempts";
828828
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
829829
3;
830+
public static final String DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY =
831+
"dfs.storage.policy.satisfier.move.task.retry.max.attempts";
832+
public static final int DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT =
833+
3;
830834
public static final String DFS_STORAGE_DEFAULT_POLICY =
831835
"dfs.storage.default.policy";
832836
public static final HdfsConstants.StoragePolicy

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public BlockDispatcher(int sockTimeout, int ioFileBuffSize,
101101
*/
102102
public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
103103
SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
104-
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
104+
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) throws IOException {
105105
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
106106
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
107107
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
@@ -149,14 +149,6 @@ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
149149
LOG.debug("Pinned block can't be moved, so skipping block:{}",
150150
blkMovingInfo.getBlock(), e);
151151
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
152-
} catch (IOException e) {
153-
// TODO: handle failure retries
154-
LOG.warn(
155-
"Failed to move block:{} from src:{} to destin:{} to satisfy "
156-
+ "storageType:{}",
157-
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
158-
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
159-
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
160152
} finally {
161153
IOUtils.closeStream(out);
162154
IOUtils.closeStream(in);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,15 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
8080
private Daemon movementTrackerThread;
8181
private final SPSService service;
8282
private final BlockDispatcher blkDispatcher;
83+
private final int maxRetry;
8384

8485
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
8586
NameNodeConnector nnc, SPSService spsService) {
8687
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
8788
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
89+
maxRetry = conf.getInt(
90+
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY,
91+
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT);
8892
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
8993
mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
9094
this.nnc = nnc;
@@ -151,7 +155,7 @@ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
151155
// during block movement assignment logic. In the internal movement,
152156
// remaining space is bookkeeping at the DatanodeDescriptor, please refer
153157
// IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and
154-
// updating via the funcation call -
158+
// updating via the function call -
155159
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
156160
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
157161
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
@@ -195,21 +199,25 @@ private BlockMovementStatus moveBlock() {
195199

196200
final KeyManager km = nnc.getKeyManager();
197201
Token<BlockTokenIdentifier> accessToken;
198-
try {
199-
accessToken = km.getAccessToken(eb,
200-
new StorageType[]{blkMovingInfo.getTargetStorageType()},
201-
new String[0]);
202-
} catch (IOException e) {
203-
// TODO: handle failure retries
204-
LOG.warn(
205-
"Failed to move block:{} from src:{} to destin:{} to satisfy "
206-
+ "storageType:{}",
207-
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
208-
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
209-
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
202+
int retry = 0;
203+
while (retry <= maxRetry) {
204+
try {
205+
ExternalSPSFaultInjector.getInstance().mockAnException(retry);
206+
accessToken = km.getAccessToken(eb,
207+
new StorageType[]{blkMovingInfo.getTargetStorageType()},
208+
new String[0]);
209+
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
210+
new Socket(), km, accessToken);
211+
} catch (IOException e) {
212+
LOG.warn(
213+
"Failed to move block:{} from src:{} to dest:{} to satisfy "
214+
+ "storageType:{}, retry: {}",
215+
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
216+
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), retry, e);
217+
retry++;
218+
}
210219
}
211-
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
212-
new Socket(), km, accessToken);
220+
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
213221
}
214222
}
215223

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.sps;
20+
21+
import org.apache.hadoop.classification.VisibleForTesting;
22+
23+
import java.io.IOException;
24+
25+
/**
26+
* Used to inject certain faults for testing.
27+
*/
28+
public class ExternalSPSFaultInjector {
29+
@VisibleForTesting
30+
private static ExternalSPSFaultInjector instance =
31+
new ExternalSPSFaultInjector();
32+
33+
@VisibleForTesting
34+
public static ExternalSPSFaultInjector getInstance() {
35+
return instance;
36+
}
37+
38+
@VisibleForTesting
39+
public static void setInstance(ExternalSPSFaultInjector instance) {
40+
ExternalSPSFaultInjector.instance = instance;
41+
}
42+
43+
@VisibleForTesting
44+
public void mockAnException(int retry) throws IOException {
45+
}
46+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5507,6 +5507,14 @@
55075507
</description>
55085508
</property>
55095509

5510+
<property>
5511+
<name>dfs.storage.policy.satisfier.move.task.retry.max.attempts</name>
5512+
<value>3</value>
5513+
<description>
5514+
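Maximum number of times a failed block move task is retried while satisfying
the block storage policy. The first attempt is not counted as a retry, so a
task is tried at most (this value + 1) times before it is reported as failed.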
5515+
</description>
5516+
</property>
5517+
55105518
<property>
55115519
<name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name>
55125520
<value>300000</value>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
3434
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
3535
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
36+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT;
3637
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
3738
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
3839
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@@ -130,6 +131,14 @@ public class TestExternalStoragePolicySatisfier {
130131
private static final int DEFAULT_BLOCK_SIZE = 1024;
131132
private static final Logger LOG =
132133
LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
134+
private final ExternalSPSFaultInjector injector = new ExternalSPSFaultInjector() {
135+
@Override
136+
public void mockAnException(int retry) throws IOException {
137+
if (retry < DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT) {
138+
throw new IOException("IO exception");
139+
}
140+
}
141+
};
133142

134143
@Before
135144
public void setUp() {
@@ -480,6 +489,20 @@ public void testInfiniteStartWhenAnotherSPSRunning()
480489
}
481490
}
482491

492+
@Test(timeout = 300000)
493+
public void testWhenStoragePolicySetToCOLDWithException()
494+
throws Exception {
495+
496+
try {
497+
createCluster();
498+
// Mock an IOException 3 times, and moving tasks should succeed finally.
499+
ExternalSPSFaultInjector.setInstance(injector);
500+
doTestWhenStoragePolicySetToCOLD();
501+
} finally {
502+
shutdownCluster();
503+
}
504+
}
505+
483506
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
484507
// Change policy to COLD
485508
dfs.setStoragePolicy(new Path(FILE), COLD);

0 commit comments

Comments
 (0)