Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 107 additions & 60 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Function;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
Expand All @@ -46,6 +48,8 @@
@InterfaceAudience.Private
public class KeyValueUtil {

private static final Logger LOG = LoggerFactory.getLogger(KeyValueUtil.class);

/**************** length *********************/

public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
Expand Down Expand Up @@ -510,97 +514,124 @@ public static long write(final KeyValue kv, final DataOutput out) throws IOExcep

static String bytesToHex(byte[] buf, int offset, int length) {
String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset
+ ", length=" + length;
return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset + ", length=" + length;
}

static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
if (buf == null) {
throw new IllegalArgumentException("Invalid to have null " +
"byte array in KeyValue.");
String msg = "Invalid to have null byte array in KeyValue.";
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}

int pos = offset, endOffset = offset + length;
// check the key
if (pos + Bytes.SIZEOF_INT > endOffset) {
throw new IllegalArgumentException(
"Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
String msg =
"Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
pos += Bytes.SIZEOF_INT;
if (keyLen <= 0 || pos + keyLen > endOffset) {
throw new IllegalArgumentException(
"Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
String msg =
"Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
// check the value
if (pos + Bytes.SIZEOF_INT > endOffset) {
throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
+ bytesToHex(buf, offset, length));
String msg =
"Overflow when reading value length at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
pos += Bytes.SIZEOF_INT;
if (valLen < 0 || pos + valLen > endOffset) {
throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
+ bytesToHex(buf, offset, length));
String msg = "Invalid value length in KeyValue, valueLength=" + valLen +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
// check the row
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
throw new IllegalArgumentException(
"Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
String msg =
"Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
pos += Bytes.SIZEOF_SHORT;
if (rowLen < 0 || pos + rowLen > endOffset) {
throw new IllegalArgumentException(
"Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
String msg =
"Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += rowLen;
// check the family
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
+ bytesToHex(buf, offset, length));
String msg = "Overflow when reading family length at position=" + pos +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
int familyLen = buf[pos];
pos += Bytes.SIZEOF_BYTE;
if (familyLen < 0 || pos + familyLen > endOffset) {
throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
+ familyLen + bytesToHex(buf, offset, length));
String msg = "Invalid family length in KeyValue, familyLength=" + familyLen +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += familyLen;
// check the qualifier
int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
- Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
+ qualifierLen + bytesToHex(buf, offset, length));
String msg = "Invalid qualifier length in KeyValue, qualifierLen=" + qualifierLen +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += qualifierLen;
// check the timestamp
if (pos + Bytes.SIZEOF_LONG > endOffset) {
throw new IllegalArgumentException(
"Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
String msg =
"Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
if (timestamp < 0) {
throw new IllegalArgumentException(
"Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
String msg =
"Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += Bytes.SIZEOF_LONG;
// check the type
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
throw new IllegalArgumentException(
"Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
String msg =
"Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
byte type = buf[pos];
if (!Type.isValidType(type)) {
throw new IllegalArgumentException(
"Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
String msg = "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += Bytes.SIZEOF_BYTE;
// check the value
if (pos + valLen > endOffset) {
throw new IllegalArgumentException(
"Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
String msg =
"Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += valLen;
// check the tags
Expand All @@ -609,39 +640,55 @@ static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withT
// withTags is true but no tag in the cell.
return;
}
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
+ bytesToHex(buf, offset, length));
}
short tagsLen = Bytes.toShort(buf, pos);
pos += Bytes.SIZEOF_SHORT;
if (tagsLen < 0 || pos + tagsLen > endOffset) {
throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
+ (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
}
int tagsEndOffset = pos + tagsLen;
for (; pos < tagsEndOffset;) {
if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
+ bytesToHex(buf, offset, length));
}
short tagLen = Bytes.toShort(buf, pos);
pos += Tag.TAG_LENGTH_SIZE;
// tagLen contains one byte tag type, so must be not less than 1.
if (tagLen < 1 || pos + tagLen > endOffset) {
throw new IllegalArgumentException(
"Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
+ tagLen + bytesToHex(buf, offset, length));
}
pos += tagLen;
}
pos = checkKeyValueTagBytes(buf, offset, length, pos, endOffset);
}
if (pos != endOffset) {
throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
+ pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
String msg = "Some redundant bytes in KeyValue's buffer, startOffset=" + pos + ", endOffset="
+ endOffset + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
}

private static int checkKeyValueTagBytes(byte[] buf, int offset, int length, int pos,
int endOffset) {
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
String msg = "Overflow when reading tags length at position=" + pos +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
short tagsLen = Bytes.toShort(buf, pos);
pos += Bytes.SIZEOF_SHORT;
if (tagsLen < 0 || pos + tagsLen > endOffset) {
String msg = "Invalid tags length in KeyValue at position=" + (pos - Bytes.SIZEOF_SHORT)
+ bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
int tagsEndOffset = pos + tagsLen;
for (; pos < tagsEndOffset;) {
if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
String msg = "Overflow when reading tag length at position=" + pos +
bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
short tagLen = Bytes.toShort(buf, pos);
pos += Tag.TAG_LENGTH_SIZE;
// tagLen contains one byte tag type, so must be not less than 1.
if (tagLen < 1 || pos + tagLen > endOffset) {
String msg =
"Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
+ tagLen + bytesToHex(buf, offset, length);
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
pos += tagLen;
}
return pos;
}

/**
* Create a KeyValue reading from the raw InputStream. Named
* <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,7 @@ protected boolean readNext(Entry entry) throws IOException {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
if (LOG.isTraceEnabled()) {
LOG.trace("Reached end of expected edits area at offset " + originalPosition);
}
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
WALKey.Builder builder = WALKey.newBuilder();
Expand Down Expand Up @@ -373,10 +371,8 @@ protected boolean readNext(Entry entry) throws IOException {
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
if (LOG.isTraceEnabled()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
this.inputStream.getPos());
}
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
seekOnFs(originalPosition);
return false;
}
Expand All @@ -393,9 +389,7 @@ protected boolean readNext(Entry entry) throws IOException {
try {
posAfterStr = this.inputStream.getPos() + "";
} catch (Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Error getting pos for error message - ignoring", t);
}
LOG.trace("Error getting pos for error message - ignoring", t);
}
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ posBefore + " and read up to " + posAfterStr;
Expand All @@ -412,27 +406,21 @@ protected boolean readNext(Entry entry) throws IOException {
} catch (EOFException eof) {
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
if (originalPosition < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset="
+ this.inputStream.getPos(), eof);
}
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset={}",
this.inputStream.getPos(), eof);
throw eof;
}
// If stuck at the same place and we got and exception, lets go back at the beginning.
if (inputStream.getPos() == originalPosition && resetPosition) {
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
+ "current position and original position match at " + originalPosition);
}
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
+ "current position and original position match at {}", originalPosition);
seekOnFs(0);
} else {
// Else restore our position to original location in hope that next time through we will
// read successfully.
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
+ "from " + inputStream.getPos()+" to " + originalPosition, eof);
}
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
+ "from {} to {}", inputStream.getPos(), originalPosition, eof);
seekOnFs(originalPosition);
}
return false;
Expand Down