12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.6
@@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.7
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
Member
(btw, don't forget to fix

parquet-column-1.8.2.jar
parquet-common-1.8.2.jar
parquet-encoding-1.8.2.jar
parquet-format-2.3.1.jar
parquet-hadoop-1.8.2.jar
parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.8.2.jar
too)

Contributor Author
Done.

protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-3.1
@@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
@@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession`
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
-none, uncompressed, snappy, gzip, lzo.
+none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
</td>
</tr>
<tr>
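For context, a minimal sketch of how the widened codec list can be exercised from user code; the output path, data, and codec choice are illustrative only, and zstd/brotli/lz4 still require the corresponding Hadoop compression codecs to be available at runtime:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCodecExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-codec-example")
        .master("local[*]")
        .getOrCreate();

    // Session-wide default; the per-write `compression` option below takes
    // precedence over `parquet.compression`, which takes precedence over this.
    spark.conf().set("spark.sql.parquet.compression.codec", "gzip");

    Dataset<Row> df = spark.range(100).toDF("id");

    // With Parquet 1.10 on the classpath, the newly allowed codecs (brotli,
    // lz4, zstd) become valid values here as well.
    df.write().option("compression", "zstd").parquet("/tmp/parquet-codec-example");

    spark.stop();
  }
}
```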
8 changes: 7 additions & 1 deletion pom.xml
@@ -129,7 +129,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
-<parquet.version>1.8.2</parquet.version>
+<parquet.version>1.10.0</parquet.version>
Member
@rdblue. To see the Jenkins result, could you resolve the dependency check failure with the following?

./dev/test-dependencies.sh --replace-manifest

Member
Unlike last time, it seems that this PR also touches the commons-pool dependency. Can we avoid this?

-commons-pool-1.5.4.jar
+commons-pool-1.6.jar

Contributor Author
I excluded the commons-pool dependency from parquet-hadoop to avoid this. I also tested the latest Parquet release with commons-pool 1.5.4 and everything passes. I don't think it actually requires 1.6.

<orc.version>1.4.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1778,6 +1778,12 @@
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
+<exclusions>
+  <exclusion>
+    <groupId>commons-pool</groupId>
+    <artifactId>commons-pool</artifactId>
+  </exclusion>
+</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -345,7 +345,7 @@ object SQLConf {
"snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
-.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
Member
Could you also update sql-programming-guide.md?

Contributor Author
Done.

.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
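A hedged illustration of what the widened `checkValues` set means at the session level; the eager-validation behaviour and exact exception type are assumptions based on how SQLConf entries are normally checked, not something verified in this PR:

```java
import org.apache.spark.sql.SparkSession;

public class ParquetCodecConfCheck {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-codec-conf-check")
        .master("local[*]")
        .getOrCreate();

    // Accepted after this change; the value is lower-cased by the conf's
    // transform before it is checked, so case does not matter.
    spark.conf().set("spark.sql.parquet.compression.codec", "ZSTD");

    try {
      // A codec outside the checked set should be rejected.
      spark.conf().set("spark.sql.parquet.compression.codec", "bzip2");
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected as expected: " + e.getMessage());
    }

    spark.stop();
  }
}
```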
@@ -293,7 +293,7 @@ protected static IntIterator createRLEIterator(
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
-new ByteArrayInputStream(bytes.toByteArray())));
+bytes.toInputStream()));
} catch (IOException e) {
throw new IOException("could not read levels in page for col " + descriptor, e);
}
@@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.TimeZone;

+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -388,15 +390,16 @@ private void decodeDictionaryIds(
* is guaranteed that num is smaller than the number of values left in the current page.
*/

-private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
+private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
+    throws IOException {
if (column.dataType() != DataTypes.BooleanType) {
throw constructConvertNotSupportedException(descriptor, column);
}
defColumn.readBooleans(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}

-private void readIntBatch(int rowId, int num, WritableColumnVector column) {
+private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -414,7 +417,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readLongBatch(int rowId, int num, WritableColumnVector column) {
+private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
@@ -434,7 +437,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
+private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
if (column.dataType() == DataTypes.FloatType) {
@@ -445,7 +448,7 @@ private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
+private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.DoubleType) {
@@ -456,7 +459,7 @@ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
+private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -556,7 +559,7 @@ public Void visit(DataPageV2 dataPageV2) {
});
}

-private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
@@ -581,7 +584,7 @@ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
}

try {
-dataColumn.initFromPage(pageValueCount, bytes, offset);
+dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
@@ -602,12 +605,11 @@ private void readPageV1(DataPageV1 page) throws IOException {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
-byte[] bytes = page.getBytes().toByteArray();
-rlReader.initFromPage(pageValueCount, bytes, 0);
-int next = rlReader.getNextOffset();
-dlReader.initFromPage(pageValueCount, bytes, next);
-next = dlReader.getNextOffset();
-initDataReader(page.getValueEncoding(), bytes, next);
+BytesInput bytes = page.getBytes();
+ByteBufferInputStream in = bytes.toInputStream();
+rlReader.initFromPage(pageValueCount, in);
+dlReader.initFromPage(pageValueCount, in);
+initDataReader(page.getValueEncoding(), in);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
@@ -619,12 +621,13 @@ private void readPageV2(DataPageV2 page) throws IOException {
page.getRepetitionLevels(), descriptor);

int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-this.defColumn = new VectorizedRleValuesReader(bitWidth);
+// do not read the length from the stream. v2 pages handle dividing the page bytes.
+this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
-this.defColumn.initFromBuffer(
-  this.pageValueCount, page.getDefinitionLevels().toByteArray());
+this.defColumn.initFromPage(
+  this.pageValueCount, page.getDefinitionLevels().toInputStream());
try {
-initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+initDataReader(page.getDataEncoding(), page.getData().toInputStream());
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
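To make the readPageV1/readPageV2 changes above easier to follow, here is a toy sketch (not Spark's actual reader code) of the stream-based page handling that Parquet 1.10 enables: a single ByteBufferInputStream over the page replaces the old byte[] copy plus getNextOffset() bookkeeping, and each consumer simply leaves the stream positioned for the next one. The byte layout below is invented for illustration.

```java
import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;

public class SharedPageStreamSketch {
  public static void main(String[] args) throws IOException {
    // Pretend this is a v1 data page: repetition levels, definition levels and
    // values concatenated in one buffer (two bytes each in this toy layout).
    BytesInput page = BytesInput.from(new byte[] {10, 11, 20, 21, 30, 31});

    // Parquet 1.10 exposes the page as a stream instead of a copied byte[].
    ByteBufferInputStream in = page.toInputStream();

    // The first two "readers" consume their sections...
    int r0 = in.read(), r1 = in.read();   // repetition levels: 10, 11
    int d0 = in.read(), d1 = in.read();   // definition levels: 20, 21

    // ...and the data reader picks up exactly where they stopped, with no
    // manual offset tracking.
    System.out.println(in.read() + ", " + in.read());  // 30, 31
  }
}
```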