Skip to content

Commit 1d97332

Browse files
Davies Liurxin
authored andcommitted
[SPARK-11243][SQL] output UnsafeRow from columnar cache
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects). Author: Davies Liu <[email protected]> Closes #9203 from davies/unsafe_cache.
1 parent 40a10d7 commit 1d97332

File tree

7 files changed

+291
-132
lines changed

7 files changed

+291
-132
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ public UTF8String getUTF8String(int ordinal) {
402402
if (isNullAt(ordinal)) return null;
403403
final long offsetAndSize = getLong(ordinal);
404404
final int offset = (int) (offsetAndSize >> 32);
405-
final int size = (int) (offsetAndSize & ((1L << 32) - 1));
405+
final int size = (int) offsetAndSize;
406406
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
407407
}
408408

@@ -413,7 +413,7 @@ public byte[] getBinary(int ordinal) {
413413
} else {
414414
final long offsetAndSize = getLong(ordinal);
415415
final int offset = (int) (offsetAndSize >> 32);
416-
final int size = (int) (offsetAndSize & ((1L << 32) - 1));
416+
final int size = (int) offsetAndSize;
417417
final byte[] bytes = new byte[size];
418418
Platform.copyMemory(
419419
baseObject,
@@ -446,7 +446,7 @@ public UnsafeRow getStruct(int ordinal, int numFields) {
446446
} else {
447447
final long offsetAndSize = getLong(ordinal);
448448
final int offset = (int) (offsetAndSize >> 32);
449-
final int size = (int) (offsetAndSize & ((1L << 32) - 1));
449+
final int size = (int) offsetAndSize;
450450
final UnsafeRow row = new UnsafeRow();
451451
row.pointTo(baseObject, baseOffset + offset, numFields, size);
452452
return row;
@@ -460,7 +460,7 @@ public UnsafeArrayData getArray(int ordinal) {
460460
} else {
461461
final long offsetAndSize = getLong(ordinal);
462462
final int offset = (int) (offsetAndSize >> 32);
463-
final int size = (int) (offsetAndSize & ((1L << 32) - 1));
463+
final int size = (int) offsetAndSize;
464464
final UnsafeArrayData array = new UnsafeArrayData();
465465
array.pointTo(baseObject, baseOffset + offset, size);
466466
return array;
@@ -474,7 +474,7 @@ public UnsafeMapData getMap(int ordinal) {
474474
} else {
475475
final long offsetAndSize = getLong(ordinal);
476476
final int offset = (int) (offsetAndSize >> 32);
477-
final int size = (int) (offsetAndSize & ((1L << 32) - 1));
477+
final int size = (int) offsetAndSize;
478478
final UnsafeMapData map = new UnsafeMapData();
479479
map.pointTo(baseObject, baseOffset + offset, size);
480480
return map;
@@ -618,6 +618,27 @@ public void writeTo(ByteBuffer buffer) {
618618
buffer.position(pos + sizeInBytes);
619619
}
620620

621+
/**
622+
* Write the bytes of var-length field into ByteBuffer
623+
*
624+
* Note: only work with HeapByteBuffer
625+
*/
626+
public void writeFieldTo(int ordinal, ByteBuffer buffer) {
627+
final long offsetAndSize = getLong(ordinal);
628+
final int offset = (int) (offsetAndSize >> 32);
629+
final int size = (int) offsetAndSize;
630+
631+
buffer.putInt(size);
632+
int pos = buffer.position();
633+
buffer.position(pos + size);
634+
Platform.copyMemory(
635+
baseObject,
636+
baseOffset + offset,
637+
buffer.array(),
638+
Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos,
639+
size);
640+
}
641+
621642
@Override
622643
public void writeExternal(ObjectOutput out) throws IOException {
623644
byte[] bytes = getBytes();

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen;
1919

20-
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
2120
import org.apache.spark.sql.types.Decimal;
2221
import org.apache.spark.unsafe.Platform;
2322
import org.apache.spark.unsafe.types.CalendarInterval;
@@ -64,29 +63,72 @@ public void setOffset(int ordinal) {
6463
Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset);
6564
}
6665

67-
public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) {
68-
// make sure Decimal object has the same scale as DecimalType
69-
if (input.changePrecision(precision, scale)) {
70-
Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong());
71-
setOffset(ordinal);
72-
holder.cursor += 8;
73-
} else {
74-
setNullAt(ordinal);
66+
public void write(int ordinal, boolean value) {
67+
Platform.putBoolean(holder.buffer, holder.cursor, value);
68+
setOffset(ordinal);
69+
holder.cursor += 1;
70+
}
71+
72+
public void write(int ordinal, byte value) {
73+
Platform.putByte(holder.buffer, holder.cursor, value);
74+
setOffset(ordinal);
75+
holder.cursor += 1;
76+
}
77+
78+
public void write(int ordinal, short value) {
79+
Platform.putShort(holder.buffer, holder.cursor, value);
80+
setOffset(ordinal);
81+
holder.cursor += 2;
82+
}
83+
84+
public void write(int ordinal, int value) {
85+
Platform.putInt(holder.buffer, holder.cursor, value);
86+
setOffset(ordinal);
87+
holder.cursor += 4;
88+
}
89+
90+
public void write(int ordinal, long value) {
91+
Platform.putLong(holder.buffer, holder.cursor, value);
92+
setOffset(ordinal);
93+
holder.cursor += 8;
94+
}
95+
96+
public void write(int ordinal, float value) {
97+
if (Float.isNaN(value)) {
98+
value = Float.NaN;
99+
}
100+
Platform.putFloat(holder.buffer, holder.cursor, value);
101+
setOffset(ordinal);
102+
holder.cursor += 4;
103+
}
104+
105+
public void write(int ordinal, double value) {
106+
if (Double.isNaN(value)) {
107+
value = Double.NaN;
75108
}
109+
Platform.putDouble(holder.buffer, holder.cursor, value);
110+
setOffset(ordinal);
111+
holder.cursor += 8;
76112
}
77113

78114
public void write(int ordinal, Decimal input, int precision, int scale) {
79115
// make sure Decimal object has the same scale as DecimalType
80116
if (input.changePrecision(precision, scale)) {
81-
final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
82-
assert bytes.length <= 16;
83-
holder.grow(bytes.length);
84-
85-
// Write the bytes to the variable length portion.
86-
Platform.copyMemory(
87-
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
88-
setOffset(ordinal);
89-
holder.cursor += bytes.length;
117+
if (precision <= Decimal.MAX_LONG_DIGITS()) {
118+
Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong());
119+
setOffset(ordinal);
120+
holder.cursor += 8;
121+
} else {
122+
final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
123+
assert bytes.length <= 16;
124+
holder.grow(bytes.length);
125+
126+
// Write the bytes to the variable length portion.
127+
Platform.copyMemory(
128+
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
129+
setOffset(ordinal);
130+
holder.cursor += bytes.length;
131+
}
90132
} else {
91133
setNullAt(ordinal);
92134
}

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ private void zeroOutPaddingBytes(int numBytes) {
5858
}
5959
}
6060

61+
public boolean isNullAt(int ordinal) {
62+
return BitSetMethods.isSet(holder.buffer, startingOffset, ordinal);
63+
}
64+
6165
public void setNullAt(int ordinal) {
6266
BitSetMethods.set(holder.buffer, startingOffset, ordinal);
6367
Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
@@ -95,41 +99,75 @@ public void alignToWords(int numBytes) {
9599
}
96100
}
97101

98-
public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) {
99-
// make sure Decimal object has the same scale as DecimalType
100-
if (input.changePrecision(precision, scale)) {
101-
Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong());
102-
} else {
103-
setNullAt(ordinal);
104-
}
102+
public void write(int ordinal, boolean value) {
103+
Platform.putBoolean(holder.buffer, getFieldOffset(ordinal), value);
105104
}
106105

107-
public void write(int ordinal, Decimal input, int precision, int scale) {
108-
// grow the global buffer before writing data.
109-
holder.grow(16);
106+
public void write(int ordinal, byte value) {
107+
Platform.putByte(holder.buffer, getFieldOffset(ordinal), value);
108+
}
110109

111-
// zero-out the bytes
112-
Platform.putLong(holder.buffer, holder.cursor, 0L);
113-
Platform.putLong(holder.buffer, holder.cursor + 8, 0L);
110+
public void write(int ordinal, short value) {
111+
Platform.putShort(holder.buffer, getFieldOffset(ordinal), value);
112+
}
114113

115-
// Make sure Decimal object has the same scale as DecimalType.
116-
// Note that we may pass in null Decimal object to set null for it.
117-
if (input == null || !input.changePrecision(precision, scale)) {
118-
BitSetMethods.set(holder.buffer, startingOffset, ordinal);
119-
// keep the offset for future update
120-
setOffsetAndSize(ordinal, 0L);
121-
} else {
122-
final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
123-
assert bytes.length <= 16;
114+
public void write(int ordinal, int value) {
115+
Platform.putInt(holder.buffer, getFieldOffset(ordinal), value);
116+
}
124117

125-
// Write the bytes to the variable length portion.
126-
Platform.copyMemory(
127-
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
128-
setOffsetAndSize(ordinal, bytes.length);
118+
public void write(int ordinal, long value) {
119+
Platform.putLong(holder.buffer, getFieldOffset(ordinal), value);
120+
}
121+
122+
public void write(int ordinal, float value) {
123+
if (Float.isNaN(value)) {
124+
value = Float.NaN;
129125
}
126+
Platform.putFloat(holder.buffer, getFieldOffset(ordinal), value);
127+
}
130128

131-
// move the cursor forward.
132-
holder.cursor += 16;
129+
public void write(int ordinal, double value) {
130+
if (Double.isNaN(value)) {
131+
value = Double.NaN;
132+
}
133+
Platform.putDouble(holder.buffer, getFieldOffset(ordinal), value);
134+
}
135+
136+
public void write(int ordinal, Decimal input, int precision, int scale) {
137+
if (precision <= Decimal.MAX_LONG_DIGITS()) {
138+
// make sure Decimal object has the same scale as DecimalType
139+
if (input.changePrecision(precision, scale)) {
140+
Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong());
141+
} else {
142+
setNullAt(ordinal);
143+
}
144+
} else {
145+
// grow the global buffer before writing data.
146+
holder.grow(16);
147+
148+
// zero-out the bytes
149+
Platform.putLong(holder.buffer, holder.cursor, 0L);
150+
Platform.putLong(holder.buffer, holder.cursor + 8, 0L);
151+
152+
// Make sure Decimal object has the same scale as DecimalType.
153+
// Note that we may pass in null Decimal object to set null for it.
154+
if (input == null || !input.changePrecision(precision, scale)) {
155+
BitSetMethods.set(holder.buffer, startingOffset, ordinal);
156+
// keep the offset for future update
157+
setOffsetAndSize(ordinal, 0L);
158+
} else {
159+
final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
160+
assert bytes.length <= 16;
161+
162+
// Write the bytes to the variable length portion.
163+
Platform.copyMemory(
164+
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
165+
setOffsetAndSize(ordinal, bytes.length);
166+
}
167+
168+
// move the cursor forward.
169+
holder.cursor += 16;
170+
}
133171
}
134172

135173
public void write(int ordinal, UTF8String input) {
@@ -151,7 +189,10 @@ public void write(int ordinal, UTF8String input) {
151189
}
152190

153191
public void write(int ordinal, byte[] input) {
154-
final int numBytes = input.length;
192+
write(ordinal, input, 0, input.length);
193+
}
194+
195+
public void write(int ordinal, byte[] input, int offset, int numBytes) {
155196
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
156197

157198
// grow the global buffer before writing data.
@@ -160,7 +201,8 @@ public void write(int ordinal, byte[] input) {
160201
zeroOutPaddingBytes(numBytes);
161202

162203
// Write the bytes to the variable length portion.
163-
Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
204+
Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET + offset,
205+
holder.buffer, holder.cursor, numBytes);
164206

165207
setOffsetAndSize(ordinal, numBytes);
166208

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
8989
val setNull = dt match {
9090
case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
9191
// Can't call setNullAt() for DecimalType with precision larger than 18.
92-
s"$rowWriter.write($index, null, ${t.precision}, ${t.scale});"
92+
s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
9393
case _ => s"$rowWriter.setNullAt($index);"
9494
}
9595

@@ -124,17 +124,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
124124
"""
125125

126126
case _ if ctx.isPrimitiveType(dt) =>
127-
val fieldOffset = ctx.freshName("fieldOffset")
128127
s"""
129-
final long $fieldOffset = $rowWriter.getFieldOffset($index);
130-
Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L);
131-
${writePrimitiveType(ctx, input.value, dt, s"$bufferHolder.buffer", fieldOffset)}
128+
$rowWriter.write($index, ${input.value});
132129
"""
133130

134-
case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
135-
s"$rowWriter.writeCompactDecimal($index, ${input.value}, " +
136-
s"${t.precision}, ${t.scale});"
137-
138131
case t: DecimalType =>
139132
s"$rowWriter.write($index, ${input.value}, ${t.precision}, ${t.scale});"
140133

@@ -204,20 +197,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
204197
${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
205198
"""
206199

207-
case _ if ctx.isPrimitiveType(et) =>
208-
// Should we do word align?
209-
val dataSize = et.defaultSize
210-
211-
s"""
212-
$arrayWriter.setOffset($index);
213-
${writePrimitiveType(ctx, element, et,
214-
s"$bufferHolder.buffer", s"$bufferHolder.cursor")}
215-
$bufferHolder.cursor += $dataSize;
216-
"""
217-
218-
case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
219-
s"$arrayWriter.writeCompactDecimal($index, $element, ${t.precision}, ${t.scale});"
220-
221200
case t: DecimalType =>
222201
s"$arrayWriter.write($index, $element, ${t.precision}, ${t.scale});"
223202

@@ -296,38 +275,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
296275
"""
297276
}
298277

299-
private def writePrimitiveType(
300-
ctx: CodeGenContext,
301-
input: String,
302-
dt: DataType,
303-
buffer: String,
304-
offset: String) = {
305-
assert(ctx.isPrimitiveType(dt))
306-
307-
val putMethod = s"put${ctx.primitiveTypeName(dt)}"
308-
309-
dt match {
310-
case FloatType | DoubleType =>
311-
val normalized = ctx.freshName("normalized")
312-
val boxedType = ctx.boxedType(dt)
313-
val handleNaN =
314-
s"""
315-
final ${ctx.javaType(dt)} $normalized;
316-
if ($boxedType.isNaN($input)) {
317-
$normalized = $boxedType.NaN;
318-
} else {
319-
$normalized = $input;
320-
}
321-
"""
322-
323-
s"""
324-
$handleNaN
325-
Platform.$putMethod($buffer, $offset, $normalized);
326-
"""
327-
case _ => s"Platform.$putMethod($buffer, $offset, $input);"
328-
}
329-
}
330-
331278
def createCode(ctx: CodeGenContext, expressions: Seq[Expression]): GeneratedExpressionCode = {
332279
val exprEvals = expressions.map(e => e.gen(ctx))
333280
val exprTypes = expressions.map(_.dataType)

0 commit comments

Comments
 (0)