@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -146,10 +149,14 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
   @Override
   public StoreFileWriter createWriter(InternalScanner scanner,
     org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-    boolean shouldDropBehind, boolean major) throws IOException {
+    boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+    throws IOException {
     // make this writer with tags always because of possible new cells with tags.
-    return store.getStoreEngine().createWriter(
-      createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+    return store.getStoreEngine()
+      .createWriter(
+        createParams(fd, shouldDropBehind, major, writerCreationTracker)
+          .includeMVCCReadpoint(true)
+          .includesTag(true));
   }
 };

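A minimal, illustrative sketch of what the Consumer<Path> writerCreationTracker threaded through createWriter(...) above could look like on the calling side; the class, method names and backing set are hypothetical, only the Consumer<Path> shape comes from this diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;

// Hypothetical tracker: remembers every store file path reported while it is being written.
public class WriterCreationTrackerSketch {

  private final Set<Path> filesBeingWritten = ConcurrentHashMap.newKeySet();

  // Handed to createWriter(...) as the Consumer<Path>; the store engine is expected to
  // call accept(path) as soon as it creates a writer for a new store file.
  public Consumer<Path> tracker() {
    return filesBeingWritten::add;
  }

  // Called once the writer has been committed or aborted, so the path is no longer in flight.
  public void untrack(Path path) {
    filesBeingWritten.remove(path);
  }

  public boolean isBeingWritten(Path path) {
    return filesBeingWritten.contains(path);
  }
}
```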
@@ -285,17 +292,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    * </ol>
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -661,9 +670,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
     }
   }
 
-
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -25,6 +25,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -112,7 +113,7 @@ public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOExcepti
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     long cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -126,7 +127,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, true);
+      writer = createWriter(snapshot, true, writerCreationTracker);
       IOException e = null;
       try {
         // It's a mob store, flush the cells in a mob way. This is the difference of flushing
@@ -67,7 +67,7 @@ public void init(StoreScanner sourceScanner, WriterFactory factory) {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
-    return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
   }
 
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@@ -110,11 +110,7 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  /**
-   * Returns all writers. This is used to prevent deleting currently writen storefiles
-   * during cleanup.
-   */
-  public abstract Collection<StoreFileWriter> writers();
+  protected abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
@@ -152,7 +152,7 @@ private void cleanFileIfNeeded(FileStatus file, HStore store,
   }
 
   private boolean isCompactionResultFile(FileStatus file, HStore store) {
-    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+    return store.getStoreFilesBeingWritten().contains(file.getPath());
   }
 
   // Compacted files can still have readers and are cleaned by a separate chore, so they have to
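The cleaner now asks the store itself which files are currently being written instead of reaching through the compactor. Below is a hedged sketch of how a store could aggregate that set from per-operation writer creation trackers; it is not the actual HStore implementation, only the method name getStoreFilesBeingWritten() and its use above come from this diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;

// Hypothetical aggregation: one Set<Path> per in-flight flush or compaction, merged on demand.
public class StoreFilesBeingWrittenSketch {

  private final Set<Set<Path>> writerCreationTrackers = ConcurrentHashMap.newKeySet();

  // Registers an operation and returns the Consumer<Path> to pass as its writerCreationTracker.
  public Consumer<Path> registerOperation(Set<Path> perOperationPaths) {
    writerCreationTrackers.add(perOperationPaths);
    return perOperationPaths::add;
  }

  // Deregisters the operation once its writers have been committed or aborted.
  public void finishOperation(Set<Path> perOperationPaths) {
    writerCreationTrackers.remove(perOperationPaths);
  }

  // What a cleaner chore would consult before deleting a file it does not recognize.
  public Set<Path> getStoreFilesBeingWritten() {
    return writerCreationTrackers.stream()
      .flatMap(Set::stream)
      .collect(Collectors.toSet());
  }
}
```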
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
 
   private String fileStoragePolicy = HConstants.EMPTY_STRING;
 
+  private Consumer<Path> writerCreationTracker;
+
   private CreateStoreFileWriterParams() {
   }
 
@@ -127,8 +131,16 @@ public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
     return this;
   }
 
+  public Consumer<Path> writerCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+    return this;
+  }
+
   public static CreateStoreFileWriterParams create() {
     return new CreateStoreFileWriterParams();
   }
 
 }
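For reference, a sketch of how the new builder method might be used. Only the builder calls that appear elsewhere in this diff (create(), includeMVCCReadpoint(...), includesTag(...), writerCreationTracker(...)) are assumed to exist; the surrounding class, method and set are illustrative.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;

// Illustrative wiring of a creation tracker through the params builder.
public class CreateParamsUsageSketch {

  public static CreateStoreFileWriterParams paramsFor(Set<Path> filesBeingWritten) {
    return CreateStoreFileWriterParams.create()
      .includeMVCCReadpoint(true)
      .includesTag(true)
      // Every store file writer created from these params reports its path to the set.
      .writerCreationTracker(filesBeingWritten::add);
  }
}
```

In the mob compactor hunk earlier in this diff, createParams(fd, shouldDropBehind, major, writerCreationTracker) presumably ends in exactly such a writerCreationTracker(...) call before the params reach the store engine.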
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public DefaultStoreFlusher(Configuration conf, HStore store) {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, false);
+      writer = createWriter(snapshot, false, writerCreationTracker);
       IOException e = null;
       try {
         performFlush(scanner, writer, throughputController);
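To close the loop on the flusher-side changes (here and in the mob flusher above), a hypothetical caller-side sketch of the new flushSnapshot(...) contract. The tracking set, helper method and class are invented for illustration and sit in the regionserver package only so the sketch can reference package-private types; the parameter list itself matches the signature in this diff.

```java
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;

// Hypothetical caller: owns the set of in-flight flush outputs and hands the flusher a consumer.
public class FlushSnapshotCallerSketch {

  private final Set<Path> filesBeingWritten = ConcurrentHashMap.newKeySet();

  List<Path> flushWithTracking(StoreFlusher flusher, MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
    Consumer<Path> writerCreationTracker = filesBeingWritten::add;
    try {
      // Each store file the flusher starts writing is reported through the consumer, so a
      // concurrent cleaner can tell in-flight files apart from leftover broken ones.
      return flusher.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
        writerCreationTracker);
    } finally {
      // By now the flush has either committed or aborted these files.
      filesBeingWritten.clear();
    }
  }
}
```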