Conversation

@tdas (Contributor) commented Mar 1, 2018

What changes were proposed in this pull request?

Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some cases, it may be beneficial to read the data with greater parallelism, that is, with a larger number of partitions/tasks. That means offset ranges must be divided up into smaller ranges such that the number of records per partition ~= total records in batch / desired partitions. This would also balance out any data skew between topic-partitions.

In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism.
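For illustration, a minimal usage sketch of the new option; the broker address, topic name, and the value 24 are hypothetical:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-min-partitions").getOrCreate()

    // Ask for at least 24 Spark tasks even if the topic has fewer
    // topic-partitions; large topic-partitions are split into subranges.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // hypothetical brokers
      .option("subscribe", "topic1")                   // hypothetical topic
      .option("minPartitions", "24")                   // hypothetical value
      .load()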

How was this patch tested?

New tests in KafkaMicroBatchV2SourceSuite.

import org.apache.spark.sql.sources.v2.DataSourceOptions


private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
Contributor Author:

add docs.

@tdas (Contributor Author) commented Mar 1, 2018

@zsxwing @jose-torres

@SparkQA commented Mar 1, 2018

Test build #87805 has finished for PR 20698 at commit ebb9b51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2018

Test build #87806 has finished for PR 20698 at commit 5f066a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
// (KAFKA-1894).
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
Contributor:

What's the difference between assert and require here?

Contributor Author:

Not much, really. assert throws AssertionError and require throws IllegalArgumentException. Just a matter of preference. I can revert this change.

Contributor:

Assertions can be turned off
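To illustrate the distinction (a standalone Scala sketch with a hypothetical method, not code from this patch): assert throws AssertionError and can be elided at compile time, while require throws IllegalArgumentException and always runs:

    def setBatchSize(n: Int): Unit = {
      // Elided entirely when compiled with scalac -Xdisable-assertions.
      assert(n > 0, "batch size must be positive")  // AssertionError on failure

      // Never elided; validates arguments in all builds.
      require(n > 0, "batch size must be positive") // IllegalArgumentException on failure
    }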


import KafkaOffsetRangeCalculator._
/**
* Calculate the offset ranges that we are going to process this batch. If `numPartitions`
Contributor:

nit: /s/numPartitions/minPartitions


private[kafka010] object KafkaOffsetRangeCalculator {

private val DEFAULT_MIN_PARTITIONS = 0
Contributor:

super-nit: this isn't really a default, 0 isn't a valid number of min partitions

Contributor Author:

Ideally, we shouldn't be using default values like this. Rather, I want to use Option. However, DataSourceOptions does not give me a way to get back an Option[Int], which forces me to specify some default value. Let me see what I can do about it. I don't want to reason about 0 in the subsequent conditions and math calculations either.

// If minPartitions not set or there are enough partitions to satisfy minPartitions
if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
// Assign preferred executor locations to each range such that the same topic-partition is
// always read from the same executor and the KafkaConsumer can be reused
Contributor:

I worry that "always" is misleading here. It's not guaranteed that the same executor will run the partition or that the KafkaConsumer can be reused.

val tp = offsetRange.topicPartition
val size = offsetRange.untilOffset - offsetRange.fromOffset
// number of partitions to divvy up this topic partition to
val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
Contributor:

It's hard to understand why this number is being calculated as it is. I think it's correct, but a comment explaining why this is the right number to divvy would help.

Contributor:

yeah, a comment about how this is calculating the weight of partitions to assign to this topic would help. In addition, the sum of parts after this calculation will be >= minPartitions

Contributor Author:

I rewrote this completely using the approach sparkContext.parallelize uses to make splits.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L123
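For reference, a minimal sketch of that splitting scheme adapted to Kafka offsets (a simplified stand-in, not the code as merged): a range is cut into contiguous subranges whose lengths differ by at most one, mirroring the positions() helper in ParallelCollectionRDD:

    // Split [fromOffset, untilOffset) into numSplits contiguous subranges.
    def splitRange(fromOffset: Long, untilOffset: Long, numSplits: Int): Seq[(Long, Long)] = {
      val size = untilOffset - fromOffset
      (0 until numSplits).map { i =>
        val start = fromOffset + i * size / numSplits
        val end = fromOffset + (i + 1) * size / numSplits
        (start, end)
      }
    }

    // e.g. splitRange(0, 10, 3) == Seq((0, 3), (3, 6), (6, 10))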

@zsxwing (Member) commented Mar 1, 2018

cc @brkyvz

@brkyvz (Contributor) left a comment:

This is great! Left some clarification questions


private val consumer = {
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
Contributor:

nit: We use 'assign' here, hence don't need to ...

}

override def close(): Unit = {
// Indicate that we're no longer using this consumer
Contributor:

maybe remove this?

}

// If minPartitions not set or there are enough partitions to satisfy minPartitions
if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
Contributor:

I don't think we need the first check. offsetRanges.size should be greater than 0 right? Otherwise we wouldn't have called into this.

Contributor Author:

Rewritten. I don't want to rely on this default value of 0, as @jose-torres expressed concern earlier, so I rewrote this to explicitly check whether minPartitions has been set.
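Roughly, the rewritten guard reads like this (a sketch assuming minPartitions: Option[Int]; splitLargeRanges is a hypothetical helper, the rest follows the quoted hunk):

    // No minPartitions requested, or there are already more offset ranges
    // than requested: keep one range per topic-partition.
    if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
      offsetRanges
    } else {
      splitLargeRanges(offsetRanges) // hypothetical: divide up the large ranges
    }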

fromOffsets: PartitionOffsetMap,
untilOffsets: PartitionOffsetMap,
executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
Contributor:

was this check here before? What if there are new topic partitions? Are we missing those, because they may not exist in fromOffsets?

Contributor Author:

fromOffsets here will contain the initial offsets of new partitions. See how fromOffsets is set with startOffsets + newPartitionInitialOffsets.
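In other words, a simplified sketch of how the caller composes fromOffsets (names follow the reply above):

    // Initial offsets of newly discovered topic-partitions are merged into
    // the starting offsets, so the intersect above does not drop them.
    val fromOffsets: PartitionOffsetMap = startOffsets ++ newPartitionInitialOffsets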

@SparkQA commented Mar 2, 2018

Test build #87870 has finished for PR 20698 at commit 3eae3f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor):

LGTM

@brkyvz (Contributor) left a comment:

A couple of minor nits, and one additional test request (because I'm paranoid). Otherwise LGTM.

KafkaOffsetRange(
range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
}

Contributor:

nit: extra line

} else {

// Splits offset ranges with relatively large amount of data to smaller ones.
val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
Contributor:

nit: map(_.size).sum

offsetRanges.flatMap { range =>
// Split the current range into subranges as close to the ideal range size as possible
val rangeSize = range.untilOffset - range.fromOffset
val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt
Contributor:

nit: range.size, you may remove rangeSize above

private[kafka010] object KafkaOffsetRangeCalculator {

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
Contributor:

nit: .orNull instead of .orElse(null). Why don't you actually do:

options.get("minPartitions").map(_.toInt)

Contributor Author:

Because it returns a Java Optional, not a Scala Option.
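DataSourceOptions.get returns java.util.Optional, so a conversion is needed. A sketch of the two routes (the Scala 2.13 variant is an aside, not what this patch targets):

    // Pre-2.13: bridge through null, then parse.
    val minPartitions: Option[Int] =
      Option(options.get("minPartitions").orElse(null)).map(_.toInt)

    // Scala 2.13+: scala.jdk.OptionConverters offers a direct conversion.
    // import scala.jdk.OptionConverters._
    // val minPartitions = options.get("minPartitions").toScala.map(_.toInt)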

fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]) {
def size: Long = untilOffset - fromOffset
Contributor:

nit: maybe make this a lazy val so that it'll be calculated only once
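That is, a sketch of the suggested tweak to the case class quoted above:

    private[kafka010] case class KafkaOffsetRange(
        topicPartition: TopicPartition,
        fromOffset: Long,
        untilOffset: Long,
        preferredLoc: Option[String]) {
      // Computed once on first access rather than on every call.
      lazy val size: Long = untilOffset - fromOffset
    }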

KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set
}

testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
Contributor:

can you also add a test:

fromOffsets = Map(tp1 -> 1),
untilOffsets = Map(tp1 -> 10)
minPartitions = 3
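In the style of the suite's existing helpers, such a test might look like this (a sketch; the method name and the exact expected subranges depend on the merged implementation):

    testWithMinPartitions("1 TopicPartition to N offset ranges", 3) { calc =>
      assert(
        calc.getRanges(fromOffsets = Map(tp1 -> 1), untilOffsets = Map(tp1 -> 10)) ==
          Seq(
            KafkaOffsetRange(tp1, 1, 4, None),
            KafkaOffsetRange(tp1, 4, 7, None),
            KafkaOffsetRange(tp1, 7, 10, None)))
    }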

@SparkQA commented Mar 2, 2018

Test build #87911 has finished for PR 20698 at commit 1e244e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 3, 2018

Test build #87913 has finished for PR 20698 at commit 602ab36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor) commented Mar 3, 2018

LGTM pending tests.

@tdas (Contributor Author) commented Mar 3, 2018

Thank you. Merging to master only as this is a new feature touching production code paths.

@asfgit asfgit closed this in 486f99e Mar 3, 2018
@danielyahn commented:

@tdas Is it tested whether this fix is backward compatible? I see that the fix version is 2.4.0 in Jira, but it looks like your code change is all within spark-sql-kafka-0-10. I have a cluster on 2.3.2 and am interested in using this feature.

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…elism than the number of topic-partitions

Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some cases, it may be beneficial to read the data with greater parallelism, that is, with a larger number of partitions/tasks. That means offset ranges must be divided up into smaller ranges such that the number of records per partition ~= total records in batch / desired partitions. This would also balance out any data skew between topic-partitions.

In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism.

New tests in KafkaMicroBatchV2SourceSuite.

Author: Tathagata Das <[email protected]>

Closes apache#20698 from tdas/SPARK-23541.

Ref: LIHADOOP-48531
