Skip to content

Commit 1cc689f

Browse files
committed
ConcurrentHashMap -> SynchronizedMap
1 parent db26c3a commit 1cc689f

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import java.io.{IOException, ObjectInputStream}
2121
import java.util.concurrent.ConcurrentHashMap
2222

2323
import scala.collection.mutable
24-
import scala.collection.JavaConverters._
2524
import scala.reflect.ClassTag
2625

2726
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
@@ -98,7 +97,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
9897
// Map of batch-time to selected file info for the remembered batches
9998
// This is a concurrent map because it's also accessed in unit tests
10099
@transient private[streaming] var batchTimeToSelectedFiles =
101-
new ConcurrentHashMap[Time, Array[String]].asScala
100+
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
102101

103102
// Set of files that were selected in the remembered batches
104103
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -273,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
273272
logDebug(this.getClass().getSimpleName + ".readObject used")
274273
ois.defaultReadObject()
275274
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
276-
batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala
275+
batchTimeToSelectedFiles =
276+
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
277277
recentlySelectedFiles = new mutable.HashSet[String]()
278278
fileToModTime = new TimeStampedHashMap[String, Long](true)
279279
}

0 commit comments

Comments
 (0)