@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}

import org.apache.spark.util.Utils

private[datasources] object CompressionCodecs {
  private val shortCompressionCodecNames = Map(
    "bzip2" -> classOf[BZip2Codec].getName,
    "gzip" -> classOf[GzipCodec].getName,
    "lz4" -> classOf[Lz4Codec].getName,
    "snappy" -> classOf[SnappyCodec].getName)

  /**
   * Return the fully qualified class name of the given codec.
   * If the name is already a class name, return it unchanged.
   */
  def getCodecClassName(name: String): String = {
    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
    try {
      // Validate that the resolved codec class is actually on the classpath.
      Utils.classForName(codecName)
      codecName
    } catch {
      case e: ClassNotFoundException =>
        throw new IllegalArgumentException(s"Codec [$codecName] " +
          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
    }
  }
}
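
For orientation, a small usage sketch (not part of the diff) of the helper above, assuming the standard Hadoop codec classes are on the classpath:

// Hypothetical REPL-style calls to CompressionCodecs.getCodecClassName.
CompressionCodecs.getCodecClassName("gzip")
// -> "org.apache.hadoop.io.compress.GzipCodec"
CompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.BZip2Codec")
// -> returned unchanged, since it is already a class name
CompressionCodecs.getCodecClassName("zstd")
// -> throws IllegalArgumentException listing the known short names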
@@ -22,6 +22,7 @@ import java.nio.charset.Charset
import org.apache.hadoop.io.compress._

import org.apache.spark.Logging
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
import org.apache.spark.util.Utils

private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -78,7 +79,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]

  val compressionCodec: Option[String] = {
    val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CSVCompressionCodecs.getCodecClassName)
+    name.map(CompressionCodecs.getCodecClassName)
  }

  val maxColumns = 20480
@@ -114,28 +115,3 @@ private[csv] object ParseModes {
    true // We default to permissive if the mode string is not valid
  }
}

-private[csv] object CSVCompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
-
-  /**
-   * Return the full version of the given codec class.
-   * If it is already a class name, just return it.
-   */
-  def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
-    try {
-      // Validate the codec name
-      Utils.classForName(codecName)
-      codecName
-    } catch {
-      case e: ClassNotFoundException =>
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
-    }
-  }
-}
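
For context, a minimal sketch of the user-facing call that reaches this CSV code path; either option key works, per the parameters.get calls above (the DataFrame df and the output path are assumptions):

// The short name "gzip" is resolved by the shared CompressionCodecs helper.
df.write
  .format("csv")
  .option("compression", "gzip") // or .option("codec", "gzip")
  .save("/tmp/csv-out")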
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.json

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

+import org.apache.spark.sql.execution.datasources.CompressionCodecs

/**
 * Options for the JSON data source.
 *
@@ -32,7 +34,8 @@ case class JSONOptions(
    allowSingleQuotes: Boolean = true,
    allowNumericLeadingZeros: Boolean = false,
    allowNonNumericNumbers: Boolean = false,
-    allowBackslashEscapingAnyCharacter: Boolean = false) {
+    allowBackslashEscapingAnyCharacter: Boolean = false,
+    compressionCodec: Option[String] = None) {

  /** Sets config options on a Jackson [[JsonFactory]]. */
  def setJacksonOptions(factory: JsonFactory): Unit = {
@@ -46,7 +49,6 @@
  }
}


object JSONOptions {
  def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
    samplingRatio =
@@ -64,6 +66,10 @@ object JSONOptions {
    allowNonNumericNumbers =
      parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true),
    allowBackslashEscapingAnyCharacter =
-      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false),
+    compressionCodec = {
+      val name = parameters.get("compression").orElse(parameters.get("codec"))
+      name.map(CompressionCodecs.getCodecClassName)
+    }
  )
}
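
A short sketch of how the new option flows through createFromConfigMap, using a hypothetical parameter map:

// "snappy" resolves to the fully qualified Hadoop codec class name.
val options = JSONOptions.createFromConfigMap(Map("compression" -> "snappy"))
// options.compressionCodec == Some("org.apache.hadoop.io.compress.SnappyCodec")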
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonFactory
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -162,6 +163,15 @@ private[sql] class JSONRelation(
  }

  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+    val conf = job.getConfiguration
+    options.compressionCodec.foreach { codec =>
+      conf.set("mapreduce.output.fileoutputformat.compress", "true")
+      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+      conf.set("mapreduce.map.output.compress", "true")
+      conf.set("mapreduce.map.output.compress.codec", codec)
+    }
+
    new BucketedOutputWriterFactory {
      override def newInstance(
          path: String,
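End to end, the Hadoop properties set above are what make a user-level write like this sketch (DataFrame df and the output path are assumptions) produce compressed part files:

// BZip2Codec writes .bz2 part files; gzip would write .gz.
df.write
  .format("json")
  .option("compression", "bzip2")
  .save("/tmp/json-out")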
@@ -1467,6 +1467,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
    }
  }

  test("SPARK-12872 Support to specify the option for compression codec") {
    withTempDir { dir =>
      // saveAsTextFile needs a fresh path, so drop the directory created by withTempDir.
      dir.delete()
      val path = dir.getCanonicalPath
      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)

      val jsonDF = sqlContext.read.json(path)
      val jsonDir = new File(dir, "json").getCanonicalPath
      jsonDF.coalesce(1).write
        .format("json")
        // Codec short names are case-insensitive: "gZiP" resolves to GzipCodec.
        .option("compression", "gZiP")
        .save(jsonDir)

      val compressedFiles = new File(jsonDir).listFiles()
      assert(compressedFiles.exists(_.getName.endsWith(".gz")))

      val jsonCopy = sqlContext.read
        .format("json")
        .load(jsonDir)

      assert(jsonCopy.count == jsonDF.count)
      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
      checkAnswer(jsonCopySome, jsonDFSome)
    }
  }

test("Casting long as timestamp") {
withTempTable("jsonTable") {
val schema = (new StructType).add("ts", TimestampType)