diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 306f43dc4214a..ed93e5829660d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -203,12 +203,13 @@ case class CreateDataSourceTableAsSelectCommand(
       tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
     val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.properties ++ table.storage.properties ++ pathOption
     val dataSource = DataSource(
       session,
       className = table.provider.get,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
+      options = options,
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 675bee85bf61e..76ca252099459 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -87,8 +87,6 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val pathExists = fs.exists(qualifiedOutputPath)
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
@@ -136,6 +134,8 @@ case class InsertIntoHadoopFsRelationCommand(
         }
       }
 
+      val newOptions = catalogTable.map(_.properties).getOrElse(Map.empty) ++ options
+
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
@@ -148,8 +148,7 @@ case class InsertIntoHadoopFsRelationCommand(
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,
           statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-          options = options)
-
+          options = newOptions)
       // update metastore partition metadata
       refreshUpdatedPartitions(updatedPartitionPaths)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala
new file mode 100644
index 0000000000000..38b86f0d82e16
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.util.Utils
+
+abstract class CompressionTestUtils extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
+
+  var originalParquetCompressionCodecName: String = _
+  var originalOrcCompressionCodecName: String = _
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    originalParquetCompressionCodecName = spark.conf.get(SQLConf.PARQUET_COMPRESSION.key)
+    originalOrcCompressionCodecName = spark.conf.get(SQLConf.ORC_COMPRESSION.key)
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+    spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
+  }
+
+  protected override def afterAll(): Unit = {
+    try {
+      spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, originalParquetCompressionCodecName)
+      spark.conf.set(SQLConf.ORC_COMPRESSION.key, originalOrcCompressionCodecName)
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  protected override lazy val sql = spark.sql _
+
+  protected def checkCTASCompression(
+      formatClause: String,
+      optionClause: String,
+      expectedFileNameSuffix: String): Unit = {
+    withTable("tab1") {
+      sql(
+        s"""
+           |CREATE TABLE tab1
+           |$formatClause
+           |$optionClause
+           |AS SELECT 1 as col1
+         """.stripMargin)
+      val path = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location
+      val leafFiles = Utils.recursiveList(new File(path))
+      assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1)
+    }
+  }
+
+  protected def checkInsertCompression(
+      format: String,
+      isNative: Boolean,
+      optionClause: String,
+      tablePropertiesClause: String,
+      isPartitioned: Boolean,
+      expectedFileNameSuffix: String): Unit = {
+    withTable("tab1") {
+      val (schemaClause, partitionClause) = if (isPartitioned) {
+        if (isNative) {
+          ("(col1 int, col2 int)", "PARTITIONED BY (col2)")
+        } else {
+          ("(col1 int)", "PARTITIONED BY (col2 int)")
+        }
+      } else {
+        ("(col1 int, col2 int)", "")
+      }
+      val formatClause = if (isNative) s"USING $format" else s"STORED AS $format"
+      sql(
+        s"""
+           |CREATE TABLE tab1 $schemaClause
+           |$formatClause
+           |$optionClause
+           |$partitionClause
+           |$tablePropertiesClause
+         """.stripMargin)
+      sql(
+        """
+          |INSERT INTO TABLE tab1
+          |SELECT 1 as col1, 2 as col2
+        """.stripMargin)
+      val path = if (isPartitioned) {
+        spark.sessionState.catalog.getPartition(TableIdentifier("tab1"), Map("col2" -> "2"))
+          .location
+      } else {
+        spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location
+      }
+      val leafFiles = Utils.recursiveList(new File(path))
+      assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1)
+    }
+  }
+}
+
+class CompressionSuite extends CompressionTestUtils with SharedSQLContext {
+
+  test("CTAS against native data source table - parquet") {
+    checkCTASCompression(
+      formatClause = "USING parquet",
+      optionClause = "OPTIONS('compression' = 'gzip')",
+      expectedFileNameSuffix = "gz.parquet"
+    )
+
+    checkCTASCompression(
+      formatClause = "USING parquet",
+      optionClause = "TBLPROPERTIES('compression' = 'gzip')",
+      expectedFileNameSuffix = "gz.parquet"
+    )
+
+    checkCTASCompression(
+      formatClause = "USING parquet",
+      optionClause =
+        "OPTIONS('compression' = 'gzip') TBLPROPERTIES('compression' = 'uncompressed')",
+      expectedFileNameSuffix = "gz.parquet"
+    )
+  }
+
+  test("INSERT against native data source table - parquet") {
+    Seq("false", "true").foreach { isPartitioned =>
+      checkInsertCompression(
+        format = "parquet",
+        isNative = true,
+        optionClause = "OPTIONS('compression' = 'gzip')",
+        tablePropertiesClause = "",
+        isPartitioned = isPartitioned.toBoolean,
+        expectedFileNameSuffix = "gz.parquet"
+      )
+
+      checkInsertCompression(
+        format = "parquet",
+        isNative = true,
+        optionClause = "",
+        tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')",
+        isPartitioned = isPartitioned.toBoolean,
+        expectedFileNameSuffix = "gz.parquet"
+      )
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala
new file mode 100644
index 0000000000000..33aed4286bd09
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.sources.CompressionTestUtils
+
+class HiveCompressionSuite extends CompressionTestUtils with TestHiveSingleton {
+
+  test("CTAS after converting hive table to data source table - parquet") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+      checkCTASCompression(
+        formatClause = "STORED AS parquet",
+        optionClause = "TBLPROPERTIES('compression' = 'gzip')",
+        expectedFileNameSuffix = "gz.parquet"
+      )
+    }
+  }
+
+  test("INSERT after converting hive table to data source table - parquet") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+      checkInsertCompression(
+        format = "parquet",
+        isNative = false,
+        optionClause = "",
+        tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')",
+        isPartitioned = false,
+        expectedFileNameSuffix = "gz.parquet"
+      )
+    }
+  }
+
+  test("CTAS after converting hive table to data source table - orc") {
+    withSQLConf(CONVERT_METASTORE_ORC.key -> "true") {
+      checkCTASCompression(
+        formatClause = "STORED AS orc",
+        optionClause = "TBLPROPERTIES('compression' = 'zlib')",
+        expectedFileNameSuffix = "zlib.orc"
+      )
+    }
+  }
+
+  test("INSERT after converting hive table to data source table - orc") {
+    withSQLConf(CONVERT_METASTORE_ORC.key -> "true") {
+      checkInsertCompression(
+        format = "orc",
+        isNative = false,
+        optionClause = "",
+        tablePropertiesClause = "TBLPROPERTIES('compression' = 'zlib')",
+        isPartitioned = false,
+        expectedFileNameSuffix = "zlib.orc"
+      )
+    }
+  }
+}