Skip to content

Commit 3ceec3b

Browse files
juliuszsompolskiHyukjinKwon
authored andcommitted
[SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
### What changes were proposed in this pull request? Make INVALID_CURSOR.DISCONNECTED a retriable error. ### Why are the changes needed? This error can happen if two RPCs are racing to reattach to the query, and the client is still using the losing one. SPARK-44833 was a bug that exposed such a situation. That was fixed, but to be more robust, we can make this error retryable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tests will be added in #42560 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42818 from juliuszsompolski/SPARK-44835. Authored-by: Juliusz Sompolski <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit f13743d) Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent a9d601c commit 3ceec3b

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
217217
*/
218218
private[client] def retryException(e: Throwable): Boolean = {
219219
e match {
220-
case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
220+
case e: StatusRuntimeException =>
221+
val statusCode: Status.Code = e.getStatus.getCode
222+
223+
if (statusCode == Status.Code.INTERNAL) {
224+
val msg: String = e.toString
225+
226+
// This error happens if another RPC preempts this RPC.
227+
if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
228+
return true
229+
}
230+
}
231+
232+
if (statusCode == Status.Code.UNAVAILABLE) {
233+
return true
234+
}
235+
false
221236
case _ => false
222237
}
223238
}

python/pyspark/sql/connect/client/core.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -585,11 +585,36 @@ class SparkConnectClient(object):
585585

586586
@classmethod
587587
def retry_exception(cls, e: Exception) -> bool:
588-
if isinstance(e, grpc.RpcError):
589-
return e.code() == grpc.StatusCode.UNAVAILABLE
590-
else:
588+
"""
589+
Helper function that is used to identify if an exception thrown by the server
590+
can be retried or not.
591+
592+
Parameters
593+
----------
594+
e : Exception
595+
The GRPC error as received from the server. Typed as Exception, because other exception
596+
thrown during client processing can be passed here as well.
597+
598+
Returns
599+
-------
600+
True if the exception can be retried, False otherwise.
601+
602+
"""
603+
if not isinstance(e, grpc.RpcError):
591604
return False
592605

606+
if e.code() in [grpc.StatusCode.INTERNAL]:
607+
msg = str(e)
608+
609+
# This error happens if another RPC preempts this RPC.
610+
if "INVALID_CURSOR.DISCONNECTED" in msg:
611+
return True
612+
613+
if e.code() == grpc.StatusCode.UNAVAILABLE:
614+
return True
615+
616+
return False
617+
593618
def __init__(
594619
self,
595620
connection: Union[str, ChannelBuilder],

0 commit comments

Comments
 (0)