Skip to content

Commit 5cea811

Browse files
haohao0103 and alanzhao authored
HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes (#5121)
Co-authored-by: alanzhao <[email protected]> Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 5d82d4f commit 5cea811

File tree

2 files changed

+178
-23
lines changed

2 files changed

+178
-23
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.FileNotFoundException;
2323
import java.io.IOException;
2424
import java.io.InterruptedIOException;
25+
import java.net.InetSocketAddress;
2526
import java.nio.ByteBuffer;
2627
import java.util.ArrayDeque;
2728
import java.util.ArrayList;
@@ -56,13 +57,17 @@
5657
import org.apache.hadoop.fs.FileSystem;
5758
import org.apache.hadoop.fs.Path;
5859
import org.apache.hadoop.fs.permission.FsPermission;
60+
import org.apache.hadoop.hbase.Cell;
61+
import org.apache.hadoop.hbase.CellUtil;
5962
import org.apache.hadoop.hbase.HBaseConfiguration;
6063
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
6164
import org.apache.hadoop.hbase.HConstants;
65+
import org.apache.hadoop.hbase.HRegionLocation;
6266
import org.apache.hadoop.hbase.TableName;
6367
import org.apache.hadoop.hbase.TableNotFoundException;
6468
import org.apache.hadoop.hbase.client.AsyncAdmin;
6569
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
70+
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
6671
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
6772
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
6873
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
114119

115120
private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
116121

122+
/**
123+
* Keep locality while generating HFiles for bulkload. See HBASE-12596
124+
*/
125+
public static final String LOCALITY_SENSITIVE_CONF_KEY =
126+
"hbase.bulkload.locality.sensitive.enabled";
127+
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
128+
117129
public static final String NAME = "completebulkload";
118130
/**
119131
* Whether to run validation on hfiles before loading.
@@ -540,7 +552,6 @@ private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase
540552
Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
541553
while (!queue.isEmpty()) {
542554
final LoadQueueItem item = queue.remove();
543-
544555
final Callable<Pair<List<LoadQueueItem>, String>> call =
545556
() -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
546557
splittingFutures.add(pool.submit(call));
@@ -578,8 +589,8 @@ private String getUniqueName() {
578589
return UUID.randomUUID().toString().replaceAll("-", "");
579590
}
580591

581-
private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
582-
byte[] splitKey) throws IOException {
592+
private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
593+
TableDescriptor tableDesc, byte[] splitKey) throws IOException {
583594
Path hfilePath = item.getFilePath();
584595
byte[] family = item.getFamily();
585596
Path tmpDir = hfilePath.getParent();
@@ -594,7 +605,8 @@ private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor t
594605

595606
Path botOut = new Path(tmpDir, uniqueName + ".bottom");
596607
Path topOut = new Path(tmpDir, uniqueName + ".top");
597-
splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
608+
609+
splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
598610

599611
FileSystem fs = tmpDir.getFileSystem(getConf());
600612
fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -718,8 +730,9 @@ CacheConfig.DISABLED, true, getConf())) {
718730
checkRegionIndexValid(splitIdx, startEndKeys, tableName);
719731
}
720732
byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
721-
List<LoadQueueItem> lqis =
722-
splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
733+
List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
734+
FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
735+
723736
return new Pair<>(lqis, null);
724737
}
725738

@@ -729,25 +742,27 @@ CacheConfig.DISABLED, true, getConf())) {
729742
}
730743

731744
/**
732-
* Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
733-
* filters, etc.
745+
* Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
746+
* recreating bloom filters, etc.
734747
*/
735748
@InterfaceAudience.Private
736-
static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
737-
byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
749+
static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
750+
ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
751+
throws IOException {
738752
// Open reader with no block cache, and not in-memory
739753
Reference topReference = Reference.createTopReference(splitKey);
740754
Reference bottomReference = Reference.createBottomReference(splitKey);
741755

742-
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
743-
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
756+
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
757+
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
744758
}
745759

746760
/**
747-
* Copy half of an HFile into a new HFile.
761+
* Copy half of an HFile into a new HFile with favored nodes.
748762
*/
749763
private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
750-
Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
764+
Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
765+
throws IOException {
751766
FileSystem fs = inFile.getFileSystem(conf);
752767
CacheConfig cacheConf = CacheConfig.DISABLED;
753768
HalfStoreFileReader halfReader = null;
@@ -769,12 +784,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
769784
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
770785
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
771786
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
772-
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
773-
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
787+
774788
HFileScanner scanner = halfReader.getScanner(false, false, false);
775789
scanner.seekTo();
776790
do {
777-
halfWriter.append(scanner.getCell());
791+
final Cell cell = scanner.getCell();
792+
if (null != halfWriter) {
793+
halfWriter.append(cell);
794+
} else {
795+
796+
// init halfwriter
797+
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
798+
byte[] rowKey = CellUtil.cloneRow(cell);
799+
HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
800+
InetSocketAddress[] favoredNodes = null;
801+
if (null == hRegionLocation) {
802+
LOG.warn(
803+
"Failed get region location for rowkey {} , Using writer without favoured nodes.",
804+
Bytes.toString(rowKey));
805+
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
806+
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
807+
} else {
808+
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
809+
InetSocketAddress initialIsa =
810+
new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
811+
if (initialIsa.isUnresolved()) {
812+
LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
813+
hRegionLocation);
814+
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
815+
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
816+
} else {
817+
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
818+
favoredNodes = new InetSocketAddress[] { initialIsa };
819+
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
820+
.withBloomType(bloomFilterType).withFileContext(hFileContext)
821+
.withFavoredNodes(favoredNodes).build();
822+
}
823+
}
824+
} else {
825+
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
826+
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
827+
}
828+
halfWriter.append(cell);
829+
}
830+
778831
} while (scanner.next());
779832

