@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -33,10 +34,10 @@ import org.apache.spark.util.Utils
* The buffer contains a sequence of RDDs, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
output.add(collected)
}, false) {

// This is to clear the output buffer every time it is read from a checkpoint
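
The hunk above swaps TestOutputStream's ArrayBuffer with SynchronizedBuffer (deprecated since Scala 2.11) for a java.util.concurrent.ConcurrentLinkedQueue, whose add is non-blocking and safe to call from the output thread. A minimal sketch of that write/read pattern; the object name and integer payload are illustrative, not part of the patch:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object QueueSketch {
  def main(args: Array[String]): Unit = {
    val output = new ConcurrentLinkedQueue[Seq[Int]]()

    // Writers (standing in for the foreachRDD callback) only ever call add(),
    // which is thread-safe and non-blocking.
    val writers = (1 to 4).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = output.add(Seq(i, i * 10))
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())

    // Readers go through asScala, since the queue only exposes the Java API,
    // and flatten the per-batch Seqs into one sequence.
    val collected = output.asScala.toSeq.flatten
    assert(collected.size == 8)
  }
}
```
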
@@ -18,9 +18,9 @@
package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
utils.eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()

ssc.start()
@@ -115,11 +114,11 @@

// The eventually block is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val flattenOutputBuffer = outputBuffer.flatten
val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
val flattenOutput = outputQueue.asScala.toSeq.flatten
val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
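
Because the collected output now lives in a Java queue, the assertions drain it through JavaConverters: asScala wraps the live queue and toSeq/flatten turn the per-batch sequences into one flat sequence. A small sketch of that drain step, using plain strings in place of SparkFlumeEvents (the values and object name are made up):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object FlattenSketch {
  def main(args: Array[String]): Unit = {
    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]()
    outputQueue.add(Seq("a", "b"))
    outputQueue.add(Seq("c"))

    // asScala is a lightweight wrapper over the live queue; toSeq.flatten
    // produces the flat view the assertions work against.
    val flattened: Seq[String] = outputQueue.asScala.toSeq.flatten
    assert(flattened == Seq("a", "b", "c"))
  }
}
```
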
@@ -17,8 +17,9 @@

package org.apache.spark.streaming.flume

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val input = (1 to 100).map { _.toString }
val utils = new FlumeTestUtils
try {
val outputBuffer = startContext(utils.getTestPort(), testCompression)
val outputQueue = startContext(utils.getTestPort(), testCompression)

eventually(timeout(10 seconds), interval(100 milliseconds)) {
utils.writeInput(input.asJava, testCompression)
}

eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
@@ -76,16 +77,15 @@

/** Setup and start the streaming context */
private def startContext(
testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
ssc = new StreamingContext(conf, Milliseconds(200))
val flumeStream = FlumeUtils.createStream(
ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
outputBuffer
outputQueue
}

/** Class to create socket channel with compression */
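
startContext now hands back the ConcurrentLinkedQueue itself, and the caller polls it until the expected events arrive (the suite does this with ScalaTest's eventually). A rough sketch of the same polling idea in plain Scala; awaitQueue is a hypothetical helper, not something the patch adds:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object PollSketch {
  // Hypothetical helper: poll a live queue until the predicate holds or the
  // timeout expires, then return the last snapshot taken.
  def awaitQueue[T](queue: ConcurrentLinkedQueue[T], timeoutMs: Long)
                   (predicate: Seq[T] => Boolean): Seq[T] = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var snapshot = queue.asScala.toList
    while (!predicate(snapshot) && System.currentTimeMillis() < deadline) {
      Thread.sleep(50)
      snapshot = queue.asScala.toList
    }
    snapshot
  }

  def main(args: Array[String]): Unit = {
    val queue = new ConcurrentLinkedQueue[Int]()
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(100); queue.add(42) }
    }).start()
    val result = awaitQueue(queue, timeoutMs = 2000)(_.nonEmpty)
    assert(result == List(42))
  }
}
```
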
@@ -18,10 +18,11 @@
package org.apache.spark.streaming.kafka

import java.io.File
import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -101,8 +102,7 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, topics)
}

val allReceived =
new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
val allReceived = new ConcurrentLinkedQueue[(String, String)]()

// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
@@ -131,11 +131,12 @@ class DirectKafkaStreamSuite
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
@@ -173,8 +174,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
val collectedData = new ConcurrentLinkedQueue[String]()
stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +220,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
val collectedData = new ConcurrentLinkedQueue[String]()
stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +266,7 @@ class DirectKafkaStreamSuite
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
DirectKafkaStreamSuite.collectedData.appendAll(data)
DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
}

// This is to ensure that all the data is eventually received only once
@@ -335,14 +336,14 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, Set(topic))
}

val allReceived =
new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
val allReceived = new ConcurrentLinkedQueue[(String, String)]

stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))

// Calculate all the record number collected in the StreamingListener.
assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
}
}

val collectedData =
new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
val collectedData = new ConcurrentLinkedQueue[Array[String]]()

// Used for assertion failure messages.
def dataToString: String =
collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")

// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
collectedData += data
collectedData.add(data)
}

ssc.start()
@@ -415,7 +415,7 @@
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.exists(_.size == expectedSize),
assert(collectedData.asScala.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}
@@ -433,7 +433,7 @@
}

object DirectKafkaStreamSuite {
val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
val collectedData = new ConcurrentLinkedQueue[String]()
@volatile var total = -1L

class InputInfoCollector extends StreamingListener {
@@ -468,4 +468,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}

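
Throughout DirectKafkaStreamSuite the old `allReceived ++= rdd.collect()` becomes `addAll(Arrays.asList(rdd.collect(): _*))`, since the Java queue has no Scala-style `++=`, and the failure messages switch to `allReceived.asScala.mkString(...)` for the same reason. A compact sketch of that bulk-add idiom, with a hand-built array standing in for rdd.collect():

```scala
import java.util.Arrays
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object BulkAddSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for rdd.collect(): an Array of key/value pairs.
    val collected: Array[(String, String)] = Array("k1" -> "v1", "k2" -> "v2")

    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
    // Arrays.asList(xs: _*) wraps the array as a java.util.List so the whole
    // batch can be appended with a single addAll call.
    allReceived.addAll(Arrays.asList(collected: _*))

    assert(allReceived.size == 2)
    // Formatting for assertion messages goes back through asScala.
    assert(allReceived.asScala.mkString("\n").nonEmpty)
  }
}
```
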
@@ -19,8 +19,9 @@ package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.google.common.base.Throwables
@@ -83,7 +84,7 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
@@ -92,8 +93,7 @@
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())

private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
with mutable.SynchronizedBuffer[BlockGenerator]
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()

/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +170,11 @@ private[streaming] class ReceiverSupervisorImpl(
}

override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
registeredBlockGenerators.asScala.foreach { _.start() }
}

override protected def onStop(message: String, error: Option[Throwable]) {
registeredBlockGenerators.foreach { _.stop() }
registeredBlockGenerators.asScala.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}

@@ -194,10 +194,11 @@
override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
stoppedGenerators.foreach(registeredBlockGenerators.remove(_))

val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators += newBlockGenerator
registeredBlockGenerators.add(newBlockGenerator)
newBlockGenerator
}

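
In ReceiverSupervisorImpl the old `--=` bulk removal is replaced by collecting the stopped generators first and then removing them one by one; ConcurrentLinkedQueue.remove deletes a single matching element, and its iterators are weakly consistent, so this stays safe while receivers keep registering generators. A sketch of the pattern, with a small case class standing in for BlockGenerator:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object CleanupSketch {
  // Stand-in for BlockGenerator: just enough surface for the pattern.
  final case class Generator(id: Int, stopped: Boolean) {
    def isStopped(): Boolean = stopped
  }

  def main(args: Array[String]): Unit = {
    val registered = new ConcurrentLinkedQueue[Generator]()
    registered.add(Generator(1, stopped = true))
    registered.add(Generator(2, stopped = false))

    // ConcurrentLinkedQueue has no `--=`, so stopped generators are collected
    // first and then removed individually.
    val stopped = registered.asScala.filter(_.isStopped())
    stopped.foreach(registered.remove(_))

    assert(registered.asScala.toList == List(Generator(2, stopped = false)))
  }
}
```
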
@@ -273,7 +273,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).toSeq.sorted)
}

val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =
@@ -33,7 +33,7 @@ private[ui] case class BatchUIData(
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
var outputOpIdSparkJobIdPairs: Iterable[OutputOpIdAndSparkJobId] = Seq.empty) {

/**
* Time taken for the first job of this batch to start processing from the time this batch
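
Widening outputOpIdSparkJobIdPairs to Iterable lets the listener store the asScala wrapper directly: it is a live view of the queue, so job ids added after the BatchUIData is built still show up, while call sites that need Seq-only operations (the `.toSeq.sorted` in BatchPage above) copy at the point of use. A small sketch of that live-view behaviour, with integers standing in for the job id pairs:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object LiveViewSketch {
  def main(args: Array[String]): Unit = {
    val queue = new ConcurrentLinkedQueue[Int]()

    // The asScala wrapper is a live view: it reflects elements added later.
    val liveView: Iterable[Int] = queue.asScala
    queue.add(3)
    queue.add(1)
    assert(liveView.toList == List(3, 1))

    // Iterable has no `sorted`, so a Seq copy is taken where ordering matters.
    val sortedIds: Seq[Int] = liveView.toSeq.sorted
    assert(sortedIds == Seq(1, 3))
  }
}
```
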
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.ui

import java.util.{LinkedHashMap, Map => JMap, Properties}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable.{ArrayBuffer, HashMap, Queue, SynchronizedBuffer}
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, Queue}

import org.apache.spark.scheduler._
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -41,9 +43,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
new LinkedHashMap[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]] {
override def removeEldestEntry(
p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
p1: JMap.Entry[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]]): Boolean = {
// If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine if
// SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
// may add some information for a removed batch when processing "onJobStart". It will be a
@@ -131,12 +133,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
if (outputOpIdToSparkJobIds == null) {
outputOpIdToSparkJobIds =
new ArrayBuffer[OutputOpIdAndSparkJobId]()
with SynchronizedBuffer[OutputOpIdAndSparkJobId]
outputOpIdToSparkJobIds = new ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]()
batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
outputOpIdToSparkJobIds.add(OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId))
}
}

@@ -256,8 +256,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
batchUIData.foreach { _batchUIData =>
val outputOpIdToSparkJobIds =
Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
// We use an Iterable rather than explicitly converting to a Seq so that updates
// to the underlying queue propagate
val outputOpIdToSparkJobIds: Iterable[OutputOpIdAndSparkJobId] =
Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).map(_.asScala)
.getOrElse(Seq.empty)
_batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
}
batchUIData
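
Two patterns in StreamingJobProgressListener are worth calling out: the get-or-create dance around the LinkedHashMap (Java's get returns null for a missing key), and wrapping that possibly-null lookup in Option before calling asScala so an unknown batch falls back to an empty Iterable. A sketch with String keys standing in for Time and Ints for the job id pairs:

```scala
import java.util.LinkedHashMap
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val pairs = new LinkedHashMap[String, ConcurrentLinkedQueue[Int]]()

    // get-or-create: the queue is created and put back before the first add.
    var queue = pairs.get("batch-1")
    if (queue == null) {
      queue = new ConcurrentLinkedQueue[Int]()
      pairs.put("batch-1", queue)
    }
    queue.add(7)

    // Reading back: guard the possibly-null lookup with Option before asScala,
    // falling back to an empty Iterable for unknown batches.
    val known: Iterable[Int] =
      Option(pairs.get("batch-1")).map(_.asScala).getOrElse(Seq.empty)
    val unknown: Iterable[Int] =
      Option(pairs.get("batch-2")).map(_.asScala).getOrElse(Seq.empty)

    assert(known.toList == List(7))
    assert(unknown.isEmpty)
  }
}
```
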
@@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
res.map(_.asJava).asJava
res.map(_.asJava).toSeq.asJava
}

/**
Expand All @@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
res.map(entry => entry.map(_.asJava).asJava).asJava
res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
}
}

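
JavaTestBase only adds a toSeq, presumably to pin the mapped result down to a plain Seq before the outer Java conversion now that the test output no longer comes from an ArrayBuffer; the conversion chain itself is unchanged: asJava on each inner Seq, then on the outer one, so Java callers receive nested java.util.Lists. A minimal sketch of that chain, with made-up sample data:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

object AsJavaSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for runStreams output: one Seq of results per batch.
    val res: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"))

    // Convert each inner Seq, then the outer one, so Java callers receive
    // java.util.List[java.util.List[String]].
    val javaRes: JList[JList[String]] = res.map(_.asJava).asJava

    assert(javaRes.size() == 2)
    assert(javaRes.get(0).get(1) == "b")
  }
}
```
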