diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
new file mode 100644
index 000000000000..b97210e41d0d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Default implementation of StoreFlushContext. It assumes hfiles are first flushed to temp
+ * files, so in the commit phase these hfiles are renamed into the final family directory.
+ */
+@InterfaceAudience.Private
+public class DefaultStoreFlushContext extends StoreFlushContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlushContext.class);
+
+  private MemStoreSnapshot snapshot;
+  private List<Path> tempFiles;
+  private List<Path> committedFiles;
+  private long cacheFlushCount;
+  private long cacheFlushSize;
+  private long outputFileSize;
+
+  @Override
+  public void init(HStore store, long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
+    super.init(store, cacheFlushSeqNum, tracker);
+  }
+
+  /**
+   * This is not thread safe. The caller should have a lock on the region or the store.
+   * If necessary, the lock can be added with the patch provided in HBASE-10087
+   */
+  @Override
+  public MemStoreSize prepare() {
+    // passing the current sequence number of the wal - to allow bookkeeping in the memstore
+    this.snapshot = store.memstore.snapshot();
+    this.cacheFlushCount = snapshot.getCellsCount();
+    this.cacheFlushSize = snapshot.getDataSize();
+    committedFiles = new ArrayList<>(1);
+    return snapshot.getMemStoreSize();
+  }
+
+  @Override
+  public void flushCache(MonitoredTask status) throws IOException {
+    RegionServerServices rsService = store.getHRegion().getRegionServerServices();
+    ThroughputController throughputController =
+        rsService == null ? null : rsService.getFlushThroughputController();
+    tempFiles =
+        store.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
+  }
+
+  @Override
+  public boolean commit(MonitoredTask status) throws IOException {
+    if (CollectionUtils.isEmpty(this.tempFiles)) {
+      return false;
+    }
+    List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
+    for (Path storeFilePath : tempFiles) {
+      try {
+        HStoreFile sf = store.commitFile(storeFilePath, cacheFlushSeqNum, status);
+        outputFileSize += sf.getReader().length();
+        storeFiles.add(sf);
+      } catch (IOException ex) {
+        LOG.error("Failed to commit store file {}", storeFilePath, ex);
+        // Try to delete the files we have committed before.
+        for (HStoreFile sf : storeFiles) {
+          Path pathToDelete = sf.getPath();
+          try {
+            sf.deleteStoreFile();
+          } catch (IOException deleteEx) {
+            LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
+                + "halting {}", pathToDelete, ex);
+            Runtime.getRuntime().halt(1);
+          }
+        }
+        throw new IOException("Failed to commit the flush", ex);
+      }
+    }
+
+    for (HStoreFile sf : storeFiles) {
+      if (store.getCoprocessorHost() != null) {
+        store.getCoprocessorHost().postFlush(store, sf, tracker);
+      }
+      committedFiles.add(sf.getPath());
+    }
+
+    store.flushedCellsCount.addAndGet(cacheFlushCount);
+    store.flushedCellsSize.addAndGet(cacheFlushSize);
+    store.flushedOutputFileSize.addAndGet(outputFileSize);
+
+    // Add new file to store files. Clear snapshot too while we have the Store write lock.
+    return store.updateStorefiles(storeFiles, snapshot.getId());
+  }
+
+  @Override
+  public long getOutputFileSize() {
+    return outputFileSize;
+  }
+
+  @Override
+  public List<Path> getCommittedFiles() {
+    return committedFiles;
+  }
+
+  /**
+   * Similar to commit, but called in secondary region replicas for replaying the
+   * flush cache from the primary region. Adds the new files to the store, and drops the
+   * snapshot depending on the dropMemstoreSnapshot argument.
+   * @param fileNames names of the flushed files
+   * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
+   * @throws IOException if the flush replay fails
+   */
+  @Override
+  public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
+      throws IOException {
+    List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
+    for (String file : fileNames) {
+      // open the file as a store file (hfile link, etc)
+      StoreFileInfo storeFileInfo = store.getRegionFileSystem().
+          getStoreFileInfo(store.getColumnFamilyName(), file);
+      HStoreFile storeFile = store.createStoreFileAndReader(storeFileInfo);
+      storeFiles.add(storeFile);
+      store.storeSize.addAndGet(storeFile.getReader().length());
+      store.totalUncompressedBytes
+          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Region: " + store.getRegionInfo().getEncodedName()
+            + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
+            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
+            + StringUtils.TraditionalBinaryPrefix
+                .long2String(storeFile.getReader().length(), "", 1));
+      }
+    }
+
+    long snapshotId = -1; // -1 means do not drop
+    if (dropMemstoreSnapshot && snapshot != null) {
+      snapshotId = snapshot.getId();
+      snapshot.close();
+    }
+    store.updateStorefiles(storeFiles, snapshotId);
+  }
+
+  /**
+   * Abort the snapshot preparation. Drops the snapshot if any.
+   * @throws IOException if the abort fails.
+   */
+  @Override
+  public void abort() throws IOException {
+    if (snapshot != null) {
+      // We need to close the snapshot when aborting; otherwise the segment scanners
+      // won't be closed, and if we are using MSLAB the chunks referenced by those
+      // scanners can't be released, causing a memory leak.
+      snapshot.close();
+      store.updateStorefiles(Collections.emptyList(), snapshot.getId());
+    }
+  }
+}
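For orientation while reviewing the new class: the sketch below shows roughly how a caller drives the prepare/flushCache/commit/abort protocol that DefaultStoreFlushContext implements. It is illustrative only — HRegion's internal flush machinery is the real driver — and it assumes code living in the org.apache.hadoop.hbase.regionserver package, with an HStore `store`, a WAL-derived `flushSeqId`, a MonitoredTask `status`, and a FlushLifeCycleTracker `tracker` already in scope:

    // Illustrative driver for the flush protocol; not a supported API entry point.
    StoreFlushContext ctx = store.createFlushContext(flushSeqId, tracker);
    // prepare() snapshots the memstore and must run while writes are paused,
    // e.g. under the region update lock.
    MemStoreSize snapshotSize = ctx.prepare();
    try {
      ctx.flushCache(status);   // writes the snapshot cells into temp hfiles
      boolean needsCompaction = ctx.commit(status); // renames into the family dir
    } catch (IOException ioe) {
      // abort() closes the snapshot so segment scanners and MSLAB chunks are released
      ctx.abort();
      throw ioe;
    }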
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5c97c5f66ba0..7b558c9ad641 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -82,7 +82,6 @@
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -137,6 +136,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     PropagatingConfigurationObserver {
 
   public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
+  public static final String STORE_FLUSH_CONTEXT_CLASS_NAME =
+      "hbase.regionserver.store.flush.context.class";
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -160,8 +161,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   protected Configuration conf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
-  private AtomicLong storeSize = new AtomicLong();
-  private AtomicLong totalUncompressedBytes = new AtomicLong();
+  /* how many bytes to write between status checks */
+  static int closeCheckInterval = 0;
+  AtomicLong storeSize = new AtomicLong();
+  AtomicLong totalUncompressedBytes = new AtomicLong();
   private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
   // rows that has cells from both memstore and files (or only files)
   private LongAdder mixedRowReadsCount = new LongAdder();
@@ -225,11 +228,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private long blockingFileCount;
   private int compactionCheckMultiplier;
 
-  private AtomicLong flushedCellsCount = new AtomicLong();
+  AtomicLong flushedCellsCount = new AtomicLong();
   private AtomicLong compactedCellsCount = new AtomicLong();
   private AtomicLong majorCompactedCellsCount = new AtomicLong();
-  private AtomicLong flushedCellsSize = new AtomicLong();
-  private AtomicLong flushedOutputFileSize = new AtomicLong();
+  AtomicLong flushedCellsSize = new AtomicLong();
+  AtomicLong flushedOutputFileSize = new AtomicLong();
   private AtomicLong compactedCellsSize = new AtomicLong();
   private AtomicLong majorCompactedCellsSize = new AtomicLong();
@@ -296,8 +299,8 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
     List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
     // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
     // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
-    // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction) , so
-    // no need calculate the storeSize twice.
+    // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction),
+    // so there is no need to calculate the storeSize twice.
     this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
     this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
     this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);
@@ -697,7 +700,7 @@ protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
     return createStoreFileAndReader(info);
   }
 
-  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
+  HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
     HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(),
         getCacheConfig());
@@ -1091,7 +1094,7 @@ public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    * @param path The pathname of the tmp file into which the store was flushed
    * @return store file created.
    */
-  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
+  HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
       throws IOException {
     // Write-out finished successfully, move into the right spot
     Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
@@ -1223,7 +1226,7 @@ private long getTotalSize(Collection<HStoreFile> sfs) {
    * @param sfs Store files
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
+  boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
@@ -2360,149 +2363,16 @@ public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstore
     }
   }
 
-  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
-    return new StoreFlusherImpl(cacheFlushId, tracker);
-  }
-
-  private final class StoreFlusherImpl implements StoreFlushContext {
-
-    private final FlushLifeCycleTracker tracker;
-    private final long cacheFlushSeqNum;
-    private MemStoreSnapshot snapshot;
-    private List<Path> tempFiles;
-    private List<Path> committedFiles;
-    private long cacheFlushCount;
-    private long cacheFlushSize;
-    private long outputFileSize;
-
-    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
-      this.cacheFlushSeqNum = cacheFlushSeqNum;
-      this.tracker = tracker;
-    }
-
-    /**
-     * This is not thread safe. The caller should have a lock on the region or the store.
-     * If necessary, the lock can be added with the patch provided in HBASE-10087
-     */
-    @Override
-    public MemStoreSize prepare() {
-      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
-      this.snapshot = memstore.snapshot();
-      this.cacheFlushCount = snapshot.getCellsCount();
-      this.cacheFlushSize = snapshot.getDataSize();
-      committedFiles = new ArrayList<>(1);
-      return snapshot.getMemStoreSize();
-    }
-
-    @Override
-    public void flushCache(MonitoredTask status) throws IOException {
-      RegionServerServices rsService = region.getRegionServerServices();
-      ThroughputController throughputController =
-          rsService == null ? null : rsService.getFlushThroughputController();
-      tempFiles =
-          HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
-    }
-
-    @Override
-    public boolean commit(MonitoredTask status) throws IOException {
-      if (CollectionUtils.isEmpty(this.tempFiles)) {
-        return false;
-      }
-      List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
-      for (Path storeFilePath : tempFiles) {
-        try {
-          HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
-          outputFileSize += sf.getReader().length();
-          storeFiles.add(sf);
-        } catch (IOException ex) {
-          LOG.error("Failed to commit store file {}", storeFilePath, ex);
-          // Try to delete the files we have committed before.
-          for (HStoreFile sf : storeFiles) {
-            Path pathToDelete = sf.getPath();
-            try {
-              sf.deleteStoreFile();
-            } catch (IOException deleteEx) {
-              LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
-                  + "halting {}", pathToDelete, ex);
-              Runtime.getRuntime().halt(1);
-            }
-          }
-          throw new IOException("Failed to commit the flush", ex);
-        }
-      }
-
-      for (HStoreFile sf : storeFiles) {
-        if (HStore.this.getCoprocessorHost() != null) {
-          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
-        }
-        committedFiles.add(sf.getPath());
-      }
-
-      HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
-      HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
-      HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
-
-      // Add new file to store files. Clear snapshot too while we have the Store write lock.
-      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
-    }
-
-    @Override
-    public long getOutputFileSize() {
-      return outputFileSize;
-    }
-
-    @Override
-    public List<Path> getCommittedFiles() {
-      return committedFiles;
-    }
-
-    /**
-     * Similar to commit, but called in secondary region replicas for replaying the
-     * flush cache from primary region. Adds the new files to the store, and drops the
-     * snapshot depending on dropMemstoreSnapshot argument.
-     * @param fileNames names of the flushed files
-     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
-     */
-    @Override
-    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
-        throws IOException {
-      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
-      for (String file : fileNames) {
-        // open the file as a store file (hfile link, etc)
-        StoreFileInfo storeFileInfo =
-            getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
-        storeFiles.add(storeFile);
-        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
-        HStore.this.totalUncompressedBytes
-            .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
-        if (LOG.isInfoEnabled()) {
-          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
-            ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
-            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
-        }
-      }
-
-      long snapshotId = -1; // -1 means do not drop
-      if (dropMemstoreSnapshot && snapshot != null) {
-        snapshotId = snapshot.getId();
-        snapshot.close();
-      }
-      HStore.this.updateStorefiles(storeFiles, snapshotId);
-    }
-
-    /**
-     * Abort the snapshot preparation. Drops the snapshot if any.
-     */
-    @Override
-    public void abort() throws IOException {
-      if (snapshot != null) {
-        //We need to close the snapshot when aborting, otherwise, the segment scanner
-        //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
-        //can't be released, thus memory leak
-        snapshot.close();
-        HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
-      }
+  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker)
+      throws IOException {
+    Class<? extends StoreFlushContext> flushContextClass = (Class<? extends StoreFlushContext>)
+        conf.getClass(STORE_FLUSH_CONTEXT_CLASS_NAME, DefaultStoreFlushContext.class);
+    try {
+      StoreFlushContext flushContext = flushContextClass.getDeclaredConstructor().newInstance();
+      flushContext.init(this, cacheFlushId, tracker);
+      return flushContext;
+    } catch (Exception e) {
+      throw new IOException(e);
     }
   }
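With createFlushContext reworked as above, the flush context is now chosen by the new hbase.regionserver.store.flush.context.class setting, falling back to DefaultStoreFlushContext. Selecting a custom implementation might look like the following sketch, where com.example.MyStoreFlushContext is a hypothetical class assumed to be on the region server's classpath:

    // Hypothetical wiring; MyStoreFlushContext is not part of this patch.
    Configuration conf = HBaseConfiguration.create();
    conf.setClass(HStore.STORE_FLUSH_CONTEXT_CLASS_NAME,
        MyStoreFlushContext.class, StoreFlushContext.class);
    // or equivalently, by name in hbase-site.xml / on the Configuration:
    // conf.set("hbase.regionserver.store.flush.context.class",
    //     "com.example.MyStoreFlushContext");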
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
index e53fdc0de2a6..e032916582b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
@@ -29,15 +29,30 @@
  * A store flush context carries the state required to prepare/flush/commit the store's cache.
  */
 @InterfaceAudience.Private
-interface StoreFlushContext {
+public abstract class StoreFlushContext {
+
+  protected HStore store;
+  protected long cacheFlushSeqNum;
+  protected FlushLifeCycleTracker tracker;
+
+  /**
+   * Initializes the StoreFlushContext fields. Needs to be called after construction.
+   * @param store The HStore instance managing files
+   * @param cacheFlushSeqNum the seqnum for the memstore cache flush
+   * @param tracker the tracker for the flush cycle
+   */
+  public void init(HStore store, long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
+    this.store = store;
+    this.cacheFlushSeqNum = cacheFlushSeqNum;
+    this.tracker = tracker;
+  }
 
   /**
    * Prepare for a store flush (create snapshot)
    * Requires pausing writes.
    * A very short operation.
    * @return The size of snapshot to flush
    */
-  MemStoreSize prepare();
+  abstract MemStoreSize prepare();
 
   /**
    * Flush the cache (create the new store file)
@@ -47,7 +62,7 @@ interface StoreFlushContext {
    *
    * @throws IOException in case the flush fails
    */
-  void flushCache(MonitoredTask status) throws IOException;
+  abstract void flushCache(MonitoredTask status) throws IOException;
 
   /**
    * Commit the flush - add the store file to the store and clear the
@@ -58,9 +73,9 @@ interface StoreFlushContext {
    * A very short operation
    *
    * @return whether compaction is required
-   * @throws IOException
+   * @throws IOException if the commit fails
    */
-  boolean commit(MonitoredTask status) throws IOException;
+  abstract boolean commit(MonitoredTask status) throws IOException;
 
   /**
    * Similar to commit, but called in secondary region replicas for replaying the
@@ -68,24 +83,25 @@ interface StoreFlushContext {
    * snapshot depending on dropMemstoreSnapshot argument.
    * @param fileNames names of the flushed files
    * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
-   * @throws IOException
+   * @throws IOException if the replay flush fails
    */
-  void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;
+  abstract void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
+      throws IOException;
 
   /**
    * Abort the snapshot preparation. Drops the snapshot if any.
-   * @throws IOException
+   * @throws IOException if the abort operation fails
    */
-  void abort() throws IOException;
+  abstract void abort() throws IOException;
 
   /**
    * Returns the newly committed files from the flush. Called only if commit returns true
    * @return a list of Paths for new files
    */
-  List<Path> getCommittedFiles();
+  abstract List<Path> getCommittedFiles();
 
   /**
    * @return the total file size for flush output files, in bytes
    */
-  long getOutputFileSize();
+  abstract long getOutputFileSize();
 }
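One consequence of the reflective getDeclaredConstructor().newInstance() call in HStore.createFlushContext is that any configured implementation needs an accessible no-argument constructor, with its state supplied afterwards through init. Note also that the abstract methods above are declared package-private, so as written a plug-in subclass has to live in org.apache.hadoop.hbase.regionserver (or those methods would need to become public). A minimal sketch under those assumptions, with a hypothetical class name:

    package org.apache.hadoop.hbase.regionserver;

    import java.io.IOException;

    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical plug-in: reuses the default flush behavior and adds a log line.
    // The implicit no-arg constructor is what createFlushContext() instantiates;
    // store/cacheFlushSeqNum/tracker are filled in later via init().
    public class LoggingStoreFlushContext extends DefaultStoreFlushContext {

      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingStoreFlushContext.class);

      @Override
      public boolean commit(MonitoredTask status) throws IOException {
        boolean needsCompaction = super.commit(status);
        // getCommittedFiles() is only meaningful after a successful commit
        LOG.info("Flushed {} file(s), {} bytes total",
            getCommittedFiles().size(), getOutputFileSize());
        return needsCompaction;
      }
    }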