102102import org .apache .hadoop .hbase .regionserver .compactions .DefaultCompactor ;
103103import org .apache .hadoop .hbase .regionserver .querymatcher .ScanQueryMatcher ;
104104import org .apache .hadoop .hbase .regionserver .throttle .NoLimitThroughputController ;
105+ import org .apache .hadoop .hbase .regionserver .throttle .ThroughputController ;
105106import org .apache .hadoop .hbase .security .User ;
106107import org .apache .hadoop .hbase .testclassification .MediumTests ;
107108import org .apache .hadoop .hbase .testclassification .RegionServerTests ;
@@ -783,11 +784,12 @@ private void injectFault() throws IOException {
783784 }
784785 }
785786
786- private static void flushStore (HStore store , long id ) throws IOException {
787+ private static StoreFlushContext flushStore (HStore store , long id ) throws IOException {
787788 StoreFlushContext storeFlushCtx = store .createFlushContext (id , FlushLifeCycleTracker .DUMMY );
788789 storeFlushCtx .prepare ();
789790 storeFlushCtx .flushCache (Mockito .mock (MonitoredTask .class ));
790791 storeFlushCtx .commit (Mockito .mock (MonitoredTask .class ));
792+ return storeFlushCtx ;
791793 }
792794
793795 /**
@@ -2222,7 +2224,7 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
22222224 flushThread .join ();
22232225
22242226 if (myDefaultMemStore .shouldWait ) {
2225- SegmentScanner segmentScanner = getSegmentScanner (storeScanner );
2227+ SegmentScanner segmentScanner = getTypeKeyValueScanner (storeScanner , SegmentScanner . class );
22262228 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl ) (segmentScanner .segment .getMemStoreLAB ());
22272229 assertTrue (memStoreLAB .isClosed ());
22282230 assertTrue (!memStoreLAB .chunks .isEmpty ());
@@ -2249,16 +2251,16 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
22492251 }
22502252 }
22512253
2252- private SegmentScanner getSegmentScanner (StoreScanner storeScanner ) {
2253- List <SegmentScanner > segmentScanners = new ArrayList <SegmentScanner >();
2254+ @ SuppressWarnings ("unchecked" )
2255+ private <T > T getTypeKeyValueScanner (StoreScanner storeScanner , Class <T > keyValueScannerClass ) {
2256+ List <T > resultScanners = new ArrayList <T >();
22542257 for (KeyValueScanner keyValueScanner : storeScanner .currentScanners ) {
2255- if (keyValueScanner instanceof SegmentScanner ) {
2256- segmentScanners .add ((SegmentScanner ) keyValueScanner );
2258+ if (keyValueScannerClass . isInstance ( keyValueScanner ) ) {
2259+ resultScanners .add ((T ) keyValueScanner );
22572260 }
22582261 }
2259-
2260- assertTrue (segmentScanners .size () == 1 );
2261- return segmentScanners .get (0 );
2262+ assertTrue (resultScanners .size () == 1 );
2263+ return resultScanners .get (0 );
22622264 }
22632265
22642266 @ Test
@@ -2310,6 +2312,116 @@ public CustomDefaultMemStore(Configuration conf, CellComparator c,
23102312
23112313 }
23122314
2315+ /**
2316+ * This test is for HBASE-26488
2317+ */
2318+ @ Test
2319+ public void testMemoryLeakWhenFlushMemStoreRetrying () throws Exception {
2320+
2321+ Configuration conf = HBaseConfiguration .create ();
2322+
2323+ byte [] smallValue = new byte [3 ];
2324+ byte [] largeValue = new byte [9 ];
2325+ final long timestamp = EnvironmentEdgeManager .currentTime ();
2326+ final long seqId = 100 ;
2327+ final Cell smallCell = createCell (qf1 , timestamp , seqId , smallValue );
2328+ final Cell largeCell = createCell (qf2 , timestamp , seqId , largeValue );
2329+ TreeSet <byte []> quals = new TreeSet <>(Bytes .BYTES_COMPARATOR );
2330+ quals .add (qf1 );
2331+ quals .add (qf2 );
2332+
2333+ conf .set (HStore .MEMSTORE_CLASS_NAME , MyDefaultMemStore1 .class .getName ());
2334+ conf .setBoolean (WALFactory .WAL_ENABLED , false );
2335+ conf .set (DefaultStoreEngine .DEFAULT_STORE_FLUSHER_CLASS_KEY ,
2336+ MyDefaultStoreFlusher .class .getName ());
2337+
2338+ init (name .getMethodName (), conf , ColumnFamilyDescriptorBuilder .newBuilder (family ).build ());
2339+ MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1 ) (store .memstore );
2340+ assertTrue ((store .storeEngine .getStoreFlusher ()) instanceof MyDefaultStoreFlusher );
2341+
2342+ MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing ();
2343+ store .add (smallCell , memStoreSizing );
2344+ store .add (largeCell , memStoreSizing );
2345+ flushStore (store , id ++);
2346+
2347+ MemStoreLABImpl memStoreLAB =
2348+ (MemStoreLABImpl ) (myDefaultMemStore .snapshotImmutableSegment .getMemStoreLAB ());
2349+ assertTrue (memStoreLAB .isClosed ());
2350+ assertTrue (memStoreLAB .getOpenScannerCount () == 0 );
2351+ assertTrue (memStoreLAB .isReclaimed ());
2352+ assertTrue (memStoreLAB .chunks .isEmpty ());
2353+ StoreScanner storeScanner = null ;
2354+ try {
2355+ storeScanner =
2356+ (StoreScanner ) store .getScanner (new Scan (new Get (row )), quals , seqId + 1 );
2357+ assertTrue (store .storeEngine .getStoreFileManager ().getStorefileCount () == 1 );
2358+ assertTrue (store .memstore .size ().getCellsCount () == 0 );
2359+ assertTrue (store .memstore .getSnapshotSize ().getCellsCount () == 0 );
2360+ assertTrue (storeScanner .currentScanners .size () == 1 );
2361+ assertTrue (storeScanner .currentScanners .get (0 ) instanceof StoreFileScanner );
2362+
2363+ List <Cell > results = new ArrayList <>();
2364+ storeScanner .next (results );
2365+ assertEquals (2 , results .size ());
2366+ CellUtil .equals (smallCell , results .get (0 ));
2367+ CellUtil .equals (largeCell , results .get (1 ));
2368+ } finally {
2369+ if (storeScanner != null ) {
2370+ storeScanner .close ();
2371+ }
2372+ }
2373+ }
2374+
2375+
2376+ static class MyDefaultMemStore1 extends DefaultMemStore {
2377+
2378+ private ImmutableSegment snapshotImmutableSegment ;
2379+
2380+ public MyDefaultMemStore1 (Configuration conf , CellComparator c ,
2381+ RegionServicesForStores regionServices ) {
2382+ super (conf , c , regionServices );
2383+ }
2384+
2385+ @ Override
2386+ public MemStoreSnapshot snapshot () {
2387+ MemStoreSnapshot result = super .snapshot ();
2388+ this .snapshotImmutableSegment = snapshot ;
2389+ return result ;
2390+ }
2391+
2392+ }
2393+
2394+ public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2395+ private static final AtomicInteger failCounter = new AtomicInteger (1 );
2396+ private static final AtomicInteger counter = new AtomicInteger (0 );
2397+
2398+ public MyDefaultStoreFlusher (Configuration conf , HStore store ) {
2399+ super (conf , store );
2400+ }
2401+
2402+ @ Override
2403+ public List <Path > flushSnapshot (MemStoreSnapshot snapshot , long cacheFlushId ,
2404+ MonitoredTask status , ThroughputController throughputController ,
2405+ FlushLifeCycleTracker tracker ) throws IOException {
2406+ counter .incrementAndGet ();
2407+ return super .flushSnapshot (snapshot , cacheFlushId , status , throughputController , tracker );
2408+ }
2409+
2410+ @ Override
2411+ protected void performFlush (InternalScanner scanner , final CellSink sink ,
2412+ ThroughputController throughputController ) throws IOException {
2413+
2414+ final int currentCount = counter .get ();
2415+ CellSink newCellSink = (cell ) -> {
2416+ if (currentCount <= failCounter .get ()) {
2417+ throw new IOException ("Simulated exception by tests" );
2418+ }
2419+ sink .append (cell );
2420+ };
2421+ super .performFlush (scanner , newCellSink , throughputController );
2422+ }
2423+ }
2424+
23132425 private HStoreFile mockStoreFileWithLength (long length ) {
23142426 HStoreFile sf = mock (HStoreFile .class );
23152427 StoreFileReader sfr = mock (StoreFileReader .class );
@@ -3093,7 +3205,5 @@ protected void doClearSnapShot() {
30933205 }
30943206 }
30953207 }
3096-
3097-
30983208 }
30993209}
0 commit comments