Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test Bulk Load and MR on a distributed cluster.
* With FileBased StorefileTracker enabled.
* It starts an MR job that creates linked chains
*
* The format of rows is like this:
* Row Key -> Long
*
* L:<< Chain Id >> -> Row Key of the next link in the chain
* S:<< Chain Id >> -> The step in the chain that his link is.
* D:<< Chain Id >> -> Random Data.
*
* All chains start on row 0.
* All rk's are > 0.
*
* After creating the linked lists they are walked over using a TableMapper based Mapreduce Job.
*
* There are a few options exposed:
*
* hbase.IntegrationTestBulkLoad.chainLength
* The number of rows that will be part of each and every chain.
*
* hbase.IntegrationTestBulkLoad.numMaps
* The number of mappers that will be run. Each mapper creates on linked list chain.
*
* hbase.IntegrationTestBulkLoad.numImportRounds
* How many jobs will be run to create linked lists.
*
* hbase.IntegrationTestBulkLoad.tableName
* The name of the table.
*
* hbase.IntegrationTestBulkLoad.replicaCount
* How many region replicas to configure for the table under test.
*/
@Category(IntegrationTests.class)
public class IntegrationTestFileBasedSFTBulkLoad extends IntegrationTestBulkLoad {

  private static final Logger LOG =
      LoggerFactory.getLogger(IntegrationTestFileBasedSFTBulkLoad.class);

  // Configuration keys controlling job sizing; see the class Javadoc for their semantics.
  // Constants are static final (they were mutable statics before).
  private static final String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static final String NUM_IMPORT_ROUNDS_KEY =
      "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static final String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static final int NUM_REPLICA_COUNT_DEFAULT = 1;

  /**
   * Runs the inherited bulk load test with the file-based store file tracker
   * enabled (configured in {@link #setUpCluster()}).
   */
  @Test
  public void testFileBasedSFTBulkLoad() throws Exception {
    super.testBulkLoad();
  }

