Error messages when the driver container fails to start. #11
Conversation
ash211
left a comment
A couple of minor changes for more precise errors. Users love precise errors.
```scala
s"$podStatusPhase\n" +
s"$podStatusMessage\n\n$failedDriverContainerStatusString"
try {
  kubernetesClient.pods.delete(driverPod)
```
Do a logFatal before this deletion.
```scala
} catch {
  case throwable: Throwable =>
    logError("Timed out while waiting for the driver pod to start, but the driver" +
      " pod could not be found.", throwable)
```
Say what the timeout length was, here and in the line below. We probably want to pull the 30 above into a constant that we can reference in both places.
```scala
      " up the driver pod.", e)
}
val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
  s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in 30 seconds."
```
Use a constant for this 30.
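As a sketch of the suggested extraction (the object and method names below are hypothetical, not from the patch), the timeout could live in a single constant referenced by both the wait and the error message, so the reported duration can never drift from the enforced one:

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

object DriverSubmitTimeouts {
  // Hypothetical constant name; the patch may pick a different one.
  val DriverSubmitTimeoutSeconds: Int = 30
}

// Both the wait and the failure message reference the same constant.
def awaitDriverReady(submitCompleted: CompletableFuture[Void],
                     podName: String,
                     namespace: String): Unit = {
  import DriverSubmitTimeouts._
  try {
    submitCompleted.get(DriverSubmitTimeoutSeconds, TimeUnit.SECONDS)
  } catch {
    case e: TimeoutException =>
      throw new RuntimeException(
        s"The driver pod with name $podName in namespace $namespace" +
          s" was not ready in $DriverSubmitTimeoutSeconds seconds.", e)
  }
}
```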
```scala
submitCompletedFuture.get(30, TimeUnit.SECONDS)
}
try {
  submitCompletedFuture.get(30, TimeUnit.SECONDS)
```
It seems like we consider submission to be complete only if the driver pod has entered the running state. It may make sense to instead report the submission as being complete once the APIServer has accepted the request, even if the pod is in the Pending state. Would that make sense?
One of the error cases we hit was that the pod was stuck in the pending state because the image could not pull. We want to count the spark-submit as complete once we are able to actually start the Spark job itself.
Hmm. I guess we do need to time-bound the starting up of the driver, because the client side needs to submit files to it. I'm just wondering how we expect queueing of jobs to work when a cluster is overloaded. It seems like a user can only submit jobs if their driver is guaranteed to start running almost immediately.
IIRC the semantics here should be the same as those for YARN cluster mode, which runs into a similar issue if resources aren't available for the application master. Otherwise we could support a fire-and-forget "wait forever" mode of operation, but that can come in a later iteration once we expand the spark-submit features (possibly also if/when we include killing jobs, etc.).
IIRC, in Spark standalone mode, as long as the master is in the "ALIVE" state, the driver is queued, which translates to fire-and-forget.
@foxish's concern is a valid one; in that scenario, the ideal behavior is for the job/driver to be queued, even if it isn't gotten to until later. Perhaps we can settle for an approach where the submission client polls for the driver state?
Another concern I have is with the timeout: how long is long enough? My sample runs, for example, have failed due to the timeout.
I was fairly certain that in YARN, at least, the application remains in the "ACCEPTED" state for some time and then spark-submit eventually gives up on running the application. It could be that different cluster managers behave differently, though, which would mean we would have to decide on one behavior or the other here.
The problem is that it's difficult to differentiate between queued versus image pull errors. Is it possible for K8s to change the "image pull failed" case to be one that's a FAILED state rather than a PENDING state? Are there any other error cases where a PENDING state doesn't correspond to resource constraints? I was actually pretty surprised that the pod's state was stuck in PENDING when it seemed to be the result of an error to fetch the image.
If we could differentiate PENDING from FAILED and confidently say that the PENDING state is always one where the pod could eventually be able to run (e.g. after resources are freed up), then we could properly abort if we see the container hit the FAILED state, and keep SparkSubmit around in PENDING states. The thing I want to avoid here is keeping spark-submits running against things that have zero chance to start running successfully to begin with, like error cases. I'm fine with stalling and keeping the SparkSubmit process running to wait for resource availability.
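The policy being debated can be written down as a tiny standalone decision function. This is a model of the discussion, not the real Kubernetes client API; the waiting reasons are standard kubelet ones (e.g. ErrImagePull, ImagePullBackOff), and the type and function names are hypothetical:

```scala
// PENDING can mean "queued on resources" (keep waiting) or "will never
// start without intervention" (abort), depending on the container's
// waiting reason reported in the pod status.
sealed trait SubmitDecision
case object KeepWaiting extends SubmitDecision
case object AbortSubmit extends SubmitDecision

// Waiting reasons that mean the pod cannot start without a fix.
val fatalWaitingReasons = Set("ErrImagePull", "ImagePullBackOff", "InvalidImageName")

def decide(podPhase: String, containerWaitingReason: Option[String]): SubmitDecision =
  podPhase match {
    case "Failed" => AbortSubmit
    case "Pending" if containerWaitingReason.exists(fatalWaitingReasons) => AbortSubmit
    case _ => KeepWaiting // genuinely queued: keep spark-submit alive
  }
```

Under this split, spark-submit could stall through resource-constrained PENDING states while still aborting promptly on image-pull errors.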
@aash addressed comments
```scala
s"$podStatusMessage\n\n$failedDriverContainerStatusString"
logError(finalErrorMessage, e)
try {
  kubernetesClient.pods.delete(driverPod)
```
I kind of don't want to delete the pod at all, as letting it stay around gives users the ability to use kubectl to inspect the state of the pod themselves. kubectl describe pod gives a lot more depth on what's going on than we should be providing in error logs. On the other hand, leaking failed pods seems sub-par... What does everyone think?
We definitely don't want to leak pods, because the end user may not understand kubectl as well as they understand spark-submit. But there should be a debug mode / command-line option that lets us keep the pod around and inspect it later.
```scala
} finally {
  if (!submitSucceeded) {
    try {
      kubernetesClient.pods.withName(kubernetesAppId).delete
```
(Going to repeat my earlier comment as I wrote some changes that rendered the diff out of date)
I kind of don't want to delete the pod at all, as letting it stay around gives users the ability to use kubectl to inspect the state of the pod themselves. kubectl describe pod gives a lot more depth on what's going on than we should be providing in error logs. On the other hand, leaking failed pods seems sub-par... What does everyone think?
@foxish sorry for moving the comment here. Yes, having a way to keep the pod around for a debug mode would be useful. I'll follow up on that separately.
Brainstorming some flag names for that:
--preserve-pods
--keep-driver-pod
--preserve-driver-pod
Would we want this flag to keep the pod around even if it launched and finished its job successfully? Maybe some folks will want to look at logs afterwards for debugging, though kubectl logs might preserve those anyway, so there's no need to keep successful pods just for logs.
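Whatever the flag ends up being called, the cleanup path could be gated on it roughly like this. This is a sketch; the flag semantics and the function name are hypothetical, and the actual deletion call is passed in as a thunk so the decision itself stays testable:

```scala
// Returns whether the pod was deleted, so the decision is easy to log.
def cleanUpDriverPod(submitSucceeded: Boolean,
                     preserveDriverPod: Boolean,
                     deletePod: () => Unit): Boolean = {
  // Keep the pod when the submit succeeded (the normal lifecycle applies)
  // or when the user asked to preserve it for kubectl-based debugging.
  val shouldDelete = !submitSucceeded && !preserveDriverPod
  if (shouldDelete) deletePod()
  shouldDelete
}
```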
ash211
left a comment
Missing s string interpolator
```scala
  "Driver container last state: Unknown"
}
}).getOrElse("The driver container wasn't found in the pod; expected to find" +
  " container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
```
lost the s" prefix
Aside from the missing s caught by @ash211, this LGTM; I love a good descriptive error message. Any other issues preventing the PR from merging? I'm guessing that requested change will come in another commit, and our concerns about how best to handle the submit will come in the form of subsequent PR(s).
Typo is fixed; is this good to go? We can follow up on persisting the pod with the setting separately. Questions around fire-and-forget and error handling are probably worth following up on as well.
* Error messages when the driver container fails to start.
* Fix messages a bit
* Use timeout constant
* Delete the pod if it fails for any reason (not just timeout)
* Actually set submit succeeded
* Fix typo
…pressions
## What changes were proposed in this pull request?
This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:
```
val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
val df = spark.createDataFrame(rdd, inputSchema)
// Works correctly since no nested decimal expression is involved
// Expected result type: (26, 6) * (26, 6) = (38, 12)
df.select($"col" * $"col").explain(true)
df.select($"col" * $"col").printSchema()
// Gives a wrong result since there is a nested decimal expression that should be visited first
// Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
df.select($"col" * $"col" * $"col").explain(true)
df.select($"col" * $"col" * $"col").printSchema()
```
The example above gives the following output:
```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- (col * col): decimal(38,12) (nullable = true)
// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- ((col * col) * col): decimal(38,12) (nullable = true)
```
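The direction issue can be illustrated on a miniature stand-in for Catalyst trees. Everything below is a simplified, hypothetical model (not Spark's Expression API); the result type for decimal multiplication follows the usual rule p = p1 + p2 + 1 capped at 38, s = s1 + s2, matching the expected types quoted in the example above:

```scala
// Miniature expression tree: decimal literals and multiplication only.
sealed trait Expr
case class Dec(precision: Int, scale: Int) extends Expr
case class Mul(left: Expr, right: Expr, resultType: Option[(Int, Int)] = None) extends Expr

// Multiplication result type: p = p1 + p2 + 1 (capped at 38), s = s1 + s2.
def mulType(l: (Int, Int), r: (Int, Int)): (Int, Int) =
  (math.min(38, l._1 + r._1 + 1), l._2 + r._2)

def typeOf(e: Expr): (Int, Int) = e match {
  case Dec(p, s)          => (p, s)
  case Mul(_, _, Some(t)) => t
  case Mul(l, r, None)    => mulType(typeOf(l), typeOf(r))
}

// Bottom-up resolution (transformUp-like): children are fixed before their
// parent, so the outer Multiply sees the widened (38,12) type of the inner one
// and correctly yields (38,18) for ((26,6) * (26,6)) * (26,6).
def resolveUp(e: Expr): Expr = e match {
  case d: Dec       => d
  case Mul(l, r, _) =>
    val (rl, rr) = (resolveUp(l), resolveUp(r))
    Mul(rl, rr, Some(mulType(typeOf(rl), typeOf(rr))))
}
```

Visiting outer nodes first, as the old transform-down direction did, fixes the parent's type from the children's pre-widening types, which is exactly the mismatch shown in the analyzed plan above.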
## How was this patch tested?
This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.
Author: aokolnychyi <[email protected]>
Closes apache#18583 from aokolnychyi/spark-21332.
(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
…/`to_avro`
## What changes were proposed in this pull request?
Previously, `from_avro`/`to_avro` overrode the methods `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))apache-spark-on-k8s#11]
```
It only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`.
We should follow `from_csv`/`from_json` here and override only the method `prettyName`; then we get a clean alias name:
```
... AS from_avro(col)apache-spark-on-k8s#11
```
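The naming behavior can be sketched with a standalone class (not Spark's actual Expression API; all names here are illustrative): when the base class derives the display string from `prettyName` and the child alone, overriding `prettyName` is enough to shorten the alias, and verbose arguments such as the schema string never leak into it:

```scala
// Stand-in for an expression whose alias is derived from its display string.
abstract class MiniExpr(childName: String) {
  def prettyName: String = getClass.getSimpleName.toLowerCase
  // The alias uses prettyName and the child only.
  def displayString: String = s"$prettyName($childName)"
}

// The verbose schema argument is stored but never shown in the alias.
class FromAvro(childName: String, verboseSchema: String) extends MiniExpr(childName) {
  override def prettyName: String = "from_avro"
}
```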
## How was this patch tested?
Manual check
Closes apache#22890 from gengliangwang/revise_from_to_avro.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>