From e923f7da03b62bd01f584162659e0a4e08537739 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 27 Feb 2016 10:46:36 +0000
Subject: [PATCH] Add shorttype support.

---
Review notes. This text sits between the three-dash separator and the first
"diff --git" line, so it is commentary only: it is not part of the commit
message and not part of any hunk.

 * UnsafeRowParquetRecordReader.readIntBatch gains one branch: when
   column.dataType() == DataTypes.ShortType it calls defColumn.readShorts(...)
   with the same arguments the neighbouring readIntsAsLongs(...) call uses.
   The branch is placed before the final else that throws
   NotImplementedException.

 * VectorizedRleValuesReader.readShorts(total, c, rowId, level, data) is new.
   It loops until `total` entries have been consumed. On each pass it calls
   readNextGroup() if currentCount is 0, then handles
   n = min(left, currentCount) entries and advances rowId, left and
   currentCount by n.
     RLE mode:    if currentValue == level, each of the n rows receives
                  (short) data.readInteger() through c.putShort; otherwise
                  the n rows are marked null with a single c.putNulls(rowId, n).
     PACKED mode: each currentBuffer entry is compared with `level` on its
                  own; a match stores (short) data.readInteger(), anything
                  else calls c.putNull for that row.
   The value from data.readInteger() is narrowed with a (short) cast, so a
   value outside the short range would be silently truncated.
   NOTE(review): presumably only columns whose values fit in 16 bits are
   routed here -- confirm against the caller's schema checks.

 * The last hunk of VectorizedRleValuesReader.java only adds the newline that
   was missing at the end of the file.

 * NOTE(review): this copy of the patch had its line breaks and indentation
   collapsed. They are restored here with two-space Java indentation as a
   best guess; compare with the upstream commit before applying, or apply
   with whitespace checks relaxed.

 .../parquet/UnsafeRowParquetRecordReader.java |  3 ++
 .../parquet/VectorizedRleValuesReader.java    | 34 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 9d50cfab3bd3d..e7f0ec2e77895 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -765,6 +765,9 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce
       } else if (DecimalType.is64BitDecimalType(column.dataType())) {
         defColumn.readIntsAsLongs(
             num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      } else if (column.dataType() == DataTypes.ShortType) {
+        defColumn.readShorts(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
       } else {
         throw new NotImplementedException("Unimplemented type: " + column.dataType());
       }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index b2048c0e397ed..8613fcae0b805 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -301,6 +301,38 @@ public void readBytes(int total, ColumnVector c,
     }
   }
 
+  public void readShorts(int total, ColumnVector c,
+                         int rowId, int level, VectorizedValuesReader data) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == level) {
+            for (int i = 0; i < n; i++) {
+              c.putShort(rowId + i, (short)data.readInteger());
+            }
+          } else {
+            c.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              c.putShort(rowId + i, (short)data.readInteger());
+            } else {
+              c.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
   public void readLongs(int total, ColumnVector c,
                         int rowId, int level, VectorizedValuesReader data) {
     int left = total;
@@ -611,4 +643,4 @@ private void readNextGroup() {
         throw new ParquetDecodingException("not a valid mode " + this.mode);
     }
   }
-}
\ No newline at end of file
+}