Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,

keyRowId = numRows;
keyRow.pointTo(base, recordOffset, klen);
valueRow.pointTo(base, recordOffset + klen, vlen + 4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why we did this before. Was it a mistake?

Copy link
Member Author

@kiszk kiszk Jul 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same question.
@sameeragarwal had a similar question one year ago; however, there was no response from @ooq.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall it being intentional.
See discussion here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ooq thank you for pointing out interesting discussion.
This discussion seems to make sense for page management. The question that @cloud-fan and I have is whether valueRow uses only vlen. I think that the +4 is for page management.

valueRow.pointTo(base, recordOffset + klen, vlen);
numRows++;
return valueRow;
}
Expand Down Expand Up @@ -95,7 +95,7 @@ protected UnsafeRow getValueFromKey(int rowId) {
getKeyRow(rowId);
}
assert(rowId >= 0);
valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
return valueRow;
}

Expand Down Expand Up @@ -131,7 +131,7 @@ public boolean next() {
}

key.pointTo(base, offsetInPage, klen);
value.pointTo(base, offsetInPage + klen, vlen + 4);
value.pointTo(base, offsetInPage + klen, vlen);

offsetInPage += recordLength;
recordsInPage -= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public UnsafeRow() {}
*/
public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only need the assertion here, in pointTo, and in setTotalSize. Other places are just checking length for existing unsafe rows, which is unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done.

this.baseObject = baseObject;
this.baseOffset = baseOffset;
this.sizeInBytes = sizeInBytes;
Expand All @@ -183,6 +184,7 @@ public void pointTo(byte[] buf, int sizeInBytes) {
}

/**
 * Updates the recorded total size (in bytes) of this row's backing data.
 *
 * @param sizeInBytes the new total size; must be a multiple of 8, since row
 *                    data is laid out in 8-byte words (enforced by the assert below)
 */
public void setTotalSize(int sizeInBytes) {
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.sizeInBytes = sizeInBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,

keyRowId = numRows;
keyRow.pointTo(base, recordOffset + 8, klen);
valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
numRows++;
return valueRow;
}
Expand Down Expand Up @@ -102,7 +102,7 @@ public UnsafeRow getValueFromKey(int rowId) {
long offset = keyRow.getBaseOffset();
int klen = keyRow.getSizeInBytes();
int vlen = Platform.getInt(base, offset - 8) - klen - 4;
valueRow.pointTo(base, offset + klen, vlen + 4);
valueRow.pointTo(base, offset + klen, vlen);
return valueRow;
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public boolean next() {
currentvlen = totalLength - currentklen;

key.pointTo(base, offsetInPage + 8, currentklen);
value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);

offsetInPage += 8 + totalLength + 8;
recordsInPage -= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ private static final class RowComparator extends RecordComparator {

@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
// TODO: Why are the sizes -1?
row1.pointTo(baseObj1, baseOff1, -1);
row2.pointTo(baseObj2, baseOff2, -1);
// Note that since ordering doesn't need the total length of the record, we just pass 0
// into the row.
row1.pointTo(baseObj1, baseOff1, 0);
row2.pointTo(baseObj2, baseOff2, 0);
return ordering.compare(row1, row2);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ private static final class KVComparator extends RecordComparator {

@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
// Note that since ordering doesn't need the total length of the record, we just pass -1
// Note that since ordering doesn't need the total length of the record, we just pass 0
// into the row.
row1.pointTo(baseObj1, baseOff1 + 4, -1);
row2.pointTo(baseObj2, baseOff2 + 4, -1);
row1.pointTo(baseObj1, baseOff1 + 4, 0);
row2.pointTo(baseObj2, baseOff2 + 4, 0);
return ordering.compare(row1, row2);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
valueRow.pointTo(valueRowBuffer, valueSize)
// If the valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
// This is a workaround for the following issue:
// prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 extra bytes to the
// value row, and those bytes got persisted into the checkpoint data.
valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
map.put(keyRow, valueRow)
}
}
Expand Down Expand Up @@ -427,7 +431,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
valueRow.pointTo(valueRowBuffer, valueSize)
// If the valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
// This is a workaround for the following issue:
// prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 extra bytes to the
// value row, and those bytes got persisted into the checkpoint data.
valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
map.put(keyRow, valueRow)
}
}
Expand Down