Conversation

@MaxGekk (Member) commented Jul 22, 2018

What changes were proposed in this pull request?

In this PR, I added a new option for the Avro datasource: `compression`. The option allows specifying the compression codec for saved Avro files. It is similar to the `compression` option in other datasources such as JSON and CSV.

Also, I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level` and put them into `SQLConf`. If the `compression` option is not specified by the user, the first SQL config is taken into account.
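For example, the new option and configs can be used like this (a minimal sketch; the paths and DataFrame are illustrative):

// Per-write option:
df.write
  .option("compression", "deflate")
  .format("avro")
  .save("/tmp/avro/deflate")

// Or session-wide via the new SQL configs:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
df.write.format("avro").save("/tmp/avro/deflate2")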

How was this patch tested?

I added a new test which reads the metadata from written Avro files and checks the `avro.codec` property.

@SparkQA commented Jul 22, 2018

Test build #93404 has finished for PR 21837 at commit c5802df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Jul 22, 2018

jenkins, retest this, please

@SparkQA commented Jul 22, 2018

Test build #93405 has finished for PR 21837 at commit f8b580b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.avro(snappyDir)

val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
@HyukjinKwon (Member):

Is there an easy way to check the metadata for the compression level?

@SparkQA commented Jul 22, 2018

Test build #93408 has finished for PR 21837 at commit f8b580b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


case "deflate" =>
val deflateLevel = spark.conf.get(
AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
@gengliangwang (Member):

`Deflater.DEFAULT_COMPRESSION` is -1 here. Why change the default value to 6 in `SQLConf`?

@MaxGekk (Member, Author):

I changed it because I had no idea what -1 means. Is it closer to the best compression (9) or to fast compression (1)? Probably the level is eventually set in zlib, where -1 means 6. @gengliangwang, from your point of view, does -1 mean better or faster compression?

@gengliangwang (Member):

If the compression algorithm changes in the future, the default value may not be 6. But if we use `DEFAULT_COMPRESSION`, it will still be equivalent to the new default value. So I suggest we still use `Deflater.DEFAULT_COMPRESSION` instead of any specific number.
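For reference, the relevant java.util.zip.Deflater constants (zlib maps -1 to its own built-in default level, which is currently 6):

import java.util.zip.Deflater

Deflater.DEFAULT_COMPRESSION  // -1: let zlib choose its default level (currently 6)
Deflater.BEST_SPEED           // 1
Deflater.BEST_COMPRESSION     // 9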

val deflateDir = s"$dir/deflate"
val snappyDir = s"$dir/snappy"

val df = spark.read.avro(testAvro)
Member:

I am removing all the `.avro` methods in #21841. Could you change this to `.format("avro").load` or `.format("avro").save`? Thanks!
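The change would look like this (a sketch; `testAvro` is the test input path from the surrounding code):

// before (implicit .avro method, being removed in #21841):
val df = spark.read.avro(testAvro)

// after:
val df = spark.read.format("avro").load(testAvro)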

@MaxGekk (Member, Author) commented Jul 23, 2018

> Is there an easy way to check the metadata for the compression level?

@HyukjinKwon I am not sure the level exists in the metadata; at least avro-tools doesn't show it:

java -jar ~/avro-tools-1.7.7.jar getmeta ./part-00000-780d0d02-ec8c-407e-8427-1790b81a3726-c000.avro
avro.schema	{"type":"record","name":"topLevelRecord","namespace":".topLevelRecord","fields": ...}
avro.codec	deflate

@SparkQA commented Jul 24, 2018

Test build #93461 has finished for PR 21837 at commit d21d3e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.save(snappyDir)

val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
@HyukjinKwon (Member):

Thank you, @MaxGekk. Can we then at least check the type of compression, i.e. that `avro.codec` is `deflate`?
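One way to assert that in the test (a sketch using the Avro Java API; `deflateDir` comes from the surrounding test):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Open a part file written by the test and read the codec from its header metadata.
val avroFile = new File(deflateDir).listFiles()
  .filter(_.getName.endsWith(".avro")).head
val reader = new DataFileReader[GenericRecord](
  avroFile, new GenericDatumReader[GenericRecord]())
assert(reader.getMetaString("avro.codec") == "deflate")
reader.close()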

* config `spark.sql.avro.deflate.level` is used by default. For other compressions, the default
* value is `6`.
*/
val compressionLevel: Int = {
@HyukjinKwon (Member) commented Jul 24, 2018:

Can we not expose this as an option for now? IIUC, this compression level only applies to deflate, right? Also, this option isn't one we keep for compatibility with the third-party package either.

@MaxGekk (Member, Author):

I added the option keeping in mind that other compression codecs can be added in the future, for example Zstandard; for those codecs, the level could be useful too. Another point: specifying the compression level together with the compression codec in the Avro options looks more natural than a global SQL setting:

df.write
  .options(Map("compression" -> "deflate", "compressionLevel" -> "9"))
  .format("avro")
  .save(deflateDir)

vs

spark.conf.set("spark.sql.avro.deflate.level", "9")
df.write
  .option("compression", "deflate"))
  .format("avro")
  .save(deflateDir)

@HyukjinKwon (Member):

Yeah, I know it could be useful in some ways, but I was thinking we'd better not add this just for now. The thing is, it currently sounds too specific to one compression codec in Avro. There are many options we could expose this way in other datasources too, for example CSV.

@HyukjinKwon (Member):

Also, to be honest, I wonder whether users would want to change the compression level often.

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
  .doc("Compression codec used in writing of AVRO files.")
  .stringConf
  .createWithDefault("snappy")
Member:

Can we add `.checkValues(Set(...))` here too?
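That could look something like this (a sketch; the codec set is assumed from the codecs discussed in this PR):

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
  .doc("Compression codec used in writing of AVRO files.")
  .stringConf
  .checkValues(Set("uncompressed", "snappy", "deflate"))
  .createWithDefault("snappy")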

val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
  .doc("Compression level for the deflate codec used in writing of AVRO files. " +
    "Valid value must be in the range of from 1 to 9 inclusive. " +
    "The default value is -1 which corresponds to 6 level in the current implementation.")
Member:

Can we do the check here too, e.g. with `checkValue(...)` that also allows -1?
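A sketch of that validation (assuming -1 remains the `Deflater.DEFAULT_COMPRESSION` sentinel and `java.util.zip.Deflater` is imported):

val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
  .doc("Compression level for the deflate codec used in writing of AVRO files.")
  .intConf
  // Accept only -1 (zlib default) or an explicit level between 1 and 9.
  .checkValue(level => level == -1 || (level >= 1 && level <= 9),
    "The compression level must be -1 or in the range 1 to 9 inclusive.")
  .createWithDefault(Deflater.DEFAULT_COMPRESSION)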


val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
  .doc("Compression level for the deflate codec used in writing of AVRO files. " +
    "Valid value must be in the range of from 1 to 9 inclusive. " +
Member:

This can be -1, right (https://www.zlib.net/manual.html)?

@MaxGekk changed the title from "[SPARK-24881][SQL] New Avro options - compression and compressionLevel" to "[SPARK-24881][SQL] New Avro option - compression" on Jul 26, 2018.
.createWithDefault(20)

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
  .doc("Compression codec used in writing of AVRO files.")
Member:

Document the default value?

@SparkQA commented Jul 27, 2018

Test build #93636 has finished for PR 21837 at commit 5561582.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) left a comment:

LGTM

@SparkQA commented Jul 27, 2018

Test build #93632 has finished for PR 21837 at commit b315f37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@transient val parameters: CaseInsensitiveMap[String],
@transient val conf: Configuration) extends Logging with Serializable {
@transient val conf: Configuration,
@transient val sqlConf: SQLConf) extends Logging with Serializable {
Member:

We may just get `SQLConf` by calling `SQLConf.get` without passing it in.
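For example (a sketch, assuming the new entries live in `SQLConf` as this PR adds them):

import org.apache.spark.sql.internal.SQLConf

// SQLConf.get returns the conf of the active session; no constructor parameter needed.
val codecName = SQLConf.get.getConf(SQLConf.AVRO_COMPRESSION_CODEC)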

test("write with compression") {
test("write with compression - sql configs") {
withTempPath { dir =>
val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
Member:

This can use `SQLConf.AVRO_COMPRESSION_CODEC.key` now.
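That is, something like (sketch):

// Refer to the config entry instead of hard-coding the string:
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")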

test("write with compression - sql configs") {
withTempPath { dir =>
val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
Member:

Ditto.

@SparkQA commented Jul 27, 2018

Test build #93634 has finished for PR 21837 at commit 6915f34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 27, 2018

Test build #93637 has finished for PR 21837 at commit ebaf327.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 27, 2018

Test build #93658 has finished for PR 21837 at commit 5f83902.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Jul 27, 2018

jenkins, retest this, please

@SparkQA commented Jul 27, 2018

Test build #93659 has finished for PR 21837 at commit 5f83902.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

Merged to master.

@asfgit closed this in 0a0f68b on Jul 27, 2018.
@MaxGekk deleted the avro-compression branch on August 17, 2019.
otterc pushed a commit to linkedin/spark that referenced this pull request on Mar 22, 2023:
Author: Maxim Gekk <[email protected]>

Closes apache#21837 from MaxGekk/avro-compression.

(cherry picked from commit 0a0f68b)