diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 40d29af28f90..47103f6698f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -211,7 +211,12 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    val conf = relation.sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    requiredWholeStageCodegenSettings &&
+      relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -447,6 +452,8 @@ case class FileSourceScanExec(
   }
 
   lazy val inputRDD: RDD[InternalRow] = {
+    val options = relation.options +
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -454,7 +461,7 @@ case class FileSourceScanExec(
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
-        options = relation.options,
+        options = options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
     val readRDD = if (bucketedScan) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index f9b37fb5d9fc..0263de8525fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -60,6 +60,11 @@ trait FileFormat {
 
   /**
    * Returns whether this format supports returning columnar batch or not.
+   * If columnar batch output is requested, users shall supply
+   *   FileFormat.OPTION_RETURNING_BATCH -> true
+   * in relation options when calling buildReaderWithPartitionValues.
+   * This should only be passed as true if it can actually be supported.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
    *
    * TODO: we should just have different traits for the different formats.
    */
@@ -184,6 +189,14 @@ object FileFormat {
 
   val METADATA_NAME = "_metadata"
 
+  /**
+   * Option to pass to buildReaderWithPartitionValues to return columnar batch output or not.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
+   * This should only be passed as true if it can actually be supported, which can be checked
+   * by calling supportBatch.
+   */
+  val OPTION_RETURNING_BATCH = "returning_batch"
+
   // supported metadata struct fields for hadoop fs relation
   val METADATA_STRUCT: StructType = new StructType()
     .add(StructField(FILE_PATH, StringType))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2b060c901531..34517ed60deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -100,8 +99,7 @@ class OrcFileFormat
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
+    conf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
   }
@@ -113,6 +111,18 @@ class OrcFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -124,9 +134,24 @@ class OrcFileFormat
     val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
-    val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
+              "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (enableVectorizedReader) {
+      // If the passed option said that we are to return batches, we need to also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9765e7c78019..f66434d3cafa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -169,12 +168,11 @@ class ParquetFileFormat
   }
 
   /**
-   * Returns whether the reader will return the rows as batch or not.
+   * Returns whether the reader can return the rows as batch or not.
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
   }
 
   override def vectorTypes(
@@ -197,6 +195,18 @@ class ParquetFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -245,8 +255,6 @@ class ParquetFileFormat
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
     val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-    val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -257,6 +265,22 @@ class ParquetFileFormat
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
+              "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (returningBatch) {
+      // If the passed option said that we are to return batches, we need to also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index ad75f634050d..5ab439746c8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  Seq("parquet", "orc").foreach { format =>
+    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
+      // The issue was that ParquetFileFormat would not count the _metadata columns towards
+      // the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
+      // resulting in Parquet reader returning columnar output, while scan expected row.
+      withTempPath { dir =>
+        sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} FROM RANGE(100)")
+          .write.format(format).save(dir.getAbsolutePath)
+        (98 to 102).foreach { wscgCols =>
+          withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> wscgCols.toString) {
+            // Would fail with
+            // java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
+            // cannot be cast to org.apache.spark.sql.catalyst.InternalRow
+            sql(
+              s"""
+                 |SELECT
+                 |  ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
+                 |  max(_metadata.file_path)
+                 |FROM $format.`$dir`""".stripMargin
+            ).collect()
+          }
+        }
+      }
+    }
+  }
 }