
Commit 486ed08

Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck

2 parents: d3747d1 + af2effd

File tree: 43 files changed, +2893 −77 lines


LICENSE

Lines changed: 16 additions & 0 deletions
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.

+========================================================================
+For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
+========================================================================
+Copyright (C) 2015 Stijn de Gouw
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

 ========================================================================
 For LimitedInputStream

core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 4 additions & 5 deletions
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
   private void mergeCollapse() {
     while (stackSize > 1) {
       int n = stackSize - 2;
-      if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+      if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
+        || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
         if (runLen[n - 1] < runLen[n + 1])
           n--;
-        mergeAt(n);
-      } else if (runLen[n] <= runLen[n + 1]) {
-        mergeAt(n);
-      } else {
+      } else if (runLen[n] > runLen[n + 1]) {
         break; // Invariant is established
       }
+      mergeAt(n);
     }
   }
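This is the SPARK-5984 fix: the old code only compared the top three run lengths, so a merge could leave the invariant runLen[i-2] > runLen[i-1] + runLen[i] broken one level deeper in the stack, letting the run stack exceed its preallocated bound on adversarial inputs. Below is a minimal, standalone Scala model of the corrected collapse loop, not Spark's code: the ArrayBuffer stack is illustrative, and "merging" runs is modeled by summing their lengths.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of TimSort's corrected mergeCollapse over a stack of run lengths.
def mergeCollapse(stack: ArrayBuffer[Long]): Unit = {
  while (stack.length > 1) {
    var n = stack.length - 2
    // The fixed check inspects both stack(n-1) and stack(n-2),
    // mirroring the patched condition in TimSort.java above.
    if ((n >= 1 && stack(n - 1) <= stack(n) + stack(n + 1)) ||
        (n >= 2 && stack(n - 2) <= stack(n) + stack(n - 1))) {
      if (stack(n - 1) < stack(n + 1)) n -= 1
    } else if (stack(n) > stack(n + 1)) {
      return // invariant established; nothing left to merge
    }
    stack(n) += stack(n + 1) // "merge" runs n and n+1
    stack.remove(n + 1)
  }
}

// Collapsing only ever sums adjacent runs, so the total length is preserved:
val runs = ArrayBuffer(120L, 80L, 25L, 20L, 30L)
val total = runs.sum
mergeCollapse(runs)
assert(runs.sum == total)
```

In the real sort, mergeCollapse runs after every pushRun, so the stack is nearly invariant-satisfying each time; the point of the patch is that checking two triples instead of one re-establishes the invariant for the whole stack, which the proof of the stack-size bound depends on.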

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 23 additions & 17 deletions
@@ -280,15 +280,24 @@ object AccumulatorParam {

 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0

   def newId(): Long = synchronized {
     lastId += 1

@@ -297,16 +306,16 @@ private[spark] object Accumulators {

   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }

   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
     }
   }

@@ -320,12 +329,7 @@ private[spark] object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
     }
     return ret
   }

@@ -341,6 +345,8 @@ private[spark] object Accumulators {
         case None =>
           throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
       }
+    } else {
+      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
     }
   }
 }
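The change splits the two maps by lifetime: only the driver-side `originals` keeps weak references (so long-running applications don't leak accumulators), while the task-local `localAccums` now holds strong references, since those entries live only until `clear()` runs after the task. A tiny standalone Scala sketch of the weak-reference behavior the driver-side `add` path has to handle; a plain `Object` stands in for an `Accumulable`, and note that `System.gc()` is only a hint, so the `None` branch is not guaranteed to fire:

```scala
import java.lang.ref.WeakReference

var acc: Object = new Object                // stands in for a driver-side Accumulable
val slot = new WeakReference[Object](acc)   // what `originals` stores per id

acc = null                                  // user code drops its last strong reference
System.gc()                                 // a hint; collection is not guaranteed

Option(slot.get) match {
  case Some(_) => println("accumulator still reachable; update can be applied")
  case None    => println("accumulator collected; updates for this id are stale")
}
```

This is also why the patch adds the `logWarning` branch: after GC-driven cleanup, an executor can still report updates for an id the driver no longer tracks, and dropping them with a warning is preferable to failing the job.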

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 2 additions & 2 deletions
@@ -188,10 +188,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
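Besides demoting the "Cleaned broadcast" message from logInfo to logDebug, the patch also swaps string concatenation for Scala's s-interpolator, the prevailing style in the codebase. The two forms produce identical strings; a trivial standalone check (the variable name is illustrative):

```scala
val broadcastId = 42L
val viaConcat = "Cleaning broadcast " + broadcastId  // old style
val viaInterp = s"Cleaning broadcast $broadcastId"   // new style
assert(viaConcat == viaInterp)
```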

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
@@ -1074,7 +1074,7 @@ private[spark] class BlockManager(
    * Remove all blocks belonging to the given broadcast.
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
-    logInfo(s"Removing broadcast $broadcastId")
+    logDebug(s"Removing broadcast $broadcastId")
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }

@@ -1086,7 +1086,7 @@ private[spark] class BlockManager(
    * Remove a block from both memory and disk.
    */
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
-    logInfo(s"Removing block $blockId")
+    logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ class BlockManagerMaster(
       tachyonSize: Long): Boolean = {
     val res = askDriverWithReply[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
-    logInfo("Updated info of block " + blockId)
+    logDebug(s"Updated info of block $blockId")
     res
   }

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val entry = entries.remove(blockId)
     if (entry != null) {
       currentMemory -= entry.size
-      logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
+      logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
       true
     } else {
       false
core/src/test/java/org/apache/spark/util/collection/TestTimSort.java

Lines changed: 134 additions & 0 deletions

@@ -0,0 +1,134 @@
+/**
+ * Copyright 2015 Stijn de Gouw
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.spark.util.collection;
+
+import java.util.*;
+
+/**
+ * This code generates an int array which fails the standard TimSort.
+ *
+ * The blog that reported the bug:
+ * http://www.envisage-project.eu/timsort-specification-and-verification/
+ *
+ * This code was originally written by Stijn de Gouw and modified by Evan Yu
+ * to adapt it to our test suite.
+ *
+ * https://github.com/abstools/java-timsort-bug
+ * https://github.com/abstools/java-timsort-bug/blob/master/LICENSE
+ */
+public class TestTimSort {
+
+  private static final int MIN_MERGE = 32;
+
+  /**
+   * Returns an array of integers that demonstrate the bug in TimSort
+   */
+  public static int[] getTimSortBugTestSet(int length) {
+    int minRun = minRunLength(length);
+    List<Long> runs = runsJDKWorstCase(minRun, length);
+    return createArray(runs, length);
+  }
+
+  private static int minRunLength(int n) {
+    int r = 0; // Becomes 1 if any 1 bits are shifted off
+    while (n >= MIN_MERGE) {
+      r |= (n & 1);
+      n >>= 1;
+    }
+    return n + r;
+  }
+
+  private static int[] createArray(List<Long> runs, int length) {
+    int[] a = new int[length];
+    Arrays.fill(a, 0);
+    int endRun = -1;
+    for (long len : runs) {
+      a[endRun += len] = 1;
+    }
+    a[length - 1] = 0;
+    return a;
+  }
+
+  /**
+   * Fills <code>runs</code> with a sequence of run lengths of the form<br>
+   * Y_n x_{n,1} x_{n,2} ... x_{n,l_n} <br>
+   * Y_{n-1} x_{n-1,1} x_{n-1,2} ... x_{n-1,l_{n-1}} <br>
+   * ... <br>
+   * Y_1 x_{1,1} x_{1,2} ... x_{1,l_1}<br>
+   * The Y_i's are chosen to satisfy the invariant throughout execution,
+   * but the x_{i,j}'s are merged (by <code>TimSort.mergeCollapse</code>)
+   * into an X_i that violates the invariant.
+   *
+   * @param length The sum of all run lengths that will be added to <code>runs</code>.
+   */
+  private static List<Long> runsJDKWorstCase(int minRun, int length) {
+    List<Long> runs = new ArrayList<Long>();
+
+    long runningTotal = 0, Y = minRun + 4, X = minRun;
+
+    while (runningTotal + Y + X <= length) {
+      runningTotal += X + Y;
+      generateJDKWrongElem(runs, minRun, X);
+      runs.add(0, Y);
+      // X_{i+1} = Y_i + x_{i,1} + 1, since runs.get(1) = x_{i,1}
+      X = Y + runs.get(1) + 1;
+      // Y_{i+1} = X_{i+1} + Y_i + 1
+      Y += X + 1;
+    }
+
+    if (runningTotal + X <= length) {
+      runningTotal += X;
+      generateJDKWrongElem(runs, minRun, X);
+    }
+
+    runs.add(length - runningTotal);
+    return runs;
+  }
+
+  /**
+   * Adds a sequence x_1, ..., x_n of run lengths to <code>runs</code> such that:<br>
+   * 1. X = x_1 + ... + x_n <br>
+   * 2. x_j >= minRun for all j <br>
+   * 3. x_1 + ... + x_{j-2} < x_j < x_1 + ... + x_{j-1} for all j <br>
+   * These conditions guarantee that TimSort merges all x_j's one by one
+   * (resulting in X) using only merges on the second-to-last element.
+   *
+   * @param X The sum of the sequence that should be added to runs.
+   */
+  private static void generateJDKWrongElem(List<Long> runs, int minRun, long X) {
+    for (long newTotal; X >= 2 * minRun + 1; X = newTotal) {
+      // Default strategy
+      newTotal = X / 2 + 1;
+      // Specialized strategies
+      if (3 * minRun + 3 <= X && X <= 4 * minRun + 1) {
+        // add x_1=MIN+1, x_2=MIN, x_3=X-newTotal to runs
+        newTotal = 2 * minRun + 1;
+      } else if (5 * minRun + 5 <= X && X <= 6 * minRun + 5) {
+        // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=X-newTotal to runs
+        newTotal = 3 * minRun + 3;
+      } else if (8 * minRun + 9 <= X && X <= 10 * minRun + 9) {
+        // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=2MIN+2, x_5=X-newTotal to runs
+        newTotal = 5 * minRun + 5;
+      } else if (13 * minRun + 15 <= X && X <= 16 * minRun + 17) {
+        // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=2MIN+2, x_5=3MIN+4, x_6=X-newTotal to runs
+        newTotal = 8 * minRun + 9;
+      }
+      runs.add(0, X - newTotal);
+    }
+    runs.add(0, X);
+  }
+}
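For a sense of the numbers involved: minRunLength halves n until it drops below MIN_MERGE (32), recording in r whether any set bit was discarded, so the result always lands in [16, 32]. A Scala transcription for illustration, with the value for the array size the SorterSuite test below actually uses:

```scala
// Scala port of minRunLength above (MIN_MERGE = 32), for illustration only.
def minRunLength(n0: Int): Int = {
  var n = n0
  var r = 0            // becomes 1 if any 1 bits are shifted off
  while (n >= 32) {    // 32 == MIN_MERGE
    r |= n & 1
    n >>= 1
  }
  n + r
}

// 67108864 = 2^26: every discarded bit is 0, so r stays 0 and the loop
// stops at n = 16, the smallest legal minimum run length.
assert(minRunLength(67108864) == 16)
```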

core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -65,6 +65,13 @@ class SorterSuite extends FunSuite {
     }
   }

+  // http://www.envisage-project.eu/timsort-specification-and-verification/
+  test("SPARK-5984 TimSort bug") {
+    val data = TestTimSort.getTimSortBugTestSet(67108864)
+    new Sorter(new IntArraySortDataFormat).sort(data, 0, data.length, Ordering.Int)
+    (0 to data.length - 2).foreach(i => assert(data(i) <= data(i + 1)))
+  }
+
   /** Runs an experiment several times. */
   def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = {
     if (skip) {

docs/graphx-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -663,7 +663,7 @@ val graph: Graph[Int, Float] = ...
 def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
   Iterator((triplet.dstId, "Hi"))
 }
-def reduceFun(a: Int, b: Int): Int = a + b
+def reduceFun(a: String, b: String): String = a + " " + b
 val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
 {% endhighlight %}

@@ -674,7 +674,7 @@ val graph: Graph[Int, Float] = ...
 def msgFun(triplet: EdgeContext[Int, Float, String]) {
   triplet.sendToDst("Hi")
 }
-def reduceFun(a: Int, b: Int): Int = a + b
+def reduceFun(a: String, b: String): String = a + " " + b
 val result = graph.aggregateMessages[String](msgFun, reduceFun)
 {% endhighlight %}
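Both documentation fixes correct the same type error: once msgFun emits String messages, the reducer passed to mapReduceTriplets[String] or aggregateMessages[String] must have type (String, String) => String; the old (Int, Int) => Int version would not compile. A generic sketch of the constraint, deliberately independent of GraphX's actual API:

```scala
// The message type A fixes the reducer's type, whatever the vertex type is.
def aggregate[A](messages: Seq[A])(reduceFun: (A, A) => A): A =
  messages.reduce(reduceFun)

val msgs   = Seq("Hi", "Hi", "Hi")            // msgFun sends Strings
val merged = aggregate(msgs)(_ + " " + _)     // so reduceFun combines Strings
// aggregate(msgs)((a: Int, b: Int) => a + b) // type error: Int is not String
```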
