
Conversation

@kiszk
Member

kiszk commented Mar 14, 2017

What changes were proposed in this pull request?

This PR fixes a problem that was reported on the Databricks forum. When the following code is executed, the resulting schema has nullable = true for id even though the schema passed to read.schema() specifies nullable = false.

This happens because Spark currently forces nullable to true in the schema it uses for file-based data sources. This PR makes this conservative setting precise.

import org.apache.spark.sql.types.{LongType, StructField, StructType}

val field = "id"
val df = spark.range(0, 5, 1, 1).toDF(field)
val fmt = "parquet"
val path = "/tmp/parquet"
val schema = StructType(Seq(StructField(field, LongType, nullable = false)))
df.write.format(fmt).mode("overwrite").save(path)
val dfRead = spark.read.format(fmt).schema(schema).load(path)
dfRead.printSchema
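
For reference, this prints a schema along the lines of the following; the column comes back nullable even though the user-specified schema declared it non-nullable:

    root
     |-- id: long (nullable = true)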

How was this patch tested?

Added a new test in DataFrameReaderWriterSuite.

@SparkQA

SparkQA commented Mar 14, 2017

Test build #74538 has finished for PR 17293 at commit e36bb0b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 14, 2017

Test build #74549 has finished for PR 17293 at commit 797a18d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

HyukjinKwon commented Mar 14, 2017

Oh @kiszk, I think this deals with essentially the same problem as #14124, in a different way. I initially proposed forcing all cases to a nullable schema in similar places (please refer to the details in the JIRA and PR). Per the discussion there, it later turned into forcing the schemas to nullable ones when they are set. I guess the problem is essentially the same.

@kiszk
Member Author

kiszk commented Mar 15, 2017

@HyukjinKwon Thank you for the clarification. I understand that when we honor a user-specified schema, we have to add code that validates the data against that schema.
I am prototyping validation code in the Parquet reader.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74849 has finished for PR 17293 at commit eafc386.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74875 has finished for PR 17293 at commit 67183d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74887 has finished for PR 17293 at commit 99ad48c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Mar 20, 2017

@HyukjinKwon I added data validation using schema information to the Parquet reader, as @gatorsmile suggested in https://www.mail-archive.com/[email protected]/msg39233.html. Could you please take a look?

@HyukjinKwon
Member

HyukjinKwon commented Mar 21, 2017

Uh.. I am actually not sure whether we want the non-nullability, per #14124 (comment).

I am willing to do my best to help test and verify whether non-nullability is okay, but I hope someone qualified decides what we want first.

Another personal, humble opinion: if the change is not too big, I think we should enable/disable this for all data sources rather than only some (assuming this PR enables non-nullability for Parquet only).

cc @cloud-fan, @marmbrus and @liancheng, do we want to enable this non-nullability?

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74948 has finished for PR 17293 at commit d0a989b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PointStr(x: String, y: String)

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74950 has finished for PR 17293 at commit 19acf23.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74957 has finished for PR 17293 at commit 731d814.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74958 has finished for PR 17293 at commit fadc935.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Mar 21, 2017

This failure occurs due to an issue described in #17302.
Waiting for that PR to be merged.

@kiszk
Member Author

kiszk commented Mar 21, 2017

@HyukjinKwon If we agree on adding a data validation phase, I will do this for other data sources in another PR.

@SparkQA

SparkQA commented Mar 24, 2017

Test build #75149 has started for PR 17293 at commit f63d51b.

@kiszk
Member Author

kiszk commented Mar 24, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Mar 24, 2017

Test build #75158 has finished for PR 17293 at commit f63d51b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Mar 24, 2017

cc: @HyukjinKwon, @cloud-fan, @marmbrus and @liancheng,

@SparkQA

SparkQA commented May 30, 2017

Test build #77529 has finished for PR 17293 at commit af25215.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 30, 2017

Test build #77530 has finished for PR 17293 at commit 9a24e00.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 30, 2017

Test build #77541 has finished for PR 17293 at commit 6837eb3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented May 30, 2017

ping @HyukjinKwon, @gatorsmile

1 similar comment
@kiszk
Member Author

kiszk commented Jun 27, 2017

ping @HyukjinKwon, @gatorsmile

@HyukjinKwon
Member

HyukjinKwon commented Jun 27, 2017

(Just FWIW, I am waiting for feedback on #17293 (comment). I can help review, but I think we first need a decision, which I don't believe I can make by rule, on whether we should support this case or not.)

@JasonMWhite
Contributor

This issue causes a lot of headaches for us when picking up Parquet datasets. To work around it, we write the schema alongside the Parquet files as a side-band file, and then, when loading, create the correct schema object (with non-nullable columns) and swap it in, like so:

    # read_schema_file: helper (described above) that loads the side-band schema file.
    schema = read_schema_file(path)
    # Rebuild the schema on the JVM side so the non-nullable columns are preserved.
    new_java_schema = spark._jvm.org.apache.spark.sql.types.DataType.fromJson(schema.json())
    # Wrap the existing data in a new DataFrame that carries the corrected schema.
    java_rdd = data_frame._jdf.toJavaRDD()
    new_jdf = spark._jsparkSession.createDataFrame(java_rdd, new_java_schema)

    # sql here is pyspark.sql
    return sql.DataFrame(new_jdf, data_frame.sql_ctx)

This is extremely ugly code, particularly in PySpark, and I'd rather remove it. I don't see why we wouldn't want to trust the nullable flags that Spark itself correctly writes to parquet files.

@kiszk
Member Author

kiszk commented Jun 28, 2017

@HyukjinKwon thank you for reminding us of this discussion. I agree that we need a discussion.

@cloud-fan, @marmbrus, @liancheng, do you have any comments on this?

- * 2. If A doesn't exist in `that`, it's included in the result schema.
- * 3. If B doesn't exist in `this`, it's also included in the result schema.
+ * 2. If A doesn't exist in `that`, it's included in the result schema with nullable.
+ * 3. If B doesn't exist in `this`, it's also included in the result schema with nullable.
Contributor

good catch!

@cloud-fan
Contributor

My main concern is how to validate it. This is one pain point of Spark: we don't control the storage layer, so we can't trust any constraints like nullable, primary key, etc.

I think it's a safe choice to always treat input data as nullable, and I think it's mostly about performance: validating also carries a performance penalty, which may eliminate the benefit of the non-nullable optimization.

BTW, another reason is that Spark doesn't work well if the nullable information is wrong; we may return wrong results, which are very hard to debug.

@JasonMWhite
Contributor

To us, reliable nullability information is about far more than the non-nullable optimization. I would happily opt in to any performance penalty that validated that non-nullable columns are actually non-nullable, with a hard fail on encountering an unexpected null. This is a real problem we face running a reasonably large data warehouse at scale.

In fact, we already do this in another project through a custom function called assert_not_null that throws an exception if it encounters a null in a specific field. This is awkward for us because:

  • it requires side-band storage of the schema, or another library to read the actual schema of the Parquet files, in order to identify the columns that should be non-nullable
  • UDFs can't be non-nullable AFAIK (at least they couldn't be when I last looked, please LMK if this is no longer the case), so we have to reach into the protected Spark namespace to add this new function

@cloud-fan
Contributor

Actually Spark already has an AssertNotNull expression to do the validation. @kiszk I think we should use this expression to validate all data sources, not just Parquet.
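
For illustration only (not part of this PR), here is a minimal sketch of how such a column-level not-null check could be wired up from user code. It assumes Catalyst's internal AssertNotNull expression and the dfRead DataFrame from the PR description; the assertNotNullCol helper is a made-up name for this example:

    // Sketch: wrap a column in Catalyst's AssertNotNull so that a null value in a
    // supposedly non-nullable column fails fast at runtime instead of silently
    // propagating. AssertNotNull is an internal API, so this can break across versions.
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull

    def assertNotNullCol(c: Column, name: String): Column =
      new Column(AssertNotNull(c.expr, Seq(name)))

    val checked = dfRead.withColumn("id", assertNotNullCol(dfRead("id"), "id"))
    checked.collect()  // throws if any id value is null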

@kiszk
Member Author

kiszk commented Jul 5, 2017

I see. AssertNotNull can facilitate optimizations by using the non-null information.

kiszk closed this Jul 5, 2017
@jeff303
Contributor

jeff303 commented Apr 11, 2019

Does the closure of this PR imply that setting nullable=false in a custom (user-defined) schema will never have an effect when loading CSV or JSON data from a file? In other words, if someone sets nullable: false in a custom JSON schema, as in the following scenario, it will be ignored?

import org.apache.spark.sql.types.{DataType, StructType}

val customSchemaJson = <...some custom JSON schema...>
val customSchema = DataType.fromJson(customSchemaJson).asInstanceOf[StructType]
spark.read.schema(customSchema).json("/path/to/data.json")

@HyukjinKwon
Member

We should discuss this further, but yes, for now that's what happens.

@jeff303
Contributor

jeff303 commented Apr 12, 2019

Thanks for the answer, @HyukjinKwon. Is there any place that specifies the full syntax (DDL and/or JSON) and known limitations of custom schemas? I searched in various ways through the docs root here but couldn't find anything. We are incorporating support for custom schemas into our tool and I was hoping to find something I could link our users to.

@HyukjinKwon
Member

The Apache Spark side doesn't document the full SQL syntax. You should take a look at the ANTLR definition. Let's ask this on the mailing list, since that subject is orthogonal.

@michelsciortino

This issue still persists in 2023.
