[SPARK-18620][Streaming][Kinesis] Flatten input rates in timeline for streaming + kinesis #16114
Conversation
👍 just had a play with it, it solves my original issue.
Test build #69546 has finished for PR 16114 at commit
if (batch.size() <= maxRecords) {
  addRecords(batch, checkpointer)
} else {
  val numIter = batch.size / maxRecords
Is this clause a bit simpler as ...
for (start <- 0 until batch.size by maxRecords) {
  addRecords(batch.subList(start, math.min(start + maxRecords, batch.size)), checkpointer)
}
Thanks! I'll fix it.
/** Return the current rate limit defined in [[BlockGenerator]]. */
private[kinesis] def getCurrentLimit: Int = {
  assert(blockGenerator != null)
This is pretty trivial, but do we use runtime assertions in general in the project? The next line fails already when it's null, whether assertions are on or not.
I just added this assertion to match the other parts, such as assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!"), because both are initialized in onStart. But I have no strong opinion on this, and I'm okay with removing it.
I would be okay to keep it if we add a useful error in case this assertion doesn't hold, e.g.
assert(blockGenerator != null, "Expected blockGenerator to be set for the receiver before the processor received records")
or something like that
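For reference, a hedged sketch of how getCurrentLimit could read with such a message, assuming the block generator exposes its rate limit as a Long that is then capped at Int.MaxValue (as mentioned later in this review); the exact wording in the merged code may differ:

/** Return the current rate limit defined in [[BlockGenerator]]. */
private[kinesis] def getCurrentLimit: Int = {
  // Illustrative assertion message, not necessarily the merged wording.
  assert(blockGenerator != null,
    "Expected blockGenerator to be set for the receiver before the processor received records")
  // Defensively cap the Long rate limit at Int.MaxValue before converting to Int.
  math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
}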
Test build #69558 has finished for PR 16114 at commit
    batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
  val maxRecords = receiver.getCurrentLimit
  if (batch.size() <= maxRecords) {
    addRecords(batch, checkpointer)
I think the for loop even takes care of this case, but no big deal either way. It seems like a reasonable change.
Aha, I see. I'll fix it, thanks!
@srowen Do you know of qualified maintainers for this component?
Test build #69620 has finished for PR 16114 at commit
private def processRecordsWithLimit(
    batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
  val maxRecords = receiver.getCurrentLimit
  for (start <- 0 until batch.size by maxRecords) {
Hm, it just occurred to me that you would have a problem here if batch.size and maxRecords were both over Int.MaxValue / 2, and maxRecords were a bit smaller than batch.size. The addition below overflows.
It seems like a corner case but I note above you already defensively capped the maxRecords at Int.MaxValue so maybe it's less unlikely than it sounds.
You can fix it by letting the addition and min comparison take place over longs and then convert back to int.
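For illustration, a minimal sketch of that Long-based variant (using the identifiers from the diff above; not the merged code):

for (start <- 0 until batch.size by maxRecords) {
  // Do the addition and the min over Long so start + maxRecords cannot overflow Int.
  val end = math.min(start.toLong + maxRecords, batch.size.toLong).toInt
  addRecords(batch.subList(start, end), checkpointer)
}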
Alternatively I think this is even simpler in Scala, though I imagine there's some extra overhead here:
batch.grouped(maxRecords).foreach(batch => addRecords(batch, checkpointer))
I don't know of a good reviewer for this component but I think I'm comfortable merging a straightforward change like this.
Actually, since each Kinesis shard has strict read throughput limits (http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), batch.size hardly ever exceeds Int.MaxValue / 2. But since I like your idea in terms of code clarity, I fixed it.
private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
  receiver.addRecords(shardId, batch)
  logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
  receiver.setCheckpointer(shardId, checkpointer)
BTW is this supposed to be called on every batch or once at the end? I don't know how it works.
Yea, you're right: this code overwrites the checkpointer every time the callback function is called (maybe every second). I'm not sure what the original author intended, but it seems like wasted work. That said, I'm not sure it's worth fixing, and it is out of scope for this JIRA. If necessary, I'd be happy to fix it in a follow-up.
Test build #69624 has finished for PR 16114 at commit
Jenkins, retest this please.
Test build #69626 has finished for PR 16114 at commit
Test build #69627 has finished for PR 16114 at commit
@brkyvz maybe you can give this a look to make sure it makes sense? Especially the bit about the checkpointer.
// in `KinesisClientLibConfiguration`. For example, if we set the max number of records
// in a worker to 10 and a producer aggregates two records into one message, the worker
// can possibly receive 20 records every time the callback function is called.
batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
Sorry, one last comment -- batch is used for the overall data set and each subset. They should be named differently for clarity.
It's also my fault for not realizing the collections here were Java not Scala, and you have to convert to use the nice Scala idiom. I think it's OK as it's just going to wrap and not copy the class, but it does bear being careful about performance here.
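As a standalone illustration of the wrap-versus-copy point (a self-contained sketch, not the PR code):

import scala.collection.JavaConverters._
import java.util.{Arrays, List => JList}

// asScala wraps the Java list in a Scala view; it does not copy the elements,
// so the conversion itself is cheap even for large batches.
val javaBatch: JList[String] = Arrays.asList("a", "b", "c", "d", "e")
javaBatch.asScala.grouped(2).foreach(miniBatch => println(miniBatch.mkString(", ")))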
Yea, I also think that when maxRecords is small and the batch is large, the many iterations add a little overhead. So I restored the code to the previous Java-style one.
batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
  receiver.addRecords(shardId, batch.asJava)
  logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
  receiver.setCheckpointer(shardId, checkpointer)
this should be outside, after the foreach
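A hedged sketch of the shape being suggested, reusing the identifiers from the diff above (and renaming the inner variable per the earlier naming comment); the merged code may differ:

batch.asScala.grouped(receiver.getCurrentLimit).foreach { miniBatch =>
  receiver.addRecords(shardId, miniBatch.asJava)
  logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records for shardId $shardId")
}
// Checkpoint once for the whole batch, after all mini-batches have been stored.
receiver.setCheckpointer(shardId, checkpointer)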
Yeah, that's what I suspected at #16114 (comment) -- thanks for confirming
Thanks, I'll fix it.
Test build #69762 has finished for PR 16114 at commit
Test build #69765 has finished for PR 16114 at commit
    val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
    receiver.addRecords(shardId, miniBatch)
  }
  logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
I would leave this log statement inside the for loop, because IIRC addRecords will be a blocking call, since the records need to be written to the WAL.
okay
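Putting these points together, a hedged sketch of the resulting method (identifiers taken from the diff; the exact merged code may differ):

private def processRecordsWithLimit(
    batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
  val maxRecords = receiver.getCurrentLimit
  for (start <- 0 until batch.size by maxRecords) {
    // addRecords blocks while the mini-batch is written to the WAL.
    val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
    receiver.addRecords(shardId, miniBatch)
    logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records for shardId $shardId")
  }
  // Register the checkpointer once, after the whole batch has been stored.
  receiver.setCheckpointer(shardId, checkpointer)
}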
I will need to take a deeper look at this to remember the code. I'm not sure, but there may be some issues with the checkpointing happening to the WriteAheadLog and DynamoDB. Going to come back to this in a couple of hours.
Test build #69833 has finished for PR 16114 at commit
I've taken a look at the code. This change seems safe to me. Even if we process extra data but fail to checkpoint to Kinesis, Spark Streaming will re-process the exact same batch on a restart, providing at-least-once semantics but with a stronger guarantee (that the data will be processed with exactly the same batching). LGTM! Thanks @maropu
Merged to master |
What changes were proposed in this pull request?

This PR makes the input rates in the timeline flatter for Spark Streaming + Kinesis. Since Kinesis workers fetch records and push them into block generators in bulk, the timeline in the web UI shows many spikes when `maxRates` is applied (see Figure 1 below). This fix splits the fetched input records into multiple `addRecords` calls.

Figure 1: Apply `maxRates=500` in vanilla Spark
[Image: apply_limit in_vanilla_spark (https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png)]

Figure 2: Apply `maxRates=500` in Spark with my patch
[Image: apply_limit in_spark_with_my_patch (https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png)]

How was this patch tested?

Added tests to check that input records are split into multiple `addRecords` calls.
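For intuition, a minimal, self-contained sketch (not the PR's actual test code) of the splitting invariant such tests check: grouping by the current limit never yields a mini-batch larger than the limit, and no records are lost or reordered.

// Hypothetical values standing in for a fetched Kinesis batch and the rate limit.
val records = (1 to 25).toList
val limit = 10
val miniBatches = records.grouped(limit).toList
assert(miniBatches.forall(_.size <= limit))
assert(miniBatches.flatten == records)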