@@ -170,7 +170,7 @@ public final void readBinary(int total, ColumnVector v, int rowId) {

@Override
public final Binary readBinary(int len) {
- Binary result = Binary.fromByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
+ Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
offset += len;
return result;
}
@@ -150,7 +150,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging

case StringType =>
(row: SpecializedGetters, ordinal: Int) =>
- recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+ recordConsumer.addBinary(
+   Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
Contributor:

If you're converting from a String, you can use Binary.fromString, which sets the reuse flag correctly. It looks like this should be using the "constant" variant, which signals to Parquet that the underlying bytes won't be changed by the program after they have been passed to Parquet.

Member Author:

Thank you for your review! (Actually it is a UTF8String, so it would have to be converted into a String to use Binary.fromString.) Though I am a bit worried that the underlying bytes might be reused in the future (I think they are not reused for now).

This could write corrupt statistics if the buffer is reused. Is my understanding correct?

Contributor:

UTF8String itself is immutable, but the underlying buffer it points to can be mutable. I'd vote for using Binary.fromReusedByteArray here.
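
For readers following along, the contract this thread is discussing can be checked directly against Parquet's Binary API. A minimal sketch, assuming parquet-column 1.8+ (where the constant/reused variants and isBackingBytesReused were introduced):

```scala
import org.apache.parquet.io.api.Binary

object BinaryReuseDemo extends App {
  val buffer = Array[Byte](1, 2, 3)

  // "Constant" promises the bytes will never change after being handed to
  // Parquet, so copy() can simply return the same instance.
  val constant = Binary.fromConstantByteArray(buffer)
  assert(!constant.isBackingBytesReused)
  assert(constant.copy() eq constant)

  // "Reused" warns Parquet that the buffer may be recycled, so any consumer
  // that retains the value (e.g. min/max statistics) must take a real copy.
  val reused = Binary.fromReusedByteArray(buffer)
  assert(reused.isBackingBytesReused)
  assert(reused.copy() ne reused)
}
```

Passing a recycled buffer through the constant variant is what produced the corrupt min/max statistics tracked by SPARK-11153 (PARQUET-251): every retained value silently came to reflect the last row written.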


case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
@@ -165,12 +166,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
- recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+ recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
}

case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
- recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+ recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))

case DecimalType.Fixed(precision, scale) =>
makeDecimalWriter(precision, scale)
@@ -227,7 +228,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging
shift -= 8
}

- recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+ recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, numBytes))
}

val binaryWriterUsingUnscaledBytes =
@@ -248,7 +249,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging
decimalBuffer
}

- recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+ recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes))
}

writeLegacyParquetFormat match {
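To make the timestampBuffer and decimalBuffer cases above concrete: both are per-writer scratch arrays overwritten on every row, so wrapping them as "constant" would let any retained value drift. A sketch of the failure mode, with hypothetical values and the same little-endian INT96 packing as the writer above:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.parquet.io.api.Binary

object ReusedBufferDemo extends App {
  // One 12-byte scratch buffer recycled for every row, as in the writer above.
  val timestampBuffer = new Array[Byte](12)

  def pack(timeOfDayNanos: Long, julianDay: Int): Unit =
    ByteBuffer.wrap(timestampBuffer)
      .order(ByteOrder.LITTLE_ENDIAN)
      .putLong(timeOfDayNanos)
      .putInt(julianDay)

  pack(1000L, 2440588) // row 1 (hypothetical values)
  val row1 = timestampBuffer.clone
  val asConstant = Binary.fromConstantByteArray(timestampBuffer)    // shares the buffer
  val asReused = Binary.fromReusedByteArray(timestampBuffer).copy() // defensive snapshot

  pack(2000L, 2440589) // row 2 overwrites the same buffer

  println(java.util.Arrays.equals(asConstant.getBytes, row1)) // false: row 1 was lost
  println(java.util.Arrays.equals(asReused.getBytes, row1))   // true: snapshot survived
}
```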
@@ -50,7 +50,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
// Binary.fromString and Binary.fromByteArray don't accept null values
Contributor:
Nit: Lots of unnecessary white-space changes. I don't generally like to commit these, but it's fine if this is more in line with the project's published style guidelines.

case StringType =>
(n: String, v: Any) => FilterApi.eq(
@@ -73,7 +72,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
@@ -93,7 +91,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
@@ -112,7 +109,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
@@ -131,8 +127,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // See https://issues.apache.org/jira/browse/SPARK-11153
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
@@ -151,7 +145,6 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
@@ -174,7 +167,6 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
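For context, the builders in this file turn Spark SQL's source filters into Parquet filter predicates; for strings they go through Binary.fromString, which marks its bytes as constant. A minimal sketch against Parquet's public FilterApi (the column name is a made-up placeholder):

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
import org.apache.parquet.io.api.Binary

object StringPredicateDemo extends App {
  // Roughly what makeEq produces for StringType: the value is compared as a
  // Binary against each row group's (now trustworthy) min/max statistics.
  val predicate = FilterApi.eq(binaryColumn("name"), Binary.fromString("spark"))
  println(predicate) // e.g. eq(name, Binary{"spark"})
}
```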
@@ -229,8 +229,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

- // See https://issues.apache.org/jira/browse/SPARK-11153
- ignore("filter pushdown - string") {
+ test("filter pushdown - string") {
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(
@@ -258,8 +257,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

- // See https://issues.apache.org/jira/browse/SPARK-11153
- ignore("filter pushdown - binary") {
+ test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
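Outside the suite's helpers, the re-enabled behavior can also be sanity-checked end to end with public APIs alone. A minimal standalone sketch, assuming Spark 2.0+ with string pushdown on by default (the session settings and path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object PushdownRoundTrip extends App {
  val spark = SparkSession.builder().master("local[1]").appName("pushdown").getOrCreate()
  import spark.implicits._

  val path = java.nio.file.Files.createTempDirectory("pushdown").resolve("t").toString
  (1 to 4).map(_.toString).toDF("_1").write.parquet(path)

  // With correct binary statistics, the `>` filter can be pushed to Parquet
  // and must still return exactly the matching rows.
  val rows = spark.read.parquet(path).where($"_1" > "2").collect()
  println(rows.map(_.getString(0)).sorted.mkString(", ")) // 3, 4

  spark.stop()
}
```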