
Commit 1e2fd5f

BukrosSzabolcs authored and apurtell committed

HBASE-26271 Cleanup the broken store files under data directory (apache#3786)

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Josh Elser <[email protected]>
Signed-off-by: Wellington Ramos Chevreuil <[email protected]>

Conflicts:
  hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

1 parent c609f5c commit 1e2fd5f

22 files changed: +566 −33 lines
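
Before the per-file diffs, a note on operability: the new chore ships disabled. The sketch below is illustrative only (the demo class name is made up); it uses the standard Hadoop/HBase Configuration API and the exact keys defined in BrokenStoreFileCleaner further down. In a real deployment these are region server settings and belong in the server-side hbase-site.xml rather than client code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical demo class; keys and defaults come from BrokenStoreFileCleaner below.
public class EnableBrokenStoreFileCleaner {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // the chore ships disabled; turn it on explicitly
    conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
    // reclaim orphaned files after 6h instead of the default 12h
    conf.setLong("hbase.region.broken.storefilecleaner.ttl", 1000L * 60 * 60 * 6);
    // remaining knobs, shown at their defaults: 6h period, 2h initial delay,
    // and +/-25% jitter applied to that delay
    conf.setInt("hbase.region.broken.storefilecleaner.period", 1000 * 60 * 60 * 6);
    conf.setInt("hbase.region.broken.storefilecleaner.delay", 1000 * 60 * 60 * 2);
    conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
    System.out.println("enabled = "
        + conf.getBoolean("hbase.region.broken.storefilecleaner.enabled", false));
  }
}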

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 13 additions & 4 deletions

@@ -28,7 +28,6 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -52,6 +51,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files in the mob-enabled column family.
  */
@@ -154,7 +155,6 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
@@ -163,10 +163,9 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
     boolean major, int numofFilesToCompact) throws IOException {
-    long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
@@ -369,4 +368,14 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
     progress.complete();
     return true;
   }
+
+
+  @Override
+  protected List<Path> commitWriter(FileDetails fd,
+    CompactionRequestImpl request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    writer.close();
+    return newFiles;
+  }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java

Lines changed: 5 additions & 1 deletion

@@ -110,7 +110,11 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  protected abstract Collection<StoreFileWriter> writers();
+  /**
+   * Returns all writers. This is used to prevent deleting currently written storefiles
+   * during cleanup.
+   */
+  public abstract Collection<StoreFileWriter> writers();
 
   /**
   * Subclasses override this method to be called at the end of a successful sequence of append; all
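
The widening of writers() from protected to public is what lets code outside the writer hierarchy enumerate in-flight writers. The Compactor.java diff is one of the 13 files not shown in this excerpt, so the following is only a plausible sketch, with hypothetical interface names, of how a cleanup-time caller can turn writers() into a set of paths that must be skipped:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins: StoreFileWriterish plays the role of
// StoreFileWriter#getPath(), MultiWriterish the role of the now-public
// AbstractMultiFileWriter#writers().
class PathCollectionSketch {
  interface StoreFileWriterish { String getPath(); }
  interface MultiWriterish { Collection<StoreFileWriterish> writers(); }

  // every path returned here belongs to a file that is currently being
  // written, so a cleaner must not treat it as broken
  static List<String> inFlightPaths(MultiWriterish writer) {
    List<String> paths = new ArrayList<>();
    for (StoreFileWriterish w : writer.writers()) {
      paths.add(w.getPath());
    }
    return paths;
  }
}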

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java

Lines changed: 202 additions & 0 deletions

@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Chore, every time it runs, will clear the unused HFiles in the data
+ * folder.
+ */
+@InterfaceAudience.Private
+public class BrokenStoreFileCleaner extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
+  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
+      "hbase.region.broken.storefilecleaner.enabled";
+  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
+  public static final String BROKEN_STOREFILE_CLEANER_TTL =
+      "hbase.region.broken.storefilecleaner.ttl";
+  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
+      "hbase.region.broken.storefilecleaner.delay";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
+      "hbase.region.broken.storefilecleaner.delay.jitter";
+  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
+  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
+      "hbase.region.broken.storefilecleaner.period";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h
+
+  private HRegionServer regionServer;
+  private final AtomicBoolean enabled = new AtomicBoolean(true);
+  private long fileTtl;
+
+  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
+      Configuration conf, HRegionServer regionServer) {
+    super("BrokenStoreFileCleaner", stopper, period, delay);
+    this.regionServer = regionServer;
+    setEnabled(
+        conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
+    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
+  }
+
+  public boolean setEnabled(final boolean enabled) {
+    return this.enabled.getAndSet(enabled);
+  }
+
+  public boolean getEnabled() {
+    return this.enabled.get();
+  }
+
+  @Override
+  public void chore() {
+    if (getEnabled()) {
+      long start = EnvironmentEdgeManager.currentTime();
+      AtomicLong deletedFiles = new AtomicLong(0);
+      AtomicLong failedDeletes = new AtomicLong(0);
+      for (HRegion region : regionServer.getRegions()) {
+        for (HStore store : region.getStores()) {
+          //only do cleanup in stores not using tmp directories
+          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
+            continue;
+          }
+          Path storePath =
+              new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
+
+          try {
+            List<FileStatus> fsStoreFiles =
+                Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
+            fsStoreFiles.forEach(
+                file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
+          } catch (IOException e) {
+            LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath);
+            continue;
+          }
+        }
+      }
+      LOG.debug(
+          "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+              + "to delete {}",
+          regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
+          deletedFiles.get(), failedDeletes.get());
+    } else {
+      LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
+    }
+  }
+
+  private void cleanFileIfNeeded(FileStatus file, HStore store,
+      AtomicLong deletedFiles, AtomicLong failedDeletes) {
+    if (file.isDirectory()) {
+      LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (!validate(file.getPath())) {
+      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (!isOldEnough(file)) {
+      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (isActiveStorefile(file, store)) {
+      LOG.trace("Actively used storefile {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+    // be skipped here
+    if (isCompactedFile(file, store)) {
+      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if (isCompactionResultFile(file, store)) {
+      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    deleteFile(file, store, deletedFiles, failedDeletes);
+  }
+
+  private boolean isCompactionResultFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+  }
+
+  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+  // be skipped here
+  private boolean isCompactedFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  private boolean isActiveStorefile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
+    return StoreFileInfo.validateStoreFileName(file.getName());
+  }
+
+  boolean isOldEnough(FileStatus file) {
+    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
+  }
+
+  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
+      AtomicLong failedDeletes) {
+    Path filePath = file.getPath();
+    LOG.debug("Removing {} from store", filePath);
+    try {
+      boolean success = store.getFileSystem().delete(filePath, false);
+      if (!success) {
+        failedDeletes.incrementAndGet();
+        LOG.warn("Attempted to delete:" + filePath
+            + ", but couldn't. Attempt to delete on next pass.");
+      } else {
+        deletedFiles.incrementAndGet();
+      }
+    } catch (IOException e) {
+      e = e instanceof RemoteException ?
+          ((RemoteException) e).unwrapRemoteException() : e;
+      LOG.warn("Error while deleting: " + filePath, e);
+    }
+  }
+
+}
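
A worked example of the isOldEnough() arithmetic may help: the TTL is what keeps the chore from racing with an in-flight flush or compaction it cannot otherwise see. The demo class below is hypothetical; the patch reads the clock through EnvironmentEdgeManager.currentTime() (a mockable clock), but System.currentTimeMillis() behaves identically here.

// Hypothetical demo of the isOldEnough() guard with the default 12h TTL.
public class BrokenFileTtlDemo {
  // DEFAULT_BROKEN_STOREFILE_CLEANER_TTL from the class above: 12h in ms
  static final long TTL = 1000L * 60 * 60 * 12;

  static boolean isOldEnough(long modificationTime, long now) {
    return modificationTime + TTL < now;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // modified 13h ago -> older than the TTL, a deletion candidate
    System.out.println(isOldEnough(now - 13L * 60 * 60 * 1000, now)); // true
    // modified 2h ago -> may still be written by an in-flight flush/compaction
    System.out.println(isOldEnough(now - 2L * 60 * 60 * 1000, now));  // false
  }
}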

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 1 addition & 1 deletion

@@ -609,7 +609,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegi
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
-     insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 26 additions & 0 deletions

@@ -548,6 +548,8 @@ public class HRegionServer extends Thread implements
    */
   protected final ConfigurationManager configurationManager;
 
+  private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -2167,6 +2169,9 @@ private void startServices() throws IOException {
     if (this.slowLogTableOpsChore != null) {
      choreService.scheduleChore(slowLogTableOpsChore);
     }
+    if (this.brokenStoreFileCleaner != null) {
+      choreService.scheduleChore(brokenStoreFileCleaner);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2247,6 +2252,22 @@ private void initializeThreads() {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
         onlyMetaRefresh, this, this);
     }
+
+    int brokenStoreFileCleanerPeriod = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+    int brokenStoreFileCleanerDelay = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+    this.brokenStoreFileCleaner =
+      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+        brokenStoreFileCleanerPeriod, this, conf, this);
+
     registerConfigurationObservers();
   }
 
@@ -4039,4 +4060,9 @@ public Iterator<ServerName> getBootstrapNodes() {
   public MetaRegionLocationCache getMetaRegionLocationCache() {
     return this.metaRegionLocationCache;
   }
+
+  @InterfaceAudience.Private
+  public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
+    return brokenStoreFileCleaner;
+  }
 }
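
The jitter block in initializeThreads() above spreads the chore's first run across region servers so they do not all start scanning their stores at the same moment. A small self-contained sketch of the same arithmetic, with the default values and a hypothetical class name:

import java.util.concurrent.ThreadLocalRandom;

// Reproduces the delay-jitter math from initializeThreads(): jitterRate is
// uniform in [-0.125, 0.125) with the default 0.25 jitter, so a 2h base delay
// becomes a first run somewhere between roughly 1h45m and 2h15m.
public class CleanerDelayJitterDemo {
  public static void main(String[] args) {
    int delay = 1000 * 60 * 60 * 2; // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY
    double jitter = 0.25D;          // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delay * jitterRate);
    System.out.printf("effective initial delay: %d ms (%.1f min)%n",
        delay + jitterValue, (delay + jitterValue) / 60000.0);
  }
}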

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 6 additions & 0 deletions

@@ -1153,6 +1153,12 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
       }
     }
     replaceStoreFiles(filesToCompact, sfs, true);
+
+    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
+    // has failed.
+    storeEngine.resetCompactionWriter();
+
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java

Lines changed: 21 additions & 0 deletions

@@ -42,9 +42,11 @@
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
     }
   }
 
+  /**
+   * Whether the implementation of the used storefile tracker requires you to write to temp
+   * directory first, i.e., does not allow broken store files under the actual data directory.
+   */
+  public boolean requireWritingToTmpDirFirst() {
+    return storeFileTracker.requireWritingToTmpDirFirst();
+  }
+
+  /**
+   * Resets the compaction writer when the new file is committed and used as active storefile.
+   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
+   * has failed. Currently called in
+   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+   */
+  public void resetCompactionWriter(){
+    compactor.resetWriter();
+  }
+
 @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
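
Taken together with the DefaultMobStoreCompactor and HStore changes above, these two StoreEngine methods complete a writer-tracking handshake: a compactor exposes its in-flight output through getCompactionTargets(), and resetCompactionWriter() clears that state once HStore has committed the new file, after which a leftover file with that name is either an active storefile or genuinely broken. A minimal, self-contained sketch of the pattern, with hypothetical names rather than the real HBase classes:

import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Compactor/StoreEngine handshake; paths are
// plain Strings instead of org.apache.hadoop.fs.Path for brevity.
class WriterTrackingCompactor {
  private volatile String writerPath; // set while a compaction is writing

  void startCompaction(String outputPath) {
    writerPath = outputPath; // the writer becomes compactor state, not a parameter
  }

  // what a cleaner consults via getCompactionTargets(): any path listed here
  // is an in-flight compaction output and must not be deleted
  List<String> getCompactionTargets() {
    String p = writerPath;
    if (p == null) {
      return Collections.emptyList();
    }
    return Collections.singletonList(p);
  }

  // called (via StoreEngine#resetCompactionWriter) after the new file is live
  void resetWriter() {
    writerPath = null;
  }

  public static void main(String[] args) {
    WriterTrackingCompactor c = new WriterTrackingCompactor();
    c.startCompaction("/hbase/data/ns/table/region/cf/new-file");
    System.out.println(c.getCompactionTargets()); // [.../new-file] -> protected
    c.resetWriter();
    System.out.println(c.getCompactionTargets()); // [] -> leftover files fair game
  }
}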

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ public void setNoStripeMetadata() {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
0 commit comments