@@ -463,7 +463,7 @@ public static String createFromHFileLink(final Configuration conf, final FileSys
    * Create the back reference name
    */
   // package-private for testing
-  static String createBackReferenceName(final String tableNameStr, final String regionName) {
+  public static String createBackReferenceName(final String tableNameStr, final String regionName) {
 
     return regionName + "." + tableNameStr.replace(TableName.NAMESPACE_DELIM, '=');
   }
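For illustration, here is a minimal, self-contained sketch of the name format this method produces, assuming the namespace delimiter is ':' (the value of TableName.NAMESPACE_DELIM in HBase); the class name and sample values below are hypothetical:

import static java.lang.System.out;

public class BackReferenceNameDemo {
  public static void main(String[] args) {
    // Hypothetical encoded region name and namespace-qualified table name.
    String regionName = "1588230740";
    String tableNameStr = "myns:mytable";
    // Mirrors createBackReferenceName: "<regionName>.<tableName>" with ':' replaced by '='.
    String backRef = regionName + "." + tableNameStr.replace(':', '=');
    out.println(backRef); // prints: 1588230740.myns=mytable
  }
}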
@@ -195,7 +195,7 @@ public static Reference convert(final FSProtos.Reference r) {
    * delimiter, pb reads to EOF which may not be what you want).
    * @return This instance serialized as a delimited protobuf w/ a magic pb prefix.
    */
-  byte[] toByteArray() throws IOException {
+  public byte[] toByteArray() throws IOException {
     return ProtobufUtil.prependPBMagic(convert().toByteArray());
   }
 
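As background for the serialized form mentioned in the javadoc: the magic prefix lets readers recognize pb-serialized content before parsing it. A minimal sketch of that framing idea, assuming the 4-byte "PBUF" marker HBase uses; the helper below is illustrative, not the actual ProtobufUtil implementation:

import java.nio.charset.StandardCharsets;

public class PbMagicDemo {
  // Prefix serialized protobuf bytes with a magic marker (assumed to be "PBUF").
  static byte[] prependMagic(byte[] pb) {
    byte[] magic = "PBUF".getBytes(StandardCharsets.UTF_8);
    byte[] out = new byte[magic.length + pb.length];
    System.arraycopy(magic, 0, out, 0, magic.length);
    System.arraycopy(pb, 0, out, magic.length, pb.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] framed = prependMagic(new byte[] { 0x08, 0x01 }); // toy payload
    System.out.println(framed.length); // 6: four magic bytes + two payload bytes
  }
}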
@@ -162,7 +162,6 @@ public class CacheConfig implements PropagatingConfigurationObserver {
 
   private final ByteBuffAllocator byteBuffAllocator;
 
-
   /**
    * Create a cache configuration using the specified configuration object and defaults for family
    * level settings. Only use if no column family context.
@@ -644,7 +644,7 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
         // to read the hfiles.
         storeFileInfo.setConf(storeConfiguration);
         Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
-          new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+          new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), tracker);
         mergedFiles.add(refFile);
       }
     }
@@ -701,8 +701,9 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
     // table dir. In case of failure, the proc would go through this again, already existing
     // region dirs and split files would just be ignored, new split files should get created.
     int nbFiles = 0;
-    final Map<String, Collection<StoreFileInfo>> files =
-      new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
+    final Map<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>> files =
+      new HashMap<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>>(
+        htd.getColumnFamilyCount());
     for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
       String family = cfd.getNameAsString();
       StoreFileTracker tracker =
@@ -725,7 +726,7 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
         }
         if (filteredSfis == null) {
           filteredSfis = new ArrayList<StoreFileInfo>(sfis.size());
-          files.put(family, filteredSfis);
+          files.put(family, new Pair(filteredSfis, tracker));
         }
         filteredSfis.add(sfi);
         nbFiles++;
@@ -748,10 +749,12 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
     final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
 
     // Split each store file.
-    for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
+    for (Map.Entry<String, Pair<Collection<StoreFileInfo>, StoreFileTracker>> e : files
+      .entrySet()) {
       byte[] familyName = Bytes.toBytes(e.getKey());
       final ColumnFamilyDescriptor hcd = htd.getColumnFamily(familyName);
-      final Collection<StoreFileInfo> storeFiles = e.getValue();
+      Pair<Collection<StoreFileInfo>, StoreFileTracker> storeFilesAndTracker = e.getValue();
+      final Collection<StoreFileInfo> storeFiles = storeFilesAndTracker.getFirst();
       if (storeFiles != null && storeFiles.size() > 0) {
         final Configuration storeConfiguration =
           StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
@@ -762,8 +765,9 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
           // is running in a regionserver's Store context, or we might not be able
           // to read the hfiles.
           storeFileInfo.setConf(storeConfiguration);
-          StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
-            new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+          StoreFileSplitter sfs =
+            new StoreFileSplitter(regionFs, storeFilesAndTracker.getSecond(), familyName,
+              new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
           futures.add(threadPool.submit(sfs));
         }
       }
@@ -829,19 +833,19 @@ private void assertSplitResultFilesCount(final FileSystem fs,
     }
   }
 
-  private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, byte[] family, HStoreFile sf)
-    throws IOException {
+  private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, StoreFileTracker tracker,
+    byte[] family, HStoreFile sf) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pid=" + getProcId() + " splitting started for store file: " + sf.getPath()
         + " for region: " + getParentRegion().getShortNameToLog());
     }
 
     final byte[] splitRow = getSplitRow();
     final String familyName = Bytes.toString(family);
-    final Path path_first =
-      regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow, false, splitPolicy);
-    final Path path_second =
-      regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow, true, splitPolicy);
+    final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow,
+      false, splitPolicy, tracker);
+    final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow,
+      true, splitPolicy, tracker);
     if (LOG.isDebugEnabled()) {
       LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + sf.getPath()
         + " for region: " + getParentRegion().getShortNameToLog());
@@ -857,22 +861,25 @@ private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
     private final HRegionFileSystem regionFs;
     private final byte[] family;
     private final HStoreFile sf;
+    private final StoreFileTracker tracker;
 
     /**
      * Constructor that takes what it needs to split
      * @param regionFs the file system
      * @param family Family that contains the store file
      * @param sf which file
      */
-    public StoreFileSplitter(HRegionFileSystem regionFs, byte[] family, HStoreFile sf) {
+    public StoreFileSplitter(HRegionFileSystem regionFs, StoreFileTracker tracker, byte[] family,
+      HStoreFile sf) {
       this.regionFs = regionFs;
       this.sf = sf;
      this.family = family;
+      this.tracker = tracker;
    }
 
     @Override
     public Pair<Path, Path> call() throws IOException {
-      return splitStoreFile(regionFs, family, sf);
+      return splitStoreFile(regionFs, tracker, family, sf);
     }
   }
 
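Taken together, the split-path hunks above change the bookkeeping so the StoreFileTracker that enumerated a family's files travels with those files into each StoreFileSplitter task. A minimal, self-contained sketch of that pairing pattern, using simplified stand-in types rather than the HBase classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TrackerPairingDemo {
  // Stand-in for org.apache.hadoop.hbase.util.Pair.
  record Pair<A, B>(A first, B second) {}

  public static void main(String[] args) throws Exception {
    // Each family maps to (its store files, the tracker that listed them).
    Map<String, Pair<List<String>, String>> files = new HashMap<>();
    files.put("cf1", new Pair<>(List.of("hfile-1", "hfile-2"), "tracker-cf1"));

    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<String>> futures = new ArrayList<>();
    for (Map.Entry<String, Pair<List<String>, String>> e : files.entrySet()) {
      Pair<List<String>, String> filesAndTracker = e.getValue();
      for (String sf : filesAndTracker.first()) {
        // The tracker is captured alongside the file, mirroring StoreFileSplitter.
        futures.add(pool.submit(() -> "split " + sf + " with " + filesAndTracker.second()));
      }
    }
    for (Future<String> f : futures) {
      System.out.println(f.get());
    }
    pool.shutdown();
  }
}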
@@ -34,6 +34,8 @@
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -49,6 +51,8 @@
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -421,7 +425,16 @@ private static Pair<Boolean, Boolean> checkRegionReferences(MasterServices servi
     try {
       HRegionFileSystem regionFs = HRegionFileSystem
         .openRegionFromFileSystem(services.getConfiguration(), fs, tabledir, region, true);
-      boolean references = regionFs.hasReferences(tableDescriptor);
+      ColumnFamilyDescriptor[] families = tableDescriptor.getColumnFamilies();
+      boolean references = false;
+      for (ColumnFamilyDescriptor cfd : families) {
+        StoreFileTracker sft = StoreFileTrackerFactory.create(services.getConfiguration(),
+          tableDescriptor, ColumnFamilyDescriptorBuilder.of(cfd.getNameAsString()), regionFs);
+        references = references || sft.hasReferences();
+        if (references) {
+          break;
+        }
+      }
       return new Pair<>(Boolean.TRUE, references);
     } catch (IOException e) {
       LOG.error("Error trying to determine if region {} has references, assuming it does",
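One note on the hunk above: the single region-level hasReferences call is replaced by a per-family check through each family's StoreFileTracker, short-circuiting as soon as any family reports reference files. A minimal sketch of that short-circuit pattern, with a hypothetical interface standing in for StoreFileTracker:

import java.io.IOException;
import java.util.List;

public class ReferenceCheckDemo {
  // Hypothetical stand-in for StoreFileTracker#hasReferences().
  interface FamilyTracker {
    boolean hasReferences() throws IOException;
  }

  static boolean anyFamilyHasReferences(List<FamilyTracker> trackers) throws IOException {
    for (FamilyTracker tracker : trackers) {
      if (tracker.hasReferences()) {
        return true; // one family with references is enough; stop early
      }
    }
    return false;
  }
}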
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -41,10 +42,10 @@ public CachedMobFile(HStoreFile sf) {
   }
 
   public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
-    CacheConfig cacheConf) throws IOException {
+    CacheConfig cacheConf, StoreFileTracker sft) throws IOException {
     // XXX: primaryReplica is only used for constructing the key of block cache so it is not a
     // critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
-    HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
+    HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true, sft);
     return new CachedMobFile(sf);
   }
 
@@ -57,17 +57,17 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
    * @param tableName The current table name.
    * @param family The current family.
    */
-  public void cleanExpiredMobFiles(String tableName, ColumnFamilyDescriptor family)
+  public void cleanExpiredMobFiles(TableDescriptor htd, ColumnFamilyDescriptor family)
     throws IOException {
     Configuration conf = getConf();
-    TableName tn = TableName.valueOf(tableName);
+    String tableName = htd.getTableName().getNameAsString();
     FileSystem fs = FileSystem.get(conf);
     LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
     // disable the block cache.
     Configuration copyOfConf = new Configuration(conf);
     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
-    MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
+    MobUtils.cleanExpiredMobFiles(fs, conf, htd, family, cacheConfig,
       EnvironmentEdgeManager.currentTime());
   }
 
@@ -107,7 +107,7 @@ public int run(String[] args) throws Exception {
       throw new IOException(
         "The minVersions of the column family is not 0, could not be handled by this cleaner");
       }
-      cleanExpiredMobFiles(tableName, family);
+      cleanExpiredMobFiles(htd, family);
       return 0;
     } finally {
       try {
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -133,11 +134,11 @@ public void close() throws IOException {
    * @param cacheConf The CacheConfig.
    * @return An instance of the MobFile.
    */
-  public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
-    throws IOException {
+  public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf,
+    StoreFileTracker sft) throws IOException {
     // XXX: primaryReplica is only used for constructing the key of block cache so it is not a
     // critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
-    HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
+    HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true, sft);
     return new MobFile(sf);
   }
 }
@@ -33,6 +33,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -198,9 +201,11 @@ public void evictFile(String fileName) {
    * @param cacheConf The current MobCacheConfig
    * @return A opened mob file.
    */
-  public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
+  public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf,
+    StoreContext storeContext) throws IOException {
+    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
     if (!isCacheEnabled) {
-      MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
+      MobFile mobFile = MobFile.create(fs, path, conf, cacheConf, sft);
       mobFile.open();
       return mobFile;
     } else {
@@ -214,7 +219,7 @@ public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws
         if (map.size() > mobFileMaxCacheSize) {
           evict();
         }
-        cached = CachedMobFile.create(fs, path, conf, cacheConf);
+        cached = CachedMobFile.create(fs, path, conf, cacheConf, sft);
         cached.open();
         map.put(fileName, cached);
         miss.increment();
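The openFile change above follows a cache-aside shape: resolve the tracker once, bypass the cache entirely when it is disabled, and otherwise evict on overflow before creating, opening, and remembering the entry. A simplified, self-contained sketch of that control flow, with plain stand-in types rather than the HBase classes:

import java.util.HashMap;
import java.util.Map;

public class MobCacheDemo {
  private final boolean cacheEnabled;
  private final int maxSize;
  private final Map<String, String> cache = new HashMap<>();

  MobCacheDemo(boolean cacheEnabled, int maxSize) {
    this.cacheEnabled = cacheEnabled;
    this.maxSize = maxSize;
  }

  String openFile(String fileName) {
    if (!cacheEnabled) {
      return "opened " + fileName; // no caching: open and return directly
    }
    String cached = cache.get(fileName);
    if (cached == null) {
      if (cache.size() > maxSize) {
        cache.clear(); // crude stand-in for the cache's evict()
      }
      cached = "opened " + fileName;
      cache.put(fileName, cached); // remember the opened file
    }
    return cached;
  }

  public static void main(String[] args) {
    MobCacheDemo demo = new MobCacheDemo(true, 10);
    System.out.println(demo.openFile("mobfile-1")); // miss: opens and caches
    System.out.println(demo.openFile("mobfile-1")); // hit: returns cached entry
  }
}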
@@ -92,7 +92,7 @@ protected void chore() {
       for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
         if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
           try {
-            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
+            cleaner.cleanExpiredMobFiles(htd, hcd);
           } catch (IOException e) {
             LOG.error("Failed to clean the expired mob files table={} family={}",
               htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
@@ -35,7 +35,11 @@
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -90,7 +94,10 @@ public static void cleanupObsoleteMobFiles(Configuration conf, TableName table,
     Set<String> allActiveMobFileName = new HashSet<String>();
     for (Path regionPath : regionDirs) {
       regionNames.add(regionPath.getName());
+      HRegionFileSystem regionFS =
+        HRegionFileSystem.create(conf, fs, tableDir, MobUtils.getMobRegionInfo(table));
       for (ColumnFamilyDescriptor hcd : list) {
+        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, htd, hcd, regionFS, false);
         String family = hcd.getNameAsString();
         Path storePath = new Path(regionPath, family);
         boolean succeed = false;
@@ -102,26 +109,19 @@
             + " execution, aborting MOB file cleaner chore.", storePath);
           throw new IOException(errMsg);
         }
-        RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
-        List<Path> storeFiles = new ArrayList<Path>();
-        // Load list of store files first
-        while (rit.hasNext()) {
-          Path p = rit.next().getPath();
-          if (fs.isFile(p)) {
-            storeFiles.add(p);
-          }
-        }
-        LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
+        List<StoreFileInfo> storeFileInfos = sft.load();
+        LOG.info("Found {} store files in: {}", storeFileInfos.size(), storePath);
         Path currentPath = null;
         try {
-          for (Path pp : storeFiles) {
+          for (StoreFileInfo storeFileInfo : storeFileInfos) {
+            Path pp = storeFileInfo.getPath();
             currentPath = pp;
             LOG.trace("Store file: {}", pp);
             HStoreFile sf = null;
             byte[] mobRefData = null;
             byte[] bulkloadMarkerData = null;
             try {
-              sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
+              sf = new HStoreFile(storeFileInfo, BloomType.NONE, CacheConfig.DISABLED);
               sf.initReader();
               mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
               bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
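The substantive shift in this last hunk: the cleaner no longer enumerates MOB store files by listing the directory through the raw FileSystem; it asks the family's StoreFileTracker (sft.load()), which is the authoritative view when a tracker implementation keeps its file list somewhere other than the directory contents. A minimal sketch of the idea, with a hypothetical tracker interface:

import java.util.List;

public class TrackerListingDemo {
  // Hypothetical stand-in for StoreFileTracker#load(): returns the tracked
  // store files rather than whatever happens to be in the directory.
  interface Tracker {
    List<String> load();
  }

  public static void main(String[] args) {
    // A raw directory listing could include half-written or already
    // compacted-away files; the tracker's list is the source of truth.
    Tracker tracker = () -> List.of("mobfile-1", "mobfile-2");
    for (String file : tracker.load()) {
      System.out.println("checking " + file);
    }
  }
}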