
Commit 59e29f6

[SPARK-4964] settle on "Direct" as a naming convention for the new stream
1 parent 8c31855 commit 59e29f6

3 files changed, +9 -9 lines changed
Lines changed: 3 additions & 3 deletions

@@ -53,7 +53,7 @@ import org.apache.spark.streaming.dstream._
  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
  */
 private[streaming]
-class DeterministicKafkaInputDStream[
+class DirectKafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
   U <: Decoder[_]: ClassTag,
@@ -68,7 +68,7 @@ class DeterministicKafkaInputDStream[
     "spark.streaming.kafka.maxRetries", 1)
 
   protected[streaming] override val checkpointData =
-    new DeterministicKafkaInputDStreamCheckpointData
+    new DirectKafkaInputDStreamCheckpointData
 
   protected val kc = new KafkaCluster(kafkaParams)
 
@@ -129,7 +129,7 @@ class DeterministicKafkaInputDStream[
   }
 
   private[streaming]
-  class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
     def batchForTime = data.asInstanceOf[mutable.HashMap[
       Time, Array[OffsetRange.OffsetRangeTuple]]]

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 4 additions & 4 deletions

@@ -246,7 +246,7 @@ object KafkaUtils {
    *  starting point of the stream
    */
   @Experimental
-  def createNewStream[
+  def createDirectStream[
     K: ClassTag,
     V: ClassTag,
     U <: Decoder[_]: ClassTag,
@@ -257,7 +257,7 @@ object KafkaUtils {
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
   ): InputDStream[R] = {
-    new DeterministicKafkaInputDStream[K, V, U, T, R](
+    new DirectKafkaInputDStream[K, V, U, T, R](
       ssc, kafkaParams, fromOffsets, messageHandler)
   }
 
@@ -289,7 +289,7 @@ object KafkaUtils {
    * @param topics names of the topics to consume
    */
   @Experimental
-  def createNewStream[
+  def createDirectStream[
     K: ClassTag,
     V: ClassTag,
     U <: Decoder[_]: ClassTag,
@@ -313,7 +313,7 @@ object KafkaUtils {
     val fromOffsets = leaderOffsets.map { case (tp, lo) =>
         (tp, lo.offset)
     }
-    new DeterministicKafkaInputDStream[K, V, U, T, (K, V)](
+    new DirectKafkaInputDStream[K, V, U, T, (K, V)](
       ssc, kafkaParams, fromOffsets, messageHandler)
   }).fold(
     errs => throw new SparkException(errs.mkString("\n")),
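
For context, a minimal usage sketch of the renamed API (not part of this commit): the broker address, topic name, and object name below are placeholders, the type parameters mirror the test suite that follows, and the calls assume the direct-stream API as it stands after this rename.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder broker list; the direct stream talks to the Kafka brokers
    // directly rather than going through a receiver.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Simple overload: subscribe to a set of topics.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("example-topic"))
    stream.map(_._2).print()

    // Overload with explicit starting offsets and a custom message handler,
    // matching the (fromOffsets, messageHandler) signature in the diff above.
    val fromOffsets = Map(TopicAndPartition("example-topic", 0) -> 0L)
    val handled = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
    handled.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Per the checkpoint-data diff above, each batch of the direct stream is recorded as an array of offset-range tuples keyed by batch time, which is what makes explicit starting offsets like fromOffsets meaningful.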

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaNewStreamSuite.scala renamed to external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-class KafkaNewStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
   val sparkConf = new SparkConf()
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
@@ -63,7 +63,7 @@ class KafkaNewStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with
       createTopic(t)
       produceAndSendMessage(t, data)
     }
-    val stream = KafkaUtils.createNewStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, topics)
     var total = 0L;
