Skip to content

Commit 586b796

Browse files
guluo2016mokai87
authored andcommitted
RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (apache#6464)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 6083bd9 commit 586b796

File tree

2 files changed

+91
-11
lines changed

2 files changed

+91
-11
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ protected void chore() {
108108
// Now clean obsolete files for a table
109109
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
110110
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
111+
if (list.isEmpty()) {
112+
// The table is not MOB table, just skip it
113+
continue;
114+
}
111115
List<HRegion> regions = rs.getRegions(htd.getTableName());
112116
for (HRegion region : regions) {
113117
for (ColumnFamilyDescriptor hcd : list) {
@@ -116,14 +120,27 @@ protected void chore() {
116120
Set<String> regionMobs = new HashSet<String>();
117121
Path currentPath = null;
118122
try {
119-
// collectinng referenced MOBs
123+
// collecting referenced MOBs
120124
for (HStoreFile sf : sfs) {
121125
currentPath = sf.getPath();
122-
sf.initReader();
123-
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
124-
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
125-
// close store file to avoid memory leaks
126-
sf.closeStoreFile(true);
126+
byte[] mobRefData = null;
127+
byte[] bulkloadMarkerData = null;
128+
if (sf.getReader() == null) {
129+
synchronized (sf) {
130+
boolean needCreateReader = sf.getReader() == null;
131+
sf.initReader();
132+
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
133+
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
134+
if (needCreateReader) {
135+
// close store file to avoid memory leaks
136+
sf.closeStoreFile(true);
137+
}
138+
}
139+
} else {
140+
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
141+
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
142+
}
143+
127144
if (mobRefData == null) {
128145
if (bulkloadMarkerData == null) {
129146
LOG.warn(

hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.hadoop.hbase.mob;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertTrue;
2223

2324
import java.io.IOException;
2425
import java.util.Arrays;
26+
import java.util.Collection;
2527
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
2629
import java.util.stream.Collectors;
2730
import org.apache.hadoop.conf.Configuration;
2831
import org.apache.hadoop.fs.FileStatus;
@@ -41,8 +44,12 @@
4144
import org.apache.hadoop.hbase.client.Result;
4245
import org.apache.hadoop.hbase.client.ResultScanner;
4346
import org.apache.hadoop.hbase.client.Table;
47+
import org.apache.hadoop.hbase.client.TableDescriptor;
4448
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
4549
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
50+
import org.apache.hadoop.hbase.regionserver.HRegion;
51+
import org.apache.hadoop.hbase.regionserver.HStore;
52+
import org.apache.hadoop.hbase.regionserver.HStoreFile;
4653
import org.apache.hadoop.hbase.testclassification.MediumTests;
4754
import org.apache.hadoop.hbase.util.Bytes;
4855
import org.junit.After;
@@ -124,15 +131,15 @@ private void initConf() {
124131
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
125132
}
126133

127-
private void loadData(int start, int num) {
134+
private void loadData(Table t, int start, int num) {
128135
try {
129136

130137
for (int i = 0; i < num; i++) {
131138
Put p = new Put(Bytes.toBytes(start + i));
132139
p.addColumn(fam, qualifier, mobVal);
133-
table.put(p);
140+
t.put(p);
134141
}
135-
admin.flush(table.getName());
142+
admin.flush(t.getName());
136143
} catch (Exception e) {
137144
LOG.error("MOB file cleaner chore test FAILED", e);
138145
assertTrue(false);
@@ -148,8 +155,8 @@ public void tearDown() throws Exception {
148155

149156
@Test
150157
public void testMobFileCleanerChore() throws InterruptedException, IOException {
151-
loadData(0, 10);
152-
loadData(10, 10);
158+
loadData(table, 0, 10);
159+
loadData(table, 10, 10);
153160
// loadData(20, 10);
154161
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
155162
assertEquals(2, num);
@@ -225,6 +232,62 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {
225232
assertEquals(20, scanned);
226233
}
227234

235+
@Test
236+
public void testCleaningAndStoreFileReaderCreatedByOtherThreads()
237+
throws IOException, InterruptedException {
238+
TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads");
239+
ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam)
240+
.setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
241+
TableDescriptor tDesc =
242+
TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build();
243+
admin.createTable(tDesc);
244+
assertTrue(admin.tableExists(testTable));
245+
246+
// put some data
247+
loadData(admin.getConnection().getTable(testTable), 0, 10);
248+
249+
HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0);
250+
HStore store = region.getStore(fam);
251+
Collection<HStoreFile> storeFiles = store.getStorefiles();
252+
assertEquals(1, store.getStorefiles().size());
253+
final HStoreFile sf = storeFiles.iterator().next();
254+
assertNotNull(sf);
255+
long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam));
256+
assertEquals(1, mobFileNum);
257+
258+
ServerName serverName = null;
259+
for (ServerName sn : admin.getRegionServers()) {
260+
boolean flag = admin.getRegions(sn).stream().anyMatch(
261+
r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString()));
262+
if (flag) {
263+
serverName = sn;
264+
break;
265+
}
266+
}
267+
assertNotNull(serverName);
268+
RSMobFileCleanerChore cleanerChore =
269+
HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore();
270+
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
271+
boolean readerIsNotNull = false;
272+
try {
273+
sf.initReader();
274+
Thread.sleep(1000 * 10);
275+
readerIsNotNull = sf.getReader() != null;
276+
sf.closeStoreFile(true);
277+
} catch (Exception e) {
278+
LOG.error("We occur an exception", e);
279+
}
280+
return readerIsNotNull;
281+
});
282+
Thread.sleep(100);
283+
// The StoreFileReader object was created by another thread
284+
cleanerChore.chore();
285+
Boolean readerIsNotNull = future.join();
286+
assertTrue(readerIsNotNull);
287+
admin.disableTable(testTable);
288+
admin.deleteTable(testTable);
289+
}
290+
228291
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
229292
throws IOException {
230293
FileSystem fs = FileSystem.get(conf);

0 commit comments

Comments
 (0)