Support SSL configuration for the driver application submission #49
Conversation
The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server.
We don't need to persist these after the pod has them mounted and is running already.
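For context, submission-side configuration would look roughly like the integration test further down in this thread. This is only a hedged sketch: apart from `spark.ssl.kubernetes.driverlaunch.trustStorePassword` (which appears verbatim in the diff), the property names, master URL, and paths here are illustrative assumptions following Spark's usual `spark.ssl.*` naming, not confirmed keys from this PR.

```scala
import org.apache.spark.deploy.SparkSubmit

// Hypothetical example jar path, mirroring the EXAMPLES_JAR constant used in the integration test below.
val EXAMPLES_JAR = "file:///opt/spark/examples/jars/spark-examples.jar"

val args = Array(
  "--master", "k8s://https://192.168.99.100:8443",
  "--deploy-mode", "cluster",
  "--class", "org.apache.spark.examples.SparkPi",
  // Only trustStorePassword appears verbatim in this PR's diff; the other
  // spark.ssl.kubernetes.driverlaunch.* keys are assumptions for illustration.
  "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true",
  "--conf", "spark.ssl.kubernetes.driverlaunch.keyStore=/opt/keystores/driver.jks",
  "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit",
  "--conf", "spark.ssl.kubernetes.driverlaunch.trustStore=/opt/keystores/trustStore.jks",
  "--conf", "spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit",
  EXAMPLES_JAR)
SparkSubmit.main(args)
```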
    }

    private def configureSsl(kubernetesClient: KubernetesClient)
      : (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
nit: move colon to previous line
        (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
      } else {
        (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
      }
maybe you can move this exceptional case to the start?
in general, it can read nicely to change from:
    if (normal1) {
      if (normal2) {
        if (normal3) {
          // do everything normal
        } else {
          // handle not normal3
        }
      } else {
        // handle not normal2
      }
    } else {
      // handle not normal1
    }

to this form instead:

    if (!normal1) {
      // handle not normal1
    }
    if (!normal2) {
      // handle not normal2
    }
    if (!normal3) {
      // handle not normal3
    }
    // do everything normal
Advantages of the second are that the indentation is much shallower, and also that the non-normal handling and the check are next to each other, instead of split to opposite sides of the method.
To do this here we would have to use the return keyword, but I don't know if using return is idiomatic in Scala. To avoid the return keyword, the last statement must be the value that is returned; but then we would have to use if...else so that the if case returns the value properly, and in that case we don't reduce the indentation as we would prefer.
Regarding if-else ordering - if we want to avoid the return statement here, then I prefer checking the positive of a conditional when branching with if... else.
Indeed, return seems to be frowned on in Scala: https://tpolecat.github.io/2014/05/09/return.html
No need to change the style if it doesn't flow smoothly.
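For what it's worth, the usual Scala compromise (since return is discouraged) is to test the negated condition first and keep the normal path as the final expression of the else branch. A toy illustration of that trade-off, not code from this PR:

```scala
// Abnormal case is handled first without `return`; the normal path stays as the
// last expression so it is still the value of the method.
def describe(n: Int): String = {
  if (n < 0) {
    "negative"            // early/abnormal case first
  } else {
    val doubled = n * 2   // "do everything normal" path
    s"doubled to $doubled"
  }
}

describe(-1)  // "negative"
describe(3)   // "doubled to 6"
```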
        (trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
      }).getOrElse((null, SSLContext.getDefault))
    } else {
      (null, SSLContext.getDefault)
same thing here with moving exception handling to the top of the method and flattening some indentation out
    val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress
    val url = s"http://$nodeAddress:$servicePort"
    HttpClientUtil.createClient[KubernetesSparkRestApi](uri = url)
    val urlScheme = if (driverLaunchSslOptions.enabled) "https" else "http"
log a warning if this is http -- users reading logs should be notified that they're running an insecure configuration
    private def fileToUtf8String(filePath: String) = {
      val passwordFile = new File(filePath)
      if (!passwordFile.isFile) {
        throw new IllegalArgumentException("KeyStore password file does not exist or " +
this can be a keystore password file or a key password file (you use it for both), so the message needs to be more generic.
also include the path that was looked at in the error message
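Something along these lines could work. This is only a sketch; the two-argument signature matches the `fileToUtf8String(filePath, fileType)` form that appears later in the diff, and the exact wording is illustrative:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Generic message that works for both keystore and key password files, and
// includes the path that was inspected.
private def fileToUtf8String(filePath: String, fileType: String): String = {
  val file = new File(filePath)
  if (!file.isFile) {
    throw new IllegalArgumentException(
      s"$fileType file provided at ${file.getAbsolutePath} does not exist or is not a file.")
  }
  new String(Files.readAllBytes(file.toPath), StandardCharsets.UTF_8)
}
```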
    }

    private def fileToUtf8String(filePath: String) = {
      val passwordFile = new File(filePath)
do we need this File to be wrapped in Utils.tryWithResource?
No - Files.readAllBytes opens and closes the streams appropriately.
| "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", | ||
| EXAMPLES_JAR) | ||
| SparkSubmit.main(args) | ||
| } |
for the truststore and keystore below, create a README file next to them explaining how they were generated. ideally, generate them programmatically as part of the integration tests
Unfortunately I couldn't get them to generate programmatically in Java/Scala code without JCE unlimited key strength. If we could guarantee that our build system would have the JCE policy files installed then that's probably OK. Alternatively, I could also have been using the security APIs incorrectly (I was trying to use BouncyCastle), so someone should feel free to give that a go, but I think checking in the static files is fine for now.
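For anyone who wants to retry the programmatic route later, a rough sketch of the BouncyCastle approach might look like the following. This is not the code that ended up in the PR; it assumes the bcpkix artifact is on the test classpath, and the alias, password, validity window, and output path are all illustrative.

```scala
import java.io.FileOutputStream
import java.math.BigInteger
import java.security.{KeyPairGenerator, KeyStore}
import java.util.Date

import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder}
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder

// Generate an RSA key pair for the self-signed certificate.
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val keyPair = keyPairGenerator.generateKeyPair()

// Build a self-signed X.509 certificate valid for one day around "now".
val subject = new X500Name("CN=localhost")
val certificateBuilder = new JcaX509v3CertificateBuilder(
  subject,
  new BigInteger("1"),
  new Date(System.currentTimeMillis() - 24L * 60 * 60 * 1000),
  new Date(System.currentTimeMillis() + 24L * 60 * 60 * 1000),
  subject,
  keyPair.getPublic)
val signer = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate)
val certificate = new JcaX509CertificateConverter()
  .getCertificate(certificateBuilder.build(signer))

// Write the key and certificate into a JKS keystore for the test to pick up.
val keyStorePassword = "changeit".toCharArray
val keyStore = KeyStore.getInstance("JKS")
keyStore.load(null, null)
keyStore.setKeyEntry(
  "driver", keyPair.getPrivate, keyStorePassword,
  Array[java.security.cert.Certificate](certificate))
val keyStoreStream = new FileOutputStream("target/keystore.jks")
try {
  keyStore.store(keyStoreStream, keyStorePassword)
} finally {
  keyStoreStream.close()
}
```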
| case "--port" :: value :: tail => | ||
| args = tail | ||
| resolvedArguments.copy(port = Some(value.toInt)) | ||
| case "--use-ssl" :: value :: tail => |
move this down one so all the ssl-related flags are together
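For illustration, grouping the SSL flags in the match could read like this simplified, self-contained version of the list-pattern parsing used above. The real code uses a mutable loop rather than recursion, and flags other than --port and --use-ssl are illustrative:

```scala
// Toy resolved-arguments holder; field names are illustrative.
case class ResolvedArgs(
    port: Option[Int] = None,
    useSsl: Boolean = false,
    keyStoreFile: Option[String] = None)

@annotation.tailrec
def parse(args: List[String], resolved: ResolvedArgs = ResolvedArgs()): ResolvedArgs = args match {
  case Nil => resolved
  case "--port" :: value :: tail =>
    parse(tail, resolved.copy(port = Some(value.toInt)))
  // SSL-related flags kept next to each other, per the review comment.
  case "--use-ssl" :: value :: tail =>
    parse(tail, resolved.copy(useSsl = value.toBoolean))
  case "--keystore-file" :: value :: tail =>
    parse(tail, resolved.copy(keyStoreFile = Some(value)))
  case unknown :: _ =>
    throw new IllegalArgumentException(s"Unrecognized argument: $unknown")
}

parse(List("--port", "7077", "--use-ssl", "true"))
```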
# This class will also require setting a secret via the SPARK_APP_SECRET environment variable
CMD exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer --hostname $HOSTNAME --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT --secret-file $SPARK_SUBMISSION_SECRET_LOCATION
CMD SSL_ARGS="" && \
This is tricky - I think using SPARK_DAEMON_JAVA_OPTS and passing system properties through -D is another option here. But I like that here it's clear from the Dockerfile how the driver rest server is to be configured.
One downside here is that creators of custom images will need to stay pretty up to date with the exact version of spark-submit k8s being used for submissions. I'd imagine that we add configuration options not infrequently.
I think it's safe to say though that docker images will be tightly tied to spark-submit versions anyway.
In general I prefer explicit over implicit, so seeing this written out seems better.
ash211
left a comment
Seems good -- @foxish thoughts on the SSL implementation here?
      barrier.await()
    }

    private def fileToUtf8String(filePath: String, fileType: String) = {
You can maybe replace this method with Guava's Files.toString
http://digitalsanctum.com/2012/11/30/how-to-read-file-contents-in-java-the-easy-way-with-guava/
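For reference, the suggested Guava call is a one-liner (assuming Guava is already on the classpath; the file path is a placeholder):

```scala
import java.io.File
import com.google.common.base.Charsets
import com.google.common.io.Files

// Guava equivalent of fileToUtf8String: read the whole file as a UTF-8 string.
val contents = Files.toString(new File("/path/to/password-file"), Charsets.UTF_8)
```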
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
why alias SPARK_VERSION to something different? The casing nicely implies that it's a constant
This keeps things consistent with other places that use this constant.
| "https" | ||
| } else { | ||
| logWarning("Submitting application details and local jars to the cluster" + | ||
| " over an insecure connection. Consider configuring SSL to secure" + |
I think we should mention the "secret" in the application details, and maybe say "Strongly consider" turning on SSL
| " this step.") | ||
| "http" | ||
| } | ||
| val (trustManager, sslContext): (X509TrustManager, SSLContext) = |
we could pull this whole expression out into a separate function if we wanted to - it would just take in driverLaunchSslOptions
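A hedged sketch of what that extracted helper could look like, pieced together from the fragments visible in this diff; the "TLS" protocol string, the stream handling, and the method name are assumptions rather than the PR's actual code:

```scala
import java.io.FileInputStream
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import org.apache.spark.SSLOptions

// Only depends on driverLaunchSslOptions, as suggested above. Falls back to the
// default SSLContext (and no custom trust manager) when SSL is disabled or no
// trust store is configured.
private def buildSslContext(
    driverLaunchSslOptions: SSLOptions): (X509TrustManager, SSLContext) = {
  if (!driverLaunchSslOptions.enabled) {
    (null, SSLContext.getDefault)
  } else {
    driverLaunchSslOptions.trustStore.map { trustStoreFile =>
      // Load the configured trust store file.
      val trustStore = KeyStore.getInstance(
        driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
      val trustStoreStream = new FileInputStream(trustStoreFile)
      try {
        trustStore.load(
          trustStoreStream,
          driverLaunchSslOptions.trustStorePassword.map(_.toCharArray).orNull)
      } finally {
        trustStoreStream.close()
      }
      // Build an SSLContext backed by trust managers from that trust store.
      val trustManagerFactory =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
      trustManagerFactory.init(trustStore)
      val trustManagers = trustManagerFactory.getTrustManagers
      val sslContext = SSLContext.getInstance("TLS")
      sslContext.init(null, trustManagers, new SecureRandom())
      (trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
    }.getOrElse((null, SSLContext.getDefault))
  }
}
```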
I have no major qualms -- we can merge this now (note that it's not into the main branch but rather into the in-progress branch).
Going to merge this immediately since it's getting hard to track the changes both here and on the in-progress branch.
…it jars (#30)

* Revamp ports and service setup for the driver.
  - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy
  - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started
* Move service creation down and more thorough error handling
* Fix missed merge conflict
* Add braces
* Fix bad merge
* Address comments and refactor run() more. Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod()
* Remove unused method
* Support SSL configuration for the driver application submission (#49)
* Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server.
* Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already.
* Fix compilation error
* Revert image change
* Address comments
* Programmatically generate certificates for integration tests.
* Address comments
* Resolve merge conflicts
* Fix bad merge
* Remove unnecessary braces
* Fix compiler error
…tions
### What changes were proposed in this pull request?
In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map joins to lose efficacy and adds more `ShuffleExchange`s.
How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enable adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```
vs
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
            +- Range (0, 8, step=1, splits=16)
```
This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, to avoid adding more `ShuffleExchange`s.
### Why are the changes needed?
Do not degrade performance after enabling adaptive execution.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit test.
Closes apache#26409 from wangyum/SPARK-29655.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>