@@ -39,7 +39,7 @@ public final class UnsafeFixedWidthAggregationMap {
    * An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
    * map, we copy this buffer and use it as the value.
    */
-  private final long[] emptyAggregationBuffer;
+  private final byte[] emptyAggregationBuffer;
 
   private final StructType aggregationBufferSchema;
 
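The javadoc above describes a copy-on-first-insert pattern: one pre-encoded empty buffer serves as the template value for every new grouping key. Below is a minimal, heap-only sketch of that idea; the HashMap with String keys stands in for the off-heap BytesToBytesMap, and the class and method names are illustrative, not Spark's.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a heap-based stand-in for the copy-on-first-insert pattern.
// The real map stores raw bytes off-heap; a String key and HashMap suffice here.
class EmptyBufferTemplateSketch {
  // Encoded once up front; every new key gets its own private copy of this.
  private final byte[] emptyAggregationBuffer;
  private final Map<String, byte[]> map = new HashMap<>();

  EmptyBufferTemplateSketch(byte[] emptyAggregationBuffer) {
    this.emptyAggregationBuffer = emptyAggregationBuffer;
  }

  /** Returns the mutable aggregation buffer for this key, inserting a fresh copy if absent. */
  byte[] getAggregationBuffer(String groupingKey) {
    return map.computeIfAbsent(
        groupingKey,
        k -> Arrays.copyOf(emptyAggregationBuffer, emptyAggregationBuffer.length));
  }
}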
@@ -63,10 +63,10 @@ public final class UnsafeFixedWidthAggregationMap {
   /**
    * Scratch space that is used when encoding grouping keys into UnsafeRow format.
    *
-   * By default, this is a 1MB array, but it will grow as necessary in case larger keys are
+   * By default, this is a 8 kb array, but it will grow as necessary in case larger keys are
    * encountered.
    */
-  private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
+  private byte[] groupingKeyConversionScratchSpace = new byte[1024 * 8];
 
   private final boolean enablePerfMetrics;
 
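The scratch array and the base-offset constant have to change together because the buffer is addressed through sun.misc.Unsafe as a (base object, base offset + position) pair. A hedged, standalone sketch of that addressing scheme follows; it uses sun.misc.Unsafe directly rather than Spark's PlatformDependent wrapper, and the 8 kb size simply mirrors the new default above.

import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Sketch: how a byte[] is addressed via Unsafe using its array base offset.
// This mirrors the idea behind PlatformDependent.BYTE_ARRAY_OFFSET; it is not Spark code.
public class ByteArrayOffsetSketch {
  public static void main(String[] args) throws Exception {
    // Standard reflection trick to obtain the Unsafe instance outside the JDK.
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Unsafe unsafe = (Unsafe) f.get(null);

    long byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);

    byte[] scratch = new byte[1024 * 8];
    // Write a long at byte position 16 of the scratch buffer...
    unsafe.putLong(scratch, byteArrayOffset + 16, 42L);
    // ...and read it back from the same (baseObject, baseOffset + position) pair.
    long value = unsafe.getLong(scratch, byteArrayOffset + 16);
    System.out.println(value); // 42
  }
}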
@@ -123,13 +123,13 @@ public UnsafeFixedWidthAggregationMap(
   }
 
   /**
-   * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
+   * Convert a Java object row into an UnsafeRow, allocating it into a new byte array.
    */
-  private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
+  private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
     final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
-    final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
-    final long writtenLength =
-      converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
+    final byte[] unsafeRow = new byte[converter.getSizeRequirement(javaRow)];
+    final int writtenLength =
+      converter.writeRow(javaRow, unsafeRow, PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
     return unsafeRow;
   }
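The helper follows a two-pass contract: ask the converter for the exact size, allocate a byte array of that size, write into it, and assert that the bytes written match. The sketch below illustrates the same contract with a toy fixed-width encoder; getSizeRequirement and writeRow here are simplified stand-ins, not the real UnsafeRowConverter methods.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch: the "compute size, allocate, write, assert" contract used by
// convertToUnsafeRow, with a toy fixed-width encoder instead of UnsafeRowConverter.
final class SizeThenWriteSketch {

  /** Bytes needed to encode the given longs as consecutive 8-byte fields. */
  static int getSizeRequirement(long[] fields) {
    return fields.length * 8;
  }

  /** Writes the fields into dest starting at offset 0 and returns the bytes written. */
  static int writeRow(long[] fields, byte[] dest) {
    ByteBuffer buf = ByteBuffer.wrap(dest).order(ByteOrder.nativeOrder());
    for (long field : fields) {
      buf.putLong(field);
    }
    return buf.position();
  }

  static byte[] convert(long[] fields) {
    final byte[] unsafeRowLike = new byte[getSizeRequirement(fields)];
    final int writtenLength = writeRow(fields, unsafeRowLike);
    assert writtenLength == unsafeRowLike.length : "Size requirement calculation was wrong!";
    return unsafeRowLike;
  }
}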
@@ -143,34 +143,34 @@ public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
     // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
     if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
       // This new array will be initially zero, so there's no need to zero it out here
-      groupingKeyConversionScratchSpace = new long[groupingKeySize];
+      groupingKeyConversionScratchSpace = new byte[groupingKeySize];
     } else {
       // Zero out the buffer that's used to hold the current row. This is necessary in order
       // to ensure that rows hash properly, since garbage data from the previous row could
       // otherwise end up as padding in this row. As a performance optimization, we only zero out
       // the portion of the buffer that we'll actually write to.
-      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
+      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, (byte) 0);
     }
-    final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
+    final int actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
       groupingKey,
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET);
+      PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
 
     // Probe our map using the serialized key
     final BytesToBytesMap.Location loc = map.lookup(
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET,
+      PlatformDependent.BYTE_ARRAY_OFFSET,
       groupingKeySize);
     if (!loc.isDefined()) {
       // This is the first time that we've seen this grouping key, so we'll insert a copy of the
       // empty aggregation buffer into the map:
       loc.putNewKey(
         groupingKeyConversionScratchSpace,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         groupingKeySize,
         emptyAggregationBuffer,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         emptyAggregationBuffer.length
       );
     }
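The zeroing logic in this hunk is the subtle part: the scratch array is reused across rows, so leftover bytes from a previous, longer key would otherwise survive as padding inside the current key's encoded bytes and change its hash. The self-contained sketch below reproduces that failure mode and the prefix-only Arrays.fill fix; the 4-bytes-in-an-8-byte-slot writer and Arrays.hashCode hashing are illustrative assumptions, not Spark code.

import java.util.Arrays;

// Sketch: why the reused scratch buffer must be zeroed before each row.
// The toy "writer" fills only the low 4 bytes of an 8-byte slot, leaving the
// remaining 4 bytes as padding -- much as UnsafeRow leaves padding bytes
// untouched. If the scratch array still holds bytes from a previous row,
// identical logical keys hash differently.
public class ScratchZeroingSketch {
  static byte[] scratch = new byte[64];

  /** Writes an int key into the first 8-byte slot; bytes 4..7 are never touched. */
  static void writeKey(int key) {
    scratch[0] = (byte) key;
    scratch[1] = (byte) (key >>> 8);
    scratch[2] = (byte) (key >>> 16);
    scratch[3] = (byte) (key >>> 24);
  }

  /** Hashes exactly the first `size` bytes of the scratch buffer. */
  static int hashWrittenPrefix(int size) {
    return Arrays.hashCode(Arrays.copyOf(scratch, size));
  }

  public static void main(String[] args) {
    int keySize = 8;  // size requirement of our toy key: one 8-byte slot

    Arrays.fill(scratch, (byte) 0x7f);           // simulate garbage left by a previous row
    writeKey(42);
    int dirtyHash = hashWrittenPrefix(keySize);

    Arrays.fill(scratch, 0, keySize, (byte) 0);  // zero only the portion we will write to
    writeKey(42);
    int cleanHash = hashWrittenPrefix(keySize);

    // Same logical key, but the hashes differ unless the padding bytes were zeroed.
    System.out.println(dirtyHash != cleanHash);  // true
  }
}

In the real method, the branch that allocates a larger array skips this fill entirely, since a freshly allocated Java array is already zero-initialized, as the comment in the diff notes.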
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
    * @param baseOffset the base offset of the destination address
    * @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
    */
-  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
+  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Int = {
     unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
     var fieldNumber = 0
     var appendCursor: Int = fixedLengthSize