@@ -211,7 +211,12 @@ case class FileSourceScanExec(
// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
val conf = relation.sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
val requiredWholeStageCodegenSettings =
conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
requiredWholeStageCodegenSettings &&
relation.fileFormat.supportBatch(relation.sparkSession, schema)
}

private lazy val needsUnsafeRowConversion: Boolean = {
@@ -447,14 +452,16 @@ case class FileSourceScanExec(
}

lazy val inputRDD: RDD[InternalRow] = {
val options = relation.options +
(FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
options = options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

val readRDD = if (bucketedScan) {
@@ -60,6 +60,11 @@ trait FileFormat {

/**
* Returns whether this format supports returning columnar batch or not.
* If columnar batch output is requested, users shall supply
* FileFormat.OPTION_RETURNING_BATCH -> true
* in relation options when calling buildReaderWithPartitionValues.
* This should only be passed as true if it can actually be supported.
* For ParquetFileFormat and OrcFileFormat, passing this option is required.
*
* TODO: we should just have different traits for the different formats.
*/
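For third-party callers of buildReaderWithPartitionValues, the documented contract looks roughly like the sketch below; fileFormat, dataSchema, requiredSchema, partitionSchema and relationOptions are assumed to come from the caller's relation, and only the option wiring is the point:

import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.types.StructType

val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
// Set the option to "true" only when supportBatch agrees.
val returningBatch = fileFormat.supportBatch(spark, resultSchema)

val readFile = fileFormat.buildReaderWithPartitionValues(
  sparkSession = spark,
  dataSchema = dataSchema,
  partitionSchema = partitionSchema,
  requiredSchema = requiredSchema,
  filters = Seq.empty,
  options = relationOptions +
    (FileFormat.OPTION_RETURNING_BATCH -> returningBatch.toString),
  hadoopConf = spark.sessionState.newHadoopConfWithOptions(relationOptions))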
@@ -184,6 +189,14 @@ object FileFormat {

val METADATA_NAME = "_metadata"

/**
* Option to pass to buildReaderWithPartitionValues to return columnar batch output or not.
* For ParquetFileFormat and OrcFileFormat, passing this option is required.
* This should only be passed as true if it can actually be supported, which can be checked
* by calling supportBatch.
*/
val OPTION_RETURNING_BATCH = "returning_batch"

// supported metadata struct fields for hadoop fs relation
val METADATA_STRUCT: StructType = new StructType()
.add(StructField(FILE_PATH, StringType))
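A custom FileFormat that can emit both rows and batches is free to consume the option more leniently than the built-in formats below; a hypothetical sketch, placed inside such a format's buildReaderWithPartitionValues (sparkSession, requiredSchema, partitionSchema and options are that method's parameters):

// Default to row output when the caller did not explicitly ask for batches.
val returningBatch =
  supportBatch(sparkSession, StructType(requiredSchema.fields ++ partitionSchema.fields)) &&
    options.getOrElse(FileFormat.OPTION_RETURNING_BATCH, "false").equals("true")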
@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -100,8 +99,7 @@ class OrcFileFormat

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema) &&
conf.orcVectorizedReaderEnabled &&
schema.forall(s => OrcUtils.supportColumnarReads(
s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
}
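With the whole-stage-codegen conditions moved up into FileSourceScanExec.supportsColumnar, supportBatch now answers only "can this format produce batches for this schema". A hedged spark-shell check (the ParquetFileFormat change further down is analogous):

import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.types.{LongType, StructField, StructType}

spark.conf.set("spark.sql.codegen.wholeStage", "false")
val schema = StructType(Seq(StructField("id", LongType)))
// Expected: true -- codegen settings no longer influence the answer; only
// spark.sql.orc.enableVectorizedReader and the column types do.
println(new OrcFileFormat().supportBatch(spark, schema))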
@@ -113,6 +111,18 @@
true
}

/**
* Build the reader.
*
* @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
* the reader should return row or columnar output.
* If the caller can handle both, pass
* FileFormat.OPTION_RETURNING_BATCH ->
* supportBatch(sparkSession,
* StructType(requiredSchema.fields ++ partitionSchema.fields))
* as the option.
* It should be set to "true" only if this reader can support it.
*/
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -124,9 +134,24 @@

val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
val capacity = sqlConf.orcVectorizedReaderBatchSize

// Should always be set by FileSourceScanExec creating this.
// Check conf before checking option, to allow working around an issue by changing conf.
val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
options.get(FileFormat.OPTION_RETURNING_BATCH)
.getOrElse {
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
"To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
}
.equals("true")
if (enableVectorizedReader) {
// If the passed option said that we are to return batches, we need to also be able to
// do this based on config and resultSchema.
assert(supportBatch(sparkSession, resultSchema))
}

OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)

val broadcastedConf =
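The ordering above is deliberate: the conf is evaluated before the option, so a user hitting the new IllegalArgumentException from a caller that predates this contract can fall back to row-based reads with a single setting. A minimal sketch of that escape hatch (the failing call itself is assumed):

// With the vectorized reader disabled, the && short-circuits and the missing
// OPTION_RETURNING_BATCH is never consulted; the ORC reader then returns rows.
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
// ... re-run the query or buildReaderWithPartitionValues call that threw.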
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
@@ -169,12 +168,11 @@ class ParquetFileFormat
}

/**
* Returns whether the reader will return the rows as batch or not.
* Returns whether the reader can return the rows as batch or not.
*/
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema)
ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
}

override def vectorTypes(
@@ -197,6 +195,18 @@
true
}

/**
* Build the reader.
*
* @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
* the reader should return row or columnar output.
* If the caller can handle both, pass
* FileFormat.OPTION_RETURNING_BATCH ->
* supportBatch(sparkSession,
* StructType(requiredSchema.fields ++ partitionSchema.fields))
* as the option.
* It should be set to "true" only if this reader can support it.
*/
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -245,8 +255,6 @@ class ParquetFileFormat
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -257,6 +265,22 @@
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

// Should always be set by FileSourceScanExec creating this.
// Check conf before checking option, to allow working around an issue by changing conf.
val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
options.get(FileFormat.OPTION_RETURNING_BATCH)
.getOrElse {
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}
.equals("true")
if (returningBatch) {
// If the passed option said that we are to return batches, we need to also be able to
// do this based on config and resultSchema.
assert(supportBatch(sparkSession, resultSchema))
}

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)

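Because the option is now mandatory for ParquetFileFormat whenever the vectorized reader is enabled, a direct caller that omits it fails while the reader is being built rather than mid-execution. A test-style sketch of that behaviour (assumes a ScalaTest suite with a SparkSession named spark):

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val fmt = new ParquetFileFormat()
val schema = StructType(Seq(StructField("id", LongType)))
// No OPTION_RETURNING_BATCH in options => IllegalArgumentException at build time,
// provided spark.sql.parquet.enableVectorizedReader is true (the default).
intercept[IllegalArgumentException] {
  fmt.buildReaderWithPartitionValues(
    spark, schema, StructType(Nil), schema, Seq.empty, Map.empty,
    spark.sessionState.newHadoopConf())
}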
@@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}
}

Seq("parquet", "orc").foreach { format =>
test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
// The issue was that ParquetFileFormat would not count the _metadata columns towards
// the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
// resulting in Parquet reader returning columnar output, while scan expected row.
withTempPath { dir =>
sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} FROM RANGE(100)")
.write.format(format).save(dir.getAbsolutePath)
(98 to 102).foreach { wscgCols =>
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> wscgCols.toString) {
// Would fail with
// java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
// cannot be cast to org.apache.spark.sql.catalyst.InternalRow
sql(
s"""
|SELECT
| ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
| max(_metadata.file_path)
|FROM $format.`$dir`""".stripMargin
).collect()
}
}
}
}
}
}
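The regression this test guards against can also be exercised interactively; a hedged spark-shell reproduction (the scratch path and the exact maxFields value are illustrative, chosen near the 98-102 boundary the test sweeps):

// Before this change, combining ~100 data columns with _metadata near the
// spark.sql.codegen.maxFields limit could fail with
// "ColumnarBatch cannot be cast to InternalRow"; afterwards it should succeed.
spark.range(100)
  .selectExpr((1 to 100).map(i => s"id + $i as c$i"): _*)
  .write.mode("overwrite").parquet("/tmp/spark40918")      // assumed scratch path
spark.conf.set("spark.sql.codegen.maxFields", "100")
spark.read.parquet("/tmp/spark40918")
  .selectExpr("max(_metadata.file_path)" +: (1 to 100).map(i => s"sum(c$i)"): _*)
  .collect()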