Skip to content

Commit 10197ac

Browse files
committed
Revert "HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)" (#6457). Contributed by Shilun Fan.
This reverts commit c1bf3cb.
Reviewed-by: Takanobu Asanuma <[email protected]>
Reviewed-by: He Xiaoqiao <[email protected]>
Reviewed-by: Ayush Saxena <[email protected]>
Reviewed-by: Viraj Jasani <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
1 parent ec8d3a6 commit 10197ac

File tree

3 files changed

+17
-86
lines changed

3 files changed

+17
-86
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 8 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import java.util.TreeSet;
3737
import java.util.concurrent.BlockingQueue;
3838
import java.util.concurrent.CountDownLatch;
39-
import java.util.concurrent.ExecutorService;
40-
import java.util.concurrent.Executors;
4139
import java.util.concurrent.LinkedBlockingQueue;
4240
import java.util.concurrent.ThreadLocalRandom;
4341
import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +71,6 @@
7371
import org.apache.hadoop.io.IOUtils;
7472
import org.apache.hadoop.ipc.RemoteException;
7573
import org.apache.hadoop.net.NetUtils;
76-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
7774
import org.apache.hadoop.util.Preconditions;
7875
import org.apache.hadoop.util.Time;
7976
import org.apache.hadoop.util.VersionInfo;
@@ -103,8 +100,6 @@ class BPServiceActor implements Runnable {
103100

104101
volatile long lastCacheReport = 0;
105102
private final Scheduler scheduler;
106-
private final Object sendIBRLock;
107-
private final ExecutorService ibrExecutorService;
108103

109104
Thread bpThread;
110105
DatanodeProtocolClientSideTranslatorPB bpNamenode;
@@ -161,10 +156,6 @@ enum RunningState {
161156
}
162157
commandProcessingThread = new CommandProcessingThread(this);
163158
commandProcessingThread.start();
164-
sendIBRLock = new Object();
165-
ibrExecutorService = Executors.newSingleThreadExecutor(
166-
new ThreadFactoryBuilder().setDaemon(true)
167-
.setNameFormat("ibr-executor-%d").build());
168159
}
169160

170161
public DatanodeRegistration getBpRegistration() {
@@ -397,10 +388,8 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
397388
// we have a chance that we will miss the delHint information
398389
// or we will report an RBW replica after the BlockReport already reports
399390
// a FINALIZED one.
400-
synchronized (sendIBRLock) {
401-
ibrManager.sendIBRs(bpNamenode, bpRegistration,
402-
bpos.getBlockPoolId(), getRpcMetricSuffix());
403-
}
391+
ibrManager.sendIBRs(bpNamenode, bpRegistration,
392+
bpos.getBlockPoolId(), getRpcMetricSuffix());
404393

405394
long brCreateStartTime = monotonicNow();
406395
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -633,9 +622,6 @@ void stop() {
633622
if (commandProcessingThread != null) {
634623
commandProcessingThread.interrupt();
635624
}
636-
if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
637-
ibrExecutorService.shutdownNow();
638-
}
639625
}
640626

641627
//This must be called only by blockPoolManager
@@ -650,18 +636,13 @@ void join() {
650636
} catch (InterruptedException ie) { }
651637
}
652638

653-
// Cleanup method to be called by current thread before exiting.
654-
// Any Thread / ExecutorService started by BPServiceActor can be shutdown
655-
// here.
639+
//Cleanup method to be called by current thread before exiting.
656640
private synchronized void cleanUp() {
657641

658642
shouldServiceRun = false;
659643
IOUtils.cleanupWithLogger(null, bpNamenode);
660644
IOUtils.cleanupWithLogger(null, lifelineSender);
661645
bpos.shutdownActor(this);
662-
if (!ibrExecutorService.isShutdown()) {
663-
ibrExecutorService.shutdownNow();
664-
}
665646
}
666647

667648
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
@@ -757,6 +738,11 @@ private void offerService() throws Exception {
757738
isSlownode = resp.getIsSlownode();
758739
}
759740
}
741+
if (!dn.areIBRDisabledForTests() &&
742+
(ibrManager.sendImmediately()|| sendHeartbeat)) {
743+
ibrManager.sendIBRs(bpNamenode, bpRegistration,
744+
bpos.getBlockPoolId(), getRpcMetricSuffix());
745+
}
760746

