Skip to content

Commit c4af64e

Browse files
committed
[SPARK-46766][SQL][AVRO] ZSTD Buffer Pool Support For AVRO datasource
### What changes were proposed in this pull request? This PR adds ZSTD Buffer Pool Support For AVRO datasource writing with zstd compression codec ### Why are the changes needed? Enable a tuning technique for users ### Does this PR introduce _any_ user-facing change? Yes, it adds a new configuration ### How was this patch tested? Passing existing CI shall be sufficient ### Was this patch authored or co-authored using generative AI tooling? No Closes #44792 from yaooqinn/SPARK-46766. Authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
1 parent a09b726 commit c4af64e

File tree

3 files changed

+27
-6
lines changed

3 files changed

+27
-6
lines changed

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
2424
import org.apache.avro.Schema
2525
import org.apache.avro.file.{DataFileReader, FileReader}
2626
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
27-
import org.apache.avro.mapred.FsInput
27+
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
2828
import org.apache.avro.mapreduce.AvroJob
2929
import org.apache.hadoop.conf.Configuration
3030
import org.apache.hadoop.fs.FileStatus
@@ -104,18 +104,25 @@ private[sql] object AvroUtils extends Logging {
104104

105105
parsedOptions.compression.toLowerCase(Locale.ROOT) match {
106106
case codecName if AvroCompressionCodec.values().exists(c => c.lowerCaseName() == codecName) =>
107+
val jobConf = job.getConfiguration
107108
AvroCompressionCodec.fromString(codecName) match {
108109
case UNCOMPRESSED =>
109-
job.getConfiguration.setBoolean("mapred.output.compress", false)
110+
jobConf.setBoolean("mapred.output.compress", false)
110111
case compressed =>
111-
job.getConfiguration.setBoolean("mapred.output.compress", true)
112-
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
112+
jobConf.setBoolean("mapred.output.compress", true)
113+
jobConf.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
113114
if (compressed.getSupportCompressionLevel) {
114115
val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
115116
compressed.getDefaultCompressionLevel.toString)
116117
logInfo(s"Compressing Avro output using the $codecName codec at level $level")
117-
val s = if (compressed == ZSTANDARD) "zstd" else codecName
118-
job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
118+
val s = if (compressed == ZSTANDARD) {
119+
val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
120+
jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
121+
"zstd"
122+
} else {
123+
codecName
124+
}
125+
jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
119126
} else {
120127
logInfo(s"Compressing Avro output using the $codecName codec")
121128
}

docs/sql-data-sources-avro.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,14 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
380380
</td>
381381
<td>4.0.0</td>
382382
</tr>
383+
<tr>
384+
<td>spark.sql.avro.zstandard.bufferPool.enabled</td>
385+
<td>false</td>
386+
<td>
387+
If true, enable the buffer pool of the ZSTD JNI library when writing AVRO files.
388+
</td>
389+
<td>4.0.0</td>
390+
</tr>
383391
<tr>
384392
<td>spark.sql.avro.datetimeRebaseModeInRead</td>
385393
<td><code>EXCEPTION</code></td>

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3643,6 +3643,12 @@ object SQLConf {
36433643
.intConf
36443644
.createOptional
36453645

3646+
val AVRO_ZSTANDARD_BUFFER_POOL_ENABLED = buildConf("spark.sql.avro.zstandard.bufferPool.enabled")
3647+
.doc("If true, enable buffer pool of ZSTD JNI library when writing of AVRO files")
3648+
.version("4.0.0")
3649+
.booleanConf
3650+
.createWithDefault(false)
3651+
36463652
val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
36473653
.internal()
36483654
.doc(s"If it is set to false, or ${ANSI_ENABLED.key} is true, then size of null returns " +

0 commit comments

Comments
 (0)