Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public static List<Tag> getTags(Cell cell) {
* Retrieve Cell's first tag, matching the passed in type
* @param cell The Cell
* @param type Type of the Tag to retrieve
* @return null if there is no tag of the passed in tag type
* @return Optional, empty if there is no tag of the passed in tag type
*/
public static Optional<Tag> getTag(Cell cell, byte type) {
boolean bufferBacked = cell instanceof ByteBufferExtendedCell;
Expand Down
60 changes: 52 additions & 8 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Immutable POJO class for representing a table name. Which is of the form: &lt;table
* namespace&gt;:&lt;table qualifier&gt; Two special namespaces: 1. hbase - system namespace, used
Expand Down Expand Up @@ -386,28 +389,69 @@ public static TableName valueOf(String namespaceAsString, String qualifierAsStri
}

/**
* @param fullName will use the entire byte array
* @throws IllegalArgumentException if fullName equals old root or old meta. Some code depends on
* this. The test is buried in the table creation to save on
* array comparison when we're creating a standard table object
* that will be in the cache.
*/
public static TableName valueOf(byte[] fullName) throws IllegalArgumentException {
return valueOf(fullName, 0, fullName.length);
}

/**
* @param fullName byte array to look into
* @param offset within said array
* @param length within said array
* @throws IllegalArgumentException if fullName equals old root or old meta.
*/
public static TableName valueOf(byte[] fullName, int offset, int length)
throws IllegalArgumentException {
Preconditions.checkArgument(offset >= 0, "offset must be non-negative but was %s", offset);
Preconditions.checkArgument(offset < fullName.length, "offset (%s) must be < array length (%s)",
offset, fullName.length);
Preconditions.checkArgument(length <= fullName.length,
"length (%s) must be <= array length (%s)", length, fullName.length);
for (TableName tn : tableCache) {
if (Arrays.equals(tn.getName(), fullName)) {
final byte[] tnName = tn.getName();
if (Bytes.equals(tnName, 0, tnName.length, fullName, offset, length)) {
return tn;
}
}

int namespaceDelimIndex = org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
.lastIndexOf(fullName, (byte) NAMESPACE_DELIM);
int namespaceDelimIndex = ArrayUtils.lastIndexOf(fullName, (byte) NAMESPACE_DELIM);

if (namespaceDelimIndex < 0) {
if (namespaceDelimIndex < offset) {
return createTableNameIfNecessary(ByteBuffer.wrap(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
ByteBuffer.wrap(fullName));
ByteBuffer.wrap(fullName, offset, length));
} else {
return createTableNameIfNecessary(ByteBuffer.wrap(fullName, offset, namespaceDelimIndex),
ByteBuffer.wrap(fullName, namespaceDelimIndex + 1, length - (namespaceDelimIndex + 1)));
}
}

/**
* @param fullname of a table, possibly with a leading namespace and ':' as delimiter.
* @throws IllegalArgumentException if fullName equals old root or old meta.
*/
public static TableName valueOf(ByteBuffer fullname) {
fullname = fullname.duplicate();
fullname.mark();
boolean miss = true;
while (fullname.hasRemaining() && miss) {
miss = ((byte) NAMESPACE_DELIM) != fullname.get();
}
if (miss) {
fullname.reset();
return valueOf(null, fullname);
} else {
return createTableNameIfNecessary(ByteBuffer.wrap(fullName, 0, namespaceDelimIndex),
ByteBuffer.wrap(fullName, namespaceDelimIndex + 1,
fullName.length - (namespaceDelimIndex + 1)));
ByteBuffer qualifier = fullname.slice();
int delimiterIndex = fullname.position() - 1;
fullname.reset();
// changing variable name for clarity
ByteBuffer namespace = fullname.duplicate();
namespace.limit(delimiterIndex);
return valueOf(namespace, qualifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TimeZone;
Expand Down Expand Up @@ -422,17 +423,16 @@ private void scanKeysValues(Path file, KeyValueStatsCollector fileStats, HFileSc
}
// check if mob files are missing.
if (checkMobIntegrity && MobUtils.isMobReferenceCell(cell)) {
Tag tnTag = MobUtils.getTableNameTag(cell);
if (tnTag == null) {
Optional<TableName> tn = MobUtils.getTableName(cell);
if (!tn.isPresent()) {
System.err.println(
"ERROR, wrong tag format in mob reference cell " + CellUtil.getCellKeyAsString(cell));
} else if (!MobUtils.hasValidMobRefCellValue(cell)) {
System.err.println(
"ERROR, wrong value format in mob reference cell " + CellUtil.getCellKeyAsString(cell));
} else {
TableName tn = TableName.valueOf(Tag.cloneValue(tnTag));
String mobFileName = MobUtils.getMobFileName(cell);
boolean exist = mobFileExists(fs, tn, mobFileName,
boolean exist = mobFileExists(fs, tn.get(), mobFileName,
Bytes.toString(CellUtil.cloneFamily(cell)), foundMobFiles, missingMobFiles);
if (!exist) {
// report error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -37,9 +37,11 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
Expand All @@ -62,7 +64,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
* Compact passed set of files in the mob-enabled column family.
Expand All @@ -82,12 +87,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* compaction process.
*/

static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
@Override
protected Set<String> initialValue() {
return new HashSet<String>();
}
};
static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
ThreadLocal.withInitial(HashMultimap::create);

/*
* Is it user or system-originated request.
Expand Down Expand Up @@ -192,34 +193,72 @@ public List<Path> compact(CompactionRequestImpl request,
// Check if I/O optimized MOB compaction
if (ioOptimizedMode) {
if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
Path mobDir =
MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
List<Path> mobFiles = MobUtils.getReferencedMobFiles(request.getFiles(), mobDir);
// reset disableIO
disableIO.set(Boolean.FALSE);
if (mobFiles.size() > 0) {
calculateMobLengthMap(mobFiles);
try {
final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
ImmutableSetMultimap.Builder<TableName, String> builder;
if (value == null) {
builder = ImmutableSetMultimap.builder();
} else {
try {
builder = MobUtils.deserializeMobFileRefs(value);
} catch (RuntimeException exception) {
throw new RuntimeException("failure getting mob references for hfile " + file,
exception);
}
}
return builder;
}).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
// reset disableIO
disableIO.set(Boolean.FALSE);
if (!mobRefs.isEmpty()) {
calculateMobLengthMap(mobRefs);
}
LOG.info(
"Table={} cf={} region={}. I/O optimized MOB compaction. "
+ "Total referenced MOB files: {}",
tableName, familyName, regionName, mobRefs.size());
} catch (RuntimeException exception) {
throw new IOException("Failed to get list of referenced hfiles for request " + request,
exception);
}
LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "
+ "Total referenced MOB files: {}", tableName, familyName, regionName, mobFiles.size());
}
}

return compact(request, scannerFactory, writerFactory, throughputController, user);
}

private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
/**
* @param mobRefs multimap of original table name -> mob hfile
*/
private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
FileSystem fs = store.getFileSystem();
HashMap<String, Long> map = mobLengthMap.get();
map.clear();
for (Path p : mobFiles) {
if (MobFileName.isOldMobFileName(p.getName())) {
for (Map.Entry<TableName, String> reference : mobRefs.entries()) {
final TableName table = reference.getKey();
final String mobfile = reference.getValue();
if (MobFileName.isOldMobFileName(mobfile)) {
disableIO.set(Boolean.TRUE);
}
FileStatus st = fs.getFileStatus(p);
long size = st.getLen();
LOG.debug("Referenced MOB file={} size={}", p, size);
map.put(p.getName(), fs.getFileStatus(p).getLen());
List<Path> locations = mobStore.getLocations(table);
for (Path p : locations) {
try {
FileStatus st = fs.getFileStatus(new Path(p, mobfile));
long size = st.getLen();
LOG.debug("Referenced MOB file={} size={}", mobfile, size);
map.put(mobfile, size);
break;
} catch (FileNotFoundException exception) {
LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
p);
}
}
if (!map.containsKey(mobfile)) {
throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
+ "expected locations: " + locations);
}

}
}

Expand Down Expand Up @@ -395,8 +434,15 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// We leave large MOB file as is (is not compacted),
// then we update set of MOB file references
// and append mob cell directly to the store's writer
mobRefSet.get().add(fName);
writer.append(mobCell);
Optional<TableName> refTable = MobUtils.getTableName(c);
if (refTable.isPresent()) {
mobRefSet.get().put(refTable.get(), fName);
writer.append(c);
} else {
throw new IOException("MOB cell did not contain a tablename "
+ "tag. should not be possible. see ref guide on mob troubleshooting. "
+ "store=" + getStoreInfo() + " cell=" + c);
}
}
}
} else {
Expand Down Expand Up @@ -444,9 +490,15 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
if (MobUtils.hasValidMobRefCellValue(c)) {
// We do not check mobSizeThreshold during normal compaction,
// leaving it to a MOB compaction run
writer.append(c);
// Add MOB reference to a MOB reference set
mobRefSet.get().add(MobUtils.getMobFileName(c));
Optional<TableName> refTable = MobUtils.getTableName(c);
if (refTable.isPresent()) {
mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
writer.append(c);
} else {
throw new IOException("MOB cell did not contain a tablename "
+ "tag. should not be possible. see ref guide on mob troubleshooting. " + "store="
+ getStoreInfo() + " cell=" + c);
}
} else {
String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
throw new IOException(errMsg);
Expand Down Expand Up @@ -525,7 +577,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
throughputController.finish(compactionName);
if (!finished && mobFileWriter != null) {
// Remove all MOB references because compaction failed
mobRefSet.get().clear();
clearThreadLocals();
// Abort writer
LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
mobFileWriter.getPath(), getStoreInfo());
Expand All @@ -543,16 +595,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
return true;
}

private String getStoreInfo() {
protected String getStoreInfo() {
return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
}

private void clearThreadLocals() {
Set<String> set = mobRefSet.get();
if (set != null) {
set.clear();
}
mobRefSet.get().clear();
HashMap<String, Long> map = mobLengthMap.get();
if (map != null) {
map.clear();
Expand All @@ -567,7 +616,7 @@ private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOExc
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
getStoreInfo());
// Add reference we get for compact MOB
mobRefSet.get().add(mobFileWriter.getPath().getName());
mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
return mobFileWriter;
} catch (IOException e) {
// Bailing out
Expand Down Expand Up @@ -599,7 +648,7 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
mobFileWriter.getPath(), getStoreInfo());
// Remove MOB file from reference set
mobRefSet.get().remove(mobFileWriter.getPath().getName());
mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
abortWriter(mobFileWriter);
}
} else {
Expand All @@ -612,9 +661,7 @@ protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
// Append MOB references
Set<String> refSet = mobRefSet.get();
writer.appendMobMetadata(refSet);
writer.appendMobMetadata(mobRefSet.get());
writer.close();
clearThreadLocals();
return newFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
Expand All @@ -47,6 +48,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;

/**
* An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. If the store is not a
* mob store, the flusher flushes the MemStore the same with DefaultStoreFlusher, If the store is a
Expand Down Expand Up @@ -277,7 +280,8 @@ protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, Mon
// The hfile is current up to and including cacheFlushSeqNum.
status.setStatus("Flushing " + store + ": appending metadata");
writer.appendMetadata(cacheFlushSeqNum, false);
writer.appendMobMetadata(mobRefSet.get());
writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String> builder()
.putAll(store.getTableName(), mobRefSet.get()).build());
status.setStatus("Flushing " + store + ": closing flushed file");
writer.close();
}
Expand Down
Loading