
Commit 97fd174

fix thread safe
1 parent 743ef16 commit 97fd174

3 files changed: +7, -7 lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 2 additions & 2 deletions

@@ -81,7 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }

-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager

@@ -115,7 +115,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()

-  private var readingIterator: SpillableIterator = null
+  @volatile private var readingIterator: SpillableIterator = null

   /**
    * Number of files this map has spilled so far.
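
Why @volatile matters here: a spill can be forced from a thread other than the task thread that owns this map, so another thread may swap currentMap or readingIterator while the task thread is still reading them. A minimal sketch of the visibility failure this prevents (plain Scala, not Spark code; VolatileVisibilityDemo and SizeTrackedMap are hypothetical stand-ins):

object VolatileVisibilityDemo {
  final class SizeTrackedMap // stand-in for Spark's SizeTrackingAppendOnlyMap

  @volatile private var currentMap: SizeTrackedMap = new SizeTrackedMap

  def main(args: Array[String]): Unit = {
    val initial = currentMap
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        // Spin until the swap becomes visible. With a plain var the JVM is
        // free to cache the field in a register, and this can spin forever.
        while (currentMap eq initial) {}
        println("reader observed the new map")
      }
    })
    reader.start()
    Thread.sleep(100)                // let the reader start spinning
    currentMap = new SizeTrackedMap  // simulates a forced spill swapping the map
    reader.join()
  }
}

With @volatile the reader's loop is guaranteed to observe the swap; without it, the stale read is permitted by the Java memory model and the loop may never exit.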

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 4 additions & 4 deletions

@@ -124,8 +124,8 @@ private[spark] class ExternalSorter[K, V, C](
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
   // store them in an array buffer.
-  private var map = new PartitionedAppendOnlyMap[K, C]
-  private var buffer = new PartitionedPairBuffer[K, C]
+  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+  @volatile private var buffer = new PartitionedPairBuffer[K, C]

   // Total spilling statistics
   private var _diskBytesSpilled = 0L

@@ -135,9 +135,9 @@ private[spark] class ExternalSorter[K, V, C](
   private var _peakMemoryUsedBytes: Long = 0L
   def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes

-  private var isShuffleSort: Boolean = true
+  @volatile private var isShuffleSort: Boolean = true
   private val forceSpillFiles = new ArrayBuffer[SpilledFile]
-  private var readingIterator: SpillableIterator = null
+  @volatile private var readingIterator: SpillableIterator = null

   // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
   // Can be a partial ordering by hash code if a total ordering is not provided through by the
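
ExternalSorter needs the same treatment because its force-spill path replaces map, buffer, and readingIterator from whichever thread requests memory. A hypothetical sketch of that swap-on-spill pattern (SpillableBuffer is illustrative, not Spark's API): writes are serialized with a lock, while the @volatile field lets a lock-free reader always see the latest in-memory structure.

import scala.collection.mutable.ArrayBuffer

// Illustrative only (not Spark's API): a container whose in-memory part can
// be spilled from another thread, mirroring how a forced spill swaps the
// sorter's collections while the task thread keeps reading them.
final class SpillableBuffer[T] {
  @volatile private var inMemory: Vector[T] = Vector.empty
  private val spilled = ArrayBuffer.empty[Vector[T]]

  def insert(x: T): Unit = synchronized { inMemory = inMemory :+ x }

  // May run on a different thread when the memory manager needs space.
  def forceSpill(): Long = synchronized {
    val released = inMemory.size.toLong
    spilled += inMemory      // stand-in for writing a spill file to disk
    inMemory = Vector.empty  // the swap that readers must observe
    released
  }

  // Lock-free read path: @volatile guarantees this sees the reference
  // installed by the most recent insert() or forceSpill(); with a plain
  // var it could act on a stale, already-spilled vector.
  def inMemorySize: Int = inMemory.size
}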

core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
     SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue)

   // Threshold for this collection's size in bytes before we start tracking its memory usage
-  // To avoid memory leak for rdd.first(), initialize this to a value orders of magnitude > 0
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
   private[this] var myMemoryThreshold = initialMemoryThreshold

   // Number of elements read from input since last spill
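
The rewritten comment matches what the threshold actually does: a collection only negotiates for more memory once it crosses myMemoryThreshold, so starting the threshold well above zero keeps small collections from producing a stream of tiny spill files. A self-contained sketch of the idea (ThresholdSketch and its grant-half memory stub are illustrative, not Spark's code):

object ThresholdSketch {
  private val initialMemoryThreshold = 5L * 1024 * 1024 // e.g. 5 MB, not zero
  private var myMemoryThreshold = initialMemoryThreshold

  // Stand-in for the task memory manager; grants at most half of any request.
  private def acquireMemory(bytes: Long): Long = bytes / 2

  // Returns true when the collection should spill to disk.
  def maybeSpill(currentMemory: Long): Boolean = {
    if (currentMemory >= myMemoryThreshold) {
      // Try to roughly double our footprint and keep whatever was granted.
      val granted = acquireMemory(2 * currentMemory - myMemoryThreshold)
      myMemoryThreshold += granted
      // Spill only if we are still over the (raised) threshold.
      currentMemory >= myMemoryThreshold
    } else {
      false // below the threshold: no memory negotiation, no spill
    }
  }
}

Had the threshold started at zero, every memory check would trigger a negotiation and, under memory pressure, a spill of only a few kilobytes each.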
