Skip to content

Conversation

@wangxiaojing
Copy link
Contributor

JIRA issue: SPARK-4485
We are planning to create a BroadcastHashouterJoin to implement the broadcast join for left outer join and right outer join
In left outer join :
If the size of data from right side is smaller than the user-settable thresholdAUTO_BROADCASTJOIN_THRESHOLD,
the planner would mark it as the broadcast relation and mark the other relation as the stream side. The broadcast table will be broadcasted to all of the executors involved in the join, as aorg.apache.spark.broadcast.Broadcast object. It will use joins.BroadcastHashouterJoin.,else it will usejoins.HashOuterJoin.
In right outer join:
If the size of data from left side is smaller than the user-settable threshold AUTO_BROADCASTJOIN_THRESHOLD,the planner would mark it as the broadcast relation and mark the other relation as the stream side. The broadcast table will be broadcasted to all of the executors involved in the join, as aorg.apache.spark.broadcast.Broadcast object. It will use joins.BroadcastHashouterJoin. else it will usejoins.HashOuterJoin.

The benchmark suggests these made the optimized version 7x faster when left outer join andright outer join


Original:
left outer join : 15439 ms 
right outer join : 9707 ms
Optimized:
left outer join : 1992 ms
 right outer join : 1288 ms

The micro benchmark load data1/kv3.txt into a normal Hive table.
Benchmark code:


 def benchmark(f: => Unit) = {
    val begin = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    end - begin
  }
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))
  val hiveContext = new HiveContext(sc)
  import hiveContext._
  sql("drop table if exists left_table")
  sql("drop table if exists right_table")
  sql( """create table left_table (key int, value string)
       """.stripMargin)
  sql( s"""load data local inpath "/data1/kv3.txt" into table left_table""")
  sql( """create table right_table (key int, value string)
       """.stripMargin)
  sql(
    """
      |from left_table
      |insert overwrite table right_table
      |select left_table.key, left_table.value
    """.stripMargin)
  val leftOuterJoin = sql(
    """select a.key from left_table a
      | left outer join right_table b on a.key = b.key """.stripMargin)
  val rightOuterJoin = sql(
    """select a.key from left_table a
      |right outer join right_table b on a.key = b.key""".stripMargin)

  val leftOuterJoinDuration = benchmark(leftOuterJoin.count())
  val rightOuterJoinDuration = benchmark(rightOuterJoin.count())
  println(s"left outer join : $leftOuterJoinDuration ms right outer join : $rightOuterJoinDuration ms")

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@chenghao-intel
Copy link
Contributor

Semantically, I don't think the outer join can be done via broadcast the small table in some cases.

For example:
Let's say right table is the small table (broadcasted) and the join type is RightOuterJoin, as any single partition in Left side, doesn't have the equikey, but the equikey does exist in the other partition, however, it still will outputs the tuple with left side references as null, which is supposed to output nothing, since the other partition will output the correct tuple for this.

But if it's the LeftOuterJoin with right table broadcasted, it probably works.

@tianyi
Copy link
Contributor

tianyi commented Nov 20, 2014

@chenghao-intel ,you are right , the broadcast outer join could only works for "LeftOuterJoin with right table broadcasted" and "RightOuterJoin with left table broadcasted". For other cases, spark should still use HashOuterJoin. The BroadcastHashOuterJoin is not replacing HashOuterJoin, just a optimization for particular cases.

@chenghao-intel
Copy link
Contributor

Oh, Thanks for clarification. I read the code again and it does work for this particular optimization. there is some duplicated code with HashOuterJoin, probably we can make it as shared.

@chenghao-intel
Copy link
Contributor

By the way, LEFT_SEMI_JOIN could also be optimized as broadcast, I think.

@chenghao-intel
Copy link
Contributor

I think most of code can be shared between this PR and #3375. Create a HashOuterJoin trait or something after #3375 merged?

@liancheng
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Nov 21, 2014

Test build #23699 has started for PR 3362 at commit 3c23b42.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 21, 2014

Test build #23699 has finished for PR 3362 at commit 3c23b42.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastHashOuterJoin(

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23699/
Test FAILed.

@wangxiaojing
Copy link
Contributor Author

@liancheng Add testsuite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should throw a exception here, because other types of outer join should not use BroadcastHashOuterJoin

@pzzs
Copy link
Contributor

pzzs commented Nov 28, 2014

it is very good!

@liancheng
Copy link
Contributor

ok to test

@liancheng
Copy link
Contributor

The benchmark figures are pretty impressive! Sorry for the delay, 1.1.1 and 1.2.0 releases plus Thanksgiving slow down PR reviewing/merging a lot, will come back to this soon later this week or early next week.

@SparkQA
Copy link

SparkQA commented Nov 28, 2014

Test build #23928 has started for PR 3362 at commit ae00f9e.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 28, 2014

Test build #23928 has finished for PR 3362 at commit ae00f9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastHashOuterJoin(

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23928/
Test PASSed.

@marmbrus
Copy link
Contributor

Sorry for the delay! The benchmark results are very impressive so I think we should definitely add this feature! Here is what I think needs to be done:

  • I like @chenghao-intel 's suggestion about code sharing if possible.
  • We should be using HashedRelation for the hash tables.
  • All of the test tables are small, so we need to make sure that we are still running tests for HashOuterJoin and not always following this code path.

Since we are trying to keep the PR queue limited to active PRs, I propose we close this issue for now and reopen it as soon as the new version is ready. Looking forward to this improved join implementation :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are never planning HashOuterJoin? We need to make sure we don't lose test coverage here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is always MetastoreRelation

@asfgit asfgit closed this in ca12608 Dec 17, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants