4 changes: 0 additions & 4 deletions core/pom.xml
@@ -157,10 +157,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<dependency>
<groupId>colt</groupId>
<artifactId>colt</artifactId>
@@ -18,11 +18,10 @@
package org.apache.spark.broadcast

import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.net.{URI, URL, URLConnection}
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.net.{URL, URLConnection, URI}
import java.util.concurrent.TimeUnit

import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream}

import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
new FastBufferedInputStream(inputStream, bufferSize)
new BufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
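
The write and read helpers above swap fastutil's FastBufferedOutputStream/FastBufferedInputStream for the plain java.io buffered streams, which take the same (stream, bufferSize) constructor arguments. A minimal, self-contained sketch of the drop-in replacement; the file name and buffer size are illustrative, not taken from the PR:

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, File, FileInputStream, FileOutputStream}

object BufferedStreamSketch {
  def main(args: Array[String]): Unit = {
    val bufferSize = 65536  // illustrative; HttpBroadcast reads its buffer size from the Spark conf
    val file = File.createTempFile("broadcast-sketch", ".bin")

    // Write path: BufferedOutputStream accepts the same (stream, size) arguments
    // that FastBufferedOutputStream did, so the call site only changes the class name.
    val out = new BufferedOutputStream(new FileOutputStream(file), bufferSize)
    out.write(Array[Byte](1, 2, 3))
    out.close()

    // Read path: BufferedInputStream is likewise a drop-in for FastBufferedInputStream.
    val in = new BufferedInputStream(new FileInputStream(file), bufferSize)
    println(in.read())  // prints 1
    in.close()
    file.delete()
  }
}
```
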
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import cern.jet.stat.Probability
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import org.apache.spark.util.collection.OpenHashMap

/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {

var outputsMerged = 0
var sums = new OLMap[T] // Sum of counts for each key
var sums = new OpenHashMap[T,Long]() // Sum of counts for each key

override def merge(outputId: Int, taskResult: OLMap[T]) {
override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
outputsMerged += 1
val iter = taskResult.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
taskResult.foreach { case (key, value) =>
sums.changeValue(key, value, _ + value)
}
}

override def currentResult(): Map[T, BoundedDouble] = {
if (outputsMerged == totalOutputs) {
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue()
result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
sums.foreach { case (key, sum) =>
result(key) = new BoundedDouble(sum, 1.0, sum, sum)
}
result
} else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
val p = outputsMerged.toDouble / totalOutputs
val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue
sums.foreach { case (key, sum) =>
val mean = (sum + 1 - p) / p
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
result(key) = new BoundedDouble(mean, confidence, low, high)
}
result
}
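
The evaluator now accumulates counts in Spark's own OpenHashMap instead of fastutil's Object2LongOpenHashMap: changeValue(key, default, mergeFn) inserts the default for a new key or applies the merge function otherwise, and the map iterates as (key, value) pairs, so foreach replaces the fastutil entry-set iterator. A small sketch of that merge pattern, assuming it is compiled under the org.apache.spark package (OpenHashMap is private[spark]); the object and key names are illustrative:

```scala
package org.apache.spark.example

import org.apache.spark.util.collection.OpenHashMap

object OpenHashMapMergeSketch {
  // Merge a per-task count map into the running totals, as GroupedCountEvaluator.merge now does.
  def merge(sums: OpenHashMap[String, Long], taskResult: OpenHashMap[String, Long]): Unit = {
    taskResult.foreach { case (key, value) =>
      // Insert `value` for an unseen key, otherwise add it to the existing count.
      sums.changeValue(key, value, _ + value)
    }
  }

  def main(args: Array[String]): Unit = {
    val sums = new OpenHashMap[String, Long]()
    val taskResult = new OpenHashMap[String, Long]()
    taskResult.update("a", 2L)
    taskResult.update("b", 1L)
    merge(sums, taskResult)
    merge(sums, taskResult)
    sums.foreach { case (k, v) => println(s"$k -> $v") }  // a -> 4, b -> 2
  }
}
```
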
34 changes: 16 additions & 18 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,12 +20,10 @@ package org.apache.spark.rdd
import java.util.Random

import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -43,6 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

/**
@@ -834,24 +833,24 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}
def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
val iter = m2.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
}
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
myResult.foreach { case (k, v) => mutableResult.put(k, v) }
mutableResult
}

/**
@@ -866,11 +865,10 @@ abstract class RDD[T: ClassTag](
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
map
}
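
With the fastutil map gone, countByValue builds an OpenHashMap per partition, merges them with changeValue, and copies the result into an ordinary Scala mutable Map rather than wrapping a java.util.Map. A hypothetical usage sketch; the local SparkContext and the data are illustrative:

```scala
import org.apache.spark.SparkContext

object CountByValueSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "countByValue-sketch")
    // countByValue returns a scala.collection.Map[T, Long] built from the merged OpenHashMaps.
    val counts = sc.parallelize(Seq("a", "b", "a", "c", "a")).countByValue()
    counts.foreach { case (value, count) => println(s"$value -> $count") }  // a -> 3, b -> 1, c -> 1
    sc.stop()
  }
}
```
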
@@ -17,11 +17,10 @@

package org.apache.spark.scheduler

import java.io.InputStream
import java.io.{BufferedInputStream, InputStream}

import scala.io.Source

import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._

@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
var currentLine = "<not started>"
try {
fileStream = Some(fileSystem.open(path))
bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
bufferedStream = Some(new BufferedInputStream(fileStream.get))
compressStream = Some(wrapForCompression(bufferedStream.get))

// Parse each line as an event and post the event to all attached listeners
9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,13 +17,11 @@

package org.apache.spark.scheduler

import java.io.{DataInputStream, DataOutputStream}
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@ private[spark] object Task {
serializer: SerializerInstance)
: ByteBuffer = {

val out = new FastByteArrayOutputStream(4096)
val out = new ByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)

// Write currentFiles
Expand All @@ -125,8 +123,7 @@ private[spark] object Task {
dataOut.flush()
val taskBytes = serializer.serialize(task).array()
out.write(taskBytes)
out.trim()
ByteBuffer.wrap(out.array)
ByteBuffer.wrap(out.toByteArray)
Review comment from a contributor on the line above: This does seem pretty bad ....

}

/**
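
Task serialization now writes into a plain java.io.ByteArrayOutputStream and wraps the result. Unlike fastutil's FastByteArrayOutputStream, which exposed its backing array directly after trim(), toByteArray allocates and copies a fresh right-sized array, which is presumably what the review comment above is pointing at. A self-contained sketch of the pattern; the payload values are made up:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

object TaskSerializationSketch {
  def main(args: Array[String]): Unit = {
    // Write dependency metadata followed by the serialized task bytes,
    // mirroring the serializeWithDependencies pattern above.
    val out = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(out)
    dataOut.writeInt(1)                   // e.g. number of file dependencies
    dataOut.writeUTF("example.jar")       // illustrative dependency name
    dataOut.writeLong(12345L)             // illustrative timestamp
    dataOut.flush()

    val taskBytes = Array[Byte](0x7, 0x8) // stands in for serializer.serialize(task).array()
    out.write(taskBytes)

    // toByteArray copies the internal buffer into a right-sized array before wrapping it.
    val buffer: ByteBuffer = ByteBuffer.wrap(out.toByteArray)
    println(buffer.remaining())           // total number of bytes written
  }
}
```
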
@@ -17,11 +17,9 @@

package org.apache.spark.serializer

import java.io.{EOFException, InputStream, OutputStream}
import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.SparkEnv
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@ trait SerializerInstance {

def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
// Default implementation uses serializeStream
val stream = new FastByteArrayOutputStream()
val stream = new ByteArrayOutputStream()
serializeStream(stream).writeAll(iterator)
val buffer = ByteBuffer.allocate(stream.position.toInt)
buffer.put(stream.array, 0, stream.position.toInt)
val buffer = ByteBuffer.wrap(stream.toByteArray)
buffer.flip()
buffer
}
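
The old serializeMany allocated a ByteBuffer, put() the bytes (which advances the position), and then flipped the buffer to make it readable. ByteBuffer.wrap already returns a buffer with position 0 and limit equal to the array length, so a wrapped buffer is readable as-is. A short sketch of the two patterns, using only standard java.nio; the byte values are illustrative:

```scala
import java.nio.ByteBuffer

object WrapVersusAllocateSketch {
  def main(args: Array[String]): Unit = {
    val bytes = Array[Byte](10, 20, 30)

    // Old pattern: put() advances the position, so flip() is needed before reading.
    val allocated = ByteBuffer.allocate(bytes.length)
    allocated.put(bytes)
    allocated.flip()
    println(allocated.remaining())  // 3

    // New pattern: wrap() leaves position = 0 and limit = bytes.length, so the
    // buffer can be read immediately; flipping it here would set the limit to 0.
    val wrapped = ByteBuffer.wrap(bytes)
    println(wrapped.remaining())    // 3
  }
}
```
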
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{File, InputStream, OutputStream}
import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer

import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@ private[spark] class BlockManager(
outputStream: OutputStream,
values: Iterator[Any],
serializer: Serializer = defaultSerializer) {
val byteStream = new FastBufferedOutputStream(outputStream)
val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
@@ -1002,10 +1001,9 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val byteStream = new ByteArrayOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
ByteBuffer.wrap(byteStream.toByteArray)
}

/**
@@ -17,11 +17,9 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, File, OutputStream}
import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}

@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,12 +17,11 @@

package org.apache.spark.util

import java.io._
import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}

val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

import it.unimi.dsi.fastutil.ints.IntOpenHashSet

import org.apache.spark.Logging
import org.apache.spark.util.collection.OpenHashSet

/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
@@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
val rand = new Random(42)
val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
for (i <- 0 until ARRAY_SAMPLE_SIZE) {
var index = 0
do {
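
The array-sampling loop above now records already-drawn indices in Spark's OpenHashSet[Int] rather than fastutil's IntOpenHashSet. A minimal sketch of sampling without replacement with that set, again assuming code compiled under the org.apache.spark package (OpenHashSet is private[spark]); the element count and sample size are illustrative:

```scala
package org.apache.spark.example

import java.util.Random

import org.apache.spark.util.collection.OpenHashSet

object SampleWithoutReplacementSketch {
  def main(args: Array[String]): Unit = {
    val numElements = 1000
    val sampleSize = 100                        // stands in for ARRAY_SAMPLE_SIZE
    val rand = new Random(42)
    val drawn = new OpenHashSet[Int](sampleSize)

    for (_ <- 0 until sampleSize) {
      // Keep drawing until we hit an index that has not been sampled yet.
      var index = rand.nextInt(numElements)
      while (drawn.contains(index)) {
        index = rand.nextInt(numElements)
      }
      drawn.add(index)
    }
    println(drawn.size)                         // 100 distinct indices
  }
}
```
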
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection

import java.util.{Arrays, Comparator}

import com.google.common.hash.Hashing

import org.apache.spark.annotation.DeveloperApi

/**
@@ -199,11 +201,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
* We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
private def rehash(h: Int): Int = {
it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
}
private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

/** Double the table's size and re-hash everything */
protected def growTable() {
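
AppendOnlyMap's rehash helper now delegates to Guava's 32-bit MurmurHash3 (Hashing.murmur3_32().hashInt(...).asInt()) instead of fastutil's HashCommon.murmurHash3. A self-contained sketch of how the rehashed value is folded into a power-of-two table index, assuming Guava on the classpath; the capacity and sample keys are illustrative:

```scala
import com.google.common.hash.Hashing

object RehashSketch {
  // Mix the hash code so keys that differ only in their upper bits
  // still spread across the low bits used for table indexing.
  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

  def main(args: Array[String]): Unit = {
    val capacity = 64            // AppendOnlyMap keeps its capacity at a power of two
    val mask = capacity - 1
    Seq(1, 1 << 16, 1 << 24).foreach { h =>
      println(s"hash $h -> bucket ${rehash(h) & mask}")
    }
  }
}
```
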