Commit 26445c2

liancheng authored and yhuai committed
[SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?

Major changes:

1. Implement `FileFormat.buildReader()` for the CSV data source.
2. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or the user-specified schema. This argument is necessary because the CSV data source needs to know all the columns of the underlying files in order to read a file.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <[email protected]>

Closes #12002 from liancheng/spark-14206-csv-build-reader.
1 parent da54abf commit 26445c2

File tree

9 files changed: +119 -63 lines changed

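The heart of the patch is the revised `FileFormat.buildReader()` contract; the extra argument is called `physicalSchema` in the description above and `dataSchema` in the code. Below is a minimal sketch of the new signature with the parameter roles summarized as comments. The trait name is a stand-in, and the imported types are Spark-internal APIs as of this change; the real definition is in the interfaces.scala diff further down.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch only: `SketchedFileFormat` stands in for the real `FileFormat` trait.
trait SketchedFileFormat {
  def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,      // full schema of the files: inferred or user-specified
      partitionSchema: StructType, // partition columns, appended to every row that is produced
      requiredSchema: StructType,  // pruned subset of dataSchema that the scan must output
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```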

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 5 additions & 11 deletions
@@ -59,8 +59,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource] ||
-         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[csv.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource] ||
+         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
@@ -80,14 +81,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)

-      val bucketColumns =
-        AttributeSet(
-          files.bucketSpec
-            .map(_.bucketColumnNames)
-            .getOrElse(Nil)
-            .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-              .getOrElse(sys.error(""))))
-
       // Partition keys are not available in the statistics of the files.
       val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -113,8 +106,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

       val readFile = files.fileFormat.buildReader(
         sqlContext = files.sqlContext,
+        dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
-        dataSchema = prunedDataSchema,
+        requiredSchema = prunedDataSchema,
         filters = pushedDownFilters,
         options = files.options)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala

Lines changed: 33 additions & 8 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv

 import scala.util.control.NonFatal

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.RecordWriter
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

@@ -49,14 +50,10 @@ object CSVRelation extends Logging {
     }, true)
   }

-  def parseCsv(
-      tokenizedRDD: RDD[Array[String]],
+  def csvParser(
       schema: StructType,
       requiredColumns: Array[String],
-      inputs: Seq[FileStatus],
-      sqlContext: SQLContext,
-      params: CSVOptions): RDD[InternalRow] = {
-
+      params: CSVOptions): Array[String] => Option[InternalRow] = {
     val schemaFields = schema.fields
     val requiredFields = StructType(requiredColumns.map(schema(_))).fields
     val safeRequiredFields = if (params.dropMalformed) {
@@ -74,7 +71,8 @@ object CSVRelation extends Logging {
     }
     val requiredSize = requiredFields.length
     val row = new GenericMutableRow(requiredSize)
-    tokenizedRDD.flatMap { tokens =>
+
+    (tokens: Array[String]) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {
         logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
         None
@@ -118,6 +116,33 @@ object CSVRelation extends Logging {
       }
     }
   }
+
+  def parseCsv(
+      tokenizedRDD: RDD[Array[String]],
+      schema: StructType,
+      requiredColumns: Array[String],
+      options: CSVOptions): RDD[InternalRow] = {
+    val parser = csvParser(schema, requiredColumns, options)
+    tokenizedRDD.flatMap(parser(_).toSeq)
+  }
+
+  // Skips the header line of each file if the `header` option is set to true.
+  def dropHeaderLine(
+      file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {
+    // TODO What if the first partitioned file consists of only comments and empty lines?
+    if (csvOptions.headerFlag && file.start == 0) {
+      val nonEmptyLines = if (csvOptions.isCommentSet) {
+        val commentPrefix = csvOptions.comment.toString
+        lines.dropWhile { line =>
+          line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+        }
+      } else {
+        lines.dropWhile(_.trim.isEmpty)
+      }
+
+      if (nonEmptyLines.hasNext) nonEmptyLines.drop(1)
+    }
+  }
 }

 private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
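The refactoring above splits the old RDD-bound `parseCsv` into a per-record closure (`csvParser`) plus a thin RDD wrapper, so the same parsing logic can serve both the existing RDD path and the new per-file reader. A hypothetical usage sketch of the closure follows; the schema, column selection, and option values are invented, and because these classes are `private[sql]` the snippet is assumed to live under the same package (for example in a test).

```scala
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CsvParserSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    val options = new CSVOptions(Map("header" -> "false"))

    // One closure from a token array to at most one InternalRow; lines that fail to parse
    // under the configured parse mode come back as None.
    val parse = CSVRelation.csvParser(schema, Array("name"), options)

    println(parse(Array("1", "Alice")).map(_.numFields)) // expected: Some(1), only `name` is kept
  }
}
```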

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala

Lines changed: 46 additions & 5 deletions
@@ -19,17 +19,19 @@ package org.apache.spark.sql.execution.datasources.csv

 import java.nio.charset.{Charset, StandardCharsets}

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce._

 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -91,6 +93,46 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     new CSVOutputWriterFactory(csvOptions)
   }

+  override def buildReader(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+    val csvOptions = new CSVOptions(options)
+    val headers = requiredSchema.fields.map(_.name)
+
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    (file: PartitionedFile) => {
+      val lineIterator = {
+        val conf = broadcastedConf.value.value
+        new HadoopFileLinesReader(file, conf).map { line =>
+          new String(line.getBytes, 0, line.getLength, csvOptions.charset)
+        }
+      }
+
+      CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)
+
+      val unsafeRowIterator = {
+        val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
+        val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
+        tokenizedIterator.flatMap(parser(_).toSeq)
+      }
+
+      // Appends partition values
+      val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+      val joinedRow = new JoinedRow()
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+      unsafeRowIterator.map { dataRow =>
+        appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+      }
+    }
+  }
+
   /**
    * This supports to eliminate unneeded columns before producing an RDD
    * containing all of its tuples as Row objects. This reads all the tokens of each line
@@ -113,8 +155,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val pathsString = csvFiles.map(_.getPath.toUri.toString)
     val header = dataSchema.fields.map(_.name)
     val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
-    val rows = CSVRelation.parseCsv(
-      tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions)
+    val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)

     val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
     rows.mapPartitions { iterator =>
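The closure returned by the new CSV `buildReader()` ends with the same partition-value appending idiom that the JSON and Parquet readers below rely on. Here it is distilled into a standalone helper for illustration only; it touches Spark-internal catalyst APIs, so it is assumed to sit inside the Spark SQL codebase.

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

object AppendPartitionValuesSketch {
  /** Appends the per-file partition values to every data row produced by a reader. */
  def appendPartitionValues(
      rows: Iterator[InternalRow],
      partitionValues: InternalRow,
      requiredSchema: StructType,
      partitionSchema: StructType): Iterator[InternalRow] = {
    // The output layout is requiredSchema followed by partitionSchema, so the unsafe
    // projection is generated once per file and reused for every data row.
    val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
    val joinedRow = new JoinedRow()
    val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
    rows.map(dataRow => appendPartitionColumns(joinedRow(dataRow, partitionValues)))
  }
}
```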

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala

Lines changed: 4 additions & 3 deletions
@@ -124,8 +124,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {

   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -136,15 +137,15 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)

-    val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+    val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
     val joinedRow = new JoinedRow()

     file => {
       val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)

       val rows = JacksonParser.parseJson(
         lines,
-        dataSchema,
+        requiredSchema,
         columnNameOfCorruptRecord,
         parsedOptions)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 7 additions & 19 deletions
@@ -276,38 +276,26 @@ private[sql] class DefaultSource
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }

-  /**
-   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
-   *
-   * @param partitionSchema The schema of the partition column row that will be present in each
-   *                        PartitionedFile. These columns should be prepended to the rows that
-   *                        are produced by the iterator.
-   * @param dataSchema The schema of the data that should be output for each row. This may be a
-   *                   subset of the columns that are present in the file if column pruning has
-   *                   occurred.
-   * @param filters A set of filters than can optionally be used to reduce the number of rows output
-   * @param options A set of string -> string configuration options.
-   * @return
-   */
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
     parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
     parquetConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
     parquetConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)

     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
+      requiredSchema).asInstanceOf[StructType]
     CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)

     // Sets flags for `CatalystSchemaConverter`
@@ -324,7 +312,7 @@ private[sql] class DefaultSource
       // Collects all converted Parquet filter predicates. Notice that not all predicates can be
       // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
       // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .flatMap(ParquetFilters.createFilter(requiredSchema, _))
         .reduceOption(FilterApi.and)
     } else {
       None
@@ -394,7 +382,7 @@ private[sql] class DefaultSource
         enableVectorizedParquetReader) {
         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
-        val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala

Lines changed: 2 additions & 1 deletion
@@ -129,8 +129,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {

   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
     val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 11 additions & 7 deletions
@@ -385,9 +385,9 @@ abstract class OutputWriter {
 *
 * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
 *                 this relation.
- * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
 * @param dataSchema The schema of any remaining columns. Note that if any partition columns are
- *                   present in the actual data files as well, they are removed.
+ *                   present in the actual data files as well, they are preserved.
 * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
 * @param fileFormat A file format that can be used to read and write the data in files.
 * @param options Configuration used when reading / writing data.
@@ -462,20 +462,24 @@ trait FileFormat {
  /**
   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
   *
+  * @param dataSchema The global data schema. It can be either specified by the user, or
+  *                   reconciled/merged from all underlying data files. If any partition columns
+  *                   are contained in the files, they are preserved in this schema.
   * @param partitionSchema The schema of the partition column row that will be present in each
-  *                        PartitionedFile. These columns should be prepended to the rows that
+  *                        PartitionedFile. These columns should be appended to the rows that
  *                        are produced by the iterator.
-  * @param dataSchema The schema of the data that should be output for each row. This may be a
-  *                   subset of the columns that are present in the file if column pruning has
-  *                   occurred.
+  * @param requiredSchema The schema of the data that should be output for each row. This may be a
+  *                       subset of the columns that are present in the file if column pruning has
+  *                       occurred.
  * @param filters A set of filters than can optionally be used to reduce the number of rows output
  * @param options A set of string -> string configuration options.
  * @return
  */
  def buildReader(
      sqlContext: SQLContext,
-      partitionSchema: StructType,
      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
    // TODO: Remove this default implementation when the other formats have been ported
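To make the three schema parameters concrete, consider a hypothetical table partitioned by `date` whose data files contain columns `a`, `b`, and `c`, scanned by a query that only needs `b`; all names and types below are invented for illustration.

```scala
import org.apache.spark.sql.types._

object SchemaRolesSketch {
  // dataSchema: everything physically present in the data files (inferred or user-specified).
  val dataSchema: StructType = new StructType()
    .add("a", IntegerType).add("b", StringType).add("c", DoubleType)

  // partitionSchema: columns derived from the directory layout, not from file contents.
  val partitionSchema: StructType = new StructType().add("date", StringType)

  // requiredSchema: the pruned projection the scan must actually produce.
  val requiredSchema: StructType = new StructType().add("b", StringType)

  // Per the contract above, each row the reader returns is laid out as
  // requiredSchema ++ partitionSchema, i.e. (b, date).
}
```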

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 3 additions & 2 deletions
@@ -376,14 +376,15 @@ class TestFileFormat extends FileFormat {

   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {

     // Record the arguments so they can be checked in the test case.
     LastArguments.partitionSchema = partitionSchema
-    LastArguments.dataSchema = dataSchema
+    LastArguments.dataSchema = requiredSchema
     LastArguments.filters = filters
     LastArguments.options = options