Skip to content

Commit 1e1aaa2

Browse files
committed
HBASE-27881 The sleep time in checkQuota of replication WAL reader should be controlled independently
1 parent 22526a6 commit 1e1aaa2

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class ReplicationSourceWALReader extends Thread {
6969
// position in the WAL to start reading at
7070
private long currentPosition;
7171
private final long sleepForRetries;
72+
private final long sleepForQuotaCheck;
7273
private final int maxRetriesMultiplier;
7374

7475
// Indicates whether this particular worker is running
@@ -102,6 +103,8 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
102103
int batchCount = conf.getInt("replication.source.nb.batches", 1);
103104
// 1 second
104105
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
106+
// 300ms
107+
this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300);
105108
// 5 minutes @ 1 sec per
106109
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
107110
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
@@ -140,9 +143,7 @@ public void run() {
140143
Threads.sleep(sleepForRetries);
141144
continue;
142145
}
143-
if (!checkBufferQuota()) {
144-
continue;
145-
}
146+
checkBufferQuota();
146147
Path currentPath = entryStream.getCurrentPath();
147148
WALEntryStream.HasNext hasNext = entryStream.hasNext();
148149
if (hasNext == WALEntryStream.HasNext.NO) {
@@ -266,14 +267,16 @@ public Path getCurrentPath() {
266267
return logQueue.getQueue(walGroupId).peek();
267268
}
268269

269-
// returns false if we've already exceeded the global quota
270-
private boolean checkBufferQuota() {
270+
// sleeping when we've already exceeded the global quota
271+
private void checkBufferQuota() {
271272
// try not to go over total quota
272-
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
273-
Threads.sleep(sleepForRetries);
274-
return false;
273+
while (
274+
!this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()
275+
) {
276+
LOG.warn("PeerId={}, sleep {}ms for source reader, current WAL is {}",
277+
this.source.getPeerId(), sleepForQuotaCheck, this.getCurrentPath());
278+
Threads.sleep(sleepForQuotaCheck);
275279
}
276-
return true;
277280
}
278281

279282
private WALEntryBatch createBatch(WALEntryStream entryStream) {

0 commit comments

Comments
 (0)