
Commit 89068ad

[SPARK-7927] whitespace fixes for streaming.
1 parent 3e312a5 commit 89068ad

14 files changed: +28 -27 lines changed
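
Taken together, the hunks below change style and whitespace only, not behavior. As a rough before/after summary of the conventions being applied (an illustrative snippet with made-up names, not code from any of the touched files): space after commas in type arguments, space after the colon of a declared type, space after if/while, and spaces around = in default and named arguments.

// Before
class ExampleBefore[K, V](pairs: Seq[(K,V)]) {
  def count(flag: Boolean=true):Long = if(flag) pairs.size.toLong else 0L
}

// After
class ExampleAfter[K, V](pairs: Seq[(K, V)]) {
  def count(flag: Boolean = true): Long = if (flag) pairs.size.toLong else 0L
}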

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 1 deletion
@@ -461,7 +461,7 @@ class StreamingContext private[streaming] (
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
-      directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+      directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
       require(bytes.length == recordLength, "Byte array does not have correct length. " +
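
Note: this hunk appears to sit inside StreamingContext.binaryRecordsStream, the fixed-record-length binary file source. Assuming that, a typical call from user code looks like the sketch below (directory path and record length are placeholders).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryRecordsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("binary-records").setMaster("local[2]"), Seconds(10))

    // Each element of the resulting DStream is one fixed-length record as a byte array.
    val records = ssc.binaryRecordsStream("hdfs:///data/fixed-width", recordLength = 512)
    records.map(_.length).print()

    ssc.start()
    ssc.awaitTermination()
  }
}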

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 4 additions & 4 deletions
@@ -227,7 +227,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions Number of partitions of each RDD in the new DStream.
    */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-    :JavaPairDStream[K, JIterable[V]] = {
+    : JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
       .mapValues(asJavaIterable _)
   }
@@ -247,7 +247,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ):JavaPairDStream[K, JIterable[V]] = {
+    ): JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
       .mapValues(asJavaIterable _)
   }
@@ -262,7 +262,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * batching interval
    */
   def reduceByKeyAndWindow(reduceFunc: JFunction2[V, V, V], windowDuration: Duration)
-    :JavaPairDStream[K, V] = {
+    : JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
   }

@@ -281,7 +281,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
-    ):JavaPairDStream[K, V] = {
+    ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
   }

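These Java-facing signatures wrap the same windowed operations exposed on the Scala pair DStream API. A minimal usage sketch on the Scala side (socket source, window and slide durations are arbitrary choices):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCountsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("windowed-counts").setMaster("local[2]"), Seconds(1))

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Word counts over a 30 second window, recomputed every 10 seconds.
    val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}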

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 1 addition & 1 deletion
@@ -659,7 +659,7 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
-    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
+    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
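
realTransformFunc is the adapter behind DStream.transform: the two-argument overload hands each batch's RDD plus its batch time to an arbitrary user function. A minimal sketch of a call site (socket source and the sorting step are arbitrary examples):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("transform").setMaster("local[2]"), Seconds(1))

    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Apply an arbitrary RDD-to-RDD function to every batch.
    val sorted = counts.transform(rdd => rdd.sortByKey(ascending = false))
    sorted.print()

    ssc.start()
    ssc.awaitTermination()
  }
}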

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 4 additions & 4 deletions
@@ -69,7 +69,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  * processing semantics are undefined.
  */
 private[streaming]
-class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
+class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     @transient ssc_ : StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
@@ -251,7 +251,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file =>{
+    val fileRDDs = files.map { file =>
       val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
@@ -267,7 +267,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           "Refer to the streaming programming guide for more details.")
       }
       rdd
-    })
+    }
     new UnionRDD(context.sparkContext, fileRDDs)
   }

@@ -294,7 +294,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+    generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
     batchTimeToSelectedFiles =
       new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
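
The filesToRDD change swaps map(file => { ... }) for the brace-block form map { file => ... }; the two are equivalent, as the small standalone example below shows.

object BraceBlockSketch {
  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "bb.txt")

    // Parenthesized function literal with a block body (the old form).
    val lengthsParens = files.map(file => {
      file.length
    })

    // Brace-block form (the new form); avoids the trailing "})".
    val lengthsBraces = files.map { file =>
      file.length
    }

    assert(lengthsParens == lengthsBraces)
  }
}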

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.StreamingContext.rddToFileName
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
  */
-class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
   extends Serializable
 {

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala

Lines changed: 4 additions & 4 deletions
@@ -38,7 +38,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     _windowDuration: Duration,
     _slideDuration: Duration,
     partitioner: Partitioner
-  ) extends DStream[(K,V)](parent.ssc) {
+  ) extends DStream[(K, V)](parent.ssc) {

   require(_windowDuration.isMultipleOf(parent.slideDuration),
     "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
@@ -58,7 +58,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
   super.persist(StorageLevel.MEMORY_ONLY_SER)
   reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)

-  def windowDuration: Duration =  _windowDuration
+  def windowDuration: Duration = _windowDuration

   override def dependencies: List[DStream[_]] = List(reducedStream)

@@ -68,7 +68,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](

   override def parentRememberDuration: Duration = rememberDuration + windowDuration

-  override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
+  override def persist(storageLevel: StorageLevel): DStream[(K, V)] = {
     super.persist(storageLevel)
     reducedStream.persist(storageLevel)
     this
@@ -118,7 +118,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](

     // Get the RDD of the reduced value of the previous window
     val previousWindowRDD =
-      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))

     // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs

streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala

Lines changed: 3 additions & 3 deletions
@@ -25,19 +25,19 @@ import scala.reflect.ClassTag

 private[streaming]
 class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
-    parent: DStream[(K,V)],
+    parent: DStream[(K, V)],
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
     partitioner: Partitioner,
     mapSideCombine: Boolean = true
-  ) extends DStream[(K,C)] (parent.ssc) {
+  ) extends DStream[(K, C)] (parent.ssc) {

   override def dependencies: List[DStream[_]] = List(parent)

   override def slideDuration: Duration = parent.slideDuration

-  override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
     parent.getOrCompute(validTime) match {
       case Some(rdd) => Some(rdd.combineByKey[C](
         createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
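
compute() forwards the three combiner functions straight into RDD.combineByKey for each batch. The sketch below shows the same shape on a plain RDD, computing a per-key average (data and names are made up):

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("combine").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
    val avg = pairs
      .combineByKey(
        (v: Int) => (v, 1),                                           // createCombiner
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue
        (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiner
      .mapValues { case (sum, count) => sum.toDouble / count }
    avg.collect().foreach(println) // (a,2.0), (b,2.0)
    sc.stop()
  }
}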

streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
     val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
       val i = iterator.map(t => {
         val itr = t._2._2.iterator
-        val headOption = if(itr.hasNext) Some(itr.next) else None
+        val headOption = if (itr.hasNext) Some(itr.next()) else None
         (t._1, t._2._1.toSeq, headOption)
       })
       updateFuncLocal(i)
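
Besides the space after if, the fix adds empty parentheses to itr.next: by common Scala convention, methods with side effects (next advances the iterator) are called with (), while pure checks like hasNext stay parameterless. A standalone illustration:

object IteratorHeadSketch {
  def main(args: Array[String]): Unit = {
    val itr = Iterator(1, 2, 3)
    // hasNext is a pure check; next() mutates the iterator, so it gets parentheses.
    val headOption = if (itr.hasNext) Some(itr.next()) else None
    println(headOption) // Some(1)
  }
}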

streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ class WindowedDStream[T: ClassTag](
   // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)

-  def windowDuration: Duration =  _windowDuration
+  def windowDuration: Duration = _windowDuration

   override def dependencies: List[DStream[_]] = List(parent)

@@ -68,7 +68,7 @@ class WindowedDStream[T: ClassTag](
         new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
       } else {
         logDebug("Using normal union for windowing at " + validTime)
-        new UnionRDD(ssc.sc,rddsInWindow)
+        new UnionRDD(ssc.sc, rddsInWindow)
       }
     Some(windowRDD)
   }

streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala

Lines changed: 1 addition & 1 deletion
@@ -164,7 +164,7 @@ private[streaming] class BlockGenerator(
   private def keepPushingBlocks() {
     logInfo("Started block pushing thread")
     try {
-      while(!stopped) {
+      while (!stopped) {
         Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
           case Some(block) => pushBlock(block)
           case None =>
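
The loop polls the block queue with a 100 ms timeout so it can re-check the stop flag even when no block arrives. A self-contained sketch of the same pattern (the object and member names below are stand-ins, not BlockGenerator's actual members):

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object PollLoopSketch {
  @volatile private var stopped = false
  private val queue = new ArrayBlockingQueue[String](10)

  // Poll with a timeout so the loop notices the stop flag promptly,
  // the same shape as keepPushingBlocks() above.
  def drain(): Unit = {
    while (!stopped) {
      Option(queue.poll(100, TimeUnit.MILLISECONDS)) match {
        case Some(item) => println("pushing " + item)
        case None => // timed out; loop around and check the flag again
      }
    }
  }

  def stop(): Unit = { stopped = true }
}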
