@@ -237,107 +237,5 @@ case class BroadcastNestedLoopJoin(
237237 }
238238}
239239
240- /**
241-  * :: DeveloperApi ::
242-  * In some cases data skew happens. SkewJoin samples the streamed RDD to find its most frequent
243-  * key, then splits the streamed RDD in two: maxKeyStreamedTable holds only the rows with that
244-  * key, and mainStreamedTable holds all remaining rows.
245-  * Each part is then joined with the build table separately,
246-  * and finally the two result RDDs are unioned.
247-  */
248- @DeveloperApi
249- case class SkewJoin(
250-     leftKeys: Seq[Expression],
251-     rightKeys: Seq[Expression],
252-     buildSide: BuildSide,
253-     left: SparkPlan,
254-     right: SparkPlan,
255-     @transient sc: SparkContext) extends BinaryNode {
256-   override def outputPartitioning: Partitioning = left.outputPartitioning
257-
258-   override def requiredChildDistribution =
259-     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
260-
261- val (buildPlan, streamedPlan) = buildSide match {
262- case BuildLeft => (left, right)
263- case BuildRight => (right, left)
264- }
265- val (buildKeys, streamedKeys) = buildSide match {
266- case BuildLeft => (leftKeys, rightKeys)
267- case BuildRight => (rightKeys, leftKeys)
268- }
269-
270- def output = left.output ++ right.output
271-
272-   @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
273-   @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)
274-
275-
276-   def execute() = {
277-     val streamedTable = streamedPlan.execute()
278-     // TODO: the sampling fraction and seed should come from configuration
279-     val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
280-     val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
281-     var max = 0
282-     var temp = 0
283-     var maxrowKey = sortedSample(0)
284-     // Find the most frequent key by scanning runs of equal keys in the sorted sample.
285-     if (sortedSample.size > 1) {
286-       for (i <- 1 until sortedSample.size) {
287-         if (sortedSample(i - 1) == sortedSample(i)) temp += 1
288-         else {
289-           if (temp > max) {
290-             max = temp
291-             maxrowKey = sortedSample(i - 1)
292-           }
293-           temp = 0
294-         }
295-       }
296-       if (temp > max) maxrowKey = sortedSample(sortedSample.size - 1) // run reaching the end of the sample
297-     }
298-     val maxKeyStreamedTable = streamedTable.filter(row => {
299-       streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
300-     })
301-     val mainStreamedTable = streamedTable.filter(row => {
302-       !streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
303-     })
304-     val buildRdd = buildPlan.execute()
305-     val maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
306-       case (l: Row, r: Row) => buildRow(l ++ r)
307-     }
308-     val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
309-       case (l: Row, r: Row) => buildRow(l ++ r)
310-     }
311-     sc.union(maxKeyJoinedRdd, mainJoinedRdd)
312-   }
313- }
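
The sample / split / join / union idea in execute() above can be sketched more directly with plain RDD operations, where countByValue replaces the run-length bookkeeping over the sorted sample. This is a minimal illustrative sketch, not code from this patch: the local master, table contents, and key names are assumptions, while the sampling fraction and seed mirror the hard-coded values above.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x style)

    object SkewJoinSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("skew-join-sketch").setMaster("local[*]"))

        // Illustrative tables: `streamed` is heavily skewed on the key "hot".
        val streamed = sc.parallelize(Seq.fill(10000)(("hot", 1)) ++ Seq(("a", 2), ("b", 3)))
        val build    = sc.parallelize(Seq(("hot", "x"), ("a", "y"), ("b", "z")))

        // Sample the streamed side and take its most frequent key;
        // countByValue sidesteps the run-length scan entirely.
        val hottestKey = streamed
          .sample(withReplacement = false, fraction = 0.3, seed = 9)
          .map(_._1)
          .countByValue()
          .maxBy(_._2)._1

        // Split on that key, join each part separately, then union the results.
        val hotPart  = streamed.filter(_._1 == hottestKey)
        val mainPart = streamed.filter(_._1 != hottestKey)
        val joined   = hotPart.join(build).union(mainPart.join(build))

        joined.take(5).foreach(println)
        sc.stop()
      }
    }
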
314240
315241
316- object SkewJoin extends Strategy with PredicateHelper {
317-   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
318-     // Find inner joins where at least some predicates can be evaluated by matching hash keys
319-     // using the SkewFilteredJoin pattern.
320-     case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
321-       val skewJoin = execution.SkewJoin(
322-         leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext)
323-       condition.map(Filter(_, skewJoin)).getOrElse(skewJoin) :: Nil
324-     case _ => Nil
325-   }
326- }
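
Note the last line of the strategy: the physical join is wrapped in a Filter only when a residual predicate remains after the equi-join keys are extracted. A tiny stand-alone sketch of that Option idiom, using hypothetical stand-in plan types rather than Catalyst's:

    object ResidualFilterSketch {
      // Stand-in plan types for illustration; not Catalyst's classes.
      sealed trait Plan
      case class JoinNode(left: String, right: String) extends Plan
      case class FilterNode(predicate: String, child: Plan) extends Plan

      def planJoin(residual: Option[String]): Plan = {
        val join = JoinNode("left", "right")
        // Wrap the join in a Filter only when a residual predicate remains.
        residual.map(FilterNode(_, join)).getOrElse(join)
      }

      def main(args: Array[String]): Unit = {
        println(planJoin(None))            // JoinNode(left,right)
        println(planJoin(Some("a.x > 1"))) // FilterNode(a.x > 1,JoinNode(left,right))
      }
    }
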
327-
328- object SkewFilteredJoin extends Logging with PredicateHelper {
329-   /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
330-   type ReturnType =
331-     (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
332-
333-   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
334-     // For inner joins, all predicates can be evaluated (i.e., those in both the ON
335-     // clause and the WHERE clause).
336-     case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
337-       logger.debug(s"Considering skew inner join on: ${predicates ++ condition}")
338-       splitPredicates(predicates ++ condition, join)
339-     case join @ Join(left, right, joinType, condition) =>
340-       logger.debug(s"Considering skew join on: $condition")
341-       splitPredicates(condition.toSeq, join)
342-     case _ => None
343-   }
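
SkewFilteredJoin follows Catalyst's extractor-object pattern: unapply deconstructs a logical plan into the tuple that the SkewJoin strategy pattern-matches on. The same technique in miniature, with hypothetical stand-in types rather than the real LogicalPlan hierarchy:

    object ExtractorSketch {
      sealed trait LogicalPlan
      case class Join(left: LogicalPlan, right: LogicalPlan, condition: Option[String]) extends LogicalPlan
      case class Leaf(name: String) extends LogicalPlan

      object CandidateJoin {
        /** (condition, leftChild, rightChild) */
        type ReturnType = (Option[String], LogicalPlan, LogicalPlan)

        def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
          case Join(left, right, condition) => Some((condition, left, right))
          case _ => None
        }
      }

      def main(args: Array[String]): Unit = {
        val plan: LogicalPlan = Join(Leaf("a"), Leaf("b"), Some("a.k = b.k"))
        plan match {
          // A strategy can now match on the extractor instead of the raw node.
          case CandidateJoin(cond, l, r) => println(s"matched $l and $r on $cond")
          case _                         => println("no match")
        }
      }
    }
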