Conversation

@Sephiroth-Lin
Contributor

See JIRA: https://issues.apache.org/jira/browse/SPARK-9066

I used TPC-DS for testing, with the SQL below:

with single_value as (
  select 1 tpcds_val from date_dim
)
select sum(ss_quantity * ss_sales_price) ssales, tpcds_val
from store_sales, single_value
group by tpcds_val

With this patch the query runs in 1h55min; without it, just half of the tasks already take 16.7h.

@scwf

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37347 has finished for PR 7417 at commit 0a62098.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37350 has finished for PR 7417 at commit 61d1a7e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LDAModel(JavaModelWrapper):
    • class LDA(object):
    • trait ImplicitCastInputTypes extends ExpectsInputTypes
    • abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes
    • case class UnaryMinus(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class UnaryPositive(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Abs(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Factorial(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Hex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Unhex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Round(child: Expression, scale: Expression)
    • case class Md5(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Sha1(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Not(child: Expression)
    • case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • case class Or(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • trait StringRegexExpression extends ImplicitCastInputTypes
    • trait String2StringExpression extends ImplicitCastInputTypes
    • trait StringComparison extends ImplicitCastInputTypes
    • case class StringSpace(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class StringLength(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Ascii(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Base64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class UnBase64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode

@Sephiroth-Lin
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #23 has finished for PR 7417 at commit 61d1a7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LDAModel(JavaModelWrapper):
    • class LDA(object):
    • trait ImplicitCastInputTypes extends ExpectsInputTypes
    • abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes
    • case class UnaryMinus(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class UnaryPositive(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Abs(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class BitwiseNot(child: Expression) extends UnaryExpression with ExpectsInputTypes
    • case class Factorial(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Hex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Unhex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Round(child: Expression, scale: Expression)
    • case class Md5(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Sha1(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Not(child: Expression)
    • case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • case class Or(left: Expression, right: Expression) extends BinaryOperator with Predicate
    • trait StringRegexExpression extends ImplicitCastInputTypes
    • trait String2StringExpression extends ImplicitCastInputTypes
    • trait StringComparison extends ImplicitCastInputTypes
    • case class StringSpace(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class StringLength(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Ascii(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Base64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class UnBase64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37354 has finished for PR 7417 at commit 61d1a7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Quick question. Why not use sizeInBytes? I assume we want to move as little data as possible? Using sizeInBytes would be a bit more involved, since this would involve the planner, and (probably) adding a BuildSide parameter to CartesianProduct...
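For illustration, roughly like this (a hypothetical sketch: CartesianProduct does not take a BuildSide today, and the strategy name is made up; BuildLeft/BuildRight are the existing markers from the join code):

// Hypothetical sketch: choose the build side from the estimated sizes.
object CartesianProductWithBuildSide extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Join(left, right, Inner, None) =>
      val buildSide =
        if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight
        else BuildLeft
      // CartesianProduct taking a BuildSide is the proposed, not existing, signature.
      joins.CartesianProduct(planLater(left), planLater(right), buildSide) :: Nil
    case _ => Nil
  }
}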

Contributor

Yes, using the partition count here is not accurate. Consider an RDD with 100 partitions where each partition holds one record, versus an RDD with 10 partitions where each partition holds 100 million records: the method above would cause more scans from HDFS, as the sketch below illustrates.
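A minimal sketch of that scenario (illustrative values only):

// Illustrative only: comparing by partition count picks the wrong side here.
val manyPartitions = sc.parallelize(1 to 100, numSlices = 100) // 100 partitions, 1 record each
val fewPartitions = sc.parallelize(1L to 1000000000L, numSlices = 10) // 10 partitions, ~100M records each
// Choosing the side to re-iterate by partitions.length would prefer
// fewPartitions, even though it is vastly larger.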

Contributor Author

@hvanhovell Yes, using sizeInBytes is better, but it also has a problem: if leftResults has only one record and that record is large, while rightResults has many records whose total size is small, then this scenario will perform worse. The best way would be to check the total record count per partition, but we cannot get that for now.

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37467 has finished for PR 7417 at commit 23deb4b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37471 has finished for PR 7417 at commit eb9d155.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

@SparkQA

SparkQA commented Jul 16, 2015

Test build #37495 has finished for PR 7417 at commit 8198648.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

@SparkQA

SparkQA commented Jul 17, 2015

Test build #37557 has finished for PR 7417 at commit 1006d46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

@SparkQA

SparkQA commented Jul 17, 2015

Test build #37566 has finished for PR 7417 at commit 547242e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastCartesianProduct(
    • case class CartesianProduct(

@SparkQA

SparkQA commented Jul 20, 2015

Test build #37807 has finished for PR 7417 at commit a168900.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastCartesianProduct(
    • case class CartesianProduct(

@hvanhovell
Contributor

Do you have any benchmarking results for this? It would be great to see how much this improves the current situation.

Contributor

How is this different from a BroadcastNestedLoopJoin?

Contributor Author

Isn't BroadcastNestedLoopJoin just used for outer joins? But this is used for cartesian products.

Contributor

The inner join variant with the (degenerate) condition 1 = 1 would do the same.

All I am saying is that this is also a way to get a broadcast cartesian join going, and it saves some lines of code.
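For example (a sketch assuming the Spark 1.5-era BroadcastNestedLoopJoin constructor; the exact shape may differ):

// Sketch: a cartesian product expressed as an inner BroadcastNestedLoopJoin
// with an always-true condition.
joins.BroadcastNestedLoopJoin(
  planLater(left), planLater(right), BuildRight, Inner, Some(Literal(true)))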

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38024 has finished for PR 7417 at commit 99bcde7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastCartesianProduct(
    • case class CartesianProduct(

@Sephiroth-Lin
Contributor Author

@hvanhovell I used TPC-DS for testing, with the SQL below:

with single_value as (
  select 1 tpcds_val from date_dim
)
select sum(ss_quantity * ss_sales_price) ssales, tpcds_val
from store_sales, single_value
group by tpcds_val

With this patch the query runs in 1h55min; without it, just half of the tasks already take 16.7h.

Contributor

This code is almost the same as the code above. I would put it in a method, e.g. createCartesianProduct, and wrap the result in a Filter operator; see the sketch below.
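Roughly like this (a sketch; the method name is the one proposed, the surrounding planner code is simplified and illustrative):

// Sketch of the suggested refactor in the planner (simplified):
private def createCartesianProduct(left: LogicalPlan, right: LogicalPlan): SparkPlan =
  joins.CartesianProduct(planLater(left), planLater(right))

plan match {
  case logical.Join(left, right, Inner, None) =>
    createCartesianProduct(left, right) :: Nil
  case logical.Join(left, right, Inner, Some(condition)) =>
    // The conditioned case wraps the shared product in a Filter operator.
    execution.Filter(condition, createCartesianProduct(left, right)) :: Nil
  case _ => Nil
}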

@scwf
Contributor

scwf commented Sep 8, 2015

@Sephiroth-Lin can you rebase this?

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@Sephiroth-Lin
Contributor Author

@scwf Done. @zsxwing The code is updated.

@SparkQA

SparkQA commented Sep 8, 2015

Test build #42133 has finished for PR 7417 at commit 8a8658c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

Contributor

I think BroadcastNestedLoopJoin can support the condition, and pushing the filter down into the operator can also reduce the memory overhead, since BroadcastNestedLoopJoin puts all of the matching tuples into a compact buffer; see the sketch below.
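Simplified, the point is something like this (a sketch, not the actual BroadcastNestedLoopJoin code; names are illustrative):

// Sketch: evaluating the condition inside the loop means non-matching
// pairs are never added to the CompactBuffer at all.
val matchedRows = new CompactBuffer[InternalRow]()
for (streamedRow <- streamedIter; broadcastRow <- broadcastedRelation.value) {
  val joinedRow = joinRow(streamedRow, broadcastRow)
  if (boundCondition(joinedRow)) matchedRows += joinedRow
}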

@SparkQA

SparkQA commented Sep 9, 2015

Test build #42193 has finished for PR 7417 at commit e01c8f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CartesianProduct(

@chenghao-intel
Contributor

BTW, can you add some unit tests like what I did in #8652?

@SparkQA

SparkQA commented Sep 29, 2015

Test build #43092 has finished for PR 7417 at commit ce6ad25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@zsxwing the RDD order does matter for RDD.cartesian, because of the inefficient way we compute CartesianRDD:

for (x <- rdd1.iterator(currSplit.s1, context);
     y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)

For a.cartesian(b), when we compute a partition of the result RDD, logically we only need to compute the referenced partitions of a and b once, but in fact we recompute the partition of b many times, once per element in the partition of a; see the sketch below.
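One possible mitigation, just to illustrate the cost (a sketch, not what CartesianRDD actually does):

// Sketch: materialize rdd2's partition once, instead of recomputing its
// iterator (and potentially re-reading from HDFS) per element of rdd1.
val rightRows = rdd2.iterator(currSplit.s2, context).toArray
rdd1.iterator(currSplit.s1, context).flatMap { x =>
  rightRows.iterator.map(y => (x, y))
}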

Anyway this PR LGTM

Contributor

Instead of passing a BuildSide to CartesianProduct, why not just swap the parameter order according to the data size? Something like:

if (left.statistics.sizeInBytes < right.statistics.sizeInBytes) {
  CartesianProduct(left, right)
} else {
  CartesianProduct(right, left)
}

Contributor

Actually I am a little concerned about the side switch based on the statistics, as I commented previously. And also, as @cloud-fan pointed out:

for (x <- rdd1.iterator(currSplit.s1, context);
     y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)

What we actually care about is the average number of records per partition on both sides, and I don't think we can say that the side with the bigger file size in the statistics will also have more records per partition on average (most likely the average number of records per partition is about the same).

Probably we'd better add more statistics, say the partition count of a logical plan or the average file size per partition. And in order not to confuse further improvements, I think we'd better remove this optimization rule for cartesian joins. That's why I didn't do it in #8652.

What do you think?

Contributor

Good point! This optimization should depend on record counts, not data size.

@cloud-fan
Contributor

Hi @Sephiroth-Lin, according to the previous discussion, I think we should NOT optimize based on data size. Do you mind closing this PR and helping us review #8652? It contains the part of your optimization that is still valid.

@Sephiroth-Lin
Contributor Author

@cloud-fan OK.
