
Commit 0a8d066

wchevreuil authored and Jenkins committed
HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (apache#2546) (apache#2849)
Signed-off-by: Ankit Singhal <[email protected]>
Signed-off-by: Josh Elser <[email protected]>
(cherry picked from commit fdae12d)
(cherry picked from commit 3242c8a)
Change-Id: I8552da6cb7b37271204e255a6ca96a8af544da48
1 parent 185f034 commit 0a8d066

File tree: 4 files changed (+123, -23 lines)

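In short, the patch makes ReplicationSource.terminate() call a new ReplicationSourceShipper.clearWALEntryBatch() method, which drains any WAL entry batches the reader had queued but the shipper never shipped and credits their size back to ReplicationSourceManager.totalBufferUsed, so terminated sources no longer leak buffer quota. The following is a minimal, self-contained sketch of that accounting pattern; the names here (BufferCleanupSketch, SimpleBatch, drainUnshippedBatches) are illustrative stand-ins, not HBase APIs, and only the general approach mirrors the diff below.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;

// Minimal model of the accounting problem this patch addresses: a reader thread
// charges entry sizes against a shared buffer quota, and on termination any
// batches that were never shipped must be credited back.
public class BufferCleanupSketch {

  static final class SimpleBatch {
    final long heapSize;
    SimpleBatch(long heapSize) { this.heapSize = heapSize; }
  }

  // Shared quota, analogous to ReplicationSourceManager.totalBufferUsed.
  static final AtomicLong bufferUsed = new AtomicLong();

  // Queue of batches handed from reader to shipper, analogous to entryBatchQueue.
  static final BlockingQueue<SimpleBatch> batchQueue = new LinkedBlockingQueue<>();

  // Called only after reader/shipper threads have stopped, mirroring the intent
  // of clearWALEntryBatch(): drain what is left and give the bytes back.
  static void drainUnshippedBatches() {
    LongAccumulator toDecrement = new LongAccumulator(Long::sum, 0);
    SimpleBatch batch;
    while ((batch = batchQueue.poll()) != null) {
      toDecrement.accumulate(batch.heapSize);
    }
    long remaining = bufferUsed.addAndGet(-toDecrement.longValue());
    System.out.println("Released " + toDecrement.longValue() + "B, buffer now " + remaining + "B");
  }

  public static void main(String[] args) {
    // Reader charged 10KB for a batch that the shipper never shipped.
    bufferUsed.addAndGet(10_000);
    batchQueue.add(new SimpleBatch(10_000));
    // Source terminates: without the cleanup, 10KB of quota would leak.
    drainUnshippedBatches();
  }
}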

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 5 additions & 0 deletions

@@ -575,6 +575,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
       if(worker.entryReader != null) {
@@ -585,6 +586,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
     if (this.replicationEndpoint != null) {
       this.replicationEndpoint.stop();
     }
+
     for (ReplicationSourceShipper worker : workers) {
       if (worker.isAlive() || worker.entryReader.isAlive()) {
         try {
@@ -603,6 +605,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
           worker.entryReader.interrupt();
         }
       }
+      //If worker is already stopped but there was still entries batched,
+      //we need to clear buffer used for non processed entries
+      worker.clearWALEntryBatch();
     }

     if (join) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java

Lines changed: 57 additions & 2 deletions

@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.LongAccumulator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -55,7 +57,7 @@ public enum WorkerState {
   private final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
-  private final ReplicationSourceInterface source;
+  private final ReplicationSource source;

   // Last position in the log that we sent to ZooKeeper
   // It will be accessed by the stats thread so make it volatile
@@ -75,7 +77,7 @@ public enum WorkerState {
   private final int shipEditsTimeout;

   public ReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+      PriorityBlockingQueue<Path> queue, ReplicationSource source) {
     this.conf = conf;
     this.walGroupId = walGroupId;
     this.queue = queue;
@@ -125,6 +127,7 @@ public final void run() {
     if (!isFinished()) {
       setWorkerState(WorkerState.STOPPED);
     } else {
+      source.workerThreads.remove(this.walGroupId);
       postFinish();
     }
   }
@@ -330,4 +333,56 @@ public boolean sleepForRetries(String msg, int sleepMultiplier) {
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTES</b>
+   * 1) This method should only be called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   *
+   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+   * have been triggered interruption/termination prior to calling this method.
+   */
+  void clearWALEntryBatch() {
+    long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        if (System.currentTimeMillis() >= timeout) {
+          LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
+            + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
+            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
+          return;
+        } else {
+          // Wait both shipper and reader threads to stop
+          Thread.sleep(this.sleepForRetries);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
+        return;
+      }
+    }
+    LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+    entryReader.entryBatchQueue.forEach(w -> {
+      entryReader.entryBatchQueue.remove(w);
+      w.getWalEntries().forEach(e -> {
+        long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
+        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+      });
+    });
+    if( LOG.isTraceEnabled()) {
+      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
+        totalToDecrement.longValue());
+    }
+    long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
+      .addAndGet(-totalToDecrement.longValue());
+    source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+  }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 2 additions & 1 deletion

@@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;

-  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  @InterfaceAudience.Private
+  final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java

Lines changed: 59 additions & 20 deletions

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,11 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,12 +52,6 @@
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -142,7 +140,7 @@ public void testLogMoving() throws Exception{
     entry = reader.next();
     assertNotNull(entry);

-    entry = reader.next();
+    reader.next();
     entry = reader.next();

     assertNull(entry);
@@ -168,12 +166,12 @@ protected void doStop() {
       }
     };
     replicationEndpoint.start();
-    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
-    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
-    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
+    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
     source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
       p -> OptionalLong.empty(), null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -194,6 +192,47 @@ public boolean evaluate() throws Exception {
     });
   }

+  @Test
+  public void testTerminateClearsBuffer() throws Exception {
+    ReplicationSource source = new ReplicationSource();
+    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
+    MetricsReplicationGlobalSourceSource mockMetrics =
+      mock(MetricsReplicationGlobalSourceSource.class);
+    AtomicLong buffer = new AtomicLong();
+    when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
+    when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    Configuration testConf = HBaseConfiguration.create();
+    source.init(testConf, null, mockManager, null, mockPeer, null,
+      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
+      conf, null, 0, null, source);
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, null, null, source);
+    shipper.entryReader = reader;
+    source.workerThreads.put("testPeer", shipper);
+    WALEntryBatch batch = new WALEntryBatch(10, logDir);
+    WAL.Entry mockEntry = mock(WAL.Entry.class);
+    WALEdit mockEdit = mock(WALEdit.class);
+    WALKeyImpl mockKey = mock(WALKeyImpl.class);
+    when(mockEntry.getEdit()).thenReturn(mockEdit);
+    when(mockEdit.isEmpty()).thenReturn(false);
+    when(mockEntry.getKey()).thenReturn(mockKey);
+    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(mockEdit.heapSize()).thenReturn(10000L);
+    when(mockEdit.size()).thenReturn(0);
+    ArrayList<Cell> cells = new ArrayList<>();
+    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
+      Bytes.toBytes("1"), Bytes.toBytes("v1"));
+    cells.add(kv);
+    when(mockEdit.getCells()).thenReturn(cells);
+    reader.addEntryToBatch(batch, mockEntry);
+    reader.entryBatchQueue.put(batch);
+    source.terminate("test");
+    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
@@ -303,15 +342,15 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception
     ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
     PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     queue.put(new Path("/www/html/test"));
-    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
-    Server server = Mockito.mock(Server.class);
-    Mockito.when(server.getServerName()).thenReturn(serverName);
-    Mockito.when(source.getServer()).thenReturn(server);
-    Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
-    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
-    Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
+    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
+    Server server = mock(Server.class);
+    when(server.getServerName()).thenReturn(serverName);
+    when(source.getServer()).thenReturn(server);
+    when(source.getServerWALsBelongTo()).thenReturn(deadServer);
+    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
+    when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
       .thenReturn(1001L);
-    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
+    when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
       .thenReturn(-1L);
     conf.setInt("replication.source.maxretriesmultiplier", -1);
     RecoveredReplicationSourceShipper shipper =
