Skip to content

Commit 52f925d

Browse files
committed
checkstyle
1 parent d6a5722 commit 52f925d

File tree

5 files changed

+49
-38
lines changed

5 files changed

+49
-38
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,13 @@ public class ClientContext {
136136
private volatile DeadNodeDetector deadNodeDetector = null;
137137

138138
/**
139-
* The switch for the {@link LocatedBlocksRefresher}
139+
* The switch for the {@link LocatedBlocksRefresher}.
140140
*/
141141
private final boolean locatedBlocksRefresherEnabled;
142142

143143
/**
144144
* Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
145-
* registered {@link DFSInputStream}s, to take advantage of changes in block placement
145+
* registered {@link DFSInputStream}s, to take advantage of changes in block placement.
146146
*/
147147
private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;
148148

@@ -321,7 +321,7 @@ public DeadNodeDetector getDeadNodeDetector() {
321321

322322
/**
323323
* If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
324-
* of registered DFSInputStreams
324+
* of registered DFSInputStreams.
325325
*/
326326
public boolean isLocatedBlocksRefresherEnabled() {
327327
return locatedBlocksRefresherEnabled;

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -307,10 +307,10 @@ private LocatedBlocks fetchAndCheckLocatedBlocks(LocatedBlocks existing)
307307
return newInfo;
308308
}
309309

310-
private long getLastBlockLength(LocatedBlocks locatedBlocks) throws IOException{
310+
private long getLastBlockLength(LocatedBlocks blocks) throws IOException{
311311
long lastBlockBeingWrittenLength = 0;
312-
if (!locatedBlocks.isLastBlockComplete()) {
313-
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
312+
if (!blocks.isLastBlockComplete()) {
313+
final LocatedBlock last = blocks.getLastLocatedBlock();
314314
if (last != null) {
315315
if (last.getLocations().length == 0) {
316316
if (last.getBlockSize() == 0) {
@@ -1948,7 +1948,7 @@ protected void maybeRegisterBlockRefresh() {
19481948
}
19491949

19501950
/**
1951-
* De-register periodic refresh of this inputstream, if it was added to begin with
1951+
* De-register periodic refresh of this inputstream, if it was added to begin with.
19521952
*/
19531953
private void maybeDeRegisterBlockRefresh() {
19541954
if (refreshingBlockLocations.get()) {
@@ -1957,24 +1957,24 @@ private void maybeDeRegisterBlockRefresh() {
19571957
}
19581958

19591959
/**
1960-
* Refresh blocks for the input stream, if necessary
1960+
* Refresh blocks for the input stream, if necessary.
19611961
*
19621962
* @param addressCache optional map to use as a cache for resolving datanode InetSocketAddress
19631963
* @return whether a refresh was performed or not
19641964
*/
19651965
boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) {
1966-
LocatedBlocks locatedBlocks;
1966+
LocatedBlocks blocks;
19671967
synchronized (infoLock) {
1968-
locatedBlocks = getLocatedBlocks();
1968+
blocks = getLocatedBlocks();
19691969
}
19701970

1971-
if (getLocalDeadNodes().isEmpty() && allBlocksLocal(locatedBlocks, addressCache)) {
1971+
if (getLocalDeadNodes().isEmpty() && allBlocksLocal(blocks, addressCache)) {
19721972
return false;
19731973
}
19741974

19751975
try {
19761976
DFSClient.LOG.debug("Refreshing {} for path {}", this, getSrc());
1977-
LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(locatedBlocks);
1977+
LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(blocks);
19781978
long lastBlockLength = getLastBlockLength(newLocatedBlocks);
19791979
if (lastBlockLength == -1) {
19801980
DFSClient.LOG.debug(
@@ -1995,10 +1995,10 @@ boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) {
19951995
* Once new LocatedBlocks have been fetched, sets them on the DFSInputStream and
19961996
* updates stateful read location within the necessary locks.
19971997
*/
1998-
private synchronized void setRefreshedValues(LocatedBlocks locatedBlocks, long lastBlockLength)
1998+
private synchronized void setRefreshedValues(LocatedBlocks blocks, long lastBlockLength)
19991999
throws IOException {
20002000
synchronized (infoLock) {
2001-
setLocatedBlocksFields(locatedBlocks, lastBlockLength);
2001+
setLocatedBlocksFields(blocks, lastBlockLength);
20022002
}
20032003

20042004
getLocalDeadNodes().clear();
@@ -2009,14 +2009,15 @@ private synchronized void setRefreshedValues(LocatedBlocks locatedBlocks, long l
20092009
}
20102010
}
20112011

2012-
private boolean allBlocksLocal(LocatedBlocks locatedBlocks, Map<String, InetSocketAddress> addressCache) {
2012+
private boolean allBlocksLocal(LocatedBlocks blocks,
2013+
Map<String, InetSocketAddress> addressCache) {
20132014
if (addressCache == null) {
20142015
addressCache = new HashMap<>();
20152016
}
20162017

20172018
// we only need to check the first location of each block, because the blocks are already
20182019
// sorted by distance from the current host
2019-
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
2020+
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
20202021
if (lb.getLocations().length == 0) {
20212022
return false;
20222023
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@
4444
import org.slf4j.LoggerFactory;
4545

4646
/**
47-
* Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered {@link DFSInputStream}s.
48-
* DFSInputStreams are eligible for refreshing if they have any deadNodes or any blocks are lacking local replicas.
47+
* Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered
48+
* {@link DFSInputStream}s. DFSInputStreams are eligible for refreshing if they have any
49+
* deadNodes or any blocks are lacking local replicas.
4950
* Disabled by default, unless an interval is configured.
5051
*/
5152
public class LocatedBlocksRefresher extends Daemon {
@@ -59,9 +60,10 @@ public class LocatedBlocksRefresher extends Daemon {
5960
private final long jitter;
6061
private final ExecutorService refreshThreadPool;
6162

62-
// Use WeakHashMap so that we don't hold onto references that might have not been explicitly closed
63-
// because they were created and thrown away.
64-
private final Set<DFSInputStream> registeredInputStreams = Collections.newSetFromMap(new WeakHashMap<>());
63+
// Use WeakHashMap so that we don't hold onto references that might have not been explicitly
64+
// closed because they were created and thrown away.
65+
private final Set<DFSInputStream> registeredInputStreams =
66+
Collections.newSetFromMap(new WeakHashMap<>());
6567

6668
private int runCount;
6769
private int refreshCount;
@@ -116,7 +118,8 @@ public void run() {
116118
phaser.register();
117119
refreshThreadPool.submit(() -> {
118120
try {
119-
if (isInputStreamTracked(inputStream) && inputStream.refreshBlockLocations(addressCache)) {
121+
if (isInputStreamTracked(inputStream) &&
122+
inputStream.refreshBlockLocations(addressCache)) {
120123
neededRefresh.incrementAndGet();
121124
}
122125
} finally {
@@ -175,11 +178,11 @@ public void shutdown() {
175178
}
176179

177180
/**
178-
* Collects the DFSInputStreams to a list within synchronization, so that we can iterate them without
179-
* potentially blocking callers to {@link #addInputStream(DFSInputStream)} or
181+
* Collects the DFSInputStreams to a list within synchronization, so that we can iterate them
182+
* without potentially blocking callers to {@link #addInputStream(DFSInputStream)} or
180183
* {@link #removeInputStream(DFSInputStream)}. We don't care so much about missing additions,
181-
* and we'll guard against removals by doing an additional {@link #isInputStreamTracked(DFSInputStream)}
182-
* track during iteration.
184+
* and we'll guard against removals by doing an additional
185+
* {@link #isInputStreamTracked(DFSInputStream)} track during iteration.
183186
*/
184187
private synchronized Collection<DFSInputStream> getInputStreams() {
185188
return new ArrayList<>(registeredInputStreams);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,10 @@ private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) thr
235235
}
236236

237237
private Path createFile(String fileName) throws IOException {
238-
Path filePath = new Path(fileName);
239-
try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
238+
Path path = new Path(fileName);
239+
try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) {
240240
fout.write(new byte[(fileLength)]);
241241
}
242-
return filePath;
242+
return path;
243243
}
244-
}
244+
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public void testDisabledOnZeroInterval() throws IOException {
105105
@Test
106106
public void testEnabledOnNonZeroInterval() throws Exception {
107107
setupTest(1000);
108-
LocatedBlocksRefresher refresher = cluster.getFileSystem().getClient().getLocatedBlockRefresher();
108+
LocatedBlocksRefresher refresher =
109+
cluster.getFileSystem().getClient().getLocatedBlockRefresher();
109110
assertNotNull(refresher);
110111
assertNoMoreRefreshes(refresher);
111112
}
@@ -205,15 +206,17 @@ private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws Inte
205206
int runCount = refresher.getRunCount();
206207
int refreshCount = refresher.getRefreshCount();
207208

208-
LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes", runCount + 3, runCount);
209+
LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
210+
runCount + 3, runCount);
209211
// wait for it to run 3 times, with some buffer
210212
awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval);
211213

212214
// it should not have refreshed anything, because no DFSInputStreams registered anymore
213215
assertEquals(refreshCount, refresher.getRefreshCount());
214216
}
215217

216-
private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes) throws InterruptedException {
218+
private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
219+
throws InterruptedException {
217220
int runCount = refresher.getRunCount();
218221
int refreshCount = refresher.getRefreshCount();
219222
int expectedRuns = 3;
@@ -222,18 +225,22 @@ private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefre
222225
expectedRefreshes = expectedRuns;
223226
}
224227

225-
LOG.info("Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
226-
runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount);
228+
LOG.info(
229+
"Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
230+
runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
231+
);
227232

228233
// wait for it to run 3 times
229234
awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);
230235

231-
// the values may not be identical due to any refreshes that occurred before we opened the DFSInputStream
232-
// but the difference should be identical since we are refreshing every time
236+
// the values may not be identical due to any refreshes that occurred before we opened
237+
// the DFSInputStream but the difference should be identical since we are refreshing
238+
// every time
233239
assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
234240
}
235241

236-
private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis) throws InterruptedException {
242+
private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
243+
throws InterruptedException {
237244
long now = Time.monotonicNow();
238245

239246
while(!test.get()) {

0 commit comments

Comments
 (0)