@@ -18,8 +18,11 @@
package org.apache.spark.streaming.kafka

import java.io.File
import java.util.Arrays
import java.util.Collections
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -101,8 +104,7 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, topics)
}

val allReceived =
new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
val allReceived = new ConcurrentLinkedQueue[(String, String)]

// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
@@ -131,11 +133,11 @@ class DirectKafkaStreamSuite
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
"didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
@@ -173,8 +175,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
val collectedData = new ConcurrentLinkedQueue[String]()
stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +221,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)

val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
val collectedData = new ConcurrentLinkedQueue[String]()
stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +267,7 @@ class DirectKafkaStreamSuite
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
DirectKafkaStreamSuite.collectedData.appendAll(data)
DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
}

// This is to ensure all the data is eventually received only once
@@ -335,14 +337,13 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, Set(topic))
}

val allReceived =
new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
val allReceived = new ConcurrentLinkedQueue[(String, String)]

stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
"didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n"))

// Check the total number of records collected by the StreamingListener.
assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
}
}

val collectedData =
new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
val collectedData = new ConcurrentLinkedQueue[Array[String]]()

// Used for assertion failure messages.
def dataToString: String =
collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")

// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
collectedData += data
collectedData.add(data)
}

ssc.start()
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.exists(_.size == expectedSize),
assert(collectedData.asScala.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}
@@ -433,7 +433,7 @@ class DirectKafkaStreamSuite
}

object DirectKafkaStreamSuite {
val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
val collectedData = new ConcurrentLinkedQueue[String]()
@volatile var total = -1L

class InputInfoCollector extends StreamingListener {
12 changes: 12 additions & 0 deletions scalastyle-config.xml
@@ -169,6 +169,18 @@ This file is divided into 3 sections:
]]></customMessage>
</check>

<check customId="mutablesynchronizedbuffer" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">mutable\.SynchronizedBuffer</parameter></parameters>
<customMessage><![CDATA[
Are you sure that you want to use mutable.SynchronizedBuffer? In most cases, you should use
java.util.concurrent.ConcurrentLinkedQueue instead.
If you must use mutable.SynchronizedBuffer, wrap the code block with
// scalastyle:off mutablesynchronizedbuffer
mutable.SynchronizedBuffer[...]
// scalastyle:on mutablesynchronizedbuffer
]]></customMessage>
</check>
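For illustration only, and not part of this patch: a minimal sketch of the replacement pattern the check above recommends, mirroring the test changes in this diff. Writes go through add/addAll on a ConcurrentLinkedQueue, and reads convert back to a Scala collection via JavaConverters. The object name below is made up for the example.

import java.util.Arrays
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object SynchronizedBufferMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before (deprecated in Scala 2.11 and flagged by the check above):
    //   val buf = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
    //   buf += "a"; buf ++= Seq("b", "c")

    // After: a lock-free, thread-safe queue from java.util.concurrent.
    val queue = new ConcurrentLinkedQueue[String]()
    queue.add("a")                        // single element, replaces +=
    queue.addAll(Arrays.asList("b", "c")) // bulk insert, replaces ++=

    // Reads convert to a Scala view, as the .asScala calls in the test diffs do.
    assert(queue.asScala.toSeq == Seq("a", "b", "c"))
    println(queue.asScala.mkString(", "))
  }
}

Note that ConcurrentLinkedQueue only guarantees per-operation thread safety and its size() traverses the queue rather than returning a snapshot, which is acceptable in these tests because the assertions run inside eventually blocks.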

<check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">Class\.forName</parameter></parameters>
<customMessage><![CDATA[
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -83,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())

private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
with mutable.SynchronizedBuffer[BlockGenerator]
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]

/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +171,11 @@ private[streaming] class ReceiverSupervisorImpl(
}

override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
registeredBlockGenerators.asScala.foreach { _.start() }
}

override protected def onStop(message: String, error: Option[Throwable]) {
registeredBlockGenerators.foreach { _.stop() }
registeredBlockGenerators.asScala.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}

override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
stoppedGenerators.foreach(registeredBlockGenerators.remove(_))

val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators += newBlockGenerator
registeredBlockGenerators.add(newBlockGenerator)
newBlockGenerator
}

@@ -17,6 +17,8 @@

package org.apache.spark.streaming

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.language.existentials
@@ -645,8 +647,8 @@ class BasicOperationsSuite extends TestSuiteBase {
val networkStream =
ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val mappedStream = networkStream.map(_ + ".").persist()
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(mappedStream, outputBuffer)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(mappedStream, outputQueue)

outputStream.register()
ssc.start()
@@ -685,7 +687,7 @@ class BasicOperationsSuite extends TestSuiteBase {
testServer.stop()

// verify data has been received
assert(outputBuffer.size > 0)
assert(!outputQueue.isEmpty)
assert(blockRdds.size > 0)
assert(persistentRddIds.size > 0)

@@ -18,7 +18,9 @@
package org.apache.spark.streaming

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag

@@ -105,7 +107,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
val operatedStream = operation(inputStream)
operatedStream.print()
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
new ConcurrentLinkedQueue[Seq[Seq[V]]])
outputStream.register()
ssc.checkpoint(checkpointDir)

@@ -166,7 +168,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
}
outputStream.output.map(_.flatten)
outputStream.output.asScala.map(_.flatten).toSeq

} finally {
ssc.stop(stopSparkContext = stopSparkContext)
@@ -591,7 +593,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
// Set up the streaming context and input streams
val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's.
val testDir = Utils.createTempDir()
val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
val outputBuffer = new ConcurrentLinkedQueue[Seq[Int]]

/**
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
@@ -671,7 +673,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
ssc.stop()
// Check that we shut down while the third batch was being processed
assert(batchCounter.getNumCompletedBatches === 2)
assert(outputStream.output.flatten === Seq(1, 3))
assert(outputStream.output.asScala.toSeq.flatten === Seq(1, 3))
}

// The original StreamingContext has now been stopped.
@@ -721,7 +723,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
logInfo("Output after restart = " + outputStream.output.asScala.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()

assert(recordedFiles(ssc) === (1 to 9))

// Append the new output to the old buffer
outputBuffer ++= outputStream.output
outputBuffer.addAll(outputStream.output)

// Verify whether all the elements received are as expected
val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
}
} finally {
Utils.deleteRecursively(testDir)
@@ -894,7 +896,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long):
Iterable[Seq[V]] =
{
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
}.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output.map(_.flatten)
outputStream.output.asScala.map(_.flatten)
}
}
