-
Notifications
You must be signed in to change notification settings - Fork 117
Extract more of the shuffle management to a different class. #454
Extract more of the shuffle management to a different class. #454
Conversation
4b6dee4 to
6e060ed
Compare
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.network.shuffle.kubernetes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is equivalent to what used to be KubernetesExternalShuffleClient, with just a rename and an interface extraction. I suppose git doesn't present that in the most intuitive way here.
|
@foxish for review. I hope that organizing the code this way will make the scheduler backend and the shuffle related logic easier to test. |
|
rerun integration tests please |
|
This seems like a good refactoring |
|
Think the failure is legitimate, investigating. |
foxish
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thanks.
Left a couple of comments.
| Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) | ||
|
|
||
| val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) { | ||
| val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this ensure that each executor gets a different instance of the shuffle client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KubernetesClusterManager is on the driver.
| SparkEnv.get.securityManager.getIOEncryptionKey()) | ||
| context.reply(reply) | ||
| } | ||
| val shuffleSpecifiProperties = shuffleManager.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shuffleSpecifiProperties -> shuffleSpecificProperties
|
Test failure is because apparently the volume names are bad, now that we're appending index numbers to them. We probably just don't need the index numbers. |
dbb113d to
dc6b186
Compare
9f9b432 to
65496d2
Compare
| .getLogger(KubernetesExternalShuffleClientImpl.class); | ||
|
|
||
| /** | ||
| * Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Creates an Kubernetes .. -> Creates a Kubernetes ..
| @Override | ||
| public void close() { | ||
| super.close(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we omit this method and let the close method inherited from ExternalShuffleClient suffice?
| throw new SparkException(s"Unable to find shuffle pod on node $nodeName")) | ||
| } | ||
| // Inform the shuffle pod about this application so it can watch. | ||
| shuffleClient.registerDriverWithShuffleService(shufflePodIp, externalShufflePort) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this called by the driver every time an executor registers with the driver?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - this was the case before the change as well, I think. @foxish
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be telling each shuffle pod about each drivers that it needs to keep track of (in order for the cleanup to work even in cases where the application doesn't terminate gracefully). There may be duplicate calls if we're registering the same driver with the same shuffle service but it's not been common.
| override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { | ||
| shuffleDirs.zipWithIndex.map { | ||
| case (shuffleDir, shuffleDirIndex) => | ||
| val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this fixes a bug from before, where the old implementation would have attempted to mount multiple volumes with the same name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
|
@mccheah ready for re-aim into branch-2.2-kubernetes |
More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle.
a55b28e to
e7a460e
Compare
|
Rebase is done. |
|
@mccheah please resolve conflicts |
…rate-external-shuffle-management
|
Conflicts are fixed, is this good to merge? |
|
The new structure is much nicer. Thanks! Ok to merge from my end. |
…spark-on-k8s#454) * Extract more of the shuffle management to a different class. More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle. * Fix scalastyle * Add override annotation * Fix Java style * Remove unused imports. * Move volume index to the beginning to satisfy index * Address PR comments.
Need to figure out how to merge the checkstyles because otherwise we will end up with a lot of mess
…spark-on-k8s#454) * Extract more of the shuffle management to a different class. More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle. * Fix scalastyle * Add override annotation * Fix Java style * Remove unused imports. * Move volume index to the beginning to satisfy index * Address PR comments.
More efforts to reduce the complexity of the KubernetesClusterSchedulerBackend. The scheduler backend should not be concerned about anything other than the coordination of the executor lifecycle.
Requires #452.