
Commit 3535d31

HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic

1 parent e65fc92 · commit 3535d31
24 files changed: +358 −87 lines
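
The shape of the new abstraction can be inferred from its call sites in the HStore.java diff below (loadStoreFiles, addNewStoreFiles, addCompactionResults, requireWritingToTmpDirFirst). As a reading aid, here is a minimal sketch of that contract; the actual interface ships in files not shown on this page, so the exact signatures are an approximation:

package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

public interface StoreFileTracker {

  // Return the currently tracked store files; used on store open and refresh.
  List<StoreFileInfo> loadStoreFiles() throws IOException;

  // Record files produced by flush, bulk load, or recovered-HFile commit.
  void addNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  // Record a compaction: the compacted inputs are replaced by the new outputs.
  void addCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  // Whether new files must be written to the tmp directory first and then
  // renamed into place on commit (the pre-existing behavior).
  boolean requireWritingToTmpDirFirst();
}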

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 11 additions & 11 deletions

@@ -143,17 +143,17 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
   };
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
-      new CellSinkFactory<StoreFileWriter>() {
-        @Override
-        public StoreFileWriter createWriter(InternalScanner scanner,
-            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          // make this writer with tags always because of possible new cells with tags.
-          return store.createWriterInTmp(fd.maxKeyCount,
-              major ? majorCompactionCompression : minorCompactionCompression,
-              true, true, true, shouldDropBehind);
-        }
-      };
+      new CellSinkFactory<StoreFileWriter>() {
+        @Override
+        public StoreFileWriter createWriter(InternalScanner scanner,
+            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+            boolean shouldDropBehind, boolean major) throws IOException {
+          // make this writer with tags always because of possible new cells with tags.
+          return store.createWriter(fd.maxKeyCount,
+              major ? majorCompactionCompression : minorCompactionCompression, true, true, true,
+              shouldDropBehind);
+        }
+      };
 
   public DefaultMobStoreCompactor(Configuration conf, HStore store) {
     super(conf, store);

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java

Lines changed: 2 additions & 2 deletions

@@ -127,8 +127,8 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
-        false, true, true, false);
+      writer = store.createWriter(cellsCount,
+        store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
       IOException e = null;
       try {
         // It's a mob store, flush the cells in a mob way. This is the difference of flushing

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java

Lines changed: 1 addition & 1 deletion

@@ -60,7 +60,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount,
+      writer = store.createWriter(cellsCount,
        store.getColumnFamilyDescriptor().getCompressionType(), false, true,
        snapshot.isTagsPresent(), false);
      IOException e = null;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 2 additions & 6 deletions

@@ -237,11 +237,7 @@ public String getStoragePolicyName(String familyName) {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
-    return getStoreFiles(Bytes.toString(familyName));
-  }
-
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+  public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
     return getStoreFiles(familyName, true);
   }
 
@@ -251,7 +247,7 @@ public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws I
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+  public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
     throws IOException {
     Path familyDir = getStoreDir(familyName);
     FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 61 additions & 26 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -66,6 +67,7 @@
 import org.apache.hadoop.hbase.backup.FailedArchiveException;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
@@ -89,6 +91,8 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -210,6 +214,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
   private HFileDataBlockEncoder dataBlockEncoder;
 
+  private final StoreFileTracker storeFileTracker;
+
   final StoreEngine<?, ?, ?, ?> storeEngine;
 
   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
@@ -290,6 +296,7 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
     }
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
+    this.storeFileTracker = StoreFileTrackerFactory.create(storeContext);
     List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
     // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
     // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
@@ -337,6 +344,7 @@ private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throw
       .withFamilyStoreDirectoryPath(region.getRegionFileSystem()
         .getStoreDir(family.getNameAsString()))
       .withRegionCoprocessorHost(region.getCoprocessorHost())
+      .withPrimary(RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()))
       .build();
   }
 
@@ -518,7 +526,7 @@ void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    * from the given directory.
    */
   private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
-    Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
+    Collection<StoreFileInfo> files = storeFileTracker.loadStoreFiles();
     return openStoreFiles(files, warmup);
   }
 
@@ -607,7 +615,7 @@ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean
 
   @Override
   public void refreshStoreFiles() throws IOException {
-    Collection<StoreFileInfo> newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
+    Collection<StoreFileInfo> newFiles = storeFileTracker.loadStoreFiles();
     refreshStoreFilesInternal(newFiles);
   }
 
@@ -674,7 +682,7 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
     List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
 
     // propagate the file changes to the underlying store file manager
-    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
+    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, false); // won't throw an exception
 
     // Advance the memstore read point to be at least the new store files seqIds so that
     // readers might pick it up. This assumes that the store is not getting any writes (otherwise
@@ -872,8 +880,7 @@ public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws
     HStoreFile sf = createStoreFileAndReader(dstPath);
     bulkLoadHFile(sf);
 
-    LOG.info("Successfully loaded {} into {} (new location: {})",
-      srcPath, this, dstPath);
+    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
 
     return dstPath;
   }
@@ -888,6 +895,7 @@ private void bulkLoadHFile(HStoreFile sf) throws IOException {
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
 
+    storeFileTracker.addNewStoreFiles(Lists.newArrayList(sf.getFileInfo()));
     // Append the new storefile into the list
     this.lock.writeLock().lock();
     try {
@@ -1071,6 +1079,7 @@ public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
 
+    storeFileTracker.addNewStoreFiles(Lists.newArrayList(sf.getFileInfo()));
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
@@ -1088,9 +1097,20 @@ public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    * @return store file created.
    */
   private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
-      throws IOException {
-    // Write-out finished successfully, move into the right spot
-    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
+    throws IOException {
+    HRegionFileSystem hfs = getRegionFileSystem();
+    Path storeDir = hfs.getStoreDir(getColumnFamilyName());
+    Path dstPath;
+    // Do not use StoreFileTracker.requireWritingToTmpDirFirst here, as we have lots of places to
+    // commit a store file other than flush and compaction, where the store file could be in any
+    // places...
+    if (path.getParent() != null && path.getParent().equals(storeDir)) {
+      // already in the right place, skip renaming
+      dstPath = path;
+    } else {
+      // Write-out finished successfully, move into the right spot
+      dstPath = hfs.commitStoreFile(getColumnFamilyName(), path);
+    }
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
     HStoreFile sf = createStoreFileAndReader(dstPath);
@@ -1100,30 +1120,33 @@ private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask sta
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
-        ", sequenceid=" + logCacheFlushId +
+      LOG.info("Added " + sf + ", entries=" + r.getEntries() + ", sequenceid=" + logCacheFlushId +
        ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
     }
     return sf;
   }
 
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+  public StoreFileWriter createWriter(long maxKeyCount, Compression.Algorithm compression,
     boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
     boolean shouldDropBehind) throws IOException {
-    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+    return createWriter(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
      includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
   }
 
   /**
+   * Create a writer for writing new store files.
+   * <p/>
+   * Whether or not creating in tmp dir depends on
+   * {@link StoreFileTracker#requireWritingToTmpDirFirst()}
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
-   * @return Writer for a new StoreFile in the tmp dir.
+   * @return Writer for a new StoreFile
    */
   // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
   // compaction
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+  public StoreFileWriter createWriter(long maxKeyCount, Compression.Algorithm compression,
    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
    boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
    throws IOException {
@@ -1171,10 +1194,15 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
     Encryption.Context encryptionContext = storeContext.getEncryptionContext();
     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
       encryptionContext);
-    Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
+    Path outputDir;
+    if (storeFileTracker.requireWritingToTmpDirFirst()) {
+      outputDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
+    } else {
+      throw new UnsupportedOperationException("not supported yet");
+    }
     StoreFileWriter.Builder builder =
       new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem())
-        .withOutputDir(familyTempDir)
+        .withOutputDir(outputDir)
        .withBloomType(storeContext.getBloomFilterType())
        .withMaxKeyCount(maxKeyCount)
        .withFavoredNodes(storeContext.getFavoredNodes())
@@ -1220,6 +1248,7 @@ private long getTotalSize(Collection<HStoreFile> sfs) {
    * @return Whether compaction is required.
    */
   private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
+    storeFileTracker.addNewStoreFiles(StoreUtils.toStoreFileInfo(sfs));
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
@@ -1502,8 +1531,7 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
     List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
-    writeCompactionWalRecord(filesToCompact, sfs);
-    replaceStoreFiles(filesToCompact, sfs);
+    replaceStoreFiles(filesToCompact, sfs, true);
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1557,7 +1585,8 @@ private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
     return sfs;
   }
 
-  // Package-visible for tests
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+    allowedOnPath = ".*/(HStore|TestCompaction).java")
   HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
     validateStoreFile(newFile);
     // Move the file into the right spot
@@ -1590,8 +1619,15 @@ private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
       this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
   }
 
-  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
-    throws IOException {
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+    allowedOnPath = ".*/(HStore|TestHStore).java")
+  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
+    boolean writeCompactionMarker) throws IOException {
+    storeFileTracker.addCompactionResults(StoreUtils.toStoreFileInfo(compactedFiles),
+      StoreUtils.toStoreFileInfo(result));
+    if (writeCompactionMarker) {
+      writeCompactionWalRecord(compactedFiles, result);
+    }
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
@@ -1604,8 +1640,8 @@ void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreF
       RegionServerServices rsServices = region.getRegionServerServices();
       if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
         updateSpaceQuotaAfterFileReplacement(
-          rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
-          compactedFiles, result);
+        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
+        compactedFiles, result);
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -1739,7 +1775,7 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick
     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
       LOG.info("Replaying compaction marker, replacing input files: " +
         inputStoreFiles + " with output files : " + outputStoreFiles);
-      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
+      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
       this.refreshStoreSizeAndTotalBytes();
     }
   }
@@ -1921,8 +1957,7 @@ private void removeUnneededFiles() throws IOException {
     }
 
     Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
-    writeCompactionWalRecord(delSfs, newFiles);
-    replaceStoreFiles(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles, true);
     refreshStoreSizeAndTotalBytes();
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
       + this + "; total size is "
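
For context, a plausible minimal tracker implementation consistent with the behavior this diff preserves would simply delegate to the file system listing. The class name and constructor below are assumptions, since the commit's actual default tracker is among the 18 files not shown on this page:

package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

// Hypothetical default tracker: the file system listing remains the source of
// truth, exactly as HStore.loadStoreFiles() behaved before this change.
class DefaultStoreFileTracker implements StoreFileTracker {

  private final HRegionFileSystem regionFs;
  private final String familyName;

  DefaultStoreFileTracker(HRegionFileSystem regionFs, String familyName) {
    this.regionFs = regionFs;
    this.familyName = familyName;
  }

  @Override
  public List<StoreFileInfo> loadStoreFiles() throws IOException {
    // The same listing HStore called directly before this commit.
    return regionFs.getStoreFiles(familyName);
  }

  @Override
  public void addNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    // No-op: nothing to record when the directory listing is authoritative.
  }

  @Override
  public void addCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    // No-op for the same reason.
  }

  @Override
  public boolean requireWritingToTmpDirFirst() {
    // createWriter() above only implements the tmp-dir path so far (the other
    // branch throws UnsupportedOperationException), so this must answer true.
    return true;
  }
}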

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java

Lines changed: 12 additions & 1 deletion

@@ -49,6 +49,7 @@ public final class StoreContext implements HeapSize {
   private final ColumnFamilyDescriptor family;
   private final Path familyStoreDirectoryPath;
   private final RegionCoprocessorHost coprocessorHost;
+  private final boolean primary;
 
   private StoreContext(Builder builder) {
     this.blockSize = builder.blockSize;
@@ -62,6 +63,7 @@ private StoreContext(Builder builder) {
     this.family = builder.family;
     this.familyStoreDirectoryPath = builder.familyStoreDirectoryPath;
     this.coprocessorHost = builder.coprocessorHost;
+    this.primary = builder.primary;
   }
 
   public int getBlockSize() {
@@ -108,6 +110,10 @@ public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }
 
+  public boolean isPrimary() {
+    return primary;
+  }
+
   public static Builder getBuilder() {
     return new Builder();
   }
@@ -129,6 +135,7 @@ public static class Builder {
     private ColumnFamilyDescriptor family;
     private Path familyStoreDirectoryPath;
     private RegionCoprocessorHost coprocessorHost;
+    private boolean primary;
 
     public Builder withBlockSize(int blockSize) {
       this.blockSize = blockSize;
@@ -186,9 +193,13 @@ public Builder withRegionCoprocessorHost(RegionCoprocessorHost coprocessorHost)
       return this;
     }
 
+    public Builder withPrimary(boolean primary) {
+      this.primary = primary;
+      return this;
+    }
+
     public StoreContext build() {
       return new StoreContext(this);
     }
   }
-
 }
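
Taken together with the HStore changes above, the wiring looks roughly like this fragment (a sketch following the constructor and initializeStoreContext() hunks; the region and family variable names are taken from those hunks):

StoreContext ctx = StoreContext.getBuilder()
  .withFamilyStoreDirectoryPath(
    region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
  .withRegionCoprocessorHost(region.getCoprocessorHost())
  // Only the default (primary) region replica is marked primary, so a tracker
  // implementation can avoid mutating tracked state on secondary replicas.
  .withPrimary(RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()))
  .build();
StoreFileTracker tracker = StoreFileTrackerFactory.create(ctx);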
