diff --git a/.github/labeler.yml b/.github/labeler.yml
index c5325949889b..a74b4ab5d5a3 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -225,6 +225,7 @@ DEPLOY:
CONNECT:
- changed-files:
- any-glob-to-any-file: [
+ 'connect/**/*',
'connector/connect/**/*',
'python/pyspark/sql/**/connect/**/*',
'python/pyspark/ml/**/connect/**/*'
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 4f092d8c0478..95cec52a262f 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -606,7 +606,7 @@ jobs:
- name: Breaking change detection against branch-3.5
uses: bufbuild/buf-breaking-action@v1
with:
- input: connector/connect/common/src/main
+ input: connect/common/src/main
against: 'https://github.com/apache/spark.git#branch=branch-3.5,subdir=connector/connect/common/src/main'
- name: Install Python 3.9
uses: actions/setup-python@v5
diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml
index 01d9d272d436..8458cdf771b1 100644
--- a/.github/workflows/build_python_connect.yml
+++ b/.github/workflows/build_python_connect.yml
@@ -84,7 +84,7 @@ jobs:
# Start a Spark Connect server for local
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
- --jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
+ --jars "`find connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
# Remove Py4J and PySpark zipped library to make sure there is no JVM connection
mv python/lib lib.back
@@ -104,7 +104,7 @@ jobs:
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--master "local-cluster[2, 4, 1024]" \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
- --jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
+ --jars "`find connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
# Remove Py4J and PySpark zipped library to make sure there is no JVM connection
mv python/lib lib.back
diff --git a/.github/workflows/build_python_connect35.yml b/.github/workflows/build_python_connect35.yml
index abff471349a2..b00fdddb4b0e 100644
--- a/.github/workflows/build_python_connect35.yml
+++ b/.github/workflows/build_python_connect35.yml
@@ -87,7 +87,7 @@ jobs:
# Start a Spark Connect server for local
PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \
--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \
- --jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
+ --jars "`find connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`"
# Checkout to branch-3.5 to use the tests in branch-3.5.
cd ..
diff --git a/.github/workflows/maven_test.yml b/.github/workflows/maven_test.yml
index d23cea926a27..fa30bd3abc8a 100644
--- a/.github/workflows/maven_test.yml
+++ b/.github/workflows/maven_test.yml
@@ -194,7 +194,7 @@ jobs:
if [[ "$INCLUDED_TAGS" != "" ]]; then
./build/mvn $MAVEN_CLI_OPTS -pl "$TEST_MODULES" -Pyarn -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pjvm-profiler -Pspark-ganglia-lgpl -Pkinesis-asl -Djava.version=${JAVA_VERSION/-ea} -Dtest.include.tags="$INCLUDED_TAGS" test -fae
elif [[ "$MODULES_TO_TEST" == "connect" ]]; then
- ./build/mvn $MAVEN_CLI_OPTS -Dtest.exclude.tags="$EXCLUDED_TAGS" -Djava.version=${JAVA_VERSION/-ea} -pl connector/connect/client/jvm,connector/connect/common,connector/connect/server test -fae
+ ./build/mvn $MAVEN_CLI_OPTS -Dtest.exclude.tags="$EXCLUDED_TAGS" -Djava.version=${JAVA_VERSION/-ea} -pl connector/connect/client/jvm,connect/common,connect/server test -fae
elif [[ "$EXCLUDED_TAGS" != "" ]]; then
./build/mvn $MAVEN_CLI_OPTS -pl "$TEST_MODULES" -Pyarn -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pjvm-profiler -Pspark-ganglia-lgpl -Pkinesis-asl -Djava.version=${JAVA_VERSION/-ea} -Dtest.exclude.tags="$EXCLUDED_TAGS" test -fae
elif [[ "$MODULES_TO_TEST" == *"sql#hive-thriftserver"* ]]; then
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 58e7ae5bb0c7..9377849cf1cd 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -74,6 +74,41 @@
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-connect_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-common_${scala.binary.version}</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>failureaccess</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-protobuf_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
diff --git a/python/docs/source/development/testing.rst b/python/docs/source/development/testing.rst
index 2dd389e34a44..c2737371c9b4 100644
--- a/python/docs/source/development/testing.rst
+++ b/python/docs/source/development/testing.rst
@@ -69,21 +69,16 @@ Running Tests for Python Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In order to test the changes in Protobuf definitions, for example, at
-`spark/connector/connect/common/src/main/protobuf/spark/connect <https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect>`_,
+`spark/connect/common/src/main/protobuf/spark/connect <https://github.com/apache/spark/tree/master/connect/common/src/main/protobuf/spark/connect>`_,
you should regenerate Python Protobuf client first by running ``dev/connect-gen-protos.sh``.
Running PySpark Shell with Python Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-For Apache Spark you locally built:
+The command below automatically starts a local Spark Connect server and creates a Spark Connect client connected to it.
.. code-block:: bash
bin/pyspark --remote "local[*]"
-For the Apache Spark release:
-
-.. code-block:: bash
-
- bin/pyspark --remote "local[*]" --packages org.apache.spark:spark-connect_2.13:$SPARK_VERSION
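For reference, ``bin/pyspark --remote "local[*]"`` is the shell counterpart of the builder API below; a minimal sketch, assuming a locally built Spark whose assembly now bundles the spark-connect jar:

    from pyspark.sql import SparkSession

    # "local[*]" makes PySpark start an in-process Spark Connect server
    # and return a Connect client session bound to it.
    spark = SparkSession.builder.remote("local[*]").getOrCreate()
    spark.range(3).show()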
diff --git a/python/docs/source/getting_started/quickstart_connect.ipynb b/python/docs/source/getting_started/quickstart_connect.ipynb
index 1d994bde907f..74b77238c67f 100644
--- a/python/docs/source/getting_started/quickstart_connect.ipynb
+++ b/python/docs/source/getting_started/quickstart_connect.ipynb
@@ -28,7 +28,7 @@
"metadata": {},
"outputs": [],
"source": [
- "!$HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.13:$SPARK_VERSION"
+ "!$HOME/sbin/start-connect-server.sh"
]
},
{
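The notebook cell above now starts the bundled server without ``--packages``. On the client side, a minimal sketch of connecting to that standalone server, assuming the default Spark Connect gRPC port 15002:

    from pyspark.sql import SparkSession

    # sc:// is the Spark Connect URI scheme; 15002 is the port
    # start-connect-server.sh listens on unless configured otherwise.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()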
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 1c40206471a5..b5e76982b3fd 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -51,7 +51,6 @@
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.dataframe import DataFrame as ParentDataFrame
-from pyspark.loose_version import LooseVersion
from pyspark.sql.connect.client import SparkConnectClient, DefaultChannelBuilder
from pyspark.sql.connect.conf import RuntimeConf
from pyspark.sql.connect.plan import (
@@ -984,39 +983,13 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
"""
Starts the Spark Connect server given the master (thread-unsafe).
- At the high level, there are two cases. The first case is development case, e.g.,
- you locally build Apache Spark, and run ``SparkSession.builder.remote("local")``:
-
- 1. This method automatically finds the jars for Spark Connect (because the jars for
- Spark Connect are not bundled in the regular Apache Spark release).
-
- 2. Temporarily remove all states for Spark Connect, for example, ``SPARK_REMOTE``
+ 1. Temporarily removes all states for Spark Connect, for example, the ``SPARK_REMOTE``
environment variable.
- 3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
- into the current class loader. Otherwise, Spark Context with ``spark.plugins``
- cannot be initialized because the JVM is already running without the jars in
- the classpath before executing this Python process for driver side (in case of
- PySpark application submission).
-
- 4. Starts a regular Spark session that automatically starts a Spark Connect server
+ 2. Starts a regular Spark session that automatically starts a Spark Connect server
via ``spark.plugins`` feature.
-
- The second case is when you use Apache Spark release:
-
- 1. Users must specify either the jars or package, e.g., ``--packages
- org.apache.spark:spark-connect_2.12:3.4.0``. The jars or packages would be specified
- in SparkSubmit automatically. This method does not do anything related to this.
-
- 2. Temporarily remove all states for Spark Connect, for example, ``SPARK_REMOTE``
- environment variable. It does not do anything for PySpark application submission as
- well because jars or packages were already specified before executing this Python
- process for driver side.
-
- 3. Starts a regular Spark session that automatically starts a Spark Connect server
- with JVM via ``spark.plugins`` feature.
"""
- from pyspark import SparkContext, SparkConf, __version__
+ from pyspark import SparkContext, SparkConf
session = PySparkSession._instantiatedSession
if session is None or session._sc._jsc is None:
@@ -1033,21 +1006,6 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
# See also SPARK-42272.
overwrite_conf["spark.connect.grpc.binding.port"] = "0"
- def create_conf(**kwargs: Any) -> SparkConf:
- conf = SparkConf(**kwargs)
- for k, v in overwrite_conf.items():
- conf.set(k, v)
- for k, v in default_conf.items():
- if not conf.contains(k):
- conf.set(k, v)
- return conf
-
- # Check if we're using unreleased version that is in development.
- # Also checks SPARK_TESTING for RC versions.
- is_dev_mode = (
- "dev" in LooseVersion(__version__).version or "SPARK_TESTING" in os.environ
- )
-
origin_remote = os.environ.get("SPARK_REMOTE", None)
try:
if origin_remote is not None:
@@ -1055,49 +1013,11 @@ def create_conf(**kwargs: Any) -> SparkConf:
# start the regular PySpark session.
del os.environ["SPARK_REMOTE"]
- SparkContext._ensure_initialized(conf=create_conf(loadDefaults=False))
-
- if is_dev_mode:
- # Try and catch for a possibility in production because pyspark.testing
- # does not exist in the canonical release.
- try:
- from pyspark.testing.utils import search_jar
-
- # Note that, in production, spark.jars.packages configuration should be
- # set by users. Here we're automatically searching the jars locally built.
- connect_jar = search_jar(
- "connector/connect/server", "spark-connect-assembly-", "spark-connect"
- )
- if connect_jar is None:
- warnings.warn(
- "Attempted to automatically find the Spark Connect jars because "
- "'SPARK_TESTING' environment variable is set, or the current "
- f"PySpark version is dev version ({__version__}). However, the jar"
- " was not found. Manually locate the jars and specify them, e.g., "
- "'spark.jars' configuration."
- )
- else:
- pyutils = SparkContext._jvm.PythonSQLUtils # type: ignore[union-attr]
- pyutils.addJarToCurrentClassLoader(connect_jar)
-
- # Required for local-cluster testing as their executors need the jars
- # to load the Spark plugin for Spark Connect.
- if master.startswith("local-cluster"):
- if "spark.jars" in overwrite_conf:
- overwrite_conf[
- "spark.jars"
- ] = f"{overwrite_conf['spark.jars']},{connect_jar}"
- else:
- overwrite_conf["spark.jars"] = connect_jar
-
- except ImportError:
- pass
-
# The regular PySpark session is registered as an active session
# so would not be garbage-collected.
- PySparkSession(
- SparkContext.getOrCreate(create_conf(loadDefaults=True, _jvm=SparkContext._jvm))
- )
+ conf = SparkConf(loadDefaults=True)
+ conf.setAll(list(overwrite_conf.items())).setAll(list(default_conf.items()))
+ PySparkSession(SparkContext.getOrCreate(conf))
# Lastly only keep runtime configurations because other configurations are
# disallowed to set in the regular Spark Connect session.
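The chained ``setAll`` calls above stand in for the removed ``create_conf`` helper; since ``overwrite_conf`` and ``default_conf`` hold disjoint keys, applying both unconditionally yields the same configuration. A minimal standalone sketch with hypothetical values:

    from pyspark import SparkConf

    # Hypothetical stand-ins for the dictionaries built earlier in
    # _start_connect_server; the real key sets do not overlap.
    overwrite_conf = {"spark.connect.grpc.binding.port": "0"}
    default_conf = {"spark.example.default.option": "value"}

    # SparkConf.setAll returns self, so the two calls can be chained.
    conf = SparkConf(loadDefaults=True)
    conf.setAll(list(overwrite_conf.items())).setAll(list(default_conf.items()))
    assert conf.get("spark.connect.grpc.binding.port") == "0"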
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
index 22b7a8e2a733..7b126c357271 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -126,11 +126,6 @@ object Main extends Logging {
sparkContext = sparkSession.sparkContext
sparkSession
} catch {
- case e: ClassNotFoundException if isShellSession && e.getMessage.contains(
- "org.apache.spark.sql.connect.SparkConnectPlugin") =>
- logError("Failed to load spark connect plugin.")
- logError("You need to build Spark with -Pconnect.")
- sys.exit(1)
case e: Exception if isShellSession =>
logError("Failed to initialize Spark session.", e)
sys.exit(1)