Skip to content

Commit 7615945

Browse files
committed
HDFS-14814. RBF: RouterQuotaUpdateService supports inherited rule. Contributed by Jinglun.
1 parent 4fdf016 commit 7615945

File tree

5 files changed

+226
-18
lines changed

5 files changed

+226
-18
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.Collections;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.Map.Entry;
27+
import java.util.TreeMap;
2628
import java.util.Set;
2729

2830
import org.apache.hadoop.fs.QuotaUsage;
@@ -70,13 +72,30 @@ public Quota(Router router, RouterRpcServer server) {
7072
*/
7173
public void setQuota(String path, long namespaceQuota,
7274
long storagespaceQuota, StorageType type) throws IOException {
75+
setQuotaInternal(path, null, namespaceQuota, storagespaceQuota, type);
76+
}
77+
78+
/**
79+
* Set quota for the federation path.
80+
* @param path Federation path.
81+
* @param locations Locations of the Federation path.
82+
* @param namespaceQuota Name space quota.
83+
* @param storagespaceQuota Storage space quota.
84+
* @param type StorageType that the space quota is intended to be set on.
85+
* @throws IOException If the quota system is disabled.
86+
*/
87+
void setQuotaInternal(String path, List<RemoteLocation> locations,
88+
long namespaceQuota, long storagespaceQuota, StorageType type)
89+
throws IOException {
7390
rpcServer.checkOperation(OperationCategory.WRITE);
7491
if (!router.isQuotaEnabled()) {
7592
throw new IOException("The quota system is disabled in Router.");
7693
}
7794

7895
// Set quota for current path and its children mount table path.
79-
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
96+
if (locations == null) {
97+
locations = getQuotaRemoteLocations(path);
98+
}
8099
if (LOG.isDebugEnabled()) {
81100
for (RemoteLocation loc : locations) {
82101
LOG.debug("Set quota for path: nsId: {}, dest: {}.",
@@ -92,12 +111,23 @@ public void setQuota(String path, long namespaceQuota,
92111
}
93112

94113
/**
95-
* Get quota usage for the federation path.
114+
* Get aggregated quota usage for the federation path.
96115
* @param path Federation path.
97116
* @return Aggregated quota.
98117
* @throws IOException If the quota system is disabled.
99118
*/
100119
public QuotaUsage getQuotaUsage(String path) throws IOException {
120+
return aggregateQuota(getEachQuotaUsage(path));
121+
}
122+
123+
/**
124+
* Get quota usage for the federation path.
125+
* @param path Federation path.
126+
* @return quota usage for each remote location.
127+
* @throws IOException If the quota system is disabled.
128+
*/
129+
Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
130+
throws IOException {
101131
rpcServer.checkOperation(OperationCategory.READ);
102132
if (!router.isQuotaEnabled()) {
103133
throw new IOException("The quota system is disabled in Router.");
@@ -109,7 +139,39 @@ public QuotaUsage getQuotaUsage(String path) throws IOException {
109139
Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent(
110140
quotaLocs, method, true, false, QuotaUsage.class);
111141

112-
return aggregateQuota(results);
142+
return results;
143+
}
144+
145+
/**
146+
* Get global quota for the federation path.
147+
* @param path Federation path.
148+
* @return global quota for path.
149+
* @throws IOException If the quota system is disabled.
150+
*/
151+
QuotaUsage getGlobalQuota(String path) throws IOException {
152+
if (!router.isQuotaEnabled()) {
153+
throw new IOException("The quota system is disabled in Router.");
154+
}
155+
156+
long nQuota = HdfsConstants.QUOTA_RESET;
157+
long sQuota = HdfsConstants.QUOTA_RESET;
158+
RouterQuotaManager manager = this.router.getQuotaManager();
159+
TreeMap<String, RouterQuotaUsage> pts =
160+
manager.getParentsContainingQuota(path);
161+
Entry<String, RouterQuotaUsage> entry = pts.lastEntry();
162+
while (entry != null && (nQuota == HdfsConstants.QUOTA_RESET
163+
|| sQuota == HdfsConstants.QUOTA_RESET)) {
164+
String ppath = entry.getKey();
165+
QuotaUsage quota = entry.getValue();
166+
if (nQuota == HdfsConstants.QUOTA_RESET) {
167+
nQuota = quota.getQuota();
168+
}
169+
if (sQuota == HdfsConstants.QUOTA_RESET) {
170+
sQuota = quota.getSpaceQuota();
171+
}
172+
entry = pts.lowerEntry(ppath);
173+
}
174+
return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota).build();
113175
}
114176

115177
/**
@@ -157,7 +219,7 @@ private List<RemoteLocation> getValidQuotaLocations(String path)
157219
* @param results Quota query result.
158220
* @return Aggregated Quota.
159221
*/
160-
private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
222+
QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
161223
long nsCount = 0;
162224
long ssCount = 0;
163225
long nsQuota = HdfsConstants.QUOTA_RESET;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
2121

2222
import java.util.HashSet;
23+
import java.util.Map.Entry;
2324
import java.util.Set;
2425
import java.util.SortedMap;
2526
import java.util.TreeMap;
@@ -113,6 +114,32 @@ public Set<String> getPaths(String parentPath) {
113114
}
114115
}
115116

117+
/**
118+
* Get parent paths (including itself) and quotas of the specified federation
119+
* path. Only parents containing quota are returned.
120+
* @param childPath Federated path.
121+
* @return TreeMap of parent paths and quotas.
122+
*/
123+
TreeMap<String, RouterQuotaUsage> getParentsContainingQuota(
124+
String childPath) {
125+
TreeMap<String, RouterQuotaUsage> res = new TreeMap<>();
126+
readLock.lock();
127+
try {
128+
Entry<String, RouterQuotaUsage> entry = this.cache.floorEntry(childPath);
129+
while (entry != null) {
130+
String mountPath = entry.getKey();
131+
RouterQuotaUsage quota = entry.getValue();
132+
if (isQuotaSet(quota) && isParentEntry(childPath, mountPath)) {
133+
res.put(mountPath, quota);
134+
}
135+
entry = this.cache.lowerEntry(mountPath);
136+
}
137+
return res;
138+
} finally {
139+
readLock.unlock();
140+
}
141+
}
142+
116143
/**
117144
* Put new entity into cache.
118145
* @param path Mount table path.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,20 @@
1818
package org.apache.hadoop.hdfs.server.federation.router;
1919

2020
import java.io.IOException;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
2123
import java.util.HashSet;
2224
import java.util.LinkedList;
2325
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Map.Entry;
2428
import java.util.Set;
2529
import java.util.concurrent.TimeUnit;
2630

2731
import org.apache.hadoop.conf.Configuration;
2832
import org.apache.hadoop.fs.QuotaUsage;
29-
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
3033
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
34+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
3135
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
3236
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
3337
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
@@ -79,6 +83,7 @@ protected void periodicInvoke() {
7983
try {
8084
List<MountTable> updateMountTables = new LinkedList<>();
8185
List<MountTable> mountTables = getQuotaSetMountTables();
86+
Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
8287
for (MountTable entry : mountTables) {
8388
String src = entry.getSourcePath();
8489
RouterQuotaUsage oldQuota = entry.getQuota();
@@ -102,25 +107,17 @@ protected void periodicInvoke() {
102107
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
103108
// If any exception occurs catch it and proceed with other entries.
104109
try {
105-
currentQuotaUsage = this.rpcServer.getQuotaModule()
106-
.getQuotaUsage(src);
110+
Quota quotaModule = this.rpcServer.getQuotaModule();
111+
Map<RemoteLocation, QuotaUsage> usageMap =
112+
quotaModule.getEachQuotaUsage(src);
113+
currentQuotaUsage = quotaModule.aggregateQuota(usageMap);
114+
remoteQuotaUsage.putAll(usageMap);
107115
} catch (IOException ioe) {
108116
LOG.error("Unable to get quota usage for " + src, ioe);
109117
continue;
110118
}
111119
}
112120

113-
// If quota is not set in some subclusters under federation path,
114-
// set quota for this path.
115-
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_RESET) {
116-
try {
117-
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
118-
} catch (IOException ioe) {
119-
LOG.error("Unable to set quota at remote location for "
120-
+ src, ioe);
121-
}
122-
}
123-
124121
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
125122
currentQuotaUsage);
126123
this.quotaManager.put(src, newQuota);
@@ -139,12 +136,36 @@ protected void periodicInvoke() {
139136
}
140137
}
141138

139+
// Fix inconsistent quota.
140+
for (Entry<RemoteLocation, QuotaUsage> en : remoteQuotaUsage
141+
.entrySet()) {
142+
RemoteLocation remoteLocation = en.getKey();
143+
QuotaUsage currentQuota = en.getValue();
144+
fixGlobalQuota(remoteLocation, currentQuota);
145+
}
146+
142147
updateMountTableEntries(updateMountTables);
143148
} catch (IOException e) {
144149
LOG.error("Quota cache updated error.", e);
145150
}
146151
}
147152

153+
private void fixGlobalQuota(RemoteLocation location, QuotaUsage remoteQuota)
154+
throws IOException {
155+
QuotaUsage gQuota =
156+
this.rpcServer.getQuotaModule().getGlobalQuota(location.getSrc());
157+
if (remoteQuota.getQuota() != gQuota.getQuota()
158+
|| remoteQuota.getSpaceQuota() != gQuota.getSpaceQuota()) {
159+
this.rpcServer.getQuotaModule()
160+
.setQuotaInternal(location.getSrc(), Arrays.asList(location),
161+
gQuota.getQuota(), gQuota.getSpaceQuota(), null);
162+
LOG.info("[Fix Quota] src={} dst={} oldQuota={}/{} newQuota={}/{}",
163+
location.getSrc(), location, remoteQuota.getQuota(),
164+
remoteQuota.getSpaceQuota(), gQuota.getQuota(),
165+
gQuota.getSpaceQuota());
166+
}
167+
}
168+
148169
/**
149170
* Get mount table store management interface.
150171
* @return MountTableStore instance.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.IOException;
2424

2525
import org.apache.hadoop.test.GenericTestUtils;
26+
import org.apache.hadoop.test.LambdaTestUtils;
2627
import org.junit.AfterClass;
2728
import org.junit.Before;
2829
import org.junit.BeforeClass;
@@ -92,4 +93,13 @@ public void testGetQuotaUsage() throws Exception {
9293
}
9394
}
9495

96+
@Test
97+
public void testGetGlobalQuota() throws Exception {
98+
LambdaTestUtils.intercept(IOException.class,
99+
"The quota system is disabled in Router.",
100+
"The getGlobalQuota call should fail.", () -> {
101+
Quota quotaModule = router.getRpcServer().getQuotaModule();
102+
quotaModule.getGlobalQuota("/test");
103+
});
104+
}
95105
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,4 +868,92 @@ public void testNoQuotaaExceptionForUnrelatedOperations() throws Exception {
868868
routerFs.listStatus(path);
869869
routerFs.getContentSummary(path);
870870
}
871+
872+
@Test
873+
public void testGetGlobalQuota() throws Exception {
874+
long nsQuota = 5;
875+
long ssQuota = 3 * BLOCK_SIZE;
876+
prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
877+
878+
Quota qModule = routerContext.getRouter().getRpcServer().getQuotaModule();
879+
QuotaUsage qu = qModule.getGlobalQuota("/dir-1");
880+
assertEquals(nsQuota, qu.getQuota());
881+
assertEquals(ssQuota, qu.getSpaceQuota());
882+
qu = qModule.getGlobalQuota("/dir-1/dir-2");
883+
assertEquals(nsQuota, qu.getQuota());
884+
assertEquals(ssQuota * 2, qu.getSpaceQuota());
885+
qu = qModule.getGlobalQuota("/dir-1/dir-2/dir-3");
886+
assertEquals(nsQuota, qu.getQuota());
887+
assertEquals(ssQuota * 2, qu.getSpaceQuota());
888+
qu = qModule.getGlobalQuota("/dir-4");
889+
assertEquals(-1, qu.getQuota());
890+
assertEquals(-1, qu.getSpaceQuota());
891+
}
892+
893+
@Test
894+
public void testFixGlobalQuota() throws Exception {
895+
long nsQuota = 5;
896+
long ssQuota = 3 * BLOCK_SIZE;
897+
final FileSystem nnFs = nnContext1.getFileSystem();
898+
prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
899+
900+
QuotaUsage qu = nnFs.getQuotaUsage(new Path("/dir-1"));
901+
assertEquals(nsQuota, qu.getQuota());
902+
assertEquals(ssQuota, qu.getSpaceQuota());
903+
qu = nnFs.getQuotaUsage(new Path("/dir-2"));
904+
assertEquals(nsQuota, qu.getQuota());
905+
assertEquals(ssQuota * 2, qu.getSpaceQuota());
906+
qu = nnFs.getQuotaUsage(new Path("/dir-3"));
907+
assertEquals(nsQuota, qu.getQuota());
908+
assertEquals(ssQuota * 2, qu.getSpaceQuota());
909+
qu = nnFs.getQuotaUsage(new Path("/dir-4"));
910+
assertEquals(-1, qu.getQuota());
911+
assertEquals(-1, qu.getSpaceQuota());
912+
}
913+
914+
/**
915+
* Add three mount tables.
916+
* /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota]
917+
* /dir-1/dir-2 --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2]
918+
* /dir-1/dir-2/dir-3 --> ns0---/dir-3 [QUOTA_UNSET, QUOTA_UNSET]
919+
* /dir-4 --> ns0---/dir-4 [QUOTA_UNSET, QUOTA_UNSET]
920+
*
921+
* Expect three remote locations' global quota.
922+
* ns0---/dir-1 --> [nsQuota, ssQuota]
923+
* ns0---/dir-2 --> [nsQuota, ssQuota * 2]
924+
* ns0---/dir-3 --> [nsQuota, ssQuota * 2]
925+
* ns0---/dir-4 --> [-1, -1]
926+
*/
927+
private void prepareGlobalQuotaTestMountTable(long nsQuota, long ssQuota)
928+
throws IOException {
929+
final FileSystem nnFs = nnContext1.getFileSystem();
930+
931+
// Create destination directory
932+
nnFs.mkdirs(new Path("/dir-1"));
933+
nnFs.mkdirs(new Path("/dir-2"));
934+
nnFs.mkdirs(new Path("/dir-3"));
935+
nnFs.mkdirs(new Path("/dir-4"));
936+
937+
MountTable mountTable = MountTable.newInstance("/dir-1",
938+
Collections.singletonMap("ns0", "/dir-1"));
939+
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
940+
.spaceQuota(ssQuota).build());
941+
addMountTable(mountTable);
942+
mountTable = MountTable.newInstance("/dir-1/dir-2",
943+
Collections.singletonMap("ns0", "/dir-2"));
944+
mountTable.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota * 2)
945+
.build());
946+
addMountTable(mountTable);
947+
mountTable = MountTable.newInstance("/dir-1/dir-2/dir-3",
948+
Collections.singletonMap("ns0", "/dir-3"));
949+
addMountTable(mountTable);
950+
mountTable = MountTable.newInstance("/dir-4",
951+
Collections.singletonMap("ns0", "/dir-4"));
952+
addMountTable(mountTable);
953+
954+
// Ensure setQuota RPC was invoked and mount table was updated.
955+
RouterQuotaUpdateService updateService = routerContext.getRouter()
956+
.getQuotaCacheUpdateService();
957+
updateService.periodicInvoke();
958+
}
871959
}

0 commit comments

Comments
 (0)