
Commit 4e787dc

parthchandra and sunchao committed
[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support
### What changes were proposed in this pull request?

This PR provides a vectorized implementation of the DELTA_BYTE_ARRAY encoding of Parquet V2. The PR also implements the DELTA_LENGTH_BYTE_ARRAY encoding, which is needed by the former.

### Why are the changes needed?

The current support for Parquet V2 in the vectorized reader uses a non-vectorized version of the above encoding and needs to be vectorized.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Reproduces all the tests for the encodings from the Parquet implementation. Also adds more cases to the Parquet Encoding test suite.

Closes #35262 from parthchandra/SPARK-36879-PR3.

Lead-authored-by: Parth Chandra <[email protected]>
Co-authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
1 parent c2ed15d commit 4e787dc

16 files changed, +1250 -717 lines changed
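For context, the new decoders only come into play when reading Parquet files written with the V2 page format. Below is a minimal sketch (not part of this commit) of producing such a file with Spark and reading it back through the vectorized reader; the output path and column name are illustrative, and `parquet.writer.version` is Parquet's standard writer-version property, assumed here to be passed through as a write option.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetV2WriteReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("parquet-v2-delta-byte-array-sketch")
        .getOrCreate();

    // String columns with shared prefixes are typically encoded as DELTA_BYTE_ARRAY by the V2 writer.
    Dataset<Row> df = spark.range(0, 1000)
        .selectExpr("concat('common-prefix-', cast(id as string)) as s");

    df.write()
        .option("parquet.writer.version", "PARQUET_2_0") // write V2 pages
        .mode("overwrite")
        .parquet("/tmp/parquet_v2_demo");

    // The vectorized reader (enabled by default) now decodes the column without
    // falling back to the row-at-a-time Parquet readers.
    spark.read().parquet("/tmp/parquet_v2_demo").show(5, false);

    spark.stop();
  }
}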

sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

sql/core/benchmarks/DataSourceReadBenchmark-jdk17-results.txt

Lines changed: 235 additions & 235 deletions
Large diffs are not rendered by default.

sql/core/benchmarks/DataSourceReadBenchmark-results.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 11 additions & 0 deletions
@@ -29,6 +29,8 @@
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.column.page.PageReadStore;
 import scala.Option;
 
@@ -69,6 +71,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
   protected StructType sparkSchema;
+  // Keep track of the version of the parquet writer. An older version wrote
+  // corrupt delta byte arrays, and the version check is needed to detect that.
+  protected ParsedVersion writerVersion;
 
   /**
    * The total number of rows this RecordReader will eventually read. The sum of the
@@ -93,6 +98,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
       HadoopInputFile.fromPath(file, configuration), options);
     this.reader = new ParquetRowGroupReaderImpl(fileReader);
     this.fileSchema = fileReader.getFileMetaData().getSchema();
+    try {
+      this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
+    } catch (Exception e) {
+      // Swallow any exception, if we cannot parse the version we will revert to a sequential read
+      // if the column is a delta byte array encoding (due to PARQUET-246).
+    }
     Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 14 additions & 1 deletion
@@ -21,13 +21,16 @@
 import java.time.ZoneId;
 import java.util.PrimitiveIterator;
 
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
@@ -86,6 +89,7 @@ public class VectorizedColumnReader {
   private final ColumnDescriptor descriptor;
   private final LogicalTypeAnnotation logicalTypeAnnotation;
   private final String datetimeRebaseMode;
+  private final ParsedVersion writerVersion;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -96,7 +100,8 @@ public VectorizedColumnReader(
       String datetimeRebaseMode,
       String datetimeRebaseTz,
       String int96RebaseMode,
-      String int96RebaseTz) throws IOException {
+      String int96RebaseTz,
+      ParsedVersion writerVersion) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes);
@@ -129,6 +134,7 @@ public VectorizedColumnReader(
     this.datetimeRebaseMode = datetimeRebaseMode;
     assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
       "CORRECTED".equals(int96RebaseMode);
+    this.writerVersion = writerVersion;
   }
 
   private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
@@ -259,6 +265,7 @@ private void initDataReader(
       int pageValueCount,
       Encoding dataEncoding,
       ByteBufferInputStream in) throws IOException {
+    ValuesReader previousReader = this.dataColumn;
     if (dataEncoding.usesDictionary()) {
       this.dataColumn = null;
       if (dictionary == null) {
@@ -283,6 +290,12 @@
     } catch (IOException e) {
       throw new IOException("could not read page in col " + descriptor, e);
     }
+    // for PARQUET-246 (See VectorizedDeltaByteArrayReader.setPreviousValues)
+    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
+        previousReader instanceof RequiresPreviousReader) {
+      // previousReader can only be set if reading sequentially
+      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
+    }
   }
 
   private ValuesReader getValuesReader(Encoding encoding) {
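A minimal, standalone sketch (not part of this commit) of the check the last hunk relies on: the created_by string from the Parquet footer is parsed with VersionParser, and CorruptDeltaByteArrays.requiresSequentialReads reports whether a DELTA_BYTE_ARRAY column from that writer may be affected by PARQUET-246 and therefore needs the previous reader's state carried forward. The version string below is only an example.

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.Encoding;

public class WriterVersionCheckSketch {
  public static void main(String[] args) throws Exception {
    // In the reader this string comes from fileReader.getFileMetaData().getCreatedBy().
    ParsedVersion version = VersionParser.parse("parquet-mr version 1.6.0 (build abcd123)");

    // Old parquet-mr versions wrote corrupt DELTA_BYTE_ARRAY pages (PARQUET-246), so the
    // vectorized reader must decode such columns sequentially and chain page state.
    boolean sequentialOnly =
        CorruptDeltaByteArrays.requiresSequentialReads(version, Encoding.DELTA_BYTE_ARRAY);
    System.out.println("requires sequential reads: " + sequentialOnly);
  }
}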

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java

Lines changed: 6 additions & 0 deletions
@@ -90,13 +90,19 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
     Preconditions.checkArgument(miniSize % 8 == 0,
       "miniBlockSize must be multiple of 8, but it's " + miniSize);
     this.miniBlockSizeInValues = (int) miniSize;
+    // True value count. May be less than valueCount because of nulls
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     this.bitWidths = new int[miniBlockNumInABlock];
     this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
     // read the first value
     firstValue = BytesUtils.readZigZagVarLong(in);
   }
 
+  // True value count. May be less than valueCount because of nulls
+  int getTotalValueCount() {
+    return totalValueCount;
+  }
+
   @Override
   public byte readByte() {
     readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v);

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java

Lines changed: 95 additions & 18 deletions
@@ -16,50 +16,127 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
+ * interface.
  */
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
-  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+    implements VectorizedValuesReader, RequiresPreviousReader {
+
+  private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
+  private final VectorizedDeltaLengthByteArrayReader suffixReader;
+  private WritableColumnVector prefixLengthVector;
+  private ByteBuffer previous;
+  private int currentRow = 0;
+
+  // Temporary variable used by readBinary
+  private final WritableColumnVector binaryValVector;
+  // Temporary variable used by skipBinary
+  private final WritableColumnVector tempBinaryValVector;
+
+  VectorizedDeltaByteArrayReader() {
+    this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
+    this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
+    binaryValVector = new OnHeapColumnVector(1, BinaryType);
+    tempBinaryValVector = new OnHeapColumnVector(1, BinaryType);
+  }
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    deltaByteArrayReader.initFromPage(valueCount, in);
+    prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+    prefixLengthReader.initFromPage(valueCount, in);
+    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+      prefixLengthVector, 0);
+    suffixReader.initFromPage(valueCount, in);
   }
 
   @Override
   public Binary readBinary(int len) {
-    return deltaByteArrayReader.readBytes();
+    readValues(1, binaryValVector, 0);
+    return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
   }
 
-  @Override
-  public void readBinary(int total, WritableColumnVector c, int rowId) {
+  private void readValues(int total, WritableColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
-      Binary binary = deltaByteArrayReader.readBytes();
-      ByteBuffer buffer = binary.toByteBuffer();
-      if (buffer.hasArray()) {
-        c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
-          binary.length());
-      } else {
-        byte[] bytes = new byte[binary.length()];
-        buffer.get(bytes);
-        c.putByteArray(rowId + i, bytes);
+      // NOTE: due to PARQUET-246, it is important that we
+      // respect prefixLength which was read from prefixLengthReader,
+      // even for the *first* value of a page. Even though the first
+      // value of the page should have an empty prefix, it may not
+      // because of PARQUET-246.
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      ByteBuffer suffix = suffixReader.getBytes(currentRow);
+      byte[] suffixArray = suffix.array();
+      int suffixLength = suffix.limit() - suffix.position();
+      int length = prefixLength + suffixLength;
+
+      // We have to do this to materialize the output
+      WritableColumnVector arrayData = c.arrayData();
+      int offset = arrayData.getElementsAppended();
+      if (prefixLength != 0) {
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
       }
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      c.putArray(rowId + i, offset, length);
+      previous = arrayData.getByteBuffer(offset, length);
+      currentRow++;
+    }
+  }
+
+  @Override
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
+    readValues(total, c, rowId);
+  }
+
+  /**
+   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
+   * previous value state that it tracks internally. This resulted in the first value of all pages
+   * (except for the first page) to be a delta from the last value of the previous page. In order to
+   * read corrupted files written with this bug, when reading a new page we need to recover the
+   * previous page's last value to use it (if needed) to read the first value.
+   */
+  public void setPreviousReader(ValuesReader reader) {
+    if (reader != null) {
+      this.previous = ((VectorizedDeltaByteArrayReader) reader).previous;
     }
   }
 
   @Override
   public void skipBinary(int total) {
+    WritableColumnVector c1 = tempBinaryValVector;
+    WritableColumnVector c2 = binaryValVector;
+
     for (int i = 0; i < total; i++) {
-      deltaByteArrayReader.skip();
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      ByteBuffer suffix = suffixReader.getBytes(currentRow);
+      byte[] suffixArray = suffix.array();
+      int suffixLength = suffix.limit() - suffix.position();
+      int length = prefixLength + suffixLength;
+
+      WritableColumnVector arrayData = c1.arrayData();
+      c1.reset();
+      if (prefixLength != 0) {
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
+      }
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      previous = arrayData.getByteBuffer(0, length);
+      currentRow++;
+
+      WritableColumnVector tmp = c1;
+      c1 = c2;
+      c2 = tmp;
     }
   }
 }
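To make the prefix/suffix bookkeeping above easier to follow, here is a minimal, self-contained sketch (not part of this commit) of what DELTA_BYTE_ARRAY decoding does for a handful of strings: each value reuses the first prefixLength bytes of the previously decoded value and appends its stored suffix, which is what readValues() does per row against column vectors.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeltaByteArrayDecodeSketch {
  // prefixLengths come from a DELTA_BINARY_PACKED stream and suffixes from a
  // DELTA_LENGTH_BYTE_ARRAY stream in the real encoding; plain arrays here.
  static List<String> decode(int[] prefixLengths, String[] suffixes) {
    List<String> out = new ArrayList<>();
    byte[] previous = new byte[0];
    for (int i = 0; i < prefixLengths.length; i++) {
      byte[] suffix = suffixes[i].getBytes(StandardCharsets.UTF_8);
      byte[] value = new byte[prefixLengths[i] + suffix.length];
      // the shared prefix is copied from the previously decoded value
      System.arraycopy(previous, 0, value, 0, prefixLengths[i]);
      // the remainder is the stored suffix
      System.arraycopy(suffix, 0, value, prefixLengths[i], suffix.length);
      out.add(new String(value, StandardCharsets.UTF_8));
      previous = value;
    }
    return out;
  }

  public static void main(String[] args) {
    // "Hello", "Helicopter", "Help" are stored as prefixes [0, 3, 3]
    // and suffixes ["Hello", "icopter", "p"]
    System.out.println(decode(new int[]{0, 3, 3}, new String[]{"Hello", "icopter", "p"}));
  }
}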
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+/**
+ * An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
+ * interface.
+ */
+public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase implements
+    VectorizedValuesReader {
+
+  private final VectorizedDeltaBinaryPackedReader lengthReader;
+  private ByteBufferInputStream in;
+  private WritableColumnVector lengthsVector;
+  private int currentRow = 0;
+
+  VectorizedDeltaLengthByteArrayReader() {
+    lengthReader = new VectorizedDeltaBinaryPackedReader();
+  }
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
+    lengthReader.initFromPage(valueCount, in);
+    lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
+    this.in = in.remainingStream();
+  }
+
+  @Override
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
+    ByteBuffer buffer;
+    ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
+    int length;
+    for (int i = 0; i < total; i++) {
+      length = lengthsVector.getInt(rowId + i);
+      try {
+        buffer = in.slice(length);
+      } catch (EOFException e) {
+        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+      }
+      outputWriter.write(c, rowId + i, buffer, length);
+    }
+    currentRow += total;
+  }
+
+  public ByteBuffer getBytes(int rowId) {
+    int length = lengthsVector.getInt(rowId);
+    try {
+      return in.slice(length);
+    } catch (EOFException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes");
+    }
+  }
+
+  @Override
+  public void skipBinary(int total) {
+    for (int i = 0; i < total; i++) {
+      int remaining = lengthsVector.getInt(currentRow + i);
+      while (remaining > 0) {
+        remaining -= in.skip(remaining);
+      }
+    }
+    currentRow += total;
+  }
+}
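For reference, a minimal sketch (not part of this commit) of the DELTA_LENGTH_BYTE_ARRAY layout the reader above consumes: all value lengths come first (delta-binary-packed in the real format, a plain int[] here), followed by the concatenated value bytes, so each value is recovered by slicing length bytes off the remaining stream, just as getBytes() and readBinary() do with in.slice(length).

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeltaLengthByteArrayDecodeSketch {
  static List<String> decode(int[] lengths, byte[] concatenatedBytes) {
    List<String> out = new ArrayList<>();
    ByteBuffer in = ByteBuffer.wrap(concatenatedBytes);
    for (int length : lengths) {
      byte[] value = new byte[length];
      in.get(value); // analogous to in.slice(length) in the reader
      out.add(new String(value, StandardCharsets.UTF_8));
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] data = "onetwothree".getBytes(StandardCharsets.UTF_8);
    // lengths [3, 3, 5] recover "one", "two", "three"
    System.out.println(decode(new int[]{3, 3, 5}, data));
  }
}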

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 2 additions & 1 deletion
@@ -367,7 +367,8 @@ private void checkEndOfRowGroup() throws IOException {
         datetimeRebaseMode,
         datetimeRebaseTz,
         int96RebaseMode,
-        int96RebaseTz);
+        int96RebaseTz,
+        writerVersion);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java

Lines changed: 16 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.nio.ByteBuffer;
+
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import org.apache.parquet.io.api.Binary;
@@ -86,4 +88,18 @@ interface IntegerOutputWriter {
     void write(WritableColumnVector outputColumnVector, int rowId, long val);
   }
 
+  @FunctionalInterface
+  interface ByteBufferOutputWriter {
+    void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);
+
+    static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
+        int length) {
+      c.putByteArray(rowId,
+        val.array(),
+        val.arrayOffset() + val.position(),
+        length);
+    }
+
+    static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }
+  }
 }
