
Conversation

@tedyu
Contributor

@tedyu tedyu commented Dec 31, 2020

What changes were proposed in this pull request?

This PR adds support for treating a json / jsonb expression as a PushableColumnBase.

With this change, a SupportsPushDownFilters implementation is able to push such a Filter down to the DB.

Activation of this enhancement is controlled by the nestedPredicatePushdownEnabled flag.
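For illustration, here is a minimal sketch of the kind of match this adds in PushableColumnBase#unapply; the exact string rendering and guard shown here are assumptions, not the verbatim patch:

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetJsonObject, Literal}

// Sketch only: alongside the existing attribute case, render a json accessor as a
// pushable "column" string, gated on the nested predicate pushdown flag.
def pushableName(e: Expression, nestedPredicatePushdownEnabled: Boolean): Option[String] = e match {
  case a: Attribute =>
    Some(a.name)
  case GetJsonObject(col: Attribute, path: Literal) if nestedPredicatePushdownEnabled =>
    Some(s"GetJsonObject(${col.name}, ${path.value})")   // assumed rendering of the pushed expression
  case _ =>
    None
}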

Why are the changes needed?

Currently, an implementation of SupportsPushDownFilters has no chance to perform pushdown even if the third-party DB engine supports json expression pushdown.

This poses a challenge when a table schema uses json / jsonb to encode a large amount of data.

Here is some background on how I arrived at the current approach.

A canonical json expression looks like phone->code or phone->>code, where phone is the json(b) column and code is the field.
However, the json expression is rejected by:

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: unresolvedalias(lambdafunction(code, lambda 'phone, false), None)
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.toAttribute(unresolved.scala:463)
    at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$inputSet$1(QueryPlan.scala:57)

Does this PR introduce any user-facing change?

No

How was this patch tested?

With this change, plus corresponding changes in spark-cassandra-connector, a Filter involving a json expression can be pushed down.
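For reference, a minimal query that exercises this path, using the test.person table and columns from the plans below (the surrounding test harness is omitted, and table registration is assumed):

// Assuming a table test.person(id, address, phone) is registered, with phone holding a
// json-encoded string; the filter shown in the plans below is then expressed as:
val filtered = spark.sql(
  """SELECT id, address, phone
    |FROM test.person
    |WHERE get_json_object(phone, '$.code') = '1200'""".stripMargin)
filtered.explain(true)   // shows whether the filter stays in Spark or reaches the data source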

Here is the plan prior to predicate pushdown:

2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33]
   +- Filter (get_json_object(phone#37, $.code) = 1200)
      +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]

Here is the plan with pushdown:

2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
    +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33]
       +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
     - Cassandra Filters: [[phone->'code' = ?, 1200]]
     - Requested Columns: [id,address,phone]
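For context, here is a hypothetical sketch (illustrative regex and helper names, not the spark-cassandra-connector's actual code) of the connector-side translation that would yield the phone->'code' = ? predicate shown in the pushed-down plan above:

import org.apache.spark.sql.sources.{EqualTo, Filter}

// Assumed string form of the pushed expression; the real rendering depends on the patch.
val JsonPath = """GetJsonObject\(([^,#]+)[^,]*,\s*\$\.(\w+)\)""".r

def toCqlPredicate(filter: Filter): Option[(String, Any)] = filter match {
  case EqualTo(JsonPath(column, field), value) =>
    Some((s"$column->'$field' = ?", value))   // e.g. ("phone->'code' = ?", 1200)
  case _ => None
}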

@github-actions github-actions bot added the SQL label Dec 31, 2020
@tedyu
Contributor Author

tedyu commented Dec 31, 2020

/cc @xuanyuanking @cloud-fan

@SparkQA

SparkQA commented Dec 31, 2020

Test build #133570 has finished for PR 30984 at commit bfb3211.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu
Contributor Author

tedyu commented Dec 31, 2020

From https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/133570/console :

Caused by: java.nio.file.FileSystemException: /home/jenkins/workspace/SparkPullRequestBuilder/external/kafka-0-10-token-provider/target/streams/compile/_global: No space left on device
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)

@cloud-fan
Can someone with admin permissions clean up some space?

Thanks

@tedyu
Contributor Author

tedyu commented Dec 31, 2020

Looking at https://github.com/apache/spark/pull/30984/checks?check_run_id=1630709203,

[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 21.0 failed 1 times, most recent failure: Lost task 1.0 in stage 21.0 (TID 20) (fv-az212-589.internal.cloudapp.net executor driver): java.lang.NoClassDefFoundError: Could not initialize class javax.imageio.ImageTypeSpecifier
...
[info]   Cause: java.lang.NoClassDefFoundError: Could not initialize class javax.imageio.ImageTypeSpecifier
[info]   at com.sun.imageio.plugins.png.PNGImageReader.getImageTypes(PNGImageReader.java:1531)

The above was not caused by my change.

@tedyu
Contributor Author

tedyu commented Jan 2, 2021

I ran ImageFileFormatSuite locally with the patch applied:

[info] ImageFileFormatSuite:
[info] - Smoke test: create basic ImageSchema dataframe (6 seconds, 101 milliseconds)
[info] - image datasource count test (1 second, 368 milliseconds)
[info] - image datasource test: read jpg image (230 milliseconds)
[info] - image datasource test: read png image (211 milliseconds)
[info] - image datasource test: read non image (709 milliseconds)
[info] - image datasource partition test (564 milliseconds)
[info] - readImages pixel values test (318 milliseconds)
[info] ScalaTest
[info] Run completed in 13 seconds, 772 milliseconds.

@SparkQA

SparkQA commented Jan 3, 2021

Test build #133607 has started for PR 30984 at commit ebc6584.

@tedyu
Contributor Author

tedyu commented Jan 4, 2021

From @cloud-fan :

How can we make sure all the data sources support GetJsonObject function?

My expectation is that, once this change is accepted, more and more data sources will utilize this capability.

I have shown in SPARK-33915 how a data source utilizes this change (for spark-cassandra-connector).
Other data sources can follow that lead.

If a data source doesn't push down a json-related Filter, the behavior is the same as the current status.

@shaneknapp
Contributor

test this please

@SparkQA

SparkQA commented Jan 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38206/

@SparkQA

SparkQA commented Jan 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38206/

@SparkQA

SparkQA commented Jan 5, 2021

Test build #133617 has finished for PR 30984 at commit ebc6584.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu
Contributor Author

tedyu commented Jan 5, 2021

From https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/133617/testReport/pyspark.ml.tests.test_tuning/CrossValidatorTests/test_save_load_pipeline_estimator/

[Errno 28] No space left on device

Looks like some cleanup is needed for the test machine.

@shaneknapp
Do you have access to the machine?

@tedyu
Contributor Author

tedyu commented Jan 5, 2021

@HyukjinKwon @cloud-fan
Please provide review comments.

Thanks

@maropu maropu changed the title [SPARK-33915] Allow json expression to be pushable column [SPARK-33915][SQL] Allow json expression to be pushable column Jan 5, 2021
@tedyu
Contributor Author

tedyu commented Jan 8, 2021

@HyukjinKwon
Do you have any more questions?

I will be happy to answer / investigate.

Thanks

@HyukjinKwon
Member

@tedyu, the special characters are not allowed in some sources such as Hive, as you tested. However, they are allowed in some other sources when you use DSLs:

scala> spark.range(1).toDF("GetJsonObject(phone#37,$.phone)").write.option("header", true).mode("overwrite").csv("/tmp/foo")

scala> spark.read.option("header", true).csv("/tmp/foo").show()
+-------------------------------+
|GetJsonObject(phone#37,$.phone)|
+-------------------------------+
|                              0|
+-------------------------------+

In this case, the filters will still be pushed down to the data source implementation. We should have a way to identify whether the pushed GetJsonObject(phone#37,$.phone) is a field name or a pushed-down expression.

This makes me believe the current string-based implementation is fragile and incomplete.
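Restated as a concrete (hypothetical) filter instance, the ambiguity looks like this:

import org.apache.spark.sql.sources.EqualTo

// From the string alone, a data source cannot tell whether this attribute names a real
// CSV column (as in the example above) or a json expression that Spark pushed down.
val pushed = EqualTo("GetJsonObject(phone#37,$.phone)", "0")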

@tedyu
Contributor Author

tedyu commented Jan 10, 2021

Was the above output obtained with or without my change?

scala> spark.range(1).toDF("GetJsonObject(phone#37,$.phone)").write.option("header", true).mode("overwrite").csv("/tmp/foo")

scala> spark.read.option("header", true).csv("/tmp/foo").show()
+-------------------------------+
|GetJsonObject(phone#37,$.phone)|
+-------------------------------+
|                              0|
+-------------------------------+

I got the above with my change.

I am also open to suggestions on an alternative approach.

@tedyu
Contributor Author

tedyu commented Jan 10, 2021

@HyukjinKwon
Can you point me to an implementation of SupportsPushDownFilters (within Spark or third party) that doesn't rely on string matching?

Looking at PushableColumnBase:

  def unapply(e: Expression): Option[String] = {

It seems the existing code, including the GetStructField case, is String-oriented.
I wonder whether the above concern about column names applies to GetStructField as well.
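For context, here is a simplified paraphrase (from memory, not the verbatim Spark source) of how the existing nested-column case already builds a string:

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField}

// Paraphrased: collect the attribute name plus the nested field names; unapply then
// quotes each part if needed and joins the parts with "." into a single column string.
def nestedParts(e: Expression): Option[Seq[String]] = e match {
  case a: Attribute      => Some(Seq(a.name))
  case g: GetStructField => nestedParts(g.child).map(_ :+ g.childSchema(g.ordinal).name)
  case _                 => None
}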

Thanks

@tedyu
Contributor Author

tedyu commented Jan 10, 2021

For SupportsPushDownRequiredColumns, an existing implementation uses regex matching to detect special functions:

https://github.com/datastax/spark-cassandra-connector/blob/master/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala#L134-L140
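A hypothetical illustration of that regex approach (illustrative pattern, not the connector's actual code):

// Recognise "function-wrapped" column names such as ttl(address) or writetime(address)
// in the pruned schema by pattern matching on the requested column name.
val FunctionColumn = """^(\w+)\(([\w.]+)\)$""".r

def splitColumnName(name: String): (Option[String], String) = name match {
  case FunctionColumn(fn, col) => (Some(fn), col)   // e.g. ("ttl", "address")
  case plain                   => (None, plain)     // ordinary column
}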

@HyukjinKwon
Were arbitrary column names (as you mentioned above) considered when the SupportsPushDownRequiredColumns feature was designed?

@tedyu
Contributor Author

tedyu commented Jan 10, 2021

@maropu
I went over the related commit, #28366, and saw your comment.

It would be good if you could chime in on this change.

Thanks

@tedyu
Contributor Author

tedyu commented Jan 10, 2021

I added some logging in PushableColumnBase#unapply for the GetJsonObject case.
When reading the column back:

         spark.read.option("header", true).csv(new Path(basePath, "fourth").toString).show()
+--------------------------------+
|get_json_object(phone, '$.code')|
+--------------------------------+
|                               1|
+--------------------------------+

For the above call, the additional log message didn't show up.

If I use df.select("get_json_object(phone, '$.code')"):

[info]   org.apache.spark.sql.AnalysisException: cannot resolve '`get_json_object(phone, '$.code')`' given input columns: [get_json_object(phone, '$.code')];
[info] 'Project ['get_json_object(phone, '$.code')]
[info] +- RelationV2[get_json_object(phone, '$.code')#380] csv file:/nfusr/dev-server/tedyu/spark/target/tmp/spark-9007ae17-eace-41ee-8a3c-c62730fb8906/second
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:170)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:167)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
[info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
[info]   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:243)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:167)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:154)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:175)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:219)
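A guess at the cause (an assumption, not verified against this build): the stored column name contains a dot, so the unquoted selector is parsed as a multi-part (nested) name. Backquoting the name keeps it as a single part, reusing the test's basePath as above:

val df = spark.read.option("header", true).csv(new Path(basePath, "second").toString)
// Backticks prevent the '.' inside the name from being treated as a nesting separator.
df.select("`get_json_object(phone, '$.code')`").show()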

@HyukjinKwon
Member

Maybe it's best to loop in @rdblue, @viirya, and @dbtsai, who looked into this code. I doubt that relying on strings to push down other expressions is a good approach, but I am okay if other people think it's fine.

@tedyu
Contributor Author

tedyu commented Jan 11, 2021

Pinging @xuanyuanking for an opinion since he commented on SPARK-33915.

Member

@viirya viirya left a comment


Maybe it's best to loop in @rdblue, @viirya, and @dbtsai, who looked into this code. I doubt that relying on strings to push down other expressions is a good approach, but I am okay if other people think it's fine.

Hmm, I have the same feeling. This does not look like a reliable approach. PushableColumnBase is used to get a pushable column name; converting an expression like GetJsonObject(column, field) to a string to push down may not be reliable and may have unexpected results.

@tedyu
Contributor Author

tedyu commented Jan 11, 2021

@viirya
As I mentioned in #30984 (comment), this approach is consistent with how the existing implementation handles (nested) pushable columns.

Can you outline one scenario where the converted string would produce an unexpected result?

I am interested in hearing about alternative approaches to pushing down json expressions.

Thanks

@cloud-fan
Contributor

I don't think it's consistent with the existing implementation; otherwise we would be using strings all around, like a > 1 instead of GreaterThan(a, 1).

The only exception I know of is nested columns. We use the string a.b to represent a nested column, instead of a dedicated class. This is for backward compatibility reasons, and we have a legacy config to fall back on. It's also not that bad, as the string representation for nested columns is pretty standard.

We probably need to merge the V1 Filter and V2 Expression to support cases like this. Using strings is not the right way to go.

@tedyu
Contributor Author

tedyu commented Jan 11, 2021

Here is some background on how I arrived at the current approach.

A canonical json expression looks like phone->code or phone->>code, where phone is the json(b) column and code is the field.
However, the json expression is rejected by:

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: unresolvedalias(lambdafunction(code, lambda 'phone, false), None)
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.toAttribute(unresolved.scala:463)
    at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$inputSet$1(QueryPlan.scala:57)

I haven't spent much time investigating how the native jsonb expression could be directly supported in Spark, because the lambda is a fundamental notation there.

bq. using strings all around like a > 1 instead of GreaterThan(a, 1)

In general, I agree strong typing is better than string matching.
However, please note that what this PR tries to handle is not the '>' part; it is the 'a' part.

bq. the string representation for nested columns is pretty standard.

As I mentioned above, the json path expression is quite standard. I tend to think that get_json_object(phone, '$.code') should be treated similarly to phone->code (due to the restriction on lambdas).

Along this line of thinking, using string matching to push down a json path expression is tantamount to pushing down a nested column. I have updated the PR by adding the nestedPredicatePushdownEnabled condition to the GetJsonObject case.

The implementation of SupportsPushDownFilters can decide whether the pushed-down Filter should be honored or not (since it has more knowledge about the underlying schema). If not, the Filter is handled on the driver side.
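A minimal sketch (assumed class and helper names) of the SupportsPushDownFilters contract this relies on: the builder keeps the filters it can translate and returns the rest, which Spark then evaluates itself on the driver side.

import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.sources.Filter

class JsonAwareScanBuilder(isSupported: Filter => Boolean) extends SupportsPushDownFilters {
  private var accepted: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(isSupported)
    accepted = supported       // translated to DB predicates when building the scan
    unsupported                // handed back to Spark for driver-side evaluation
  }

  override def pushedFilters(): Array[Filter] = accepted

  override def build(): Scan = ???   // scan construction is omitted in this sketch
}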

@cloud-fan
bq. probably need to merge the V1 Filter and V2 Expression to support cases like this

If you could outline in more detail how the merge would help express json path expressions, that would be nice.
I have some cycles I can contribute to building this.

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38533/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38533/

@SparkQA

SparkQA commented Jan 12, 2021

Test build #133947 has finished for PR 30984 at commit fe8034d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tedyu
Contributor Author

tedyu commented Jan 12, 2021

@HyukjinKwon @cloud-fan
I have updated the PR to include checking the nestedPredicatePushdownEnabled flag.

Please also see #30984 (comment), where I outline how I arrived at the current formulation.

Thanks

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 23, 2021
@github-actions github-actions bot closed this Apr 24, 2021