@@ -236,3 +236,107 @@ case class BroadcastNestedLoopJoin(
      streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches))
  }
}

/**
 * Handles joins where one key is heavily skewed. SkewJoin samples the streamed RDD to find the
 * most frequent key, then splits the streamed RDD into two parts: mainStreamedTable, which holds
 * the rows without that key, and maxKeyStreamedTable, which holds only the rows with that key.
 * Each part is joined with the build table separately, and the two result RDDs are unioned to
 * form the final output.
 */
@DeveloperApi
case class SkewJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan,
    @transient sc: SparkContext) extends BinaryNode {
  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  val (buildPlan, streamedPlan) = buildSide match {
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }
  val (buildKeys, streamedKeys) = buildSide match {
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
  @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)

  def execute() = {
    val streamedTable = streamedPlan.execute()
    // TODO: make the sampling fraction and seed configurable instead of hard coding them.
    val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
    // Sorting by hash code groups equal keys into consecutive runs, so the most frequent key can
    // be found with a single scan.
    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
    var max = 0
    val num = sample.size - 1
    var temp = 0
    var maxRowKey = sortedSample(0)
    // Find the most frequent (skewed) key.
    if (sortedSample.size > 1) {
      for (i <- 1 to num) {
        if (sortedSample(i - 1) == sortedSample(i)) {
          temp += 1
        } else {
          if (temp > max) {
            max = temp
            maxRowKey = sortedSample(i - 1)
          }
          temp = 0
        }
      }
      // The last run of equal keys is never followed by a different key, so check it here too.
      if (temp > max) {
        maxRowKey = sortedSample(num)
      }
    }
    // Split the streamed table into the rows carrying the skewed key and all remaining rows.
    val maxKeyStreamedTable = streamedTable.filter { row =>
      streamSideKeyGenerator(row).toString().equals(maxRowKey.toString())
    }
    val mainStreamedTable = streamedTable.filter { row =>
      !streamSideKeyGenerator(row).toString().equals(maxRowKey.toString())
    }
    val buildRdd = buildPlan.execute()
    // Join each part with the build side separately, then union the two results.
    val maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
      case (l: Row, r: Row) => buildRow(l ++ r)
    }
    val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
      case (l: Row, r: Row) => buildRow(l ++ r)
    }
    sc.union(maxKeyJoinedRdd, mainJoinedRdd)
  }
}

object SkewJoin extends Strategy with PredicateHelper {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Find inner joins where at least some predicates can be evaluated by matching hash keys
    // using the SkewFilteredJoin pattern.
    case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      val skewJoin =
        execution.SkewJoin(
          leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext)
      condition.map(Filter(_, skewJoin)).getOrElse(skewJoin) :: Nil
    case _ => Nil
  }
}
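
// Note (an assumption about wiring, not part of this hunk): for the planner to actually pick
// this strategy, it would presumably need to be added to the strategies list of SQLContext's
// SparkPlanner alongside the existing join strategies; only the strategy itself is defined here.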

object SkewFilteredJoin extends Logging with PredicateHelper {
  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
  type ReturnType =
    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    // All predicates can be evaluated for inner join (i.e., those that are in the ON
    // clause and WHERE clause.)
    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
      logger.debug(s"Considering skew inner join on: ${predicates ++ condition}")
      splitPredicates(predicates ++ condition, join)
    case join @ Join(left, right, joinType, condition) =>
      logger.debug(s"Considering skew join on: $condition")
      splitPredicates(condition.toSeq, join)
    case _ => None
  }
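
  // The hunk ends without the splitPredicates helper that unapply calls, so the object does not
  // compile as shown. Below is a minimal sketch, assumed to mirror the existing HashFilteredJoin
  // helper: it separates equi-join predicates whose two sides can each be evaluated against one
  // child (these become the join keys) from all remaining predicates (these become the residual
  // condition). The use of Equals/canEvaluate reflects the 1.0-era catalyst API and is an
  // assumption, not part of the original change.
  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
    val Join(left, right, joinType, _) = join
    val (joinPredicates, otherPredicates) = allPredicates.partition {
      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
        (canEvaluate(l, right) && canEvaluate(r, left)) => true
      case _ => false
    }

    val joinKeys = joinPredicates.map {
      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
    }

    // Do not consider this pattern a match if no equi-join keys were found.
    if (joinKeys.nonEmpty) {
      val leftKeys = joinKeys.map(_._1)
      val rightKeys = joinKeys.map(_._2)
      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    } else {
      None
    }
  }
}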