Skip to content

Conversation

@liupc
Copy link

@liupc liupc commented Jan 25, 2019

What changes were proposed in this pull request?

Currently, ExecutorShuffleInfo can be recovered from file if NM recovery enabled, however, the recovery file is under a single directory, which may be unavailable if disk broken. So if a NM restart happen(may be caused by kill or some reason), the shuffle service can not start and the ExecutorShuffleInfo would lost even if there are existing executors on the node.

This may finally cause job failures(if node or executors on it not blacklisted), or at least, it will cause resource waste.(shuffle from this node always failed.), for long running spark applications, this problem may be more serious.

This PR introduced a mechanism to support multi directories for executor shuffle info recovery, this can improve the robustness of the YarnShuffleService.

How was this patch tested?

UT

Please review http://spark.apache.org/contributing.html before opening a pull request.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@liupc
Copy link
Author

liupc commented Jan 25, 2019

@srowen @vanzin @squito @HyukjinKwon +Potetial reviewers, could anybody give some suggestions?

@vanzin
Copy link
Contributor

vanzin commented Jan 25, 2019

Did you actually run into this problem?

It sounds like such an uncommon thing, and Spark should recover even if that problem happens. (stages would be recomputed, etc.)

Aside from that there's quite a few style and functional problems in the code, but no point in going through that since I'm not really convinced this helps much.

@HyukjinKwon
Copy link
Member

+1 for ^

@liupc
Copy link
Author

liupc commented Jan 28, 2019

@vanzin @HyukjinKwon
we once run into a similar problem on Spark2.0.1 when #14162 is not introduced. The disk broken of recovery path caused NM started without YarnShuffleService, so that executors scheduled on the node were unable to register with YarnShuffleService, and finally caused the application failure.

Even though, we now have #14162 and the application level blacklist, but I think this PR still make sense for long running applications(for instance, Spark ThriftServer applications or spark streaming applications).
For these type of applications, this case might not be a uncommon thing for they are running for a long time, and even if we suppose spark would recover with application level blacklist enabled, it will still cause resource waste, for shuffle will always fail on the node, not not mention that there are chances that the node is not blacklisted and will cause job failure. And the resource waste or job failure is unacceptable for a thriftServer or streaming applications.

Hope my explanation can make you convinced.

@squito
Copy link
Contributor

squito commented Jan 28, 2019

it will still cause resource waste, for shuffle will always fail on the node, not not mention that there are chances that the node is not blacklisted

there will certainly be some resource waste, but we have to balance complexity vs. how often the issue would occur and how bad the simpler behavior would be. If you have a bad disk, you're definitely losing some shuffle data. Furthermore, any other shuffleMapStages would need to know to not write their output to the bad disk also. Blacklisting should kick in here, and if it doesn't, we should figure out why. Yes, there will be some waste till that happens, but I think we can live with that.

@vanzin
Copy link
Contributor

vanzin commented Jan 28, 2019

I'm very torn on this. It makes sense to try to use a better disk, but then the NM itself doesn't do that. So if the recovery dir is bust, then the NM will be affected regardless of this.

It feels to me like enabling the option in SPARK-16505 is the right thing. If your recovery dir is bad, then the NM shouldn't be running until that is fixed. But that also assumes that the failure is detected during shuffle service initialization, and not later.

If implementing multi-disk supports, I'm also not sure even how you'd do it. Opening the DB may or may not work, depending on how bad the disk is. So if the first time it does not work, and you write the recovery db to some other directory, but then the NM crashes (e.g. because of the bad disk) and the next time, opening the DB actually works in the first try, you'll end up reading stale data before you realize you're reading from the bad disk. I see you have checks for the last mod time, but even that can cause troubles in a scenario where the failure may or may not happen depending on when you look...

I tend to think that if your recovery disk is bad, that should be treated as a catastrophic failure, and trying to work around that is kinda pointless.

What you could do is try to keep running in spite of the bad disk, e.g. by only keeping data in memory. You'd only see problems when the NM is restarted (you'd lose existing state), but at that point Spark's retry mechanism should fix things.

