Skip to content

Commit 482ef69

Browse files
comnetwork authored and Apache9 committed
HBASE-26488 Memory leak when MemStore retry flushing (#3899)
Signed-off-by: Duo Zhang <[email protected]>
1 parent d072a2e commit 482ef69

File tree

4 files changed

+148
-35
lines changed

4 files changed

+148
-35
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2404,7 +2404,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
24042404
long snapshotId = -1; // -1 means do not drop
24052405
if (dropMemstoreSnapshot && snapshot != null) {
24062406
snapshotId = snapshot.getId();
2407-
snapshot.close();
24082407
}
24092408
HStore.this.updateStorefiles(storeFiles, snapshotId);
24102409
}
@@ -2415,10 +2414,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
24152414
@Override
24162415
public void abort() throws IOException {
24172416
if (snapshot != null) {
2418-
//We need to close the snapshot when aborting, otherwise, the segment scanner
2419-
//won't be closed. If we are using MSLAB, the chunk referenced by those scanners
2420-
//can't be released, thus memory leak
2421-
snapshot.close();
24222417
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
24232418
}
24242419
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,10 @@ public String toString() {
8585
return res;
8686
}
8787

88+
/**
89+
* We create a new {@link SnapshotSegmentScanner} to increase the reference count of
90+
* {@link MemStoreLABImpl} used by this segment.
91+
*/
8892
List<KeyValueScanner> getSnapshotScanners() {
8993
return Collections.singletonList(new SnapshotSegmentScanner(this));
9094
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java

Lines changed: 23 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -17,31 +17,38 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver;
1919

20+
import java.util.List;
2021
import org.apache.yetus.audience.InterfaceAudience;
2122

22-
import java.io.Closeable;
23-
import java.util.List;
23+
2424
/**
25-
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
26-
* count of cells in it and total memory size occupied by all the cells, timestamp information of
27-
* all the cells and a scanner to read all cells in it.
25+
* {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
26+
* Details include the snapshot's identifier, count of cells in it and total memory size occupied by
27+
* all the cells, timestamp information of all the cells and the snapshot immutableSegment.
28+
* <p>
29+
* NOTE: Every time {@link MemStoreSnapshot#getScanners} is called, we create new
30+
* {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment}, and
31+
* {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
32+
* the reference count of {@link MemStoreLAB} which is used by
33+
* {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
34+
* must call their close method to invoke {@link Segment#decScannerCount}.
2835
*/
2936
@InterfaceAudience.Private
30-
public class MemStoreSnapshot implements Closeable {
37+
public class MemStoreSnapshot {
3138
private final long id;
3239
private final int cellsCount;
3340
private final MemStoreSize memStoreSize;
3441
private final TimeRangeTracker timeRangeTracker;
35-
private final List<KeyValueScanner> scanners;
3642
private final boolean tagsPresent;
43+
private final ImmutableSegment snapshotImmutableSegment;
3744

3845
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
3946
this.id = id;
4047
this.cellsCount = snapshot.getCellsCount();
4148
this.memStoreSize = snapshot.getMemStoreSize();
4249
this.timeRangeTracker = snapshot.getTimeRangeTracker();
43-
this.scanners = snapshot.getSnapshotScanners();
4450
this.tagsPresent = snapshot.isTagsPresent();
51+
this.snapshotImmutableSegment = snapshot;
4552
}
4653

4754
/**
@@ -74,10 +81,16 @@ public TimeRangeTracker getTimeRangeTracker() {
7481
}
7582

7683
/**
77-
* @return {@link KeyValueScanner} for iterating over the snapshot
84+
* Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
85+
* NOTE: When creating new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
86+
* invoked in the {@link SnapshotSegmentScanner} ctor, so after we use these
87+
* {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
88+
* {@link Segment#decScannerCount}.
89+
* @return {@link KeyValueScanner}s (of type {@link SnapshotSegmentScanner}) for iterating
90+
* over the snapshot.
7891
*/
7992
public List<KeyValueScanner> getScanners() {
80-
return scanners;
93+
return snapshotImmutableSegment.getSnapshotScanners();
8194
}
8295

8396
/**
@@ -86,13 +99,4 @@ public List<KeyValueScanner> getScanners() {
8699
public boolean isTagsPresent() {
87100
return this.tagsPresent;
88101
}
89-
90-
@Override
91-
public void close() {
92-
if (this.scanners != null) {
93-
for (KeyValueScanner scanner : scanners) {
94-
scanner.close();
95-
}
96-
}
97-
}
98102
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java

Lines changed: 121 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,7 @@
103103
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
104104
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
105105
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
106+
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
106107
import org.apache.hadoop.hbase.security.User;
107108
import org.apache.hadoop.hbase.testclassification.MediumTests;
108109
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -784,11 +785,12 @@ private void injectFault() throws IOException {
784785
}
785786
}
786787

787-
private static void flushStore(HStore store, long id) throws IOException {
788+
private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
788789
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
789790
storeFlushCtx.prepare();
790791
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
791792
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
793+
return storeFlushCtx;
792794
}
793795

794796
/**
@@ -2236,7 +2238,7 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
22362238
flushThread.join();
22372239

22382240
if (myDefaultMemStore.shouldWait) {
2239-
SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
2241+
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
22402242
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
22412243
assertTrue(memStoreLAB.isClosed());
22422244
assertTrue(!memStoreLAB.chunks.isEmpty());
@@ -2263,16 +2265,16 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
22632265
}
22642266
}
22652267

2266-
private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
2267-
List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
2268+
@SuppressWarnings("unchecked")
2269+
private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2270+
List<T> resultScanners = new ArrayList<T>();
22682271
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2269-
if (keyValueScanner instanceof SegmentScanner) {
2270-
segmentScanners.add((SegmentScanner) keyValueScanner);
2272+
if (keyValueScannerClass.isInstance(keyValueScanner)) {
2273+
resultScanners.add((T) keyValueScanner);
22712274
}
22722275
}
2273-
2274-
assertTrue(segmentScanners.size() == 1);
2275-
return segmentScanners.get(0);
2276+
assertTrue(resultScanners.size() == 1);
2277+
return resultScanners.get(0);
22762278
}
22772279

22782280
@Test
@@ -2324,6 +2326,116 @@ public CustomDefaultMemStore(Configuration conf, CellComparator c,
23242326

23252327
}
23262328

2329+
/**
2330+
* This test is for HBASE-26488
2331+
*/
2332+
@Test
2333+
public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2334+
2335+
Configuration conf = HBaseConfiguration.create();
2336+
2337+
byte[] smallValue = new byte[3];
2338+
byte[] largeValue = new byte[9];
2339+
final long timestamp = EnvironmentEdgeManager.currentTime();
2340+
final long seqId = 100;
2341+
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2342+
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2343+
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2344+
quals.add(qf1);
2345+
quals.add(qf2);
2346+
2347+
conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2348+
conf.setBoolean(WALFactory.WAL_ENABLED, false);
2349+
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2350+
MyDefaultStoreFlusher.class.getName());
2351+
2352+
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2353+
MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2354+
assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2355+
2356+
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2357+
store.add(smallCell, memStoreSizing);
2358+
store.add(largeCell, memStoreSizing);
2359+
flushStore(store, id++);
2360+
2361+
MemStoreLABImpl memStoreLAB =
2362+
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2363+
assertTrue(memStoreLAB.isClosed());
2364+
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
2365+
assertTrue(memStoreLAB.isReclaimed());
2366+
assertTrue(memStoreLAB.chunks.isEmpty());
2367+
StoreScanner storeScanner = null;
2368+
try {
2369+
storeScanner =
2370+
(StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2371+
assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2372+
assertTrue(store.memstore.size().getCellsCount() == 0);
2373+
assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2374+
assertTrue(storeScanner.currentScanners.size() == 1);
2375+
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2376+
2377+
List<Cell> results = new ArrayList<>();
2378+
storeScanner.next(results);
2379+
assertEquals(2, results.size());
2380+
CellUtil.equals(smallCell, results.get(0));
2381+
CellUtil.equals(largeCell, results.get(1));
2382+
} finally {
2383+
if (storeScanner != null) {
2384+
storeScanner.close();
2385+
}
2386+
}
2387+
}
2388+
2389+
2390+
static class MyDefaultMemStore1 extends DefaultMemStore {
2391+
2392+
private ImmutableSegment snapshotImmutableSegment;
2393+
2394+
public MyDefaultMemStore1(Configuration conf, CellComparator c,
2395+
RegionServicesForStores regionServices) {
2396+
super(conf, c, regionServices);
2397+
}
2398+
2399+
@Override
2400+
public MemStoreSnapshot snapshot() {
2401+
MemStoreSnapshot result = super.snapshot();
2402+
this.snapshotImmutableSegment = snapshot;
2403+
return result;
2404+
}
2405+
2406+
}
2407+
2408+
public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2409+
private static final AtomicInteger failCounter = new AtomicInteger(1);
2410+
private static final AtomicInteger counter = new AtomicInteger(0);
2411+
2412+
public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2413+
super(conf, store);
2414+
}
2415+
2416+
@Override
2417+
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2418+
MonitoredTask status, ThroughputController throughputController,
2419+
FlushLifeCycleTracker tracker) throws IOException {
2420+
counter.incrementAndGet();
2421+
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
2422+
}
2423+
2424+
@Override
2425+
protected void performFlush(InternalScanner scanner, final CellSink sink,
2426+
ThroughputController throughputController) throws IOException {
2427+
2428+
final int currentCount = counter.get();
2429+
CellSink newCellSink = (cell) -> {
2430+
if (currentCount <= failCounter.get()) {
2431+
throw new IOException("Simulated exception by tests");
2432+
}
2433+
sink.append(cell);
2434+
};
2435+
super.performFlush(scanner, newCellSink, throughputController);
2436+
}
2437+
}
2438+
23272439
private HStoreFile mockStoreFileWithLength(long length) {
23282440
HStoreFile sf = mock(HStoreFile.class);
23292441
StoreFileReader sfr = mock(StoreFileReader.class);
@@ -3107,7 +3219,5 @@ protected void doClearSnapShot() {
31073219
}
31083220
}
31093221
}
3110-
3111-
31123222
}
31133223
}

0 commit comments

Comments
 (0)