
@rtreffer
Contributor

This is my current WIP on SPARK-4176. It should be compatible with other implementations of Parquet.

https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md#decimal

For byte arrays, binary and fixed, the unscaled number must be encoded as
two's complement using big-endian byte order
(the most significant byte is the zeroth element)

This is the default encoding of BigInteger (toByteArray yields exactly this big-endian two's-complement form), so it should be compatible with other implementations, although it would be great if someone could test this.
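
For reference, a minimal sketch of that encoding in plain Scala (the helper names here are mine, not part of the patch): pad BigInteger.toByteArray out to the target length by sign extension, and decode with the BigInteger byte-array constructor.

import java.math.BigInteger
import java.util.Arrays

// Encode an unscaled value into exactly `length` bytes, big-endian two's complement.
def encodeUnscaled(unscaled: BigInteger, length: Int): Array[Byte] = {
  val bytes = unscaled.toByteArray   // minimal two's-complement form, big-endian
  require(bytes.length <= length, s"value needs ${bytes.length} bytes, only $length available")
  val out = new Array[Byte](length)
  val pad = if (unscaled.signum < 0) (-1).toByte else 0.toByte  // sign extension
  Arrays.fill(out, 0, length - bytes.length, pad)
  System.arraycopy(bytes, 0, out, length - bytes.length, bytes.length)
  out
}

// Decoding does not care about the padding.
def decodeUnscaled(bytes: Array[Byte]): BigInteger = new BigInteger(bytes)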

I've tested this locally with powers of 2 up to 2^200 in the Spark shell, without errors so far.

Code I've used for (local) testing (in the Spark shell):

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.io.File

// Builds the list 1, 2, 4, ..., 2^(n-1) as Decimal values with precision n and scale 0.
def decimalList(n : Int) = {
  require(n > 0)
  val one = Decimal(1)
  val two = Decimal(2)
  one.changePrecision(n,0)
  two.changePrecision(n,0)
  val list = scala.collection.mutable.ArrayBuffer(one)
  while (list.length < n) {
    val v = list(list.length - 1)
    list += Decimal(v.toJavaBigDecimal.multiply(two.toJavaBigDecimal),n,0)
  }
  list.toList
}

// Pairs each decimal with its index: Row(id, value).
def decimalRows(l : List[Decimal]) = l.zipWithIndex.map(e => Row(e._2,e._1))

def decimalRowRdd(l : List[Decimal]) = sc.parallelize(decimalRows(l))

// DataFrame with schema (id INT, value DECIMAL(len, 0)), where len is the digit count of the largest value.
def df(n : Int) = {
  val data = decimalList(n)
  val len = data.lastOption.get.toString.length
  val schema = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("value", DecimalType(len,0), true)))
  sqlContext.createDataFrame(decimalRowRdd(data), schema)
}

// Non-recursive delete; enough for the flat output directory written below.
def delete(filename : String) : Unit = {
  val f = new File(filename)
  if (!f.exists) return
  if (f.isDirectory) f.listFiles.foreach(_.delete)
  f.delete
}

// Round-trips the DataFrame through /tmp/typetest (Parquet is the default data source) and prints any rows that differ.
def test(n : Int) = {
  val src = df(n)
  delete("/tmp/typetest")
  src.save("/tmp/typetest")
  val copy = sqlContext.load("/tmp/typetest")
  src.collect().sortBy(_.getInt(0)).zip(copy.collect().sortBy(_.getInt(0))).foreach(e => {
    if (e._1 != e._2) println(s"${e._1} != ${e._2}")
  })
  delete("/tmp/typetest")
  println(s"Tested 1..2^${n - 1}")
}

@rtreffer
Contributor Author

Note: I came across https://issues.apache.org/jira/browse/SPARK-8342 while testing; it seems Decimal math is unsafe at the moment.

Member

The type of d is (Int, Int), so I think it already contains precision and scale. Why add s?

Contributor Author

It caused a warning on my system. So I thought it would be better to make it explicit.

I could drop that line from this patch, though.

Contributor

This should be correct. Scala pattern extractors use tuples if they want to return multiple values.

Contributor Author

As said, it was only about a warning, not about correctness. I'll drop this change in the next version; it draws too much attention and is not needed.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 32630df to 8f6445c on June 15, 2015 at 11:26
@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Jun 17, 2015

Test build #35062 has finished for PR 6796 at commit 8f6445c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 8f6445c to 7310902 on June 18, 2015 at 08:36
@SparkQA

SparkQA commented Jun 18, 2015

Test build #35115 has finished for PR 6796 at commit 7310902.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

Thanks for working on this! Did a quick pass and it looks pretty good. I'll let @liancheng do a more complete review. Does SPARK-8342 block merging this or can you remove the WIP tag now?

@rtreffer
Contributor Author

The only reason for the WIP is that I have not yet cross-tested the interoperability with e.g. Hive. It follows the spec, but I'd like to test it (or have someone verify this).

@marmbrus
Contributor

Ah cool, it might even be a good idea to check in small files that are created with other systems for these kinds of tests.

@rtreffer
Contributor Author

I was thinking about this: create a small Parquet file with Spark, load it with Hive, copy it to a new Parquet table with Hive, and read that back with Spark.

If that matches the input -> win. Otherwise -> some more work.

PS: SPARK-8342 / SPARK-8359 were only problems during my initial tests. It's a bit harder to test read/write of Decimal if the implementation has bugs. So those are unrelated to this patch, but they might reduce its usefulness (you can't do reliable math in the ranges you can now load/save).

@marmbrus
Contributor

Are you thinking about doing this as part of the test or doing it manually? Right now parquet and its tests have no Hive dependencies, which I think is good. But I would definitely like to have a test that reads a file that was written by Hive/Impala/etc. (perhaps created manually and checked in).

/cc @liancheng, who has also been working on Parquet interop. This could also come as a follow-up PR if we want to add the interop tests in one go.

@rtreffer
Contributor Author

Yes, manually. I could add the file I was writing afterwards; that sounds like a good idea.

@rtreffer changed the title from "[SPARK-4176][WIP] Support decimal types with precision > 18 in parquet" to "[SPARK-4176] Support decimal types with precision > 18 in parquet" on Jun 19, 2015
@rtreffer
Contributor Author

Just did a test with Hive: I can declare a Parquet file written with Spark as

CREATE EXTERNAL TABLE ptest (id INT, value DECIMAL(30,0))
STORED AS PARQUET
LOCATION "file:///home/rtreffer/work/hadoop/parquet";

and it does work. I'm now trying to test the opposite direction, plus adding a test case.

I've also dropped the WIP tag. It doesn't make sense anymore.

@rtreffer
Contributor Author

(Hive 1.2.0 and Hadoop 2.7.0, without HDFS or a cluster)

@rtreffer
Contributor Author

OK, it looks like I can't open Hive-generated Parquet files, but it looks more like a type error:

scala> val hive = sqlContext.load("/home/rtreffer/work/hadoop/hive-parquet")
warning: there was one deprecation warning; re-run with -deprecation for details
15/06/19 22:03:26 INFO ParquetFileReader: Initiating action with parallelism: 5
hive: org.apache.spark.sql.DataFrame = [id: int, value: decimal(30,0)]

scala> hive.collect.foreach(println)
15/06/19 22:03:35 INFO BlockManagerInfo: Removed broadcast_8_piece0 on localhost:42189 in memory (size: 2.4 KB, free: 265.1 MB)
15/06/19 22:03:35 INFO BlockManagerInfo: Removed broadcast_9_piece0 on localhost:42189 in memory (size: 2.4 KB, free: 265.1 MB)
15/06/19 22:03:35 INFO MemoryStore: ensureFreeSpace(130208) called with curMem=0, maxMem=278019440
15/06/19 22:03:35 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 127.2 KB, free 265.0 MB)
15/06/19 22:03:35 INFO MemoryStore: ensureFreeSpace(14082) called with curMem=130208, maxMem=278019440
15/06/19 22:03:35 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 13.8 KB, free 265.0 MB)
15/06/19 22:03:35 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:42189 (size: 13.8 KB, free: 265.1 MB)
15/06/19 22:03:35 INFO SparkContext: Created broadcast 10 from collect at <console>:38
15/06/19 22:03:35 INFO SparkContext: Starting job: collect at <console>:38
15/06/19 22:03:35 INFO DAGScheduler: Got job 9 (collect at <console>:38) with 1 output partitions (allowLocal=false)
15/06/19 22:03:35 INFO DAGScheduler: Final stage: ResultStage 11(collect at <console>:38)
15/06/19 22:03:35 INFO DAGScheduler: Parents of final stage: List()
15/06/19 22:03:35 INFO DAGScheduler: Missing parents: List()
15/06/19 22:03:35 INFO DAGScheduler: Submitting ResultStage 11 (MapPartitionsRDD[45] at collect at <console>:38), which has no missing parents
15/06/19 22:03:35 INFO MemoryStore: ensureFreeSpace(5568) called with curMem=144290, maxMem=278019440
15/06/19 22:03:35 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.4 KB, free 265.0 MB)
15/06/19 22:03:35 INFO MemoryStore: ensureFreeSpace(2964) called with curMem=149858, maxMem=278019440
15/06/19 22:03:35 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 2.9 KB, free 265.0 MB)
15/06/19 22:03:36 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on localhost:42189 (size: 2.9 KB, free: 265.1 MB)
15/06/19 22:03:36 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:893
15/06/19 22:03:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 11 (MapPartitionsRDD[45] at collect at <console>:38)
15/06/19 22:03:36 INFO TaskSchedulerImpl: Adding task set 11.0 with 1 tasks
15/06/19 22:03:36 INFO TaskSetManager: Starting task 0.0 in stage 11.0 (TID 36, localhost, PROCESS_LOCAL, 1471 bytes)
15/06/19 22:03:36 INFO Executor: Running task 0.0 in stage 11.0 (TID 36)
15/06/19 22:03:36 INFO ParquetRelation2$$anonfun$buildScan$1$$anon$1: Input split: ParquetInputSplit{part: file:/home/rtreffer/work/hadoop/hive-parquet/000000_0 start: 0 end: 874 length: 874 hosts: []}
15/06/19 22:03:36 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 33 records.
15/06/19 22:03:36 INFO InternalParquetRecordReader: at row 0. reading next block
15/06/19 22:03:36 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 33
15/06/19 22:03:36 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 36)
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/home/rtreffer/work/hadoop/hive-parquet/000000_0
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$class.foreach(Iterator.scala:750)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
        at scala.collection.AbstractIterator.to(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional binary value (DECIMAL(30,0)) != optional fixed_len_byte_array(13) value (DECIMAL(30,0))
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:106)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:98)
        at org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:389)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:88)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:62)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
        at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:149)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        ... 28 more
15/06/19 22:03:36 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 36, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/home/rtreffer/work/hadoop/hive-parquet/000000_0
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$class.foreach(Iterator.scala:750)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
        at scala.collection.AbstractIterator.to(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional binary value (DECIMAL(30,0)) != optional fixed_len_byte_array(13) value (DECIMAL(30,0))
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:106)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:98)
        at org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:389)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:88)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:62)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
        at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:149)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        ... 28 more

15/06/19 22:03:36 ERROR TaskSetManager: Task 0 in stage 11.0 failed 1 times; aborting job
15/06/19 22:03:36 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
15/06/19 22:03:36 INFO TaskSchedulerImpl: Cancelling stage 11
15/06/19 22:03:36 INFO DAGScheduler: ResultStage 11 (collect at <console>:38) failed in 0.021 s
15/06/19 22:03:36 INFO DAGScheduler: Job 9 failed: collect at <console>:38, took 0.033099 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 36, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/home/rtreffer/work/hadoop/hive-parquet/000000_0
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$class.foreach(Iterator.scala:750)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
        at scala.collection.AbstractIterator.to(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional binary value (DECIMAL(30,0)) != optional fixed_len_byte_array(13) value (DECIMAL(30,0))
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:106)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:98)
        at org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:389)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:88)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:62)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
        at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:149)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        ... 28 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1285)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1276)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1275)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1275)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:749)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1484)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Hm, it could be that the Spark decoder is too strict. There are various ways to encode DECIMAL(30), and it looks like Hive chooses fixed-length arrays, while I prefer variable-length arrays. I have to double-check that.

@rtreffer
Contributor Author

I've pushed the Hive-generated Parquet file, and I'll call it a day.

I think I'll have to relax the validation of column types for DECIMAL.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35317 has finished for PR 6796 at commit b8f2f2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer
Contributor Author

OK, I think I'm slowly getting to the cause: the strictTypeChecking check at https://github.com/Parquet/parquet-mr/blob/master/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java#L100 raises the exception.

The relevant class for the job setup is ParquetTableScan (doExecute).

I'm not yet sure if this can be fixed on the job setup or on the receiver side.

@rtreffer
Contributor Author

The problematic line is
https://github.com/apache/spark/blob/v1.4.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L105
(calling ParquetTypesConverter.convertFromAttributes). This causes the schema to be normalized, but that does not work for decimals:

message hive_schema {
  optional int32 id;
  optional fixed_len_byte_array(13) value (DECIMAL(30,0));
}

is replaced by

message root {
  optional int32 id;
  optional binary value (DECIMAL(30,0));
}

I removed that line, and loading the data works.

@rtreffer changed the title from "[SPARK-4176] Support decimal types with precision > 18 in parquet" to "[SPARK-4176][WIP] Support decimal types with precision > 18 in parquet" on Jun 20, 2015
@rtreffer
Contributor Author

I've pushed a very early version of a fix. (Literally early: it's nearly 1:00 am. I'd expect the test build to fail; I'll fix the outstanding issues later today.)

PS: Loading the Hive Parquet file works now, but I haven't tested much more yet.

Contributor

These could be

parquetSchema.filter(_.containsField(name)).map(_.getType(name))

Contributor Author

Ah, yes, an early version had that; I somehow moved to this verbose code O.o

Thanks

On June 21, 2015, 02:33:15 CEST, Davies Liu [email protected] wrote:

   toThriftSchemaNames: Boolean = false): ParquetType = {
  • val parquetElementTypeBySchema =

These could be

parquetSchema.filter(_.containsField(name)).map(_.getType(name))


Contributor Author

It also performs a type check / conversion; that's why I've removed it. It would look like this:

val parquetElementTypeBySchema =
  parquetSchema.filter(_.isInstanceOf[ParquetGroupType]).filter(_.containsField(name)).map(_.getType(name))

I would settle on collect; does that look OK?

  val parquetElementTypeBySchema = parquetSchema.collect {
    case gType : ParquetGroupType if (gType.containsField(name)) => gType.getType(name)
  }

Contributor

This one is better

@SparkQA

SparkQA commented Jun 21, 2015

Test build #35373 has finished for PR 6796 at commit 464d24e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

We should use int32 and int64 if possible; see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal

DECIMAL can be used to annotate the following types:

int32: for 1 <= precision <= 9
int64: for 1 <= precision <= 18; precision <= 10 will produce a warning
fixed_len_byte_array: precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
binary: precision is not limited, but is required. The minimum number of bytes to store the unscaled value should be used.
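
As a rough illustration of those rules (a sketch only, not code from this patch), picking the smallest physical type for a given precision boils down to:

// Smallest byte length n such that an n-byte signed value covers every
// `precision`-digit unscaled value, i.e. 2^(8n - 1) - 1 >= 10^precision - 1.
def minBytesForPrecision(precision: Int): Int = {
  var n = 1
  while (BigInt(2).pow(8 * n - 1) - 1 < BigInt(10).pow(precision) - 1) n += 1
  n
}

def physicalTypeFor(precision: Int): String = precision match {
  case p if p <= 9  => "INT32"
  case p if p <= 18 => "INT64"
  case p            => s"FIXED_LEN_BYTE_ARRAY(${minBytesForPrecision(p)})"
}

// physicalTypeFor(9)  == "INT32"
// physicalTypeFor(30) == "FIXED_LEN_BYTE_ARRAY(13)" -- the same 13 bytes Hive used above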

Contributor Author

What would we gain by encoding it that way?
We use a minimal-length fixed byte array, which should provide a similarly compact encoding. (DECIMAL(9) should end up as 4 bytes, and smaller decimal values should take even less space.)

Decoding is a different story, though.

PS: I was focusing on DECIMAL with precision >=19. Shouldn't small decimal handling be a new ticket?

On June 21, 2015, 02:41:44 CEST, Davies Liu [email protected] wrote:

@@ -229,11 +231,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
case LongType =>
Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
case TimestampType =>
Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
case DecimalType.Fixed(precision, scale) if precision <= 18 =>

  •  // TODO: for now, our writer only supports decimals that fit in a Long

Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,

We should int32, int64 if possible, see
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal

DECIMAL can be used to annotate the following types:

int32: for 1 <= precision <= 9
int64: for 1 <= precision <= 18; precision <= 10 will produce a warning
fixed_len_byte_array: precision is limited by the array size. Length n
can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
binary: precision is not limited, but is required. The minimum number
of bytes to store the unscaled value should be used.


Contributor

Makes sense.

Contributor

Using int32 and int64 makes encoding and decoding faster, since they don't introduce boxing costs. But I agree that should be done in another PR.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 464d24e to f973b58 on June 21, 2015 at 20:00
@SparkQA

SparkQA commented Jun 21, 2015

Test build #35414 has finished for PR 6796 at commit f973b58.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from f973b58 to 8ff6603 on June 23, 2015 at 12:36
@SparkQA

SparkQA commented Jun 23, 2015

Test build #35545 has finished for PR 6796 at commit 8ff6603.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer
Contributor Author

Hi @liancheng, thank you for the thorough review; I will push a reworked version soon. Everything sounds reasonable :-)

With "private" Settings I meant that I can't change the setting in the shell because it's marked as "isPublic = false" in https://github.com/liancheng/spark/blob/2a2062d3f530ecd26e75b306aee42761d67d8724/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L273

I'm not sure if that's intended.

@liancheng
Contributor

@rtreffer Yeah, it's intended. As explained above, this feature flag must be set to false for now because the write path hasn't been refactored to respect the Parquet format spec. If we turn this on, CatalystSchemaConverter will generate standard Parquet schema while the write path still writes data conforming to the old legacy format, which leads to data corruption.

@liancheng
Contributor

Hey @rtreffer, I just want to check whether you are still working on this. I'm asking because I just opened #7231 to refactor the Parquet read path for interoperability and backwards compatibility, which also touches the decimal parts. I believe the new CatalystDecimalConverter already covers the read path for decimals with precision > 18, which means this PR can be further simplified. Just in case you don't have time to continue this PR, I'm happy to fork your branch and get it merged (I will still list you as the main author).
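
(Illustrative sketch only, not code from #7231: reading a decimal with precision > 18 back essentially means rebuilding the unscaled value from the big-endian bytes, roughly like this.)

import java.math.{BigDecimal => JBigDecimal, BigInteger}
import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.types.Decimal

// Decode a DECIMAL stored as BINARY / FIXED_LEN_BYTE_ARRAY: the bytes hold
// the unscaled value as big-endian two's complement.
def readDecimal(value: Binary, precision: Int, scale: Int): Decimal =
  Decimal(new JBigDecimal(new BigInteger(value.getBytes), scale), precision, scale)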

@rtreffer
Contributor Author

rtreffer commented Jul 7, 2015

Hi @liancheng,

I'm rebasing on your PR right now. I can work ~1-2 hours/day on this PR, so feel free to take it over if this blocks anything.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 5fe321e to e6dad45 on July 7, 2015 at 18:36
@SparkQA

SparkQA commented Jul 7, 2015

Test build #36702 has finished for PR 6796 at commit e6dad45.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from e6dad45 to 7a57c16 on July 7, 2015 at 18:43
@rtreffer
Contributor Author

rtreffer commented Jul 7, 2015

The writeDecimal method is rather ugly, and the write path needs to know whether we follow the Parquet style or not, as this implies a different encoding (addInteger / addLong).
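
Roughly the shape it takes (a sketch only, not the actual patch code; followParquetFormatSpec and fixedLength are placeholder names for whatever the write path ends up passing around):

import java.math.BigInteger
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.spark.sql.types.Decimal

def writeDecimal(consumer: RecordConsumer, decimal: Decimal, precision: Int,
                 followParquetFormatSpec: Boolean, fixedLength: Int): Unit = {
  val unscaled: BigInteger = decimal.toJavaBigDecimal.unscaledValue()
  if (followParquetFormatSpec && precision <= 9) {
    consumer.addInteger(unscaled.intValue())   // DECIMAL backed by INT32
  } else if (followParquetFormatSpec && precision <= 18) {
    consumer.addLong(unscaled.longValue())     // DECIMAL backed by INT64
  } else {
    // FIXED_LEN_BYTE_ARRAY: big-endian two's complement, sign-extended to fixedLength bytes
    val bytes = unscaled.toByteArray
    require(bytes.length <= fixedLength, s"DECIMAL($precision) does not fit in $fixedLength bytes")
    val out = new Array[Byte](fixedLength)
    if (unscaled.signum < 0) java.util.Arrays.fill(out, 0, fixedLength - bytes.length, (-1).toByte)
    System.arraycopy(bytes, 0, out, fixedLength - bytes.length, bytes.length)
    consumer.addBinary(Binary.fromByteArray(out))
  }
}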

@SparkQA

SparkQA commented Jul 7, 2015

Test build #36703 has finished for PR 6796 at commit 7a57c16.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 7a57c16 to 1152721 on July 12, 2015 at 17:02
@SparkQA

SparkQA commented Jul 12, 2015

Test build #37097 has finished for PR 6796 at commit 1152721.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 1152721 to 3e30bdf on July 12, 2015 at 17:42
@SparkQA

SparkQA commented Jul 12, 2015

Test build #37099 has finished for PR 6796 at commit 3e30bdf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 3e30bdf to c8d4d6c on July 13, 2015 at 07:44
@SparkQA

SparkQA commented Jul 13, 2015

Test build #37130 has finished for PR 6796 at commit c8d4d6c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 13, 2015

Test build #1055 has finished for PR 6796 at commit 3e30bdf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from c8d4d6c to 1703c26 on July 13, 2015 at 14:57
@SparkQA

SparkQA commented Jul 13, 2015

Test build #37143 has finished for PR 6796 at commit 1703c26.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch 2 times, most recently from 06d337a to 83ca029 on July 13, 2015 at 18:20
…quets fixed_byte_array

Parquet defines multiple ways to store decimals. This patch enables
the reading of all variations as well as writing decimals in the
smallest fixed-length container possible (INT32, INT64, FIXED_LEN_BYTE_ARRAY).
@rtreffer force-pushed the spark-4176-store-large-decimal-in-parquet branch from 83ca029 to 1dad677 on July 13, 2015 at 18:27
@SparkQA

SparkQA commented Jul 13, 2015

Test build #37146 has finished for PR 6796 at commit 83ca029.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 13, 2015

Test build #37147 has finished for PR 6796 at commit 1dad677.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor

@rtreffer Since #7455 supersedes this PR, would you mind closing this one?

@rtreffer
Contributor Author

@liancheng sure, I just wasn't sure if it should be closed :-)

@rtreffer closed this on Jul 18, 2015
asfgit pushed a commit that referenced this pull request Jul 27, 2015
This PR is based on #6796 authored by rtreffer.

To support large decimal precisions (> 18), we do the following things in this PR:

1. Making `CatalystSchemaConverter` support large decimal precision

   Decimal types with large precision are always converted to fixed-length byte array.

2. Making `CatalystRowConverter` support reading decimal values with large precision

   When the precision is > 18, constructs `Decimal` values with an unscaled `BigInteger` rather than an unscaled `Long`.

3. Making `RowWriteSupport` support writing decimal values with large precision

   In this PR we always write decimals as fixed-length byte arrays, because the Parquet write path hasn't been refactored to conform to the Parquet format spec (see SPARK-6774 & SPARK-8848).

Two follow-up tasks should be done in future PRs:

- [ ] Writing decimals as `INT32`, `INT64` when possible while fixing SPARK-8848
- [ ] Adding compatibility tests as part of SPARK-5463

Author: Cheng Lian <[email protected]>

Closes #7455 from liancheng/spark-4176 and squashes the following commits:

a543d10 [Cheng Lian] Fixes errors introduced while rebasing
9e31cdf [Cheng Lian] Supports decimals with precision > 18 for Parquet