Skip to content

Commit c52ff2c

Browse files
Adding native-array converter
1 parent 619c397 commit c52ff2c

File tree

1 file changed

+132
-7
lines changed

1 file changed

+132
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 132 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
package org.apache.spark.sql.parquet
1919

20-
import scala.collection.mutable.{Buffer, ArrayBuffer}
20+
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
21+
import scala.reflect.ClassTag
22+
import scala.reflect.runtime.universe.runtimeMirror
2123

2224
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
2325
import parquet.schema.MessageType
2426

25-
import org.apache.spark.Logging
2627
import org.apache.spark.sql.catalyst.types._
2728
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
2829
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
29-
import scala.collection.mutable
30+
import org.apache.spark.util.Utils
3031

3132
private[parquet] object CatalystConverter {
3233
// The type internally used for fields
@@ -46,6 +47,11 @@ private[parquet] object CatalystConverter {
4647
parent: CatalystConverter): Converter = {
4748
val fieldType: DataType = field.dataType
4849
fieldType match {
50+
// For native JVM types we use a converter with native arrays
51+
case ArrayType(elementType: NativeType) => {
52+
new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
53+
}
54+
// This is for other types of arrays, including those with nested fields
4955
case ArrayType(elementType: DataType) => {
5056
new CatalystArrayConverter(elementType, fieldIndex, parent)
5157
}
@@ -322,8 +328,17 @@ object CatalystArrayConverter {
322328
val INITIAL_ARRAY_SIZE = 20
323329
}
324330

325-
// this is for single-element groups of primitive or complex types
326-
// Note: AvroParquet only uses arrays for primitive types (?)
331+
/**
332+
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
333+
* match the characteristics of an array (see
334+
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
335+
* [[org.apache.spark.sql.catalyst.types.ArrayType]].
336+
*
337+
* @param elementType The type of the array elements
338+
* @param index The position of this (array) field inside its parent converter
339+
* @param parent The parent converter
340+
* @param buffer A data buffer
341+
*/
327342
private[parquet] class CatalystArrayConverter(
328343
val elementType: DataType,
329344
val index: Int,
@@ -353,7 +368,8 @@ private[parquet] class CatalystArrayConverter(
353368
// arrays have only one (repeated) field, which is its elements
354369
override val size = 1
355370

356-
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit ={
371+
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
372+
// fieldIndex is ignored (assumed to be zero but not checked)
357373
buffer += value
358374
}
359375

@@ -380,6 +396,115 @@ private[parquet] class CatalystArrayConverter(
380396
override def getCurrentRecord: Row = throw new UnsupportedOperationException
381397
}
382398

399+
private[parquet] class CatalystNativeArrayConverter[T <: NativeType](
400+
val elementType: NativeType,
401+
val index: Int,
402+
protected[parquet] val parent: CatalystConverter,
403+
protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
404+
extends GroupConverter with CatalystConverter {
405+
406+
// similar comment as in [[Decoder]]: this should probably be in NativeType
407+
private val classTag = {
408+
val mirror = runtimeMirror(Utils.getSparkClassLoader)
409+
ClassTag[T#JvmType](mirror.runtimeClass(elementType.tag.tpe))
410+
}
411+
412+
private var buffer: Array[T#JvmType] = classTag.newArray(capacity)
413+
414+
private var elements: Int = 0
415+
416+
protected[parquet] val converter: Converter = CatalystConverter.createConverter(
417+
new CatalystConverter.FieldType(
418+
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
419+
elementType,
420+
false),
421+
fieldIndex=0,
422+
parent=this)
423+
424+
override def getConverter(fieldIndex: Int): Converter = converter
425+
426+
// arrays have only one (repeated) field, which is its elements
427+
override val size = 1
428+
429+
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
430+
throw new UnsupportedOperationException
431+
432+
// Overriden here to avoid auto-boxing for primitive types
433+
override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = {
434+
checkGrowBuffer()
435+
buffer(elements) = value.asInstanceOf[T#JvmType]
436+
elements += 1
437+
}
438+
439+
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
440+
checkGrowBuffer()
441+
buffer(elements) = value.asInstanceOf[T#JvmType]
442+
elements += 1
443+
}
444+
445+
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
446+
checkGrowBuffer()
447+
buffer(elements) = value.asInstanceOf[T#JvmType]
448+
elements += 1
449+
}
450+
451+
override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = {
452+
checkGrowBuffer()
453+
buffer(elements) = value.asInstanceOf[T#JvmType]
454+
elements += 1
455+
}
456+
457+
override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = {
458+
checkGrowBuffer()
459+
buffer(elements) = value.asInstanceOf[T#JvmType]
460+
elements += 1
461+
}
462+
463+
override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = {
464+
checkGrowBuffer()
465+
buffer(elements) = value.getBytes.asInstanceOf[T#JvmType]
466+
elements += 1
467+
}
468+
469+
override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
470+
checkGrowBuffer()
471+
buffer(elements) = value.toStringUsingUTF8.asInstanceOf[T#JvmType]
472+
elements += 1
473+
}
474+
475+
override protected[parquet] def clearBuffer(): Unit = {
476+
elements = 0
477+
}
478+
479+
override def start(): Unit = {}
480+
481+
override def end(): Unit = {
482+
assert(parent != null)
483+
parent.updateField(
484+
index,
485+
new GenericRow {
486+
// TODO: it would be much nicer to use a view here but GenericRow requires an Array
487+
// TODO: we should avoid using GenericRow as a wrapper but [[GetField]] current
488+
// requires that
489+
override val values = buffer.slice(0, elements).map(_.asInstanceOf[Any])
490+
})
491+
clearBuffer()
492+
}
493+
494+
// Should be only called in root group converter!
495+
override def getCurrentRecord: Row = throw new UnsupportedOperationException
496+
497+
private def checkGrowBuffer(): Unit = {
498+
if (elements >= capacity) {
499+
val newCapacity = 2 * capacity
500+
val tmp: Array[T#JvmType] = classTag.newArray(newCapacity)
501+
Array.copy(buffer, 0, tmp, 0, capacity)
502+
buffer = tmp
503+
capacity = newCapacity
504+
}
505+
}
506+
}
507+
383508
// this is for multi-element groups of primitive or complex types
384509
// that have repetition level optional or required (so struct fields)
385510
private[parquet] class CatalystStructConverter(
@@ -407,7 +532,7 @@ private[parquet] class CatalystMapConverter(
407532
override protected[parquet] val parent: CatalystConverter)
408533
extends GroupConverter with CatalystConverter {
409534

410-
private val map = new mutable.HashMap[Any, Any]()
535+
private val map = new HashMap[Any, Any]()
411536

412537
private val keyValueConverter = new GroupConverter with CatalystConverter {
413538
private var currentKey: Any = null

0 commit comments

Comments
 (0)