
Conversation

@sujith71955
Contributor

What changes were proposed in this pull request?

Currently, even when the broadcast thread times out, the jobs are not aborted and keep running in the background.
In the current design the broadcast future waits, up to the timeout, for the result of the job whose output needs to be broadcasted; when the broadcast future times out, the job's tasks running in the background are not killed and continue to run.

As part of the solution, we look up the jobs for the execution id in the app-status store and cancel them before throwing the future timeout exception.
This terminates the job and its tasks promptly when the timeout exception happens, and it also saves the additional resources that would otherwise be consumed after the timeout exception is thrown from the driver.
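
A rough sketch of that approach is below. This is not the diff from this PR (the actual change lives in BroadcastExchangeExec's timeout handling); the helper name is made up, and it assumes it runs inside Spark's sql module where the internal classes used here (SharedState.statusStore, SQLExecution, SQLAppStatusStore) are accessible.

```scala
import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

// Hypothetical helper: cancel the jobs recorded for the current SQL execution id.
def cancelJobsOfCurrentExecution(spark: SparkSession): Unit = {
  val sc = spark.sparkContext
  // The SQL execution id is carried as a local property on the thread submitting the jobs.
  Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).foreach { execId =>
    // SQLAppStatusListener records, per live execution, the jobs started for it.
    spark.sharedState.statusStore.execution(execId.toLong).foreach { execData =>
      execData.jobs
        .filter { case (_, status) => status == JobExecutionStatus.RUNNING }
        .keys
        .foreach(jobId => sc.cancelJob(jobId))
    }
  }
}
```

The broadcast code would call something like this right before rethrowing the TimeoutException, so the failure is reflected in the web UI instead of the jobs silently finishing later.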

After the fix, the Spark web UI shows the jobs as failed once the timeout error occurs.

How was this patch tested?

Manually
Before fix:

scala> val df1 = spark.range(0,10000,1,10000).selectExpr("id%10000 as key1", "id as value1")
df1: org.apache.spark.sql.DataFrame = [key1: bigint, value1: bigint]

scala> val df2 = spark.range(0,10000,1,10000).selectExpr("id%10000 as key2", "id as value2")
df2: org.apache.spark.sql.DataFrame = [key2: bigint, value2: bigint]

scala> val inner = df1.join(df2,col("key1")===col("key2")).select(col("key1"),col("value2")).collect 
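
The repro only hits the timeout if building the broadcast side takes longer than the configured broadcast timeout. To reproduce it quickly on a small test setup, one can lower spark.sql.broadcastTimeout (in seconds, 300 by default); the value below is illustrative and not taken from this PR:

scala> // lower the broadcast timeout so the TimeoutException shows up quickly
scala> spark.conf.set("spark.sql.broadcastTimeout", "30")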

Actual result: A timeout exception is thrown, but the tasks keep running in the background; in the Spark web UI the task execution still shows as in progress, and after execution the job status is shown as successful. Please refer to the attachments for more details.
[Attachments: console output and Web UI screenshots showing the broadcast job completing successfully after the timeout]

After fix:
Once the timeout occurs the job is cancelled, and the UI displays the job status as failed.
[Attachments: Web UI screenshots after the fix showing the job status as failed]

@sujith71955
Contributor Author

cc @srowen @HyukjinKwon @jinxing64
Please help to review this patch. Thanks

@srowen
Member

srowen commented Mar 9, 2019

Is it necessary to kill all these jobs? I get that they will probably fail without the broadcast, but, it's also possible they won't.

@sujith71955
Contributor Author

sujith71955 commented Mar 9, 2019

> Is it necessary to kill all these jobs? I get that they will probably fail without the broadcast, but, it's also possible they won't.

You are right, the subsequent job will always fail without the broadcast results. A few points I want to highlight here:
a) The broadcast job running in the background may unnecessarily occupy resources even though the query will fail in the end, so it is better to cancel these useless jobs/tasks, which may run for an unpredictable amount of time.
b) Another point is regarding the web UI: the broadcast job is shown there as completed, which may mislead users.

@sujith71955
Contributor Author

@srowen I hope this clarifies your question. Thanks.

@srowen
Member

srowen commented Mar 9, 2019

I don't feel strongly about it, but I am not sure it's worth this complexity.

@sujith71955
Contributor Author

sujith71955 commented Mar 9, 2019

> I don't feel strongly about it, but I am not sure it's worth this complexity.

For short tasks it may not hurt much, but long-running tasks can unnecessarily hold resources while the query ultimately fails anyway. Moreover, we don't have any control over the kind of broadcast jobs submitted; they vary with the business use case.

When a job cannot finish within 5 minutes (the default broadcast timeout), it can turn out to be a long-running job.

@ajithme
Contributor

ajithme commented Mar 11, 2019

@sujith71955 Few Questions,

  1. In your scenario, when the broadcast has timed out, will the query fail?
  2. Even if the query has failed, are the broadcast tasks still running?

@sujith71955
Contributor Author

@ajithme

  1. Yes, the query will fail because it will not get any result for the broadcast once the timeout occurs.
  2. The broadcast job's tasks keep running in the background because the future timeout does not ensure termination of the tasks. Since the tasks may also run for a long time, they unnecessarily occupy resources (a small illustration follows this list).
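
As a plain Scala illustration of point 2 (not Spark code): timing out while awaiting a Future does not stop the computation backing it, which is exactly why the broadcast tasks survive the driver-side timeout.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// The Future keeps running even after Await.result gives up waiting on it.
val slow = Future { Thread.sleep(60000); "done" }
try {
  Await.result(slow, 1.second)
} catch {
  case _: TimeoutException =>
    println("Await timed out, but the Future's work is still running in the background")
}
```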

@ajithme
Contributor

ajithme commented Mar 11, 2019

> Is it necessary to kill all these jobs? I get that they will probably fail without the broadcast, but, it's also possible they won't.

@srowen Then how do we justify to the end user that his query has failed (due to the timeout) while his resources are still occupied by the failed query (which may or may not eventually complete)?

I think killing such orphaned tasks justifies the change.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Mar 11, 2019

Test build #103314 has finished for PR 24036 at commit 43bfd81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sujith71955
Contributor Author

@HyukjinKwon Do you have any suggestions regarding this PR? Please let me know if you have any inputs. Thanks.

@sujith71955
Contributor Author

@HyukjinKwon @cloud-fan @srowen
Please let me know if you have any further inputs. Thanks.

} catch {
  case ex: TimeoutException =>
    logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
    val executionUIData = sqlContext.sparkSession.sharedState.statusStore.
Contributor


Is this a reliable way to get the associated jobs?

Contributor Author


SQLAppStatusListener holds the live execution data, so I use it to get the associated jobs. If this is not an efficient way to get them, I will revisit the code and try to find a better mechanism for retrieving the jobs associated with a particular execution id. Please let me know if you have any suggestions. Thanks for your valuable time.

Contributor Author

@sujith71955 sujith71955 Mar 17, 2019


I cannot find an alternative way to get the jobs for an execution id in this layer of execution, and this approach seems reliable: whenever we submit/process events via the DAGScheduler, the events are also posted to SQLAppStatusListener, which makes it viable to retrieve the jobs from LiveExecutionData.
Please let me know if there is a better way to get this done. Thanks.

@sujith71955
Contributor Author

sujith71955 commented Mar 18, 2019

Gentle ping @HyukjinKwon @cloud-fan @srowen

@cloud-fan
Contributor

Maybe this is not the right place to do it. We should have a query manager that watches all the broadcasts of a query and cancels the entire query if one broadcast fails.

@sujith71955
Contributor Author

sujith71955 commented Mar 18, 2019 via email

@AmplabJenkins

Can one of the admins verify this patch?

@sujith71955
Contributor Author

Closing this PR as this scenario is already handled in the PR below:
https://github.com/apache/spark/pull/24595/files
Thanks all for your valuable inputs and time.
