Commit fd6b0bc

[SPARK-25102][SQL][2.4] Write Spark version to ORC/Parquet file metadata
### What changes were proposed in this pull request?

This is a backport of #22932.

Currently, Spark writes the Spark version number into Hive table properties with `spark.sql.create.version`.

```
parameters:{
  spark.sql.sources.schema.part.0={
    "type":"struct",
    "fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]
  },
  transient_lastDdlTime=1541142761,
  spark.sql.sources.schema.numParts=1,
  spark.sql.create.version=2.4.0
}
```

This PR aims to write the Spark version to ORC/Parquet file metadata with `org.apache.spark.sql.create.version`, because the `org.apache.` prefix is already used in Parquet metadata. It's different from the Hive table property key `spark.sql.create.version`, but it seems that we cannot change the Hive table property for backward compatibility. After this PR, ORC and Parquet files generated by Spark will have the following metadata.

**ORC (`native` and `hive` implementation)**
```
$ orc-tools meta /tmp/o
File Version: 0.12 with ...
...
User Metadata:
  org.apache.spark.sql.create.version=3.0.0
```

**PARQUET**
```
$ parquet-tools meta /tmp/p
...
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.create.version = 3.0.0
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
```

### Why are the changes needed?

This backport helps us handle these files differently in Apache Spark 3.0.0.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with newly added test cases.

Closes #28142 from dongjoon-hyun/SPARK-25102-2.4.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent aa9701b commit fd6b0bc
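
The `/tmp/o` and `/tmp/p` files inspected in the description can be reproduced with a couple of one-liners. A minimal sketch, assuming a `spark-shell` session built from this branch (the `spark` handle and the output paths are only illustrative):

```scala
// Minimal sketch (assumes spark-shell, which provides the `spark` session).
// spark.range(1) yields the single non-nullable long column "id" that appears
// in the parquet-tools output above.
spark.range(1).write.mode("overwrite").orc("/tmp/o")
spark.range(1).write.mode("overwrite").parquet("/tmp/p")
// Then inspect the footers with the external tools from the description:
//   orc-tools meta /tmp/o
//   parquet-tools meta /tmp/p
```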

14 files changed: +153 -25 lines changed

core/src/main/scala/org/apache/spark/package.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -19,6 +19,8 @@ package org.apache
 
 import java.util.Properties
 
+import org.apache.spark.util.VersionUtils
+
 /**
  * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
  * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -89,6 +91,7 @@ package object spark {
   }
 
   val SPARK_VERSION = SparkBuildInfo.spark_version
+  val SPARK_VERSION_SHORT = VersionUtils.shortVersion(SparkBuildInfo.spark_version)
   val SPARK_BRANCH = SparkBuildInfo.spark_branch
   val SPARK_REVISION = SparkBuildInfo.spark_revision
   val SPARK_BUILD_USER = SparkBuildInfo.spark_build_user
```

core/src/main/scala/org/apache/spark/util/VersionUtils.scala

Lines changed: 14 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@ package org.apache.spark.util
 private[spark] object VersionUtils {
 
   private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
+  private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
 
   /**
    * Given a Spark version string, return the major version number.
@@ -36,6 +37,19 @@
    */
   def minorVersion(sparkVersion: String): Int = majorMinorVersion(sparkVersion)._2
 
+  /**
+   * Given a Spark version string, return the short version string.
+   * E.g., for 3.0.0-SNAPSHOT, return '3.0.0'.
+   */
+  def shortVersion(sparkVersion: String): String = {
+    shortVersionRegex.findFirstMatchIn(sparkVersion) match {
+      case Some(m) => m.group(1)
+      case None =>
+        throw new IllegalArgumentException(s"Spark tried to parse '$sparkVersion' as a Spark" +
+          s" version string, but it could not find the major/minor/maintenance version numbers.")
+    }
+  }
+
   /**
    * Given a Spark version string, return the (major version number, minor version number).
    * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
```
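
As a quick illustration (a standalone sketch, not part of the patch), the new regex keeps only the `major.minor.maintenance` prefix and matches nothing when any of the three numbers is missing:

```scala
// Standalone sketch of the shortVersionRegex behaviour added above.
object ShortVersionDemo extends App {
  val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r

  def shortVersion(v: String): Option[String] =
    shortVersionRegex.findFirstMatchIn(v).map(_.group(1))

  assert(shortVersion("2.4.6-SNAPSHOT") == Some("2.4.6")) // suffix is dropped
  assert(shortVersion("3.0.0") == Some("3.0.0"))
  assert(shortVersion("3.0").isEmpty) // maintenance number missing, no match
}
```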

core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala

Lines changed: 25 additions & 0 deletions
```diff
@@ -73,4 +73,29 @@ class VersionUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Return short version number") {
+    assert(shortVersion("3.0.0") === "3.0.0")
+    assert(shortVersion("3.0.0-SNAPSHOT") === "3.0.0")
+    withClue("shortVersion parsing should fail for missing maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid major version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("x.0.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid minor version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.x.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0.x")
+      }
+    }
+  }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala

Lines changed: 11 additions & 4 deletions
```diff
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.orc.mapred.OrcStruct
-import org.apache.orc.mapreduce.OrcOutputFormat
+import org.apache.orc.OrcFile
+import org.apache.orc.mapred.{OrcOutputFormat => OrcMapRedOutputFormat, OrcStruct}
+import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
@@ -36,11 +37,17 @@ private[orc] class OrcOutputWriter(
   private[this] val serializer = new OrcSerializer(dataSchema)
 
   private val recordWriter = {
-    new OrcOutputFormat[OrcStruct]() {
+    val orcOutputFormat = new OrcOutputFormat[OrcStruct]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
       }
-    }.getRecordWriter(context)
+    }
+    val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
+    val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
+    val writer = OrcFile.createWriter(filename, options)
+    val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
+    OrcUtils.addSparkVersionMetadata(writer)
+    recordWriter
   }
 
   override def write(row: InternalRow): Unit = {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala

Lines changed: 11 additions & 3 deletions
```diff
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
@@ -144,4 +145,11 @@ object OrcUtils extends Logging {
       }
     }
   }
+
+  /**
+   * Add a metadata specifying Spark version.
+   */
+  def addSparkVersionMetadata(writer: Writer): Unit = {
+    writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
+  }
 }
```
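
For reference, a read-side sketch of how the user metadata written by `addSparkVersionMetadata` could be retrieved with the same `org.apache.orc` API (`Reader.getMetadataValue` is the counterpart of `Writer.addUserMetadata`; the helper below and its file path are only illustrative, not part of the patch):

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// Illustrative helper: read back the version written by addSparkVersionMetadata.
// "org.apache.spark.version" is SPARK_VERSION_METADATA_KEY (see sql/package.scala below).
def readSparkVersion(file: String): Option[String] = {
  val reader = OrcFile.createReader(new Path(file), OrcFile.readerOptions(new Configuration()))
  if (reader.hasMetadataValue("org.apache.spark.version")) {
    Some(UTF_8.decode(reader.getMetadataValue("org.apache.spark.version")).toString)
  } else {
    None
  }
}
```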

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -29,7 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -93,7 +95,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
-    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+    val metadata = Map(
+      SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
+      ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
+    ).asJava
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
```

sql/core/src/main/scala/org/apache/spark/sql/package.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -44,4 +44,13 @@ package object sql {
   type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
+
+  /**
+   * Metadata key which is used to write Spark version in the followings:
+   * - Parquet file metadata
+   * - ORC file metadata
+   *
+   * Note that Hive table property `spark.sql.create.version` also has Spark version.
+   */
+  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
 }
```

sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out

Lines changed: 6 additions & 6 deletions
```diff
@@ -93,7 +93,7 @@ Partition Values [ds=2017-08-01, hr=10]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1121 bytes, 3 rows
+Partition Statistics 1189 bytes, 3 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -128,7 +128,7 @@ Partition Values [ds=2017-08-01, hr=10]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1121 bytes, 3 rows
+Partition Statistics 1189 bytes, 3 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -155,7 +155,7 @@ Partition Values [ds=2017-08-01, hr=11]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1098 bytes, 4 rows
+Partition Statistics 1166 bytes, 4 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -190,7 +190,7 @@ Partition Values [ds=2017-08-01, hr=10]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1121 bytes, 3 rows
+Partition Statistics 1189 bytes, 3 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -217,7 +217,7 @@ Partition Values [ds=2017-08-01, hr=11]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1098 bytes, 4 rows
+Partition Statistics 1166 bytes, 4 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -244,7 +244,7 @@ Partition Values [ds=2017-09-01, hr=5]
 Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics 1144 bytes, 2 rows
+Partition Statistics 1212 bytes, 2 rows
 
 # Storage Information
 Location [not included in comparison]sql/core/spark-warehouse/t
```

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -509,7 +509,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       case plan: InMemoryRelation => plan
     }.head
     // InMemoryRelation's stats is file size before the underlying RDD is materialized
-    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
+    assert(inMemoryRelation.computeStats().sizeInBytes === 868)
 
     // InMemoryRelation's stats is updated after materializing RDD
     dfFromFile.collect()
@@ -522,7 +522,7 @@
 
     // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
     // is calculated
-    assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
+    assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
 
     // InMemoryRelation's stats should be updated after calculating stats of the table
     // clear cache to simulate a fresh environment
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
    import testImplicits._
    Seq(1.0, 0.5).foreach { compressionFactor =>
      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-       "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+       "spark.sql.autoBroadcastJoinThreshold" -> "434") {
        withTempPath { workDir =>
          // the file size is 740 bytes
          val workDirPath = workDir.getAbsolutePath
```
