1717
1818package org .apache .spark .sql .execution .datasources .parquet
1919
20+ import java .io .{DataOutput , OutputStream }
2021import java .math .{BigDecimal , BigInteger }
2122import java .nio .ByteOrder
2223
@@ -281,6 +282,24 @@ private[parquet] class CatalystRowConverter(
281282 }
282283 }
283284
// !! HACK ALERT !!
//
// This is a hacky utility class used to steal the underlying byte array from a Parquet
// `Binary` without copying it, as long as we know that it's safe to do so.
//
// `Binary.writeTo(OutputStream)` hands us its backing array through the three-argument
// `write` overload; we record the reference (plus offset/length) instead of copying the
// bytes. The stolen slice is only valid until the next `writeTo` call, so callers must
// consume it immediately.
private final class ByteArrayThief extends OutputStream {
  // Backing array of the most recently written `Binary` (a reference, NOT a copy).
  var bytes: Array[Byte] = _
  // Offset of the first valid byte within `bytes`.
  var offset: Int = _
  // Number of valid bytes starting at `offset`.
  var numBytes: Int = _

  override def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
    this.bytes = bytes
    this.offset = offset
    this.numBytes = length
  }

  // Fail fast instead of silently discarding data: if a `Binary` implementation ever
  // used the single-byte path, the previous no-op would leave `bytes`/`offset`/`numBytes`
  // stale and produce corrupt values downstream.
  override def write(b: Int): Unit =
    throw new UnsupportedOperationException(
      "ByteArrayThief only supports write(Array[Byte], Int, Int)")
}
302+
284303 /**
285304 * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
286305 */
@@ -289,6 +308,8 @@ private[parquet] class CatalystRowConverter(
289308
290309 private var expandedDictionary : Array [UTF8String ] = null
291310
311+ private val byteArrayThief = new ByteArrayThief
312+
292313 override def hasDictionarySupport : Boolean = true
293314
294315 override def setDictionary (dictionary : Dictionary ): Unit = {
@@ -302,13 +323,11 @@ private[parquet] class CatalystRowConverter(
302323 }
303324
override def addBinary(value: Binary): Unit = {
  // Route the `Binary` through the thief so we can grab its backing byte array
  // without copying, then wrap the stolen slice directly in a `UTF8String`.
  value.writeTo(byteArrayThief)
  updater.set(
    UTF8String.fromBytes(byteArrayThief.bytes, byteArrayThief.offset, byteArrayThief.numBytes))
}
313332 }
314333
@@ -320,6 +339,8 @@ private[parquet] class CatalystRowConverter(
320339 updater : ParentContainerUpdater )
321340 extends CatalystPrimitiveConverter (updater) {
322341
342+ private val byteArrayThief = new ByteArrayThief
343+
// Converts decimals stored as INT32 by widening and delegating to the INT64 path.
override def addInt(value: Int): Unit = {
  addLong(value.toLong)
}
@@ -340,23 +361,21 @@ private[parquet] class CatalystRowConverter(
340361 val scale = decimalType.scale
341362
342363 if (precision <= CatalystSchemaConverter .MAX_PRECISION_FOR_INT64 ) {
343- // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
344- // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
345- // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
346- val buffer = value.toByteBuffer
347- val bytes = buffer.array()
348- val start = buffer.position()
349- val end = buffer.limit()
364+ // Constructs a `Decimal` with an unscaled `Long` value if possible.
365+ value.writeTo(byteArrayThief)
366+ val bytes = byteArrayThief.bytes
367+ val begin = byteArrayThief.offset
368+ val end = begin + byteArrayThief.numBytes
350369
351370 var unscaled = 0L
352- var i = start
371+ var i = begin
353372
354373 while (i < end) {
355374 unscaled = (unscaled << 8 ) | (bytes(i) & 0xff )
356375 i += 1
357376 }
358377
359- val bits = 8 * (end - start)
378+ val bits = 8 * byteArrayThief.numBytes
360379 unscaled = (unscaled << (64 - bits)) >> (64 - bits)
361380 Decimal (unscaled, precision, scale)
362381 } else {
0 commit comments