@jose-torres
Contributor

What changes were proposed in this pull request?

Add a specific stop method for ContinuousExecution. The previous StreamExecution.stop() method had a race condition as applied to continuous processing: if the cancellation was round-tripped to the driver too quickly, the generic SparkException it caused would be reported as the query death cause. We earlier decided that SparkException should not be added to the StreamExecution.isInterruptionException() whitelist, so we need to ensure this never happens instead.

How was this patch tested?

Existing tests. I could consistently reproduce the previous flakiness by putting Thread.sleep(1000) between the first job cancellation and thread interruption in StreamExecution.stop().
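
The race and the `Thread.sleep` reproduction described above can be modelled without Spark. This is only a toy sketch: `StopRaceSketch`, `ToyQuery`, and `JobCancelled` are hypothetical names, a blocking queue stands in for the driver round trip, and `JobCancelled` stands in for the generic SparkException. It is not the actual `StreamExecution` or `ContinuousExecution` code.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy model; every name here is hypothetical and none of it is Spark code.
object StopRaceSketch {
  // Stands in for the generic SparkException raised by job cancellation.
  final class JobCancelled extends RuntimeException("job cancelled")

  final class ToyQuery {
    // The simulated driver delivers job failures to the query thread here.
    private val failures = new LinkedBlockingQueue[Throwable]()
    @volatile var deathCause: Option[Throwable] = None

    private val thread = new Thread(new Runnable {
      def run(): Unit =
        try {
          // Block until the running job fails; an interrupt surfaces
          // as InterruptedException instead.
          throw failures.take()
        } catch {
          case _: InterruptedException => // treated as a clean stop
          case t: Throwable => deathCause = Some(t) // reported as death cause
        }
    })

    def start(): Unit = thread.start()

    // Old ordering: cancel the job, then interrupt. The delay plays the
    // role of the Thread.sleep(1000) used to reproduce the flakiness.
    def stopCancelFirst(delayMs: Long): Unit = {
      failures.put(new JobCancelled)
      Thread.sleep(delayMs)
      thread.interrupt()
      thread.join()
    }

    // New ordering: only interrupt and wait for the thread to finish.
    def stopInterruptOnly(): Unit = {
      thread.interrupt()
      thread.join()
    }
  }
}
```

In this model, `stopCancelFirst` lets the cancellation failure reach the query thread before the interrupt, so it is recorded as the death cause, while `stopInterruptOnly` leaves `deathCause` empty.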

@jose-torres
Contributor Author

@zsxwing @dongjoon-hyun

@SparkQA

SparkQA commented May 21, 2018

Test build #90915 has finished for PR 21384 at commit 61f691d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// The query execution thread will clean itself up in the finally clause of runContinuous.
// We just need to interrupt the long running job.
queryExecutionThread.interrupt()
queryExecutionThread.join()
Member

Thank you for pinging me, @jose-torres.
So, technically, two sparkSession.sparkContext.cancelJobGroup(runId.toString) calls are removed in ContinuousExecution?

Contributor Author

Correct. The remaining one in the finally clause of runContinuous() is sufficient, because jobs are only started within that method.
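
As a rough illustration of why one cancellation in a `finally` clause can cover every exit path, here is a minimal sketch. `FinallyCleanupSketch`, `activeGroups`, and `runContinuousLike` are hypothetical stand-ins, and `cancelJobGroup` here is a local toy function rather than the `SparkContext` method.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; not Spark APIs. Single-threaded toy only.
object FinallyCleanupSketch {
  // Tracks which job groups are currently active.
  val activeGroups = mutable.Set.empty[String]

  // Toy cancellation: simply forgets the group.
  def cancelJobGroup(id: String): Unit = activeGroups -= id

  // Jobs for `runId` start only inside this method, so a single
  // cancellation in `finally` runs on normal return, failure,
  // and interruption alike.
  def runContinuousLike(runId: String)(body: => Unit): Unit =
    try {
      activeGroups += runId
      body
    } finally {
      cancelJobGroup(runId)
    }
}
```

Whether `body` returns normally or throws (for example an `InterruptedException`), `activeGroups` no longer contains `runId` once `runContinuousLike` exits.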

@tdas
Contributor

tdas commented May 24, 2018

LGTM. Merging to master.

@asfgit asfgit closed this in f457933 May 24, 2018
asfgit pushed a commit that referenced this pull request Feb 13, 2019
Author: Jose Torres <[email protected]>

Closes #21384 from jose-torres/fixKafka.