Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
import org.apache.hadoop.hbase.mob.MobFileCleanupUtil;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
Expand Down Expand Up @@ -100,7 +100,6 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
private static ColumnFamilyDescriptor familyDescriptor;
private static Admin admin;
private static Table table = null;
private static MobFileCleanerChore chore;

private static volatile boolean run = true;

Expand Down Expand Up @@ -249,12 +248,9 @@ static class CleanMobAndArchive implements Runnable {
public void run() {
while (run) {
try {
LOG.info("MOB cleanup chore started ...");
if (chore == null) {
chore = new MobFileCleanerChore();
}
chore.cleanupObsoleteMobFiles(conf, table.getName());
LOG.info("MOB cleanup chore finished");
LOG.info("MOB cleanup started ...");
MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
LOG.info("MOB cleanup finished");

Thread.sleep(130000);
} catch (Exception e) {
Expand Down Expand Up @@ -329,7 +325,7 @@ public void testMobCompaction() throws InterruptedException, IOException {
LOG.info("Waiting for write thread to finish ...");
writeData.join();
// Cleanup again
chore.cleanupObsoleteMobFiles(conf, table.getName());
MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);

if (util != null) {
LOG.info("Archive cleaner started ...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,20 @@
*/
package org.apache.hadoop.hbase.mob;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
* The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
* (files which have no active references to) mob files.
Expand Down Expand Up @@ -94,10 +72,6 @@ private void checkObsoleteConfigurations() {
}
}

public MobFileCleanerChore() {
this.master = null;
}

@Override
protected void chore() {
TableDescriptors htds = master.getTableDescriptors();
Expand All @@ -123,204 +97,15 @@ protected void chore() {
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}
}

/**
* Performs housekeeping file cleaning (called by MOB Cleaner chore)
* @param conf configuration
* @param table table name
* @throws IOException exception
*/
public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws IOException {

long minAgeToArchive =
conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
// We check only those MOB files, which creation time is less
// than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
// gives us full confidence that all corresponding store files will
// exist at the time cleaning procedure begins and will be examined.
// So, if MOB file creation time is greater than this maxTimeToArchive,
// this will be skipped and won't be archived.
long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
try (final Connection conn = ConnectionFactory.createConnection(conf);
final Admin admin = conn.getAdmin();) {
TableDescriptor htd = admin.getDescriptor(table);
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
if (list.size() == 0) {
LOG.info("Skipping non-MOB table [{}]", table);
return;
} else {
LOG.info("Only MOB files whose creation time older than {} will be archived, table={}",
maxCreationTimeToArchive, table);
}

FileSystem fs = FileSystem.get(conf);
Set<String> regionNames = new HashSet<>();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);

Set<String> allActiveMobFileName = new HashSet<String>();
for (Path regionPath : regionDirs) {
regionNames.add(regionPath.getName());
for (ColumnFamilyDescriptor hcd : list) {
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
boolean succeed = false;
Set<String> regionMobs = new HashSet<String>();

while (!succeed) {
if (!fs.exists(storePath)) {
String errMsg = String.format("Directory %s was deleted during MOB file cleaner chore"
+ " execution, aborting MOB file cleaner chore.", storePath);
throw new IOException(errMsg);
}
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
List<Path> storeFiles = new ArrayList<Path>();
// Load list of store files first
while (rit.hasNext()) {
Path p = rit.next().getPath();
if (fs.isFile(p)) {
storeFiles.add(p);
}
}
LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
Path currentPath = null;
try {
for (Path pp : storeFiles) {
currentPath = pp;
LOG.trace("Store file: {}", pp);
HStoreFile sf = null;
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
try {
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
} catch (IOException ex) {
// When FileBased SFT is active the store dir can contain corrupted or incomplete
// files. So read errors are expected. We just skip these files.
if (ex instanceof FileNotFoundException) {
throw ex;
}
LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
ex);
continue;
}
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+ "can not proceed until all old files will be MOB-compacted.", pp);
return;
} else {
LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
continue;
}
}
// file may or may not have MOB references, but was created by the distributed
// mob compaction code.
try {
SetMultimap<TableName, String> mobs =
MobUtils.deserializeMobFileRefs(mobRefData).build();
LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
regionMobs.addAll(mobs.values());
} catch (RuntimeException exception) {
throw new IOException("failure getting mob references for hfile " + sf,
exception);
}
}
} catch (FileNotFoundException e) {
LOG.warn(
"Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
currentPath, e);
regionMobs.clear();
continue;
}
succeed = true;
}

// Add MOB references for current region/family
allActiveMobFileName.addAll(regionMobs);
} // END column families
} // END regions
// Check if number of MOB files too big (over 1M)
if (allActiveMobFileName.size() > 1000000) {
LOG.warn("Found too many active MOB files: {}, table={}, "
+ "this may result in high memory pressure.", allActiveMobFileName.size(), table);
}
LOG.debug("Found: {} active mob refs for table={}", allActiveMobFileName.size(), table);
allActiveMobFileName.stream().forEach(LOG::trace);

// Now scan MOB directories and find MOB files with no references to them
for (ColumnFamilyDescriptor hcd : list) {
List<Path> toArchive = new ArrayList<Path>();
String family = hcd.getNameAsString();
Path dir = MobUtils.getMobFamilyPath(conf, table, family);
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];

if (!regionNames.contains(regionName)) {
// MOB belonged to a region no longer hosted
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
(fs.getFileStatus(p).getModificationTime()));
toArchive.add(p);
} else {
LOG.trace("Skipping fresh file: {}. Creation time={}", p,
fs.getFileStatus(p).getModificationTime());
}
} else {
LOG.trace("Keeping MOB file with existing region: {}", p);
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
table, family);
archiveMobFiles(conf, table, family.getBytes(), toArchive);
LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), table,
family);
}
}
}

/**
* Archives the mob files.
* @param conf The current configuration.
* @param tableName The table name.
* @param family The name of the column family.
* @param storeFiles The files to be archived.
* @throws IOException exception
*/
public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
List<Path> storeFiles) throws IOException {

if (storeFiles.size() == 0) {
// nothing to remove
LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
Bytes.toString(family));
return;
}
Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
FileSystem fs = storeFiles.get(0).getFileSystem(conf);

for (Path p : storeFiles) {
LOG.debug("MOB Cleaner is archiving: {}", p);
HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
family, p);
}
}
}
Loading