
Commit fb2bdea

juliuszsompolski authored and cloud-fan committed
[SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output
### What changes were proposed in this pull request?

We move the decision about supporting columnar output based on WSCG one level up, from ParquetFileFormat / OrcFileFormat to FileSourceScanExec, and pass it as a new required option to ParquetFileFormat / OrcFileFormat. The semantics are now as follows:

* `ParquetFileFormat.supportBatch` and `OrcFileFormat.supportBatch` return whether the format **can** return columnar output, not necessarily whether it **will**.
* To get columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportBatch` is also `true`, but it can be set to `false` if columnar output is not wanted regardless. This way, `FileSourceScanExec` can set it to `false` when there are more than 100 columns for WSCG, and `ParquetFileFormat` / `OrcFileFormat` don't have to concern themselves with WSCG limits.
* To avoid forgetting to pass it by accident, this option is made required. Making it required means a few call sites have to be updated, but the error that results from not passing it is very obscure; it is better to fail early and explicitly here.

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had exactly the same issue.

`java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown because the Parquet reader was outputting columnar batches, while `FileSourceScanExec` expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields. When this is used in `FileSourceScanExec`:

```
override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
```

the `schema` comes from the output attributes, which include extra metadata attributes. However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was calculated again as:

```
relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = options,
  hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```

where `requiredSchema` and `partitionSchema` do not include the metadata columns:

```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```

Columns like `file_path#6388` are added by the scan and contain metadata produced by the scan, not by the file reader, which only concerns itself with what is inside the file.

### Does this PR introduce _any_ user-facing change?

Not a public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API in Apache Spark is `FileSourceScanExec`.

### How was this patch tested?

Tests added.

Backports apache#38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

Closes apache#38431 from juliuszsompolski/SPARK-40918-3.3.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
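For illustration, here is a minimal sketch of the caller-side contract described above, roughly mirroring what `FileSourceScanExec` does after this change. It is not code from this commit; it assumes `relation`, `requiredSchema`, and `pushedDownFilters` are in scope, as they are in that operator.

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.types.StructType

// Decide row vs. columnar output once, at the scan level, using the full result schema.
val resultSchema = StructType(requiredSchema.fields ++ relation.partitionSchema.fields)
val returningBatch = relation.fileFormat.supportBatch(relation.sparkSession, resultSchema)

// Pass the decision explicitly; the Parquet/Orc readers now require this option and
// throw IllegalArgumentException if it is missing.
val options = relation.options +
  (FileFormat.OPTION_RETURNING_BATCH -> returningBatch.toString)

val readFile = relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = options,
  hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
```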
1 parent c7ef560 commit fb2bdea

File tree

5 files changed: +107 −12 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 9 additions & 2 deletions
```diff
@@ -211,7 +211,12 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    val conf = relation.sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    requiredWholeStageCodegenSettings &&
+      relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -447,14 +452,16 @@ case class FileSourceScanExec(
   }
 
   lazy val inputRDD: RDD[InternalRow] = {
+    val options = relation.options +
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
-        options = relation.options,
+        options = options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
     val readRDD = if (bucketedScan) {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 13 additions & 0 deletions
```diff
@@ -60,6 +60,11 @@ trait FileFormat {
 
   /**
    * Returns whether this format supports returning columnar batch or not.
+   * If columnar batch output is requested, users shall supply
+   * FileFormat.OPTION_RETURNING_BATCH -> true
+   * in relation options when calling buildReaderWithPartitionValues.
+   * This should only be passed as true if it can actually be supported.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
    *
    * TODO: we should just have different traits for the different formats.
    */
@@ -184,6 +189,14 @@ object FileFormat {
 
   val METADATA_NAME = "_metadata"
 
+  /**
+   * Option to pass to buildReaderWithPartitionValues to return columnar batch output or not.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
+   * This should only be passed as true if it can actually be supported, which can be checked
+   * by calling supportBatch.
+   */
+  val OPTION_RETURNING_BATCH = "returning_batch"
+
   // supported metadata struct fields for hadoop fs relation
   val METADATA_STRUCT: StructType = new StructType()
     .add(StructField(FILE_PATH, StringType))
```

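To make the contract concrete at the trait level, here is a condensed, hypothetical sketch of how a `FileFormat` implementation is expected to consume the option inside `buildReaderWithPartitionValues`; the Orc and Parquet changes below follow this pattern, additionally consulting their vectorized-reader confs first.

```scala
// Condensed sketch only; assumes `options`, `sparkSession`, `requiredSchema` and
// `partitionSchema` are the parameters of buildReaderWithPartitionValues.
val returningBatch = options.get(FileFormat.OPTION_RETURNING_BATCH)
  .getOrElse {
    // Fail fast: a missing option previously surfaced only as an obscure ClassCastException.
    throw new IllegalArgumentException(
      "OPTION_RETURNING_BATCH should always be set for this FileFormat.")
  }
  .equals("true")
if (returningBatch) {
  // The caller may only request batches if the format supports them for this schema.
  assert(supportBatch(sparkSession, StructType(requiredSchema.fields ++ partitionSchema.fields)))
}
```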
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 29 additions & 4 deletions
```diff
@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -100,8 +99,7 @@ class OrcFileFormat
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
+    conf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
   }
@@ -113,6 +111,18 @@ class OrcFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -124,9 +134,24 @@ class OrcFileFormat
 
     val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
-    val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
+              "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (enableVectorizedReader) {
+      // If the passed option said that we are to return batches, we need to also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 30 additions & 6 deletions
```diff
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -169,12 +168,11 @@ class ParquetFileFormat
   }
 
   /**
-   * Returns whether the reader will return the rows as batch or not.
+   * Returns whether the reader can return the rows as batch or not.
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
   }
 
   override def vectorTypes(
@@ -197,6 +195,18 @@ class ParquetFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -245,8 +255,6 @@ class ParquetFileFormat
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
     val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-    val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -257,6 +265,22 @@ class ParquetFileFormat
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
+              "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (returningBatch) {
+      // If the passed option said that we are to return batches, we need to also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala

Lines changed: 26 additions & 0 deletions
```diff
@@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  Seq("parquet", "orc").foreach { format =>
+    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
+      // The issue was that ParquetFileFormat would not count the _metadata columns towards
+      // the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
+      // resulting in Parquet reader returning columnar output, while scan expected row.
+      withTempPath { dir =>
+        sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} FROM RANGE(100)")
+          .write.format(format).save(dir.getAbsolutePath)
+        (98 to 102).foreach { wscgCols =>
+          withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> wscgCols.toString) {
+            // Would fail with
+            // java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
+            // cannot be cast to org.apache.spark.sql.catalyst.InternalRow
+            sql(
+              s"""
+                 |SELECT
+                 |  ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
+                 |  max(_metadata.file_path)
+                 |FROM $format.`$dir`""".stripMargin
+            ).collect()
+          }
+        }
+      }
+    }
+  }
 }
```
