Conversation

@viirya
Member

viirya commented Apr 22, 2016

What changes were proposed in this pull request?

We have logical plans that produce domain objects, whose output type is ObjectType. Since we can't estimate the size of an ObjectType, an UnsupportedOperationException is thrown when we try to. We should set a default size for ObjectType to avoid this failure.

How was this patch tested?

DatasetSuite.
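For context, here is a minimal sketch, assuming Spark 2.x APIs and a local SparkSession, of the kind of query shape that hits this path; it is not the exact test added to DatasetSuite. mapGroups is an ObjectProducer, so its output attribute has ObjectType, and planning the join consults plan.statistics.sizeInBytes for the auto-broadcast decision, which used to throw.

    import org.apache.spark.sql.SparkSession

    object ObjectProducerSizeRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()
        import spark.implicits._

        val ratings = Seq((0, 3), (0, 4), (1, 3), (1, 5), (2, 4)).toDS()
        val counts = ratings
          .groupByKey(_._1)                              // AppendColumns
          .mapGroups((user, rows) => (user, rows.size))  // MapGroups: an ObjectProducer
          .toDF("user", "count")

        val users = Seq((0, "a"), (1, "b"), (2, "c")).toDF("user", "name")
        counts.join(users, "user").collect()             // triggers sizeInBytes estimation
        spark.stop()
      }
    }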

def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
  if (plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) {
    // We can't estimate the size of ObjectType
    if (plan.find(_.isInstanceOf[ObjectProducer]).isDefined) {
Contributor

what's the size currently?

Member Author

ObjectType simply throws an exception if we call its defaultSize.

Contributor

ObjectProducer does not always produce objects; think about an int encoder. We should check whether the output is ObjectType.

Member Author

Makes sense. Let me add the check too.
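A sketch of the suggested check, keeping the shape of the snippet above; the exact placement in the final patch may differ.

    // Hypothetical tightened guard: only refuse to auto-broadcast when a producer in the
    // plan actually outputs ObjectType (an int encoder, for example, does not).
    if (plan.find { p =>
          p.isInstanceOf[ObjectProducer] && p.output.exists(_.dataType.isInstanceOf[ObjectType])
        }.isDefined) {
      // skip considering this plan for automatic broadcast
    }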

Member

This is a special case, right? We face the same issue whenever we calculate the statistics values.

@cloud-fan
Contributor

Can you construct a query that can trigger this bug?

@viirya
Member Author

viirya commented Apr 22, 2016

@cloud-fan Just added.

@cloud-fan
Contributor

cloud-fan commented Apr 22, 2016

After thinking about it, isn't it better to implement statistics in SerializeFromObject? The default implementation in UnaryNode is wrong for it.

@gatorsmile
Member

@cloud-fan That does not help. The parent UnaryNode can be any type. We will face this issue when its child is SerializeFromObject.

@gatorsmile
Member

gatorsmile commented Apr 22, 2016

We need to change UnaryNode's default statistics calculation. However, the other node types that do not use this default method still face the same issue.

@viirya
Member Author

viirya commented Apr 22, 2016

@cloud-fan Sounds good.

@viirya
Member Author

viirya commented Apr 22, 2016

Let me try it...

@cloud-fan
Contributor

The object operators are really special: they break the contract that an operator always produces unsafe rows, so their usage is quite limited. Generally speaking, an ObjectProducer will always have an ObjectConsumer nearby to turn its output back into unsafe rows. So if we implement statistics for ObjectConsumer, there should be nowhere else that will estimate the size of an object type.

@viirya
Member Author

viirya commented Apr 22, 2016

Yea I see.

@viirya
Member Author

viirya commented Apr 22, 2016

@cloud-fan Is there a guarantee that an ObjectConsumer can't produce domain objects? If not, I think it is safer to implement statistics in SerializeFromObject instead of ObjectConsumer.

@gatorsmile
Member

== Optimized Logical Plan ==
Project [user#7,recommendations#48 AS prediction#77,actual#65 AS label#78]
+- Join Inner, Some((user#7 = id#64))
   :- Project [user#7,recommendations#48]
   :  +- Join Inner, Some((user#7 = id#47))
   :     :- Aggregate [user#7], [user#7]
   :     :  +- LocalRelation [user#7], [[0],[0],[0],[1],[1],[1],[2],[2],[2]]
   :     +- Project [_1#44 AS id#47,_2#45 AS recommendations#48]
   :        +- LogicalRDD [_1#44,_2#45], MapPartitionsRDD[19] at rddToDatasetHolder at ALS.scala:335
   +- Project [_1#61 AS id#64,_2#62 AS actual#65]
      +- Filter isnotnull(_1#61)
         +- SerializeFromObject [input[0, scala.Tuple2]._1 AS _1#61,newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS _2#62]
            +- MapGroups <function2>, value#55: int, newInstance(class scala.Tuple2), [value#55], [user#7,item#8], obj#60: scala.Tuple2
               +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int] AS value#55]
                  +- LocalRelation [user#7,item#8], [[0,3],[0,4],[0,5],[1,3],[1,5],[1,4],[2,3],[2,5],[2,4]]

How about this plan? I still can't quite follow your main point.

@gatorsmile
Member

When we calculate the statistics of Filter, we hit the issue caused by UnaryNode's default statistics calculation, right?

@viirya
Member Author

viirya commented Apr 22, 2016

@gatorsmile Look at the SerializeFromObject in your plan. If we implement statistics in it, we can skip estimating the size of MapGroups, which produces domain objects.

@gatorsmile
Member

@viirya Nope. Actually, I did that before. It does not work. The issue is that its parent node's statistics calculation triggers the exception.

@gatorsmile
Member

gatorsmile commented Apr 22, 2016

The problem is that the parent node calls defaultSize on its child's output.

    val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
    val outputRowSize = output.map(_.dataType.defaultSize).sum + 8

Thus, we should check the dataType here.
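For reference, the surrounding default implementation in UnaryNode at the time looked roughly like this; it is a paraphrase built around the two lines above, so treat the details as approximate.

    override def statistics: Statistics = {
      // 8 bytes of per-row overhead, so an empty schema still yields a non-zero size.
      val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
      val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
      // Assume as many rows as the child and scale its size by the row-size ratio; this is
      // where defaultSize gets evaluated for every attribute, including ObjectType ones.
      val sizeInBytes = (child.statistics.sizeInBytes * outputRowSize) / childRowSize
      Statistics(sizeInBytes = if (sizeInBytes == 0) 1 else sizeInBytes)
    }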

@cloud-fan
Contributor

cloud-fan commented Apr 22, 2016

@viirya, yeah, you are right: ObjectConsumer may also produce objects, so we should implement statistics in SerializeFromObject.

@gatorsmile I may have misunderstood your point. What do you mean by "the problem is the parent node calls the defaultSize of its child's output"? If we make SerializeFromObject.statistics work, its parent should be OK too, as SerializeFromObject will never produce objects.

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56647 has finished for PR 12599 at commit 84207c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

@cloud-fan If we do not produce objects, it should work. Otherwise, we will hit the exception when the parent node calculates the statistics:

val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8

Previously, I simply used the child's statistics value as the statistics of SerializeFromObject. Obviously, that does not help.

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56650 has finished for PR 12599 at commit 8e0541c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// We can't estimate the size of ObjectType. We implement statistics here to avoid
// directly estimating any child plan that produces domain objects as output.
override def statistics: Statistics = {
  if (child.output.find(_.dataType.isInstanceOf[ObjectType]).isDefined) {
Contributor

nit: we can just do child.output.head.isInstanceOf[ObjectType]; this is guaranteed in:

trait ObjectConsumer extends UnaryNode {
   assert(child.output.length == 1)
   ...

Member Author

oh. yes.

@viirya viirya changed the title [SPARK-14838][SQL] Skip automatically broadcast a plan when it contains ObjectProducer [SPARK-14838][SQL] Implement statistics in SerializeFromObject to avoid failure when estimating sizeInBytes for ObjectType Apr 22, 2016
// directly estimating any child plan that produces domain objects as output.
override def statistics: Statistics = {
  if (child.output.head.dataType.isInstanceOf[ObjectType]) {
    Statistics(sizeInBytes = Long.MaxValue)
Contributor

bring back this discussion: #12599 (comment)

We can calculate the row size by this.output.map(_.dataType), can't we?

Member Author

So your point is to store numRows instead of sizeInBytes in Statistics? Is there any benefit?

Member Author

What I can think of is that we need to manipulate sizeInBytes directly in the statistics method of some logical plans, for example summing up children's sizeInBytes. So it is more convenient?

Contributor

Or should we go deeper, find the first child that doesn't output objects, and take its statistics? Returning Long.MaxValue means we can't broadcast-join a plan that has object operators, which is bad for Dataset.

Member Author

If the difference in the estimated sizeInBytes is acceptable, I think we can do it.

Member Author

I updated the logic here. Now it looks for an underlying logical plan that can be used to construct useful statistics.
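Roughly, the updated logic is a sketch along these lines; the exact predicate and fallback are the part still under discussion.

    // Sketch: if the child emits domain objects, borrow statistics from the first underlying
    // plan whose output is not ObjectType; otherwise fall back to a conservative estimate so
    // we never broadcast a plan whose size we cannot reason about.
    override def statistics: Statistics = {
      if (child.output.head.dataType.isInstanceOf[ObjectType]) {
        child.find(p => !p.output.exists(_.dataType.isInstanceOf[ObjectType]))
          .map(_.statistics)
          .getOrElse(Statistics(sizeInBytes = Long.MaxValue))
      } else {
        super.statistics
      }
    }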

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56665 has finished for PR 12599 at commit 9d2033f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56669 has finished for PR 12599 at commit 4b5f66d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// directly estimating any child plan that produces domain objects as output.
override def statistics: Statistics = {
  if (child.output.head.dataType.isInstanceOf[ObjectType]) {
    val underlyingPlan = child.find { p =>
Contributor

After rethinking it, this can be very complex, e.g. for MapGroups. Maybe we can just return Long.MaxValue and add a TODO here.

cc @marmbrus @davies

Member Author

Sure.

Member Author

Actually, for the case of MapGroups, I use the default way to calculate the sizeInBytes.

Member Author

I would rather not revert to Long.MaxValue immediately; let's see others' comments first.

Contributor

@davies Apr 22, 2016

Can we just have a default size (for example, 4k) for ObjectType?

Since we will have a better estimation for ObjectConsumer, the default size of ObjectType does not matter.

Member Author

The danger of a default size for ObjectType is underestimating the size of the domain object output. Then we might broadcast a plan that is actually big.

Member Author

I may not understand what "we will have better estimation on ObjectConsumer" means.

Contributor

@davies Apr 23, 2016

The ObjectProducer always sits in the middle of a query plan (especially for joins); the direct children of a join can't be an ObjectProducer.

Think of three operators: SQL operator -> ObjectProducer -> ObjectConsumer (produces UnsafeRow). The data size of a logical plan depends on the number of rows and the size of each row; the default size of an object only affects the size of a row. The estimation for the ObjectConsumer depends only on the number of rows from the ObjectProducer and the size of the rows it produces itself, which means the size of the object will NOT change the size of the ObjectConsumer.

Since we can have a better estimation for ObjectConsumer, the estimation for ObjectProducer does not matter (the number of rows matters, but the size of a row does not).
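A tiny back-of-the-envelope illustration of this point, using the UnaryNode-style scaling quoted earlier; the names are ad hoc, not Spark code.

    object SizeEstimateSketch {
      // Scale a child's size estimate by the output/child row-size ratio, as the default
      // UnaryNode statistics does.
      def scale(childSize: Long, childRowSize: Long, outputRowSize: Long): Long =
        childSize * outputRowSize / childRowSize

      def main(args: Array[String]): Unit = {
        val objectRowSize = 4096L + 8L   // assumed ObjectType default size + row overhead
        val numRows       = 1000L
        val producerSize  = numRows * objectRowSize   // ObjectProducer's estimate
        val consumerRow   = 4L + 8L                   // e.g. one int column + row overhead
        // The consumer's estimate reduces to numRows * consumerRow: the 4 KB guess cancels.
        println(scale(producerSize, objectRowSize, consumerRow))   // prints 12000
      }
    }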

Contributor

+1 for the 4k default size

Member Author

@davies Thanks. That makes sense.
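With that direction, the observable effect would be something like the quick check below, assuming ObjectType is reachable from user code and the merged patch uses the 4 KB figure discussed above.

    import org.apache.spark.sql.types.ObjectType

    object ObjectTypeDefaultSizeCheck {
      def main(args: Array[String]): Unit = {
        // Previously this threw UnsupportedOperationException; with a default size in place
        // it should return a fixed, conservative constant (4096 per the discussion above).
        println(ObjectType(classOf[(Int, Int)]).defaultSize)
      }
    }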

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56687 has finished for PR 12599 at commit dcd6056.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56690 has finished for PR 12599 at commit 3ff11a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-14838][SQL] Implement statistics in SerializeFromObject to avoid failure when estimating sizeInBytes for ObjectType [SPARK-14838][SQL] Set default size for ObjectType to avoid failure when estimating sizeInBytes in ObjectProducer Apr 23, 2016
@SparkQA

SparkQA commented Apr 24, 2016

Test build #56820 has finished for PR 12599 at commit 6b6c12d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  checkDataset(wideDF.map(_.getLong(0)), 0L until 10: _*)
}

test("Estimate size on ObjectProducer will cause failure") {
Contributor

the test case name is wrong?

Member Author

Fixed, thanks. Please see if the new name is more appropriate.

@SparkQA

SparkQA commented Apr 24, 2016

Test build #56825 has finished for PR 12599 at commit c26c3bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Apr 24, 2016

LGTM,
Merging this into master, thanks!

@asfgit asfgit closed this in ba5e0b8 Apr 24, 2016
@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56832/
Test FAILed.

@viirya
Member Author

viirya commented Apr 24, 2016

Unrelated failure. I think it is ok. Thanks.

@viirya viirya deleted the skip-broadcast-objectproducer branch December 27, 2023 18:33