
Conversation

@jiasheng55

What changes were proposed in this pull request?

The code changes in PR #16542 confuse me:

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}

If a Job fails and the JobSet containing it has completed, listenerBus will post a StreamingListenerBatchCompleted, while jobGenerator will not invoke onBatchCompletion. So is the batch completed or not?

The key question is whether we should consider a Batch completed when one of its Jobs has failed.

I think someone who registers a listener on StreamingListenerBatchCompleted only wants to be notified when the batch finishes without error. So if a Job fails, we should not remove it from its JobSet, and the JobSet is then not considered completed; see the sketch below.
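For illustration, here is a minimal sketch of that direction, based on the handleJobCompletion code quoted above. It is not the actual patch, and the real change may order these calls differently:

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  job.result match {
    case Failure(e) =>
      // Keep the failed job in its JobSet so jobSet.hasCompleted stays false
      // and no StreamingListenerBatchCompleted event is posted.
      reportError("Error running job " + job, e)
    case _ =>
      // Mark the job as done only on success.
      jobSet.handleJobCompletion(job)
      if (jobSet.hasCompleted) {
        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
      }
  }
}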

How was this patch tested?

existing tests


wangjiasheng added 2 commits November 27, 2017 16:28
@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

cc @CodingCat, WDYT about this revert?

@viirya
Member

viirya commented Nov 30, 2017

cc @zsxwing

@viirya
Member

viirya commented Nov 30, 2017

Btw, I think this does not only revert the commit; it also seems to propose its own fix. It would be better to modify the title.

@CodingCat
Contributor

Did I miss anything? @Victor-Wong, you are describing what #16542 does in your description, but you are reverting it in your changes?

@jiasheng55 jiasheng55 changed the title Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets" [SPARK][STREAMING] Invoke onBatchCompletion() only when all jobs in the JobSet are Success Dec 1, 2017
@jiasheng55
Author

@viirya Sorry for the misleading title; I have changed it now.

@jiasheng55
Author

@CodingCat Yes, this PR aims to solve the same issue as #16542, but I think this is a better way to solve it.
If a Job fails, I think we should not remove it from its JobSet, so jobSet.hasCompleted will return false. As a result, we will not receive a StreamingListenerBatchCompleted.
What I want to say is that if a Job fails, we should consider the Batch as not completed.
I am not confident about my English; if I am not describing this clearly, please let me know.

@CodingCat
Contributor

What I want to say is that if a Job fails, we should consider the Batch as not completed.

Isn't #16542 doing the same thing?

@jiasheng55
Author

jiasheng55 commented Dec 1, 2017

@CodingCat Please check out the difference between the two PRs.

if (jobSet.hasCompleted) {
  listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
As shown above, in #16542, if a Job fails, listenerBus still posts a StreamingListenerBatchCompleted event, which I believe is incorrect, because the Batch is not completed (one of its Jobs has failed).

@CodingCat
Contributor

#16542 guarantees that the failed batch can be re-executed, and I didn't check whether reverting the change in #16542 plus your new change can guarantee the same thing...

Supposing it does, the remaining discussion becomes “what does complete mean in English?”, which is not interesting to me to discuss.

@CodingCat
Contributor

One thing to note is that muting an event is a behavior change: if a user has introduced a customized listener to capture all completed batches and also extract failed-job info, he/she will see a broken scenario with the change in this PR.

@jiasheng55
Author

jiasheng55 commented Dec 1, 2017

@CodingCat

One thing to note is that muting an event is a behavior change

I agree with that, so we should be careful about changing the current behavior. I will close the PR later.

the remaining discussion becomes “what does complete mean in English?” which is not interesting to me to discuss

Maybe the remaining discussion should be how to let the user know that they will get a StreamingListenerBatchCompleted event even if the batch failed.
What about adding a comment like this:

/** Called when processing of a batch of jobs has completed **(even if the batch failed)**. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

I was using StreamingListenerBatchCompleted to do some metadata checkpointing, which should be done only when the batch succeeds.

@CodingCat
Contributor

If you just worry about

As I was using the StreamingListenerBatchCompleted to do some metadata checkpointing stuff, which should be done only when the batch succeeded.

If this is your concern, you should handle in your own listener whether the completed batch contains a failure; check: https://github.com/Azure/azure-event-hubs-spark/blob/fecc34de8a238049806d033d8a85d888cad75901/core/src/main/scala/org/apache/spark/streaming/eventhubs/checkpoint/ProgressTrackingListener.scala#L38

IMHO, having Complete include both Failure and Success cases is more intuitive to the (Scala) user, since we are used to .onComplete on Scala's Future, etc.
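For example, here is a minimal sketch of such a listener; it is not taken from the linked project, and doMetadataCheckpoint is a hypothetical placeholder for the user's own checkpointing logic:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class CheckpointOnSuccessListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // A batch is reported as "completed" even when an output operation failed,
    // so inspect the per-output-operation failure reasons before checkpointing.
    val anyFailed = batchCompleted.batchInfo.outputOperationInfos.values
      .exists(_.failureReason.nonEmpty)
    if (!anyFailed) {
      doMetadataCheckpoint(batchCompleted.batchInfo.batchTime)
    }
  }

  // Hypothetical placeholder for the user's metadata checkpointing logic.
  private def doMetadataCheckpoint(batchTime: Time): Unit = ()
}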

@jiasheng55
Author

@CodingCat
Thank you:)

@jiasheng55 jiasheng55 closed this Dec 3, 2017