  /**
   * Initializes the cluster with the FILE-based store file tracker implementation
   * before the test runs. When running against a distributed cluster, scales the
   * number of mappers and import rounds; otherwise starts a mini MapReduce cluster.
   */
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      // Parameterized logging avoids building the message when debug is disabled.
      LOG.debug("Region Replicas enabled: {}", replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  /**
   * Command-line entry point; forces distributed-cluster mode and delegates to
   * {@link ToolRunner}.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), args);
    System.exit(status);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6844,7 +6844,7 @@ public interface BulkLoadListener {
* @return final path to be used for actual loading
* @throws IOException
*/
String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String customStaging)
throws IOException;

/**
Expand Down Expand Up @@ -6966,12 +6966,21 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
familyWithFinalPath.put(familyName, new ArrayList<>());
}
List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
String finalPath = path;
try {
String finalPath = path;
boolean reqTmp = store.storeEngine.requireWritingToTmpDirFirst();
if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile,
reqTmp ? null : regionDir.toString());
}
Pair<Path, Path> pair = null;
if (reqTmp) {
pair = store.preBulkLoadHFile(finalPath, seqId);
}
else {
Path livePath = new Path(finalPath);
pair = new Pair<>(livePath, livePath);
}
Pair<Path, Path> pair = store.preBulkLoadHFile(finalPath, seqId);
lst.add(pair);
} catch (IOException ioe) {
// A failure here can cause an atomicity violation that we currently
Expand All @@ -6981,7 +6990,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
if (bulkLoadListener != null) {
try {
bulkLoadListener.failedBulkLoad(familyName, path);
bulkLoadListener.failedBulkLoad(familyName, finalPath);
} catch (Exception ex) {
LOG.error("Error while calling failedBulkLoad for family " +
Bytes.toString(familyName) + " with path " + path, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ private Path preCommitStoreFile(final String familyName, final Path buildPath,
* @throws IOException
*/
Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
// rename is not necessary in case of direct-insert stores
if(buildPath.equals(dstPath)){
return dstPath;
}
// buildPath exists, therefore not doing an exists() check.
if (!rename(buildPath, dstPath)) {
throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -342,27 +343,37 @@ private User getActiveUser() throws IOException {
return user;
}

private static class SecureBulkLoadListener implements BulkLoadListener {
//package-private for test purpose only
static class SecureBulkLoadListener implements BulkLoadListener {
// Target filesystem
private final FileSystem fs;
private final String stagingDir;
private final Configuration conf;
// Source filesystem
private FileSystem srcFs = null;
private Map<String, FsPermission> origPermissions = null;
private Map<String, String> origSources = null;

/**
 * Creates a listener that moves incoming HFiles into the given staging
 * directory on the target filesystem during secure bulk load.
 *
 * @param fs target (region server) filesystem
 * @param stagingDir default staging directory used when no custom staging
 *        directory is supplied to prepareBulkLoad
 * @param conf configuration used to open the source filesystem lazily
 */
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
this.stagingDir = stagingDir;
this.conf = conf;
// Tracks original file permissions and source paths so a failed bulk load
// can move files back and restore their permissions.
this.origPermissions = new HashMap<>();
this.origSources = new HashMap<>();
}

@Override
public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
throws IOException {
public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile,
String customStaging ) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));

//store customStaging for failedBulkLoad
String currentStaging = stagingDir;
if(StringUtils.isNotEmpty(customStaging)){
currentStaging = customStaging;
}

Path stageP = new Path(currentStaging, new Path(Bytes.toString(family), p.getName()));

// In case of Replication for bulk load files, hfiles are already copied in staging directory
if (p.equals(stageP)) {
Expand Down Expand Up @@ -391,11 +402,16 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
origPermissions.put(srcPath, origFileStatus.getPermission());
origSources.put(stageP.toString(), srcPath);
if(!fs.rename(p, stageP)) {
throw new IOException("Failed to move HFile: " + p + " to " + stageP);
}
}
fs.setPermission(stageP, PERM_ALL_ACCESS);

if(StringUtils.isNotEmpty(customStaging)) {
fs.setPermission(stageP, PERM_ALL_ACCESS);
}

return stageP.toString();
}

Expand All @@ -413,35 +429,37 @@ private void closeSrcFs() throws IOException {
}

@Override
public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
public void failedBulkLoad(final byte[] family, final String stagedPath) throws IOException {
try {
Path p = new Path(srcPath);
if (srcFs == null) {
srcFs = FileSystem.newInstance(p.toUri(), conf);
}
if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
// files are copied so no need to move them back
String src = origSources.get(stagedPath);
if(StringUtils.isEmpty(src)){
LOG.debug(stagedPath + " was not moved to staging. No need to move back");
return;
}
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));