780833
for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {

hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java

Lines changed: 108 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.junit.Assert.fail;
2727

2828
import java.io.IOException;
29+
import java.net.InetAddress;
2930
import java.nio.ByteBuffer;
3031
import java.util.ArrayList;
3132
import java.util.Collection;
@@ -43,10 +44,12 @@
4344
import org.apache.hadoop.hbase.HBaseClassTestRule;
4445
import org.apache.hadoop.hbase.HBaseTestingUtil;
4546
import org.apache.hadoop.hbase.HConstants;
47+
import org.apache.hadoop.hbase.HRegionLocation;
4648
import org.apache.hadoop.hbase.NamespaceDescriptor;
4749
import org.apache.hadoop.hbase.TableName;
4850
import org.apache.hadoop.hbase.TableNotFoundException;
4951
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
52+
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
5053
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
5154
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
5255
import org.apache.hadoop.hbase.client.Table;
@@ -63,7 +66,12 @@
6366
import org.apache.hadoop.hbase.testclassification.MiscTests;
6467
import org.apache.hadoop.hbase.util.Bytes;
6568
import org.apache.hadoop.hbase.util.CommonFSUtils;
69+
import org.apache.hadoop.hbase.util.FutureUtils;
6670
import org.apache.hadoop.hbase.util.HFileTestUtil;
71+
import org.apache.hadoop.hdfs.DistributedFileSystem;
72+
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
73+
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
74+
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
6775
import org.hamcrest.MatcherAssert;
6876
import org.junit.AfterClass;
6977
import org.junit.BeforeClass;
@@ -555,15 +563,48 @@ public void testSplitStoreFile() throws IOException {
555563
FileSystem fs = util.getTestFileSystem();
556564
Path testIn = new Path(dir, "testhfile");
557565
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
566+
String tableName = tn.getMethodName();
567+
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
558568
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
559569
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
560570

561571
Path bottomOut = new Path(dir, "bottom.out");
562572
Path topOut = new Path(dir, "top.out");
563573

564-
BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
565-
Bytes.toBytes("ggg"), bottomOut, topOut);
574+
BulkLoadHFilesTool.splitStoreFile(
575+
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
576+
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
577+
578+
int rowCount = verifyHFile(bottomOut);
579+
rowCount += verifyHFile(topOut);
580+
assertEquals(1000, rowCount);
581+
}
582+
583+
/**
584+
* Test hfile splits with the favored nodes
585+
*/
586+
@Test
587+
public void testSplitStoreFileWithFavoriteNodes() throws IOException {
566588

589+
Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
590+
FileSystem fs = util.getDFSCluster().getFileSystem();
591+
592+
Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
593+
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
594+
String tableName = tn.getMethodName();
595+
Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
596+
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
597+
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
598+
599+
Path bottomOut = new Path(dir, "bottom.out");
600+
Path topOut = new Path(dir, "top.out");
601+
602+
final AsyncTableRegionLocator regionLocator =
603+
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
604+
BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
605+
Bytes.toBytes("ggg"), bottomOut, topOut);
606+
verifyHFileFavoriteNode(topOut, regionLocator, fs);
607+
verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
567608
int rowCount = verifyHFile(bottomOut);
568609
rowCount += verifyHFile(topOut);
569610
assertEquals(1000, rowCount);
@@ -575,14 +616,17 @@ public void testSplitStoreFileWithCreateTimeTS() throws IOException {
575616
FileSystem fs = util.getTestFileSystem();
576617
Path testIn = new Path(dir, "testhfile");
577618
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
619+
String tableName = tn.getMethodName();
620+
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
578621
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
579622
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
580623

581624
Path bottomOut = new Path(dir, "bottom.out");
582625
Path topOut = new Path(dir, "top.out");
583626

584-
BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
585-
Bytes.toBytes("ggg"), bottomOut, topOut);
627+
BulkLoadHFilesTool.splitStoreFile(
628+
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
629+
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
586630

587631
verifyHFileCreateTimeTS(bottomOut);
588632
verifyHFileCreateTimeTS(topOut);
@@ -615,14 +659,17 @@ private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadE
615659
Path testIn = new Path(dir, "testhfile");
616660
ColumnFamilyDescriptor familyDesc =
617661
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
662+
String tableName = tn.getMethodName();
663+
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
618664
HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
619665
bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
620666

621667
Path bottomOut = new Path(dir, "bottom.out");
622668
Path topOut = new Path(dir, "top.out");
623669

624-
BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
625-
Bytes.toBytes("ggg"), bottomOut, topOut);
670+
BulkLoadHFilesTool.splitStoreFile(
671+
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
672+
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
626673

627674
int rowCount = verifyHFile(bottomOut);
628675
rowCount += verifyHFile(topOut);
@@ -654,6 +701,61 @@ private void verifyHFileCreateTimeTS(Path p) throws IOException {
654701
}
655702
}
656703

704+
/**
705+
* test split storefile with favorite node information
706+
*/
707+
private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
708+
throws IOException {
709+
Configuration conf = util.getConfiguration();
710+
711+
try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) {
712+
713+
final byte[] firstRowkey = reader.getFirstRowKey().get();
714+
final HRegionLocation hRegionLocation =
715+
FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
716+
717+
final String targetHostName = hRegionLocation.getHostname();
718+
719+
if (fs instanceof DistributedFileSystem) {
720+
String pathStr = p.toUri().getPath();
721+
LocatedBlocks blocks =
722+
((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
723+
724+
boolean isFavoriteNode = false;
725+
List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
726+
int index = 0;
727+
do {
728+
if (index > 0) {
729+
assertTrue("failed use favored nodes", isFavoriteNode);
730+
}
731+
isFavoriteNode = false;
732+
final LocatedBlock block = locatedBlocks.get(index);
733+
734+
final DatanodeInfo[] locations = block.getLocations();
735+
for (DatanodeInfo location : locations) {
736+
737+
final String hostName = location.getHostName();
738+
if (
739+
targetHostName.equals(hostName.equals("127.0.0.1")
740+
? InetAddress.getLocalHost().getHostName()
741+
: "127.0.0.1") || targetHostName.equals(hostName)
742+
) {
743+
isFavoriteNode = true;
744+
break;
745+
}
746+
}
747+
748+
index++;
749+
} while (index < locatedBlocks.size());
750+
if (index > 0) {
751+
assertTrue("failed use favored nodes", isFavoriteNode);
752+
}
753+
754+
}
755+
756+
}
757+
}
758+
657759
private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
658760
Integer value = map.containsKey(first) ? map.get(first) : 0;
659761
map.put(first, value + 1);

0 commit comments

Comments (0)