Commit 07d9c53

HyukjinKwon authored and liancheng committed
[SPARK-9876][SQL][FOLLOWUP] Enable string and binary tests for Parquet predicate pushdown and replace deprecated fromByteArray

## What changes were proposed in this pull request?

Parquet was upgraded to 1.8.1 by #13280. This PR therefore enables string and binary predicate push down, which had been disabled due to [SPARK-11153](https://issues.apache.org/jira/browse/SPARK-11153) and [PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251), and cleans up some comments that were apparently left behind by mistake. It also replaces the `fromByteArray()` API, which was deprecated in [PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251).

## How was this patch tested?

Unit tests in `ParquetFilterSuite`.

Author: hyukjinkwon <[email protected]>

Closes #13389 from HyukjinKwon/parquet-1.8-followup.
1 parent 7f7eb39 commit 07d9c53
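
The API switch is from the deprecated `Binary.fromByteArray()` to the two factory methods Parquet introduced in [PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251). Below is a minimal, hedged sketch of the distinction as I read it (the exact copy/retention behavior should be confirmed against the Parquet 1.8.1 javadocs); the byte values and object name are made up for illustration:

```scala
import org.apache.parquet.io.api.Binary

object BinaryFactorySketch {
  // The backing array is never mutated again, so Parquet may keep a reference
  // to it without copying; this is the reader-side choice in VectorizedPlainValuesReader.
  val constantBytes: Array[Byte] = "abc".getBytes("UTF-8")
  val constant: Binary = Binary.fromConstantByteArray(constantBytes, 0, constantBytes.length)

  // The backing array is a scratch buffer the caller will overwrite for the next
  // value, so Parquet has to copy the bytes whenever it retains them; this is the
  // writer-side choice in CatalystWriteSupport.
  val scratch: Array[Byte] = new Array[Byte](8)
  val reused: Binary = Binary.fromReusedByteArray(scratch, 0, scratch.length)
}
```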

File tree

4 files changed: +9 additions, -18 deletions

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java

Lines changed: 1 addition & 1 deletion

@@ -170,7 +170,7 @@ public final void readBinary(int total, ColumnVector v, int rowId) {

   @Override
   public final Binary readBinary(int len) {
-    Binary result = Binary.fromByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
+    Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
     offset += len;
     return result;
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala

Lines changed: 6 additions & 5 deletions

@@ -150,7 +150,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi

     case StringType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+        recordConsumer.addBinary(
+          Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) => {
@@ -165,12 +166,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
         val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
         val buf = ByteBuffer.wrap(timestampBuffer)
         buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-        recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
       }

     case BinaryType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))

     case DecimalType.Fixed(precision, scale) =>
       makeDecimalWriter(precision, scale)
@@ -227,7 +228,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
          shift -= 8
        }

-        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, numBytes))
      }

    val binaryWriterUsingUnscaledBytes =
@@ -248,7 +249,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
          decimalBuffer
        }

-        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes))
      }

    writeLegacyParquetFormat match {
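
The writer above fills per-instance scratch arrays (`timestampBuffer`, `decimalBuffer`) once per row, which is why the new calls use `fromReusedByteArray` rather than `fromConstantByteArray`. Below is a small, hedged standalone sketch of the INT96 timestamp layout seen in the hunk; the `julianDay` and `timeOfDayNanos` values are made-up stand-ins for `DateTimeUtils.toJulianDay(...)`:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.parquet.io.api.Binary

object ReusedTimestampBufferSketch {
  // Hypothetical values standing in for DateTimeUtils.toJulianDay(micros).
  val julianDay: Int = 2457553
  val timeOfDayNanos: Long = 12345678L

  // The same 12-byte array is overwritten for every row, mirroring the
  // timestampBuffer field in CatalystWriteSupport.
  val timestampBuffer: Array[Byte] = new Array[Byte](12)
  val buf: ByteBuffer = ByteBuffer.wrap(timestampBuffer)
  buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)

  // Because the array will be reused, the Binary is built with fromReusedByteArray,
  // telling Parquet it must copy the bytes if it keeps them around.
  val int96: Binary = Binary.fromReusedByteArray(timestampBuffer)
}
```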

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 0 additions & 8 deletions

@@ -50,7 +50,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -73,7 +72,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -93,7 +91,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -112,7 +109,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -131,8 +127,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -151,7 +145,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -174,7 +167,6 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
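
For reference, a hedged sketch of the kind of Parquet predicate the `StringType` branches above build; the column name and literal value are made up for illustration, since `ParquetFilters` derives both from Catalyst data source filters:

```scala
import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

object StringPredicateSketch {
  // A string equality filter is expressed against a binary column, with the
  // literal converted through Binary.fromString (which, as the comment kept in
  // the source above notes, does not accept null values).
  val pred: FilterPredicate =
    FilterApi.eq(binaryColumn("name"), Binary.fromString("some value"))
}
```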

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 2 additions & 4 deletions

@@ -229,8 +229,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }

-  // See https://issues.apache.org/jira/browse/SPARK-11153
-  ignore("filter pushdown - string") {
+  test("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -258,8 +257,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }

-  // See https://issues.apache.org/jira/browse/SPARK-11153
-  ignore("filter pushdown - binary") {
+  test("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }
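
At the query level, the re-enabled tests correspond to something like the following hedged sketch: with `spark.sql.parquet.filterPushdown` enabled (the default), a string comparison on a Parquet-backed DataFrame can now be translated by `ParquetFilters` into a Parquet-level predicate. The path, session setup, and column name are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

object StringPushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("string-pushdown-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Write a tiny string dataset, mirroring the (1 to 4).map(_.toString) data
    // used in the test above. The output path is hypothetical.
    (1 to 4).map(_.toString).toDF("_1").write.mode("overwrite").parquet("/tmp/strings.parquet")

    // With pushdown enabled, the comparison below is handed to the Parquet reader
    // as a FilterPredicate instead of being evaluated only in Spark.
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")
    spark.read.parquet("/tmp/strings.parquet").filter($"_1" > "2").show()

    spark.stop()
  }
}
```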
