diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml
index e117a0a7451cb..2fdb2d4bafe01 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -45,6 +45,11 @@
       <artifactId>spark-sql-api_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sketch_${scala.binary.version}</artifactId>
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 60bacd4e18ede..051d382c49773 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,7 +22,9 @@ import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Stable
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.connect.proto.Parse.ParseFormat
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connect.ConnectConversions._
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.types.StructType
@@ -140,6 +142,14 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends api.Data
def json(jsonDataset: Dataset[String]): DataFrame =
parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
+ /** @inheritdoc */
+ override def json(jsonRDD: JavaRDD[String]): Dataset[Row] =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def json(jsonRDD: RDD[String]): Dataset[Row] =
+ throwRddNotSupportedException()
+
/** @inheritdoc */
override def csv(path: String): DataFrame = super.csv(path)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index a368da2aaee60..966b5acebca23 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -26,8 +26,10 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
import org.apache.spark.connect.proto
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
@@ -1463,4 +1465,10 @@ class Dataset[T] private[sql] (
func: MapFunction[T, K],
encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
+
+ /** @inheritdoc */
+ override def rdd: RDD[T] = throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def toJavaRDD: JavaRDD[T] = throwRddNotSupportedException()
}
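
(Context, not part of the patch: a minimal sketch of what these overrides mean for a caller. It assumes a Connect session built against a hypothetical endpoint `sc://localhost`; the call compiles because the shim `RDD`/`JavaRDD` types are on the compile classpath, but the Connect override throws at runtime.)

  import scala.util.Try
  import org.apache.spark.sql.SparkSession

  // Hypothetical endpoint; any Connect session behaves the same way.
  val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()
  val ds = spark.range(3)

  // Compiles (the shim classes provide the types), but the Connect overrides throw.
  val attempt = Try(ds.rdd)
  assert(attempt.failed.get.isInstanceOf[UnsupportedOperationException])
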
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 222b5ea79508e..ad10a22f833bf 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,10 +29,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import io.grpc.ClientInterceptor
import org.apache.arrow.memory.RootAllocator
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
@@ -84,10 +87,14 @@ class SparkSession private[sql] (
private[sql] val observationRegistry = new ConcurrentHashMap[Long, Observation]()
- private[sql] def hijackServerSideSessionIdForTesting(suffix: String) = {
+ private[sql] def hijackServerSideSessionIdForTesting(suffix: String): Unit = {
client.hijackServerSideSessionIdForTesting(suffix)
}
+ /** @inheritdoc */
+ override def sparkContext: SparkContext =
+ throw new UnsupportedOperationException("sparkContext is not supported in Spark Connect.")
+
/** @inheritdoc */
val conf: RuntimeConfig = new ConnectRuntimeConfig(client)
@@ -144,6 +151,30 @@ class SparkSession private[sql] (
createDataset(data.asScala.toSeq)
}
+ /** @inheritdoc */
+ override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame =
+ throwRddNotSupportedException()
+
+ /** @inheritdoc */
+ override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] =
+ throwRddNotSupportedException()
+
/** @inheritdoc */
@Experimental
def sql(sqlText: String, args: Array[_]): DataFrame = newDataFrame { builder =>
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
index ada94b76fcbcd..5c61b9371f37c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
@@ -19,4 +19,7 @@ package org.apache.spark
package object sql {
type DataFrame = Dataset[Row]
+
+ private[sql] def throwRddNotSupportedException(): Nothing =
+ throw new UnsupportedOperationException("RDDs are not supported in Spark Connect.")
}
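
(Side note, not part of the patch: the helper is typed as `Nothing`, which is a subtype of every Scala type, so the same one-liner can implement overrides returning `RDD[T]`, `JavaRDD[T]`, or `DataFrame`. A self-contained sketch of the pattern, with made-up names:)

  object Shims {
    // Nothing conforms to any result type, so one throwing helper backs every override.
    def unsupported(feature: String): Nothing =
      throw new UnsupportedOperationException(s"$feature is not supported in Spark Connect.")
  }

  trait Source {
    def asSeq: Seq[Int]
    def asMap: Map[String, Int]
  }

  object RemoteSource extends Source {
    import Shims.unsupported
    override def asSeq: Seq[Int] = unsupported("asSeq")         // Nothing <: Seq[Int]
    override def asMap: Map[String, Int] = unsupported("asMap") // Nothing <: Map[String, Int]
  }
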
diff --git a/pom.xml b/pom.xml
index 31046e5a85f82..d3b2154d496bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
     <module>common/utils</module>
     <module>common/variant</module>
     <module>common/tags</module>
+    <module>sql/connect/shims</module>
     <module>core</module>
     <module>graphx</module>
     <module>mllib</module>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6137984a53c0a..5882fcbf336b0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,24 +45,24 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
- val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, tokenProviderKafka010, sqlKafka010, avro, protobuf) = Seq(
- "catalyst", "sql", "hive", "hive-thriftserver", "token-provider-kafka-0-10", "sql-kafka-0-10", "avro", "protobuf"
- ).map(ProjectRef(buildLocation, _))
+ val sqlProjects@Seq(sqlApi, catalyst, sql, hive, hiveThriftServer, tokenProviderKafka010, sqlKafka010, avro, protobuf) =
+ Seq("sql-api", "catalyst", "sql", "hive", "hive-thriftserver", "token-provider-kafka-0-10",
+ "sql-kafka-0-10", "avro", "protobuf").map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(streaming, streamingKafka010) =
Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))
- val connectCommon = ProjectRef(buildLocation, "connect-common")
- val connect = ProjectRef(buildLocation, "connect")
- val connectClient = ProjectRef(buildLocation, "connect-client-jvm")
+ val connectProjects@Seq(connectCommon, connect, connectClient, connectShims) =
+ Seq("connect-common", "connect", "connect-client-jvm", "connect-shims")
+ .map(ProjectRef(buildLocation, _))
val allProjects@Seq(
core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore,
- commonUtils, sqlApi, variant, _*
+ commonUtils, variant, _*
) = Seq(
"core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
- "tags", "sketch", "kvstore", "common-utils", "sql-api", "variant"
- ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connectCommon, connect, connectClient)
+ "tags", "sketch", "kvstore", "common-utils", "variant"
+ ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects
val optionallyEnabledProjects@Seq(kubernetes, yarn,
sparkGangliaLgpl, streamingKinesisAsl,
@@ -360,7 +360,7 @@ object SparkBuild extends PomBuild {
/* Enable shared settings on all projects */
(allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools))
.foreach(enable(sharedSettings ++ DependencyOverrides.settings ++
- ExcludedDependencies.settings ++ Checkstyle.settings))
+ ExcludedDependencies.settings ++ Checkstyle.settings ++ ExcludeShims.settings))
/* Enable tests settings for all projects except examples, assembly and tools */
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
@@ -369,7 +369,7 @@ object SparkBuild extends PomBuild {
Seq(
spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn,
unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, connect, connectClient,
- variant
+ variant, connectShims
).contains(x)
}
@@ -1087,6 +1087,36 @@ object ExcludedDependencies {
)
}
+/**
+ * Excludes the spark-connect-shims module from the internal classpath of every module that is
+ * not part of the connect client dependency chain (sql-api, connect-common, connect-client-jvm).
+ */
+object ExcludeShims {
+ val shimmedProjects = Set("spark-sql-api", "spark-connect-common", "spark-connect-client-jvm")
+ val classPathFilter = TaskKey[Classpath => Classpath]("filter for classpath")
+ lazy val settings = Seq(
+ classPathFilter := {
+ if (!shimmedProjects(moduleName.value)) {
+ cp => cp.filterNot(_.data.name.contains("spark-connect-shims"))
+ } else {
+ identity _
+ }
+ },
+ Compile / internalDependencyClasspath :=
+ classPathFilter.value((Compile / internalDependencyClasspath).value),
+ Compile / internalDependencyAsJars :=
+ classPathFilter.value((Compile / internalDependencyAsJars).value),
+ Runtime / internalDependencyClasspath :=
+ classPathFilter.value((Runtime / internalDependencyClasspath).value),
+ Runtime / internalDependencyAsJars :=
+ classPathFilter.value((Runtime / internalDependencyAsJars).value),
+ Test / internalDependencyClasspath :=
+ classPathFilter.value((Test / internalDependencyClasspath).value),
+ Test / internalDependencyAsJars :=
+ classPathFilter.value((Test / internalDependencyAsJars).value),
+ )
+}
+
/**
* Project to pull previous artifacts of Spark for generating Mima excludes.
*/
@@ -1456,10 +1486,12 @@ object SparkUnidoc extends SharedUnidocSettings {
lazy val settings = baseSettings ++ Seq(
(ScalaUnidoc / unidoc / unidocProjectFilter) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
- yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, protobuf),
+ yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient,
+ connectShims, protobuf),
(JavaUnidoc / unidoc / unidocProjectFilter) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
- yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient, protobuf),
+ yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectClient,
+ connectShims, protobuf),
)
}
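
(Illustrative, not part of the patch: the `ExcludeShims` settings above rewire `internalDependencyClasspath`/`internalDependencyAsJars` for each configuration. The same idea applied to a single hypothetical sbt project looks like this:)

  lazy val example = (project in file("example"))
    .settings(
      // Drop the stub jar so the real spark-core classes win on this project's classpath.
      Compile / internalDependencyClasspath :=
        (Compile / internalDependencyClasspath).value
          .filterNot(_.data.name.contains("spark-connect-shims"))
    )
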
diff --git a/sql/api/pom.xml b/sql/api/pom.xml
index 54cdc96fc40a2..9c50a2567c5fe 100644
--- a/sql/api/pom.xml
+++ b/sql/api/pom.xml
@@ -58,6 +58,12 @@
       <artifactId>spark-sketch_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.json4s</groupId>
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
index c101c52fd0662..8c88387714228 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
@@ -21,6 +21,8 @@ import scala.jdk.CollectionConverters._
import _root_.java.util
import org.apache.spark.annotation.Stable
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils}
@@ -309,6 +311,38 @@ abstract class DataFrameReader {
*/
def json(jsonDataset: DS[String]): Dataset[Row]
+ /**
+ * Loads a `JavaRDD[String]` storing JSON objects (JSON Lines
+ * text format or newline-delimited JSON) and returns the result as a `DataFrame`.
+ *
+ * Unless the schema is specified using `schema` function, this function goes through the input
+ * once to determine the input schema.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @param jsonRDD
+ * input RDD with one JSON object per record
+ * @since 1.4.0
+ */
+ @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
+ def json(jsonRDD: JavaRDD[String]): DS[Row]
+
+ /**
+ * Loads an `RDD[String]` storing JSON objects (JSON Lines text
+ * format or newline-delimited JSON) and returns the result as a `DataFrame`.
+ *
+ * Unless the schema is specified using `schema` function, this function goes through the input
+ * once to determine the input schema.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @param jsonRDD
+ * input RDD with one JSON object per record
+ * @since 1.4.0
+ */
+ @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
+ def json(jsonRDD: RDD[String]): DS[Row]
+
/**
* Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
* overloaded `csv()` method for more details.
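
(Usage sketch, not part of the patch: on a classic, non-Connect session these deprecated overloads still work; the example assumes a local `SparkSession` named `spark`.)

  // Classic Spark only; on the Connect client both overloads throw UnsupportedOperationException.
  val jsonRdd = spark.sparkContext.parallelize(Seq(
    """{"name":"a","age":1}""",
    """{"name":"b","age":2}"""))
  val df = spark.read.json(jsonRdd)  // deprecated since 2.2.0; prefer json(Dataset[String])
  df.printSchema()
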
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 06a6148a7c188..c277b4cab85c1 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -22,7 +22,9 @@ import scala.reflect.runtime.universe.TypeTag
import _root_.java.util
import org.apache.spark.annotation.{DeveloperApi, Stable}
+import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{FilterFunction, FlatMapFunction, ForeachFunction, ForeachPartitionFunction, MapFunction, MapPartitionsFunction, ReduceFunction}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions, AnalysisException, Column, DataFrameWriter, DataFrameWriterV2, Encoder, MergeIntoWriter, Observation, Row, TypedColumn}
import org.apache.spark.sql.internal.{ToScalaUDF, UDFAdaptors}
import org.apache.spark.sql.types.{Metadata, StructType}
@@ -3098,4 +3100,34 @@ abstract class Dataset[T] extends Serializable {
* @since 1.6.0
*/
def write: DataFrameWriter[T]
+
+ /**
+ * Represents the content of the Dataset as an `RDD` of `T`.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @group basic
+ * @since 1.6.0
+ */
+ def rdd: RDD[T]
+
+ /**
+ * Returns the content of the Dataset as a `JavaRDD` of `T`s.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @group basic
+ * @since 1.6.0
+ */
+ def toJavaRDD: JavaRDD[T]
+
+ /**
+ * Returns the content of the Dataset as a `JavaRDD` of `T`s.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @group basic
+ * @since 1.6.0
+ */
+ def javaRDD: JavaRDD[T] = toJavaRDD
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala
index f6b44e168390a..5e022570d3ca7 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLImplicits.scala
@@ -23,6 +23,7 @@ import scala.reflect.runtime.universe.TypeTag
import _root_.java
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ColumnName, DatasetHolder, Encoder, Encoders}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
@@ -278,6 +279,14 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits with Serializable {
new DatasetHolder(session.createDataset(s).asInstanceOf[DS[T]])
}
+ /**
+ * Creates a [[Dataset]] from an RDD.
+ *
+ * @since 1.6.0
+ */
+ implicit def rddToDatasetHolder[T: Encoder](rdd: RDD[T]): DatasetHolder[T, DS] =
+ new DatasetHolder(session.createDataset(rdd).asInstanceOf[DS[T]])
+
/**
* An implicit conversion that turns a Scala `Symbol` into a [[org.apache.spark.sql.Column]].
* @since 1.3.0
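
(Usage sketch, not part of the patch: with a classic `SparkSession` in scope, the new implicit turns an `RDD` into a `DatasetHolder`, so `toDS()`/`toDF()` keep working; on a Connect session the underlying `createDataset(RDD)` throws.)

  import spark.implicits._  // assumes a classic SparkSession named `spark`

  val ds = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDS()
  val df = spark.sparkContext.parallelize(Seq("a" -> 1, "b" -> 2)).toDF("key", "value")
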
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 4dfeb87a11d92..b2e61df5937bd 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -25,7 +25,10 @@ import _root_.java.lang
import _root_.java.net.URI
import _root_.java.util
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SparkClassUtils
@@ -52,6 +55,14 @@ import org.apache.spark.util.SparkClassUtils
*/
abstract class SparkSession extends Serializable with Closeable {
+ /**
+ * The Spark context associated with this Spark session.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ */
+ def sparkContext: SparkContext
+
/**
* The version of Spark on which this application is running.
*
@@ -155,6 +166,85 @@ abstract class SparkSession extends Serializable with Closeable {
*/
def createDataFrame(data: util.List[_], beanClass: Class[_]): Dataset[Row]
+ /**
+ * Creates a `DataFrame` from an RDD of Product (e.g. case classes, tuples).
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row]
+
+ /**
+ * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing
+ * [[org.apache.spark.sql.Row]]s using the given schema. It is important to make sure that the
+ * structure of every [[org.apache.spark.sql.Row]] of the provided RDD matches the provided
+ * schema. Otherwise, a runtime exception will be thrown. Example:
+ * {{{
+ * import org.apache.spark.sql._
+ * import org.apache.spark.sql.types._
+ * val sparkSession = new org.apache.spark.sql.SparkSession(sc)
+ *
+ * val schema =
+ * StructType(
+ * StructField("name", StringType, false) ::
+ * StructField("age", IntegerType, true) :: Nil)
+ *
+ * val people =
+ * sc.textFile("examples/src/main/resources/people.txt").map(
+ * _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
+ * val dataFrame = sparkSession.createDataFrame(people, schema)
+ * dataFrame.printSchema
+ * // root
+ * // |-- name: string (nullable = false)
+ * // |-- age: integer (nullable = true)
+ *
+ * dataFrame.createOrReplaceTempView("people")
+ * sparkSession.sql("select name from people").collect.foreach(println)
+ * }}}
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ @DeveloperApi
+ def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row]
+
+ /**
+ * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing
+ * [[org.apache.spark.sql.Row]]s using the given schema. It is important to make sure that the
+ * structure of every [[org.apache.spark.sql.Row]] of the provided RDD matches the provided
+ * schema. Otherwise, a runtime exception will be thrown.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ @DeveloperApi
+ def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row]
+
+ /**
+ * Applies a schema to an RDD of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+ * will return the columns in an undefined order.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row]
+
+ /**
+ * Applies a schema to an RDD of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+ * will return the columns in an undefined order.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row]
+
/* ------------------------------- *
| Methods for creating DataSets |
* ------------------------------- */
@@ -212,6 +302,18 @@ abstract class SparkSession extends Serializable with Closeable {
*/
def createDataset[T: Encoder](data: util.List[T]): Dataset[T]
+ /**
+ * Creates a [[Dataset]] from an RDD of a given type. This method requires an encoder (to
+ * convert a JVM object of type `T` to and from the internal Spark SQL representation) that is
+ * generally created automatically through implicits from a `SparkSession`, or can be created
+ * explicitly by calling static methods on `Encoders`.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def createDataset[T: Encoder](data: RDD[T]): Dataset[T]
+
/**
* Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
* range from 0 to `end` (exclusive) with step value 1.
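
(Usage sketch, not part of the patch: `createDataset` with an `RDD` of a case class on a classic session; the encoder comes from `spark.implicits._`. `Person` is an illustrative type, not from the patch.)

  case class Person(name: String, age: Int)

  import spark.implicits._  // classic SparkSession `spark`; supplies Encoder[Person]
  val rdd = spark.sparkContext.parallelize(Seq(Person("a", 1), Person("b", 2)))
  val people = spark.createDataset(rdd)
  people.show()
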
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 7ce4609de51f7..aa1aa5f67a2a9 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -44,6 +44,12 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql-api_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/sql/connect/server/pom.xml b/sql/connect/server/pom.xml
index d0d982934d2c7..f2a7f1b1da9d9 100644
--- a/sql/connect/server/pom.xml
+++ b/sql/connect/server/pom.xml
@@ -52,6 +52,10 @@
       <artifactId>spark-connect-common_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
diff --git a/sql/connect/shims/README.md b/sql/connect/shims/README.md
new file mode 100644
index 0000000000000..07b593dd04b4b
--- /dev/null
+++ b/sql/connect/shims/README.md
@@ -0,0 +1 @@
+This module defines shims used by the interface defined in sql/api.
diff --git a/sql/connect/shims/pom.xml b/sql/connect/shims/pom.xml
new file mode 100644
index 0000000000000..6bb12a927738c
--- /dev/null
+++ b/sql/connect/shims/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.13</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-connect-shims_2.13</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Connect Shims</name>
+  <url>https://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>connect-shims</sbt.project.name>
+  </properties>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala
new file mode 100644
index 0000000000000..45fae00247485
--- /dev/null
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.api.java
+
+class JavaRDD[T]
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala
new file mode 100644
index 0000000000000..b23f83fa9185c
--- /dev/null
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rdd
+
+class RDD[T]
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
new file mode 100644
index 0000000000000..813b8e4859c28
--- /dev/null
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark
+
+class SparkContext
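
(How the shims work, summarized here because the patch itself does not spell it out: the three stub classes share the fully qualified names of the real `SparkContext`, `RDD`, and `JavaRDD`, so sql/api and the Connect client can compile method signatures that mention them without depending on spark-core. In classic Spark builds the shim jar is then dropped, via the Maven exclusions and the sbt `ExcludeShims` filter above, so the same signatures link against the real classes. A minimal sketch of code that only needs the stubs at compile time:)

  // Nothing here instantiates an RDD, so the empty stubs are sufficient to compile;
  // at runtime in a classic deployment the real org.apache.spark.rdd.RDD is loaded instead.
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  trait RddFacade[T] {
    def sparkContext: SparkContext
    def rdd: RDD[T]
  }
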
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 78cc65bb7a298..ab3e939cee171 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -177,30 +177,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession)
@scala.annotation.varargs
override def json(paths: String*): DataFrame = super.json(paths: _*)
- /**
- * Loads a `JavaRDD[String]` storing JSON objects (JSON
- * Lines text format or newline-delimited JSON) and returns the result as
- * a `DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function goes through the
- * input once to determine the input schema.
- *
- * @param jsonRDD input RDD with one JSON object per record
- * @since 1.4.0
- */
+ /** @inheritdoc */
@deprecated("Use json(Dataset[String]) instead.", "2.2.0")
def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
- /**
- * Loads an `RDD[String]` storing JSON objects (JSON Lines
- * text format or newline-delimited JSON) and returns the result as a `DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function goes through the
- * input once to determine the input schema.
- *
- * @param jsonRDD input RDD with one JSON object per record
- * @since 1.4.0
- */
+ /** @inheritdoc */
@deprecated("Use json(Dataset[String]) instead.", "2.2.0")
def json(jsonRDD: RDD[String]): DataFrame = {
json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 58006837a3a6d..1c5df1163eb78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1524,12 +1524,7 @@ class Dataset[T] private[sql](
sparkSession.sessionState.executePlan(deserialized)
}
- /**
- * Represents the content of the Dataset as an `RDD` of `T`.
- *
- * @group basic
- * @since 1.6.0
- */
+ /** @inheritdoc */
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
@@ -1537,20 +1532,9 @@ class Dataset[T] private[sql](
}
}
- /**
- * Returns the content of the Dataset as a `JavaRDD` of `T`s.
- * @group basic
- * @since 1.6.0
- */
+ /** @inheritdoc */
def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD()
- /**
- * Returns the content of the Dataset as a `JavaRDD` of `T`s.
- * @group basic
- * @since 1.6.0
- */
- def javaRDD: JavaRDD[T] = toJavaRDD
-
protected def createTempView(
viewName: String,
replace: Boolean,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 1bc7e3ee98e76..b6ed50447109d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -17,21 +17,9 @@
package org.apache.spark.sql
-import scala.language.implicitConversions
-
-import org.apache.spark.rdd.RDD
-
/** @inheritdoc */
abstract class SQLImplicits extends api.SQLImplicits {
type DS[U] = Dataset[U]
protected def session: SparkSession
-
- /**
- * Creates a [[Dataset]] from an RDD.
- *
- * @since 1.6.0
- */
- implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T, Dataset] =
- new DatasetHolder(session.createDataset(rdd))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index eeb46fbf145d7..2d485c4ef321d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -294,11 +294,7 @@ class SparkSession private(
new Dataset(self, LocalRelation(encoder.schema), encoder)
}
- /**
- * Creates a `DataFrame` from an RDD of Product (e.g. case classes, tuples).
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive {
val encoder = Encoders.product[A]
Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
@@ -311,37 +307,7 @@ class SparkSession private(
Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data))
}
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given schema.
- * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- * Example:
- * {{{
- * import org.apache.spark.sql._
- * import org.apache.spark.sql.types._
- * val sparkSession = new org.apache.spark.sql.SparkSession(sc)
- *
- * val schema =
- * StructType(
- * StructField("name", StringType, false) ::
- * StructField("age", IntegerType, true) :: Nil)
- *
- * val people =
- * sc.textFile("examples/src/main/resources/people.txt").map(
- * _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
- * val dataFrame = sparkSession.createDataFrame(people, schema)
- * dataFrame.printSchema
- * // root
- * // |-- name: string (nullable = false)
- * // |-- age: integer (nullable = true)
- *
- * dataFrame.createOrReplaceTempView("people")
- * sparkSession.sql("select name from people").collect.foreach(println)
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@DeveloperApi
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
@@ -353,14 +319,7 @@ class SparkSession private(
internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
}
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given schema.
- * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@DeveloperApi
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
@@ -374,14 +333,7 @@ class SparkSession private(
Dataset.ofRows(self, LocalRelation.fromExternalRows(toAttributes(replaced), rows.asScala.toSeq))
}
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive {
val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
val className = beanClass.getName
@@ -392,14 +344,7 @@ class SparkSession private(
Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
}
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd.rdd, beanClass)
}
@@ -434,14 +379,7 @@ class SparkSession private(
Dataset[T](self, plan)
}
- /**
- * Creates a [[Dataset]] from an RDD of a given type. This method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
- * that is generally created automatically through implicits from a `SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
Dataset[T](self, ExternalRDD(data, self))
}