
Commit b99129c

tdas authored and zsxwing committed
[SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc
## What changes were proposed in this pull request?

Issues with the current reader behavior:

- `text()` without args returns an empty DF with no columns -> inconsistent; it's expected that `text` always returns a DF with a `value` string field.
- `textFile()` without args fails with an exception because of the above: it expects the DF returned by `text()` to have a `value` field.
- `orc()` does not have varargs, inconsistent with the others.
- `json(single-arg)` was removed, but that caused source compatibility issues - [SPARK-16009](https://issues.apache.org/jira/browse/SPARK-16009)
- A user-specified schema was not respected when `text/csv/...` were used with no args - [SPARK-16007](https://issues.apache.org/jira/browse/SPARK-16007)

The solution I am implementing is the following:

- For each format, there will be a single-argument method and a vararg method. For json, parquet, csv, and text, this means adding `json(string)`, etc.; for orc, this means adding `orc(varargs)`.
- Remove the special handling of `text()`, `csv()`, etc. that returns an empty DataFrame with no fields. Instead, pass the empty sequence of paths on to the data source and let each data source handle it correctly. E.g. the text data source should return an empty DF with schema (value: string).
- Deduped the docs and fixed their formatting.

## How was this patch tested?

Added new unit tests for Scala and Java.

Author: Tathagata Das <[email protected]>

Closes #13727 from tdas/SPARK-15982.
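The harmonized behavior described above can be sketched roughly as follows (spark-shell style, not part of the patch; all paths are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Every format now has both a single-argument and a vararg overload.
val one  = spark.read.json("/tmp/a.json")                  // json(path: String)
val many = spark.read.json("/tmp/a.json", "/tmp/b.json")   // json(paths: String*)

// text() with no paths no longer yields a zero-column DataFrame; the text
// source itself reports the (value: string) schema.
spark.read.text().printSchema()

// A user-specified schema is respected even with no paths (SPARK-16007).
val userSchema = StructType(Seq(StructField("value", StringType)))
spark.read.schema(userSchema).text().printSchema()
```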
1 parent 6df8e38 commit b99129c

3 files changed: +420 -56 lines changed


sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 98 additions & 34 deletions
@@ -119,13 +119,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def load(): DataFrame = {
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = extraOptions.toMap)
-    Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation()))
+    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
   }
 
   /**
@@ -135,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def load(path: String): DataFrame = {
-    option("path", path).load()
+    load(Seq(path): _*) // force invocation of `load(...varargs...)`
   }
 
   /**
@@ -146,18 +140,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
-    if (paths.isEmpty) {
-      sparkSession.emptyDataFrame
-    } else {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          userSpecifiedSchema = userSpecifiedSchema,
-          className = source,
-          options = extraOptions.toMap).resolveRelation())
-    }
+    sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        userSpecifiedSchema = userSpecifiedSchema,
+        className = source,
+        options = extraOptions.toMap).resolveRelation())
   }
+
   /**
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
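A rough usage sketch of the unified `load` dispatch above (spark-shell style; the `spark` session is assumed and the paths are hypothetical):

```scala
// All three overloads now end up in load(paths: String*).
val noPath    = spark.read.format("text").load()                           // empty Seq of paths
val singleArg = spark.read.format("text").load("/tmp/notes.txt")           // load(path: String)
val varargs   = spark.read.format("text").load("/tmp/a.txt", "/tmp/b.txt") // load(paths: String*)

// With the empty-path short-circuit removed, even the zero-path case goes
// through the data source, so the text source still reports (value: string).
noPath.printSchema()
```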
@@ -245,13 +236,25 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     sparkSession.baseRelationToDataFrame(relation)
   }
 
+  /**
+   * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
+   * See the documentation on the overloaded `json()` method with varargs for more details.
+   *
+   * @since 1.4.0
+   */
+  def json(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    json(Seq(path): _*)
+  }
+
   /**
    * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
    *
    * This function goes through the input once to determine the input schema. If you know the
    * schema in advance, use the version that specifies the schema to avoid the extra scan.
    *
    * You can set the following JSON-specific options to deal with non-standard JSON files:
+   * <ul>
    * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
    * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
    * type. If the values do not fit in decimal, then it infers them as doubles.</li>
@@ -266,17 +269,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.</li>
    * <ul>
-   * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
-   * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
+   * <li> - `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
+   * the malformed string into a new field configured by `columnNameOfCorruptRecord`. When
    * a schema is set by user, it sets `null` for extra fields.</li>
-   * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
-   * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+   * <li> - `DROPMALFORMED` : ignores the whole corrupted records.</li>
+   * <li> - `FAILFAST` : throws an exception when it meets corrupted records.</li>
    * </ul>
    * <li>`columnNameOfCorruptRecord` (default is the value specified in
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
    * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
-   *
-   * @since 1.6.0
+   * </ul>
+   * @since 2.0.0
    */
   @scala.annotation.varargs
   def json(paths: String*): DataFrame = format("json").load(paths : _*)
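A hedged usage sketch of the corrupt-record options documented in this hunk (spark-shell style; `spark` is assumed and the path is made up):

```scala
// Keep corrupted records in a dedicated column while parsing the rest.
val permissive = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/tmp/events.json")            // single-argument json(path) added above

// FAILFAST throws as soon as a malformed record is met, per the docs above.
val strict = spark.read.option("mode", "FAILFAST").json("/tmp/events.json")
```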
@@ -326,6 +329,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         parsedOptions))(sparkSession))
   }
 
+  /**
+   * Loads a CSV file and returns the result as a [[DataFrame]]. See the documentation on the
+   * other overloaded `csv()` method for more details.
+   *
+   * @since 2.0.0
+   */
+  def csv(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    csv(Seq(path): _*)
+  }
+
   /**
    * Loads a CSV file and returns the result as a [[DataFrame]].
    *
@@ -334,6 +348,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * specify the schema explicitly using [[schema]].
    *
    * You can set the following CSV-specific options to deal with CSV files:
+   * <ul>
    * <li>`sep` (default `,`): sets the single character as a separator for each
    * field and value.</li>
    * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
@@ -370,26 +385,37 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.</li>
    * <ul>
-   * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When
+   * <li> - `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When
    * a schema is set by user, it sets `null` for extra fields.</li>
-   * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
-   * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+   * <li> - `DROPMALFORMED` : ignores the whole corrupted records.</li>
+   * <li> - `FAILFAST` : throws an exception when it meets corrupted records.</li>
+   * </ul>
    * </ul>
-   *
    * @since 2.0.0
    */
   @scala.annotation.varargs
   def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
 
   /**
-   * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
-   * [[DataFrame]] if no paths are passed in.
+   * Loads a Parquet file, returning the result as a [[DataFrame]]. See the documentation
+   * on the other overloaded `parquet()` method for more details.
+   *
+   * @since 2.0.0
+   */
+  def parquet(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    parquet(Seq(path): _*)
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a [[DataFrame]].
    *
    * You can set the following Parquet-specific option(s) for reading Parquet files:
+   * <ul>
    * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
    * whether we should merge schemas collected from all Parquet part-files. This will override
    * `spark.sql.parquet.mergeSchema`.</li>
-   *
+   * </ul>
    * @since 1.4.0
    */
   @scala.annotation.varargs
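A hedged sketch exercising the CSV and Parquet readers touched above (spark-shell style; `spark` is assumed, paths and option values are illustrative):

```scala
val csvDf = spark.read
  .option("sep", ";")
  .option("mode", "DROPMALFORMED")     // drop corrupted records, per the docs above
  .csv("/tmp/input.csv")               // single-argument csv(path)

val parquetDf = spark.read
  .option("mergeSchema", "true")       // overrides spark.sql.parquet.mergeSchema
  .parquet("/tmp/part1.parquet", "/tmp/part2.parquet") // vararg parquet(paths: _*)
```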
@@ -404,7 +430,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.5.0
    * @note Currently, this method can only be used after enabling Hive support.
    */
-  def orc(path: String): DataFrame = format("orc").load(path)
+  def orc(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    orc(Seq(path): _*)
+  }
+
+  /**
+   * Loads an ORC file and returns the result as a [[DataFrame]].
+   *
+   * @param paths input paths
+   * @since 2.0.0
+   * @note Currently, this method can only be used after enabling Hive support.
+   */
+  @scala.annotation.varargs
+  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
 
   /**
    * Returns the specified table as a [[DataFrame]].
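An illustrative sketch of the new single-argument and vararg `orc` readers; per the note in the scaladoc, this assumes a Hive-enabled session, and the paths are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// ORC reads currently require Hive support to be enabled.
val hiveSpark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

val oneOrc  = hiveSpark.read.orc("/tmp/day1.orc")                   // orc(path: String)
val manyOrc = hiveSpark.read.orc("/tmp/day1.orc", "/tmp/day2.orc")  // new orc(paths: String*)
```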
@@ -417,6 +456,18 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
 
+  /**
+   * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+   * "value", and followed by partitioned columns if there are any. See the documentation on
+   * the other overloaded `text()` method for more details.
+   *
+   * @since 2.0.0
+   */
+  def text(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    text(Seq(path): _*)
+  }
+
   /**
    * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
@@ -430,12 +481,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *   spark.read().text("/path/to/spark/README.md")
    * }}}
    *
-   * @param paths input path
+   * @param paths input paths
    * @since 1.6.0
    */
   @scala.annotation.varargs
   def text(paths: String*): DataFrame = format("text").load(paths : _*)
 
+  /**
+   * Loads text files and returns a [[Dataset]] of String. See the documentation on the
+   * other overloaded `textFile()` method for more details.
+   * @since 2.0.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    textFile(Seq(path): _*)
+  }
+
   /**
    * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
    * contains a single string column named "value".
@@ -457,6 +518,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def textFile(paths: String*): Dataset[String] = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException("User specified schema not supported with `textFile`")
+    }
     text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
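A hedged sketch of the new `textFile` guard above (spark-shell style; `spark` is assumed and the path is hypothetical):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val lines = spark.read.textFile("/tmp/README.md")   // Dataset[String], single-argument overload

// textFile now rejects a user-specified schema instead of silently ignoring it.
try {
  spark.read
    .schema(StructType(Seq(StructField("value", StringType))))
    .textFile("/tmp/README.md")
} catch {
  case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
}
```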
