Skip to content

Commit 0c85952

Browse files
authored
HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#4859) (#5073)
Signed-off-by: Duo Zhang <[email protected]>
1 parent 13e06c7 commit 0c85952

File tree

3 files changed

+105
-9
lines changed

3 files changed

+105
-9
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ public void setRestoredRegion(boolean restoredRegion) {
397397
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
398398

399399
final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
400+
final ReadPointCalculationLock smallestReadPointCalcLock;
400401

401402
/**
402403
* The sequence ID that was encountered when this region was opened.
@@ -435,19 +436,18 @@ public void setRestoredRegion(boolean restoredRegion) {
435436
* this readPoint, are included in every read operation.
436437
*/
437438
public long getSmallestReadPoint() {
438-
long minimumReadPoint;
439439
// We need to ensure that while we are calculating the smallestReadPoint
440440
// no new RegionScanners can grab a readPoint that we are unaware of.
441-
// We achieve this by synchronizing on the scannerReadPoints object.
442-
synchronized (scannerReadPoints) {
443-
minimumReadPoint = mvcc.getReadPoint();
441+
smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
442+
try {
443+
long minimumReadPoint = mvcc.getReadPoint();
444444
for (Long readPoint : this.scannerReadPoints.values()) {
445-
if (readPoint < minimumReadPoint) {
446-
minimumReadPoint = readPoint;
447-
}
445+
minimumReadPoint = Math.min(minimumReadPoint, readPoint);
448446
}
447+
return minimumReadPoint;
448+
} finally {
449+
smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
449450
}
450-
return minimumReadPoint;
451451
}
452452

453453
/*
@@ -789,6 +789,8 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
789789
}
790790
this.rowLockWaitDuration = tmpRowLockDuration;
791791

792+
this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf);
793+
792794
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
793795
this.htableDescriptor = htd;
794796
Set<byte[]> families = this.htableDescriptor.getColumnFamilyNames();
@@ -7832,6 +7834,7 @@ void checkFamily(final byte[] family) throws NoSuchColumnFamilyException {
78327834
// 1 x RegionSplitPolicy - splitPolicy
78337835
// 1 x MetricsRegion - metricsRegion
78347836
// 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
7837+
// 1 x ReadPointCalculationLock - smallestReadPointCalcLock
78357838
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock
78367839
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
78377840
(3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Lock to manage concurrency between {@link RegionScanner} and
 * {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the
 * smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve
 * this by synchronizing on the scannerReadPoints object. But this may block the read thread and
 * reduce the read performance. Since the scannerReadPoints object is a
 * {@link java.util.concurrent.ConcurrentHashMap}, which is thread-safe, so the
 * {@link RegionScanner} can record their read points concurrently, what it needs to do is just
 * acquiring a shared lock. When we calculate the smallest read point, we need to acquire an
 * exclusive lock. This can improve read performance in most scenarios, only not when we have a lot
 * of delta operations, like {@link org.apache.hadoop.hbase.client.Append} or
 * {@link org.apache.hadoop.hbase.client.Increment}. So we introduce a flag to enable/disable this
 * feature.
 */
@InterfaceAudience.Private
public class ReadPointCalculationLock {

  public enum LockType {
    CALCULATION_LOCK,
    RECORDING_LOCK
  }

  // When the read/write mode is enabled, recorders (scanners) share the read lock and the
  // calculation (getSmallestReadPoint) takes the write lock; otherwise every caller contends
  // on one plain mutex, which matches the old synchronized behavior.
  private final boolean useReadWriteLockForReadPoints;
  private Lock lock;
  private ReadWriteLock readWriteLock;

  ReadPointCalculationLock(Configuration conf) {
    this.useReadWriteLockForReadPoints =
      conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false);
    if (useReadWriteLockForReadPoints) {
      readWriteLock = new ReentrantReadWriteLock();
    } else {
      lock = new ReentrantLock();
    }
  }

  /**
   * Picks the concrete {@link Lock} that the given {@code lockType} maps to under the
   * configured mode. Exactly one of {@code lock}/{@code readWriteLock} is non-null, which the
   * asserts document.
   */
  private Lock selectLock(LockType lockType) {
    if (useReadWriteLockForReadPoints) {
      assert lock == null;
      return lockType == LockType.CALCULATION_LOCK
        ? readWriteLock.writeLock()
        : readWriteLock.readLock();
    } else {
      assert readWriteLock == null;
      return lock;
    }
  }

  void lock(LockType lockType) {
    selectLock(lockType).lock();
  }

  void unlock(LockType lockType) {
    selectLock(lockType).unlock();
  }
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ private static boolean hasNonce(HRegion region, long nonce) {
130130
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
131131
this.scannerReadPoints = region.scannerReadPoints;
132132
this.rsServices = region.getRegionServerServices();
133-
synchronized (scannerReadPoints) {
133+
region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
134+
try {
134135
if (mvccReadPoint > 0) {
135136
this.readPt = mvccReadPoint;
136137
} else if (hasNonce(region, nonce)) {
@@ -139,6 +140,8 @@ private static boolean hasNonce(HRegion region, long nonce) {
139140
this.readPt = region.getReadPoint(isolationLevel);
140141
}
141142
scannerReadPoints.put(this, this.readPt);
143+
} finally {
144+
region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
142145
}
143146
initializeScanners(scan, additionalScanners);
144147
}

0 commit comments

Comments (0)