
Commit 00bc81e

Remove use of fastutil and replace with use of java.io, spark.util and Guava classes

1 parent 98225a6

19 files changed: 66 additions and 124 deletions.
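In the files shown below the fastutil classes map one-for-one onto JDK and Spark equivalents: FastBufferedInputStream/FastBufferedOutputStream become java.io.BufferedInputStream/BufferedOutputStream, FastByteArrayOutputStream becomes java.io.ByteArrayOutputStream, and Object2LongOpenHashMap becomes org.apache.spark.util.collection.OpenHashMap[T, Long]. A minimal sketch of the buffered-stream side of the swap (the file name and buffer size here are illustrative, not taken from the commit):

    import java.io.{BufferedOutputStream, FileOutputStream}

    // Sketch: wrap a file stream in a JDK buffer, as the patched code does,
    // instead of fastutil's FastBufferedOutputStream.
    val bufferSize = 65536  // illustrative value
    val out = new BufferedOutputStream(new FileOutputStream("/tmp/example"), bufferSize)
    out.write("hello".getBytes("UTF-8"))
    out.close()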

core/pom.xml

Lines changed: 0 additions & 4 deletions
@@ -157,10 +157,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>it.unimi.dsi</groupId>
-      <artifactId>fastutil</artifactId>
-    </dependency>
     <dependency>
       <groupId>colt</groupId>
       <artifactId>colt</artifactId>

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 4 additions & 5 deletions
@@ -18,11 +18,10 @@
 package org.apache.spark.broadcast

 import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
-import java.net.{URI, URL, URLConnection}
+import java.io.{BufferedInputStream, BufferedOutputStream}
+import java.net.{URL, URLConnection, URI}
 import java.util.concurrent.TimeUnit

-import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream}
-
 import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
       if (compress) {
         compressionCodec.compressedOutputStream(new FileOutputStream(file))
       } else {
-        new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
       }
     }
     val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@ private[spark] object HttpBroadcast extends Logging {
       if (compress) {
         compressionCodec.compressedInputStream(inputStream)
       } else {
-        new FastBufferedInputStream(inputStream, bufferSize)
+        new BufferedInputStream(inputStream, bufferSize)
       }
     }
     val ser = SparkEnv.get.serializer.newInstance()

core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala

Lines changed: 13 additions & 19 deletions
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.Map
 import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag

 import cern.jet.stat.Probability
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+import org.apache.spark.util.collection.OpenHashMap

 /**
  * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
  */
-private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
+private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {

   var outputsMerged = 0
-  var sums = new OLMap[T] // Sum of counts for each key
+  var sums = new OpenHashMap[T,Long]() // Sum of counts for each key

-  override def merge(outputId: Int, taskResult: OLMap[T]) {
+  override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
     outputsMerged += 1
-    val iter = taskResult.object2LongEntrySet.fastIterator()
-    while (iter.hasNext) {
-      val entry = iter.next()
-      sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
+    taskResult.foreach{ case (key,value) =>
+      sums.changeValue(key, value, _ + value)
     }
   }

   override def currentResult(): Map[T, BoundedDouble] = {
     if (outputsMerged == totalOutputs) {
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue()
-        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+      sums.foreach{ case (key,sum) =>
+        result(key) = new BoundedDouble(sum, 1.0, sum, sum)
       }
       result
     } else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Dou
       val p = outputsMerged.toDouble / totalOutputs
       val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue
+      sums.foreach{ case (key, sum) =>
         val mean = (sum + 1 - p) / p
         val variance = (sum + 1) * (1 - p) / (p * p)
         val stdev = math.sqrt(variance)
         val low = mean - confFactor * stdev
         val high = mean + confFactor * stdev
-        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+        result(key) = new BoundedDouble(mean, confidence, low, high)
       }
       result
     }

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 12 additions & 16 deletions
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}

 import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
@@ -43,6 +42,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

 /**
@@ -834,19 +834,16 @@ abstract class RDD[T: ClassTag](
       throw new SparkException("countByValue() does not support arrays")
     }
     // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
+    def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+      val map = new OpenHashMap[T,Long]
+      iter.foreach{
+        t => map.changeValue(t, 1L, _ + 1L)
       }
       Iterator(map)
     }
-    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
-      val iter = m2.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
+    def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+      m2.foreach{ case (key, value) =>
+        m1.changeValue(key, value, _ + value)
       }
       m1
     }
@@ -866,11 +863,10 @@ abstract class RDD[T: ClassTag](
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
-    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
+    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
+      val map = new OpenHashMap[T,Long]
+      iter.foreach{
+        t => map.changeValue(t, 1L, _ + 1L)
      }
      map
    }

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 2 additions & 3 deletions
@@ -17,11 +17,10 @@

 package org.apache.spark.scheduler

-import java.io.InputStream
+import java.io.{BufferedInputStream, InputStream}

 import scala.io.Source

-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.json4s.jackson.JsonMethods._

@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
     var currentLine = "<not started>"
     try {
       fileStream = Some(fileSystem.open(path))
-      bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
+      bufferedStream = Some(new BufferedInputStream(fileStream.get))
       compressStream = Some(wrapForCompression(bufferedStream.get))

       // Parse each line as an event and post the event to all attached listeners

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 3 additions & 6 deletions
@@ -17,13 +17,11 @@

 package org.apache.spark.scheduler

-import java.io.{DataInputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer

 import scala.collection.mutable.HashMap

-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import org.apache.spark.TaskContext
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@ private[spark] object Task {
       serializer: SerializerInstance)
     : ByteBuffer = {

-    val out = new FastByteArrayOutputStream(4096)
+    val out = new ByteArrayOutputStream(4096)
     val dataOut = new DataOutputStream(out)

     // Write currentFiles
@@ -125,8 +123,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task).array()
     out.write(taskBytes)
-    out.trim()
-    ByteBuffer.wrap(out.array)
+    ByteBuffer.wrap(out.toByteArray)
   }

   /**

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 3 additions & 6 deletions
@@ -17,11 +17,9 @@

 package org.apache.spark.serializer

-import java.io.{EOFException, InputStream, OutputStream}
+import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
 import java.nio.ByteBuffer

-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@ trait SerializerInstance {

   def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
     // Default implementation uses serializeStream
-    val stream = new FastByteArrayOutputStream()
+    val stream = new ByteArrayOutputStream()
     serializeStream(stream).writeAll(iterator)
-    val buffer = ByteBuffer.allocate(stream.position.toInt)
-    buffer.put(stream.array, 0, stream.position.toInt)
+    val buffer = ByteBuffer.wrap(stream.toByteArray)
     buffer.flip()
     buffer
   }
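Task.scala, Serializer.scala, and BlockManager.scala all replace FastByteArrayOutputStream's trim()/array idiom with ByteArrayOutputStream.toByteArray, which already returns a right-sized copy of the written bytes. A minimal sketch of that pattern (the payload is illustrative):

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer

    // Sketch: collect bytes in a JDK ByteArrayOutputStream, then wrap the result.
    val out = new ByteArrayOutputStream(4096)
    out.write(Array[Byte](1, 2, 3))
    // No trim() needed: toByteArray copies exactly size() bytes into a new array.
    val buffer = ByteBuffer.wrap(out.toByteArray)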

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 6 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.storage

-import java.io.{File, InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}

 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
 import scala.util.Random

 import akka.actor.{ActorSystem, Cancellable, Props}
-import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
 import sun.nio.ch.DirectBuffer

 import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@ private[spark] class BlockManager(
       outputStream: OutputStream,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer) {
-    val byteStream = new FastBufferedOutputStream(outputStream)
+    val byteStream = new BufferedOutputStream(outputStream)
     val ser = serializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
@@ -1002,10 +1001,9 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer): ByteBuffer = {
-    val byteStream = new FastByteArrayOutputStream(4096)
+    val byteStream = new ByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
-    byteStream.trim()
-    ByteBuffer.wrap(byteStream.array)
+    ByteBuffer.wrap(byteStream.toByteArray)
   }

   /**

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 2 additions & 4 deletions
@@ -17,11 +17,9 @@

 package org.apache.spark.storage

-import java.io.{FileOutputStream, File, OutputStream}
+import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel

-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}

@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
     lastValidPosition = initialPosition
-    bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
+    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
     this

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 2 additions & 3 deletions
@@ -17,12 +17,11 @@

 package org.apache.spark.util

-import java.io._
+import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date

-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}

 import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
       hadoopDataStream.get
     }

-    val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
+    val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
   }
