From 4916d6c39585dd063e74a299e9f745a1787ac6e3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 30 Dec 2017 12:59:42 +0800 Subject: [PATCH 1/2] fix --- .../command/createDataSourceTables.scala | 3 +- .../InsertIntoHadoopFsRelationCommand.scala | 7 +- .../spark/sql/sources/CompressionSuite.scala | 160 ++++++++++++++++++ .../hive/execution/HiveCompressionSuite.scala | 71 ++++++++ 4 files changed, 236 insertions(+), 5 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 306f43dc4214a..ed93e5829660d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -203,12 +203,13 @@ case class CreateDataSourceTableAsSelectCommand( tableExists: Boolean): BaseRelation = { // Create the relation based on the input logical plan: `data`. val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_)) + val options = table.properties ++ table.storage.properties ++ pathOption val dataSource = DataSource( session, className = table.provider.get, partitionColumns = table.partitionColumnNames, bucketSpec = table.bucketSpec, - options = table.storage.properties ++ pathOption, + options = options, catalogTable = if (tableExists) Some(table) else None) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 675bee85bf61e..76ca252099459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -87,8 +87,6 @@ case class InsertIntoHadoopFsRelationCommand( } val pathExists = fs.exists(qualifiedOutputPath) - // If we are appending data to an existing dir. - val isAppend = pathExists && (mode == SaveMode.Append) val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, @@ -136,6 +134,8 @@ case class InsertIntoHadoopFsRelationCommand( } } + val newOptions = catalogTable.map(_.properties).getOrElse(Map.empty) ++ options + val updatedPartitionPaths = FileFormatWriter.write( sparkSession = sparkSession, @@ -148,8 +148,7 @@ case class InsertIntoHadoopFsRelationCommand( partitionColumns = partitionColumns, bucketSpec = bucketSpec, statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), - options = options) - + options = newOptions) // update metastore partition metadata refreshUpdatedPartitions(updatedPartitionPaths) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala new file mode 100644 index 0000000000000..6f830ff834d5a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.util.Utils + +abstract class CompressionTestUtils extends QueryTest with SQLTestUtils with BeforeAndAfterAll { + + var originalParquetCompressionCodeName: String = _ + var originalOrcCompressionCodeName: String = _ + + protected override def beforeAll(): Unit = { + super.beforeAll() + originalParquetCompressionCodeName = spark.conf.get(SQLConf.PARQUET_COMPRESSION.key) + originalOrcCompressionCodeName = spark.conf.get(SQLConf.ORC_COMPRESSION.key) + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") + spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") + } + + protected override def afterAll(): Unit = { + try { + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, originalParquetCompressionCodeName) + spark.conf.set(SQLConf.ORC_COMPRESSION.key, originalOrcCompressionCodeName) + } finally { + super.afterAll() + } + } + + protected override lazy val sql = spark.sql _ + + protected def checkCTASCompression( + formatClause: String, + optionClause: String, + expectedFileNameSuffix: String): Unit = { + withTable("tab1") { + sql( + s""" + |CREATE TABLE tab1 + |$formatClause + |$optionClause + |AS SELECT 1 as col1 + """.stripMargin) + val path = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location + val leafFiles = Utils.recursiveList(new File(path)) + assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1) + } + } + + protected def checkInsertCompression( + format: String, + isNative: Boolean, + optionClause: String, + tablePropertiesClause: String, + isPartitioned: Boolean, + expectedFileNameSuffix: String): Unit = { + withTable("tab1") { + val (schemaClause, partitionClause) = if (isPartitioned) { + if (isNative) { + ("(col1 int, col2 int)", "PARTITIONED BY (col2)") + } else { + ("(col1 int)", "PARTITIONED BY (col2 int)") + } + } else { + ("(col1 int, col2 int)", "") + } + val formatClause = if (isNative) s"USING $format" else s"STORED AS $format" + sql( + s""" + |CREATE TABLE tab1 $schemaClause + |$formatClause + |$optionClause + |$partitionClause + |$tablePropertiesClause + """.stripMargin) + sql( + """ + |INSERT INTO TABLE tab1 + |SELECT 1 as col1, 2 as col2 + """.stripMargin) + val path = if (isPartitioned) { + spark.sessionState.catalog.getPartition(TableIdentifier("tab1"), Map("col2" -> "2")) + .location + } else { + spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location + } + val leafFiles = Utils.recursiveList(new File(path)) + assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1) + 
} + } +} + +class CompressionSuite extends CompressionTestUtils with SharedSQLContext { + + test("CTAS against native data source table - parquet") { + checkCTASCompression( + formatClause = "USING parquet", + optionClause = "OPTIONS('compression' = 'gzip')", + expectedFileNameSuffix = "gz.parquet" + ) + + checkCTASCompression( + formatClause = "USING parquet", + optionClause = "TBLPROPERTIES('compression' = 'gzip')", + expectedFileNameSuffix = "gz.parquet" + ) + + checkCTASCompression( + formatClause = "USING parquet", + optionClause = + "OPTIONS('compression' = 'gzip') TBLPROPERTIES('compression' = 'uncompressed')", + expectedFileNameSuffix = "gz.parquet" + ) + } + + test("INSERT against native data source table - parquet") { + Seq("false", "true").foreach { isPartitioned => + checkInsertCompression( + format = "parquet", + isNative = true, + optionClause = "OPTIONS('compression' = 'gzip')", + tablePropertiesClause = "", + isPartitioned = isPartitioned.toBoolean, + expectedFileNameSuffix = "gz.parquet" + ) + + checkInsertCompression( + format = "parquet", + isNative = true, + optionClause = "", + tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')", + isPartitioned = isPartitioned.toBoolean, + expectedFileNameSuffix = "gz.parquet" + ) + } + } + +} \ No newline at end of file diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala new file mode 100644 index 0000000000000..b99d569fe3f08 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.sources.CompressionTestUtils + +class HiveCompressionSuite extends CompressionTestUtils with TestHiveSingleton { + + test("CTAS after converting hive table to data source table - parquet") { + withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") { + checkCTASCompression( + formatClause = "STORED AS parquet", + optionClause = "TBLPROPERTIES('compression' = 'gzip')", + expectedFileNameSuffix = "gz.parquet" + ) + } + } + + test("INSERT after converting hive table to data source table - parquet") { + withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") { + checkInsertCompression( + format = "parquet", + isNative = false, + optionClause = "", + tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')", + isPartitioned = false, + expectedFileNameSuffix = "gz.parquet" + ) + } + } + + test("CTAS after converting hive table to data source table - orc") { + withSQLConf(CONVERT_METASTORE_ORC.key -> "true") { + checkCTASCompression( + formatClause = "STORED AS orc", + optionClause = "TBLPROPERTIES('compression' = 'zlib')", + expectedFileNameSuffix = "zlib.orc" + ) + } + } + + test("INSERT after converting hive table to data source table - orc") { + withSQLConf(CONVERT_METASTORE_ORC.key -> "true") { + checkInsertCompression( + format = "orc", + isNative = false, + optionClause = "", + tablePropertiesClause = "TBLPROPERTIES('compression' = 'zlib')", + isPartitioned = false, + expectedFileNameSuffix = "zlib.orc" + ) + } + } +} \ No newline at end of file From 7189562d9e37c6a0642cab037daba18c11170812 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 30 Dec 2017 13:58:01 +0800 Subject: [PATCH 2/2] fix --- .../spark/sql/sources/CompressionSuite.scala | 15 +++++++-------- .../sql/hive/execution/HiveCompressionSuite.scala | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala index 6f830ff834d5a..38b86f0d82e16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CompressionSuite.scala @@ -29,21 +29,21 @@ import org.apache.spark.util.Utils abstract class CompressionTestUtils extends QueryTest with SQLTestUtils with BeforeAndAfterAll { - var originalParquetCompressionCodeName: String = _ - var originalOrcCompressionCodeName: String = _ + var originalParquetCompressionCodecName: String = _ + var originalOrcCompressionCodecName: String = _ protected override def beforeAll(): Unit = { super.beforeAll() - originalParquetCompressionCodeName = spark.conf.get(SQLConf.PARQUET_COMPRESSION.key) - originalOrcCompressionCodeName = spark.conf.get(SQLConf.ORC_COMPRESSION.key) + originalParquetCompressionCodecName = spark.conf.get(SQLConf.PARQUET_COMPRESSION.key) + originalOrcCompressionCodecName = spark.conf.get(SQLConf.ORC_COMPRESSION.key) spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") } protected override def afterAll(): Unit = { try { - spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, originalParquetCompressionCodeName) - spark.conf.set(SQLConf.ORC_COMPRESSION.key, originalOrcCompressionCodeName) + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, 
originalParquetCompressionCodecName) + spark.conf.set(SQLConf.ORC_COMPRESSION.key, originalOrcCompressionCodecName) } finally { super.afterAll() } @@ -156,5 +156,4 @@ class CompressionSuite extends CompressionTestUtils with SharedSQLContext { ) } } - -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala index b99d569fe3f08..33aed4286bd09 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompressionSuite.scala @@ -68,4 +68,4 @@ class HiveCompressionSuite extends CompressionTestUtils with TestHiveSingleton { ) } } -} \ No newline at end of file +}
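
Illustrative note (not part of the patch): the behavioral core of both hunks is the merge order of the option maps. Scala's Map `++` keeps the right-hand entry on key collisions, so putting `table.properties` (TBLPROPERTIES) first and `table.storage.properties` / the write `options` later means an explicit OPTIONS or write-time `compression` setting wins over TBLPROPERTIES, which is what the `OPTIONS('compression' = 'gzip') TBLPROPERTIES('compression' = 'uncompressed')` CTAS test expects. A minimal standalone sketch of that precedence (the concrete keys, values, and the `/tmp/tab1` path below are hypothetical, chosen only to show the `++` semantics):

object OptionPrecedenceSketch extends App {
  // TBLPROPERTIES on the catalog table (lowest precedence in the merge).
  val tableProperties   = Map("compression" -> "uncompressed")
  // OPTIONS from CREATE TABLE ... USING, stored as storage properties.
  val storageProperties = Map("compression" -> "gzip")
  // Hypothetical table location, analogous to the pathOption in the patch.
  val pathOption        = Map("path" -> "/tmp/tab1")

  // Same merge order as in CreateDataSourceTableAsSelectCommand:
  // later maps override earlier ones, so OPTIONS beats TBLPROPERTIES.
  val options = tableProperties ++ storageProperties ++ pathOption
  assert(options("compression") == "gzip")
  println(options)
}

The InsertIntoHadoopFsRelationCommand hunk follows the same convention: `catalogTable.map(_.properties).getOrElse(Map.empty) ++ options` lets the table properties supply a default compression codec while still allowing per-write options to override it.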