@@ -203,12 +203,13 @@ case class CreateDataSourceTableAsSelectCommand(
      tableExists: Boolean): BaseRelation = {
    // Create the relation based on the input logical plan: `data`.
    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+   val options = table.properties ++ table.storage.properties ++ pathOption
    val dataSource = DataSource(
      session,
      className = table.provider.get,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
-     options = table.storage.properties ++ pathOption,
+     options = options,
      catalogTable = if (tableExists) Some(table) else None)

    try {
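
Note on the hunk above: merging `table.properties` into the `DataSource` options is what lets a table-level `compression` entry in `TBLPROPERTIES` reach the underlying file format during CTAS. A minimal sketch of the intended effect, assuming a `spark` SparkSession is in scope; the table name and codec below are illustrative, not taken from the diff:

// Illustrative CTAS on a native data source table: the table-level
// 'compression' property is now forwarded to the Parquet writer.
spark.sql(
  """
    |CREATE TABLE demo_tab
    |USING parquet
    |TBLPROPERTIES('compression' = 'gzip')
    |AS SELECT 1 AS col1
  """.stripMargin)
// The new CompressionSuite below asserts that exactly one output file
// ends with the "gz.parquet" suffix for a statement of this shape.
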
@@ -87,8 +87,6 @@ case class InsertIntoHadoopFsRelationCommand(
    }

    val pathExists = fs.exists(qualifiedOutputPath)
-   // If we are appending data to an existing dir.
-   val isAppend = pathExists && (mode == SaveMode.Append)

    val committer = FileCommitProtocol.instantiate(
      sparkSession.sessionState.conf.fileCommitProtocolClass,
@@ -136,6 +134,8 @@
        }
      }

+     val newOptions = catalogTable.map(_.properties).getOrElse(Map.empty) ++ options
+
      val updatedPartitionPaths =
        FileFormatWriter.write(
          sparkSession = sparkSession,
@@ -148,8 +148,7 @@
          partitionColumns = partitionColumns,
          bucketSpec = bucketSpec,
          statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-         options = options)
-
+         options = newOptions)

      // update metastore partition metadata
      refreshUpdatedPartitions(updatedPartitionPaths)
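
Note on the merge order above: in `catalogTable.map(_.properties).getOrElse(Map.empty) ++ options`, the per-write `options` sit on the right-hand side of `++`, so they override table properties that share a key. A small, self-contained sketch of that precedence (values are illustrative):

// Right-hand operand of ++ wins on duplicate keys, so write options take
// precedence over table properties -- the same precedence the CTAS test below
// checks when OPTIONS('compression' = 'gzip') beats TBLPROPERTIES('compression' = 'uncompressed').
val tableProps = Map("compression" -> "uncompressed")
val writeOptions = Map("compression" -> "gzip")
val merged = tableProps ++ writeOptions
assert(merged("compression") == "gzip")
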
@@ -0,0 +1,159 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources

import java.io.File

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.util.Utils

abstract class CompressionTestUtils extends QueryTest with SQLTestUtils with BeforeAndAfterAll {

  var originalParquetCompressionCodecName: String = _
  var originalOrcCompressionCodecName: String = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()
    originalParquetCompressionCodecName = spark.conf.get(SQLConf.PARQUET_COMPRESSION.key)
    originalOrcCompressionCodecName = spark.conf.get(SQLConf.ORC_COMPRESSION.key)
    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
    spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
  }

  protected override def afterAll(): Unit = {
    try {
      spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, originalParquetCompressionCodecName)
      spark.conf.set(SQLConf.ORC_COMPRESSION.key, originalOrcCompressionCodecName)
    } finally {
      super.afterAll()
    }
  }

  protected override lazy val sql = spark.sql _

  protected def checkCTASCompression(
      formatClause: String,
      optionClause: String,
      expectedFileNameSuffix: String): Unit = {
    withTable("tab1") {
      sql(
        s"""
           |CREATE TABLE tab1
           |$formatClause
           |$optionClause
           |AS SELECT 1 as col1
         """.stripMargin)
      val path = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location
      val leafFiles = Utils.recursiveList(new File(path))
      assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1)
    }
  }

  protected def checkInsertCompression(
      format: String,
      isNative: Boolean,
      optionClause: String,
      tablePropertiesClause: String,
      isPartitioned: Boolean,
      expectedFileNameSuffix: String): Unit = {
    withTable("tab1") {
      val (schemaClause, partitionClause) = if (isPartitioned) {
        if (isNative) {
          ("(col1 int, col2 int)", "PARTITIONED BY (col2)")
        } else {
          ("(col1 int)", "PARTITIONED BY (col2 int)")
        }
      } else {
        ("(col1 int, col2 int)", "")
      }
      val formatClause = if (isNative) s"USING $format" else s"STORED AS $format"
      sql(
        s"""
           |CREATE TABLE tab1 $schemaClause
           |$formatClause
           |$optionClause
           |$partitionClause
           |$tablePropertiesClause
         """.stripMargin)
      sql(
        """
          |INSERT INTO TABLE tab1
          |SELECT 1 as col1, 2 as col2
        """.stripMargin)
      val path = if (isPartitioned) {
        spark.sessionState.catalog.getPartition(TableIdentifier("tab1"), Map("col2" -> "2"))
          .location
      } else {
        spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1")).location
      }
      val leafFiles = Utils.recursiveList(new File(path))
      assert(leafFiles.count(_.getName.endsWith(expectedFileNameSuffix)) == 1)
    }
  }
}

class CompressionSuite extends CompressionTestUtils with SharedSQLContext {

  test("CTAS against native data source table - parquet") {
    checkCTASCompression(
      formatClause = "USING parquet",
      optionClause = "OPTIONS('compression' = 'gzip')",
      expectedFileNameSuffix = "gz.parquet"
    )

    checkCTASCompression(
      formatClause = "USING parquet",
      optionClause = "TBLPROPERTIES('compression' = 'gzip')",
      expectedFileNameSuffix = "gz.parquet"
    )

    checkCTASCompression(
      formatClause = "USING parquet",
      optionClause =
        "OPTIONS('compression' = 'gzip') TBLPROPERTIES('compression' = 'uncompressed')",
      expectedFileNameSuffix = "gz.parquet"
    )
  }

  test("INSERT against native data source table - parquet") {
    Seq("false", "true").foreach { isPartitioned =>
      checkInsertCompression(
        format = "parquet",
        isNative = true,
        optionClause = "OPTIONS('compression' = 'gzip')",
        tablePropertiesClause = "",
        isPartitioned = isPartitioned.toBoolean,
        expectedFileNameSuffix = "gz.parquet"
      )

      checkInsertCompression(
        format = "parquet",
        isNative = true,
        optionClause = "",
        tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')",
        isPartitioned = isPartitioned.toBoolean,
        expectedFileNameSuffix = "gz.parquet"
      )
    }
  }
}

@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.sources.CompressionTestUtils

class HiveCompressionSuite extends CompressionTestUtils with TestHiveSingleton {

  test("CTAS after converting hive table to data source table - parquet") {
    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
      checkCTASCompression(
        formatClause = "STORED AS parquet",
        optionClause = "TBLPROPERTIES('compression' = 'gzip')",
        expectedFileNameSuffix = "gz.parquet"
      )
    }
  }

  test("INSERT after converting hive table to data source table - parquet") {
    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
      checkInsertCompression(
        format = "parquet",
        isNative = false,
        optionClause = "",
        tablePropertiesClause = "TBLPROPERTIES('compression' = 'gzip')",
        isPartitioned = false,
        expectedFileNameSuffix = "gz.parquet"
      )
    }
  }

  test("CTAS after converting hive table to data source table - orc") {
    withSQLConf(CONVERT_METASTORE_ORC.key -> "true") {
      checkCTASCompression(
        formatClause = "STORED AS orc",
        optionClause = "TBLPROPERTIES('compression' = 'zlib')",
        expectedFileNameSuffix = "zlib.orc"
      )
    }
  }

  test("INSERT after converting hive table to data source table - orc") {
    withSQLConf(CONVERT_METASTORE_ORC.key -> "true") {
      checkInsertCompression(
        format = "orc",
        isNative = false,
        optionClause = "",
        tablePropertiesClause = "TBLPROPERTIES('compression' = 'zlib')",
        isPartitioned = false,
        expectedFileNameSuffix = "zlib.orc"
      )
    }
  }
}