Conversation

@juliuszsompolski (Contributor) commented Oct 28, 2022

### What changes were proposed in this pull request?

We move the decision about supporting columnar output based on WSCG one level up, from `ParquetFileFormat` / `OrcFileFormat` to `FileSourceScanExec`, and pass it back down as a new required option for `ParquetFileFormat` / `OrcFileFormat`. The semantics are now as follows:

* `ParquetFileFormat.supportsBatch` and `OrcFileFormat.supportsBatch` return whether the format **can** produce columnar output, not whether it necessarily **will**.
* To get columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportsBatch` is also `true`, but it can be set to `false` if columnar output is not wanted anyway; this way `FileSourceScanExec` can set it to `false` when there are more than 100 columns for WSCG, and `ParquetFileFormat` / `OrcFileFormat` don't have to concern themselves with WSCG limits (see the sketch after this list).
* To avoid the option being omitted by accident, it is made required. That means updating the few places that call these readers, but the error that results from forgetting to pass it is very obscure, so it is better to fail early and explicitly.
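
To make the new contract concrete, here is a minimal sketch of the scan-side flow, assuming the helper name and surrounding plumbing; only `FileFormat.OPTION_RETURNING_BATCH`, `supportBatch` and the `buildReaderWithPartitionValues` signature come from this PR and the excerpts below, the rest is illustrative:

```
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Illustrative sketch, not the literal FileSourceScanExec change: the scan
// decides once whether it wants columnar batches and passes that decision
// down explicitly, so the file format never re-derives it from a narrower schema.
def buildReader(
    relation: HadoopFsRelation,
    scanOutputSchema: StructType,          // built from the scan's output attributes
    requiredSchema: StructType,
    pushedDownFilters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
  // Whether the format *can* return batches for what the scan will actually output.
  val wantsBatches =
    relation.fileFormat.supportBatch(relation.sparkSession, scanOutputSchema)
  relation.fileFormat.buildReaderWithPartitionValues(
    sparkSession = relation.sparkSession,
    dataSchema = relation.dataSchema,
    partitionSchema = relation.partitionSchema,
    requiredSchema = requiredSchema,
    filters = pushedDownFilters,
    // The new, required option: the reader no longer derives this itself.
    options = options + (FileFormat.OPTION_RETURNING_BATCH -> wantsBatches.toString),
    hadoopConf = hadoopConf)
}
```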

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had exactly the same problem.

`java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown because the Parquet reader was returning columnar batches, while `FileSourceScanExec` expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields.
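
For orientation, here is a rough approximation of that check; this is not the actual `isTooManyFields` implementation (which also counts fields nested in structs, arrays and maps), only the `spark.sql.codegen.maxFields` key and its default of 100 are real:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Rough, illustrative approximation of the WSCG field limit that supportBatch
// relies on; spark.sql.codegen.maxFields defaults to 100.
def tooManyFieldsForWSCG(spark: SparkSession, schema: StructType): Boolean = {
  val maxFields = spark.conf.get("spark.sql.codegen.maxFields", "100").toInt
  schema.fields.length > maxFields
}
```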

When this is used in `FileSourceScanExec`:

```
  override lazy val supportsColumnar: Boolean = {
      relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }
```

the `schema` comes from the output attributes, which include the extra metadata attributes.

However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was calculated again as

```
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```

Where `requiredSchema` and `partitionSchema` wouldn't include the metadata columns:

```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```

Columns like `file_path#6388` are added by the scan and contain metadata produced by the scan itself, not by the file reader, which only concerns itself with what is inside the file.
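
As context, a hedged repro sketch of the failing scenario; the column count, the temp path and the use of the hidden `_metadata.file_path` column are assumptions based on the attribute dump above, not the test added by this PR:

```
import org.apache.spark.sql.SparkSession

// Hypothetical repro: 100 data columns keep the reader-side schema exactly at
// the WSCG limit, while selecting the hidden _metadata.file_path column pushes
// the scan-side output to 101 attributes, so the two sides used to disagree
// about columnar output and the collect() below threw the ClassCastException.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val cols = (1 to 100).map(i => $"id".as(s"c$i"))
spark.range(10).select(cols: _*)
  .write.mode("overwrite").parquet("/tmp/spark40918_repro")

spark.read.parquet("/tmp/spark40918_repro")
  .select($"*", $"_metadata.file_path")  // metadata column added by the scan, not the file reader
  .collect()
```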

### Does this PR introduce _any_ user-facing change?

Not a public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API in Apache Spark is `FileSourceScanExec`.
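
For any hypothetical external caller of this internal API, the required change is just an extra entry in the options map; passing `"false"` keeps row output even when the format could produce batches:

```
// Hypothetical caller-side change: the options map must now carry the key,
// otherwise ParquetFileFormat.buildReaderWithPartitionValues fails early.
val readerOptions: Map[String, String] =
  options + (FileFormat.OPTION_RETURNING_BATCH -> "false")
```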

### How was this patch tested?

Tests added

Backports #38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski [email protected]
Signed-off-by: Wenchen Fan [email protected]

@juliuszsompolski (Contributor, author) commented:

@cloud-fan

@github-actions github-actions bot added the SQL label Oct 28, 2022
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-40918][SQL] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output [SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output Oct 28, 2022
@cloud-fan (Contributor) commented:

thanks, merging to 3.3!

cloud-fan pushed a commit that referenced this pull request Oct 31, 2022
[SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output

Closes #38431 from juliuszsompolski/SPARK-40918-3.3.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan cloud-fan closed this Oct 31, 2022
@juliuszsompolski (Contributor, author) commented:

> all tests passed actually: https://github.com/juliuszsompolski/apache-spark/runs/9189880361

Yeah, they did in all three runs, but three times in a row it didn't update the GitHub status...
