Skip to content

Commit e40f99f

Browse files
committed
HDFS-15767. RBF: Router federation rename of directory. Contributed by Jinglun.
1 parent b441ca8 commit e40f99f

File tree

16 files changed

+981
-32
lines changed

16 files changed

+981
-32
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
9696
<scope>test</scope>
9797
<type>test-jar</type>
9898
</dependency>
99+
<dependency>
100+
<groupId>org.apache.hadoop</groupId>
101+
<artifactId>hadoop-distcp</artifactId>
102+
<scope>test</scope>
103+
</dependency>
99104
<dependency>
100105
<groupId>com.fasterxml.jackson.core</groupId>
101106
<artifactId>jackson-annotations</artifactId>
@@ -115,6 +120,26 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
115120
<scope>test</scope>
116121
<type>test-jar</type>
117122
</dependency>
123+
<dependency>
124+
<groupId>org.apache.hadoop</groupId>
125+
<artifactId>hadoop-mapreduce-client-app</artifactId>
126+
<scope>test</scope>
127+
</dependency>
128+
<dependency>
129+
<groupId>org.apache.hadoop</groupId>
130+
<artifactId>hadoop-mapreduce-client-hs</artifactId>
131+
<scope>test</scope>
132+
</dependency>
133+
<dependency>
134+
<groupId>org.apache.hadoop</groupId>
135+
<artifactId>hadoop-mapreduce-client-core</artifactId>
136+
<scope>test</scope>
137+
</dependency>
138+
<dependency>
139+
<groupId>org.apache.hadoop</groupId>
140+
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
141+
<scope>test</scope>
142+
</dependency>
118143
<dependency>
119144
<groupId>org.apache.curator</groupId>
120145
<artifactId>curator-test</artifactId>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,16 @@ public long getHighestPriorityLowRedundancyECBlocks() {
697697
MembershipStats::getHighestPriorityLowRedundancyECBlocks);
698698
}
699699

