Skip to content

Commit 68815b2

Browse files
committed
Reformat the code style
Reformat the names and annotation. Thanks rxin. If there are other code style problems, I will try to reformat them.
1 parent aa06072 commit 68815b2

File tree

1 file changed

+23
-22
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution

1 file changed

+23
-22
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,14 @@ case class BroadcastNestedLoopJoin(
237237
}
238238
}
239239

240-
/*
241-
In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key
242-
rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd
243-
with the largest key.
244-
Then,join the two table with the buildtable.
245-
Finally,union the two result rdd.
246-
*/
240+
/**
241+
* :: DeveloperApi ::
242+
*In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key
243+
*rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd
244+
*with the largest key.
245+
*Then,join the two table with the buildtable.
246+
*Finally,union the two result rdd.
247+
*/
247248
@DeveloperApi
248249
case class SkewJoin(
249250
leftKeys: Seq[Expression],
@@ -273,41 +274,41 @@ case class SkewJoin(
273274

274275

275276
def execute() = {
276-
val streamedtable = streamedPlan.execute()
277+
val streamedTable = streamedPlan.execute()
277278
//This will later write as configuration
278-
val sample = streamedtable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
279-
val sortedsample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
279+
val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
280+
val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode())
280281
var max = 0
281282
var num = sample.size - 1
282283
var temp = 0
283-
var maxrowkey = sortedsample(0)
284+
var maxrowKey = sortedSample(0)
284285
//find the largest key
285-
if (sortedsample.size > 1) {
286+
if (sortedSample.size > 1) {
286287
for (i <- 1 to num) {
287-
if (sortedsample(i - 1) == sortedsample(i)) temp += 1
288+
if (sortedSample(i - 1) == sortedSample(i)) temp += 1
288289
else {
289290
if (temp > max) {
290291
max = temp
291-
maxrowkey = sortedsample(i - 1)
292+
maxrowKey = sortedSample(i - 1)
292293
}
293294
temp = 0
294295
}
295296
}
296297
}
297-
val maxkeystreamedtable = streamedtable.filter(row => {
298-
streamSideKeyGenerator(row).toString().equals(maxrowkey.toString())
298+
val maxKeyStreamedTable = streamedTable.filter(row => {
299+
streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
299300
})
300-
val mainstreamedtable = streamedtable.filter(row => {
301-
!streamSideKeyGenerator(row).toString().equals(maxrowkey.toString())
301+
val mainStreamedTable = streamedTable.filter(row => {
302+
!streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
302303
})
303-
val buildrdd = buildPlan.execute()
304-
val maxkeyjoinedrdd = maxkeystreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map {
304+
val buildRdd = buildPlan.execute()
305+
val maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
305306
case (l: Row, r: Row) => buildRow(l ++ r)
306307
}
307-
val mainjoinedrdd = mainstreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map {
308+
val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map {
308309
case (l: Row, r: Row) => buildRow(l ++ r)
309310
}
310-
sc.union(maxkeyjoinedrdd, mainjoinedrdd)
311+
sc.union(maxKeyJoinedRdd, mainJoinedRdd)
311312
}
312313
}
313314

0 commit comments

Comments
 (0)