Skip to content

Commit 15e8611

Browse files
HBASE-25848: Add flexibility to backup replication in case replication filter throws an exception (#3283)
* HBASE-25848: Add flexibility to backup replication in case replication filter throws an exception
1 parent 7c24ed4 commit 15e8611

File tree

3 files changed

+112
-3
lines changed

3 files changed

+112
-3
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,12 @@ public void run() {
152152
addBatchToShippingQueue(batch);
153153
}
154154
}
155-
} catch (IOException e) { // stream related
155+
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
156156
if (handleEofException(e, batch)) {
157157
sleepMultiplier = 1;
158158
} else {
159-
LOG.warn("Failed to read stream of replication entries", e);
159+
LOG.warn("Failed to read stream of replication entries "
160+
+ "or replication filter is recovering", e);
160161
if (sleepMultiplier < maxRetriesMultiplier) {
161162
sleepMultiplier++;
162163
}
@@ -281,7 +282,7 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
281282
* logs from replication queue
282283
* @return true only if the exception can be handled
283284
*/
284-
private boolean handleEofException(IOException e, WALEntryBatch batch)
285+
private boolean handleEofException(Exception e, WALEntryBatch batch)
285286
throws InterruptedException {
286287
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
287288
// Dump the log even if logQueue size is 1 if the source is from recovered Source
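hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryFilterRetryableException.java

Lines changed: 40 additions & 0 deletions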
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication.regionserver;
19+
20+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
21+
import org.apache.yetus.audience.InterfaceAudience;
22+
23+
/**
24+
* This exception should be thrown from any wal filter when the filter is expected
25+
* to recover from the failures and it wants the replication to backup till it fails.
26+
* There is special handling in replication wal reader to catch this exception and
27+
* retry.
28+
*/
29+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
30+
public class WALEntryFilterRetryableException extends RuntimeException {
31+
private static final long serialVersionUID = 1L;
32+
33+
public WALEntryFilterRetryableException(String m, Throwable t) {
34+
super(m, t);
35+
}
36+
37+
public WALEntryFilterRetryableException(String m) {
38+
super(m);
39+
}
40+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ private static NavigableMap<byte[], Integer> getScopes() {
121121
public static void setUpBeforeClass() throws Exception {
122122
TEST_UTIL = new HBaseTestingUtility();
123123
CONF = TEST_UTIL.getConfiguration();
124+
CONF.setLong("replication.source.sleepforretries", 10);
124125
TEST_UTIL.startMiniDFSCluster(3);
125126

126127
cluster = TEST_UTIL.getDFSCluster();
@@ -413,6 +414,17 @@ private ReplicationSourceWALReader createReader(boolean recovered, Configuration
413414
return reader;
414415
}
415416

417+
private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
418+
Configuration conf) {
419+
ReplicationSource source = mockReplicationSource(false, conf);
420+
when(source.isPeerEnabled()).thenReturn(true);
421+
ReplicationSourceWALReader reader =
422+
new ReplicationSourceWALReader(fs, conf, logQueue, 0,
423+
getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
424+
reader.start();
425+
return reader;
426+
}
427+
416428
@Test
417429
public void testReplicationSourceWALReader() throws Exception {
418430
appendEntriesToLogAndSync(3);
@@ -445,6 +457,36 @@ public void testReplicationSourceWALReader() throws Exception {
445457
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
446458
}
447459

460+
@Test
461+
public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
462+
appendEntriesToLogAndSync(3);
463+
// get ending position
464+
long position;
465+
try (WALEntryStream entryStream =
466+
new WALEntryStream(logQueue, CONF, 0, log, null,
467+
new MetricsSource("1"), fakeWalGroupId)) {
468+
entryStream.next();
469+
entryStream.next();
470+
entryStream.next();
471+
position = entryStream.getPosition();
472+
}
473+
474+
// start up a reader
475+
Path walPath = getQueue().peek();
476+
int numFailuresInFilter = 5;
477+
ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(
478+
numFailuresInFilter, CONF);
479+
WALEntryBatch entryBatch = reader.take();
480+
assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
481+
482+
// should've batched up our entries
483+
assertNotNull(entryBatch);
484+
assertEquals(3, entryBatch.getWalEntries().size());
485+
assertEquals(position, entryBatch.getLastWalPosition());
486+
assertEquals(walPath, entryBatch.getLastWalPath());
487+
assertEquals(3, entryBatch.getNbRowKeys());
488+
}
489+
448490
@Test
449491
public void testReplicationSourceWALReaderRecovered() throws Exception {
450492
appendEntriesToLogAndSync(10);
@@ -636,6 +678,32 @@ public Entry filter(Entry entry) {
636678
};
637679
}
638680

681+
private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
682+
return new FailingWALEntryFilter(numFailuresInFilter);
683+
}
684+
685+
public static class FailingWALEntryFilter implements WALEntryFilter {
686+
private int numFailures = 0;
687+
private static int countFailures = 0;
688+
689+
public FailingWALEntryFilter(int numFailuresInFilter) {
690+
numFailures = numFailuresInFilter;
691+
}
692+
693+
@Override
694+
public Entry filter(Entry entry) {
695+
if (countFailures == numFailures) {
696+
return entry;
697+
}
698+
countFailures = countFailures + 1;
699+
throw new WALEntryFilterRetryableException("failing filter");
700+
}
701+
702+
public static int numFailures(){
703+
return countFailures;
704+
}
705+
}
706+
639707
class PathWatcher implements WALActionsListener {
640708

641709
Path currentPath;

0 commit comments

Comments
 (0)