From 8a8ff3f5bfdfaee7ec73e362cfa34261d199f407 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 27 Apr 2018 15:23:40 +0200 Subject: [PATCH 1/5] Propagating DataFrameReader's options to the text datasource on schema inferring --- .../spark/sql/catalyst/json/JSONOptions.scala | 2 ++ .../datasources/csv/CSVDataSource.scala | 6 ++++-- .../datasources/csv/CSVOptions.scala | 2 ++ .../datasources/json/JsonDataSource.scala | 19 ++++++++----------- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 5c9adc3332bc..111d1f8105e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -107,4 +107,6 @@ private[sql] class JSONOptions( allowBackslashEscapingAnyCharacter) factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } + + val textOptions = parameters.originalMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 4870d75fc5f0..e96118942ed2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -184,7 +184,8 @@ object TextInputCSVDataSource extends CSVDataSource { DataSource.apply( sparkSession, paths = paths, - className = classOf[TextFileFormat].getName + className = classOf[TextFileFormat].getName, + options = options.textOptions ).resolveRelation(checkFilesExist = false)) .select("value").as[String](Encoders.STRING) } else { @@ -248,7 +249,8 @@ object MultiLineCSVDataSource extends CSVDataSource { options: CSVOptions): RDD[PortableDataStream] = { val paths = inputPaths.map(_.getPath) val name = paths.mkString(",") - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( + options.textOptions)) FileInputFormat.setInputPaths(job, paths: _*) val conf = job.getConfiguration diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index c16790630ce1..d543d54972e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -186,4 +186,6 @@ class CSVOptions( settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) settings } + + val textOptions = parameters.originalMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 5769c09c9a1d..95a171f113fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -92,8 +92,7 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { - val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) + val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) inferFromDataset(json, parsedOptions) } @@ -106,18 +105,14 @@ object TextInputJsonDataSource extends JsonDataSource { private def createBaseDataset( sparkSession: SparkSession, inputPaths: Seq[FileStatus], - lineSeparator: Option[String]): Dataset[String] = { - val textOptions = lineSeparator.map { lineSep => - Map(TextOptions.LINE_SEPARATOR -> lineSep) - }.getOrElse(Map.empty[String, String]) - + parsedOptions: JSONOptions): Dataset[String] = { val paths = inputPaths.map(_.getPath.toString) sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, paths = paths, className = classOf[TextFileFormat].getName, - options = textOptions + options = parsedOptions.textOptions ).resolveRelation(checkFilesExist = false)) .select("value").as(Encoders.STRING) } @@ -151,16 +146,18 @@ object MultiLineJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { - val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths) + val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions) val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions) JsonInferSchema.infer(sampled, parsedOptions, createParser) } private def createBaseRdd( sparkSession: SparkSession, - inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = { + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions): RDD[PortableDataStream] = { val paths = inputPaths.map(_.getPath) - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( + parsedOptions.textOptions)) val conf = job.getConfiguration val name = paths.mkString(",") FileInputFormat.setInputPaths(job, paths: _*) From 1fa871dd4cad9a04cf8617031ab47844a82bb56e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 28 Apr 2018 09:58:31 +0200 Subject: [PATCH 2/5] Make textOptions serializable --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 +++- .../spark/sql/execution/datasources/csv/CSVOptions.scala | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 111d1f8105e1..9bfc690d1a6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.json import java.nio.charset.StandardCharsets import java.util.{Locale, TimeZone} +import scala.collection.immutable.ListMap + import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.commons.lang3.time.FastDateFormat @@ -108,5 +110,5 @@ private[sql] class JSONOptions( factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } - val textOptions = parameters.originalMap + val textOptions = ListMap(parameters.toList: _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index d543d54972e4..53c948b003ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets import java.util.{Locale, TimeZone} +import scala.collection.immutable.ListMap + import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} import org.apache.commons.lang3.time.FastDateFormat @@ -187,5 +189,5 @@ class CSVOptions( settings } - val textOptions = parameters.originalMap + val textOptions = ListMap(parameters.toList: _*) } From 49012a392329241969e1d1d938f965b84968f559 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 5 May 2018 11:16:44 +0200 Subject: [PATCH 3/5] Adding @transient to textOptions because they shouldn't be serialized --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 5191cd9f4a0c..9df5f6d331a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.catalyst.json import java.nio.charset.{Charset, StandardCharsets} import java.util.{Locale, TimeZone} -import scala.collection.immutable.ListMap - import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.commons.lang3.time.FastDateFormat @@ -139,5 +137,5 @@ private[sql] class JSONOptions( factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } - val textOptions = ListMap(parameters.toList: _*) + @transient val textOptions = parameters } From cd1ffadd3ef19e72fdc6bf228d7281008564dd1a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 6 May 2018 10:09:37 +0200 Subject: [PATCH 4/5] Removing the separate val for textOptions --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 +--- .../spark/sql/execution/datasources/csv/CSVDataSource.scala | 4 ++-- .../spark/sql/execution/datasources/csv/CSVOptions.scala | 4 +--- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 4 ++-- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 9df5f6d331a8..68c2b95e94fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util._ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: CaseInsensitiveMap[String], + @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { @@ -136,6 +136,4 @@ private[sql] class JSONOptions( allowBackslashEscapingAnyCharacter) factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } - - @transient val textOptions = parameters } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index a352d781f730..dc54d182651b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -186,7 +186,7 @@ object TextInputCSVDataSource extends CSVDataSource { sparkSession, paths = paths, className = classOf[TextFileFormat].getName, - options = options.textOptions + options = options.parameters ).resolveRelation(checkFilesExist = false)) .select("value").as[String](Encoders.STRING) } else { @@ -252,7 +252,7 @@ object MultiLineCSVDataSource extends CSVDataSource { val paths = inputPaths.map(_.getPath) val name = paths.mkString(",") val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( - options.textOptions)) + options.parameters)) FileInputFormat.setInputPaths(job, paths: _*) val conf = job.getConfiguration diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 51e5f60139d1..a409b65ddd1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ class CSVOptions( - @transient private val parameters: CaseInsensitiveMap[String], + @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { @@ -191,6 +191,4 @@ class CSVOptions( settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) settings } - - val textOptions = ListMap(parameters.toList: _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 0c69d896069a..ba83df0efebd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -121,7 +121,7 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession, paths = paths, className = classOf[TextFileFormat].getName, - options = parsedOptions.textOptions + options = parsedOptions.parameters ).resolveRelation(checkFilesExist = false)) .select("value").as(Encoders.STRING) } @@ -174,7 +174,7 @@ object MultiLineJsonDataSource extends JsonDataSource { parsedOptions: JSONOptions): RDD[PortableDataStream] = { val paths = inputPaths.map(_.getPath) val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( - parsedOptions.textOptions)) + parsedOptions.parameters)) val conf = job.getConfiguration val name = paths.mkString(",") FileInputFormat.setInputPaths(job, paths: _*) From d138c447e287bec3807d0b0b8a0c68282efa1883 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 6 May 2018 10:53:25 +0200 Subject: [PATCH 5/5] Removing unused imports --- .../apache/spark/sql/execution/datasources/csv/CSVOptions.scala | 2 -- .../apache/spark/sql/execution/datasources/csv/CSVUtils.scala | 2 -- .../spark/sql/execution/datasources/csv/UnivocityParser.scala | 2 -- 3 files changed, 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index a409b65ddd1e..ed2dc65a4791 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets import java.util.{Locale, TimeZone} -import scala.collection.immutable.ListMap - import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} import org.apache.commons.lang3.time.FastDateFormat diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 31464f1bcc68..9dae41b63e81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.execution.datasources.csv -import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset -import org.apache.spark.sql.catalyst.json.JSONOptions import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 3d6cc30f2ba8..99557a1ceb0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.InputStream import java.math.BigDecimal -import java.text.NumberFormat -import java.util.Locale import scala.util.Try import scala.util.control.NonFatal