@@ -237,13 +237,14 @@ case class BroadcastNestedLoopJoin(
237237 }
238238}
239239
240- /*
241- In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key
242- rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd
243- with the largest key.
244- Then,join the two table with the buildtable.
245- Finally,union the two result rdd.
246- */
240+ /**
241+ * :: DeveloperApi ::
242+ * In some cases data skew happens. SkewJoin samples the streamed table RDD to find the largest key (the key with the most sampled rows), then makes the rows with the largest key
243+ * into a separate table RDD. The streamed RDD is split into a mainStreamedTable RDD without the largest key and a maxKeyStreamedTable RDD
244+ * with the largest key.
245+ * Then each of the two tables is joined with the build table.
246+ * Finally, the two result RDDs are unioned.
247+ */
247248@ DeveloperApi
248249case class SkewJoin (
249250 leftKeys : Seq [Expression ],
@@ -273,41 +274,41 @@ case class SkewJoin(
273274
274275
275276 def execute () = {
276- val streamedtable = streamedPlan.execute()
277+ val streamedTable = streamedPlan.execute()
277278 // TODO: the hard-coded sampling parameters below should later be read from configuration
278- val sample = streamedtable .sample(false , 0.3 , 9 ).map(row => streamSideKeyGenerator(row)).collect()
279- val sortedsample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
279+ val sample = streamedTable .sample(false , 0.3 , 9 ).map(row => streamSideKeyGenerator(row)).collect()
280+ val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
280281 var max = 0
281282 var num = sample.size - 1
282283 var temp = 0
283- var maxrowkey = sortedsample (0 )
284+ var maxrowKey = sortedSample (0 )
284285 // find the largest key
285- if (sortedsample .size > 1 ) {
286+ if (sortedSample .size > 1 ) {
286287 for (i <- 1 to num) {
287- if (sortedsample (i - 1 ) == sortedsample (i)) temp += 1
288+ if (sortedSample (i - 1 ) == sortedSample (i)) temp += 1
288289 else {
289290 if (temp > max) {
290291 max = temp
291- maxrowkey = sortedsample (i - 1 )
292+ maxrowKey = sortedSample (i - 1 )
292293 }
293294 temp = 0
294295 }
295296 }
296297 }
297- val maxkeystreamedtable = streamedtable .filter(row => {
298- streamSideKeyGenerator(row).toString().equals(maxrowkey .toString())
298+ val maxKeyStreamedTable = streamedTable .filter(row => {
299+ streamSideKeyGenerator(row).toString().equals(maxrowKey .toString())
299300 })
300- val mainstreamedtable = streamedtable .filter(row => {
301- ! streamSideKeyGenerator(row).toString().equals(maxrowkey .toString())
301+ val mainStreamedTable = streamedTable .filter(row => {
302+ ! streamSideKeyGenerator(row).toString().equals(maxrowKey .toString())
302303 })
303- val buildrdd = buildPlan.execute()
304- val maxkeyjoinedrdd = maxkeystreamedtable .map(_.copy()).cartesian(buildrdd .map(_.copy())).map {
304+ val buildRdd = buildPlan.execute()
305+ val maxKeyJoinedRdd = maxKeyStreamedTable .map(_.copy()).cartesian(buildRdd .map(_.copy())).map {
305306 case (l : Row , r : Row ) => buildRow(l ++ r)
306307 }
307- val mainjoinedrdd = mainstreamedtable .map(_.copy()).cartesian(buildrdd .map(_.copy())).map {
308+ val mainJoinedRdd = mainStreamedTable .map(_.copy()).cartesian(buildRdd .map(_.copy())).map {
308309 case (l : Row , r : Row ) => buildRow(l ++ r)
309310 }
310- sc.union(maxkeyjoinedrdd, mainjoinedrdd )
311+ sc.union(maxKeyJoinedRdd, mainJoinedRdd )
311312 }
312313}
313314
0 commit comments