From 15681ebf00fe9884c7cfb17fed2d84e04eec8952 Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Wed, 8 Sep 2021 10:31:49 +0100
Subject: [PATCH 1/8] HBASE-26079 Use StoreFileTracker when splitting and
 merging (#3617)

Signed-off-by: Duo Zhang
---
 .../apache/hadoop/hbase/regionserver/HRegionFileSystem.java  | 2 +-
 .../regionserver/storefiletracker/TestStoreFileTracker.java  | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 4bcebd96ed23..7cd5c5337bc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -606,7 +606,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List allRegi
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
-    insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
index b30ca47772cb..98189729ac75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
@@ -47,7 +47,6 @@ public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreC
     } else {
      LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
    }
-  }

   @Override
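
[Editor's note, not part of the patch series] PATCH 1/8 makes split and merge commit their
result files through the configured StoreFileTracker instead of relying on a later directory
listing. The sketch below is a minimal illustration of that pattern, not the patch's code: it
assumes the StoreFileTrackerFactory.create(Configuration, boolean, StoreContext) overload and
StoreFileTracker.add(...) used elsewhere in this series, and the helper class, method name and
StoreFileInfo constructor shape are hypothetical stand-ins for what
HRegionFileSystem#insertRegionFilesIntoStoreTracker does.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

public class TrackerCommitSketch {
  // Hand the files produced for one column family to that family's tracker, so
  // tracker implementations that keep their own file list stay consistent.
  static void commitToTracker(Configuration conf, HRegionFileSystem regionFs,
      ColumnFamilyDescriptor family, List<Path> newFiles) throws IOException {
    StoreContext ctx = StoreContext.getBuilder()
      .withColumnFamilyDescriptor(family)
      .withRegionFileSystem(regionFs)
      .build();
    // Only the primary replica creates store files, hence isPrimaryReplica = true.
    StoreFileTracker tracker = StoreFileTrackerFactory.create(conf, true, ctx);
    List<StoreFileInfo> infos = new ArrayList<>();
    for (Path path : newFiles) {
      infos.add(new StoreFileInfo(conf, regionFs.getFileSystem(), path, true));
    }
    tracker.add(infos); // records the new files in the tracker's metadata
  }
}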

From 4fa03c3290824bc9513215380b4dc5e3b3cb46a5 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Wed, 15 Sep 2021 23:00:03 +0800
Subject: [PATCH 2/8] HBASE-26264 Add more checks to prevent misconfiguration
 on store file tracker (#3681)

Signed-off-by: Josh Elser
---
 .../MigrationStoreFileTracker.java            |   8 +
 .../StoreFileTrackerFactory.java              | 138 ++++++++++++++++++
 2 files changed, 146 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
index a6648f291e43..5bdcafe588f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
@@ -93,4 +93,12 @@
   }
+
+  static Class getSrcTrackerClass(Configuration conf) {
+    return StoreFileTrackerFactory.getStoreFileTrackerClassForMigration(conf, SRC_IMPL);
+  }
+
+  static Class getDstTrackerClass(Configuration conf) {
+    return StoreFileTrackerFactory.getStoreFileTrackerClassForMigration(conf, DST_IMPL);
+  }
 }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
index 1c683ae3de62..7689d4fc8434 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -311,4 +311,142 @@ public static void checkForModifyTable(Configuration conf, TableDescriptor oldTa
       }
     }
   }
+
+  // should not use MigrationStoreFileTracker for new family
+  private static void checkForNewFamily(Configuration conf, TableDescriptor table,
+      ColumnFamilyDescriptor family) throws IOException {
+    Configuration mergedConf = mergeConfigurations(conf, table, family);
+    Class tracker = getTrackerClass(mergedConf);
+    if (MigrationStoreFileTracker.class.isAssignableFrom(tracker)) {
+      throw new DoNotRetryIOException(
+        "Should not use " + Trackers.MIGRATION + " as store file tracker for new family " +
+          family.getNameAsString() + " of table " + table.getTableName());
+    }
+  }
+
+  /**
+   * Pre check when creating a new table.
+   * <p/>
+   * For now, only make sure that we do not use {@link Trackers#MIGRATION} for newly created
+   * tables.
+   * @throws IOException when there are check errors, the upper layer should fail the
+   *           {@code CreateTableProcedure}.
+   */
+  public static void checkForCreateTable(Configuration conf, TableDescriptor table)
+      throws IOException {
+    for (ColumnFamilyDescriptor family : table.getColumnFamilies()) {
+      checkForNewFamily(conf, table, family);
+    }
+  }
+
+  /**
+   * Pre check when modifying a table.
+   * <p/>
+   * The basic idea is when you want to change the store file tracker implementation, you should
+   * use {@link Trackers#MIGRATION} first and then change to the destination store file tracker
+   * implementation.
+   * <p/>
+   * There are several rules:
+   * <ul>
+   * <li>For newly added family, you should not use {@link Trackers#MIGRATION}.</li>
+   * <li>For modifying a family:
+   * <ul>
+   * <li>If the old tracker is {@link Trackers#MIGRATION}, then:
+   * <ul>
+   * <li>If the new tracker is also {@link Trackers#MIGRATION}, then they must have the same src
+   * and dst tracker.</li>
+   * <li>If the new tracker is not {@link Trackers#MIGRATION}, then the new tracker must be the
+   * dst tracker of the old tracker.</li>
+   * </ul>
+   * </li>
+   * <li>If the old tracker is not {@link Trackers#MIGRATION}, then:
+   * <ul>
+   * <li>If the new tracker is {@link Trackers#MIGRATION}, then the old tracker must be the src
+   * tracker of the new tracker.</li>
+   * <li>If the new tracker is not {@link Trackers#MIGRATION}, then the new tracker must be the
+   * same as the old tracker.</li>
+   * </ul>
+   * </li>
+   * </ul>
+   * </li>
+   * </ul>
+ * @throws IOException when there are check errors, the upper layer should fail the + * {@code ModifyTableProcedure}. + */ + public static void checkForModifyTable(Configuration conf, TableDescriptor oldTable, + TableDescriptor newTable) throws IOException { + for (ColumnFamilyDescriptor newFamily : newTable.getColumnFamilies()) { + ColumnFamilyDescriptor oldFamily = oldTable.getColumnFamily(newFamily.getName()); + if (oldFamily == null) { + checkForNewFamily(conf, newTable, newFamily); + continue; + } + Configuration oldConf = mergeConfigurations(conf, oldTable, oldFamily); + Configuration newConf = mergeConfigurations(conf, newTable, newFamily); + + Class oldTracker = getTrackerClass(oldConf); + Class newTracker = getTrackerClass(newConf); + + if (MigrationStoreFileTracker.class.isAssignableFrom(oldTracker)) { + Class oldSrcTracker = + MigrationStoreFileTracker.getSrcTrackerClass(oldConf); + Class oldDstTracker = + MigrationStoreFileTracker.getDstTrackerClass(oldConf); + if (oldTracker.equals(newTracker)) { + // confirm that we have the same src tracker and dst tracker + Class newSrcTracker = + MigrationStoreFileTracker.getSrcTrackerClass(newConf); + if (!oldSrcTracker.equals(newSrcTracker)) { + throw new DoNotRetryIOException( + "The src tracker has been changed from " + getStoreFileTrackerName(oldSrcTracker) + + " to " + getStoreFileTrackerName(newSrcTracker) + " for family " + + newFamily.getNameAsString() + " of table " + newTable.getTableName()); + } + Class newDstTracker = + MigrationStoreFileTracker.getDstTrackerClass(newConf); + if (!oldDstTracker.equals(newDstTracker)) { + throw new DoNotRetryIOException( + "The dst tracker has been changed from " + getStoreFileTrackerName(oldDstTracker) + + " to " + getStoreFileTrackerName(newDstTracker) + " for family " + + newFamily.getNameAsString() + " of table " + newTable.getTableName()); + } + } else { + // we can only change to the dst tracker + if (!newTracker.equals(oldDstTracker)) { + throw new DoNotRetryIOException( + "Should migrate tracker to " + getStoreFileTrackerName(oldDstTracker) + " but got " + + getStoreFileTrackerName(newTracker) + " for family " + newFamily.getNameAsString() + + " of table " + newTable.getTableName()); + } + } + } else { + if (!oldTracker.equals(newTracker)) { + // can only change to MigrationStoreFileTracker and the src tracker should be the old + // tracker + if (!MigrationStoreFileTracker.class.isAssignableFrom(newTracker)) { + throw new DoNotRetryIOException("Should change to " + Trackers.MIGRATION + + " first when migrating from " + getStoreFileTrackerName(oldTracker) + " for family " + + newFamily.getNameAsString() + " of table " + newTable.getTableName()); + } + Class newSrcTracker = + MigrationStoreFileTracker.getSrcTrackerClass(newConf); + if (!oldTracker.equals(newSrcTracker)) { + throw new DoNotRetryIOException( + "Should use src tracker " + getStoreFileTrackerName(oldTracker) + " first but got " + + getStoreFileTrackerName(newSrcTracker) + " when migrating from " + + getStoreFileTrackerName(oldTracker) + " for family " + newFamily.getNameAsString() + + " of table " + newTable.getTableName()); + } + Class newDstTracker = + MigrationStoreFileTracker.getDstTrackerClass(newConf); + // the src and dst tracker should not be the same + if (newSrcTracker.equals(newDstTracker)) { + throw new DoNotRetryIOException("The src tracker and dst tracker are both " + + getStoreFileTrackerName(newSrcTracker) + " for family " + + newFamily.getNameAsString() + " of table " + newTable.getTableName()); + } + } + } 
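
[Editor's note, not part of the patch series] The rules above describe a two-step,
operator-driven migration: first switch the table to the MIGRATION tracker, naming the
implementation currently in use as src and the desired one as dst, then switch to the dst
tracker once all tracker metadata has been rewritten. A hedged sketch of those two steps,
assuming the configuration key names hbase.store.file-tracker.impl,
hbase.store.file-tracker.migration.src.impl and hbase.store.file-tracker.migration.dst.impl
used by this series, with an Admin handle and a TableName in scope:

// Step 1: move onto MIGRATION; src must equal the tracker currently in use.
TableDescriptor step1 = TableDescriptorBuilder
  .newBuilder(admin.getDescriptor(tableName))
  .setValue("hbase.store.file-tracker.impl", "MIGRATION")
  .setValue("hbase.store.file-tracker.migration.src.impl", "DEFAULT")
  .setValue("hbase.store.file-tracker.migration.dst.impl", "FILE")
  .build();
admin.modifyTable(step1);

// Step 2: switch to the destination tracker. checkForModifyTable accepts this
// only because FILE is the dst tracker configured in step 1.
TableDescriptor step2 = TableDescriptorBuilder
  .newBuilder(admin.getDescriptor(tableName))
  .setValue("hbase.store.file-tracker.impl", "FILE")
  .build();
admin.modifyTable(step2);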
From b047f090d14322ecae6bdd42bb46ee4423998b1f Mon Sep 17 00:00:00 2001
From: Szabolcs Bukros
Date: Fri, 22 Oct 2021 15:52:13 +0200
Subject: [PATCH 3/8] HBASE-26271: Cleanup the broken store files under data
 directory

Add new chore to delete leftover files in case file based storefile
handling is used

Expose the target files of currently running compactions for easier
validation
---
 .../hbase/mob/DefaultMobStoreCompactor.java   |   6 +-
 .../regionserver/AbstractMultiFileWriter.java |   2 +-
 .../DateTieredMultiFileWriter.java            |   2 +-
 .../FileBasedStoreFileCleaner.java            | 191 ++++++++++++++++++
 .../hbase/regionserver/HRegionServer.java     |  27 +++
 .../hbase/regionserver/StoreEngine.java       |   4 +
 .../regionserver/StripeMultiFileWriter.java   |   2 +-
 .../AbstractMultiOutputCompactor.java         |   3 +-
 .../regionserver/compactions/Compactor.java   |  39 +++-
 .../compactions/DateTieredCompactor.java      |   6 +-
 .../compactions/DefaultCompactor.java         |  10 +-
 .../compactions/StripeCompactor.java          |   3 +-
 .../FileBasedStoreFileTracker.java            |   2 +-
 .../MigrationStoreFileTracker.java            |   2 +-
 .../storefiletracker/StoreFileTracker.java    |   6 +
 .../StoreFileTrackerBase.java                 |   2 +-
 .../hbase/mob/FaultyMobStoreCompactor.java    |   2 +-
 .../regionserver/TestCompactorMemLeak.java    |   4 +-
 .../TestFileBasedStoreFileCleaner.java        | 180 +++++++++++++++++
 19 files changed, 468 insertions(+), 25 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 01fe0005f048..e250c0342caa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -286,7 +286,6 @@ private void calculateMobLengthMap(SetMultimap mobRefs) throw
    *
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
    * @return Whether compaction ended; false if it was interrupted for any reason.
*/ @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { long bytesWrittenProgressForLog = 0; @@ -665,12 +664,13 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMobMetadata(mobRefSet.get()); writer.close(); + writer = null; clearThreadLocals(); return newFiles; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index f250304952a3..394c31259cf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -110,7 +110,7 @@ public List abortWriters() { return paths; } - protected abstract Collection writers(); + public abstract Collection writers(); /** * Subclasses override this method to be called at the end of a successful sequence of append; all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index 8201cb152c01..1e10eb2db231 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException { } @Override - protected Collection writers() { + public Collection writers() { return lowerBoundary2Writer.values(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java new file mode 100644 index 000000000000..06a12ac6d86d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This Chore, every time it runs, will clear the unsused HFiles in the data + * folder. + */ +@InterfaceAudience.Private public class FileBasedStoreFileCleaner extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileCleaner.class); + public static final String FILEBASED_STOREFILE_CLEANER_ENABLED = + "hbase.region.filebased.storefilecleaner.enabled"; + public static final boolean DEFAULT_FILEBASED_STOREFILE_CLEANER_ENABLED = false; + public static final String FILEBASED_STOREFILE_CLEANER_TTL = + "hbase.region.filebased.storefilecleaner.ttl"; + public static final long DEFAULT_FILEBASED_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h + public static final String FILEBASED_STOREFILE_CLEANER_DELAY = + "hbase.region.filebased.storefilecleaner.delay"; + public static final int DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h + public static final String FILEBASED_STOREFILE_CLEANER_DELAY_JITTER = + "hbase.region.filebased.storefilecleaner.delay.jitter"; + public static final double DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY_JITTER = 0.25D; + public static final String FILEBASED_STOREFILE_CLEANER_PERIOD = + "hbase.region.filebased.storefilecleaner.period"; + public static final int DEFAULT_FILEBASED_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h + + private HRegionServer regionServer; + private final AtomicBoolean enabled = new AtomicBoolean(true); + private long ttl; + + public FileBasedStoreFileCleaner(final int delay, final int period, final Stoppable stopper, Configuration conf, + HRegionServer regionServer) { + super("FileBasedStoreFileCleaner", stopper, period, delay); + this.regionServer = regionServer; + setEnabled(conf.getBoolean(FILEBASED_STOREFILE_CLEANER_ENABLED, DEFAULT_FILEBASED_STOREFILE_CLEANER_ENABLED)); + ttl = conf.getLong(FILEBASED_STOREFILE_CLEANER_TTL, DEFAULT_FILEBASED_STOREFILE_CLEANER_TTL); + } + + public boolean setEnabled(final boolean enabled) { + return this.enabled.getAndSet(enabled); + } + + public boolean getEnabled() { + return this.enabled.get(); + } + + @InterfaceAudience.Private + @Override public void chore() { + if (getEnabled()) { + long start = EnvironmentEdgeManager.currentTime(); + AtomicLong deletedFiles = new AtomicLong(0); + AtomicLong failedDeletes = new AtomicLong(0); + for (HRegion region : regionServer.getRegions()) { + for (HStore store : region.getStores()) { + //only clean do cleanup in store using file based storefile 
tracking + if (store.getStoreEngine().requireWritingToTmpDirFirst()) { + continue; + } + Path storePath = + new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName()); + + try { + List fsStoreFiles = Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath)); + fsStoreFiles.forEach(file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes)); + } catch (IOException e) { + LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath); + continue; + } + } + } + LOG.debug( + "FileBasedStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed to delete {}", + regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start, + deletedFiles.get(), failedDeletes.get()); + } else { + LOG.trace("File based storefile Cleaner chore disabled! Not cleaning."); + } + } + + private void cleanFileIfNeeded(FileStatus file, HStore store, + AtomicLong deletedFiles, AtomicLong failedDeletes) { + if(file.isDirectory()){ + LOG.trace("This is a Directory {}, skip cleanup", file.getPath()); + return; + } + + if(!validate(file.getPath())){ + LOG.trace("Invalid file {}, skip cleanup", file.getPath()); + return; + } + + if(!isOldEnough(file)){ + LOG.trace("Fresh file {}, skip cleanup", file.getPath()); + return; + } + + if(isActiveStorefile(file, store)){ + LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath()); + return; + } + + if(isCompactedFile(file, store)){ + LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath()); + return; + } + + if(isCompactingFile(file, store)){ + LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath()); + return; + } + + deleteFile(file, store, deletedFiles, failedDeletes); + } + + private boolean isCompactingFile(FileStatus file, HStore store) { + return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); + } + + private boolean isCompactedFile(FileStatus file, HStore store) { + return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream().anyMatch(sf -> sf.getPath().equals(file.getPath())); + } + + private boolean isActiveStorefile(FileStatus file, HStore store) { + return store.getStoreEngine().getStoreFileManager().getStorefiles().stream().anyMatch(sf -> sf.getPath().equals(file.getPath())); + } + + boolean validate(Path file) { + if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) { + return true; + } + return StoreFileInfo.validateStoreFileName(file.getName()); + } + + boolean isOldEnough(FileStatus file){ + return file.getModificationTime() + ttl < System.currentTimeMillis(); + } + + private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles, AtomicLong failedDeletes) { + Path filePath = file.getPath(); + LOG.debug("Removing {} from store", filePath); + try { + boolean success = store.getFileSystem().delete(filePath, false); + if (!success) { + failedDeletes.incrementAndGet(); + LOG.warn("Attempted to delete:" + filePath + + ", but couldn't. Attempt to delete on next pass."); + } + else{ + deletedFiles.incrementAndGet(); + } + } catch (IOException e) { + e = e instanceof RemoteException ? 
+ ((RemoteException)e).unwrapRemoteException() : e; + LOG.warn("Error while deleting: " + filePath, e); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4bf2d9c25f1d..229bf2f16744 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase */ final ServerNonceManager nonceManager; + private FileBasedStoreFileCleaner fileBasedStoreFileCleaner; + @InterfaceAudience.Private CompactedHFilesDischarger compactedFileDischarger; @@ -1831,6 +1833,9 @@ private void startServices() throws IOException { if (this.slowLogTableOpsChore != null) { choreService.scheduleChore(slowLogTableOpsChore); } + if (this.fileBasedStoreFileCleaner != null) { + choreService.scheduleChore(fileBasedStoreFileCleaner); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1910,6 +1915,22 @@ private void initializeThreads() { this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); } + + int fileBasedStoreFileCleanerPeriod = conf.getInt( + FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_PERIOD, + FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_PERIOD); + int fileBasedStoreFileCleanerDelay = conf.getInt( + FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY, + FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY); + double fileBasedStoreFileCleanerDelayJitter = conf.getDouble( + FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY_JITTER, + FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY_JITTER); + double jitterRate = (RandomUtils.nextDouble() - 0.5D) * fileBasedStoreFileCleanerDelayJitter; + long jitterValue = Math.round(fileBasedStoreFileCleanerDelay * jitterRate); + this.fileBasedStoreFileCleaner = + new FileBasedStoreFileCleaner((int) (fileBasedStoreFileCleanerDelay + jitterValue), + fileBasedStoreFileCleanerPeriod, this, conf, this); + registerConfigurationObservers(); } @@ -3484,6 +3505,11 @@ protected boolean clusterMode() { return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); } + @InterfaceAudience.Private + public FileBasedStoreFileCleaner getFileBasedStoreFileCleaner(){ + return fileBasedStoreFileCleaner; + } + @Override protected void stopChores() { shutdownChore(nonceManagerChore); @@ -3494,5 +3520,6 @@ protected void stopChores() { shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); shutdownChore(slowLogTableOpsChore); + shutdownChore(fileBasedStoreFileCleaner); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index e6f939500192..60607bdbb5cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -517,4 +517,8 @@ public void removeCompactedFiles(Collection compactedFiles) { throw new IOException("Unable to load configured store engine '" + className + "'", e); } } + + public boolean requireWritingToTmpDirFirst() { + return storeFileTracker.requireWritingToTmpDirFirst(); + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index fc0598d89ac0..a4e943ac8b04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -58,7 +58,7 @@ public void setNoStripeMetadata() { } @Override - protected Collection writers() { + public Collection writers() { return existingWriters; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index 533be176e7a7..c505eb5cd25f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) } @Override - protected void abortWriter(T writer) throws IOException { + protected void abortWriter() throws IOException { FileSystem fs = store.getFileSystem(); for (Path leftoverFile : writer.abortWriters()) { try { @@ -79,5 +79,6 @@ protected void abortWriter(T writer) throws IOException { e); } } + writer = null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 47ef0f290251..4b0aff6686d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -25,9 +25,12 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.HStore; @@ -92,6 +96,8 @@ public abstract class Compactor { private final boolean dropCacheMajor; private final boolean dropCacheMinor; + protected T writer = null; + //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(Configuration conf, HStore store) { this.conf = conf; @@ -324,7 +330,6 @@ protected final List compact(final CompactionRequestImpl request, // Find the smallest read point across all the Scanners. 
long smallestReadPoint = getSmallestReadPoint(); - T writer = null; boolean dropCache; if (request.isMajor() || request.isAllFiles()) { dropCache = this.dropCacheMajor; @@ -348,8 +353,14 @@ protected final List compact(final CompactionRequestImpl request, smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } + if (writer != null){ + LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() + .map(n -> n.toString()) + .collect(Collectors.joining(", ", "{ ", " }"))); + writer = null; + } writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); - finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, throughputController, request.isAllFiles(), request.getFiles().size()); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " @@ -369,24 +380,23 @@ protected final List compact(final CompactionRequestImpl request, Closeables.close(scanner, true); } if (!finished && writer != null) { - abortWriter(writer); + abortWriter(); } } assert finished : "We should have exited the method on all error paths"; assert writer != null : "Writer should be non-null if no error"; - return commitWriter(writer, fd, request); + return commitWriter(fd, request); } - protected abstract List commitWriter(T writer, FileDetails fd, + protected abstract List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException; - protected abstract void abortWriter(T writer) throws IOException; + protected abstract void abortWriter() throws IOException; /** * Performs the compaction. * @param fd FileDetails of cell sink writer * @param scanner Where to read from. - * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint @@ -394,7 +404,7 @@ protected abstract List commitWriter(T writer, FileDetails fd, * @param numofFilesToCompact the number of files to compact * @return Whether compaction ended; false if it was interrupted for some reason. 
*/ - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { assert writer instanceof ShipperListener; @@ -537,4 +547,17 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } + + public List getCompactionTargets(){ + if (writer == null){ + return Collections.emptyList(); + } + synchronized (writer){ + if (writer instanceof StoreFileWriter){ + return Arrays.asList(((StoreFileWriter)writer).getPath()); + } + return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect( + Collectors.toList()); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index fd5433082903..7be2e69caa2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -79,8 +79,10 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai } @Override - protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { - return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); + List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); + writer = null; + return pathList; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index afa2429cb6e8..fff56637fdd0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -63,15 +63,20 @@ public List compact(final CompactionRequestImpl request, } @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.close(); + writer = null; return newFiles; } @Override + protected void abortWriter() throws IOException { + abortWriter(writer); + } + protected void abortWriter(StoreFileWriter writer) throws IOException { Path leftoverFile = writer.getPath(); try { @@ -79,6 +84,9 @@ protected void abortWriter(StoreFileWriter writer) throws IOException { } catch (IOException e) { LOG.warn("Failed to close the writer after an unfinished compaction.", e); } + finally { + writer = null; + } try { store.getFileSystem().delete(leftoverFile, false); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 547555e3812e..5573f6e90f58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -125,9 +125,10 @@ public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails f } @Override - protected List commitWriter(StripeMultiFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); + writer = null; assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java index 079b59ba0274..8d9b66e53d2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java @@ -95,7 +95,7 @@ public List load() throws IOException { } @Override - protected boolean requireWritingToTmpDirFirst() { + public boolean requireWritingToTmpDirFirst() { return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java index 5bdcafe588f0..5fa651dc5484 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java @@ -57,7 +57,7 @@ public List load() throws IOException { } @Override - protected boolean requireWritingToTmpDirFirst() { + public boolean requireWritingToTmpDirFirst() { // Returns true if either of the two StoreFileTracker returns true. // For example, if we want to migrate from a tracker implementation which can ignore the broken // files under data directory to a tracker implementation which can not, if we still allow diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java index f56a0dde4741..e8a73a6948ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java @@ -88,4 +88,10 @@ void replace(Collection compactedFiles, Collection * @param builder The table descriptor builder for the given table. */ TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder); + + /** + * Whether the implementation of this tracker requires you to write to temp directory first, i.e, + * does not allow broken store files under the actual data directory. 
+ */ + public boolean requireWritingToTmpDirFirst(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index b6de32b09a0d..0e2898d0fa3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -177,7 +177,7 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th * Whether the implementation of this tracker requires you to write to temp directory first, i.e, * does not allow broken store files under the actual data directory. */ - protected abstract boolean requireWritingToTmpDirFirst(); + public abstract boolean requireWritingToTmpDirFirst(); protected abstract void doAddNewStoreFiles(Collection newFiles) throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 50530dad69e7..1196c5219ee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -89,7 +89,7 @@ public FaultyMobStoreCompactor(Configuration conf, HStore store) { } @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java index e0fca1fea7c5..6a0a8baa9ded 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java @@ -128,13 +128,13 @@ public MyCompactor(Configuration conf, HStore store) { } @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; Cell cell = writerImpl.getLastCell(); // The cell should be backend with an KeyOnlyKeyValue. IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); - return super.commitWriter(writer, fd, request); + return super.commitWriter(fd, request); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java new file mode 100644 index 000000000000..5aa2a382156f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.io.IOException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({ MediumTests.class, RegionServerTests.class }) +public class TestFileBasedStoreFileCleaner { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFileBasedStoreFileCleaner.class); + + private final HBaseTestingUtil testUtil = new HBaseTestingUtil(); + private final static byte[] fam = Bytes.toBytes("cf_1"); + private final static byte[] qual1 = Bytes.toBytes("qf_1"); + private final static byte[] val = Bytes.toBytes("val"); + private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156"; + TableName tableName; + + + @Before + public void setUp() throws Exception { + testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_ENABLED, "true"); + testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_TTL, "0"); + testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_PERIOD, "15000000"); + testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY, "0"); + testUtil.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + testUtil.deleteTable(tableName); + testUtil.shutdownMiniCluster(); + } + + @Test + public void testDeletingJunkFile() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + FileBasedStoreFileCleaner cleaner = 
rs.getFileBasedStoreFileCleaner(); + + //create junk file + HStore store = region.getStore(fam); + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + int storeFiles = store.getStorefilesCount(); + assertTrue(storeFiles > 0); + + //verify the file exist before the chore and missing afterwards + assertTrue(store.getFileSystem().exists(junkFilePath)); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify no storefile got deleted + int currentStoreFiles = store.getStorefilesCount(); + assertEquals(currentStoreFiles, storeFiles); + + } + + @Test + public void testSkippningCompactedFiles() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + + ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + FileBasedStoreFileCleaner cleaner = rs.getFileBasedStoreFileCleaner(); + + //run major compaction to generate compaced files + region.compact(true); + + //make sure there are compacted files + HStore store = region.getStore(fam); + int compactedFiles = store.getCompactedFilesCount(); + assertTrue(compactedFiles > 0); + + cleaner.chore(); + + //verify none of the compacted files wee deleted + int existingCompactedFiles = store.getCompactedFilesCount(); + assertEquals(compactedFiles, existingCompactedFiles); + + //verify adding a junk file does not break anything + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + assertTrue(store.getFileSystem().exists(junkFilePath)); + cleaner.setEnabled(true); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify compacted files are still intact + existingCompactedFiles = store.getCompactedFilesCount(); + assertEquals(compactedFiles, existingCompactedFiles); + } + + private Table createTableWithData(TableName tableName) throws IOException { + Table table = testUtil.createTable(tableName, fam); + try { + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + } catch (IOException e) { + table.close(); + throw e; + } + return table; + } +} From 2f5cd86d67c730bbd09c41db394cc5f70de91b52 Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Thu, 28 Oct 2021 17:28:45 +0200 Subject: [PATCH 4/8] HBASE-26271: Cleanup the broken store files under data directory fixes based on feedback --- .../hbase/mob/DefaultMobStoreCompactor.java | 1 - ...eaner.java => 
BrokenStoreFileCleaner.java} | 50 +++++++++---------- .../hbase/regionserver/HRegionServer.java | 40 +++++++-------- .../hadoop/hbase/regionserver/HStore.java | 2 + .../hbase/regionserver/StoreEngine.java | 4 ++ .../regionserver/compactions/Compactor.java | 8 ++- .../compactions/DateTieredCompactor.java | 1 - .../compactions/DefaultCompactor.java | 1 - .../compactions/StripeCompactor.java | 1 - .../storefiletracker/StoreFileTracker.java | 2 +- .../StoreFileTrackerBase.java | 6 --- ...r.java => TestBrokenStoreFileCleaner.java} | 18 +++---- 12 files changed, 68 insertions(+), 66 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{FileBasedStoreFileCleaner.java => BrokenStoreFileCleaner.java} (74%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestFileBasedStoreFileCleaner.java => TestBrokenStoreFileCleaner.java} (89%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index e250c0342caa..a45e6a2c3e3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -670,7 +670,6 @@ protected List commitWriter(FileDetails fd, writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMobMetadata(mobRefSet.get()); writer.close(); - writer = null; clearThreadLocals(); return newFiles; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java similarity index 74% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index 06a12ac6d86d..fe5a7e0b6ff3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FileBasedStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -29,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,34 +39,34 @@ * This Chore, every time it runs, will clear the unsused HFiles in the data * folder. 
*/ -@InterfaceAudience.Private public class FileBasedStoreFileCleaner extends ScheduledChore { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreFileCleaner.class); - public static final String FILEBASED_STOREFILE_CLEANER_ENABLED = - "hbase.region.filebased.storefilecleaner.enabled"; - public static final boolean DEFAULT_FILEBASED_STOREFILE_CLEANER_ENABLED = false; - public static final String FILEBASED_STOREFILE_CLEANER_TTL = - "hbase.region.filebased.storefilecleaner.ttl"; - public static final long DEFAULT_FILEBASED_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h - public static final String FILEBASED_STOREFILE_CLEANER_DELAY = - "hbase.region.filebased.storefilecleaner.delay"; - public static final int DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h - public static final String FILEBASED_STOREFILE_CLEANER_DELAY_JITTER = - "hbase.region.filebased.storefilecleaner.delay.jitter"; - public static final double DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY_JITTER = 0.25D; - public static final String FILEBASED_STOREFILE_CLEANER_PERIOD = - "hbase.region.filebased.storefilecleaner.period"; - public static final int DEFAULT_FILEBASED_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h +@InterfaceAudience.Private public class BrokenStoreFileCleaner extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class); + public static final String BROKEN_STOREFILE_CLEANER_ENABLED = + "hbase.region.broken.storefilecleaner.enabled"; + public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false; + public static final String BROKEN_STOREFILE_CLEANER_TTL = + "hbase.region.broken.storefilecleaner.ttl"; + public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h + public static final String BROKEN_STOREFILE_CLEANER_DELAY = + "hbase.region.broken.storefilecleaner.delay"; + public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h + public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER = + "hbase.region.broken.storefilecleaner.delay.jitter"; + public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D; + public static final String BROKEN_STOREFILE_CLEANER_PERIOD = + "hbase.region.broken.storefilecleaner.period"; + public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h private HRegionServer regionServer; private final AtomicBoolean enabled = new AtomicBoolean(true); private long ttl; - public FileBasedStoreFileCleaner(final int delay, final int period, final Stoppable stopper, Configuration conf, + public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper, Configuration conf, HRegionServer regionServer) { - super("FileBasedStoreFileCleaner", stopper, period, delay); + super("BrokenStoreFileCleaner", stopper, period, delay); this.regionServer = regionServer; - setEnabled(conf.getBoolean(FILEBASED_STOREFILE_CLEANER_ENABLED, DEFAULT_FILEBASED_STOREFILE_CLEANER_ENABLED)); - ttl = conf.getLong(FILEBASED_STOREFILE_CLEANER_TTL, DEFAULT_FILEBASED_STOREFILE_CLEANER_TTL); + setEnabled(conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED)); + ttl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL); } public boolean setEnabled(final boolean enabled) { @@ -85,7 +85,7 @@ public boolean getEnabled() { AtomicLong failedDeletes = new AtomicLong(0); for (HRegion region : 
regionServer.getRegions()) { for (HStore store : region.getStores()) { - //only clean do cleanup in store using file based storefile tracking + //only do cleanup in stores not using tmp directories if (store.getStoreEngine().requireWritingToTmpDirFirst()) { continue; } @@ -102,11 +102,11 @@ public boolean getEnabled() { } } LOG.debug( - "FileBasedStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed to delete {}", + "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed to delete {}", regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start, deletedFiles.get(), failedDeletes.get()); } else { - LOG.trace("File based storefile Cleaner chore disabled! Not cleaning."); + LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning."); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 229bf2f16744..02944a2905d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -432,7 +432,7 @@ public class HRegionServer extends HBaseServerBase */ final ServerNonceManager nonceManager; - private FileBasedStoreFileCleaner fileBasedStoreFileCleaner; + private BrokenStoreFileCleaner brokenStoreFileCleaner; @InterfaceAudience.Private CompactedHFilesDischarger compactedFileDischarger; @@ -1833,8 +1833,8 @@ private void startServices() throws IOException { if (this.slowLogTableOpsChore != null) { choreService.scheduleChore(slowLogTableOpsChore); } - if (this.fileBasedStoreFileCleaner != null) { - choreService.scheduleChore(fileBasedStoreFileCleaner); + if (this.brokenStoreFileCleaner != null) { + choreService.scheduleChore(brokenStoreFileCleaner); } // Leases is not a Thread. Internally it runs a daemon thread. 
If it gets @@ -1916,20 +1916,20 @@ private void initializeThreads() { onlyMetaRefresh, this, this); } - int fileBasedStoreFileCleanerPeriod = conf.getInt( - FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_PERIOD, - FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_PERIOD); - int fileBasedStoreFileCleanerDelay = conf.getInt( - FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY, - FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY); - double fileBasedStoreFileCleanerDelayJitter = conf.getDouble( - FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY_JITTER, - FileBasedStoreFileCleaner.DEFAULT_FILEBASED_STOREFILE_CLEANER_DELAY_JITTER); - double jitterRate = (RandomUtils.nextDouble() - 0.5D) * fileBasedStoreFileCleanerDelayJitter; - long jitterValue = Math.round(fileBasedStoreFileCleanerDelay * jitterRate); - this.fileBasedStoreFileCleaner = - new FileBasedStoreFileCleaner((int) (fileBasedStoreFileCleanerDelay + jitterValue), - fileBasedStoreFileCleanerPeriod, this, conf, this); + int brokenStoreFileCleanerPeriod = conf.getInt( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); + int brokenStoreFileCleanerDelay = conf.getInt( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); + double brokenStoreFileCleanerDelayJitter = conf.getDouble( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); + double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; + long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); + this.brokenStoreFileCleaner = + new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), + brokenStoreFileCleanerPeriod, this, conf, this); registerConfigurationObservers(); } @@ -3506,8 +3506,8 @@ protected boolean clusterMode() { } @InterfaceAudience.Private - public FileBasedStoreFileCleaner getFileBasedStoreFileCleaner(){ - return fileBasedStoreFileCleaner; + public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){ + return brokenStoreFileCleaner; } @Override @@ -3520,6 +3520,6 @@ protected void stopChores() { shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); shutdownChore(slowLogTableOpsChore); - shutdownChore(fileBasedStoreFileCleaner); + shutdownChore(brokenStoreFileCleaner); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4bb43eb6ef02..0798f9da3b97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1157,6 +1157,8 @@ protected List doCompaction(CompactionRequestImpl cr, } } replaceStoreFiles(filesToCompact, sfs, true); + storeEngine.resetCompactionWriter(); + if (cr.isMajor()) { majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 60607bdbb5cd..1abd310bca93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ 
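The jitter computation in the hunk above spreads chore start times across region servers. A self-contained illustration of the same arithmetic (the patch draws its random double from commons-lang3 RandomUtils; ThreadLocalRandom is used here purely to keep the sketch dependency-free):

```java
import java.util.concurrent.ThreadLocalRandom;

public class CleanerJitterDemo {
  public static void main(String[] args) {
    int delay = 1000 * 60 * 60 * 2; // 2h, the default initial delay
    double jitter = 0.25D;          // the default delay jitter

    // A uniform draw in [0, 1) shifted by 0.5 yields a rate in [-0.125, 0.125)
    // for jitter = 0.25, so the effective delay lands within +/-12.5% of 2h.
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delay * jitterRate);

    System.out.println("effective initial delay ms: " + (delay + jitterValue));
  }
}
```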
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -521,4 +521,8 @@ public void removeCompactedFiles(Collection compactedFiles) { public boolean requireWritingToTmpDirFirst() { return storeFileTracker.requireWritingToTmpDirFirst(); } + + public void resetCompactionWriter(){ + compactor.resetWriter(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 4b0aff6686d9..77b39f5859b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -357,7 +357,6 @@ protected final List compact(final CompactionRequestImpl request, LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() .map(n -> n.toString()) .collect(Collectors.joining(", ", "{ ", " }"))); - writer = null; } writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, @@ -560,4 +559,11 @@ public List getCompactionTargets(){ Collectors.toList()); } } + + /** + * Reset the Writer when the new storefiles were successfully added + */ + public void resetWriter(){ + writer = null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 7be2e69caa2a..6ca758feaa71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -82,7 +82,6 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); - writer = null; return pathList; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index fff56637fdd0..43a26042fb3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -68,7 +68,6 @@ protected List commitWriter(FileDetails fd, List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.close(); - writer = null; return newFiles; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 5573f6e90f58..060a11b41fe6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -128,7 +128,6 @@ public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails f protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { 
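The hunks above move the `writer = null` reset out of the individual `commitWriter` implementations and into `Compactor.resetWriter()`, which is only triggered through `StoreEngine.resetCompactionWriter()` after the new files have been made live. A hypothetical, heavily simplified sketch of why that ordering matters (invented names, not the HBase types):

```java
final class WriterHandOffSketch {
  private volatile Object writer; // stands in for the compaction's file writer

  java.util.List<String> commitWriter() {
    // Output files are finished and closed here, but the writer reference is
    // intentionally kept: a concurrently running cleaner treats files reachable
    // from a live writer as "in progress" rather than as leftover junk.
    return java.util.List.of("new-hfile");
  }

  void replaceStoreFiles(java.util.List<String> newFiles) {
    // Swap the new files into the store's active file list.
  }

  void resetCompactionWriter() {
    // Only now is it safe for the cleaner to reclaim unknown files: anything
    // that still is not an active store file really is broken.
    writer = null;
  }
}
```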
List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); - writer = null; assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java index e8a73a6948ed..aabbe8d87494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java @@ -93,5 +93,5 @@ void replace(Collection compactedFiles, Collection * Whether the implementation of this tracker requires you to write to temp directory first, i.e, * does not allow broken store files under the actual data directory. */ - public boolean requireWritingToTmpDirFirst(); + boolean requireWritingToTmpDirFirst(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index 0e2898d0fa3c..db10f4db4c4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -173,12 +173,6 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th return builder.build(); } - /** - * Whether the implementation of this tracker requires you to write to temp directory first, i.e, - * does not allow broken store files under the actual data directory. 
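One small detail in the StoreFileTracker hunk above: the `public` modifier is dropped from `requireWritingToTmpDirFirst()` because interface members are implicitly public, so the keyword was redundant. A two-line illustration:

```java
interface TrackerModifierExample {
  // Implicitly public and abstract; writing "public" here adds nothing.
  boolean requireWritingToTmpDirFirst();
}
```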
- */ - public abstract boolean requireWritingToTmpDirFirst(); - protected abstract void doAddNewStoreFiles(Collection newFiles) throws IOException; protected abstract void doAddCompactionResults(Collection compactedFiles, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java similarity index 89% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java index 5aa2a382156f..acc38fc13c6b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFileBasedStoreFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java @@ -21,9 +21,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CompactType; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; @@ -41,11 +41,11 @@ import static org.junit.Assert.assertTrue; @Category({ MediumTests.class, RegionServerTests.class }) -public class TestFileBasedStoreFileCleaner { +public class TestBrokenStoreFileCleaner { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFileBasedStoreFileCleaner.class); + HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class); private final HBaseTestingUtil testUtil = new HBaseTestingUtil(); private final static byte[] fam = Bytes.toBytes("cf_1"); @@ -58,10 +58,10 @@ public class TestFileBasedStoreFileCleaner { @Before public void setUp() throws Exception { testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); - testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_ENABLED, "true"); - testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_TTL, "0"); - testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_PERIOD, "15000000"); - testUtil.getConfiguration().set(FileBasedStoreFileCleaner.FILEBASED_STOREFILE_CLEANER_DELAY, "0"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0"); testUtil.startMiniCluster(1); } @@ -79,7 +79,7 @@ public void testDeletingJunkFile() throws Exception { HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); - FileBasedStoreFileCleaner cleaner = rs.getFileBasedStoreFileCleaner(); + BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); //create junk 
file HStore store = region.getStore(fam); @@ -113,7 +113,7 @@ public void testSkippningCompactedFiles() throws Exception { ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); - FileBasedStoreFileCleaner cleaner = rs.getFileBasedStoreFileCleaner(); + BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); //run major compaction to generate compaced files region.compact(true);
From 9608c7ff3faf689ca3e10c6e291e1e3a5aee230b Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Thu, 4 Nov 2021 15:25:58 +0100 Subject: [PATCH 5/8] HBASE-26271: Cleanup the broken store files under data directory added javadoc fixed typos additional fileTTL UT --- .../regionserver/AbstractMultiFileWriter.java | 4 ++ .../regionserver/BrokenStoreFileCleaner.java | 10 ++--- .../hadoop/hbase/regionserver/HStore.java | 4 ++ .../hbase/regionserver/StoreEngine.java | 11 +++++ .../regionserver/compactions/Compactor.java | 2 +- .../compactions/DefaultCompactor.java | 3 +- .../TestBrokenStoreFileCleaner.java | 44 ++++++++++++++++++- 7 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index 394c31259cf0..82c3867c103c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -110,6 +110,10 @@ public List abortWriters() { return paths; } + /** + * Returns all writers. This is used to prevent deleting currently written storefiles + * during cleanup. + */ public abstract Collection writers(); /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index fe5a7e0b6ff3..d7d884deb839 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -39,7 +38,8 @@ * This Chore, every time it runs, will clear the unsused HFiles in the data * folder. */ -@InterfaceAudience.Private public class BrokenStoreFileCleaner extends ScheduledChore { +@InterfaceAudience.Private +public class BrokenStoreFileCleaner extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class); public static final String BROKEN_STOREFILE_CLEANER_ENABLED = "hbase.region.broken.storefilecleaner.enabled"; @@ -59,14 +59,14 @@ private HRegionServer regionServer; private final AtomicBoolean enabled = new AtomicBoolean(true); - private long ttl; + private long fileTtl; public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper, Configuration conf, HRegionServer regionServer) { super("BrokenStoreFileCleaner", stopper, period, delay); this.regionServer = regionServer; setEnabled(conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED)); - ttl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL); + fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL); } public boolean setEnabled(final boolean enabled) { @@ -165,7 +165,7 @@ boolean validate(Path file) { } boolean isOldEnough(FileStatus file){ - return file.getModificationTime() + ttl < System.currentTimeMillis(); + return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime(); } private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles, AtomicLong failedDeletes) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 0798f9da3b97..11effea370ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1157,6 +1157,10 @@ protected List doCompaction(CompactionRequestImpl cr, } } replaceStoreFiles(filesToCompact, sfs, true); + + // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the + // CleanerChore know that compaction is done and the file can be cleaned up if compaction + // has failed. storeEngine.resetCompactionWriter(); if (cr.isMajor()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 1abd310bca93..43c0ee28407c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -518,10 +518,21 @@ public void removeCompactedFiles(Collection compactedFiles) { } } + /** + * Whether the implementation of the used storefile tracker requires you to write to temp + * directory first, i.e., does not allow broken store files under the actual data directory. + */ public boolean requireWritingToTmpDirFirst() { return storeFileTracker.requireWritingToTmpDirFirst(); } + /** + * Resets the compaction writer when the new file is committed and used as active storefile. + * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the + * CleanerChore know that compaction is done and the file can be cleaned up if compaction + * has failed. Currently called in + * @see org.apache.hadoop.hbase.regionserver.HStore#doCompaction(CompactionRequestImpl,Collection, User, long, List) + */ public void resetCompactionWriter(){ compactor.resetWriter(); }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 77b39f5859b0..0ee7d349e4c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -563,7 +563,7 @@ public List getCompactionTargets(){ /** * Reset the Writer when the new storefiles were successfully added */ - public void resetWriter(){ + public void resetWriter(){ writer = null; } }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 43a26042fb3f..fb7f83b3d102 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -82,8 +82,7 @@ protected void abortWriter(StoreFileWriter writer) throws IOException { writer.close(); } catch (IOException e) { LOG.warn("Failed to close the writer after an unfinished compaction.", e); - } - finally { + } finally { writer = null; } try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java index acc38fc13c6b..1c0e3bda908a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java @@ -105,7 +105,7 @@ public void testDeletingJunkFile() throws Exception { } @Test - public void testSkippningCompactedFiles() throws Exception { + public void testSkippingCompactedFiles() throws Exception { tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles"); createTableWithData(tableName); @@ -125,7 +125,7 @@ public void testSkippningCompactedFiles() throws Exception { cleaner.chore(); - //verify none of the compacted files wee deleted + //verify none of the compacted files were deleted int existingCompactedFiles = store.getCompactedFilesCount(); assertEquals(compactedFiles, existingCompactedFiles); @@ -147,6 +147,46 @@ public void testSkippningCompactedFiles() throws Exception { assertEquals(compactedFiles, existingCompactedFiles); } + @Test + public void testJunkFileTTL() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testJunkFileTTL"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + + //create junk file + HStore store = region.getStore(fam); + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + int storeFiles = store.getStorefilesCount(); + assertTrue(storeFiles > 0); + + //verify the file exists before the chore + assertTrue(store.getFileSystem().exists(junkFilePath)); + + //set a 5 sec ttl + rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000"); + BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000, + 0, rs, rs.getConfiguration(), rs); + cleaner.chore(); + //file is still present after chore run + assertTrue(store.getFileSystem().exists(junkFilePath)); + Thread.sleep(5000); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify no storefile got deleted + int currentStoreFiles = store.getStorefilesCount(); + assertEquals(currentStoreFiles, storeFiles); + } + private Table createTableWithData(TableName tableName) throws IOException { Table table = testUtil.createTable(tableName, fam); try {
From 956f74eef6156c5053fb83ddf3d5a3c3b99179eb Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Mon, 8 Nov 2021 16:15:02 +0100 Subject: [PATCH 6/8] HBASE-26271: Cleanup the broken store files under data directory improve naming and add some explanation in the comments --- .../hbase/regionserver/BrokenStoreFileCleaner.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index d7d884deb839..a81f76bbdedc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -77,8 +77,8 @@ public boolean getEnabled() { return this.enabled.get(); } - @InterfaceAudience.Private - @Override public void chore() { + @Override + public void chore() { if (getEnabled()) { long start = EnvironmentEdgeManager.currentTime(); AtomicLong deletedFiles = new AtomicLong(0); @@ -132,12 +132,14 @@ private void cleanFileIfNeeded(FileStatus file, HStore store, return; } + // Compacted files can still have readers and are cleaned by a separate chore, so they have to + // be skipped here if(isCompactedFile(file, store)){ LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath()); return; } - if(isCompactingFile(file, store)){ + if(isCompactionResultFile(file, store)){ LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath()); return; } @@ -145,7 +147,7 @@ private void cleanFileIfNeeded(FileStatus file, HStore store, deleteFile(file, store, deletedFiles, failedDeletes); } - private boolean isCompactingFile(FileStatus file, HStore store) { + private boolean isCompactionResultFile(FileStatus file, HStore store) { return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); }
From ae0fa65a0612bd6d2f507ab8ed187f7246dfd6fb Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Mon, 8 Nov 2021 20:44:03 +0100 Subject: [PATCH 7/8] HBASE-26271: Cleanup the broken store files under data directory clean up after rebase --- .../MigrationStoreFileTracker.java | 8 - .../StoreFileTrackerFactory.java | 138 ------------------ .../TestMergesSplitsAddToTracker.java | 4 - 3 files changed, 150 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
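The testJunkFileTTL case added above exercises the isOldEnough() guard: a chore run inside the TTL must leave the junk file alone, and a run after the TTL may delete it. The rule in isolation, as a dependency-free sketch (the real code reads the clock through EnvironmentEdgeManager so tests can inject time; System.currentTimeMillis() is used here for brevity):

```java
import java.io.File;
import java.io.IOException;

public class TtlRuleDemo {
  // Mirrors isOldEnough(): deletable only once mtime + TTL lies in the past.
  static boolean isOldEnough(File file, long fileTtlMs, long nowMs) {
    return file.lastModified() + fileTtlMs < nowMs;
  }

  public static void main(String[] args) throws IOException {
    File junk = File.createTempFile("junk", ".hfile");
    junk.deleteOnExit();
    long now = System.currentTimeMillis();
    System.out.println(isOldEnough(junk, 5000L, now));        // false: too young
    System.out.println(isOldEnough(junk, 5000L, now + 6000)); // true: TTL elapsed
  }
}
```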
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java index 5fa651dc5484..53a474d3bde7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java @@ -93,12 +93,4 @@ static Class getSrcTrackerClass(Configuration conf) static Class getDstTrackerClass(Configuration conf) { return StoreFileTrackerFactory.getStoreFileTrackerClassForMigration(conf, DST_IMPL); } - - static Class getSrcTrackerClass(Configuration conf) { - return StoreFileTrackerFactory.getStoreFileTrackerClassForMigration(conf, SRC_IMPL); - } - - static Class getDstTrackerClass(Configuration conf) { - return StoreFileTrackerFactory.getStoreFileTrackerClassForMigration(conf, DST_IMPL); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java index 7689d4fc8434..1c683ae3de62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java @@ -311,142 +311,4 @@ public static void checkForModifyTable(Configuration conf, TableDescriptor oldTa } } } - - // should not use MigrationStoreFileTracker for new family - private static void checkForNewFamily(Configuration conf, TableDescriptor table, - ColumnFamilyDescriptor family) throws IOException { - Configuration mergedConf = mergeConfigurations(conf, table, family); - Class tracker = getTrackerClass(mergedConf); - if (MigrationStoreFileTracker.class.isAssignableFrom(tracker)) { - throw new DoNotRetryIOException( - "Should not use " + Trackers.MIGRATION + " as store file tracker for new family " - + family.getNameAsString() + " of table " + table.getTableName()); - } - } - - /** - * Pre check when creating a new table. - *
- * For now, only make sure that we do not use {@link Trackers#MIGRATION} for newly created tables.
- * @throws IOException when there are check errors, the upper layer should fail the
- *           {@code CreateTableProcedure}.
- */
- public static void checkForCreateTable(Configuration conf, TableDescriptor table)
-   throws IOException {
- for (ColumnFamilyDescriptor family : table.getColumnFamilies()) {
- checkForNewFamily(conf, table, family);
- }
- }
-
-
- /**
- * Pre check when modifying a table.
- * <p/>
- * The basic idea is when you want to change the store file tracker implementation, you should use
- * {@link Trackers#MIGRATION} first and then change to the destination store file tracker
- * implementation.
- * <p/>
- * There are several rules:
- * <ul>
- * <li>For newly added family, you should not use {@link Trackers#MIGRATION}.</li>
- * <li>For modifying a family:
- * <ul>
- * <li>If old tracker is {@link Trackers#MIGRATION}, then:
- * <ul>
- * <li>The new tracker is also {@link Trackers#MIGRATION}, then they must have the same src and
- * dst tracker.</li>
- * <li>The new tracker is not {@link Trackers#MIGRATION}, then the new tracker must be the dst
- * tracker of the old tracker.</li>
- * </ul>
- * </li>
- * <li>If the old tracker is not {@link Trackers#MIGRATION}, then:
- * <ul>
- * <li>If the new tracker is {@link Trackers#MIGRATION}, then the old tracker must be the src
- * tracker of the new tracker.</li>
- * <li>If the new tracker is not {@link Trackers#MIGRATION}, then the new tracker must be the same
- * with old tracker.</li>
- * </ul>
- * </li>
- * </ul>
- * </li>
- * </ul>
- * @throws IOException when there are check errors, the upper layer should fail the - * {@code ModifyTableProcedure}. - */ - public static void checkForModifyTable(Configuration conf, TableDescriptor oldTable, - TableDescriptor newTable) throws IOException { - for (ColumnFamilyDescriptor newFamily : newTable.getColumnFamilies()) { - ColumnFamilyDescriptor oldFamily = oldTable.getColumnFamily(newFamily.getName()); - if (oldFamily == null) { - checkForNewFamily(conf, newTable, newFamily); - continue; - } - Configuration oldConf = mergeConfigurations(conf, oldTable, oldFamily); - Configuration newConf = mergeConfigurations(conf, newTable, newFamily); - - Class oldTracker = getTrackerClass(oldConf); - Class newTracker = getTrackerClass(newConf); - - if (MigrationStoreFileTracker.class.isAssignableFrom(oldTracker)) { - Class oldSrcTracker = - MigrationStoreFileTracker.getSrcTrackerClass(oldConf); - Class oldDstTracker = - MigrationStoreFileTracker.getDstTrackerClass(oldConf); - if (oldTracker.equals(newTracker)) { - // confirm that we have the same src tracker and dst tracker - Class newSrcTracker = - MigrationStoreFileTracker.getSrcTrackerClass(newConf); - if (!oldSrcTracker.equals(newSrcTracker)) { - throw new DoNotRetryIOException( - "The src tracker has been changed from " + getStoreFileTrackerName(oldSrcTracker) - + " to " + getStoreFileTrackerName(newSrcTracker) + " for family " - + newFamily.getNameAsString() + " of table " + newTable.getTableName()); - } - Class newDstTracker = - MigrationStoreFileTracker.getDstTrackerClass(newConf); - if (!oldDstTracker.equals(newDstTracker)) { - throw new DoNotRetryIOException( - "The dst tracker has been changed from " + getStoreFileTrackerName(oldDstTracker) - + " to " + getStoreFileTrackerName(newDstTracker) + " for family " - + newFamily.getNameAsString() + " of table " + newTable.getTableName()); - } - } else { - // we can only change to the dst tracker - if (!newTracker.equals(oldDstTracker)) { - throw new DoNotRetryIOException( - "Should migrate tracker to " + getStoreFileTrackerName(oldDstTracker) + " but got " - + getStoreFileTrackerName(newTracker) + " for family " + newFamily.getNameAsString() - + " of table " + newTable.getTableName()); - } - } - } else { - if (!oldTracker.equals(newTracker)) { - // can only change to MigrationStoreFileTracker and the src tracker should be the old - // tracker - if (!MigrationStoreFileTracker.class.isAssignableFrom(newTracker)) { - throw new DoNotRetryIOException("Should change to " + Trackers.MIGRATION - + " first when migrating from " + getStoreFileTrackerName(oldTracker) + " for family " - + newFamily.getNameAsString() + " of table " + newTable.getTableName()); - } - Class newSrcTracker = - MigrationStoreFileTracker.getSrcTrackerClass(newConf); - if (!oldTracker.equals(newSrcTracker)) { - throw new DoNotRetryIOException( - "Should use src tracker " + getStoreFileTrackerName(oldTracker) + " first but got " - + getStoreFileTrackerName(newSrcTracker) + " when migrating from " - + getStoreFileTrackerName(oldTracker) + " for family " + newFamily.getNameAsString() - + " of table " + newTable.getTableName()); - } - Class newDstTracker = - MigrationStoreFileTracker.getDstTrackerClass(newConf); - // the src and dst tracker should not be the same - if (newSrcTracker.equals(newDstTracker)) { - throw new DoNotRetryIOException("The src tracker and dst tracker are both " - + getStoreFileTrackerName(newSrcTracker) + " for family " - + newFamily.getNameAsString() + " of table " + newTable.getTableName()); - } - } - } 
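The rules spelled out in the javadoc above (whose implementation the rebase cleanup removes from this file) amount to a small transition check. A hedged, self-contained restatement with invented enum and method names, not the HBase API:

```java
enum Impl { DEFAULT, FILE, MIGRATION }

final class TrackerTransitionRule {
  // src/dst arguments are only meaningful when the corresponding impl is MIGRATION.
  static boolean legal(Impl oldImpl, Impl oldSrc, Impl oldDst,
      Impl newImpl, Impl newSrc, Impl newDst) {
    if (oldImpl == Impl.MIGRATION) {
      if (newImpl == Impl.MIGRATION) {
        return newSrc == oldSrc && newDst == oldDst; // no changing course mid-flight
      }
      return newImpl == oldDst;                      // may only settle on the dst tracker
    }
    if (newImpl == Impl.MIGRATION) {
      return newSrc == oldImpl && newDst != newSrc;  // src must be the current tracker
    }
    return newImpl == oldImpl;                       // any other direct switch is rejected
  }
}
```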
- } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java index 7544426824eb..68fc444493c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java @@ -18,11 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory. -<<<<<<< HEAD TRACKER_IMPL; -======= - TRACK_IMPL; ->>>>>>> HBASE-26079 Use StoreFileTracker when splitting and merging (#3617) import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail;
From ed0e58d42fce03a587b04c8061042206b07c6e5a Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Tue, 9 Nov 2021 10:22:51 +0100 Subject: [PATCH 8/8] HBASE-26271: Cleanup the broken store files under data directory checkstyle fixes additional comments --- .../hbase/mob/DefaultMobStoreCompactor.java | 1 - .../regionserver/BrokenStoreFileCleaner.java | 37 ++++++++++++------- .../hbase/regionserver/StoreEngine.java | 4 +- .../AbstractMultiOutputCompactor.java | 1 + .../compactions/DateTieredCompactor.java | 3 +- .../compactions/DefaultCompactor.java | 1 + .../hbase/mob/FaultyMobStoreCompactor.java | 1 - .../TestBrokenStoreFileCleaner.java | 27 ++++++++------ 8 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index a45e6a2c3e3b..15f0a73a9df9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index a81f76bbdedc..0c4807d8badc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -28,11 +33,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * This Chore, every time it runs, will clear the unsused HFiles in the data * folder. -@InterfaceAudience.Private public class
BrokenStoreFileCleaner extends ScheduledChore { private final AtomicBoolean enabled = new AtomicBoolean(true); private long fileTtl; - public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper, Configuration conf, - HRegionServer regionServer) { + public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper, + Configuration conf, HRegionServer regionServer) { super("BrokenStoreFileCleaner", stopper, period, delay); this.regionServer = regionServer; - setEnabled(conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED)); + setEnabled( + conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED)); fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL); } @@ -93,8 +94,10 @@ public void chore() { new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName()); try { - List fsStoreFiles = Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath)); - fsStoreFiles.forEach(file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes)); + List fsStoreFiles = + Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath)); + fsStoreFiles.forEach( + file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes)); } catch (IOException e) { LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath); continue; @@ -102,7 +105,8 @@ public void chore() { } } LOG.debug( - "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed to delete {}", + "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed " + + "to delete {}", regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start, deletedFiles.get(), failedDeletes.get()); } else { @@ -151,12 +155,16 @@ private boolean isCompactionResultFile(FileStatus file, HStore store) { return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); } + // Compacted files can still have readers and are cleaned by a separate chore, so they have to + // be skipped here private boolean isCompactedFile(FileStatus file, HStore store) { - return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream().anyMatch(sf -> sf.getPath().equals(file.getPath())); + return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream() + .anyMatch(sf -> sf.getPath().equals(file.getPath())); } private boolean isActiveStorefile(FileStatus file, HStore store) { - return store.getStoreEngine().getStoreFileManager().getStorefiles().stream().anyMatch(sf -> sf.getPath().equals(file.getPath())); + return store.getStoreEngine().getStoreFileManager().getStorefiles().stream() + .anyMatch(sf -> sf.getPath().equals(file.getPath())); } boolean validate(Path file) { @@ -170,7 +178,8 @@ boolean isOldEnough(FileStatus file){ return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime(); } - private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles, AtomicLong failedDeletes) { + private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles, + AtomicLong failedDeletes) { Path filePath = file.getPath(); LOG.debug("Removing {} from store", filePath); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 43c0ee28407c..318b701baf72 100644 --- 
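Taken together, the checks reformatted above give each candidate file a fixed gauntlet before deletion. A condensed sketch of that decision chain with deliberately simplified types (sets of file names instead of the HBase store lookups; the exact validation rules live in the validate() helper):

```java
import java.io.File;
import java.util.Set;

final class CleanupDecisionSketch {
  static boolean shouldDelete(File file, long ttlMs, long nowMs, Set<String> activeFiles,
      Set<String> compactedFiles, Set<String> compactionTargets) {
    String name = file.getName();
    if (!isValidStoreFileName(name)) return false;          // metadata and oddities are kept
    if (file.lastModified() + ttlMs >= nowMs) return false; // too young, may be mid-write
    if (activeFiles.contains(name)) return false;           // live store file
    if (compactedFiles.contains(name)) return false;        // owned by the discharger chore
    if (compactionTargets.contains(name)) return false;     // an ongoing compaction's output
    return true;                                            // leftover of a failed write
  }

  // Stand-in for the validate() helper above; a hypothetical simplification.
  static boolean isValidStoreFileName(String name) {
    return !name.startsWith(".");
  }
}
```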
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -531,7 +533,7 @@ public boolean requireWritingToTmpDirFirst() { * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the * CleanerChore know that compaction is done and the file can be cleaned up if compaction * has failed. Currently called in - * @see org.apache.hadoop.hbase.regionserver.HStore#doCompaction(CompactionRequestImpl,Collection, User, long, List) + * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List) */ public void resetCompactionWriter(){ compactor.resetWriter();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index c505eb5cd25f..19b7a98627e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -79,6 +79,7 @@ protected void abortWriter() throws IOException { e); } } + //this step signals that the target file is no longer written and can be cleaned up writer = null; } }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 6ca758feaa71..43e037c5e702 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -81,7 +81,8 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai @Override protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { - List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); + List pathList = + writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); return pathList; } }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index fb7f83b3d102..ad2384a97ab8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -83,6 +83,7 @@ protected void abortWriter(StoreFileWriter writer) throws IOException { } catch (IOException e) { LOG.warn("Failed to close the writer after an unfinished compaction.", e); } finally { + //this step signals that the target file is no longer written and can be cleaned up writer = null; } try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 1196c5219ee2..d178d564f650 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; -import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java index 1c0e3bda908a..78755a4fe772 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java @@ -17,13 +17,16 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.CompactType; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; @@ -35,10 +38,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; @Category({ MediumTests.class, RegionServerTests.class }) public class TestBrokenStoreFileCleaner { @@ -57,10 +56,13 @@ public class TestBrokenStoreFileCleaner { @Before public void setUp() throws Exception { - testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); - testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true"); + testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + testUtil.getConfiguration() + .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true"); testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0"); - testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000"); + testUtil.getConfiguration() + .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0"); testUtil.startMiniCluster(1); } @@ -77,7 +79,8 @@ public void testDeletingJunkFile() throws Exception { createTableWithData(tableName); HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); - ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); @@ -111,7 +114,8 @@ public void testSkippingCompactedFiles() throws Exception { HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); - ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); @@ -153,7 +157,8 @@ public void testJunkFileTTL() throws Exception { createTableWithData(tableName); HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); - ServerName sn = testUtil.getMiniHBaseCluster().getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); //create junk file