
Commit a8f13bb

Using Parquet writer API to do compatibility tests
1 parent f2208cd commit a8f13bb

2 files changed (+19, -36 lines)

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala

Lines changed: 9 additions & 19 deletions
@@ -22,11 +22,9 @@ import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
-import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
 
-import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.{Row, SQLContext}
@@ -88,21 +86,13 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
   override protected def beforeAll(): Unit = {
     super.beforeAll()
 
-    val job = new Job()
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
-    AvroParquetOutputFormat.setSchema(job, ParquetAvroCompat.getClassSchema)
-
-    sqlContext
-      .sparkContext
-      .parallelize(0 until 10)
-      .map(i => (null, makeParquetAvroCompat(i)))
-      .coalesce(1)
-      .saveAsNewAPIHadoopFile(
-        parquetStore.getCanonicalPath,
-        classOf[Void],
-        classOf[ParquetAvroCompat],
-        classOf[ParquetOutputFormat[ParquetAvroCompat]],
-        job.getConfiguration)
+    val writer =
+      new AvroParquetWriter[ParquetAvroCompat](
+        new Path(parquetStore.getCanonicalPath),
+        ParquetAvroCompat.getClassSchema)
+
+    (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
+    writer.close()
   }
 
   test("Read Parquet file generated by parquet-avro") {
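
For reference, a minimal standalone sketch of the writer-based approach adopted above: AvroParquetWriter writes Avro records directly to a single Parquet file, with no Hadoop Job or ParquetOutputFormat setup and no Spark job. The schema, output path, and record fields below are illustrative only (the suite uses its generated ParquetAvroCompat class and its makeParquetAvroCompat helper); the two-argument constructor and the write/close calls are the ones used in this commit.

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter

    object AvroParquetWriterSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative Avro schema; the test suite uses ParquetAvroCompat.getClassSchema instead.
        val schema = new Schema.Parser().parse(
          """{"type": "record", "name": "Example", "fields": [
            |  {"name": "id", "type": "int"},
            |  {"name": "name", "type": "string"}
            |]}""".stripMargin)

        // Same constructor shape as in the test: (output path, Avro schema).
        val writer = new AvroParquetWriter[GenericRecord](
          new Path("/tmp/parquet-avro-example.parquet"), schema)

        // Write a handful of records, then close to flush the footer.
        (0 until 10).foreach { i =>
          val record = new GenericData.Record(schema)
          record.put("id", i)
          record.put("name", s"name_$i")
          writer.write(record)
        }
        writer.close()
      }
    }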

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala

Lines changed: 10 additions & 17 deletions
@@ -22,9 +22,9 @@ import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.thrift.ThriftParquetWriter
 
 import org.apache.spark.sql.parquet.test.thrift.{Nested, ParquetThriftCompat, Suit}
 import org.apache.spark.sql.test.TestSQLContext
@@ -84,21 +84,14 @@ class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
   override protected def beforeAll(): Unit = {
     super.beforeAll()
 
-    val job = new Job()
-    ParquetThriftOutputFormat.setThriftClass(job, classOf[ParquetThriftCompat])
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetThriftCompat])
-
-    sqlContext
-      .sparkContext
-      .parallelize(0 until 10)
-      .map(i => (null, makeParquetThriftCompat(i)))
-      .coalesce(1)
-      .saveAsNewAPIHadoopFile(
-        parquetStore.getCanonicalPath,
-        classOf[Void],
+    val writer =
+      new ThriftParquetWriter[ParquetThriftCompat](
+        new Path(parquetStore.getCanonicalPath),
         classOf[ParquetThriftCompat],
-        classOf[ParquetThriftOutputFormat[ParquetThriftCompat]],
-        job.getConfiguration)
+        CompressionCodecName.SNAPPY)
+
+    (0 until 10).foreach(i => writer.write(makeParquetThriftCompat(i)))
+    writer.close()
   }
 
   test("Read Parquet file generated by parquet-thrift") {
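
Similarly, a sketch of the ThriftParquetWriter path used above. MyThriftRecord is a hypothetical placeholder for a Thrift-generated TBase class (the suite uses its generated ParquetThriftCompat), so this does not compile as-is without such a class on the classpath; the three-argument constructor (path, Thrift class, compression codec) and the write/close calls mirror the commit.

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.metadata.CompressionCodecName
    import org.apache.parquet.thrift.ThriftParquetWriter

    object ThriftParquetWriterSketch {
      // MyThriftRecord stands in for any Thrift-generated record class.
      def writeRecords(records: Seq[MyThriftRecord], outputPath: String): Unit = {
        val writer = new ThriftParquetWriter[MyThriftRecord](
          new Path(outputPath),
          classOf[MyThriftRecord],
          CompressionCodecName.SNAPPY) // codec choice mirrors the test code

        records.foreach(r => writer.write(r))
        writer.close()
      }
    }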