761747
List<DatanodeCommand> cmds = null;
762748
boolean forceFullBr =
@@ -923,10 +909,6 @@ public void run() {
923909
initialRegistrationComplete.countDown();
924910
}
925911

926-
// IBR tasks to be handled separately from offerService() in order to
927-
// improve performance of offerService(), which can now focus only on
928-
// FBR and heartbeat.
929-
ibrExecutorService.submit(new IBRTaskHandler());
930912
while (shouldRun()) {
931913
try {
932914
offerService();
@@ -1159,34 +1141,6 @@ private void sendLifeline() throws IOException {
11591141
}
11601142
}
11611143

1162-
class IBRTaskHandler implements Runnable {
1163-
1164-
@Override
1165-
public void run() {
1166-
LOG.info("Starting IBR Task Handler.");
1167-
while (shouldRun()) {
1168-
try {
1169-
final long startTime = scheduler.monotonicNow();
1170-
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
1171-
if (!dn.areIBRDisabledForTests() &&
1172-
(ibrManager.sendImmediately() || sendHeartbeat)) {
1173-
synchronized (sendIBRLock) {
1174-
ibrManager.sendIBRs(bpNamenode, bpRegistration,
1175-
bpos.getBlockPoolId(), getRpcMetricSuffix());
1176-
}
1177-
}
1178-
// There is no work to do; sleep until heartbeat timer elapses,
1179-
// or work arrives, and then iterate again.
1180-
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
1181-
} catch (Throwable t) {
1182-
LOG.error("Exception in IBRTaskHandler.", t);
1183-
sleepAndLogInterrupts(5000, "offering IBR service");
1184-
}
1185-
}
1186-
}
1187-
1188-
}
1189-
11901144
/**
11911145
* Utility class that wraps the timestamp computations for scheduling
11921146
* heartbeats and block reports.

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,19 +172,8 @@ public void testDatanodeReportMissingBlock() throws Exception {
172172
// all bad datanodes
173173
}
174174
cluster.triggerHeartbeats(); // IBR delete ack
175-
int retries = 0;
176-
while (true) {
177-
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
178-
if (0 != lb.getLocations().length) {
179-
retries++;
180-
if (retries > 7) {
181-
Assert.fail("getLocatedBlocks failed after 7 retries");
182-
}
183-
Thread.sleep(2000);
184-
} else {
185-
break;
186-
}
187-
}
175+
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
176+
assertEquals(0, lb.getLocations().length);
188177
} finally {
189178
cluster.shutdown();
190179
}
@@ -234,4 +223,4 @@ static DataNode findDatanode(String id, List<DataNode> datanodes) {
234223
throw new IllegalStateException("Datnode " + id + " not in datanode list: "
235224
+ datanodes);
236225
}
237-
}
226+
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.io.IOException;
2727

28-
import org.mockito.exceptions.base.MockitoAssertionError;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130
import org.apache.hadoop.conf.Configuration;
@@ -157,7 +156,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
157156

158157
// Sleep for a very short time since IBR is generated
159158
// asynchronously.
160-
Thread.sleep(1000);
159+
Thread.sleep(2000);
161160

162161
// Ensure that no block report was generated immediately.
163162
// Deleted blocks are reported when the IBR timer elapses.
@@ -168,24 +167,13 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
168167

169168
// Trigger a heartbeat, this also triggers an IBR.
170169
DataNodeTestUtils.triggerHeartbeat(singletonDn);
170+
Thread.sleep(2000);
171171

172172
// Ensure that the deleted block is reported.
173-
int retries = 0;
174-
while (true) {
175-
try {
176-
Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
177-
any(DatanodeRegistration.class),
178-
anyString(),
179-
any(StorageReceivedDeletedBlocks[].class));
180-
break;
181-
} catch (MockitoAssertionError e) {
182-
if (retries > 7) {
183-
throw e;
184-
}
185-
retries++;
186-
Thread.sleep(2000);
187-
}
188-
}
173+
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
174+
any(DatanodeRegistration.class),
175+
anyString(),
176+
any(StorageReceivedDeletedBlocks[].class));
189177

190178
} finally {
191179
cluster.shutdown();

0 commit comments

Comments
 (0)