Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -30,7 +31,6 @@
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -69,12 +69,13 @@ public class HalfStoreFileReader extends StoreFileReader {
* @param fileInfo HFile info
* @param cacheConf CacheConfig
* @param r original reference file (contains top or bottom)
* @param refCount reference count
* @param conf Configuration
*/
public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo,
final CacheConfig cacheConf, final Reference r, StoreFileInfo storeFileInfo,
final CacheConfig cacheConf, final Reference r, AtomicInteger refCount,
final Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, storeFileInfo, conf);
super(context, fileInfo, cacheConf, refCount, conf);
// This is not actual midkey for this half-file; its just border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,12 @@ public boolean isCompactedAway() {
}

public int getRefCount() {
return fileInfo.getRefCount();
return fileInfo.refCount.get();
}

/** Returns true if the file is still used in reads */
public boolean isReferencedInReads() {
int rc = fileInfo.getRefCount();
int rc = fileInfo.refCount.get();
assert rc >= 0; // we should not go negative.
return rc > 0;
}
Expand Down Expand Up @@ -653,11 +653,11 @@ Set<String> getCompactedStoreFiles() {
}

long increaseRefCount() {
return this.fileInfo.increaseRefCount();
return this.fileInfo.refCount.incrementAndGet();
}

long decreaseRefCount() {
return this.fileInfo.decreaseRefCount();
return this.fileInfo.refCount.decrementAndGet();
}

static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class StoreFileInfo implements Configurable {
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done.
private final AtomicInteger refCount = new AtomicInteger(0);
final AtomicInteger refCount = new AtomicInteger(0);

/**
* Create a Store File Info
Expand Down Expand Up @@ -274,13 +274,12 @@ public HDFSBlocksDistribution getHDFSBlockDistribution() {
return this.hdfsBlocksDistribution;
}

public StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf)
throws IOException {
StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf) throws IOException {
StoreFileReader reader = null;
if (this.reference != null) {
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, this, conf);
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, refCount, conf);
} else {
reader = new StoreFileReader(context, hfileInfo, cacheConf, this, conf);
reader = new StoreFileReader(context, hfileInfo, cacheConf, refCount, conf);
}
return reader;
}
Expand Down Expand Up @@ -650,7 +649,7 @@ boolean isNoReadahead() {
return this.noReadahead;
}

public HFileInfo getHFileInfo() {
HFileInfo getHFileInfo() {
return hfileInfo;
}

Expand Down Expand Up @@ -682,16 +681,4 @@ public void initHFileInfo(ReaderContext context) throws IOException {
this.hfileInfo = new HFileInfo(context, conf);
}

int getRefCount() {
return this.refCount.get();
}

int increaseRefCount() {
return this.refCount.incrementAndGet();
}

int decreaseRefCount() {
return this.refCount.decrementAndGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
Expand Down Expand Up @@ -77,26 +78,24 @@ public class StoreFileReader {
private int prefixLength = -1;
protected Configuration conf;

/**
* All {@link StoreFileReader} for the same StoreFile will share the
* {@link StoreFileInfo#refCount}. Counter that is incremented every time a scanner is created on
* the store file. It is decremented when the scan on the store file is done.
*/
private final StoreFileInfo storeFileInfo;
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done. All StoreFileReader for the same StoreFile will share this counter.
private final AtomicInteger refCount;
private final ReaderContext context;

private StoreFileReader(HFile.Reader reader, StoreFileInfo storeFileInfo, ReaderContext context,
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context,
Configuration conf) {
this.reader = reader;
bloomFilterType = BloomType.NONE;
this.storeFileInfo = storeFileInfo;
this.refCount = refCount;
this.context = context;
this.conf = conf;
}

public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
StoreFileInfo storeFileInfo, Configuration conf) throws IOException {
this(HFile.createReader(context, fileInfo, cacheConf, conf), storeFileInfo, context, conf);
AtomicInteger refCount, Configuration conf) throws IOException {
this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context, conf);
}

void copyFields(StoreFileReader storeFileReader) throws IOException {
Expand All @@ -121,7 +120,7 @@ public boolean isPrimaryReplicaReader() {
*/
@InterfaceAudience.Private
StoreFileReader() {
this.storeFileInfo = null;
this.refCount = new AtomicInteger(0);
this.reader = null;
this.context = null;
}
Expand Down Expand Up @@ -152,23 +151,23 @@ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
* is opened.
*/
int getRefCount() {
return storeFileInfo.getRefCount();
return refCount.get();
}

/**
* Indicate that the scanner has started reading with this reader. We need to increment the ref
* count so reader is not close until some object is holding the lock
*/
void incrementRefCount() {
storeFileInfo.increaseRefCount();
refCount.incrementAndGet();
}

/**
* Indicate that the scanner has finished reading with this reader. We need to decrement the ref
* count, and also, if this is not the common pread reader, we should close it.
*/
void readCompleted() {
storeFileInfo.decreaseRefCount();
refCount.decrementAndGet();
if (context.getReaderType() == ReaderType.STREAM) {
try {
reader.close(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ HFile.Writer getHFileWriter() {
* @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code>
*/
public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
if (!fs.getFileStatus(dir).isDirectory()) {
throw new IOException("Expecting " + dir.toString() + " to be a directory");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,11 +1168,10 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
StoreFileWriter halfWriter = null;
try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
StoreFileInfo storeFileInfo =
new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
storeFileInfo.initHFileInfo(context);
halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
HFileInfo hfile = new HFileInfo(context, conf);
halfReader =
new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
hfile.initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

int blocksize = familyDescriptor.getBlocksize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -38,10 +39,10 @@
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -117,12 +118,10 @@ public void testHalfScanAndReseek() throws IOException {
private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConf,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);

Expand Down Expand Up @@ -215,12 +214,10 @@ public void testHalfScanner() throws IOException {
private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
CacheConfig cacheConfig) throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConfig,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);
scanner.seekBefore(seekBefore);
Expand Down
Loading