4 changes: 4 additions & 0 deletions in sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
+  val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"

   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -77,6 +78,9 @@ trait SQLConf {
   /** When true, tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean

+  /** The compression codec for writing to a Parquet file */
+  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
+
   /** The number of rows that will be grouped into a single in-memory columnar batch */
   private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt

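With the new key and accessor in place, the codec is chosen per SQLContext rather than hard-coded per job. A minimal usage sketch (assuming an existing SQLContext named sqlContext and a SchemaRDD named rdd; setConf and saveAsParquetFile are the pre-existing Spark SQL APIs this change builds on):

  // Hypothetical usage: override the "snappy" default for this context,
  // then write a Parquet file that should come out gzip-compressed.
  sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
  rdd.saveAsParquetFile("/tmp/people.parquet")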
@@ -100,8 +100,13 @@ private[sql] object ParquetRelation {
   // The compression type
   type CompressionType = parquet.hadoop.metadata.CompressionCodecName

-  // The default compression
-  val defaultCompression = CompressionCodecName.GZIP
+  // The Parquet compression short names
+  val shortParquetCompressionCodecNames = Map(
+    "NONE" -> CompressionCodecName.UNCOMPRESSED,
+    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
+    "SNAPPY" -> CompressionCodecName.SNAPPY,
+    "GZIP" -> CompressionCodecName.GZIP,
+    "LZO" -> CompressionCodecName.LZO)

   /**
    * Creates a new ParquetRelation and underlying Parquet file for the given LogicalPlan. Note that
@@ -141,9 +146,8 @@
       conf: Configuration,
       sqlContext: SQLContext): ParquetRelation = {
     val path = checkPath(pathString, allowExisting, conf)
-    if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
-      conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
-    }
+    conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
+      sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
     new ParquetRelation(path.toString, Some(conf), sqlContext) {
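Note the getOrElse: an unrecognized codec name silently falls back to UNCOMPRESSED rather than failing. A self-contained sketch of that lookup behavior (CompressionCodecName is parquet-mr's enum; the map literal is copied from the change above, and resolve is an invented name for illustration):

  import parquet.hadoop.metadata.CompressionCodecName

  val codecs = Map(
    "NONE" -> CompressionCodecName.UNCOMPRESSED,
    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
    "SNAPPY" -> CompressionCodecName.SNAPPY,
    "GZIP" -> CompressionCodecName.GZIP,
    "LZO" -> CompressionCodecName.LZO)

  // Case-insensitive resolution with the same fall-back as the conf.set call above.
  def resolve(name: String): CompressionCodecName =
    codecs.getOrElse(name.toUpperCase, CompressionCodecName.UNCOMPRESSED)

  resolve("gzip")   // CompressionCodecName.GZIP
  resolve("bogus")  // CompressionCodecName.UNCOMPRESSED (silent fall-back)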
@@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
     TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
   }

+  test("Compression options for writing to a Parquet file") {
+    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    import scala.collection.JavaConversions._
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+
+    // test default compression codec
+    rdd.saveAsParquetFile(path)
+    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "UNCOMPRESSED"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "none"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === "UNCOMPRESSED" :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test gzip compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test snappy compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // TODO: Lzo requires additional external setup steps so leave it out for now
+    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
+  }
+
test("Read/Write All Types with non-primitive type") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)
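The five write-read-assert blocks in the new test differ only in the configured codec and the expected on-disk name. As a sketch of how the repetition could be factored out (not part of the PR; it assumes the test's rdd, path, and file fixtures and its JavaConversions import are in scope, and it drops the per-codec query check for brevity):

  // Hypothetical helper: write with an optional codec setting, then assert
  // on the codec actually recorded in the Parquet footer metadata.
  def checkCompressionCodec(setting: Option[String], expected: String): Unit = {
    setting.foreach(TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, _))
    rdd.saveAsParquetFile(path)
    val actual = ParquetTypesConverter
      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
      .getBlocks.flatMap(_.getColumns).map(_.getCodec.name()).distinct
    assert(actual === expected :: Nil)
    Utils.deleteRecursively(file)
  }

  checkCompressionCodec(None, "SNAPPY")                        // the "snappy" default
  checkCompressionCodec(Some("UNCOMPRESSED"), "UNCOMPRESSED")
  checkCompressionCodec(Some("none"), "UNCOMPRESSED")
  checkCompressionCodec(Some("gzip"), "GZIP")
  checkCompressionCodec(Some("snappy"), "SNAPPY")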