From a89663411e568f265103f0b695168d4db68a2b36 Mon Sep 17 00:00:00 2001
From: bjyfhanfei
Date: Mon, 4 Sep 2017 17:00:25 +0800
Subject: [PATCH 1/2] add partition subconcurrency

---
 .../spark/streaming/kafka/KafkaRDD.scala | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 5ea52b6ad36a0..da235caf15ed2 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -57,10 +57,30 @@ class KafkaRDD[
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
   override def getPartitions: Array[Partition] = {
-    offsetRanges.zipWithIndex.map { case (o, i) =>
-      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
-      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
-    }.toArray
+    val subconcurrency = if (kafkaParams.contains("topic.partition.subconcurrency"))
+      kafkaParams("topic.partition.subconcurrency").toInt
+    else 1
+    val numPartitions = offsetRanges.length
+    val kafkaRDDPartitionArray = new Array[Partition](subconcurrency * numPartitions)
+    for (i <- 0 until numPartitions) {
+      val offsetRange = offsetRanges(i)
+      val (host, port) = leaders(TopicAndPartition(offsetRange.topic, offsetRange.partition))
+      // untilOffset is exclusive; the last slice absorbs the integer-division remainder
+      val step = (offsetRange.untilOffset - offsetRange.fromOffset) / subconcurrency
+
+      var from = -1L
+      var until = -1L
+      for (j <- 0 until subconcurrency) {
+        from = offsetRange.fromOffset + j * step
+        until = offsetRange.fromOffset + (j + 1) * step
+        if (j == subconcurrency - 1) {
+          until = offsetRange.untilOffset
+        }
+        kafkaRDDPartitionArray(i * subconcurrency + j) = new KafkaRDDPartition(
+          i * subconcurrency + j, offsetRange.topic, offsetRange.partition, from, until, host, port)
+      }
+    }
+    kafkaRDDPartitionArray
   }
 
   override def count(): Long = offsetRanges.map(_.count).sum
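The slicing arithmetic introduced above can be checked in isolation. The snippet below is a minimal standalone sketch, not part of the patch; `sliceRange` is a hypothetical helper that mirrors the loop body. `untilOffset` is exclusive, `step` is truncated integer division, and the last slice is stretched to `untilOffset` so the remainder is not dropped:

    // Split one [fromOffset, untilOffset) range into `subconcurrency` contiguous slices.
    def sliceRange(fromOffset: Long, untilOffset: Long, subconcurrency: Int): Seq[(Long, Long)] = {
      val step = (untilOffset - fromOffset) / subconcurrency
      (0 until subconcurrency).map { j =>
        val from = fromOffset + j * step
        // the last slice absorbs the remainder left by integer division
        val until = if (j == subconcurrency - 1) untilOffset else fromOffset + (j + 1) * step
        (from, until)
      }
    }

    // e.g. sliceRange(0L, 10L, 3) yields Vector((0,3), (3,6), (6,10)): contiguous, no gaps.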
From d1132195d6b2087be4f18ad25614836c46512fe7 Mon Sep 17 00:00:00 2001
From: bjyfhanfei
Date: Tue, 19 Sep 2017 14:12:29 +0800
Subject: [PATCH 2/2] add topic.partition.subconcurrency

---
 .../spark/streaming/kafka/KafkaRDD.scala | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index da235caf15ed2..9e896a95fbc70 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -57,30 +57,35 @@ class KafkaRDD[
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
   override def getPartitions: Array[Partition] = {
     val subconcurrency = if (kafkaParams.contains("topic.partition.subconcurrency"))
-      kafkaParams("topic.partition.subconcurrency").toInt
+      kafkaParams.getOrElse("topic.partition.subconcurrency", "1").toInt
     else 1
     val numPartitions = offsetRanges.length
-    val kafkaRDDPartitionArray = new Array[Partition](subconcurrency * numPartitions)
+
+    val subOffsetRanges: Array[OffsetRange] = new Array[OffsetRange](subconcurrency * numPartitions)
     for (i <- 0 until numPartitions) {
       val offsetRange = offsetRanges(i)
-      val (host, port) = leaders(TopicAndPartition(offsetRange.topic, offsetRange.partition))
       // untilOffset is exclusive; the last slice absorbs the integer-division remainder
       val step = (offsetRange.untilOffset - offsetRange.fromOffset) / subconcurrency
 
       var from = -1L
       var until = -1L
+
       for (j <- 0 until subconcurrency) {
         from = offsetRange.fromOffset + j * step
         until = offsetRange.fromOffset + (j + 1) * step
         if (j == subconcurrency - 1) {
           until = offsetRange.untilOffset
         }
-        kafkaRDDPartitionArray(i * subconcurrency + j) = new KafkaRDDPartition(
-          i * subconcurrency + j, offsetRange.topic, offsetRange.partition, from, until, host, port)
+        subOffsetRanges(i * subconcurrency + j) =
+          OffsetRange.create(offsetRange.topic, offsetRange.partition, from, until)
       }
     }
-    kafkaRDDPartitionArray
+
+    subOffsetRanges.zipWithIndex.map { case (o, i) =>
+      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+    }.toArray
   }
 
   override def count(): Long = offsetRanges.map(_.count).sum
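With the second patch applied, the new knob travels through `kafkaParams` like any other configuration key, and each Kafka partition is read by `subconcurrency` Spark tasks. Below is a minimal usage sketch, assuming the patched kafka-0-8 connector; the broker address, topic name, and batch interval are placeholders, not taken from the patch:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("subconcurrency-demo"), Seconds(10))
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092",        // placeholder broker list
      "topic.partition.subconcurrency" -> "4"          // 4 RDD partitions per Kafka partition
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("demo-topic"))             // placeholder topic

Note that reading one Kafka partition with several concurrent tasks gives up Kafka's per-partition ordering within a batch, so the setting is only appropriate for order-insensitive workloads.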