
Conversation

@zhenlineo
Contributor

@zhenlineo zhenlineo commented Jul 19, 2023

What changes were proposed in this pull request?

This PR introduces a stub class loader for unpacking Scala UDFs in the driver and the executors. When it encounters user classes that are not found on the server session classpath, the stub class loader generates a stub for the class instead of failing.

This solves the problem that, when serializing UDFs, the Java serializer may include unnecessary user code, e.g. user classes that appear in lambda definition signatures in the same class where the UDF is defined.

If the user code is actually needed to execute the UDF, we return an error message suggesting that the user add the missing classes using the addArtifact method.
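To illustrate the idea, here is a minimal sketch of such a loader (not the exact code in this PR; it assumes Spark's xbean-shaded ASM, and the generated stub carries nothing but an empty no-arg constructor):

import org.apache.xbean.asm9.{ClassWriter, Opcodes}

// Delegates to the parent first; for missing classes matching the
// predicate, defines a generated stub instead of failing.
class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
    extends ClassLoader(parent) {

  override def findClass(name: String): Class[_] = {
    if (!shouldStub(name)) throw new ClassNotFoundException(name)
    val bytes = generateStub(name)
    defineClass(name, bytes, 0, bytes.length)
  }

  private def generateStub(binaryName: String): Array[Byte] = {
    val internalName = binaryName.replace('.', '/')
    val cw = new ClassWriter(0)
    cw.visit(Opcodes.V1_8, Opcodes.ACC_PUBLIC, internalName, null, "java/lang/Object", null)
    // An empty no-arg constructor is enough for the class to resolve
    // during deserialization, even though it carries no behavior.
    val ctor = cw.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null)
    ctor.visitCode()
    ctor.visitVarInsn(Opcodes.ALOAD, 0)
    ctor.visitMethodInsn(Opcodes.INVOKESPECIAL, "java/lang/Object", "<init>", "()V", false)
    ctor.visitInsn(Opcodes.RETURN)
    ctor.visitMaxs(1, 1)
    ctor.visitEnd()
    cw.visitEnd()
    cw.toByteArray
  }
}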

Why are the changes needed?

To enhance the user experience of UDFs. This PR should be merged to master and 3.5.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests for both Scala 2.12 and 2.13.

4 tests in SparkSessionE2ESuite still fail to run with Maven after the fix, because the client test jar is installed on the system classpath (added using --jar at server start) and the stub classloader can only stub classes missing from the session classpath (added using session.addArtifact).

Moving the test jar to the session classpath currently causes failures in the flatMapGroupsWithState tests (SPARK-44576). We will finish moving the test jar to the session classpath once those failures are fixed.
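For users who hit the suggested error, registering the missing classes from the client side looks like this (the jar path is illustrative):

spark.addArtifact("/path/to/jar/with/the/missing/classes.jar")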

@zhenlineo zhenlineo changed the title [WIP] Fix class loading problem caused by stub user classes not found on the server classpath [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath Jul 25, 2023
@zhenlineo zhenlineo marked this pull request as ready for review July 25, 2023 05:43
@LuciferYang
Contributor

LuciferYang commented Jul 25, 2023

Checked the Maven tests with this PR: there are 10 TESTS FAILED. Further confirmation is needed to determine whether all of them are related to this PR.

Run:

build/mvn clean install -DskipTests -Phive
build/mvn clean test -pl connector/connect/client/jvm
FlatMapGroupsWithStateStreamingSuite:
- flatMapGroupsWithState - streaming *** FAILED ***
  org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
  at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
  ...
- flatMapGroupsWithState - streaming - with initial state *** FAILED ***
  org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
  at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
  ...
- mapGroupsWithState - streaming *** FAILED ***
  org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
  at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
  ...
- mapGroupsWithState - streaming - with initial state *** FAILED ***
  org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
  at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
  ...
- flatMapGroupsWithState *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 489.0 failed 1 times, most recent failure: Lost task 0.0 in stage 489.0 (TID 1997) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
  at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
  at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
  at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
  ...
- flatMapGroupsWithState - with initial state *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 494.0 failed 1 times, most recent failure: Lost task 0.0 in stage 494.0 (TID 2006) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
  at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
  at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
  at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
  ...
- mapGroupsWithState *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 497.0 failed 1 times, most recent failure: Lost task 0.0 in stage 497.0 (TID 2013) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
  at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
  at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
  at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
  ...
- mapGroupsWithState - with initial state *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 502.0 failed 1 times, most recent failure: Lost task 0.0 in stage 502.0 (TID 2022) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
  at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
  at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
  at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
  at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
  at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
  ...
- update class loader after stubbing: new session *** FAILED ***
  java.io.NotSerializableException: org.scalatest.Engine
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  ...
- update class loader after stubbing: same session *** FAILED ***
  java.io.NotSerializableException: org.scalatest.Engine
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  ...
*** 10 TESTS FAILED ***

@LuciferYang
Contributor

These test failures only occur with this PR, but this PR also fixes four test failures in SparkSessionE2ESuite.

@zhenlineo
Contributor Author

@LuciferYang Thanks for the detailed review. Let me fix them. The UDF test failures might be caused by the class loading order I use with the stub.

@github-actions github-actions bot removed the BUILD label Jul 25, 2023
@zhenlineo
Contributor Author

@LuciferYang The 10 errors:

  • 8 streaming-related test failures: something is wrong with the class loader, still investigating.
  • 2 new UDF loading test failures: they need the client jar file, so the tests cannot run with mvn clean. I will add a warning to help with these two failures.

@LuciferYang
Contributor

2 new UDF loading test failures: they need the client jar file, so the tests cannot run with mvn clean. I will add a warning to help with these two failures.

Yes, we should give a clear indication of this, as I've noticed that many developers get into the habit of using the build/mvn package test command for testing, which ends up causing test failures.

@zhenlineo
Contributor Author

@LuciferYang @vicennial This is ready for another look, thanks.
The PR should be merged to 3.5 as this is a bug fix for 3.5 UDFs.
cc @rednaxelafx @juliuszsompolski @hvanhovell


val plan = proto.Plan.newBuilder().setCommand(command).build()

- client.execute(plan)
+ client.execute(plan).asScala.foreach(_ => ())
Contributor

Why is this needed?

Contributor Author

Currently the registerUDF call is async. I do not think it is correct for registerUDF to be async, so I added this code to block until success or error.

classWriter.visitSource(name + ".java", null)

// Generate constructor.
val ctorWriter = classWriter.visitMethod(
Contributor

Can you file a follow-up to make this throw an exception?

Contributor Author

Do you want to cover the case where the default constructor is called? I had the code, but thought it was not that useful, since in 99% of cases it fails on other constructors or method lookups before any constructor is called. Let me bring back the code...

Contributor

Location is a bit weird, why not in src/test/scala?

Contributor Author

This source file cannot be on the classpath, otherwise sbt would include it in the server system classpath. So it lives outside, in resources. We only need the jars and binaries, which are manually installed on the session classpath. Keeping the source file around is just in case anyone wonders what the dummy UDF looks like.

if (updated) {
// When a new url is added for non-default class loader, recreate the class loader
// to ensure all classes are updated.
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)
Contributor

Why do we recreate the URL classloader as well? Is that needed?

Contributor

nvm I get it.

.createWithDefault(false)

private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
ConfigBuilder("spark.connect.scalaUdf.stubClasses")
Contributor

stubPrefixes?

Contributor

can you change this in a follow-up?

isolatedSession)
// Always reset the thread class loader to ensure if any updates, all threads (not only
// the thread that updated the dependencies) can update to the new class loader.
Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
Contributor

I am pretty sure we do this elsewhere as well.

Are you also unsetting it once you are done?
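(A sketch of the set-and-restore pattern this question is getting at; withContextClassLoader is a hypothetical helper, not code from this PR:)

def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
  val previous = Thread.currentThread.getContextClassLoader
  // Route class lookups on this thread through the session's loader...
  Thread.currentThread.setContextClassLoader(loader)
  try body
  // ...and restore the original loader once the work is done.
  finally Thread.currentThread.setContextClassLoader(previous)
}

// e.g. withContextClassLoader(isolatedSession.replClassLoader) { runTask() }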

val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
val stubClassLoader =
StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
new ChildFirstURLClassLoader(
Contributor

Should this follow the same rules for classpath resolution we have on the executor?

Contributor Author

Probably, to be consistent. Let me fix it in a follow-up.

Contributor Author

Actually it is fine. There are 3 existing class loaders:

User CL: classes added using --jar
Sys CL: Spark + sys libs
Session CL: classes added using session.addArtifacts

In the executor:

  • normal: Sys -> (User + Session) -> Stub
  • reverse: (User + Session) -> Sys -> Stub

In the driver:

  • normal: (Sys + User) -> Session -> Stub
  • reverse: (User -> Sys) -> Session -> Stub

So what you see here is () -> Session -> Stub.
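(To make the "() -> Session -> Stub" chain concrete, a rough sketch of the wiring; buildSessionLoader and the prefix predicate are illustrative, while ChildFirstURLClassLoader is Spark's existing child-first loader:)

import java.net.URL
import org.apache.spark.util.ChildFirstURLClassLoader

def buildSessionLoader(sessionJars: Array[URL], stubPrefixes: Seq[String]): ClassLoader = {
  // The stub loader sits at the root with no parent, so it is consulted
  // only after the session artifacts fail to resolve a class.
  val stub = new StubClassLoader(null, name => stubPrefixes.exists(name.startsWith))
  // Child-first: session jars are searched before the (stubbing) parent.
  new ChildFirstURLClassLoader(sessionJars, stub)
}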

Contributor

@hvanhovell hvanhovell left a comment

LGTM

@hvanhovell
Contributor

Merging this, it fixes a pretty big UX issue for UDFs!

hvanhovell pushed a commit that referenced this pull request Jul 29, 2023
…classes not found on the server classpath

Closes #42069 from zhenlineo/ref-spark-result.

Authored-by: Zhen Li <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 6d0fed9)
Signed-off-by: Herman van Hovell <[email protected]>
Comment on lines +2554 to +2559
.doc("""
|Comma-separated list of binary names of classes/packages that should be stubbed during
|the Scala UDF serde and execution if not found on the server classpath.
|An empty list effectively disables stubbing for all missing classes.
|By default, the server stubs classes from the Scala client package.
|""".stripMargin)
Contributor

So by default we will stub when some Spark Connect client code is pulled into the UDF, but not when serialization pulls in some other class that is unrelated to the client and not needed by the UDF, merely referenced in the containing class in a way that makes it get pulled in?
In that case, would the user also get a ClassNotFound error?
Do we in that case want the user to add that class using addArtifact, even though it might be unclear to the user why it is relevant to the UDF?
What are the disadvantages of just stubbing everything?

Contributor Author

The stub class loader is currently used for all withSession calls in the driver and for all task runs in the executors.
Perhaps we should use stubbing only for UDF class loading in the driver, with a more aggressive default, e.g. "org, com".
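(Illustratively, such a broader default could be set at server start like this; the value is hypothetical:)

--conf spark.connect.scalaUdf.stubClasses=org,com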

Contributor

Rubber duck questions :-):
What are the risks of being more aggressive and stubbing everything?
Why are the risks smaller if you were to do it only on the driver?
Would it even work without doing it on the executors? The executors execute this, so they need to have the stubs to not run into ClassNotFound?

In the description you write

the Java serializer may include unnecessary user code, e.g. user classes that appear in lambda definition signatures in the same class where the UDF is defined.

but with it defaulting to connect client classes only, it will actually not help for "User classes"?

Contributor Author

Including @hvanhovell, as he suggested not stubbing user classes.

Contributor

I generally expect user classes to be present on the classpath; if they are not, the user needs to do something anyway. The internal classes are a bit special because they can be captured by accident, so stubbing makes more sense there.

Contributor

IDK about that... don't you think that if Spark Connect is used inside a bigger real-life application, there may be many user classes that are not related to anything the user wishes to execute on the Spark cluster, just various application business logic that can get captured by accident just as well?

Comment on lines +1521 to +1525
case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
throw new ClassNotFoundException(
s"Failed to load class correctly due to ${e.getCause}. " +
"Make sure the artifact where the class is defined is installed by calling" +
" session.addArtifact.")
Contributor

In the description you write

If the user code is actually needed to execute the UDF, we return an error message suggesting that the user add the missing classes using the addArtifact method.

but since this triggers during deserialization, wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config?

Contributor Author

wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config

The code you highlighted would not catch that class, because the case you describe fails with a ClassNotFoundException rather than a NoSuchMethodException.

Contributor

Ah, smart, that's why you catch NoSuchMethodException: it suggests actual use, whereas for ClassNotFoundException you generate a stub. Now I finally understand, from your other comment with the explanation.
That could be worth a rubber-ducky comment here, saying that "while ClassNotFoundException may be caused by an unused class accidentally pulled in by the serializer, NoSuchMethodException suggests actual use of the class".

And @hvanhovell's comment about throwing from the default constructor is to cover the case where someone just calls the default constructor but doesn't use any methods?
Also worth a rubber-ducky comment :-)

new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
}

def generateStub(binaryName: String): Array[Byte] = {
Contributor

In the description you write

If the user code is actually needed to execute the UDF, we return an error message suggesting that the user add the missing classes using the addArtifact method.

If I understand correctly, this generated stub should be throwing that error if it actually gets called?

Contributor Author

When a user actually uses a class, it normally looks like val clazz = new Clazz(); clazz.callMethod(). When this happens, it fails earlier, on resolving the method, before we get here (throwing the error from the constructor at runtime).

Throwing an error from the constructor would only help if the user calls val clazz = new Clazz() and does not use the class afterwards.

If you ask why we do not stub the methods the user would call and throw the error there: because it is too hard :) We would need to scan the UDF contents. The NoSuchMethodException in SparkConnectPlanner is good enough to throw the error for us.
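(Following up on the constructor discussion, a sketch of what a throwing stub constructor could look like in ASM; this mirrors the follow-up change referenced below, not this PR's exact code, and visitThrowingCtor is an illustrative helper name:)

import org.apache.xbean.asm9.{ClassWriter, Opcodes}

// Emit a no-arg constructor whose body immediately throws, so that even a
// plain `new Stub()` without any method call fails with a clear message.
def visitThrowingCtor(cw: ClassWriter, binaryName: String): Unit = {
  val mv = cw.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null)
  mv.visitCode()
  mv.visitTypeInsn(Opcodes.NEW, "java/lang/ClassNotFoundException")
  mv.visitInsn(Opcodes.DUP)
  mv.visitLdcInsn(s"$binaryName is a stub; add the real class via session.addArtifact.")
  mv.visitMethodInsn(Opcodes.INVOKESPECIAL, "java/lang/ClassNotFoundException",
    "<init>", "(Ljava/lang/String;)V", false)
  mv.visitInsn(Opcodes.ATHROW)
  mv.visitMaxs(3, 1) // max stack: the exception ref, its dup, and the message string
  mv.visitEnd()
}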

HyukjinKwon pushed a commit that referenced this pull request Jul 30, 2023
### What changes were proposed in this pull request?
Made the stub constructor throw ClassNotFoundException if called.
A tiny improvement: do not recreate class loaders in the executor if stubbing is not enabled.

### Why are the changes needed?
Enhancement to #42069
Should be merged to 3.5.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes #42222 from zhenlineo/error-from-constuctor.

Authored-by: Zhen Li <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Jul 30, 2023
(cherry picked from commit 5df1d79)
Signed-off-by: Hyukjin Kwon <[email protected]>
LuciferYang added a commit that referenced this pull request Sep 26, 2023
…ry files

### What changes were proposed in this pull request?
The purpose of this PR is to clean up the binary files used to assist with Scala 2.12 testing.

They include:
- `core/src/test/resources/TestHelloV3_2.12.jar` and `core/src/test/resources/TestHelloV2_2.12.jar` added by SPARK-44246(#41789).
- `connector/connect/client/jvm/src/test/resources/udf2.12` and `connector/connect/client/jvm/src/test/resources/udf2.12.jar` added by SPARK-43744(#42069)
- `connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar` added by SPARK-44293(#41844)
- `sql/hive/src/test/resources/regression-test-SPARK-8489/test-2.12.jar` added by SPARK-25304(#22308)

### Why are the changes needed?
Spark 4.0 no longer supports Scala 2.12.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43106 from LuciferYang/SPARK-45321.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
@tenstriker

I think this PR is affecting external users as well. We start the spark-connect server with external jars and hit a similar ClassCastException due to a classloading issue: https://issues.apache.org/jira/browse/SPARK-46762

grundprinzip pushed a commit that referenced this pull request Jul 12, 2024
### What changes were proposed in this pull request?

This jar was added in #42069 but moved in #43735.

### Why are the changes needed?

To clean up a jar that is no longer used.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests should cover this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47315 from HyukjinKwon/minor-cleanup-jar-2.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024