Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -639,25 +639,24 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
* @param regionFs region file system
* @param mergedDir the temp directory of merged region
*/
private void mergeStoreFiles(
final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
throws IOException {
private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
final Path mergedDir) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Configuration conf = env.getMasterConfiguration();
final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());

for (String family : regionFs.getFamilies()) {
final ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(family));
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);

if (storeFiles != null && storeFiles.size() > 0) {
for (StoreFileInfo storeFileInfo : storeFiles) {
// Create reference file(s) of the region in mergedDir.
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
regionFs.mergeStoreFile(mergedRegion, family,
new HStoreFile(mfs.getFileSystem(), storeFileInfo, conf, CacheConfig.DISABLED,
hcd.getBloomFilterType(), true), mergedDir);
regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(mfs.getFileSystem(),
storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true),
mergedDir);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
// The following code sets up a thread pool executor with as many slots as
// there's files to split. It then fires up everything, waits for
// completion and finally checks for any exception
Expand All @@ -635,12 +636,15 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
// clean this up.
int nbFiles = 0;
final Map<String, Collection<StoreFileInfo>> files =
new HashMap<String, Collection<StoreFileInfo>>(regionFs.getFamilies().size());
for (String family: regionFs.getFamilies()) {
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
if (sfis == null) continue;
if (sfis == null) {
continue;
}
Collection<StoreFileInfo> filteredSfis = null;
for (StoreFileInfo sfi: sfis) {
for (StoreFileInfo sfi : sfis) {
// Filter. There is a lag cleaning up compacted reference files. They get cleared
// after a delay in case outstanding Scanners still have references. Because of this,
// the listing of the Store content may have straggler reference files. Skip these.
Expand All @@ -661,20 +665,19 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
}
if (nbFiles == 0) {
// no file needs to be split.
return new Pair<Integer, Integer>(0,0);
return new Pair<Integer, Integer>(0, 0);
}
// Max #threads is the smaller of the number of storefiles or the default max determined above.
int maxThreads = Math.min(
conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
nbFiles);
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(
maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
// Split each store file.
for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
byte[] familyName = Bytes.toBytes(e.getKey());
Expand All @@ -684,9 +687,9 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
for (StoreFileInfo storeFileInfo : storeFiles) {
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
new HStoreFile(mfs.getFileSystem(), storeFileInfo, conf, CacheConfig.DISABLED,
hcd.getBloomFilterType(), true));
StoreFileSplitter sfs =
new StoreFileSplitter(regionFs, familyName, new HStoreFile(mfs.getFileSystem(),
storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true));
futures.add(threadPool.submit(sfs));
}
}
Expand All @@ -698,7 +701,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
// hbase.regionserver.fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
try {
boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
if (stillRunning) {
Expand All @@ -707,11 +710,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
while (!threadPool.isTerminated()) {
Thread.sleep(50);
}
throw new IOException("Took too long to split the" +
" files and create the references, aborting split");
throw new IOException(
"Took too long to split the" + " files and create the references, aborting split");
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

int daughterA = 0;
Expand All @@ -731,9 +734,8 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,

if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " split storefiles for region " +
getParentRegion().getShortNameToLog() +
" Daughter A: " + daughterA + " storefiles, Daughter B: " +
daughterB + " storefiles.");
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
" storefiles, Daughter B: " + daughterB + " storefiles.");
}
return new Pair<Integer, Integer>(daughterA, daughterB);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Testcase for HBASE-22632.
 * <p>
 * Every test creates a table whose descriptor declares only the {@code cf} family, writes a store
 * file into a {@code wrong_cf} directory under one of the table's region directories, and then
 * requires a split or merge involving that region to finish within the timeout.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestIgnoreUnknownFamily {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIgnoreUnknownFamily.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  /** The only column family declared in the descriptors of the tables created here. */
  private static final byte[] FAMILY = Bytes.toBytes("cf");

  /** Name of a family directory that is never added to any table descriptor in this class. */
  private static final byte[] UNKNOWN_FAMILY = Bytes.toBytes("wrong_cf");

  /** Supplies the running test method's name, which doubles as the table name. */
  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Disables and deletes every table returned by the admin so that tests do not see each other's
   * tables.
   */
  @After
  public void tearDownAfterTest() throws IOException {
    Admin admin = UTIL.getAdmin();
    for (TableName tableName : admin.listTableNames()) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
  }

  /**
   * Writes a store file into the {@link #UNKNOWN_FAMILY} directory of the given region, i.e. into
   * a family directory that the table descriptor does not declare.
   * @param region the region whose directory receives the stray family directory
   * @throws IOException if building or closing the store file writer fails
   */
  private void addStoreFileToUnknownFamily(RegionInfo region) throws IOException {
    MasterFileSystem mfs = UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
    Path rootDir = FSUtils.getRootDir(mfs.getConfiguration());
    Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, region);
    Path familyDir = new Path(regionDir, Bytes.toString(UNKNOWN_FAMILY));
    StoreFileWriter writer =
      new StoreFileWriter.Builder(mfs.getConfiguration(), mfs.getFileSystem())
        .withOutputDir(familyDir).withFileContext(new HFileContext()).build();
    // Nothing is appended; the writer is closed right away (presumably leaving an empty store
    // file on disk -- confirm against StoreFileWriter).
    writer.close();
  }

  /**
   * Splits a region that carries a store file under an undeclared family directory and waits up
   * to 30 seconds for the split to complete.
   */
  @Test
  public void testSplit()
    throws IOException, InterruptedException, ExecutionException, TimeoutException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    Admin admin = UTIL.getAdmin();
    admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
    RegionInfo region = admin.getRegions(tableName).get(0);
    addStoreFileToUnknownFamily(region);
    // The test passes when the future completes without throwing inside the timeout.
    admin.splitRegionAsync(region.getRegionName(), Bytes.toBytes(0)).get(30, TimeUnit.SECONDS);
  }

  /**
   * Merges two regions, the first of which carries a store file under an undeclared family
   * directory, and waits up to 30 seconds for the merge to complete.
   */
  @Test
  public void testMerge()
    throws IOException, InterruptedException, ExecutionException, TimeoutException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    Admin admin = UTIL.getAdmin();
    // A single split key yields the two regions that are merged below.
    admin.createTable(
      TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(),
      new byte[][] { Bytes.toBytes(0) });
    List<RegionInfo> regions = admin.getRegions(tableName);
    addStoreFileToUnknownFamily(regions.get(0));
    // The test passes when the future completes without throwing inside the timeout.
    admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
      regions.get(1).getEncodedNameAsBytes(), false).get(30, TimeUnit.SECONDS);
  }
}