[SPARK-27269][SQL] File source v2 should validate data schema only #24203
Conversation
Test build #103913 has finished for PR 24203 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
Looks good except for one of Dongjoon's comments and the test failure.
```scala
}.asNullable

lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
  val partitionSchema = fileIndex.partitionSchema
  val equality = sparkSession.sessionState.conf.resolver
```
equality -> resolver?
The naming follows DataSource.scala line 185. I think it is OK.
Do you mean the one line written two years ago? All the other new instances use `resolver = sparkSession.sessionState.conf.resolver` (more than 7 of them).
If you search for `conf.resolver`, there are more instances with `val resolver`.
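For illustration, here is the suggested rename applied to the fragment above (a sketch of the diff context, not necessarily the merged code):

```scala
lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
  val partitionSchema = fileIndex.partitionSchema
  // Suggested name: `resolver` instead of `equality`, matching newer call sites.
  val resolver = sparkSession.sessionState.conf.resolver
  StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
}
```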
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
```scala
  val partitionSchema = fileIndex.partitionSchema
  val equality = sparkSession.sessionState.conf.resolver
  StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
}.orElse {
```
Indentation? (https://github.com/databricks/scala-style-guide#pattern-matching)
Lines 50~58 should be updated.
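For context, a generic sketch of the rule in the linked pattern-matching section (the names here are made up; this is not the actual lines 50~58 of the file): when matching on the result of a function call, `match` goes on the same line as the call.

```scala
// Per the linked style guide: fuse `match` with the preceding call instead of
// assigning the result to a temporary value and matching on that.
def firstPath(files: Seq[String]): Option[String] = files.filter(_.nonEmpty) match {
  case Nil => None
  case nonEmpty => Some(nonEmpty.head)
}
```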
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
```scala
val table = new DummyFileTable(spark, options, Seq(pathName), expectedDataSchema, None)
assert(table.dataSchema == expectedDataSchema)
val expectedPartitionSchema = StructType(Seq(StructField("p", IntegerType, true)))
assert(table.fileIndex.partitionSchema == expectedPartitionSchema)
```
nit: additional space after `==`.
```scala
def inferSchema(files: Seq[FileStatus]): Option[StructType]

/**
 * Returns whether this format supports the given [[DataType]] in write path.
```
write -> read/write.
@gengliangwang, sorry for missing the regression last time.
This PR seems to have a different issue. Previously, we checked the schema in the read path via `toBatch`; that check is now removed. Currently, we only remove partition columns by name, and we don't check the column types in the data schema.
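For reference, a minimal sketch of the kind of data-schema type check being discussed; `supportsDataType` and `formatName` mirror names used in this PR, but the trait and the exception type here are illustrative assumptions, not the merged code:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

trait DataSchemaValidation {
  /** Short format name, e.g. "Text" or "ORC". */
  def formatName: String

  /** Whether this format supports the given type for data (non-partition) columns. */
  def supportsDataType(dataType: DataType): Boolean

  // Check the column types of the data schema only; partition columns are
  // assumed to have been filtered out already. Spark itself would raise an
  // AnalysisException; a plain exception keeps this sketch self-contained.
  def verifyDataSchema(dataSchema: StructType): Unit = {
    dataSchema.fields.foreach { field =>
      if (!supportsDataType(field.dataType)) {
        throw new IllegalArgumentException(
          s"$formatName data source does not support ${field.dataType.catalogString} data type.")
      }
    }
  }
}
```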
Test build #103936 has finished for PR 24203 at commit
@dongjoon-hyun, do you mean the issue is that we don't check the schema in the read path?
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
Test build #103967 has finished for PR 24203 at commit
Test build #103966 has finished for PR 24203 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
dongjoon-hyun left a comment:
Only one nit comment about naming. Thank you, @gengliangwang.
Test build #103984 has finished for PR 24203 at commit
Merged to master.
What changes were proposed in this pull request?
Currently, file source V2 allows each data source to specify the supported data types by implementing the method `supportsDataType` in `FileScan` and `FileWriteBuilder`.

However, in the read path the validation checks all the data types in `readSchema`, which might contain partition columns. This is actually a regression: e.g. the Text data source only supports the String data type, while the partition columns can still contain Integer type, since partition columns are processed by Spark itself.

This PR makes file source V2 validate the data schema only, filtering partition columns out of the data schema when a user-specified schema is provided.
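To make the regression scenario concrete, a hypothetical spark-shell snippet (the path and data are made up) that should keep working:

```scala
// Write a text dataset partitioned by an integer column. The text format only
// supports StringType for data columns; the partition column `p` is IntegerType
// and is handled by Spark's partitioning layer, not by the text reader.
spark.range(10)
  .selectExpr("CAST(id AS STRING) AS value", "CAST(id % 2 AS INT) AS p")
  .write.partitionBy("p").text("/tmp/spark-27269/text")

// Reading back should succeed: validating the full read schema
// (value: string, p: int) against the text format's supportsDataType
// would wrongly reject the IntegerType partition column.
spark.read.text("/tmp/spark-27269/text").printSchema()
```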
How was this patch tested?
Unit test