From b697436d2e5eefb8007cbe8add57426f40668923 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Nov 2016 16:53:39 -0700 Subject: [PATCH 1/8] Use text data source in CSV and JSON data sources. --- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../datasources/csv/CSVFileFormat.scala | 34 ++++++++----------- .../datasources/csv/CSVRelation.scala | 4 +-- .../datasources/json/InferSchema.scala | 6 ++-- .../datasources/json/JsonFileFormat.scala | 28 ++------------- .../datasources/json/TestJsonData.scala | 10 +++--- 6 files changed, 28 insertions(+), 56 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index a77937efd7e15..a6f12a9441998 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -325,7 +325,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val schema = userSpecifiedSchema.getOrElse { InferSchema.infer( - jsonRDD, + sparkSession.createDataset(jsonRDD)(Encoders.STRING), columnNameOfCorruptRecord, parsedOptions) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index a3691158ee758..119f21c584c57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ @@ -56,8 +56,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { // TODO: Move filtering. 
val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) - val rdd = baseRdd(sparkSession, csvOptions, paths) - val firstLine = findFirstLine(csvOptions, rdd) + val lines = readText(sparkSession, csvOptions, paths) + val firstLine = findFirstLine(csvOptions, lines) val firstRow = new CsvReader(csvOptions).parseLine(firstLine) val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = makeSafeHeader(firstRow, csvOptions, caseSensitive) @@ -173,35 +173,28 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } } - private def baseRdd( - sparkSession: SparkSession, - options: CSVOptions, - inputPaths: Seq[String]): RDD[String] = { - readText(sparkSession, options, inputPaths.mkString(",")) - } - private def tokenRdd( sparkSession: SparkSession, options: CSVOptions, header: Array[String], inputPaths: Seq[String]): RDD[Array[String]] = { - val rdd = baseRdd(sparkSession, options, inputPaths) + val lines = readText(sparkSession, options, inputPaths) // Make sure firstLine is materialized before sending to executors - val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null - CSVRelation.univocityTokenizer(rdd, firstLine, options) + val firstLine = if (options.headerFlag) findFirstLine(options, lines) else null + CSVRelation.univocityTokenizer(lines, firstLine, options) } /** * Returns the first line of the first non-empty file in path */ - private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = { + private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = { if (options.isCommentSet) { val comment = options.comment.toString - rdd.filter { line => + lines.filter { line => line.trim.nonEmpty && !line.startsWith(comment) }.first() } else { - rdd.filter { line => + lines.filter { line => line.trim.nonEmpty }.first() } @@ -210,14 +203,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { private def readText( sparkSession: SparkSession, options: CSVOptions, - location: String): RDD[String] = { + inputPaths: Seq[String]): Dataset[String] = { if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { - sparkSession.sparkContext.textFile(location) + sparkSession.read.textFile(inputPaths: _*) } else { val charset = options.charset - sparkSession.sparkContext - .hadoopFile[LongWritable, Text, TextInputFormat](location) + val rdd = sparkSession.sparkContext + .hadoopFile[LongWritable, Text, TextInputFormat](inputPaths.mkString(",")) .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) + sparkSession.createDataset(rdd)(Encoders.STRING) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index a249b9d9d59b8..e02b35b4b9b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -38,12 +38,12 @@ import org.apache.spark.sql.types._ object CSVRelation extends Logging { def univocityTokenizer( - file: RDD[String], + file: Dataset[String], firstLine: String, params: CSVOptions): RDD[Array[String]] = { // If header is set, make sure firstLine is materialized before sending to executors. 
val commentPrefix = params.comment.toString - file.mapPartitions { iter => + file.rdd.mapPartitions { iter => val parser = new CsvReader(params) val filteredIter = iter.filter { line => line.trim.nonEmpty && !line.startsWith(commentPrefix) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index dc8bd817f2906..a45183d55ef9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -21,7 +21,7 @@ import java.util.Comparator import com.fasterxml.jackson.core._ -import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.json.JSONOptions @@ -37,7 +37,7 @@ private[sql] object InferSchema { * 3. Replace any remaining null fields with string, the top type */ def infer( - json: RDD[String], + json: Dataset[String], columnNameOfCorruptRecord: String, configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, @@ -50,7 +50,7 @@ private[sql] object InferSchema { } // perform schema inference on each row and merge afterwards - val rootType = schemaData.mapPartitions { iter => + val rootType = schemaData.rdd.mapPartitions { iter => val factory = new JsonFactory() configOptions.setJacksonOptions(factory) iter.flatMap { row => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 0e38aefecb673..033e9687314e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -21,15 +21,12 @@ import java.io.CharArrayWriter import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{LongWritable, NullWritable, Text} -import org.apache.hadoop.mapred.{JobConf, TextInputFormat} +import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} @@ -58,10 +55,10 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val jsonFiles = files.filterNot { status => val name = status.getPath.getName (name.startsWith("_") && !name.contains("=")) || name.startsWith(".") - }.toArray + }.map(_.getPath.toString).toArray val jsonSchema = InferSchema.infer( - createBaseRdd(sparkSession, jsonFiles), + sparkSession.read.textFile(jsonFiles: _*), columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) @@ -119,25 +116,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } } - private def createBaseRdd( - sparkSession: SparkSession, - inputPaths: 
Seq[FileStatus]): RDD[String] = { - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) - val conf = job.getConfiguration - - val paths = inputPaths.map(_.getPath) - - if (paths.nonEmpty) { - FileInputFormat.setInputPaths(job, paths: _*) - } - - sparkSession.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], - classOf[TextInputFormat], - classOf[LongWritable], - classOf[Text]).map(_._2.toString) // get the text line - } - /** Constraints to be imposed on schema to be stored. */ private def checkConstraints(schema: StructType): Unit = { if (schema.fieldNames.length != schema.fieldNames.distinct.length) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index a400940db924a..c03d8368209c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} private[json] trait TestJsonData { protected def spark: SparkSession @@ -196,14 +196,14 @@ private[json] trait TestJsonData { """42""" :: """ ","ian":"test"}""" :: Nil) - def emptyRecords: RDD[String] = - spark.sparkContext.parallelize( + def emptyRecords: Dataset[String] = + spark.createDataset( """{""" :: """""" :: """{"a": {}}""" :: """{"a": {"b": {}}}""" :: """{"b": [{"c": {}}]}""" :: - """]""" :: Nil) + """]""" :: Nil)(Encoders.STRING) def timestampAsLong: RDD[String] = spark.sparkContext.parallelize( @@ -230,5 +230,5 @@ private[json] trait TestJsonData { lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil) - def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]()) + def empty: Dataset[String] = spark.createDataset(Seq[String]())(Encoders.STRING) } From cfb2f413023d49bfd655012717a99f3dc05702dd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Nov 2016 17:40:37 -0700 Subject: [PATCH 2/8] Don't check file existence. 
--- .../execution/datasources/json/JsonFileFormat.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 033e9687314e0..1d0c0b56598f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Encoders, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs @@ -57,6 +57,14 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { (name.startsWith("_") && !name.contains("=")) || name.startsWith(".") }.map(_.getPath.toString).toArray + val lines = sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = jsonFiles, + className = classOf[TextBasedFileFormat].getName + ).resolveRelation(checkFilesExist = false)) + .select("value").as[String](Encoders.STRING) + val jsonSchema = InferSchema.infer( sparkSession.read.textFile(jsonFiles: _*), columnNameOfCorruptRecord, From 0fda0ecf01b74c3451429dce8943fc73eb2f7840 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Nov 2016 18:10:38 -0700 Subject: [PATCH 3/8] Fix name. --- .../spark/sql/execution/datasources/json/JsonFileFormat.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 1d0c0b56598f0..d120e43370750 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOutputWriter} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -61,7 +61,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { DataSource.apply( sparkSession, paths = jsonFiles, - className = classOf[TextBasedFileFormat].getName + className = classOf[TextFileFormat].getName ).resolveRelation(checkFilesExist = false)) .select("value").as[String](Encoders.STRING) From eb8ddfd7c80c349edd20ce64f7dc69b2b75f022f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 4 Nov 2016 11:33:01 -0700 Subject: [PATCH 4/8] Actually use `lines` --- .../spark/sql/execution/datasources/json/JsonFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index d120e43370750..fdf17e097cd0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -66,7 +66,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { .select("value").as[String](Encoders.STRING) val jsonSchema = InferSchema.infer( - sparkSession.read.textFile(jsonFiles: _*), + lines, columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) From acce60d4f584f549a44f6dd90e3db5e6694a61f0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 4 Nov 2016 15:53:00 -0700 Subject: [PATCH 5/8] Clean up CSV code to reduce number of scans. --- .../datasources/csv/CSVFileFormat.scala | 29 +++++++++---------- .../datasources/json/JsonFileFormat.scala | 4 +-- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 119f21c584c57..44b0cc1fddc0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -56,13 +57,16 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { // TODO: Move filtering. 
val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) - val lines = readText(sparkSession, csvOptions, paths) - val firstLine = findFirstLine(csvOptions, lines) + val lines: Dataset[String] = readText(sparkSession, csvOptions, paths) + val firstLine: String = findFirstLine(csvOptions, lines) val firstRow = new CsvReader(csvOptions).parseLine(firstLine) val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = makeSafeHeader(firstRow, csvOptions, caseSensitive) - val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) + val parsedRdd: RDD[Array[String]] = CSVRelation.univocityTokenizer( + lines, + firstLine = if (csvOptions.headerFlag) firstLine else null, + params = csvOptions) val schema = if (csvOptions.inferSchemaFlag) { CSVInferSchema.infer(parsedRdd, header, csvOptions) } else { @@ -173,17 +177,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { } } - private def tokenRdd( - sparkSession: SparkSession, - options: CSVOptions, - header: Array[String], - inputPaths: Seq[String]): RDD[Array[String]] = { - val lines = readText(sparkSession, options, inputPaths) - // Make sure firstLine is materialized before sending to executors - val firstLine = if (options.headerFlag) findFirstLine(options, lines) else null - CSVRelation.univocityTokenizer(lines, firstLine, options) - } - /** * Returns the first line of the first non-empty file in path */ @@ -205,7 +198,13 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { options: CSVOptions, inputPaths: Seq[String]): Dataset[String] = { if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { - sparkSession.read.textFile(inputPaths: _*) + sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = inputPaths, + className = classOf[TextFileFormat].getName + ).resolveRelation(checkFilesExist = false)) + .select("value").as[String](Encoders.STRING) } else { val charset = options.charset val rdd = sparkSession.sparkContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index fdf17e097cd0b..ff2476854bb7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -52,10 +52,10 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) - val jsonFiles = files.filterNot { status => + val jsonFiles: Seq[String] = files.filterNot { status => val name = status.getPath.getName (name.startsWith("_") && !name.contains("=")) || name.startsWith(".") - }.map(_.getPath.toString).toArray + }.map(_.getPath.toString) val lines = sparkSession.baseRelationToDataFrame( DataSource.apply( From 3082844686aa8344d5fc350c790d85aa4f8ba500 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Nov 2016 13:03:55 -0800 Subject: [PATCH 6/8] Ensure that inferSchema is not called with empty set of files. 
--- .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 44b0cc1fddc0d..a3c741df71237 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -53,6 +53,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { + require(files.nonEmpty, "Cannot infer schema from an empty set of files") val csvOptions = new CSVOptions(options) // TODO: Move filtering. From 4d19978d9ad731905e8fb4c6d15196901b22d886 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Nov 2016 09:49:04 -0800 Subject: [PATCH 7/8] Use SQL filter to speed up first line query. --- .../execution/datasources/csv/CSVFileFormat.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index a3c741df71237..e627f040d3cc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.functions.{length, trim} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -182,15 +183,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { * Returns the first line of the first non-empty file in path */ private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = { + import lines.sqlContext.implicits._ + val nonEmptyLines = lines.filter(length(trim($"value")) > 0) if (options.isCommentSet) { - val comment = options.comment.toString - lines.filter { line => - line.trim.nonEmpty && !line.startsWith(comment) - }.first() + nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).first() } else { - lines.filter { line => - line.trim.nonEmpty - }.first() + nonEmptyLines.first() } } From b01a307499a5b2a00039cb4b195d05a40157438c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 30 Nov 2016 09:58:25 -0800 Subject: [PATCH 8/8] Undo JSON changes. 
--- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../datasources/json/InferSchema.scala | 6 +-- .../datasources/json/JsonFileFormat.scala | 42 ++++++++++++------- .../datasources/json/TestJsonData.scala | 10 ++--- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 727d9137efcc7..1af2f9afea5eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -326,7 +326,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val schema = userSpecifiedSchema.getOrElse { InferSchema.infer( - sparkSession.createDataset(jsonRDD)(Encoders.STRING), + jsonRDD, columnNameOfCorruptRecord, parsedOptions) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index a45183d55ef9b..dc8bd817f2906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -21,7 +21,7 @@ import java.util.Comparator import com.fasterxml.jackson.core._ -import org.apache.spark.sql.Dataset +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.json.JSONOptions @@ -37,7 +37,7 @@ private[sql] object InferSchema { * 3. Replace any remaining null fields with string, the top type */ def infer( - json: Dataset[String], + json: RDD[String], columnNameOfCorruptRecord: String, configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, @@ -50,7 +50,7 @@ private[sql] object InferSchema { } // perform schema inference on each row and merge afterwards - val rootType = schemaData.rdd.mapPartitions { iter => + val rootType = schemaData.mapPartitions { iter => val factory = new JsonFactory() configOptions.setJacksonOptions(factory) iter.flatMap { row => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index ff2476854bb7a..0e38aefecb673 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -21,18 +21,21 @@ import java.io.CharArrayWriter import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.io.{LongWritable, NullWritable, Text} +import org.apache.hadoop.mapred.{JobConf, TextInputFormat} import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Encoders, Row, SparkSession} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOutputWriter} +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -52,21 +55,13 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) - val jsonFiles: Seq[String] = files.filterNot { status => + val jsonFiles = files.filterNot { status => val name = status.getPath.getName (name.startsWith("_") && !name.contains("=")) || name.startsWith(".") - }.map(_.getPath.toString) - - val lines = sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = jsonFiles, - className = classOf[TextFileFormat].getName - ).resolveRelation(checkFilesExist = false)) - .select("value").as[String](Encoders.STRING) + }.toArray val jsonSchema = InferSchema.infer( - lines, + createBaseRdd(sparkSession, jsonFiles), columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) @@ -124,6 +119,25 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } } + private def createBaseRdd( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus]): RDD[String] = { + val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val conf = job.getConfiguration + + val paths = inputPaths.map(_.getPath) + + if (paths.nonEmpty) { + FileInputFormat.setInputPaths(job, paths: _*) + } + + sparkSession.sparkContext.hadoopRDD( + conf.asInstanceOf[JobConf], + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(_._2.toString) // get the text line + } + /** Constraints to be imposed on schema to be stored. 
*/ private def checkConstraints(schema: StructType): Unit = { if (schema.fieldNames.length != schema.fieldNames.distinct.length) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index c03d8368209c0..a400940db924a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.SparkSession private[json] trait TestJsonData { protected def spark: SparkSession @@ -196,14 +196,14 @@ private[json] trait TestJsonData { """42""" :: """ ","ian":"test"}""" :: Nil) - def emptyRecords: Dataset[String] = - spark.createDataset( + def emptyRecords: RDD[String] = + spark.sparkContext.parallelize( """{""" :: """""" :: """{"a": {}}""" :: """{"a": {"b": {}}}""" :: """{"b": [{"c": {}}]}""" :: - """]""" :: Nil)(Encoders.STRING) + """]""" :: Nil) def timestampAsLong: RDD[String] = spark.sparkContext.parallelize( @@ -230,5 +230,5 @@ private[json] trait TestJsonData { lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil) - def empty: Dataset[String] = spark.createDataset(Seq[String]())(Encoders.STRING) + def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]()) }
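
For readers following this series, here is a minimal, self-contained sketch of the line-scanning pattern that PATCH 5 and PATCH 7 converge on for CSV schema inference: read the input through the text data source as a Dataset[String], then locate the first non-empty, non-comment line with Column expressions rather than a Scala closure. The sketch uses the public spark.read.textFile API instead of the internal DataSource/TextFileFormat plumbing the patch wires up, and the object name, input path, and '#' comment prefix are placeholder assumptions for illustration, not values taken from the patch.

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.functions.{length, trim}

  object FirstLineSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("first-line-sketch").getOrCreate()
      import spark.implicits._

      // Each element of the Dataset is one physical line of text; the underlying
      // column is named "value". The path below is a placeholder.
      val lines: Dataset[String] = spark.read.textFile("/tmp/example.csv")

      // Column-based predicates (as in PATCH 7) let Spark evaluate the filter
      // without invoking a Scala lambda per row, which is the speed-up that
      // patch's subject refers to.
      val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
      val firstLine = nonEmptyLines.filter(!$"value".startsWith("#")).first()

      println(s"first non-empty, non-comment line: $firstLine")
      spark.stop()
    }
  }

As in PATCH 5, the same Dataset[String] can then be handed to the tokenizer, so header detection and parsing share one reading of the input rather than scanning the files twice.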