
Conversation

@viirya (Member) commented Jul 28, 2016

What changes were proposed in this pull request?

The vectorized Parquet reader currently doesn't support complex types such as ArrayType, MapType and StructType. We should support them to extend the coverage of the performance improvement introduced by the vectorized Parquet reader. This patch adds support for ArrayType and StructType first.

Main changes

  • Obtain repetition and definition level information for Parquet schema

    In order to support complex types in the vectorized Parquet reader, we need the repetition and definition level information of the Parquet schema, which is used to encode the structure of complex types. This PR introduces a class to capture this encoding: RepetitionDefinitionInfo. It also introduces a few classes to capture the Parquet schema structure: ParquetField, ParquetStruct, ParquetArray and ParquetMap. A new method getParquetStruct is added to ParquetSchemaConverter to create a ParquetStruct object that captures the structure and metadata. The ParquetStruct has the same schema structure as the required schema used to guide Parquet reading, and it provides the corresponding repetition and definition levels for the fields in the required schema. (A simplified sketch of how such levels are derived from a schema is given after this list.)

  • Attach VectorizedColumnReader to ColumnVector

    Because in a flat schema each ColumnVector is actually a data column, the relation between VectorizedColumnReader and ColumnVector used to be one-to-one. Now only the ColumnVectors that represent data columns have a corresponding VectorizedColumnReader. When it is time to read a batch, a ColumnVector with a complex type delegates the reading to its child ColumnVectors.

  • Implement constructing complex records in VectorizedColumnReader

    readBatch in VectorizedColumnReader is the main method for reading data into a ColumnVector. Previously it simply loaded the required number of values according to the data type of the column vector. Now, after the data is loaded into a column, we need to construct complex records in its parent column, which could be of ArrayType, MapType or StructType. The information needed to restore the data as complex types is encoded in Parquet's repetition and definition levels. The new method constructComplexRecords in VectorizedColumnReader implements the logic to restore the complex data. Basically, constructComplexRecords counts consecutive values and adds an array into the parent column whenever the repetition level indicates that a new record starts. It also needs to handle null values, which could mean a null record at the root level, an empty array or an empty struct; the method distinguishes these cases and sets the result accordingly. (A simplified sketch of this reconstruction is also given after this list.)
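
The following is a minimal, self-contained Scala sketch of the first point above. The Node, LeafNode and GroupNode types and the levels method are hypothetical illustrations for this write-up, not the ParquetField/ParquetStruct classes added by this PR; the sketch only assumes Parquet's general rule that an optional level adds one to the maximum definition level of the leaves beneath it and a repeated level adds one to both the maximum repetition and definition levels.

// Hypothetical, simplified schema model for illustration only (not this PR's classes).
sealed trait Node
final case class LeafNode(name: String, optional: Boolean) extends Node
final case class GroupNode(name: String, optional: Boolean, repeated: Boolean,
                           children: Seq[Node]) extends Node

object LevelSketch {
  // Returns leaf path -> (maxRepetitionLevel, maxDefinitionLevel).
  // Optional levels add 1 to the definition level; repeated levels add 1 to both.
  def levels(node: Node, rep: Int = 0, defn: Int = 0,
             path: Seq[String] = Nil): Map[String, (Int, Int)] = node match {
    case LeafNode(name, optional) =>
      Map((path :+ name).mkString(".") -> (rep, if (optional) defn + 1 else defn))
    case GroupNode(name, optional, repeated, children) =>
      val r = if (repeated) rep + 1 else rep
      val d = defn + (if (optional) 1 else 0) + (if (repeated) 1 else 0)
      children.flatMap(levels(_, r, d, path :+ name)).toMap
  }

  def main(args: Array[String]): Unit = {
    // Roughly models: optional group arr (LIST) { repeated group list { optional int32 element } }
    val schema = GroupNode("arr", optional = true, repeated = false, Seq(
      GroupNode("list", optional = false, repeated = true, Seq(
        LeafNode("element", optional = true)))))
    println(levels(schema))  // Map(arr.list.element -> (1,3))
  }
}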

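Here is a similarly hedged sketch of the reconstruction idea behind constructComplexRecords, again a hypothetical simplification rather than this PR's code. It assumes a single nullable array column whose leaf has maximum definition level maxDef, and that repetition level 0 marks the start of a new top-level record, definition level 0 a null record, an intermediate definition level an empty array, and maxDef a present value.

object ReconstructSketch {
  // One decoded leaf entry: its repetition level, definition level and value.
  final case class Leaf(rep: Int, defn: Int, value: Int)

  // Rebuild Seq[Option[Seq[Int]]] (records of nullable arrays) from the flattened leaves.
  def assemble(leaves: Seq[Leaf], maxDef: Int): Seq[Option[Seq[Int]]] = {
    val records = scala.collection.mutable.ArrayBuffer.empty[Option[Seq[Int]]]
    var current: Option[scala.collection.mutable.ArrayBuffer[Int]] = None
    def flush(): Unit = { current.foreach(buf => records += Some(buf.toSeq)); current = None }
    for (leaf <- leaves) {
      if (leaf.rep == 0) {               // repetition level 0: a new top-level record starts
        flush()
        if (leaf.defn == 0) records += None                       // null record
        else current = Some(scala.collection.mutable.ArrayBuffer.empty[Int])
      }
      if (leaf.defn == maxDef) current.foreach(_ += leaf.value)   // value is actually present
    }
    flush()
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Encodes the records [1, 2], null, [] and [3] with maxDef = 3 (hypothetical levels).
    val leaves = Seq(Leaf(0, 3, 1), Leaf(1, 3, 2), Leaf(0, 0, 0), Leaf(0, 1, 0), Leaf(0, 3, 3))
    println(assemble(leaves, maxDef = 3))
    // List(Some(List(1, 2)), None, Some(List()), Some(List(3)))
  }
}
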
Benchmark

val N = 10000
withParquetTable((0 until N).map { i =>
  ((i to i + 1000).toList, (i to i + 100).map(_.toString).toList,
    (i to i + 1000).map(_.toDouble / 2).toList,
    ((0 to 10).map(_.toString).toList, (0 to 10).map(_.toString).toList))
}, "t") {
  val benchmark = new Benchmark("Vectorization Parquet for nested types", N)
  benchmark.addCase("Vectorization Parquet reader", 10) { iter =>
    sql("SELECT _1[10], _2[20], _3[30], _4._1[5], _4._2[5] FROM t").collect()
  }
  benchmark.run()
}

Disabled vectorization:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Vectorization Parquet for nested types:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Vectorization Parquet reader                  1706 / 2207          0.0      170580.8       1.0X

Enabled vectorization:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Vectorization Parquet for nested types:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Vectorization Parquet reader                   789 /  972          0.0       78919.4       1.0X

How was this patch tested?

Jenkins tests.

@SparkQA commented Jul 28, 2016

Test build #62957 has finished for PR 14388 at commit 8cfeb7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maver1ck (Contributor) commented Aug 3, 2016

@viirya
I tried to test your patch on my production workflow.
Getting:

Py4JJavaError: An error occurred while calling o56.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 1 times, most recent failure: Lost task 20.0 in stage 1.0 (TID 21, 188.165.13.157): java.lang.ArrayIndexOutOfBoundsException: 4096
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putIntsLittleEndian(OnHeapColumnVector.java:221)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readIntegers(VectorizedPlainValuesReader.java:68)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:189)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:388)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:247)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.readBatch(ColumnVector.java:1094)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:263)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:266)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:251)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:138)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:36)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:128)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1867)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1880)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1893)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1907)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:899)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:898)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2217)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2216)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 4096
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putIntsLittleEndian(OnHeapColumnVector.java:221)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readIntegers(VectorizedPlainValuesReader.java:68)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:189)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:388)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:247)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.readBatch(ColumnVector.java:1094)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:263)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:266)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:251)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:138)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:36)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:128)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more

@viirya (Member, Author) commented Aug 3, 2016

@maver1ck Thanks for reporting this! I will take a look. Can you show me the schema you are testing and what the data looks like? Thanks.

viirya added 2 commits August 12, 2016 14:55
…t-complex-type

Conflicts:
	sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@viirya (Member, Author) commented Aug 12, 2016

Hi @maver1ck, can you try the latest changes on your production workflow? Thank you!

@maver1ck (Contributor) commented
@viirya
I will after the weekend.

@SparkQA commented Aug 12, 2016

Test build #63669 has finished for PR 14388 at commit 9bae60f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 12, 2016

Test build #63677 has finished for PR 14388 at commit d0d7230.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Aug 12, 2016

retest this please.

@SparkQA commented Aug 12, 2016

Test build #63688 has finished for PR 14388 at commit d0d7230.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Aug 22, 2016

@maver1ck Any results from the test? Thank you.

@viirya (Member, Author) commented Aug 29, 2016

ping @maver1ck

@mallman (Contributor) commented Aug 29, 2016

@viirya If I do a simple select on an array field it works, but if I add an order by clause on the array column, I get exceptions like:

16/08/29 21:47:01 ERROR Executor: Exception in task 12.0 in stage 11.0 (TID 53)
java.lang.ArrayIndexOutOfBoundsException: 4096
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:401)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.putByteArray(ColumnVector.java:578)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.decodeDictionaryIds(VectorizedColumnReader.java:342)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:236)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.readBatch(ColumnVector.java:1101)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:263)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:266)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:251)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:138)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:36)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:97)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:134)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:97)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1393)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1390)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/29 21:47:01 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 42)
java.lang.ArrayIndexOutOfBoundsException: 4096
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:401)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.putByteArray(ColumnVector.java:578)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.decodeDictionaryIds(VectorizedColumnReader.java:342)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:236)
    at org.apache.spark.sql.execution.vectorized.ColumnVector.readBatch(ColumnVector.java:1101)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:263)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.readBatchOnColumnVector(VectorizedParquetRecordReader.java:266)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:251)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:138)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:36)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:97)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:670)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1393)
    at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1390)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

@viirya (Member, Author) commented Aug 30, 2016

@mallman Thanks for reporting this. It is helpful. I will investigate it.

@viirya (Member, Author) commented Aug 30, 2016

@mallman I ran a simple test, but couldn't reproduce the issue. The following benchmark code selects an array column and adds an order by clause on it. Can you give me example code to reproduce it? Thanks.

val N = 10000
withParquetTable((0 until N).map { i =>
  ((i to i + 1000).toList, (i to i + 100).map(_.toString).toList,
    (i to i + 1000).map(_.toDouble / 2).toList,
    ((0 to 10).map(_.toString).toList, (0 to 10).map(_.toString).toList))
}, "t") {
  val benchmark = new Benchmark("Vectorization Parquet for nested types", N)
  benchmark.addCase("Vectorization Parquet reader", 10) { iter =>
    sql("SELECT _1 FROM t ORDER BY _1").collect()
  }
  benchmark.run()
}

@mallman (Contributor) commented Aug 30, 2016

@viirya I'll see what I can do. If nothing else, I may be able to share a private data file over S3 if you promise not to share it with anyone else.

@viirya (Member, Author) commented Aug 31, 2016

@mallman Thanks! I promise not to share it with others.

@mallman (Contributor) commented Aug 31, 2016

@viirya I sent an email with a link to a test file to your public GitHub email address.

@viirya (Member, Author) commented Sep 1, 2016

@mallman Thanks. I will not share that file.

@mallman (Contributor) commented Sep 11, 2016

@viirya Any progress on this?

@viirya (Member, Author) commented Sep 12, 2016

@mallman Not yet. I have been working on another PR recently. I will come back to this once that is resolved.

@viirya (Member, Author) commented Oct 6, 2016

This change doesn't seem easy to maintain. I would like to close this for now and maybe reopen it later.

@viirya closed this Oct 6, 2016
@viirya deleted the vectorized-parquet-complex-type branch December 27, 2023 18:19