Skip to content

Commit 056e7f0

Browse files
committed
reduce the perf regression of vectorized parquet reader caused by datetime rebase
1 parent b7cabc8 commit 056e7f0

File tree

6 files changed

+191
-56
lines changed

6 files changed

+191
-56
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ object RebaseDateTime {
7171
-719164, -682945, -646420, -609895, -536845, -500320, -463795,
7272
-390745, -354220, -317695, -244645, -208120, -171595, -141427)
7373

74+
// The last entry of `julianGregDiffSwitchDay`. Exposed so that readers can compare incoming
// day values against it before deciding whether to call the rebase functions.
final val lastSwitchJulianDay: Int = julianGregDiffSwitchDay.last
75+
7476
// The first days of Common Era (CE) which is mapped to the '0001-01-01' date in Julian calendar.
7577
private final val julianCommonEraStartDay = julianGregDiffSwitchDay(0)
7678

@@ -416,6 +418,8 @@ object RebaseDateTime {
416418
// in the interval: [julianGregDiffSwitchMicros(i), julianGregDiffSwitchMicros(i+1))
417419
private val julianGregRebaseMap = loadRebaseRecords("julian-gregorian-rebase-micros.json")
418420

421+
// The maximum, over every record loaded into `julianGregRebaseMap`, of that record's last
// switch value. Exposed so that readers can compare incoming timestamps against it before
// deciding whether to call the rebase functions.
final val lastSwitchJulianTs: Long = julianGregRebaseMap.values.map(_.switches.last).max
422+
419423
/**
420424
* An optimized version of [[rebaseJulianToGregorianMicros(ZoneId, Long)]]. This method leverages
421425
* the pre-calculated rebasing maps to save calculation. If the rebasing map doesn't contain

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -423,15 +423,8 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
423423
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
424424
} else if (column.dataType() == DataTypes.DateType ) {
425425
if (rebaseDateTime) {
426-
for (int i = 0; i < num; i++) {
427-
if (defColumn.readInteger() == maxDefLevel) {
428-
column.putInt(
429-
rowId + i,
430-
RebaseDateTime.rebaseJulianToGregorianDays(dataColumn.readInteger()));
431-
} else {
432-
column.putNull(rowId + i);
433-
}
434-
}
426+
defColumn.readIntegersWithRebase(
427+
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
435428
} else {
436429
defColumn.readIntegers(
437430
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -449,15 +442,8 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
449442
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
450443
} else if (originalType == OriginalType.TIMESTAMP_MICROS) {
451444
if (rebaseDateTime) {
452-
for (int i = 0; i < num; i++) {
453-
if (defColumn.readInteger() == maxDefLevel) {
454-
column.putLong(
455-
rowId + i,
456-
RebaseDateTime.rebaseJulianToGregorianMicros(dataColumn.readLong()));
457-
} else {
458-
column.putNull(rowId + i);
459-
}
460-
}
445+
defColumn.readLongsWithRebase(
446+
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
461447
} else {
462448
defColumn.readLongs(
463449
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.parquet.bytes.ByteBufferInputStream;
2424
import org.apache.parquet.io.ParquetDecodingException;
25+
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
2526
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
2627

2728
import org.apache.parquet.column.values.ValuesReader;
@@ -81,6 +82,33 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) {
8182
}
8283
}
8384

85+
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
86+
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
87+
// if rebase is not needed.
88+
@Override
89+
public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
90+
int requiredBytes = total * 4;
91+
ByteBuffer buffer = getBuffer(requiredBytes);
92+
boolean rebase = false;
93+
for (int i = 0; i < total; i += 1) {
94+
rebase = buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
95+
}
96+
if (rebase) {
97+
for (int i = 0; i < total; i += 1) {
98+
c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
99+
}
100+
} else {
101+
if (buffer.hasArray()) {
102+
int offset = buffer.arrayOffset() + buffer.position();
103+
c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
104+
} else {
105+
for (int i = 0; i < total; i += 1) {
106+
c.putInt(rowId + i, buffer.getInt());
107+
}
108+
}
109+
}
110+
}
111+
84112
@Override
85113
public final void readLongs(int total, WritableColumnVector c, int rowId) {
86114
int requiredBytes = total * 8;
@@ -96,6 +124,33 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) {
96124
}
97125
}
98126

127+
// A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method
128+
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
129+
// if rebase is not needed.
130+
@Override
131+
public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
132+
int requiredBytes = total * 8;
133+
ByteBuffer buffer = getBuffer(requiredBytes);
134+
boolean rebase = false;
135+
for (int i = 0; i < total; i += 1) {
136+
rebase = buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs();
137+
}
138+
if (rebase) {
139+
for (int i = 0; i < total; i += 1) {
140+
c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
141+
}
142+
} else {
143+
if (buffer.hasArray()) {
144+
int offset = buffer.arrayOffset() + buffer.position();
145+
c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
146+
} else {
147+
for (int i = 0; i < total; i += 1) {
148+
c.putLong(rowId + i, buffer.getLong());
149+
}
150+
}
151+
}
152+
}
153+
99154
@Override
100155
public final void readFloats(int total, WritableColumnVector c, int rowId) {
101156
int requiredBytes = total * 4;

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.parquet.io.ParquetDecodingException;
2727
import org.apache.parquet.io.api.Binary;
2828

29+
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
2930
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
3031

3132
import java.io.IOException;
@@ -203,6 +204,43 @@ public void readIntegers(
203204
}
204205
}
205206

207+
// A fork of `readIntegers`, which rebases the date int value (days) before filling
208+
// the Spark column vector.
209+
public void readIntegersWithRebase(
210+
int total,
211+
WritableColumnVector c,
212+
int rowId,
213+
int level,
214+
VectorizedValuesReader data) throws IOException {
215+
int left = total;
216+
while (left > 0) {
217+
if (this.currentCount == 0) this.readNextGroup();
218+
int n = Math.min(left, this.currentCount);
219+
switch (mode) {
220+
case RLE:
221+
if (currentValue == level) {
222+
data.readIntegersWithRebase(n, c, rowId);
223+
} else {
224+
c.putNulls(rowId, n);
225+
}
226+
break;
227+
case PACKED:
228+
for (int i = 0; i < n; ++i) {
229+
if (currentBuffer[currentBufferIdx++] == level) {
230+
c.putInt(rowId + i,
231+
RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger()));
232+
} else {
233+
c.putNull(rowId + i);
234+
}
235+
}
236+
break;
237+
}
238+
rowId += n;
239+
left -= n;
240+
currentCount -= n;
241+
}
242+
}
243+
206244
// TODO: can this code duplication be removed without a perf penalty?
207245
public void readBooleans(
208246
int total,
@@ -342,6 +380,43 @@ public void readLongs(
342380
}
343381
}
344382

383+
// A fork of `readLongs`, which rebases the timestamp long value (microseconds) before filling
384+
// the Spark column vector.
385+
public void readLongsWithRebase(
386+
int total,
387+
WritableColumnVector c,
388+
int rowId,
389+
int level,
390+
VectorizedValuesReader data) throws IOException {
391+
int left = total;
392+
while (left > 0) {
393+
if (this.currentCount == 0) this.readNextGroup();
394+
int n = Math.min(left, this.currentCount);
395+
switch (mode) {
396+
case RLE:
397+
if (currentValue == level) {
398+
data.readLongsWithRebase(n, c, rowId);
399+
} else {
400+
c.putNulls(rowId, n);
401+
}
402+
break;
403+
case PACKED:
404+
for (int i = 0; i < n; ++i) {
405+
if (currentBuffer[currentBufferIdx++] == level) {
406+
c.putLong(rowId + i,
407+
RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong()));
408+
} else {
409+
c.putNull(rowId + i);
410+
}
411+
}
412+
break;
413+
}
414+
rowId += n;
415+
left -= n;
416+
currentCount -= n;
417+
}
418+
}
419+
345420
public void readFloats(
346421
int total,
347422
WritableColumnVector c,
@@ -508,6 +583,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
508583
}
509584
}
510585

586+
@Override
587+
public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
588+
throw new UnsupportedOperationException("only readInts is valid.");
589+
}
590+
511591
@Override
512592
public byte readByte() {
513593
throw new UnsupportedOperationException("only readInts is valid.");
@@ -523,6 +603,11 @@ public void readLongs(int total, WritableColumnVector c, int rowId) {
523603
throw new UnsupportedOperationException("only readInts is valid.");
524604
}
525605

606+
@Override
607+
public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
608+
throw new UnsupportedOperationException("only readInts is valid.");
609+
}
610+
526611
@Override
527612
public void readBinary(int total, WritableColumnVector c, int rowId) {
528613
throw new UnsupportedOperationException("only readInts is valid.");

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public interface VectorizedValuesReader {
4040
void readBooleans(int total, WritableColumnVector c, int rowId);
4141
void readBytes(int total, WritableColumnVector c, int rowId);
4242
void readIntegers(int total, WritableColumnVector c, int rowId);
43+
// Counterpart of `readIntegers` for date values (days): reads `total` values into `c`
// starting at `rowId`, rebasing them where needed. Implementations may throw
// UnsupportedOperationException if they do not support this read.
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId);
4344
void readLongs(int total, WritableColumnVector c, int rowId);
45+
// Counterpart of `readLongs` for timestamp values (microseconds): reads `total` values into
// `c` starting at `rowId`, rebasing them where needed. Implementations may throw
// UnsupportedOperationException if they do not support this read.
void readLongsWithRebase(int total, WritableColumnVector c, int rowId);
4446
void readFloats(int total, WritableColumnVector c, int rowId);
4547
void readDoubles(int total, WritableColumnVector c, int rowId);
4648
void readBinary(int total, WritableColumnVector c, int rowId);

0 commit comments

Comments
 (0)