Skip to content

Commit edba38c

Browse files
joshelsertamasadami
authored andcommitted
HBASE-24779 Report on the WAL edit buffer usage/limit for replication
Closes apache#2193 Signed-off-by: Bharath Vissapragada <[email protected]> Signed-off-by: Sean Busbey <[email protected]> Signed-off-by: Wellington Chevreuil <[email protected]> (cherry picked from commit 303db63) Change-Id: If1b3662792304747d2942dbc52338e2108b1a764
1 parent b351d31 commit edba38c

File tree

12 files changed

+163
-18
lines changed

12 files changed

+163
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication.regionserver;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
@InterfaceAudience.Private
23+
public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
24+
25+
public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
26+
27+
/**
28+
* Sets the total usage of memory used by edits in memory read from WALs. The memory represented
29+
* by this usage measure is across peers/sources. For example, we may batch the same WAL edits
30+
* multiple times for the sake of replicating them to multiple peers..
31+
* @param usage The memory used by edits in bytes
32+
*/
33+
void setWALReaderEditsBufferBytes(long usage);
34+
35+
/**
36+
* Returns the size, in bytes, of edits held in memory to be replicated across all peers.
37+
*/
38+
long getWALReaderEditsBufferBytes();
39+
}

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
2525
public MetricsReplicationSinkSource getSink();
2626
public MetricsReplicationSourceSource getSource(String id);
2727
public MetricsReplicationTableSource getTableSource(String tableName);
28-
public MetricsReplicationSourceSource getGlobalSource();
28+
public MetricsReplicationGlobalSourceSource getGlobalSource();
2929
}
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
import org.apache.yetus.audience.InterfaceAudience;
2525

2626
@InterfaceAudience.Private
27-
public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
27+
public class MetricsReplicationGlobalSourceSourceImpl
28+
implements MetricsReplicationGlobalSourceSource {
2829
private static final String KEY_PREFIX = "source.";
2930

3031
private final MetricsReplicationSourceImpl rms;
@@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
5354
private final MutableFastCounter completedWAL;
5455
private final MutableFastCounter completedRecoveryQueue;
5556
private final MutableFastCounter failedRecoveryQueue;
57+
private final MutableGaugeLong walReaderBufferUsageBytes;
5658

57-
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
59+
public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
5860
this.rms = rms;
5961

6062
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -92,6 +94,8 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
9294
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
9395
failedRecoveryQueue = rms.getMetricsRegistry()
9496
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
97+
walReaderBufferUsageBytes = rms.getMetricsRegistry()
98+
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
9599
}
96100

97101
@Override public void setLastShippedAge(long age) {
@@ -260,4 +264,14 @@ public String getMetricsJmxContext() {
260264
public String getMetricsName() {
261265
return rms.getMetricsName();
262266
}
267+
268+
@Override
269+
public void setWALReaderEditsBufferBytes(long usage) {
270+
this.walReaderBufferUsageBytes.set(usage);
271+
}
272+
273+
@Override
274+
public long getWALReaderEditsBufferBytes() {
275+
return this.walReaderBufferUsageBytes.value();
276+
}
263277
}

hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private static enum SourceHolder {
3939
return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
4040
}
4141

42-
@Override public MetricsReplicationSourceSource getGlobalSource() {
43-
return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
42+
@Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
43+
return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
4444
}
4545
}

hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
162162

163163
@Override public void incrShippedBytes(long size) {
164164
shippedBytesCounter.incr(size);
165-
MetricsReplicationGlobalSourceSource
165+
MetricsReplicationGlobalSourceSourceImpl
166166
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
167167
}
168168

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class MetricsSource implements BaseSource {
5050
private String id;
5151

5252
private final MetricsReplicationSourceSource singleSourceSource;
53-
private final MetricsReplicationSourceSource globalSourceSource;
53+
private final MetricsReplicationGlobalSourceSource globalSourceSource;
5454
private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
5555

5656
/**
@@ -74,7 +74,7 @@ public MetricsSource(String id) {
7474
* @param globalSourceSource Class to monitor global-scoped metrics
7575
*/
7676
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
77-
MetricsReplicationSourceSource globalSourceSource,
77+
MetricsReplicationGlobalSourceSource globalSourceSource,
7878
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
7979
this.id = id;
8080
this.singleSourceSource = singleSourceSource;
@@ -422,4 +422,19 @@ public String getMetricsName() {
422422
public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
423423
return singleSourceSourceByTable;
424424
}
425+
426+
/**
427+
* Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
428+
*/
429+
public void setWALReaderEditsBufferUsage(long usageInBytes) {
430+
globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
431+
}
432+
433+
/**
434+
* Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
435+
* @return
436+
*/
437+
public long getWALReaderEditsBufferUsage() {
438+
return globalSourceSource.getWALReaderEditsBufferBytes();
439+
}
425440
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.fs.FileSystem;
3030
import org.apache.hadoop.fs.Path;
3131
import org.apache.hadoop.hbase.CellScanner;
32+
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
3233
import org.apache.hadoop.hbase.HConstants;
3334
import org.apache.hadoop.hbase.Server;
3435
import org.apache.hadoop.hbase.TableName;
@@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
7273
private int statsThreadPeriod;
7374
// ReplicationLoad to access replication metrics
7475
private ReplicationLoad replicationLoad;
76+
private MetricsReplicationGlobalSourceSource globalMetricsSource;
7577

7678
private PeerProcedureHandler peerProcedureHandler;
7779

@@ -119,9 +121,12 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir
119121
} catch (KeeperException ke) {
120122
throw new IOException("Could not read cluster id", ke);
121123
}
124+
this.globalMetricsSource = CompatibilitySingletonFactory
125+
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
122126
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
123127
this.server, fs, logDir, oldLogDir, clusterId,
124-
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
128+
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
129+
globalMetricsSource);
125130
if (walProvider != null) {
126131
walProvider
127132
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
706706
throttler.addPushSize(batchSize);
707707
}
708708
totalReplicatedEdits.addAndGet(entries.size());
709-
totalBufferUsed.addAndGet(-batchSize);
709+
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
710+
// Record the new buffer usage
711+
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
710712
}
711713

