diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index c3d1dd444b506..f479c13f00be6 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 290867035f91d..e7c4599cb5003 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 97ad65a4096cb..3447cd7395d95 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 075b953a0898e..a8f2486ca58d9 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
- none, uncompressed, snappy, gzip, lzo.
+ none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
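As an aside to this doc change (not part of the patch): a minimal Scala sketch of how the newly accepted codec names are picked up, assuming a build that includes this Parquet 1.10.0 upgrade. Whether `brotli`, `lz4`, or `zstd` actually compress at runtime also depends on the corresponding codec implementations being available on the cluster, and the app name and output path below are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-codec-example") // placeholder
  .getOrCreate()

// Session-wide default for Parquet writes.
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

// Per-write override: `compression` takes precedence over `parquet.compression`
// and the session-level setting, matching the precedence described above.
spark.range(100).write
  .option("compression", "lz4")
  .parquet("/tmp/parquet-lz4-example") // placeholder path
```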
diff --git a/pom.xml b/pom.xml
index 88e77ff874748..6e37e518d86e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
     <orc.version>1.4.3</orc.version>
     <orc.classifier>nohive</orc.classifier>
     <hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1778,6 +1778,12 @@
         <artifactId>parquet-hadoop</artifactId>
         <version>${parquet.version}</version>
         <scope>${parquet.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.parquet</groupId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 895e150756567..b00edca97cd44 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,7 +345,7 @@ object SQLConf {
"snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
- .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+ .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e65cd252c3ddf..10d6ed85a4080 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -293,7 +293,7 @@ protected static IntIterator createRLEIterator(
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
+ bytes.toInputStream()));
} catch (IOException e) {
throw new IOException("could not read levels in page for col " + descriptor, e);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 72f1d024b08ce..d5969b55eef96 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.TimeZone;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -388,7 +390,8 @@ private void decodeDictionaryIds(
* is guaranteed that num is smaller than the number of values left in the current page.
*/
- private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
+ private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
+ throws IOException {
if (column.dataType() != DataTypes.BooleanType) {
throw constructConvertNotSupportedException(descriptor, column);
}
@@ -396,7 +399,7 @@ private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
- private void readIntBatch(int rowId, int num, WritableColumnVector column) {
+ private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -414,7 +417,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) {
}
}
- private void readLongBatch(int rowId, int num, WritableColumnVector column) {
+ private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
@@ -434,7 +437,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) {
}
}
- private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
+ private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
if (column.dataType() == DataTypes.FloatType) {
@@ -445,7 +448,7 @@ private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
}
}
- private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
+ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.DoubleType) {
@@ -456,7 +459,7 @@ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
}
}
- private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
+ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -556,7 +559,7 @@ public Void visit(DataPageV2 dataPageV2) {
});
}
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
@@ -581,7 +584,7 @@ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) thr
}
try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
+ dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
@@ -602,12 +605,11 @@ private void readPageV1(DataPageV1 page) throws IOException {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
- byte[] bytes = page.getBytes().toByteArray();
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- initDataReader(page.getValueEncoding(), bytes, next);
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(pageValueCount, in);
+ dlReader.initFromPage(pageValueCount, in);
+ initDataReader(page.getValueEncoding(), in);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
@@ -619,12 +621,13 @@ private void readPageV2(DataPageV2 page) throws IOException {
page.getRepetitionLevels(), descriptor);
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ // do not read the length from the stream. v2 pages handle dividing the page bytes.
+ this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
- this.defColumn.initFromBuffer(
- this.pageValueCount, page.getDefinitionLevels().toByteArray());
+ this.defColumn.initFromPage(
+ this.pageValueCount, page.getDefinitionLevels().toInputStream());
try {
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+ initDataReader(page.getDataEncoding(), page.getData().toInputStream());
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 5b75f719339fb..aacefacfc1c1a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -20,34 +20,30 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.unsafe.Platform;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.unsafe.Platform;
/**
* An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
*/
public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
- private byte[] buffer;
- private int offset;
- private int bitOffset; // Only used for booleans.
- private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+ private ByteBufferInputStream in = null;
- private static final boolean bigEndianPlatform =
- ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+ // Only used for booleans.
+ private int bitOffset;
+ private byte currentByte = 0;
public VectorizedPlainValuesReader() {
}
@Override
- public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
- this.buffer = bytes;
- this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
- if (bigEndianPlatform) {
- byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
- }
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
}
@Override
@@ -63,115 +59,157 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
}
}
+ private ByteBuffer getBuffer(int length) {
+ try {
+ return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+ }
+ }
+
@Override
public final void readIntegers(int total, WritableColumnVector c, int rowId) {
- c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 4 * total;
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putInt(rowId + i, buffer.getInt());
+ }
+ }
}
@Override
public final void readLongs(int total, WritableColumnVector c, int rowId) {
- c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 8 * total;
+ int requiredBytes = total * 8;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, buffer.getLong());
+ }
+ }
}
@Override
public final void readFloats(int total, WritableColumnVector c, int rowId) {
- c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 4 * total;
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putFloats(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putFloat(rowId + i, buffer.getFloat());
+ }
+ }
}
@Override
public final void readDoubles(int total, WritableColumnVector c, int rowId) {
- c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 8 * total;
+ int requiredBytes = total * 8;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putDoubles(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, buffer.getDouble());
+ }
+ }
}
@Override
public final void readBytes(int total, WritableColumnVector c, int rowId) {
- for (int i = 0; i < total; i++) {
- // Bytes are stored as a 4-byte little endian int. Just read the first byte.
- // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
- c.putByte(rowId + i, Platform.getByte(buffer, offset));
- offset += 4;
+ // Bytes are stored as a 4-byte little endian int. Just read the first byte.
+ // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ for (int i = 0; i < total; i += 1) {
+ c.putByte(rowId + i, buffer.get());
+ // skip the next 3 bytes
+ buffer.position(buffer.position() + 3);
}
}
@Override
public final boolean readBoolean() {
- byte b = Platform.getByte(buffer, offset);
- boolean v = (b & (1 << bitOffset)) != 0;
+ // TODO: vectorize decoding and keep boolean[] instead of currentByte
+ if (bitOffset == 0) {
+ try {
+ currentByte = (byte) in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read a byte", e);
+ }
+ }
+
+ boolean v = (currentByte & (1 << bitOffset)) != 0;
bitOffset += 1;
if (bitOffset == 8) {
bitOffset = 0;
- offset++;
}
return v;
}
@Override
public final int readInteger() {
- int v = Platform.getInt(buffer, offset);
- if (bigEndianPlatform) {
- v = java.lang.Integer.reverseBytes(v);
- }
- offset += 4;
- return v;
+ return getBuffer(4).getInt();
}
@Override
public final long readLong() {
- long v = Platform.getLong(buffer, offset);
- if (bigEndianPlatform) {
- v = java.lang.Long.reverseBytes(v);
- }
- offset += 8;
- return v;
+ return getBuffer(8).getLong();
}
@Override
public final byte readByte() {
- return (byte)readInteger();
+ return (byte) readInteger();
}
@Override
public final float readFloat() {
- float v;
- if (!bigEndianPlatform) {
- v = Platform.getFloat(buffer, offset);
- } else {
- v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
- }
- offset += 4;
- return v;
+ return getBuffer(4).getFloat();
}
@Override
public final double readDouble() {
- double v;
- if (!bigEndianPlatform) {
- v = Platform.getDouble(buffer, offset);
- } else {
- v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
- }
- offset += 8;
- return v;
+ return getBuffer(8).getDouble();
}
@Override
public final void readBinary(int total, WritableColumnVector v, int rowId) {
for (int i = 0; i < total; i++) {
int len = readInteger();
- int start = offset;
- offset += len;
- v.putByteArray(rowId + i, buffer, start - Platform.BYTE_ARRAY_OFFSET, len);
+ ByteBuffer buffer = getBuffer(len);
+ if (buffer.hasArray()) {
+ v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+ } else {
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ v.putByteArray(rowId + i, bytes);
+ }
}
}
@Override
public final Binary readBinary(int len) {
- Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
- offset += len;
- return result;
+ ByteBuffer buffer = getBuffer(len);
+ if (buffer.hasArray()) {
+ return Binary.fromConstantByteArray(
+ buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+ } else {
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ return Binary.fromConstantByteArray(bytes);
+ }
}
}
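Aside (not part of the patch): every `getBuffer`-based reader above follows the same heap-vs-direct split, so here is a small self-contained sketch of that pattern. It uses only `java.nio`, and the two sink functions are hypothetical stand-ins for the `WritableColumnVector` calls.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Heap-backed slices expose their array for one bulk copy; direct (off-heap)
// slices have no backing array and are drained element by element.
def copyInts(
    slice: ByteBuffer,
    total: Int,
    putBulk: (Array[Byte], Int) => Unit,  // hypothetical bulk sink
    putOne: (Int, Int) => Unit): Unit = { // hypothetical per-element sink
  val buf = slice.order(ByteOrder.LITTLE_ENDIAN)
  if (buf.hasArray) {
    // absolute offset of position() within the backing array
    putBulk(buf.array(), buf.arrayOffset() + buf.position())
  } else {
    var i = 0
    while (i < total) {
      putOne(i, buf.getInt()) // honors the little-endian order set above
      i += 1
    }
  }
}
```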
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index fc7fa70c39419..fe3d31ae8e746 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -27,6 +28,9 @@
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
* A values reader for Parquet's run-length encoded data. This is based off of the version in
* parquet-mr with these changes:
@@ -49,9 +53,7 @@ private enum MODE {
}
// Encoded data.
- private byte[] in;
- private int end;
- private int offset;
+ private ByteBufferInputStream in;
// bit/byte width of decoded data and utility to batch unpack them.
private int bitWidth;
@@ -70,45 +72,40 @@ private enum MODE {
// If true, the bit width is fixed. This decoder is used in different places and this also
// controls if we need to read the bitwidth from the beginning of the data stream.
private final boolean fixedWidth;
+ private final boolean readLength;
public VectorizedRleValuesReader() {
- fixedWidth = false;
+ this.fixedWidth = false;
+ this.readLength = false;
}
public VectorizedRleValuesReader(int bitWidth) {
- fixedWidth = true;
+ this.fixedWidth = true;
+ this.readLength = bitWidth != 0;
+ init(bitWidth);
+ }
+
+ public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
+ this.fixedWidth = true;
+ this.readLength = readLength;
init(bitWidth);
}
@Override
- public void initFromPage(int valueCount, byte[] page, int start) {
- this.offset = start;
- this.in = page;
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
if (fixedWidth) {
- if (bitWidth != 0) {
+ // initialize for repetition and definition levels
+ if (readLength) {
int length = readIntLittleEndian();
- this.end = this.offset + length;
+ this.in = in.sliceStream(length);
}
} else {
- this.end = page.length;
- if (this.end != this.offset) init(page[this.offset++] & 255);
- }
- if (bitWidth == 0) {
- // 0 bit width, treat this as an RLE run of valueCount number of 0's.
- this.mode = MODE.RLE;
- this.currentCount = valueCount;
- this.currentValue = 0;
- } else {
- this.currentCount = 0;
+ // initialize for values
+ if (in.available() > 0) {
+ init(in.read());
+ }
}
- }
-
- // Initialize the reader from a buffer. This is used for the V2 page encoding where the
- // definition are in its own buffer.
- public void initFromBuffer(int valueCount, byte[] data) {
- this.offset = 0;
- this.in = data;
- this.end = data.length;
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
@@ -129,11 +126,6 @@ private void init(int bitWidth) {
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
}
- @Override
- public int getNextOffset() {
- return this.end;
- }
-
@Override
public boolean readBoolean() {
return this.readInteger() != 0;
@@ -182,7 +174,7 @@ public void readIntegers(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -217,7 +209,7 @@ public void readBooleans(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -251,7 +243,7 @@ public void readBytes(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -285,7 +277,7 @@ public void readShorts(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -321,7 +313,7 @@ public void readLongs(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -355,7 +347,7 @@ public void readFloats(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -389,7 +381,7 @@ public void readDoubles(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -423,7 +415,7 @@ public void readBinarys(
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -462,7 +454,7 @@ public void readIntegers(
WritableColumnVector nulls,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -559,12 +551,12 @@ public Binary readBinary(int len) {
/**
* Reads the next varint encoded int.
*/
- private int readUnsignedVarInt() {
+ private int readUnsignedVarInt() throws IOException {
int value = 0;
int shift = 0;
int b;
do {
- b = in[offset++] & 255;
+ b = in.read();
value |= (b & 0x7F) << shift;
shift += 7;
} while ((b & 0x80) != 0);
@@ -574,35 +566,32 @@ private int readUnsignedVarInt() {
/**
* Reads the next 4 byte little endian int.
*/
- private int readIntLittleEndian() {
- int ch4 = in[offset] & 255;
- int ch3 = in[offset + 1] & 255;
- int ch2 = in[offset + 2] & 255;
- int ch1 = in[offset + 3] & 255;
- offset += 4;
+ private int readIntLittleEndian() throws IOException {
+ int ch4 = in.read();
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
/**
* Reads the next byteWidth little endian int.
*/
- private int readIntLittleEndianPaddedOnBitWidth() {
+ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
switch (bytesWidth) {
case 0:
return 0;
case 1:
- return in[offset++] & 255;
+ return in.read();
case 2: {
- int ch2 = in[offset] & 255;
- int ch1 = in[offset + 1] & 255;
- offset += 2;
+ int ch2 = in.read();
+ int ch1 = in.read();
return (ch1 << 8) + ch2;
}
case 3: {
- int ch3 = in[offset] & 255;
- int ch2 = in[offset + 1] & 255;
- int ch1 = in[offset + 2] & 255;
- offset += 3;
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
}
case 4: {
@@ -619,32 +608,36 @@ private int ceil8(int value) {
/**
* Reads the next group.
*/
- private void readNextGroup() {
- int header = readUnsignedVarInt();
- this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
- switch (mode) {
- case RLE:
- this.currentCount = header >>> 1;
- this.currentValue = readIntLittleEndianPaddedOnBitWidth();
- return;
- case PACKED:
- int numGroups = header >>> 1;
- this.currentCount = numGroups * 8;
- int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
- if (this.currentBuffer.length < this.currentCount) {
- this.currentBuffer = new int[this.currentCount];
- }
- currentBufferIdx = 0;
- int valueIndex = 0;
- for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
- this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
- valueIndex += 8;
- }
- offset += bytesToRead;
- return;
- default:
- throw new ParquetDecodingException("not a valid mode " + this.mode);
+ private void readNextGroup() {
+ try {
+ int header = readUnsignedVarInt();
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
+
+ if (this.currentBuffer.length < this.currentCount) {
+ this.currentBuffer = new int[this.currentCount];
+ }
+ currentBufferIdx = 0;
+ int valueIndex = 0;
+ while (valueIndex < this.currentCount) {
+ // values are bit packed 8 at a time, so reading bitWidth will always work
+ ByteBuffer buffer = in.slice(bitWidth);
+ this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read from input stream", e);
}
}
}
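Aside (not part of the patch): `readNextGroup` above decodes Parquet's RLE/bit-packed hybrid encoding. As a rough illustration of just the run-header handling (based on the Parquet format spec, not on Spark internals), with a plain `InputStream` standing in for `ByteBufferInputStream`:

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Reads the unsigned LEB128 varint that precedes each run.
def readUnsignedVarInt(in: InputStream): Int = {
  var value = 0
  var shift = 0
  var b = 0
  do {
    b = in.read()
    value |= (b & 0x7F) << shift
    shift += 7
  } while ((b & 0x80) != 0)
  value
}

// The low bit of the header selects the run type; the remaining bits carry
// either the repeat count (RLE) or the number of 8-value groups (bit-packed).
def describeNextRun(in: InputStream): String = {
  val header = readUnsignedVarInt(in)
  if ((header & 1) == 0) s"RLE run of ${header >>> 1} repeated values"
  else s"bit-packed run of ${(header >>> 1) * 8} values"
}

// Example: header byte 0x06 (binary 110) -> RLE run of 3 values.
println(describeNextRun(new ByteArrayInputStream(Array[Byte](0x06))))
```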
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index f36a89a4c3c5f..9cfc30725f03a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -81,7 +81,10 @@ object ParquetOptions {
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
- "lzo" -> CompressionCodecName.LZO)
+ "lzo" -> CompressionCodecName.LZO,
+ "lz4" -> CompressionCodecName.LZ4,
+ "brotli" -> CompressionCodecName.BROTLI,
+ "zstd" -> CompressionCodecName.ZSTD)
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
index 51dac111029e8..58ed201e2a60f 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -89,7 +89,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -122,7 +122,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -147,7 +147,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
-Partition Statistics 1080 bytes, 4 rows
+Partition Statistics 1098 bytes, 4 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -180,7 +180,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -205,7 +205,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
-Partition Statistics 1080 bytes, 4 rows
+Partition Statistics 1098 bytes, 4 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -230,7 +230,7 @@ Database default
Table t
Partition Values [ds=2017-09-01, hr=5]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5
-Partition Statistics 1054 bytes, 2 rows
+Partition Statistics 1144 bytes, 2 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 863703b15f4f1..efc2f20a907f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is file size before the underlying RDD is materialized
- assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+ assert(inMemoryRelation.computeStats().sizeInBytes === 800)
// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
@@ -516,7 +516,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
// is calculated
- assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+ assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
// InMemoryRelation's stats should be updated after calculating stats of the table
// clear cache to simulate a fresh environment