// In case of Replication for bulk load files, hfiles are not renamed by end point during
// prepare stage, so no need of rename here again
if (p.equals(stageP)) {
LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
Path stageP = new Path(stagedPath);
if (!fs.exists(stageP)) {
throw new IOException(
"Missing HFile: " + stageP + ", can't be moved back to it's original place");
}

//we should not move back files if the original exists
Path srcPath = new Path(src);
if(srcFs.exists(srcPath)) {
LOG.debug(src + " is already at it's original place. No need to move.");
return;
}

LOG.debug("Moving " + stageP + " back to " + p);
if (!fs.rename(stageP, p)) {
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
LOG.debug("Moving " + stageP + " back to " + srcPath);
if (!fs.rename(stageP, srcPath)) {
throw new IOException("Failed to move HFile: " + stageP + " to " + srcPath);
}

// restore original permission
if (origPermissions.containsKey(srcPath)) {
fs.setPermission(p, origPermissions.get(srcPath));
if (origPermissions.containsKey(stagedPath)) {
fs.setPermission(srcPath, origPermissions.get(src));
} else {
LOG.warn("Can't find previous permission for path=" + srcPath);
LOG.warn("Can't find previous permission for path=" + stagedPath);
}
} finally {
closeSrcFs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class TestBulkLoad extends TestBulkloadBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBulkLoad.class);

/**
 * Parameterized-runner constructor; delegates to the base class.
 *
 * @param useFileBasedSFT whether this run enables the file-based store file
 *        tracker (parameter supplied by TestBulkloadBase's data() method)
 */
public TestBulkLoad(boolean useFileBasedSFT) {
super(useFileBasedSFT);
}

@Test
public void verifyBulkLoadEvent() throws IOException {
TableName tableName = TableName.valueOf("test", "test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -46,6 +48,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
Expand All @@ -62,7 +65,10 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestBulkloadBase {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
Expand All @@ -75,12 +81,31 @@ public class TestBulkloadBase {
protected final byte[] family2 = Bytes.toBytes("family2");
protected final byte[] family3 = Bytes.toBytes("family3");

protected Boolean useFileBasedSFT;

@Rule
public TestName name = new TestName();

/**
 * Parameterized-runner constructor.
 *
 * @param useFileBasedSFT whether the file-based store file tracker is
 *        configured in before(); supplied by {@code data()}
 */
public TestBulkloadBase(boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}

/**
 * Supplies the two parameterized runs: one without and one with the
 * file-based store file tracker enabled.
 */
@Parameterized.Parameters
public static Collection<Boolean> data() {
  // Varargs form builds the same fixed-size two-element list as the
  // explicit-array version it replaces.
  return Arrays.asList(Boolean.FALSE, Boolean.TRUE);
}

/**
 * Per-test setup: refreshes the random payload bytes and configures the
 * store file tracker implementation according to the current parameter.
 */
@Before
public void before() throws IOException {
  random.nextBytes(randomBytes);
  final String fileBasedTracker =
      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker";
  if (useFileBasedSFT) {
    conf.set(StoreFileTrackerFactory.TRACKER_IMPL, fileBasedTracker);
  } else {
    // Clear any tracker left over from a previous parameterized run.
    conf.unset(StoreFileTrackerFactory.TRACKER_IMPL);
  }
}

protected Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
Expand Down Expand Up @@ -115,7 +140,7 @@ protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableNam
}

/**
 * Creates a test region whose table name is derived from the current test
 * method name. Parameterized runs produce names like {@code test[0]}; the
 * bracketed parameter suffix is stripped since it is not valid in a table name.
 */
protected HRegion testRegionWithFamilies(byte[]... families) throws IOException {
  String methodName = name.getMethodName();
  // Guard against a missing "[" so a non-parameterized invocation does not
  // throw StringIndexOutOfBoundsException from substring(0, -1).
  int paramSuffix = methodName.indexOf('[');
  TableName tableName =
      TableName.valueOf(paramSuffix < 0 ? methodName : methodName.substring(0, paramSuffix));
  return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
}

Expand All @@ -134,7 +159,7 @@ protected List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) thro
private String createHFileForFamilies(byte[] family) throws IOException {
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
// TODO We need a way to do this without creating files
File hFileLocation = testFolder.newFile();
File hFileLocation = testFolder.newFile(generateUniqueName(null));
FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
try {
hFileFactory.withOutputStream(out);
Expand All @@ -153,6 +178,12 @@ private String createHFileForFamilies(byte[] family) throws IOException {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}

/**
 * Generates a unique, dash-free file name, optionally appending a suffix.
 *
 * @param suffix optional suffix appended verbatim; may be {@code null}
 * @return a random UUID-based name with dashes removed
 */
private static String generateUniqueName(final String suffix) {
  // String.replace does a literal replacement; replaceAll would needlessly
  // compile "-" as a regex on every call.
  String name = UUID.randomUUID().toString().replace("-", "");
  if (suffix != null) {
    name += suffix;
  }
  return name;
}

protected static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
return new WalMatcher(typeBytes);
}
Expand Down
Loading