Conversation

@arzt

@arzt arzt commented Apr 26, 2017

What changes were proposed in this pull request?

Omit rounding of backpressure rate. Effects:

  • no batch with a large number of records is created when the rate from the PID estimator is one
  • the number of records per batch and partition is more fine-grained, improving backpressure accuracy (see the sketch below)
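
A minimal illustration of the rounding effect, using hypothetical names and values rather than the actual Spark code:

```scala
// Hypothetical illustration only; names and values are not the actual Spark code.
// A partition's share of the estimated rate is usually fractional; rounding it
// early collapses that information for the whole batch.
val rate = 1.0      // msgs/sec reported by the PID estimator
val lagRatio = 0.4  // this partition's share of the total lag

val rounded = Math.round(lagRatio * rate) // 0L: the fractional limit is lost
val precise = lagRatio * rate             // 0.4: kept until the final per-batch conversion
```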

How was this patch tested?

This was tested by running:

  • mvn test -pl external/kafka-0-8
  • mvn test -pl external/kafka-0-10
  • a streaming application which was suffering from the issue

@JasonMWhite

The contribution is my original work and I license the work to the project under the project’s open source license

@JasonMWhite
Contributor

Code looks sound. Could you add or modify a test to illustrate/verify?

Member

@srowen srowen left a comment


CC @koeninger
The thing is, it seems like rates are intentionally not floating point here, but I don't know the history of it.

Member


Given your change, I think double becomes more reasonable than float.

@koeninger
Contributor

How do you read 0.1 of a kafka message for a given partition of a given batch?

Ultimately the floor for a rate limit, assuming one is set, needs to be 1 message per partition per batch, not a fraction, which is why it's a long.

If you want to delay that conversion by keeping it as a double as long as possible, that makes sense, but the lines like

(secsPerBatch * limit).toLong

probably need attention too.
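
To make the concern concrete, here is a minimal sketch of that truncation; `secsPerBatch` and `limit` come from the quoted line, while the values are made up:

```scala
// Made-up values; this only illustrates the truncation concern with the quoted line.
val secsPerBatch = 0.5 // sub-second batch interval
val limit = 1.2        // per-partition rate limit in messages per second

val maxMessages = (secsPerBatch * limit).toLong // (0.6).toLong == 0
// A bare .toLong lets a small but non-zero limit collapse to zero messages,
// so the point where the Double becomes a Long needs a floor (e.g. at least 1).
```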

@arzt
Author

arzt commented Apr 27, 2017

Thanks for your valuable feedback. I added tests as suggested by @JasonMWhite and used toDouble. @koeninger the estimated rate is per second, summed over all partitions, isn't it? The batch time is usually longer. So even values less than 1 but greater than 0 for backpressureRate can make sense for one partition. The cast to long is only needed when the absolute number of messages is computed, and even that number can be zero for some partitions, e.g. when there is no lag. I hope I am not confused here. There is also a test covering this.
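
A rough sketch of the arithmetic described above, with illustrative names and values rather than the actual implementation:

```scala
// Illustrative arithmetic only; not the actual DirectKafkaInputDStream code.
// A total rate of 3 msgs/sec, split across partitions in proportion to their lag,
// yields per-partition rates below 1 that are still meaningful once multiplied
// by a batch interval longer than one second.
val totalRate = 3.0                                    // msgs/sec from the rate estimator
val lagPerPartition = Map(0 -> 90L, 1 -> 9L, 2 -> 1L)  // partition -> current lag
val totalLag = lagPerPartition.values.sum.toDouble

val backpressureRate = lagPerPartition.map { case (p, lag) =>
  p -> totalRate * lag / totalLag                      // partition 2: 0.03 msgs/sec
}
val batchSeconds = 10.0
val messagesThisBatch = backpressureRate.map { case (p, r) =>
  p -> r * batchSeconds                                // partition 2: 0.3 messages
}
```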

@arzt
Author

arzt commented Apr 27, 2017

To run tests or debug using IntelliJ:
mvn test -DforkMode=never -pl external/kafka-0-8 "-Dsuites=org.apache.spark.streaming.kafka.DirectKafkaStreamSuite maxMessagesPerPartition"

@koeninger
Contributor

@arzt It's entirely possible to have batch times less than a second, and I'm not sure I agree that the absolute number of messages allowable for a partition should ever be zero.

So to put this another way, right now effectiveRateLimitPerPartition is a Map[TopicPartition, Long], which matches the return value of the function maxMessagesPerPartition.

You're wanting to change effectiveRateLimitPerPartition to a Map[TopicPartition, Double], which is probably a good idea, and should fix the bug around treating a very small rate limit as no limit.

But it still needs to be converted to Map[TopicPartition, Long] before returning. Calling .toLong is probably not the right thing to do there, because 0.99 will get truncated to 0.

I think one message per partition per batch is the minimum reasonable rate limit, otherwise particular partitions may not make progress. The relative lag calculation might take care of that in future batches, but it still seems questionable, even if it's a corner case.
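
A hedged sketch of the conversion being suggested, keeping the limits as Double until the last step and flooring at one message per partition (`TopicPartition` is the kafka-0-10 type; kafka-0-8 uses `TopicAndPartition`):

```scala
import org.apache.kafka.common.TopicPartition

// Sketch only: keep the per-partition limits as Double for as long as possible
// and convert to Long, with a minimum of 1, only when producing per-batch counts.
def toMaxMessagesPerPartition(
    effectiveRateLimitPerPartition: Map[TopicPartition, Double],
    secsPerBatch: Double): Map[TopicPartition, Long] =
  effectiveRateLimitPerPartition.map { case (tp, limit) =>
    // math.max(..., 1L) keeps every partition progressing; a bare .toLong
    // would turn 0.99 into 0 and starve that partition for the batch.
    tp -> math.max((secsPerBatch * limit).toLong, 1L)
  }
```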

@arzt
Author

arzt commented Apr 27, 2017

@koeninger I agree that assuming a long batch duration is wrong; I am not sure whether it even matters.
But what if for one partition there is no lag in the current batch? Then fetching 1 message for this partition from Kafka, as you suggest, would fail. So here zero makes sense in my eyes. This is also the old behaviour when rate > 1 and lag == 0.
Further, I think that truncating 0.99 to 0 messages per partition is also the right thing to do, as one cannot be sure that there is one message available if (secsPerBatch * limit) < 1.0. And, as you say, in a future batch it is very likely to become greater than 1.0.
Do you agree?

@koeninger
Contributor

koeninger commented Apr 27, 2017 via email

@JasonMWhite
Contributor

JasonMWhite commented Apr 27, 2017

I think @koeninger's suggestion is valid. effectiveRateLimitPerPartition is the upper bound on the number of messages per partition per second, and maxMessagesPerPartition sets an upper bound on the number of messages to be retrieved per partition per batch window.

Making effectiveRateLimitPerPartition a float will allow it to properly handle rates of < 1 message/partition/s, so this is definitely a good idea. maxMessagesPerPartition must still be an integer, as you can't retrieve partial messages. All agreed there.

Setting maxMessagesPerPartition to have a minimum of 1 message per partition per batch window is a good safe value to allow progress in all cases. If there isn't 1 message to retrieve, clamp will prevent it from attempting to retrieve an invalid message.
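
For illustration, a small sketch of that interplay (rate-based upper bound, a floor of one message, and clamping to the latest available offset); this is not the actual clamp implementation:

```scala
// Sketch of the interplay described above; not the actual clamp implementation.
// The rate-based bound limits how far a partition advances in one batch window,
// a floor of 1 keeps it progressing, and clamping to the latest available offset
// keeps the request within what Kafka actually has.
def untilOffset(currentOffset: Long, latestOffset: Long, maxMessages: Long): Long = {
  val floored = math.max(maxMessages, 1L)          // at least one message per batch window
  math.min(currentOffset + floored, latestOffset)  // never request past the latest offset
}
// With no lag (latestOffset == currentOffset) the clamp yields the current offset,
// so nothing invalid is fetched even though the floor is 1.
```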

@arzt
Author

arzt commented Apr 27, 2017

I changed the max messages per partition to be at least 1. Agreed?

Contributor


The actual result should be deterministic; why not check the correct value instead of just not None?

Author


After omitting the case of zero messages per topic, one of the tests is redundant. I removed it from each DirectKafkaStreamSuite.

@JasonMWhite
Contributor

Tests have some fairly repetitive code, but not sure if that's a problem or not. Looks good to me.

@koeninger
Contributor

LGTM pending Jason's comments on tests

@felixcheung
Member

Jenkins, ok to test

@SparkQA

SparkQA commented Apr 28, 2017

Test build #76256 has finished for PR 17774 at commit d4a7867.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 28, 2017

Test build #76263 has finished for PR 17774 at commit c98b9a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@arzt
Author

arzt commented May 2, 2017

Sorry for being inactive. All good with this?

@JasonMWhite
Contributor

LGTM

@arzt
Author

arzt commented May 9, 2017

@felixcheung will this be merged?

@felixcheung
Member

@brkyvz @zsxwing

@arzt
Author

arzt commented May 29, 2017

It's been a while. What can I do to draw some attention to this request? Is this issue not relevant enough? Thanks for reconsidering, @felixcheung @brkyvz @zsxwing

@arzt arzt force-pushed the kafka-back-pressure branch from c98b9a4 to 16b9aaf Compare June 26, 2017 07:44
@SparkQA

SparkQA commented Jun 26, 2017

Test build #78618 has finished for PR 17774 at commit 16b9aaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

I think @tdas @zsxwing should comment if this is the right direction...

@arzt arzt force-pushed the kafka-back-pressure branch from 16b9aaf to 29fe32c Compare August 3, 2017 07:19
@arzt arzt force-pushed the kafka-back-pressure branch from 29fe32c to fddf5e5 Compare November 2, 2017 15:29
@arzt arzt force-pushed the kafka-back-pressure branch from fddf5e5 to 1acbe4c Compare November 6, 2017 12:57
@pptaszynski

I am looking forward to this one being merged. We are suffering quite badly from the issue it resolves; it effectively makes backpressure not work for us at all.

@koeninger
Contributor

LGTM
@tdas @zsxwing absent any objections from you in the next couple of days, I'll merge this

@felixcheung
Member

Jenkins, ok to test

Member

@felixcheung felixcheung left a comment


pending tests

@SparkQA

SparkQA commented Mar 14, 2018

Test build #88226 has finished for PR 17774 at commit 1acbe4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in dffeac3 Mar 16, 2018
@koeninger
Contributor

Merged to master.
Thanks @arzt!

mstewart141 pushed a commit to mstewart141/spark that referenced this pull request Mar 24, 2018
… with large number of records


Author: Sebastian Arzt <[email protected]>

Closes apache#17774 from arzt/kafka-back-pressure.