
Conversation

Contributor

@juliuszsompolski juliuszsompolski commented Oct 26, 2022

What changes were proposed in this pull request?

We move the decision about producing columnar output based on WSCG one level up, from ParquetFileFormat / OrcFileFormat to FileSourceScanExec, and pass it as a new required option to ParquetFileFormat / OrcFileFormat. The semantics are now as follows:

  • ParquetFileFormat.supportsBatch and OrcFileFormat.supportsBatch return whether the format can produce columnar output, not necessarily that it will.
  • To get columnar output, the option FileFormat.OPTION_RETURNING_BATCH needs to be passed to buildReaderWithPartitionValues in these two file formats (see the sketch after this list). It should only be set to true if supportsBatch is also true, but it can be set to false if we don't want columnar output anyway - this way, FileSourceScanExec can set it to false when there are more than 100 columns for WSCG, and ParquetFileFormat / OrcFileFormat don't have to concern themselves with WSCG limits.
  • To avoid forgetting to pass it by accident, this option is required. Making it required means updating a few call sites, but the error that results from omitting it is very obscure, so it is better to fail early and explicitly here.
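
A minimal sketch of the new contract from the caller's side (illustrative only, not the committed FileSourceScanExec code; readerOptions and pushedDownFilters are placeholder names, while FileFormat.OPTION_RETURNING_BATCH is the option this PR adds):

  // Sketch: the scan decides once, based on its full output schema, whether
  // it will consume columnar batches, and passes that decision to the reader.
  val readerOptions = relation.options +
    (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)

  relation.fileFormat.buildReaderWithPartitionValues(
    sparkSession = relation.sparkSession,
    dataSchema = relation.dataSchema,
    partitionSchema = relation.partitionSchema,
    requiredSchema = requiredSchema,
    filters = pushedDownFilters,
    options = readerOptions,
    hadoopConf = hadoopConf)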

Why are the changes needed?

The explanation below uses ParquetFileFormat as the example; OrcFileFormat had exactly the same issue.

java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow was being thrown because ParquetReader was outputting columnar batches, while FileSourceScanExec expected row output.

The mismatch comes from the fact that ParquetFileFormat.supportBatch depends on WholeStageCodegenExec.isTooManyFields(conf, schema), where the threshold is 100 fields.
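
For reference, that helper is roughly the following (a paraphrased sketch of WholeStageCodegenExec.isTooManyFields; it counts nested fields too, against spark.sql.codegen.maxFields, which defaults to 100):

  // Paraphrased sketch, not the verbatim Spark source.
  def isTooManyFields(conf: SQLConf, dataType: DataType): Boolean =
    numOfNestedFields(dataType) > conf.wholeStageMaxNumFields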

When this is used in FileSourceScanExec:

  override lazy val supportsColumnar: Boolean = {
      relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }

the schema comes from output attributes, which include extra metadata attributes.

However, inside ParquetFileFormat.buildReaderWithPartitionValues it was calculated again as

      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)

Here, requiredSchema and partitionSchema do not include the metadata columns:

FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))

Columns like file_path#6388 are added by the scan and contain metadata produced by the scan itself, not by the file reader, which concerns itself only with what is inside the file.
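
A minimal repro sketch in the spirit of the added tests (assumes a SparkSession named spark and a scratch path): 100 data columns plus _metadata.file_path give the plan 101 output fields, so it expects rows, while the reader's schema stays at 100 fields, so it produces batches - hence the ClassCastException above, before this fix.

  import org.apache.spark.sql.functions.col

  // Exactly 100 Long columns: at, but not over, the WSCG field limit.
  val df = spark.range(10).select((1 to 100).map(i => col("id").as(s"c$i")): _*)
  df.write.mode("overwrite").parquet("/tmp/spark40918")

  // The hidden _metadata.file_path column is counted by the plan (101 fields)
  // but not by the file reader (100 fields).
  spark.read.parquet("/tmp/spark40918")
    .select(col("*"), col("_metadata.file_path"))
    .collect()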

Does this PR introduce any user-facing change?

Not a public API change, but it is now required to pass FileFormat.OPTION_RETURNING_BATCH in options to ParquetFileFormat.buildReaderWithPartitionValues. The only user of this API in Apache Spark is FileSourceScanExec.
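
Callers that forget the option now fail fast; roughly, the check inside the reader looks like the following (a sketch based on the snippets discussed in the review below, not the verbatim committed code or error message):

  // Sketch: read the caller's decision instead of recomputing supportBatch.
  val returningBatch = options.get(FileFormat.OPTION_RETURNING_BATCH)
    .map(_.toBoolean)
    .getOrElse {
      throw new IllegalArgumentException(
        "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
          "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
    }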

How was this patch tested?

Tests added

@juliuszsompolski
Contributor Author

cc @cloud-fan @rednaxelafx

@github-actions github-actions bot added the SQL label Oct 26, 2022
Contributor

@rednaxelafx rednaxelafx left a comment


LGTM, thanks a lot for finding and fixing this!

@juliuszsompolski juliuszsompolski changed the title [SPARK-40918] Mismatch in WSCG.isTooManyFields when using _metadata [SPARK-40918] Mismatch between FileSourceScanExec and ParquetFileFormat on producing columnar output Oct 26, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment


cc @sunchao and @viirya

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-40918] Mismatch between FileSourceScanExec and ParquetFileFormat on producing columnar output [SPARK-40918][SQL] Mismatch between FileSourceScanExec and ParquetFileFormat on producing columnar output Oct 26, 2022
@juliuszsompolski juliuszsompolski changed the title [SPARK-40918][SQL] Mismatch between FileSourceScanExec and ParquetFileFormat on producing columnar output [SPARK-40918][SQL] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output Oct 27, 2022
options.get(FileFormat.OPTION_RETURNING_BATCH)
  .getOrElse {
    throw new IllegalArgumentException(
      "OPTION_RETURNING_BATCH should always be set for OrcFileFormat." +
Member


nit. Add one space at the end?

"OPTION_RETURNING_BATCH should always be set for OrcFileFormat." +
"OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +

  .getOrElse {
    throw new IllegalArgumentException(
      "OPTION_RETURNING_BATCH should always be set for OrcFileFormat." +
        "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
Member


Is this a correct recommendation? Why not recommend to set OPTION_RETURNING_BATCH?

Contributor Author


Is this a correct recommendation? Why not recommend to set OPTION_RETURNING_BATCH?

@dongjoon-hyun passing OPTION_RETURNING_BATCH is something the developer of the code that calls without setting this option can do. For an end user who hits this issue via some code path that doesn't set it, the workaround is to disable the config (see the sketch below). Hence it's called a "workaround", not a "fix".
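
For example (a sketch; these are the existing vectorized-reader configs for ORC and Parquet), an end user could work around the error with:

  // Disable vectorized (columnar) reading so the row-based path is used.
  spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")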

options.get(FileFormat.OPTION_RETURNING_BATCH)
  .getOrElse {
    throw new IllegalArgumentException(
      "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat." +
Member


Ditto. nit. Add one more space at the end of the message.

Member

@dongjoon-hyun dongjoon-hyun left a comment


Given that OrcFileFormat has no issue like _metadata columns, I'm wondering why the title implies there is an issue in ORC. I didn't find any proper explanation of the ORC issue in the PR description either.

Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output

Could you elaborate more on the ORC case with !WholeStageCodegenExec.isTooManyFields(conf, schema) in the PR description, @juliuszsompolski?

@juliuszsompolski
Contributor Author

Given that OrcFileFormat has no issue like _metadata columns

@dongjoon-hyun I think OrcFileFormat has exactly the same issue as ParquetFileFormat, as @cloud-fan pointed out. When a column like _metadata.file_path was requested for OrcFileFormat, that column was also counted in FileSourceScanExec.supportsColumnar, but not in OrcFileFormat.supportsBatch during buildReaderWithPartitionValues. The code changes I made to OrcFileFormat exactly mirror what I did to ParquetFileFormat.
I updated the description to say "ParquetFileFormat or OrcFileFormat" in various places; the issue is exactly the same.

Member

@dongjoon-hyun dongjoon-hyun left a comment


Ah, you are right. I misread the context. Thank you, @juliuszsompolski.

@juliuszsompolski
Contributor Author

Thanks. I updated the title after fixing the ORC side, but forgot to update the description, which still described only Parquet.
This bug exists in branch-3.3 as well btw; if it doesn't merge cleanly I can open a separate PR later.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 77694b4 Oct 28, 2022
@cloud-fan
Contributor

unfortunately it conflicts with 3.3, @juliuszsompolski could you open a backport PR? thanks!

juliuszsompolski added a commit to juliuszsompolski/apache-spark that referenced this pull request Oct 28, 2022
…rquetFileFormat on producing columnar output

Closes apache#38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@juliuszsompolski
Contributor Author

3.3 PR: #38431

cloud-fan pushed a commit that referenced this pull request Oct 31, 2022
…nd ParquetFileFormat on producing columnar output

Backports #38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

Closes #38431 from juliuszsompolski/SPARK-40918-3.3.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…rquetFileFormat on producing columnar output

Closes apache#38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>