Skip to content

Commit 9a1d8cf

Browse files
LeonGao91 authored and jojochuang committed
HDFS-14678. Allow triggerBlockReport to a specific namenode. (#1252). Contributed by Leon Gao.
1 parent 9b8359b commit 9a1d8cf

File tree

9 files changed

+135
-39
lines changed

9 files changed

+135
-39
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,33 @@
2020
import org.apache.hadoop.classification.InterfaceAudience;
2121
import org.apache.hadoop.classification.InterfaceStability;
2222

23+
import java.net.InetSocketAddress;
24+
2325
/**
2426
* Options that can be specified when manually triggering a block report.
2527
*/
2628
@InterfaceAudience.Public
2729
@InterfaceStability.Evolving
2830
public final class BlockReportOptions {
2931
private final boolean incremental;
32+
private final InetSocketAddress namenodeAddr;
3033

31-
private BlockReportOptions(boolean incremental) {
34+
private BlockReportOptions(boolean incremental, InetSocketAddress namenodeAddr) {
3235
this.incremental = incremental;
36+
this.namenodeAddr = namenodeAddr;
3337
}
3438

3539
public boolean isIncremental() {
3640
return incremental;
3741
}
3842

43+
public InetSocketAddress getNamenodeAddr() {
44+
return namenodeAddr;
45+
}
46+
3947
public static class Factory {
4048
private boolean incremental = false;
49+
private InetSocketAddress namenodeAddr;
4150

4251
public Factory() {
4352
}
@@ -47,13 +56,18 @@ public Factory setIncremental(boolean incremental) {
4756
return this;
4857
}
4958

59+
public Factory setNamenodeAddr(InetSocketAddress namenodeAddr) {
60+
this.namenodeAddr = namenodeAddr;
61+
return this;
62+
}
63+
5064
public BlockReportOptions build() {
51-
return new BlockReportOptions(incremental);
65+
return new BlockReportOptions(incremental, namenodeAddr);
5266
}
5367
}
5468

5569
@Override
5670
public String toString() {
57-
return "BlockReportOptions{incremental=" + incremental + "}";
71+
return "BlockReportOptions{incremental=" + incremental + ", namenodeAddr=" + namenodeAddr + "}";
5872
}
5973
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,10 +324,12 @@ public List<String> listReconfigurableProperties() throws IOException {
324324
public void triggerBlockReport(BlockReportOptions options)
325325
throws IOException {
326326
try {
327-
rpcProxy.triggerBlockReport(NULL_CONTROLLER,
328-
TriggerBlockReportRequestProto.newBuilder().
329-
setIncremental(options.isIncremental()).
330-
build());
327+
TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
328+
setIncremental(options.isIncremental());
329+
if (options.getNamenodeAddr() != null) {
330+
builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
331+
}
332+
rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build());
331333
} catch (ServiceException e) {
332334
throw ProtobufHelper.getRemoteException(e);
333335
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ message GetVolumeReportResponseProto {
140140

141141
message TriggerBlockReportRequestProto {
142142
required bool incremental = 1;
143+
optional string nnAddress = 2;
143144
}
144145

145146
message TriggerBlockReportResponseProto {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import com.google.protobuf.RpcController;
6565
import com.google.protobuf.ServiceException;
6666
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
67+
import org.apache.hadoop.net.NetUtils;
6768

6869
/**
6970
* Implementation for protobuf service that forwards requests
@@ -225,8 +226,12 @@ public TriggerBlockReportResponseProto triggerBlockReport(
225226
RpcController unused, TriggerBlockReportRequestProto request)
226227
throws ServiceException {
227228
try {
228-
impl.triggerBlockReport(new BlockReportOptions.Factory().
229-
setIncremental(request.getIncremental()).build());
229+
BlockReportOptions.Factory factory = new BlockReportOptions.Factory().
230+
setIncremental(request.getIncremental());
231+
if (request.hasNnAddress()) {
232+
factory.setNamenodeAddr(NetUtils.createSocketAddr(request.getNnAddress()));
233+
}
234+
impl.triggerBlockReport(factory.build());
230235
} catch (IOException e) {
231236
throw new ServiceException(e);
232237
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3316,10 +3316,14 @@ public List<String> listReconfigurableProperties()
33163316
public void triggerBlockReport(BlockReportOptions options)
33173317
throws IOException {
33183318
checkSuperuserPrivilege();
3319+
InetSocketAddress namenodeAddr = options.getNamenodeAddr();
3320+
boolean shouldTriggerToAllNn = (namenodeAddr == null);
33193321
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
33203322
if (bpos != null) {
33213323
for (BPServiceActor actor : bpos.getBPServiceActors()) {
3322-
actor.triggerBlockReport(options);
3324+
if (shouldTriggerToAllNn || namenodeAddr.equals(actor.nnAddr)) {
3325+
actor.triggerBlockReport(options);
3326+
}
33233327
}
33243328
}
33253329
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep
466466
"\t[-evictWriters <datanode_host:ipc_port>]\n" +
467467
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
468468
"\t[-metasave filename]\n" +
469-
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
469+
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]\n" +
470470
"\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
471471
"\t[-help [cmd]]\n";
472472

@@ -727,6 +727,13 @@ public int triggerBlockReport(String[] argv) throws IOException {
727727
for (int j = 1; j < argv.length; j++) {
728728
args.add(argv[j]);
729729
}
730+
// Block report to a specific namenode
731+
InetSocketAddress namenodeAddr = null;
732+
String nnHostPort = StringUtils.popOptionWithArgument("-namenode", args);
733+
if (nnHostPort != null) {
734+
namenodeAddr = NetUtils.createSocketAddr(nnHostPort);
735+
}
736+
730737
boolean incremental = StringUtils.popOption("-incremental", args);
731738
String hostPort = StringUtils.popFirstNonOption(args);
732739
if (hostPort == null) {
@@ -742,6 +749,7 @@ public int triggerBlockReport(String[] argv) throws IOException {
742749
try {
743750
dnProxy.triggerBlockReport(
744751
new BlockReportOptions.Factory().
752+
setNamenodeAddr(namenodeAddr).
745753
setIncremental(incremental).
746754
build());
747755
} catch (IOException e) {
@@ -750,7 +758,9 @@ public int triggerBlockReport(String[] argv) throws IOException {
750758
}
751759
System.out.println("Triggering " +
752760
(incremental ? "an incremental " : "a full ") +
753-
"block report on " + hostPort + ".");
761+
"block report on " + hostPort +
762+
(namenodeAddr == null ? "" : " to namenode " + nnHostPort) +
763+
".");
754764
return 0;
755765
}
756766

@@ -1266,7 +1276,7 @@ private void printHelp(String cmd) {
12661276
+ "\tbe used for checking if a datanode is alive.\n";
12671277

12681278
String triggerBlockReport =
1269-
"-triggerBlockReport [-incremental] <datanode_host:ipc_port>\n"
1279+
"-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]\n"
12701280
+ "\tTrigger a block report for the datanode.\n"
12711281
+ "\tIf 'incremental' is specified, it will be an incremental\n"
12721282
+ "\tblock report; otherwise, it will be a full block report.\n";
@@ -2176,7 +2186,7 @@ private static void printUsage(String cmd) {
21762186
+ " [-getDatanodeInfo <datanode_host:ipc_port>]");
21772187
} else if ("-triggerBlockReport".equals(cmd)) {
21782188
System.err.println("Usage: hdfs dfsadmin"
2179-
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
2189+
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]");
21802190
} else if ("-listOpenFiles".equals(cmd)) {
21812191
System.err.println("Usage: hdfs dfsadmin"
21822192
+ " [-listOpenFiles [-blockingDecommission] [-path <path>]]");
@@ -2334,7 +2344,7 @@ public int run(String[] argv) {
23342344
return exitCode;
23352345
}
23362346
} else if ("-triggerBlockReport".equals(cmd)) {
2337-
if ((argv.length != 2) && (argv.length != 3)) {
2347+
if ((argv.length < 2) || (argv.length > 5)) {
23382348
printUsage(cmd);
23392349
return exitCode;
23402350
}

hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ Usage:
377377
hdfs dfsadmin [-evictWriters <datanode_host:ipc_port>]
378378
hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>]
379379
hdfs dfsadmin [-metasave filename]
380-
hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
380+
hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]
381381
hdfs dfsadmin [-listOpenFiles [-blockingDecommission] [-path <path>]]
382382
hdfs dfsadmin [-help [cmd]]
383383

@@ -415,7 +415,7 @@ Usage:
415415
| `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. |
416416
| `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
417417
| `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
418-
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
418+
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> `[-namenode <namenode_host:ipc_port>]` | Trigger a block report for the given datanode. If 'incremental' is specified, it will be an incremental block report; otherwise, it will be a full block report. If '-namenode \<namenode\_host\>:\<ipc\_port\>' is given, the block report is sent only to the specified namenode. |
419419
| `-listOpenFiles` `[-blockingDecommission]` `[-path <path>]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. Open files list will be filtered by given type and path. Add -blockingDecommission option if you only want to list open files that are blocking the DataNode decommissioning. |
420420
| `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
421421

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.hdfs.DFSTestUtil;
3030
import org.apache.hadoop.hdfs.HdfsConfiguration;
3131
import org.apache.hadoop.hdfs.MiniDFSCluster;
32+
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
3233
import org.apache.hadoop.hdfs.client.BlockReportOptions;
3334
import org.apache.hadoop.hdfs.protocol.Block;
3435
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
@@ -42,11 +43,13 @@
4243
import org.junit.Test;
4344
import org.mockito.Mockito;
4445

46+
import java.net.InetSocketAddress;
47+
4548
/**
4649
* Test manually requesting that the DataNode send a block report.
4750
*/
4851
public final class TestTriggerBlockReport {
49-
private void testTriggerBlockReport(boolean incremental) throws Exception {
52+
private void testTriggerBlockReport(boolean incremental, boolean withSpecificNN) throws Exception {
5053
Configuration conf = new HdfsConfiguration();
5154

5255
// Set a really long value for dfs.blockreport.intervalMsec and
@@ -57,16 +60,24 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
5760
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
5861

5962
final MiniDFSCluster cluster =
60-
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
63+
new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
6164
cluster.waitActive();
62-
FileSystem fs = cluster.getFileSystem();
63-
DatanodeProtocolClientSideTranslatorPB spy =
65+
cluster.transitionToActive(0);
66+
FileSystem fs = cluster.getFileSystem(0);
67+
DatanodeProtocolClientSideTranslatorPB spyOnNn0 =
68+
InternalDataNodeTestUtils.spyOnBposToNN(
69+
cluster.getDataNodes().get(0), cluster.getNameNode(0));
70+
DatanodeProtocolClientSideTranslatorPB spyOnNn1 =
6471
InternalDataNodeTestUtils.spyOnBposToNN(
65-
cluster.getDataNodes().get(0), cluster.getNameNode());
72+
cluster.getDataNodes().get(0), cluster.getNameNode(1));
6673
DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);
6774

68-
// We should get 1 incremental block report.
69-
Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
75+
// We should get 1 incremental block report on both NNs.
76+
Mockito.verify(spyOnNn0, timeout(60000).times(1)).blockReceivedAndDeleted(
77+
any(DatanodeRegistration.class),
78+
anyString(),
79+
any(StorageReceivedDeletedBlocks[].class));
80+
Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReceivedAndDeleted(
7081
any(DatanodeRegistration.class),
7182
anyString(),
7283
any(StorageReceivedDeletedBlocks[].class));
@@ -75,12 +86,21 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
7586
// since the interval we configured is so long.
7687
for (int i = 0; i < 3; i++) {
7788
Thread.sleep(10);
78-
Mockito.verify(spy, times(0)).blockReport(
89+
Mockito.verify(spyOnNn0, times(0)).blockReport(
7990
any(DatanodeRegistration.class),
8091
anyString(),
8192
any(StorageBlockReport[].class),
8293
any());
83-
Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
94+
Mockito.verify(spyOnNn0, times(1)).blockReceivedAndDeleted(
95+
any(DatanodeRegistration.class),
96+
anyString(),
97+
any(StorageReceivedDeletedBlocks[].class));
98+
Mockito.verify(spyOnNn1, times(0)).blockReport(
99+
any(DatanodeRegistration.class),
100+
anyString(),
101+
any(StorageBlockReport[].class),
102+
any());
103+
Mockito.verify(spyOnNn1, times(1)).blockReceivedAndDeleted(
84104
any(DatanodeRegistration.class),
85105
anyString(),
86106
any(StorageReceivedDeletedBlocks[].class));
@@ -91,34 +111,47 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
91111
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
92112
new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
93113
DataNode datanode = cluster.getDataNodes().get(0);
94-
BPServiceActor actor =
95-
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
96-
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
97-
final DatanodeStorage storage;
98-
try (FsDatasetSpi.FsVolumeReferences volumes =
99-
dataset.getFsVolumeReferences()) {
100-
storage = dataset.getStorage(volumes.get(0).getStorageID());
114+
for (BPServiceActor actor : datanode.getAllBpOs().get(0).getBPServiceActors()) {
115+
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
116+
final DatanodeStorage storage;
117+
try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
118+
storage = dataset.getStorage(volumes.get(0).getStorageID());
119+
}
120+
actor.getIbrManager().addRDBI(rdbi, storage);
101121
}
102122

103-
actor.getIbrManager().addRDBI(rdbi, storage);
104-
105123
// Manually trigger a block report.
124+
// Only trigger block report to NN1 when testing triggering block report on specific namenode.
125+
InetSocketAddress nnAddr = withSpecificNN ? cluster.getNameNode(1).getServiceRpcAddress() : null;
106126
datanode.triggerBlockReport(
107127
new BlockReportOptions.Factory().
128+
setNamenodeAddr(nnAddr).
108129
setIncremental(incremental).
109130
build()
110131
);
111132

112133
// triggerBlockReport returns before the block report is
113134
// actually sent. Wait for it to be sent here.
114135
if (incremental) {
115-
Mockito.verify(spy, timeout(60000).times(2)).
136+
Mockito.verify(spyOnNn1, timeout(60000).times(2)).
137+
blockReceivedAndDeleted(
138+
any(DatanodeRegistration.class),
139+
anyString(),
140+
any(StorageReceivedDeletedBlocks[].class));
141+
int nn0IncrBlockReport = withSpecificNN ? 1 : 2;
142+
Mockito.verify(spyOnNn0, timeout(60000).times(nn0IncrBlockReport)).
116143
blockReceivedAndDeleted(
117144
any(DatanodeRegistration.class),
118145
anyString(),
119146
any(StorageReceivedDeletedBlocks[].class));
120147
} else {
121-
Mockito.verify(spy, timeout(60000)).blockReport(
148+
Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReport(
149+
any(DatanodeRegistration.class),
150+
anyString(),
151+
any(StorageBlockReport[].class),
152+
any());
153+
int nn0BlockReport = withSpecificNN ? 0 : 1;
154+
Mockito.verify(spyOnNn0, timeout(60000).times(nn0BlockReport)).blockReport(
122155
any(DatanodeRegistration.class),
123156
anyString(),
124157
any(StorageBlockReport[].class),
@@ -130,11 +163,13 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
130163

131164
@Test
132165
public void testTriggerFullBlockReport() throws Exception {
133-
testTriggerBlockReport(false);
166+
testTriggerBlockReport(false, false);
167+
testTriggerBlockReport(false, true);
134168
}
135169

136170
@Test
137171
public void testTriggerIncrementalBlockReport() throws Exception {
138-
testTriggerBlockReport(true);
172+
testTriggerBlockReport(true, false);
173+
testTriggerBlockReport(true, true);
139174
}
140175
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,31 @@ public void testGetDatanodeInfo() throws Exception {
245245
}
246246
}
247247

248+
@Test(timeout = 30000)
249+
public void testTriggerBlockReport() throws Exception {
250+
redirectStream();
251+
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
252+
final DataNode dn = cluster.getDataNodes().get(0);
253+
final NameNode nn = cluster.getNameNode();
254+
255+
final String dnAddr = String.format(
256+
"%s:%d",
257+
dn.getXferAddress().getHostString(),
258+
dn.getIpcPort());
259+
final String nnAddr = nn.getHostAndPort();
260+
resetStream();
261+
final List<String> outs = Lists.newArrayList();
262+
final int ret = ToolRunner.run(dfsAdmin,
263+
new String[]{"-triggerBlockReport", dnAddr, "-incremental", "-namenode", nnAddr});
264+
assertEquals(0, ret);
265+
266+
scanIntoList(out, outs);
267+
assertEquals(1, outs.size());
268+
assertThat(outs.get(0),
269+
is(allOf(containsString("Triggering an incremental block report on "),
270+
containsString(" to namenode "))));
271+
}
272+
248273
@Test(timeout = 30000)
249274
public void testGetVolumeReport() throws Exception {
250275
redirectStream();

0 commit comments

Comments
 (0)