700+
@Override
701+
public int getRouterFederationRenameCount() {
702+
return this.router.getRpcServer().getRouterFederationRenameCount();
703+
}
704+
705+
@Override
706+
public int getSchedulerJobCount() {
707+
return this.router.getRpcServer().getSchedulerJobCount();
708+
}
709+
700710
@Override
701711
public String getSafemode() {
702712
if (this.router.isRouterState(RouterServiceState.SAFEMODE)) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,19 @@ public interface RouterMBean {
108108
* @return Json string of owners to token counts
109109
*/
110110
String getTopTokenRealOwners();
111+
112+
/**
113+
* Gets the count of the currently running router federation rename jobs.
114+
*
115+
* @return the count of the currently running router federation rename jobs.
116+
*/
117+
int getRouterFederationRenameCount();
118+
119+
/**
120+
* Gets the count of the currently running jobs in the scheduler. It includes
121+
* both the submitted and the recovered jobs.
122+
*
123+
* @return the count of the currently running jobs in the scheduler.
124+
*/
125+
int getSchedulerJobCount();
111126
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,4 +348,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
348348
NoRouterRpcFairnessPolicyController.class;
349349
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
350350
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
351+
352+
// HDFS Router Federation Rename.
353+
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
354+
FEDERATION_ROUTER_PREFIX + "federation.rename.";
355+
public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION =
356+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "option";
357+
public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT =
358+
"NONE";
359+
public static final String
360+
DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE =
361+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "force.close.open.file";
362+
public static final boolean
363+
DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT = true;
364+
public static final String DFS_ROUTER_FEDERATION_RENAME_MAP =
365+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "map";
366+
public static final String DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH =
367+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "bandwidth";
368+
public static final String DFS_ROUTER_FEDERATION_RENAME_DELAY =
369+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "delay";
370+
public static final long DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT = 1000;
371+
public static final String DFS_ROUTER_FEDERATION_RENAME_DIFF =
372+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "diff";
373+
public static final int DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT = 0;
374+
public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH =
375+
DFS_ROUTER_FEDERATION_RENAME_PREFIX + "trash";
376+
public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT =
377+
"trash";
351378
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class RouterClientProtocol implements ClientProtocol {
126126

127127
private final RouterRpcServer rpcServer;
128128
private final RouterRpcClient rpcClient;
129+
private final RouterFederationRename rbfRename;
129130
private final FileSubclusterResolver subclusterResolver;
130131
private final ActiveNamenodeResolver namenodeResolver;
131132

@@ -191,6 +192,7 @@ public class RouterClientProtocol implements ClientProtocol {
191192
this.snapshotProto = new RouterSnapshot(rpcServer);
192193
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
193194
this.securityManager = rpcServer.getRouterSecurityManager();
195+
this.rbfRename = new RouterFederationRename(rpcServer, conf);
194196
}
195197

196198
@Override
@@ -594,13 +596,13 @@ public boolean rename(final String src, final String dst)
594596

595597
final List<RemoteLocation> srcLocations =
596598
rpcServer.getLocationsForPath(src, true, false);
599+
final List<RemoteLocation> dstLocations =
600+
rpcServer.getLocationsForPath(dst, false, false);
597601
// srcLocations may be trimmed by getRenameDestinations()
598602
final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
599-
RemoteParam dstParam = getRenameDestinations(locs, dst);
603+
RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
600604
if (locs.isEmpty()) {
601-
throw new IOException(
602-
"Rename of " + src + " to " + dst + " is not allowed," +
603-
" no eligible destination in the same namespace was found.");
605+
return rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
604606
}
605607
RemoteMethod method = new RemoteMethod("rename",
606608
new Class<?>[] {String.class, String.class},
@@ -620,13 +622,14 @@ public void rename2(final String src, final String dst,
620622

621623
final List<RemoteLocation> srcLocations =
622624
rpcServer.getLocationsForPath(src, true, false);
625+
final List<RemoteLocation> dstLocations =
626+
rpcServer.getLocationsForPath(dst, false, false);
623627
// srcLocations may be trimmed by getRenameDestinations()
624628
final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
625-
RemoteParam dstParam = getRenameDestinations(locs, dst);
629+
RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
626630
if (locs.isEmpty()) {
627-
throw new IOException(
628-
"Rename of " + src + " to " + dst + " is not allowed," +
629-
" no eligible destination in the same namespace was found.");
631+
rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
632+
return;
630633
}
631634
RemoteMethod method = new RemoteMethod("rename2",
632635
new Class<?>[] {String.class, String.class, options.getClass()},
@@ -1821,11 +1824,9 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
18211824
* @throws IOException If the dst paths could not be determined.
18221825
*/
18231826
private RemoteParam getRenameDestinations(
1824-
final List<RemoteLocation> srcLocations, final String dst)
1825-
throws IOException {
1827+
final List<RemoteLocation> srcLocations,
1828+
final List<RemoteLocation> dstLocations) throws IOException {
18261829

1827-
final List<RemoteLocation> dstLocations =
1828-
rpcServer.getLocationsForPath(dst, false, false);
18291830
final Map<RemoteLocation, String> dstMap = new HashMap<>();
18301831

18311832
Iterator<RemoteLocation> iterator = srcLocations.iterator();
@@ -2203,4 +2204,8 @@ boolean isMultiDestDirectory(String src) throws IOException {
22032204
}
22042205
return false;
22052206
}
2207+
2208+
public int getRouterFederationRenameCount() {
2209+
return rbfRename.getRouterFederationRenameCount();
2210+
}
22062211
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.classification.InterfaceAudience;
21+
import org.apache.hadoop.classification.InterfaceStability;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
25+
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
26+
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
27+
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
28+
import org.apache.hadoop.tools.fedbalance.TrashProcedure;
29+
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
30+
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
31+
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
32+
33+
import java.io.IOException;
34+
import java.util.List;
35+
import java.util.concurrent.atomic.AtomicInteger;
36+
37+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE;
38+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT;
39+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
40+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
41+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY;
42+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT;
43+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF;
44+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT;
45+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH;
46+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT;
47+
import static org.apache.hadoop.tools.fedbalance.FedBalance.DISTCP_PROCEDURE;
48+
import static org.apache.hadoop.tools.fedbalance.FedBalance.TRASH_PROCEDURE;
49+
import static org.apache.hadoop.tools.fedbalance.FedBalance.NO_MOUNT;
50+
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
/**
55+
* Rename across router based federation namespaces.
56+
*/
57+
@InterfaceAudience.Private
58+
@InterfaceStability.Unstable
59+
public class RouterFederationRename {
60+
61+
private static final Logger LOG =
62+
LoggerFactory.getLogger(RouterFederationRename.class.getName());
63+
private final RouterRpcServer rpcServer;
64+
private final Configuration conf;
65+
private final AtomicInteger routerRenameCounter = new AtomicInteger();
66+
public enum RouterRenameOption {
67+
NONE, DISTCP
68+
}
69+
70+
public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
71+
this.rpcServer = rpcServer;
72+
this.conf = conf;
73+
}
74+
75+
/**
76+
* Router federation rename across namespaces.
77+
*
78+
* @param src the source path. There is no mount point under the src path.
79+
* @param dst the dst path.
80+
* @param srcLocations the remote locations of src.
81+
* @param dstLocations the remote locations of dst.
82+
* @throws IOException if rename fails.
83+
* @return true if rename succeeds.
84+
*/
85+
boolean routerFedRename(final String src, final String dst,
86+
final List<RemoteLocation> srcLocations,
87+
final List<RemoteLocation> dstLocations) throws IOException {
88+
if (!rpcServer.isEnableRenameAcrossNamespace()) {
89+
throw new IOException("Rename of " + src + " to " + dst
90+
+ " is not allowed, no eligible destination in the same namespace was"
91+
+ " found");
92+
}
93+
if (srcLocations.size() != 1 || dstLocations.size() != 1) {
94+
throw new IOException("Rename of " + src + " to " + dst + " is not"
95+
+ " allowed. The remote location should be exactly one.");
96+
}
97+
RemoteLocation srcLoc = srcLocations.get(0);
98+
RemoteLocation dstLoc = dstLocations.get(0);
99+
// Build and submit router federation rename job.
100+
BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(),
101+
dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest());
102+
BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler();
103+
countIncrement();
104+
try {
105+
scheduler.submit(job);
106+
LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src, dst,
107+
srcLoc.getNameserviceId(), dstLoc.getNameserviceId(), job.getId());
108+
scheduler.waitUntilDone(job);
109+
if (job.getError() != null) {
110+
throw new IOException("Rename of " + src + " to " + dst + " failed.",
111+
job.getError());
112+
}
113+
return true;
114+
} finally {
115+
countDecrement();
116+
}
117+
}
118+
119+
/**
120+
* Build router federation rename job moving data from src to dst.
121+
* @param srcNs the source namespace id.
122+
* @param dstNs the dst namespace id.
123+
* @param src the source path.
124+
* @param dst the dst path.
125+
*/
126+
private BalanceJob buildRouterRenameJob(String srcNs, String dstNs,
127+
String src, String dst) throws IOException {
128+
checkConfiguration(conf);
129+
Path srcPath = new Path("hdfs://" + srcNs + src);
130+
Path dstPath = new Path("hdfs://" + dstNs + dst);
131+
boolean forceCloseOpen =
132+
conf.getBoolean(DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE,
133+
DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT);
134+
int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
135+
int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
136+
long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
137+
DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
138+
int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
139+
DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
140+
String trashPolicy = conf.get(DFS_ROUTER_FEDERATION_RENAME_TRASH,
141+
DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT);
142+
FedBalanceConfigs.TrashOption trashOpt =
143+
FedBalanceConfigs.TrashOption.valueOf(trashPolicy.toUpperCase());
144+
// Construct job context.
145+
FedBalanceContext context =
146+
new FedBalanceContext.Builder(srcPath, dstPath, NO_MOUNT, conf)
147+
.setForceCloseOpenFiles(forceCloseOpen)
148+
.setUseMountReadOnly(true)
149+
.setMapNum(map)
150+
.setBandwidthLimit(bandwidth)
151+
.setTrash(trashOpt)
152+
.setDelayDuration(delay)
153+
.setDiffThreshold(diff)
154+
.build();
155+
156+
LOG.info(context.toString());
157+
// Construct the balance job.
158+
BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
159+
DistCpProcedure dcp =
160+
new DistCpProcedure(DISTCP_PROCEDURE, null, delay, context);
161+
builder.nextProcedure(dcp);
162+
TrashProcedure tp =
163+
new TrashProcedure(TRASH_PROCEDURE, null, delay, context);
164+
builder.nextProcedure(tp);
165+
return builder.build();
166+
}
167+
168+
public int getRouterFederationRenameCount() {
169+
return routerRenameCounter.get();
170+
}
171+
172+
void countIncrement() {
173+
routerRenameCounter.incrementAndGet();
174+
}
175+
176+
void countDecrement() {
177+
routerRenameCounter.decrementAndGet();
178+
}
179+
180+
static void checkConfiguration(Configuration conf) throws IOException {
181+
int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
182+
int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
183+
long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
184+
DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
185+
int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
186+
DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
187+
if (map < 0) {
188+
throw new IOException("map=" + map + " is negative. Please check "
189+
+ DFS_ROUTER_FEDERATION_RENAME_MAP);
190+
} else if (bandwidth < 0) {
191+
throw new IOException(
192+
"bandwidth=" + bandwidth + " is negative. Please check "
193+
+ DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH);
194+
} else if (delay < 0) {
195+
throw new IOException("delay=" + delay + " is negative. Please check "
196+
+ DFS_ROUTER_FEDERATION_RENAME_DELAY);
197+
} else if (diff < 0) {
198+
throw new IOException("diff=" + diff + " is negative. Please check "
199+
+ DFS_ROUTER_FEDERATION_RENAME_DIFF);
200+
}
201+
}
202+
}

0 commit comments

Comments
 (0)