From 144c4cd62812ed9cdf4bf773c6826d0e390606b0 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 26 Aug 2015 15:58:50 +0800
Subject: [PATCH 1/2] Adds testing direct write API for Parquet

---
 .../parquet/ParquetCompatibilityTest.scala    | 73 ++++++++++++++++---
 .../ParquetThriftCompatibilitySuite.scala     | 72 ++++++++++++++----
 2 files changed, 122 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index df68432faeeb3..c7c124baad3a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -17,11 +17,15 @@

 package org.apache.spark.sql.execution.datasources.parquet

-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetWriter}
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}

 import org.apache.spark.sql.QueryTest

@@ -38,11 +42,10 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
     val fs = fsPath.getFileSystem(configuration)
     val parquetFiles = fs.listStatus(fsPath, new PathFilter {
       override def accept(path: Path): Boolean = pathFilter(path)
-    }).toSeq
+    }).toSeq.asJava

-    val footers =
-      ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles.asJava, true)
-    footers.iterator().next().getParquetMetadata.getFileMetaData.getSchema
+    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
+    footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema
   }

   protected def logParquetSchema(path: String): Unit = {
@@ -53,8 +56,58 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
   }
 }

-object ParquetCompatibilityTest {
-  def makeNullable[T <: AnyRef](i: Int)(f: => T): T = {
-    if (i % 3 == 0) null.asInstanceOf[T] else f
+private[sql] object ParquetCompatibilityTest {
+  private class DirectWriteSupport(
+      schema: MessageType, writer: RecordConsumer => Unit, metadata: Map[String, String])
+    extends WriteSupport[Void] {
+
+    private var recordConsumer: RecordConsumer = _
+
+    override def init(configuration: Configuration): WriteContext = {
+      new WriteContext(schema, metadata.asJava)
+    }
+
+    override def write(record: Void): Unit = {
+      writer(recordConsumer)
+    }
+
+    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+      this.recordConsumer = recordConsumer
+    }
+  }
+
+  def writeDirect
+      (path: String, schema: String, metadata: Map[String, String] = Map.empty[String, String])
+      (writer: RecordConsumer => Unit): Unit = {
+    writeDirect(path, MessageTypeParser.parseMessageType(schema), metadata)(writer)
+  }
+
+  def writeDirect
+      (path: String, schema: MessageType, metadata: Map[String, String])
+      (writer: RecordConsumer => Unit): Unit = {
+    val writeSupport = new DirectWriteSupport(schema, writer, metadata)
+    val parquetWriter = new ParquetWriter[Void](new Path(path), writeSupport)
+    try parquetWriter.write(null) finally parquetWriter.close()
+  }
+
+  def message(f: => Unit)(implicit consumer: RecordConsumer): Unit = {
+    consumer.startMessage()
+    f
+    consumer.endMessage()
+  }
+
+  def group(f: => Unit)(implicit consumer: RecordConsumer): Unit = {
+    consumer.startGroup()
+    f
+    consumer.endGroup()
+  }
+
+  def field
+      (name: String, index: Int)
+      (f: => Unit)
+      (implicit consumer: RecordConsumer): Unit = {
+    consumer.startField(name, index)
+    f
+    consumer.endField(name, index)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
index b789c5a106e56..aabc4e032c52a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
@@ -33,11 +33,9 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest with Shar
       """.stripMargin)

     checkAnswer(sqlContext.read.parquet(parquetFilePath.toString), (0 until 10).map { i =>
-      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
-
       val suits = Array("SPADES", "HEARTS", "DIAMONDS", "CLUBS")

-      Row(
+      val nonNullablePrimitiveValues = Seq(
         i % 2 == 0,
         i.toByte,
         (i + 1).toShort,
@@ -50,18 +48,15 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest with Shar
         s"val_$i",
         s"val_$i",
         // Thrift ENUM values are converted to Parquet binaries containing UTF-8 strings
-        suits(i % 4),
+        suits(i % 4))

-        nullable(i % 2 == 0: java.lang.Boolean),
-        nullable(i.toByte: java.lang.Byte),
-        nullable((i + 1).toShort: java.lang.Short),
-        nullable(i + 2: Integer),
-        nullable((i * 10).toLong: java.lang.Long),
-        nullable(i.toDouble + 0.2d: java.lang.Double),
-        nullable(s"val_$i"),
-        nullable(s"val_$i"),
-        nullable(suits(i % 4)),
+      val nullablePrimitiveValues = if (i % 3 == 0) {
+        Seq.fill(nonNullablePrimitiveValues.length)(null)
+      } else {
+        nonNullablePrimitiveValues
+      }

+      val complexValues = Seq(
         Seq.tabulate(3)(n => s"arr_${i + n}"),
         // Thrift `SET`s are converted to Parquet `LIST`s
         Seq(i),
@@ -71,6 +66,57 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest with Shar
             Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
           }
       }.toMap)
+
+      Row(nonNullablePrimitiveValues ++ nullablePrimitiveValues ++ complexValues: _*)
     })
   }
+
+  test("SPARK-10136 list of primitive list") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val schema =
+        s"""message ListOfPrimitiveList {
+           |  required group f (LIST) {
+           |    repeated group f_tuple (LIST) {
+           |      repeated int32 f_tuple_tuple;
+           |    }
+           |  }
+           |}
+         """.stripMargin
+
+      writeDirect(path, schema) { implicit consumer =>
+        (0 until 2).foreach { i =>
+          message {
+            field("f", 0) {
+              group {
+                field("f_tuple", 0) {
+                  group {
+                    field("f_tuple_tuple", 0) {
+                      consumer.addInteger(i + 0)
+                      consumer.addInteger(i + 1)
+                    }
+                  }
+
+                  group {
+                    field("f_tuple_tuple", 0) {
+                      consumer.addInteger(i + 2)
+                      consumer.addInteger(i + 3)
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+
+      logParquetSchema(path)
+
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Seq(
+          Row(Seq(Seq(0, 1), Seq(2, 3))),
+          Row(Seq(Seq(1, 2), Seq(3, 4)))))
+    }
+  }
 }
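
For reference, the direct write API introduced by this first patch can be driven as in the sketch below. The KeyValue schema, the /tmp output path, and the field values are made up for illustration and do not appear in the patch; writeDirect, message, and field are the helpers defined in ParquetCompatibilityTest above (the object is private[sql], so the snippet only compiles inside Spark's own sql test sources).

    import org.apache.parquet.io.api.Binary
    import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest._

    // Hypothetical schema and output path, for illustration only.
    val schema =
      """message KeyValue {
        |  required int32 key;
        |  required binary value (UTF8);
        |}
      """.stripMargin

    // writeDirect parses the schema, wires up DirectWriteSupport, and invokes the
    // closure exactly once; marking the consumer implicit lets message/field find it.
    writeDirect("/tmp/direct-write-example.parquet", schema) { implicit consumer =>
      message {
        field("key", 0) {
          consumer.addInteger(42)
        }
        field("value", 1) {
          consumer.addBinary(Binary.fromString("val_42"))
        }
      }
    }
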
From 85747e47db207503f98f1ba6de114ad92a1a2587 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 26 Aug 2015 17:56:52 +0800
Subject: [PATCH 2/2] Removes implicit RecordConsumer arguments, and allows
 writing multiple records

---
 .../parquet/ParquetCompatibilityTest.scala    | 83 +++++++++++--------
 .../ParquetThriftCompatibilitySuite.scala     | 62 ++++++++++----
 2 files changed, 91 insertions(+), 54 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index c7c124baad3a4..91f3ce4d34c8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -57,9 +57,32 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
 }

 private[sql] object ParquetCompatibilityTest {
-  private class DirectWriteSupport(
-      schema: MessageType, writer: RecordConsumer => Unit, metadata: Map[String, String])
-    extends WriteSupport[Void] {
+  implicit class RecordConsumerDSL(consumer: RecordConsumer) {
+    def message(f: => Unit): Unit = {
+      consumer.startMessage()
+      f
+      consumer.endMessage()
+    }
+
+    def group(f: => Unit): Unit = {
+      consumer.startGroup()
+      f
+      consumer.endGroup()
+    }
+
+    def field(name: String, index: Int)(f: => Unit): Unit = {
+      consumer.startField(name, index)
+      f
+      consumer.endField(name, index)
+    }
+  }
+
+  /**
+   * A testing Parquet [[WriteSupport]] implementation used to write manually constructed Parquet
+   * records with arbitrary structures.
+   */
+  private class DirectWriteSupport(schema: MessageType, metadata: Map[String, String])
+    extends WriteSupport[RecordConsumer => Unit] {

     private var recordConsumer: RecordConsumer = _

@@ -67,8 +90,8 @@ private[sql] object ParquetCompatibilityTest {
       new WriteContext(schema, metadata.asJava)
     }

-    override def write(record: Void): Unit = {
-      writer(recordConsumer)
+    override def write(recordWriter: RecordConsumer => Unit): Unit = {
+      recordWriter.apply(recordConsumer)
     }

     override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
@@ -76,38 +99,26 @@ private[sql] object ParquetCompatibilityTest {
     }
   }

-  def writeDirect
-      (path: String, schema: String, metadata: Map[String, String] = Map.empty[String, String])
-      (writer: RecordConsumer => Unit): Unit = {
-    writeDirect(path, MessageTypeParser.parseMessageType(schema), metadata)(writer)
-  }
-
-  def writeDirect
-      (path: String, schema: MessageType, metadata: Map[String, String])
-      (writer: RecordConsumer => Unit): Unit = {
-    val writeSupport = new DirectWriteSupport(schema, writer, metadata)
-    val parquetWriter = new ParquetWriter[Void](new Path(path), writeSupport)
-    try parquetWriter.write(null) finally parquetWriter.close()
-  }
-
-  def message(f: => Unit)(implicit consumer: RecordConsumer): Unit = {
-    consumer.startMessage()
-    f
-    consumer.endMessage()
-  }
-
-  def group(f: => Unit)(implicit consumer: RecordConsumer): Unit = {
-    consumer.startGroup()
-    f
-    consumer.endGroup()
+  /**
+   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`.
+   * Records are produced by `recordWriters`.
+   */
+  def writeDirect(path: String, schema: String, recordWriters: (RecordConsumer => Unit)*): Unit = {
+    writeDirect(path, schema, Map.empty[String, String], recordWriters: _*)
   }

-  def field
-      (name: String, index: Int)
-      (f: => Unit)
-      (implicit consumer: RecordConsumer): Unit = {
-    consumer.startField(name, index)
-    f
-    consumer.endField(name, index)
+  /**
+   * Writes arbitrary messages conforming to a given `schema` to a Parquet file located by `path`
+   * with given user-defined key-value `metadata`. Records are produced by `recordWriters`.
+   */
+  def writeDirect(
+      path: String,
+      schema: String,
+      metadata: Map[String, String],
+      recordWriters: (RecordConsumer => Unit)*): Unit = {
+    val messageType = MessageTypeParser.parseMessageType(schema)
+    val writeSupport = new DirectWriteSupport(messageType, metadata)
+    val parquetWriter = new ParquetWriter[RecordConsumer => Unit](new Path(path), writeSupport)
+    try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close()
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
index aabc4e032c52a..88a3d878f97fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
@@ -74,6 +74,12 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest with Shar
   test("SPARK-10136 list of primitive list") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
+
+      // This Parquet schema is translated from the following Thrift schema:
+      //
+      //   struct ListOfPrimitiveList {
+      //     1: list<list<i32>> f;
+      //   }
       val schema =
         s"""message ListOfPrimitiveList {
            |  required group f (LIST) {
@@ -84,31 +90,51 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest with Shar
            |}
         """.stripMargin

-      writeDirect(path, schema) { implicit consumer =>
-        (0 until 2).foreach { i =>
-          message {
-            field("f", 0) {
-              group {
-                field("f_tuple", 0) {
-                  group {
-                    field("f_tuple_tuple", 0) {
-                      consumer.addInteger(i + 0)
-                      consumer.addInteger(i + 1)
-                    }
+      writeDirect(path, schema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.group {
+              rc.field("f_tuple", 0) {
+                rc.group {
+                  rc.field("f_tuple_tuple", 0) {
+                    rc.addInteger(0)
+                    rc.addInteger(1)
                   }
+                }

-                  group {
-                    field("f_tuple_tuple", 0) {
-                      consumer.addInteger(i + 2)
-                      consumer.addInteger(i + 3)
-                    }
+                rc.group {
+                  rc.field("f_tuple_tuple", 0) {
+                    rc.addInteger(2)
+                    rc.addInteger(3)
                   }
                 }
               }
             }
           }
         }
-      }
+      }, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.group {
+              rc.field("f_tuple", 0) {
+                rc.group {
+                  rc.field("f_tuple_tuple", 0) {
+                    rc.addInteger(4)
+                    rc.addInteger(5)
+                  }
+                }
+
+                rc.group {
+                  rc.field("f_tuple_tuple", 0) {
+                    rc.addInteger(6)
+                    rc.addInteger(7)
+                  }
+                }
+              }
+            }
+          }
+        }
+      })

       logParquetSchema(path)

@@ -116,7 +142,7 @@
         sqlContext.read.parquet(path),
         Seq(
           Row(Seq(Seq(0, 1), Seq(2, 3))),
-          Row(Seq(Seq(1, 2), Seq(3, 4)))))
+          Row(Seq(Seq(4, 5), Seq(6, 7)))))
     }
   }
 }
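
After this second patch, writeDirect accepts any number of record-writing functions and replays each of them against the RecordConsumer through the RecordConsumerDSL implicit class, so callers no longer keep an implicit RecordConsumer in scope. A minimal sketch of the resulting calling convention, again with a made-up KeyValue schema, output path, and values that are not taken from the patch:

    import org.apache.parquet.io.api.Binary
    import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest._

    // Hypothetical schema and output path, for illustration only.
    val schema =
      """message KeyValue {
        |  required int32 key;
        |  required binary value (UTF8);
        |}
      """.stripMargin

    // Each function writes one complete record; writeDirect feeds them to the
    // ParquetWriter in order and closes the writer once all records are written.
    writeDirect("/tmp/direct-write-example.parquet", schema, { rc =>
      rc.message {
        rc.field("key", 0)(rc.addInteger(0))
        rc.field("value", 1)(rc.addBinary(Binary.fromString("val_0")))
      }
    }, { rc =>
      rc.message {
        rc.field("key", 0)(rc.addInteger(1))
        rc.field("value", 1)(rc.addBinary(Binary.fromString("val_1")))
      }
    })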