
Commit 1d390aa

Adds parquet-thrift compatibility test
1 parent 440f7b3 commit 1d390aa

File tree

8 files changed: +3751 -24 lines


sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Nested.java

Lines changed: 552 additions & 0 deletions
Generated file; not rendered by default.

sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/ParquetThriftCompat.java

Lines changed: 2854 additions & 0 deletions
Generated file; not rendered by default.

sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Suit.java

Lines changed: 51 additions & 0 deletions
Generated file; not rendered by default.

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala

Lines changed: 12 additions & 24 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.{List => JList, Map => JMap}
 
@@ -26,19 +25,17 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.mapreduce.Job
 import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
 import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
 import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.{Row, SQLContext}
 
 object ParquetAvroCompatibilitySuite {
+  import ParquetCompatibilityTest.makeNullable
+
   def makeParquetAvroCompat(i: Int): ParquetAvroCompat = {
-    def nullable[T <: AnyRef](f: => T): T = {
-      if (i % 3 == 0) null.asInstanceOf[T] else f
-    }
+    def nullable[T <: AnyRef] = makeNullable[T](i) _
 
     def makeComplexColumn(i: Int): JMap[String, JList[Nested]] = {
       mapAsJavaMap(Seq.tabulate(3) { n =>
@@ -82,44 +79,35 @@ object ParquetAvroCompatibilitySuite {
   }
 }
 
-class ParquetAvroCompatibilitySuite extends QueryTest with ParquetTest with BeforeAndAfterAll {
-  import ParquetAvroCompatibilitySuite._
+class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetAvroCompatibilitySuite.makeParquetAvroCompat
+  import ParquetCompatibilityTest.makeNullable
 
   override def sqlContext: SQLContext = TestSQLContext
 
-  private var avroStore: File = _
-
   override protected def beforeAll(): Unit = {
-    avroStore = Utils.createTempDir(namePrefix = "parquet-avro_")
-    avroStore.delete()
+    super.beforeAll()
 
     val job = new Job()
     ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
     AvroParquetOutputFormat.setSchema(job, ParquetAvroCompat.getClassSchema)
 
     sqlContext
       .sparkContext
-      .parallelize(1 until 2)
+      .parallelize(0 until 10)
       .map(i => (null, makeParquetAvroCompat(i)))
       .coalesce(1)
       .saveAsNewAPIHadoopFile(
-        avroStore.getCanonicalPath,
+        parquetStore.getCanonicalPath,
         classOf[Void],
         classOf[ParquetAvroCompat],
         classOf[ParquetOutputFormat[ParquetAvroCompat]],
        job.getConfiguration)
   }
 
-  override protected def afterAll(): Unit = {
-    avroStore.delete()
-  }
-
   test("Read Parquet file generated by parquet-avro") {
-    val df = sqlContext.read.parquet(avroStore.getCanonicalPath)
-    checkAnswer(df, (1 until 2).map { i =>
-      def nullable[T <: AnyRef](f: => T): T = {
-        if (i % 3 == 0) null.asInstanceOf[T] else f
-      }
+    checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
+      def nullable[T <: AnyRef] = makeNullable[T](i) _
 
       Row(
         i % 2 == 0,
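The refactored helper relies on Scala partial application: `makeNullable[T](i) _` fixes the record index `i` of the curried method and leaves its by-name value parameter open, so `nullable(expr)` evaluates `expr` only for records whose index is not a multiple of three. A standalone sketch of the same pattern (the object and values here are illustrative only, not part of the commit):

object MakeNullableSketch {
  // Same shape as ParquetCompatibilityTest.makeNullable: every third record
  // (i % 3 == 0) yields null instead of the computed value.
  def makeNullable[T <: AnyRef](i: Int)(f: => T): T =
    if (i % 3 == 0) null.asInstanceOf[T] else f

  def main(args: Array[String]): Unit = {
    val i = 3
    // Partial application: fix the index, keep the by-name argument open.
    def nullable[T <: AnyRef] = makeNullable[T](i) _
    println(nullable[String]("val_3"))        // null, since 3 % 3 == 0
    println(makeNullable[String](4)("val_4")) // val_4
  }
}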
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.util.Utils
+
+abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
+  override def sqlContext: SQLContext = TestSQLContext
+
+  protected var parquetStore: File = _
+
+  override protected def beforeAll(): Unit = {
+    parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_")
+    parquetStore.delete()
+  }
+
+  override protected def afterAll(): Unit = {
+    Utils.deleteRecursively(parquetStore)
+  }
+}
+
+object ParquetCompatibilityTest {
+  def makeNullable[T <: AnyRef](i: Int)(f: => T): T = {
+    if (i % 3 == 0) null.asInstanceOf[T] else f
+  }
+}
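Concrete suites extend this base class, call `super.beforeAll()` to obtain the fresh `parquetStore` temp directory, write a Parquet file into it with the library under test, and read it back through Spark SQL. A minimal hypothetical subclass illustrating that contract (it writes with Spark itself rather than an external library, purely for illustration; the class name is invented and it assumes the same `org.apache.spark.sql.parquet` test package):

import org.apache.spark.sql.Row

// Hypothetical example, not part of the commit: exercises the base-class
// lifecycle by round-tripping Spark-written data through parquetStore.
class RoundTripCompatibilitySuite extends ParquetCompatibilityTest {
  override protected def beforeAll(): Unit = {
    super.beforeAll()  // creates (and pre-deletes) the parquetStore path
    sqlContext
      .createDataFrame((0 until 10).map(i => (i, s"val_$i")))
      .write.parquet(parquetStore.getCanonicalPath)
  }

  test("round-trip through parquetStore") {
    checkAnswer(
      sqlContext.read.parquet(parquetStore.getCanonicalPath),
      (0 until 10).map(i => Row(i, s"val_$i")))
  }
}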
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala

Lines changed: 144 additions & 0 deletions

@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.nio.ByteBuffer
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat
+
+import org.apache.spark.sql.parquet.test.thrift.{Nested, ParquetThriftCompat, Suit}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+object ParquetThriftCompatibilitySuite {
+  def makeParquetThriftCompat(i: Int): ParquetThriftCompat = {
+    def makeComplexColumn(i: Int): JMap[Integer, JList[Nested]] = {
+      mapAsJavaMap(Seq.tabulate(3) { n =>
+        (i + n: Integer) -> seqAsJavaList(Seq.tabulate(3) { m =>
+          new Nested(
+            seqAsJavaList(Seq.tabulate(3)(j => i + j + m)),
+            s"val_${i + m}")
+        })
+      }.toMap)
+    }
+
+    val value = new ParquetThriftCompat(
+      i % 2 == 0,
+      i.toByte,
+      (i + 1).toShort,
+      i + 2,
+      i.toLong * 10,
+      i.toDouble + 0.2d,
+      ByteBuffer.wrap(s"val_$i".getBytes),
+      s"val_$i",
+      Suit.values()(i % 4),
+
+      seqAsJavaList(Seq.tabulate(3)(n => s"arr_${i + n}")),
+      setAsJavaSet(Set(i)),
+      mapAsJavaMap(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap),
+      makeComplexColumn(i))
+
+    if (i % 3 == 0) {
+      value
+    } else {
+      value
+        .setMaybeBoolColumn(i % 2 == 0)
+        .setMaybeByteColumn(i.toByte)
+        .setMaybeShortColumn((i + 1).toShort)
+        .setMaybeIntColumn(i + 2)
+        .setMaybeLongColumn(i.toLong * 10)
+        .setMaybeDoubleColumn(i.toDouble + 0.2d)
+        .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
+        .setMaybeStringColumn(s"val_$i")
+        .setMaybeEnumColumn(Suit.values()(i % 4))
+    }
+  }
+}
+
+class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetCompatibilityTest.makeNullable
+  import ParquetThriftCompatibilitySuite.makeParquetThriftCompat
+
+  override def sqlContext: SQLContext = TestSQLContext
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val job = new Job()
+    ParquetThriftOutputFormat.setThriftClass(job, classOf[ParquetThriftCompat])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetThriftCompat])
+
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 10)
+      .map(i => (null, makeParquetThriftCompat(i)))
+      .coalesce(1)
+      .saveAsNewAPIHadoopFile(
+        parquetStore.getCanonicalPath,
+        classOf[Void],
+        classOf[ParquetThriftCompat],
+        classOf[ParquetThriftOutputFormat[ParquetThriftCompat]],
+        job.getConfiguration)
+  }
+
+  test("Read Parquet file generated by parquet-thrift") {
+    checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
+      def nullable[T <: AnyRef] = makeNullable[T](i) _
+
+      Row(
+        i % 2 == 0,
+        i.toByte,
+        (i + 1).toShort,
+        i + 2,
+        i.toLong * 10,
+        i.toDouble + 0.2d,
+        // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+        // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+        // Thrift `STRING`s are encoded using UTF-8.
+        s"val_$i",
+        s"val_$i",
+        // Thrift ENUM values are converted to Parquet binaries containing UTF-8 strings
+        Suit.values()(i % 4).name(),
+
+        nullable(i % 2 == 0: java.lang.Boolean),
+        nullable(i.toByte: java.lang.Byte),
+        nullable((i + 1).toShort: java.lang.Short),
+        nullable(i + 2: Integer),
+        nullable((i * 10).toLong: java.lang.Long),
+        nullable(i.toDouble + 0.2d: java.lang.Double),
+        nullable(s"val_$i"),
+        nullable(s"val_$i"),
+        nullable(Suit.values()(i % 4).name()),
+
+        Seq.tabulate(3)(n => s"arr_${i + n}"),
+        // Thrift `SET`s are converted to Parquet `LIST`s
+        Seq(i),
+        Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap,
+        Seq.tabulate(3) { n =>
+          (i + n) -> Seq.tabulate(3) { m =>
+            Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
+          }
+        }.toMap)
+    })
+  }
+}
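The expectations above encode parquet-thrift's type mapping: Thrift `binary` and `string` fields both surface as Spark SQL strings, enums surface as their UTF-8 names, and Thrift sets come back as Parquet `LIST`s, i.e. Spark SQL arrays. One way to eyeball that mapping once `beforeAll` has written the file is a schema dump (a sketch under the same test setup, not part of the commit; the field names in the comments are assumptions inferred from the setter names above, and the exact rendering depends on the Spark version):

val df = sqlContext.read.parquet(parquetStore.getCanonicalPath)
df.printSchema()
// Expected highlights (field names are assumptions):
//  |-- binaryColumn: string   <- Thrift binary read back as BINARY (UTF8)
//  |-- enumColumn: string     <- enum written as its UTF-8 name
//  |-- intSetColumn: array    <- Thrift set<i32> stored as a Parquet LIST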
Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cd $(dirname $0)/..
+BASEDIR=`pwd`
+cd -
+
+rm -rf $BASEDIR/gen-java
+mkdir -p $BASEDIR/gen-java
+
+thrift\
+  --gen java\
+  -out $BASEDIR/gen-java\
+  $BASEDIR/thrift/parquet-compat.thrift
+
+avro-tools idl $BASEDIR/avro/parquet-compat.avdl > $BASEDIR/avro/parquet-compat.avpr
+avro-tools compile -string protocol $BASEDIR/avro/parquet-compat.avpr $BASEDIR/gen-java