@liupc
Copy link
Author

liupc commented Jan 29, 2019

@squito

If you have a bad disk, you're definitely losing some shuffle data. Furthermore, any other shuffleMapStages would need to know to not write their output to the bad disk also.

This blacklist is introduced in another PR #23614, it will solve the shuffle write issues.

@liupc
Copy link
Author

liupc commented Jan 29, 2019

@vanzin

It feels to me like enabling the option in SPARK-16505 is the right thing. If your recovery dir is bad, then the NM shouldn't be running until that is fixed. But that also assumes that the failure is detected during shuffle service initialization, and not later.

Yes, I think we should make this option enabled by default. maybe in another PR.

If implementing multi-disk supports, I'm also not sure even how you'd do it. Opening the DB may or may not work, depending on how bad the disk is. So if the first time it does not work, and you write the recovery db to some other directory, but then the NM crashes (e.g. because of the bad disk) and the next time, opening the DB actually works in the first try, you'll end up reading stale data before you realize you're reading from the bad disk. I see you have checks for the last mod time, but even that can cause troubles in a scenario where the failure may or may not happen depending on when you look...

This PR just periodically check bad disk and saving current executors info in memory to the new good directory. The data is newest if we handles well the synchronization. There indeed exists a case that make the recovery failure(NM crashes and disk broken happens at the same time), but it should be really really rare to happen.

I tend to think that if your recovery disk is bad, that should be treated as a catastrophic failure, and trying to work around that is kinda pointless. What you could do is try to keep running in spite of the bad disk, e.g. by only keeping data in memory. You'd only see problems when the NM is restarted (you'd lose existing state), but at that point Spark's retry mechanism should fix things.

I understand what you are talking about, but the major problem is that if that happens, the long running applications can not recover from resource waste or maybe occasional job failure except restarting the application. I think if this problem can be resolved by current implementation, then I agree with your opinion. But for my understanding, current implementation can not solve this problem.
However, with my PR, the application can run as usual, and when SREs fixed the disk, everything comes back.

@squito
Copy link
Contributor

squito commented Jan 29, 2019

If you have a bad disk, you're definitely losing some shuffle data. Furthermore, any other shuffleMapStages would need to know to not write their output to the bad disk also.

This blacklist is introduced in another PR #23614, it will solve the shuffle write issues.

OK I see, I've reviewed that PR now. But at best, that still doesn't completely handle the problem, as any existing shuffle data written to the bad disks is gone (and as I noted on that PR, its somewhat complicated to make sure that the ExternalShuffleService and the executor keep a consistent view of good dirs).

@vanzin
Copy link
Contributor

vanzin commented Jan 29, 2019

I think if this problem can be resolved by current implementation

I think the current implementation could be enhanced, but I'd prefer a simpler approach.

If you just change the current implementation to not save recovery data, what data is lost and how does Spark recover from it, if at all?

The shuffle service will need at least the app secret to allow the executors to connect. I'm wondering if after a restart, YARN actually calls the initializeApplication callback which would allow that data to be re-created. That's the bare minimum; I'm hoping that the executor registration data can be somehow re-created, but haven't really looked into that.

@liupc
Copy link
Author

liupc commented Jan 30, 2019

@squito

that still doesn't completely handle the problem, as any existing shuffle data written to the bad disks is gone
Yes, the existing shuffle data written for those finished ShuffleMapTask would gone, but Spark'retry( I mean the stage retry) mechanism should fix things, is it?

@liupc
Copy link
Author

liupc commented Jan 30, 2019

@vanzin

I'm hoping that the executor registration data can be somehow re-created

My change does save recovery data to a better directory(as explained in the above note) if disk error happens, so spark can recover from it.

The shuffle service will need at least the app secret to allow the executors to connect. I'm wondering if after a restart, YARN actually calls the initializeApplication callback which would allow that data to be re-created. That's the bare minimum;

this secrets recovery is done by YarnShuffleService itself. so maybe we should also change secret recovery related codes.

@srowen srowen closed this Feb 26, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants