Commit f79f63c ("address comments")
1 parent b9377fa

File tree: 3 files changed (+38, -15 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
Lines changed: 4 additions & 0 deletions

@@ -29,6 +29,10 @@
  * this class per writing program, so that the memory segment/data buffer can be reused. Note that
  * for each incoming record, we should call `reset` of BufferHolder instance before write the record
  * and reuse the data buffer.
+ *
+ * Generally we should call `UnsafeRow.setTotalSize` and pass in `BufferHolder.totalSize` to update
+ * the size of the result row, after writing a record to the buffer. However, we can skip this step
+ * if the fields of row are all fixed-length, as the size of result row is also fixed.
  */
 public class BufferHolder {
   public byte[] buffer;
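
The new paragraph describes a calling pattern without showing one, so here is a minimal sketch of that pattern, not the generated projection code. The `write(int, long)` overload and the assumption that `row` is already backed by `holder.buffer` are mine; only `reset`, `totalSize` and `setTotalSize` are named in this diff.

    // Hedged sketch: write one record whose fields are all fixed-length.
    static void writeRecord(BufferHolder holder, UnsafeRowWriter writer, UnsafeRow row) {
      holder.reset();             // rewind the shared buffer for the incoming record
      writer.zeroOutNullBytes();  // outermost writer: clearing the null bits is enough
      writer.write(0, 42L);       // two fixed-length long columns (assumed overload)
      writer.write(1, 43L);
      // Variable-length fields grow the buffer, so in general the final size must
      // be published after writing; with only fixed-length fields, the comment
      // above says this call can be skipped. It is kept here to show where it goes.
      row.setTotalSize(holder.totalSize());
    }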

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
Lines changed: 22 additions & 12 deletions

@@ -29,11 +29,14 @@
  * A helper class to write data into global row buffer using `UnsafeRow` format.
  *
  * It will remember the offset of row buffer which it starts to write, and move the cursor of row
- * buffer while writing. If a new record comes, the cursor of row buffer will be reset, so we need
- * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
- * null bits. Note that if we use it to write data into the result unsafe row, which means we will
- * always write from the very beginning of the global row buffer, we don't need to update
- * `startingOffset` and can just call `zeroOutNullBytes` before writing new record.
+ * buffer while writing. If new data(can be the input record if this is the outermost writer, or
+ * nested struct if this is an inner writer) comes, the starting cursor of row buffer may be
+ * changed, so we need to call `UnsafeRowWriter.reset` before writing, to update the
+ * `startingOffset` and clear out null bits.
+ *
+ * Note that if this is the outermost writer, which means we will always write from the very
+ * beginning of the global row buffer, we don't need to update `startingOffset` and can just call
+ * `zeroOutNullBytes` before writing new data.
  */
 public class UnsafeRowWriter {
 

@@ -43,6 +46,17 @@ public class UnsafeRowWriter {
   private final int nullBitsSize;
   private final int fixedSize;
 
+  public UnsafeRowWriter(BufferHolder holder, int numFields) {
+    this.holder = holder;
+    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
+    this.fixedSize = nullBitsSize + 8 * numFields;
+    this.startingOffset = holder.cursor;
+  }
+
+  /**
+   * Resets the `startingOffset` according to the current cursor of row buffer, and clear out null
+   * bits. This should be called before we write a new nested struct to the row buffer.
+   */
   public void reset() {
     this.startingOffset = holder.cursor;
 

@@ -53,19 +67,15 @@ public void reset() {
     zeroOutNullBytes();
   }
 
+  /**
+   * Clears out null bits. This should be called before we write a new row to row buffer.
+   */
   public void zeroOutNullBytes() {
     for (int i = 0; i < nullBitsSize; i += 8) {
       Platform.putLong(holder.buffer, startingOffset + i, 0L);
     }
   }
 
-  public UnsafeRowWriter(BufferHolder holder, int numFields) {
-    this.holder = holder;
-    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
-    this.fixedSize = nullBitsSize + 8 * numFields;
-    this.startingOffset = holder.cursor;
-  }
-
   private void zeroOutPaddingBytes(int numBytes) {
     if ((numBytes & 0x07) > 0) {
       Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 3), 0L);
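
The reworked Javadoc draws a distinction worth seeing in code: the outermost writer always starts at the beginning of the global buffer, while an inner writer starts wherever the cursor currently sits. Below is a hedged sketch of that distinction. Linking the struct into its parent row (the offset-and-size bookkeeping) is elided, and the `write(int, long)` overload is an assumption; `reset()` and `zeroOutNullBytes()` are exactly the methods documented above. Note from the constructor shown in this diff that a two-field writer reserves fixedSize = 8 (one word of null bits) + 2 * 8 = 24 bytes of fixed-length space.

    // Hedged sketch: one outer row containing one nested-struct field.
    static void writeRowWithStruct(BufferHolder holder,
                                   UnsafeRowWriter rowWriter,      // created with numFields = 2
                                   UnsafeRowWriter structWriter) { // created with numFields = 1
      holder.reset();
      rowWriter.zeroOutNullBytes();   // outermost: startingOffset never moves,
                                      // so clearing the null bits is enough
      rowWriter.write(0, 1L);

      // The nested struct begins wherever the cursor happens to be now, so
      // reset() must recompute startingOffset and clear its null bits first.
      structWriter.reset();
      structWriter.write(0, 2L);
      // ...the parent's field 1 would then record the struct's offset and size...
    }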

sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala
Lines changed: 12 additions & 3 deletions

@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Benchmark
 
 /**
- * Benchmark [[UnsafeProjection]] for flat schema(primitive-type fields).
+ * Benchmark [[UnsafeProjection]] for fixed-length/primitive-type fields.
  */
 object UnsafeProjectionBenchmark {
 

@@ -86,7 +86,7 @@ object UnsafeProjectionBenchmark {
     val rows3 = generateRows(schema3, numRows)
     val projection3 = UnsafeProjection.create(attrs3, attrs3)
 
-    benchmark.addCase("primitive types") { _ =>
+    benchmark.addCase("7 primitive types") { _ =>
       for (_ <- 1 to iters) {
         var sum = 0L
         var i = 0

@@ -110,7 +110,7 @@ object UnsafeProjectionBenchmark {
     val rows4 = generateRows(schema4, numRows)
     val projection4 = UnsafeProjection.create(attrs4, attrs4)
 
-    benchmark.addCase("nullable primitive types") { _ =>
+    benchmark.addCase("7 nullable primitive types") { _ =>
       for (_ <- 1 to iters) {
         var sum = 0L
         var i = 0

@@ -122,6 +122,15 @@ object UnsafeProjectionBenchmark {
     }
 
 
+    /*
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    unsafe projection:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------------
+    single long                          1533.34           175.07         1.00 X
+    single nullable long                 2306.73           116.37         0.66 X
+    primitive types                      8403.93            31.94         0.18 X
+    nullable primitive types            12448.39            21.56         0.12 X
+    */
     benchmark.run()
   }
 }
