Commit c60adbc

BukrosSzabolcs authored and Apache9 committed
HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Josh Elser <[email protected]>
Signed-off-by: Wellington Ramos Chevreuil <[email protected]>
1 parent 1d410ec commit c60adbc

22 files changed: +566 −32 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 13 additions & 3 deletions
@@ -28,7 +28,6 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -52,6 +51,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files in the mob-enabled column family.
  */
@@ -154,7 +155,6 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
@@ -163,7 +163,7 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     long bytesWrittenProgressForCloseCheck = 0;
@@ -369,4 +369,14 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
     progress.complete();
     return true;
   }
+
+
+  @Override
+  protected List<Path> commitWriter(FileDetails fd,
+      CompactionRequestImpl request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    writer.close();
+    return newFiles;
+  }
 }
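Dropping the CellSink parameter from performCompaction() means the MOB compactor now streams into the writer field owned by its parent Compactor and finalizes it in the new commitWriter() hook. Because the parent tracks that writer, callers can ask a store which output files an in-flight compaction is producing. A minimal sketch of that query, assuming a live HStore (getCompactionTargets() is added to Compactor by this commit, though its diff is not part of this excerpt):

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;

final class InFlightCompactionCheck {
  /** True if candidate is currently being written by an ongoing compaction of store. */
  static boolean isCompactionTarget(HStore store, Path candidate) {
    List<Path> targets = store.getStoreEngine().getCompactor().getCompactionTargets();
    return targets.contains(candidate);
  }
}

The new BrokenStoreFileCleaner further down performs exactly this check before deleting anything.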

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java

Lines changed: 5 additions & 1 deletion
@@ -110,7 +110,11 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  protected abstract Collection<StoreFileWriter> writers();
+  /**
+   * Returns all writers. This is used to prevent deleting currently written storefiles
+   * during cleanup.
+   */
+  public abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
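Widening writers() to public is what lets the compactor enumerate every open output of a multi-writer compaction. A sketch of how an accessor like getCompactionTargets() could be built on top of it; this is a hypothetical reconstruction, assuming the Compactor keeps its current single- or multi-file writer in a writer field (Compactor.java is changed by this commit but not shown in this excerpt):

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;

// Hypothetical method inside a Compactor-like class whose 'writer' field is
// either a single StoreFileWriter or an AbstractMultiFileWriter.
public List<Path> getCompactionTargets() {
  if (writer == null) {
    return Collections.emptyList(); // no compaction in flight
  }
  if (writer instanceof StoreFileWriter) {
    return Collections.singletonList(((StoreFileWriter) writer).getPath());
  }
  // Multi-file case: every open writer contributes one in-flight target path.
  return ((AbstractMultiFileWriter) writer).writers().stream()
      .map(StoreFileWriter::getPath)
      .collect(Collectors.toList());
}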
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Chore, every time it runs, will clear the unused HFiles in the data
+ * folder.
+ */
+@InterfaceAudience.Private
+public class BrokenStoreFileCleaner extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
+  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
+      "hbase.region.broken.storefilecleaner.enabled";
+  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
+  public static final String BROKEN_STOREFILE_CLEANER_TTL =
+      "hbase.region.broken.storefilecleaner.ttl";
+  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
+      "hbase.region.broken.storefilecleaner.delay";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
+      "hbase.region.broken.storefilecleaner.delay.jitter";
+  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
+  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
+      "hbase.region.broken.storefilecleaner.period";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h
+
+  private HRegionServer regionServer;
+  private final AtomicBoolean enabled = new AtomicBoolean(true);
+  private long fileTtl;
+
+  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
+      Configuration conf, HRegionServer regionServer) {
+    super("BrokenStoreFileCleaner", stopper, period, delay);
+    this.regionServer = regionServer;
+    setEnabled(
+      conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
+    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
+  }
+
+  public boolean setEnabled(final boolean enabled) {
+    return this.enabled.getAndSet(enabled);
+  }
+
+  public boolean getEnabled() {
+    return this.enabled.get();
+  }
+
+  @Override
+  public void chore() {
+    if (getEnabled()) {
+      long start = EnvironmentEdgeManager.currentTime();
+      AtomicLong deletedFiles = new AtomicLong(0);
+      AtomicLong failedDeletes = new AtomicLong(0);
+      for (HRegion region : regionServer.getRegions()) {
+        for (HStore store : region.getStores()) {
+          // only do cleanup in stores not using tmp directories
+          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
+            continue;
+          }
+          Path storePath =
+              new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
+
+          try {
+            List<FileStatus> fsStoreFiles =
+                Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
+            fsStoreFiles.forEach(
+                file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
+          } catch (IOException e) {
+            LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath);
+            continue;
+          }
+        }
+      }
+      LOG.debug(
+          "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+              + "to delete {}",
+          regionServer.getServerName().getServerName(),
+          EnvironmentEdgeManager.currentTime() - start,
+          deletedFiles.get(), failedDeletes.get());
+    } else {
+      LOG.trace("Broken storefile cleaner chore disabled! Not cleaning.");
+    }
+  }
+
+  private void cleanFileIfNeeded(FileStatus file, HStore store,
+      AtomicLong deletedFiles, AtomicLong failedDeletes) {
+    if (file.isDirectory()) {
+      LOG.trace("This is a directory {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (!validate(file.getPath())) {
+      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (!isOldEnough(file)) {
+      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (isActiveStorefile(file, store)) {
+      LOG.trace("Actively used storefile {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+    // be skipped here
+    if (isCompactedFile(file, store)) {
+      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (isCompactionResultFile(file, store)) {
+      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup",
+          file.getPath());
+      return;
+    }
+
+    deleteFile(file, store, deletedFiles, failedDeletes);
+  }
+
+  private boolean isCompactionResultFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+  }
+
+  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+  // be skipped here
+  private boolean isCompactedFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  private boolean isActiveStorefile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
+    return StoreFileInfo.validateStoreFileName(file.getName());
+  }
+
+  boolean isOldEnough(FileStatus file) {
+    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
+  }
+
+  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
+      AtomicLong failedDeletes) {
+    Path filePath = file.getPath();
+    LOG.debug("Removing {} from store", filePath);
+    try {
+      boolean success = store.getFileSystem().delete(filePath, false);
+      if (!success) {
+        failedDeletes.incrementAndGet();
+        LOG.warn("Attempted to delete:" + filePath
+            + ", but couldn't. Attempt to delete on next pass.");
+      } else {
+        deletedFiles.incrementAndGet();
+      }
+    } catch (IOException e) {
+      e = e instanceof RemoteException ?
+          ((RemoteException) e).unwrapRemoteException() : e;
+      LOG.warn("Error while deleting: " + filePath, e);
+    }
+  }
+}
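The chore is disabled by default and driven entirely by the five configuration keys above. A minimal sketch of enabling it with a tighter schedule (the values are illustrative only, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class BrokenStoreFileCleanerConfig {
  static Configuration withCleanerEnabled() {
    Configuration conf = HBaseConfiguration.create();
    // Off by default (DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false).
    conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
    // Only delete files older than 6h (default is 12h).
    conf.setLong("hbase.region.broken.storefilecleaner.ttl", 1000L * 60 * 60 * 6);
    // First run 1h after startup (default 2h), spread by the delay jitter.
    conf.setInt("hbase.region.broken.storefilecleaner.delay", 1000 * 60 * 60);
    conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
    // Re-run every 3h (default 6h).
    conf.setInt("hbase.region.broken.storefilecleaner.period", 1000 * 60 * 60 * 3);
    return conf;
  }
}

Note that a file is only deleted once it fails every guard in cleanFileIfNeeded(): it must be a valid store file name (or an HFileLink back reference), older than the TTL, and neither an active store file, a compacted-away file awaiting the discharger, nor the target of a running compaction.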

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 1 addition & 1 deletion
@@ -609,7 +609,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegi
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
-     insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 26 additions & 0 deletions
@@ -546,6 +546,8 @@ public class HRegionServer extends Thread implements
    */
   protected final ConfigurationManager configurationManager;
 
+  private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -2156,6 +2158,9 @@ private void startServices() throws IOException {
     if (this.slowLogTableOpsChore != null) {
       choreService.scheduleChore(slowLogTableOpsChore);
     }
+    if (this.brokenStoreFileCleaner != null) {
+      choreService.scheduleChore(brokenStoreFileCleaner);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2236,6 +2241,22 @@ private void initializeThreads() {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
         onlyMetaRefresh, this, this);
     }
+
+    int brokenStoreFileCleanerPeriod = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+    int brokenStoreFileCleanerDelay = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+    double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+    this.brokenStoreFileCleaner =
+      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+        brokenStoreFileCleanerPeriod, this, conf, this);
+
     registerConfigurationObservers();
   }
 
@@ -4027,4 +4048,9 @@ public Iterator<ServerName> getBootstrapNodes() {
   public MetaRegionLocationCache getMetaRegionLocationCache() {
     return this.metaRegionLocationCache;
   }
+
+  @InterfaceAudience.Private
+  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
+    return brokenStoreFileCleaner;
+  }
 }
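The jitter block in initializeThreads() staggers the chore's first run across region servers: jitterRate is uniform in [-jitter/2, +jitter/2), so the default 2h delay with 0.25 jitter lands somewhere in roughly [105, 135) minutes. A self-contained re-creation of the arithmetic (using ThreadLocalRandom for the uniform [0,1) draw):

import java.util.concurrent.ThreadLocalRandom;

public final class CleanerDelayJitter {
  public static void main(String[] args) {
    int delay = 1000 * 60 * 60 * 2; // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY, 2h
    double jitter = 0.25D;          // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER
    // Uniform in [-0.125, 0.125) for the default jitter of 0.25.
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delay * jitterRate);
    System.out.printf("first run in %.1f minutes%n", (delay + jitterValue) / 60000.0);
  }
}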

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 6 additions & 0 deletions
@@ -1156,6 +1156,12 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
       }
     }
     replaceStoreFiles(filesToCompact, sfs, true);
+
+    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
+    // has failed.
+    storeEngine.resetCompactionWriter();
+
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java

Lines changed: 21 additions & 0 deletions
@@ -42,9 +42,11 @@
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
     }
   }
 
+  /**
+   * Whether the implementation of the used storefile tracker requires you to write to temp
+   * directory first, i.e., does not allow broken store files under the actual data directory.
+   */
+  public boolean requireWritingToTmpDirFirst() {
+    return storeFileTracker.requireWritingToTmpDirFirst();
+  }
+
+  /**
+   * Resets the compaction writer when the new file is committed and used as active storefile.
+   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
+   * has failed. Currently called in
+   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+   */
+  public void resetCompactionWriter() {
+    compactor.resetWriter();
+  }
+
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
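These two additions are the cleaner's whole contract with the store engine: requireWritingToTmpDirFirst() says whether broken files can appear under a store's data directory at all, and resetCompactionWriter() marks the moment in-flight outputs become regular store files (protected by isActiveStorefile() rather than getCompactionTargets()). A small sketch of the eligibility side, mirroring the guard at the top of BrokenStoreFileCleaner.chore():

import org.apache.hadoop.hbase.regionserver.HStore;

final class CleanerEligibility {
  /**
   * Only stores whose tracker writes compaction output straight into the data
   * directory can be left with broken files there; trackers that stage output
   * in a temp directory first (the default behaviour) never need this chore.
   */
  static boolean needsBrokenFileCleanup(HStore store) {
    return !store.getStoreEngine().requireWritingToTmpDirFirst();
  }
}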

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public void setNoStripeMetadata() {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