712714
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener {
155155

156156

157157
private AtomicLong totalBufferUsed = new AtomicLong();
158+
// Total buffer size on this RegionServer for holding batched edits to be shipped.
159+
private final long totalBufferLimit;
160+
private final MetricsReplicationGlobalSourceSource globalMetrics;
158161

159162
/**
160163
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
171174
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
172175
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
173176
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
174-
WALFileLengthProvider walFileLengthProvider) throws IOException {
177+
WALFileLengthProvider walFileLengthProvider,
178+
MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
175179
// CopyOnWriteArrayList is thread-safe.
176180
// Generally, reading is more than modifying.
177181
this.sources = new ConcurrentHashMap<>();
@@ -205,6 +209,9 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
205209
this.latestPaths = new HashSet<Path>();
206210
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
207211
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
212+
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
213+
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
214+
this.globalMetrics = globalMetrics;
208215
}
209216

210217
/**
@@ -861,6 +868,14 @@ public AtomicLong getTotalBufferUsed() {
861868
return totalBufferUsed;
862869
}
863870

871+
/**
872+
* Returns the maximum size in bytes of edits held in memory which are pending replication
873+
* across all sources inside this RegionServer.
874+
*/
875+
public long getTotalBufferLimit() {
876+
return totalBufferLimit;
877+
}
878+
864879
/**
865880
* Get the directory where wals are archived
866881
* @return the directory where wals are archived
@@ -898,6 +913,10 @@ public ReplicationPeers getReplicationPeers() {
898913
*/
899914
public String getStats() {
900915
StringBuilder stats = new StringBuilder();
916+
// Print stats that apply across all Replication Sources
917+
stats.append("Global stats: ");
918+
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
919+
.append(getTotalBufferLimit()).append("B\n");
901920
for (ReplicationSourceInterface source : this.sources.values()) {
902921
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
903922
stats.append(source.getStats() + "\n");
@@ -923,4 +942,8 @@ public void cleanUpHFileRefs(String peerId, List<String> files) {
923942
int activeFailoverTaskCount() {
924943
return executor.getActiveCount();
925944
}
945+
946+
MetricsReplicationGlobalSourceSource getGlobalMetrics() {
947+
return this.globalMetrics;
948+
}
926949
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.hadoop.fs.Path;
3232
import org.apache.hadoop.hbase.Cell;
3333
import org.apache.hadoop.hbase.CellUtil;
34-
import org.apache.hadoop.hbase.HConstants;
3534
import org.apache.hadoop.hbase.replication.WALEntryFilter;
3635
import org.apache.hadoop.hbase.util.Pair;
3736
import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
104103
// the +1 is for the current thread reading before placing onto the queue
105104
int batchCount = conf.getInt("replication.source.nb.batches", 1);
106105
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
107-
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
108-
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
106+
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
109107
this.sleepForRetries =
110108
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
111109
this.maxRetriesMultiplier =
@@ -272,6 +270,8 @@ public Path getCurrentPath() {
272270
private boolean checkQuota() {
273271
// try not to go over total quota
274272
if (totalBufferUsed.get() > totalBufferQuota) {
273+
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
274+
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
275275
Threads.sleep(sleepForRetries);
276276
return false;
277277
}
@@ -399,7 +399,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
399399
* @return true if we should clear buffer and push all
400400
*/
401401
private boolean acquireBufferQuota(long size) {
402-
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
402+
long newBufferUsed = totalBufferUsed.addAndGet(size);
403+
// Record the new buffer usage
404+
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
405+
return newBufferUsed >= totalBufferQuota;
403406
}
404407

405408
/**

0 commit comments

Comments
 (0)