Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ class StreamExecution(
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
committedOffsets ++= availableOffsets
batchCommitLog.add(currentBatchId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing, but do we not write to the commit log if there is no new data in the next batch?

Copy link
Member Author

@zsxwing zsxwing Apr 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because we only update currentBatchId when data is available. cc @tdas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, when there is a batch with data, it finishes executing and then increments the batch id for the next batch. But if next batch has no data, then the batchid not further increment, and gets eventually used in a future batch with data.

So i think it is correct to update the commit log as soon this batch is done, and before the batch id is incremented for next batch. This change maintains that invariant as far as i think.

committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
Expand Down