
Commit 8a8ff3f

Propagating DataFrameReader's options to the text datasource on schema inferring

1 parent: 109935f

File tree: 4 files changed (+16, −13)
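
In practical terms, every option set on the DataFrameReader now also reaches the underlying text datasource that scans raw lines for schema inference, rather than only the JSON/CSV parser that runs afterwards. A hedged usage sketch (the option values and paths are illustrative, not taken from the commit); as the diffs below show, in the multiLine paths the options land on the Hadoop Job via newHadoopConfWithOptions:

    // Illustrative only: with this change, reader options also flow into the
    // Hadoop Job used by the multiLine schema-inference scan, so per-read
    // Hadoop settings take effect there as well.
    val df = spark.read
      .option("multiLine", "true")
      .option("fs.s3a.endpoint", "http://localhost:9000") // hypothetical per-read Hadoop option
      .json("s3a://bucket/events")
    df.printSchema()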

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
2 additions & 0 deletions

@@ -107,4 +107,6 @@ private[sql] class JSONOptions(
       allowBackslashEscapingAnyCharacter)
     factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
+
+  val textOptions = parameters.originalMap
 }
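
The new textOptions field exposes the caller's original option map so it can be handed, unchanged, to the delegate text datasource. A minimal sketch of the pattern, assuming Spark's CaseInsensitiveMap (whose originalMap keeps the map exactly as the user supplied it); the class and field names below are illustrative, not Spark's:

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    // Sketch, not the real JSONOptions: an options class normalizes keys for
    // its own lookups but keeps the user's original map around for forwarding.
    class OptionsSketch(parameters: CaseInsensitiveMap[String]) {
      def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

      // case-insensitive lookup for this datasource's own settings
      val multiLine: Boolean = parameters.get("multiLine").exists(_.toBoolean)

      // untouched map, forwarded verbatim to the text datasource
      val textOptions: Map[String, String] = parameters.originalMap
    }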

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
4 additions & 2 deletions

@@ -184,7 +184,8 @@ object TextInputCSVDataSource extends CSVDataSource {
         DataSource.apply(
           sparkSession,
           paths = paths,
-          className = classOf[TextFileFormat].getName
+          className = classOf[TextFileFormat].getName,
+          options = options.textOptions
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
@@ -248,7 +249,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
       options: CSVOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
     val name = paths.mkString(",")
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      options.textOptions))
     FileInputFormat.setInputPaths(job, paths: _*)
     val conf = job.getConfiguration
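
newHadoopConfWithOptions already exists on SessionState; it returns the session's Hadoop configuration with the given options laid on top, so per-read settings win for this scan only. Conceptually it behaves like the following sketch, written against plain Hadoop APIs rather than Spark internals:

    import org.apache.hadoop.conf.Configuration

    // Conceptual sketch: copy the session conf, then overlay per-read options.
    def hadoopConfWithOptions(base: Configuration, options: Map[String, String]): Configuration = {
      val conf = new Configuration(base) // copy, so the shared conf stays untouched
      options.foreach { case (k, v) => conf.set(k, v) }
      conf
    }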

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
2 additions & 0 deletions

@@ -186,4 +186,6 @@ class CSVOptions(
     settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
     settings
   }
+
+  val textOptions = parameters.originalMap
 }
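
Note that it is parameters.originalMap that gets forwarded, not the normalized case-insensitive view, so keys keep their exact original casing; that matters once they land in a Hadoop Configuration, whose keys are case-sensitive. A quick check of that behaviour, assuming Spark's CaseInsensitiveMap:

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val opts = CaseInsensitiveMap(Map("fs.s3a.access.key" -> "...", "lineSep" -> "|"))
    opts.get("linesep")   // Some("|") -- lookups are case-insensitive
    opts.originalMap.keys // Set(fs.s3a.access.key, lineSep) -- original casing kept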

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
8 additions & 11 deletions

@@ -92,8 +92,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(
-      sparkSession, inputPaths, parsedOptions.lineSeparator)
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
     inferFromDataset(json, parsedOptions)
   }

@@ -106,18 +105,14 @@ object TextInputJsonDataSource extends JsonDataSource {
   private def createBaseDataset(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      lineSeparator: Option[String]): Dataset[String] = {
-    val textOptions = lineSeparator.map { lineSep =>
-      Map(TextOptions.LINE_SEPARATOR -> lineSep)
-    }.getOrElse(Map.empty[String, String])
-
+      parsedOptions: JSONOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
         className = classOf[TextFileFormat].getName,
-        options = textOptions
+        options = parsedOptions.textOptions
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }

@@ -151,16 +146,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
+    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
     JsonInferSchema.infer(sampled, parsedOptions, createParser)
   }

   private def createBaseRdd(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      parsedOptions.textOptions))
     val conf = job.getConfiguration
     val name = paths.mkString(",")
     FileInputFormat.setInputPaths(job, paths: _*)
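
The lineSep special case that createBaseDataset used to carry is subsumed by forwarding the whole map. A before/after sketch of just that map-building step (the method names are mine; "lineSep" matches the value of TextOptions.LINE_SEPARATOR, and the "encoding" entry is only an example):

    // Before: only the line separator survived the hand-off to the text datasource.
    def textOptionsBefore(lineSeparator: Option[String]): Map[String, String] =
      lineSeparator.map(sep => Map("lineSep" -> sep)).getOrElse(Map.empty)

    // After: the entire original option map is handed over, lineSep included.
    def textOptionsAfter(originalMap: Map[String, String]): Map[String, String] =
      originalMap

    // textOptionsBefore(Some("|")) == Map("lineSep" -> "|")
    // textOptionsAfter(Map("lineSep" -> "|", "encoding" -> "UTF-8")) keeps both entries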
