From a071edb9f96da0fb7fc20c589b20c65cf175c616 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 18 Sep 2024 12:08:52 -0400 Subject: [PATCH 1/3] Remove Dataset self type. --- .../spark/sql/DataFrameNaFunctions.scala | 3 +- .../apache/spark/sql/DataFrameReader.scala | 5 +- .../spark/sql/DataFrameStatFunctions.scala | 3 +- .../scala/org/apache/spark/sql/Dataset.scala | 5 +- .../spark/sql/KeyValueGroupedDataset.scala | 4 +- .../spark/sql/RelationalGroupedDataset.scala | 4 +- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../apache/spark/sql/catalog/Catalog.scala | 3 +- .../sql/connect/ConnectConversions.scala | 51 +++ .../spark/sql/streaming/StreamingQuery.scala | 4 +- project/MimaExcludes.scala | 2 + project/SparkBuild.scala | 1 + .../org/apache/spark/sql/api/Catalog.scala | 58 ++-- .../spark/sql/api/DataFrameNaFunctions.scala | 65 ++-- .../spark/sql/api/DataFrameReader.scala | 51 +-- .../sql/api/DataFrameStatFunctions.scala | 22 +- .../org/apache/spark/sql/api/Dataset.scala | 300 +++++++++--------- .../sql/api/KeyValueGroupedDataset.scala | 110 +++---- .../sql/api/RelationalGroupedDataset.scala | 44 ++- .../apache/spark/sql/api/SparkSession.scala | 40 +-- .../apache/spark/sql/api/StreamingQuery.scala | 4 +- .../org/apache/spark/sql/functions.scala | 2 +- .../spark/sql/DataFrameNaFunctions.scala | 3 +- .../apache/spark/sql/DataFrameReader.scala | 5 +- .../spark/sql/DataFrameStatFunctions.scala | 3 +- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../spark/sql/KeyValueGroupedDataset.scala | 3 +- .../spark/sql/RelationalGroupedDataset.scala | 5 +- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../apache/spark/sql/catalog/Catalog.scala | 3 +- .../sql/classic/ClassicConversions.scala | 50 +++ .../spark/sql/streaming/StreamingQuery.scala | 4 +- 32 files changed, 501 insertions(+), 364 deletions(-) create mode 100644 connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectConversions.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/classic/ClassicConversions.scala diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index c06cbbc0cdb4..3777f82594aa 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.connect.proto.{NAReplace, Relation} import org.apache.spark.connect.proto.Expression.{Literal => GLiteral} import org.apache.spark.connect.proto.NAReplace.Replacement +import org.apache.spark.sql.connect.ConnectConversions._ /** * Functionality for working with missing data in `DataFrame`s. 
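The new `import org.apache.spark.sql.connect.ConnectConversions._` above brings the implicit `castToImpl` casts (added later in this patch) into scope, so code written against the shared `api` interfaces can recover the Connect-specific classes where needed. A minimal sketch of the pattern, using a hypothetical `PlanAccess` helper that is not part of the patch:

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.{api, Dataset, Row}
    import org.apache.spark.sql.connect.ConnectConversions._

    // Hypothetical helper: accepts the shared interface type and relies on the
    // implicit castToImpl conversion to reach the Connect-specific plan.
    object PlanAccess {
      def rootOf(df: api.Dataset[Row]): proto.Relation = {
        val impl: Dataset[Row] = df // implicit api.Dataset[T] => Dataset[T]
        impl.plan.getRoot           // proto.Plan exists only on the Connect Dataset
      }
    }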
@@ -29,7 +30,7 @@ import org.apache.spark.connect.proto.NAReplace.Replacement * @since 3.4.0 */ final class DataFrameNaFunctions private[sql] (sparkSession: SparkSession, root: Relation) - extends api.DataFrameNaFunctions[Dataset] { + extends api.DataFrameNaFunctions { import sparkSession.RichColumn override protected def drop(minNonNulls: Option[Int]): Dataset[Row] = diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c3ee7030424e..60bacd4e18ed 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -23,6 +23,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Stable import org.apache.spark.connect.proto.Parse.ParseFormat +import org.apache.spark.sql.connect.ConnectConversions._ import org.apache.spark.sql.connect.common.DataTypeProtoConverter import org.apache.spark.sql.types.StructType @@ -33,8 +34,8 @@ import org.apache.spark.sql.types.StructType * @since 3.4.0 */ @Stable -class DataFrameReader private[sql] (sparkSession: SparkSession) - extends api.DataFrameReader[Dataset] { +class DataFrameReader private[sql] (sparkSession: SparkSession) extends api.DataFrameReader { + type DS[U] = Dataset[U] /** @inheritdoc */ override def format(source: String): this.type = super.format(source) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 9f5ada0d7ec3..bb7cfa75a9ab 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -22,6 +22,7 @@ import java.{lang => jl, util => ju} import org.apache.spark.connect.proto.{Relation, StatSampleBy} import org.apache.spark.sql.DataFrameStatFunctions.approxQuantileResultEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, PrimitiveDoubleEncoder} +import org.apache.spark.sql.connect.ConnectConversions._ import org.apache.spark.sql.functions.lit /** @@ -30,7 +31,7 @@ import org.apache.spark.sql.functions.lit * @since 3.4.0 */ final class DataFrameStatFunctions private[sql] (protected val df: DataFrame) - extends api.DataFrameStatFunctions[Dataset] { + extends api.DataFrameStatFunctions { private def root: Relation = df.plan.getRoot private val sparkSession: SparkSession = df.sparkSession diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 519193ebd9c7..161a0d9d265f 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ import org.apache.spark.sql.catalyst.expressions.OrderUtils +import org.apache.spark.sql.connect.ConnectConversions._ import org.apache.spark.sql.connect.client.SparkResult import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
StorageLevelProtoConverter} import org.apache.spark.sql.errors.DataTypeErrors.toSQLId @@ -134,8 +135,8 @@ class Dataset[T] private[sql] ( val sparkSession: SparkSession, @DeveloperApi val plan: proto.Plan, val encoder: Encoder[T]) - extends api.Dataset[T, Dataset] { - type RGD = RelationalGroupedDataset + extends api.Dataset[T] { + type DS[U] = Dataset[U] import sparkSession.RichColumn diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index aef7efb08a25..6bf251890147 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -26,6 +26,7 @@ import org.apache.spark.api.java.function._ import org.apache.spark.connect.proto import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder +import org.apache.spark.sql.connect.ConnectConversions._ import org.apache.spark.sql.connect.common.UdfUtils import org.apache.spark.sql.expressions.SparkUserDefinedFunction import org.apache.spark.sql.functions.col @@ -40,8 +41,7 @@ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode * * @since 3.5.0 */ -class KeyValueGroupedDataset[K, V] private[sql] () - extends api.KeyValueGroupedDataset[K, V, Dataset] { +class KeyValueGroupedDataset[K, V] private[sql] () extends api.KeyValueGroupedDataset[K, V] { type KVDS[KY, VL] = KeyValueGroupedDataset[KY, VL] private def unsupported(): Nothing = throw new UnsupportedOperationException() diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index ea13635fc2ea..14ceb3f4bb14 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import scala.jdk.CollectionConverters._ import org.apache.spark.connect.proto +import org.apache.spark.sql.connect.ConnectConversions._ /** * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]], @@ -39,8 +40,7 @@ class RelationalGroupedDataset private[sql] ( groupType: proto.Aggregate.GroupType, pivot: Option[proto.Aggregate.Pivot] = None, groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None) - extends api.RelationalGroupedDataset[Dataset] { - type RGD = RelationalGroupedDataset + extends api.RelationalGroupedDataset { import df.sparkSession.RichColumn protected def toDF(aggExprs: Seq[Column]): DataFrame = { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index aa6258a14b81..04f8eeb5c6d4 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -69,7 +69,7 @@ import org.apache.spark.util.ArrayImplicits._ class SparkSession private[sql] ( private[sql] val client: SparkConnectClient, private val planIdGenerator: AtomicLong) - extends api.SparkSession[Dataset] + extends api.SparkSession 
with Logging { private[this] val allocator = new RootAllocator() diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 11a4a044d20e..86b1dbe4754e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.catalog import java.util import org.apache.spark.sql.{api, DataFrame, Dataset} +import org.apache.spark.sql.connect.ConnectConversions._ import org.apache.spark.sql.types.StructType /** @inheritdoc */ -abstract class Catalog extends api.Catalog[Dataset] { +abstract class Catalog extends api.Catalog { /** @inheritdoc */ override def listDatabases(): Dataset[Database] diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectConversions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectConversions.scala new file mode 100644 index 000000000000..7d81f4ead785 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectConversions.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect + +import scala.language.implicitConversions + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql._ + +/** + * Conversions from sql interfaces to the Connect specific implementation. + * + * This class is mainly used by the implementation. In the case of connect it should be extremely + * rare that a developer needs these classes. + * + * We provide both a trait and an object. The trait is useful in situations where an extension + * developer needs to use these conversions in a project covering multiple Spark versions. They + * can create a shim for these conversions, the Spark 4+ version of the shim implements this + * trait, and shims for older versions do not. 
+ */ +@DeveloperApi +trait ConnectConversions { + implicit def castToImpl(session: api.SparkSession): SparkSession = + session.asInstanceOf[SparkSession] + + implicit def castToImpl[T](ds: api.Dataset[T]): Dataset[T] = + ds.asInstanceOf[Dataset[T]] + + implicit def castToImpl(rgds: api.RelationalGroupedDataset): RelationalGroupedDataset = + rgds.asInstanceOf[RelationalGroupedDataset] + + implicit def castToImpl[K, V]( + kvds: api.KeyValueGroupedDataset[K, V]): KeyValueGroupedDataset[K, V] = + kvds.asInstanceOf[KeyValueGroupedDataset[K, V]] +} + +object ConnectConversions extends ConnectConversions diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 3b47269875f4..29fbcc443deb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -26,10 +26,10 @@ import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.connect.proto.StreamingQueryCommand import org.apache.spark.connect.proto.StreamingQueryCommandResult import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance -import org.apache.spark.sql.{api, Dataset, SparkSession} +import org.apache.spark.sql.{api, SparkSession} /** @inheritdoc */ -trait StreamingQuery extends api.StreamingQuery[Dataset] { +trait StreamingQuery extends api.StreamingQuery { /** @inheritdoc */ override def sparkSession: SparkSession diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dfe7b14e2ec6..ece4504395f1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -201,6 +201,8 @@ object MimaExcludes { ProblemFilters.exclude[Problem]("org.apache.spark.sql.execution.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.internal.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.errors.*"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.classic.*"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.connect.*"), // DSv2 catalog and expression APIs are unstable yet. We should enable this back. 
ProblemFilters.exclude[Problem]("org.apache.spark.sql.connector.catalog.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.connector.expressions.*"), diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4a8214b2e20a..d93a52985b77 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1352,6 +1352,7 @@ trait SharedUnidocSettings { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/kvstore"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/connect/"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/classic/"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/utils"))) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Catalog.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Catalog.scala index fbb665b7f1b1..a0f51d30dc57 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Catalog.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Catalog.scala @@ -33,7 +33,7 @@ import org.apache.spark.storage.StorageLevel * @since 2.0.0 */ @Stable -abstract class Catalog[DS[U] <: Dataset[U, DS]] { +abstract class Catalog { /** * Returns the current database (namespace) in this session. @@ -54,7 +54,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def listDatabases(): DS[Database] + def listDatabases(): Dataset[Database] /** * Returns a list of databases (namespaces) which name match the specify pattern and available @@ -62,7 +62,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 3.5.0 */ - def listDatabases(pattern: String): DS[Database] + def listDatabases(pattern: String): Dataset[Database] /** * Returns a list of tables/views in the current database (namespace). This includes all @@ -70,7 +70,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def listTables(): DS[Table] + def listTables(): Dataset[Table] /** * Returns a list of tables/views in the specified database (namespace) (the name can be @@ -79,7 +79,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @throws[AnalysisException]("database does not exist") - def listTables(dbName: String): DS[Table] + def listTables(dbName: String): Dataset[Table] /** * Returns a list of tables/views in the specified database (namespace) which name match the @@ -88,7 +88,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 3.5.0 */ @throws[AnalysisException]("database does not exist") - def listTables(dbName: String, pattern: String): DS[Table] + def listTables(dbName: String, pattern: String): Dataset[Table] /** * Returns a list of functions registered in the current database (namespace). 
This includes all @@ -96,7 +96,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def listFunctions(): DS[Function] + def listFunctions(): Dataset[Function] /** * Returns a list of functions registered in the specified database (namespace) (the name can be @@ -105,7 +105,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @throws[AnalysisException]("database does not exist") - def listFunctions(dbName: String): DS[Function] + def listFunctions(dbName: String): Dataset[Function] /** * Returns a list of functions registered in the specified database (namespace) which name match @@ -115,7 +115,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 3.5.0 */ @throws[AnalysisException]("database does not exist") - def listFunctions(dbName: String, pattern: String): DS[Function] + def listFunctions(dbName: String, pattern: String): Dataset[Function] /** * Returns a list of columns for the given table/view or temporary view. @@ -127,7 +127,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @throws[AnalysisException]("table does not exist") - def listColumns(tableName: String): DS[Column] + def listColumns(tableName: String): Dataset[Column] /** * Returns a list of columns for the given table/view in the specified database under the Hive @@ -143,7 +143,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @throws[AnalysisException]("database or table does not exist") - def listColumns(dbName: String, tableName: String): DS[Column] + def listColumns(dbName: String, tableName: String): Dataset[Column] /** * Get the database (namespace) with the specified name (can be qualified with catalog). This @@ -280,7 +280,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") - def createExternalTable(tableName: String, path: String): DS[Row] = { + def createExternalTable(tableName: String, path: String): Dataset[Row] = { createTable(tableName, path) } @@ -293,7 +293,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * identifier is provided, it refers to a table in the current database. * @since 2.2.0 */ - def createTable(tableName: String, path: String): DS[Row] + def createTable(tableName: String, path: String): Dataset[Row] /** * Creates a table from the given path based on a data source and returns the corresponding @@ -305,7 +305,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") - def createExternalTable(tableName: String, path: String, source: String): DS[Row] = { + def createExternalTable(tableName: String, path: String, source: String): Dataset[Row] = { createTable(tableName, path, source) } @@ -318,7 +318,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * identifier is provided, it refers to a table in the current database. * @since 2.2.0 */ - def createTable(tableName: String, path: String, source: String): DS[Row] + def createTable(tableName: String, path: String, source: String): Dataset[Row] /** * Creates a table from the given path based on a data source and a set of options. 
Then, @@ -333,7 +333,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { def createExternalTable( tableName: String, source: String, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable(tableName, source, options) } @@ -349,7 +349,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { def createTable( tableName: String, source: String, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable(tableName, source, options.asScala.toMap) } @@ -366,7 +366,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { def createExternalTable( tableName: String, source: String, - options: Map[String, String]): DS[Row] = { + options: Map[String, String]): Dataset[Row] = { createTable(tableName, source, options) } @@ -379,7 +379,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * identifier is provided, it refers to a table in the current database. * @since 2.2.0 */ - def createTable(tableName: String, source: String, options: Map[String, String]): DS[Row] + def createTable(tableName: String, source: String, options: Map[String, String]): Dataset[Row] /** * Create a table from the given path based on a data source, a schema and a set of options. @@ -395,7 +395,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, schema: StructType, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable(tableName, source, schema, options) } @@ -412,7 +412,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, description: String, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable( tableName, source = source, @@ -433,7 +433,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, description: String, - options: Map[String, String]): DS[Row] + options: Map[String, String]): Dataset[Row] /** * Create a table based on the dataset in a data source, a schema and a set of options. Then, @@ -448,7 +448,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, schema: StructType, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable(tableName, source, schema, options.asScala.toMap) } @@ -466,7 +466,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, schema: StructType, - options: Map[String, String]): DS[Row] = { + options: Map[String, String]): Dataset[Row] = { createTable(tableName, source, schema, options) } @@ -483,7 +483,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { tableName: String, source: String, schema: StructType, - options: Map[String, String]): DS[Row] + options: Map[String, String]): Dataset[Row] /** * Create a table based on the dataset in a data source, a schema and a set of options. 
Then, @@ -499,7 +499,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { source: String, schema: StructType, description: String, - options: util.Map[String, String]): DS[Row] = { + options: util.Map[String, String]): Dataset[Row] = { createTable( tableName, source = source, @@ -522,7 +522,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { source: String, schema: StructType, description: String, - options: Map[String, String]): DS[Row] + options: Map[String, String]): Dataset[Row] /** * Drops the local temporary view with the given view name in the catalog. If the view has been @@ -670,7 +670,7 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 3.4.0 */ - def listCatalogs(): DS[CatalogMetadata] + def listCatalogs(): Dataset[CatalogMetadata] /** * Returns a list of catalogs which name match the specify pattern and available in this @@ -678,5 +678,5 @@ abstract class Catalog[DS[U] <: Dataset[U, DS]] { * * @since 3.5.0 */ - def listCatalogs(pattern: String): DS[CatalogMetadata] + def listCatalogs(pattern: String): Dataset[CatalogMetadata] } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameNaFunctions.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameNaFunctions.scala index 12d3d41aa554..ef6cc64c058a 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameNaFunctions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameNaFunctions.scala @@ -30,14 +30,14 @@ import org.apache.spark.util.ArrayImplicits._ * @since 1.3.1 */ @Stable -abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { +abstract class DataFrameNaFunctions { /** * Returns a new `DataFrame` that drops rows containing any null or NaN values. * * @since 1.3.1 */ - def drop(): DS[Row] = drop("any") + def drop(): Dataset[Row] = drop("any") /** * Returns a new `DataFrame` that drops rows containing null or NaN values. 
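The `drop` overloads retyped in this file all delegate to the protected `drop(minNonNulls, cols)` primitives; a brief usage sketch (the frame and column names are made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val readings = Seq(
      (Some(1), Some(10.0)),
      (Some(2), None),
      (None, None)).toDF("sensor", "value")

    readings.na.drop()                          // drop rows with any null/NaN
    readings.na.drop("all")                     // drop rows that are entirely null
    readings.na.drop(Seq("value"))              // only check the listed columns
    readings.na.drop(1, Seq("sensor", "value")) // keep rows with >= 1 non-null value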
@@ -47,7 +47,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(how: String): DS[Row] = drop(toMinNonNulls(how)) + def drop(how: String): Dataset[Row] = drop(toMinNonNulls(how)) /** * Returns a new `DataFrame` that drops rows containing any null or NaN values in the specified @@ -55,7 +55,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(cols: Array[String]): DS[Row] = drop(cols.toImmutableArraySeq) + def drop(cols: Array[String]): Dataset[Row] = drop(cols.toImmutableArraySeq) /** * (Scala-specific) Returns a new `DataFrame` that drops rows containing any null or NaN values @@ -63,7 +63,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(cols: Seq[String]): DS[Row] = drop(cols.size, cols) + def drop(cols: Seq[String]): Dataset[Row] = drop(cols.size, cols) /** * Returns a new `DataFrame` that drops rows containing null or NaN values in the specified @@ -74,7 +74,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(how: String, cols: Array[String]): DS[Row] = drop(how, cols.toImmutableArraySeq) + def drop(how: String, cols: Array[String]): Dataset[Row] = drop(how, cols.toImmutableArraySeq) /** * (Scala-specific) Returns a new `DataFrame` that drops rows containing null or NaN values in @@ -85,7 +85,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(how: String, cols: Seq[String]): DS[Row] = drop(toMinNonNulls(how), cols) + def drop(how: String, cols: Seq[String]): Dataset[Row] = drop(toMinNonNulls(how), cols) /** * Returns a new `DataFrame` that drops rows containing less than `minNonNulls` non-null and @@ -93,7 +93,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(minNonNulls: Int): DS[Row] = drop(Option(minNonNulls)) + def drop(minNonNulls: Int): Dataset[Row] = drop(Option(minNonNulls)) /** * Returns a new `DataFrame` that drops rows containing less than `minNonNulls` non-null and @@ -101,7 +101,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(minNonNulls: Int, cols: Array[String]): DS[Row] = + def drop(minNonNulls: Int, cols: Array[String]): Dataset[Row] = drop(minNonNulls, cols.toImmutableArraySeq) /** @@ -110,7 +110,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def drop(minNonNulls: Int, cols: Seq[String]): DS[Row] = drop(Option(minNonNulls), cols) + def drop(minNonNulls: Int, cols: Seq[String]): Dataset[Row] = drop(Option(minNonNulls), cols) private def toMinNonNulls(how: String): Option[Int] = { how.toLowerCase(util.Locale.ROOT) match { @@ -120,29 +120,29 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { } } - protected def drop(minNonNulls: Option[Int]): DS[Row] + protected def drop(minNonNulls: Option[Int]): Dataset[Row] - protected def drop(minNonNulls: Option[Int], cols: Seq[String]): DS[Row] + protected def drop(minNonNulls: Option[Int], cols: Seq[String]): Dataset[Row] /** * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`. * * @since 2.2.0 */ - def fill(value: Long): DS[Row] + def fill(value: Long): Dataset[Row] /** * Returns a new `DataFrame` that replaces null or NaN values in numeric columns with `value`. 
* @since 1.3.1 */ - def fill(value: Double): DS[Row] + def fill(value: Double): Dataset[Row] /** * Returns a new `DataFrame` that replaces null values in string columns with `value`. * * @since 1.3.1 */ - def fill(value: String): DS[Row] + def fill(value: String): Dataset[Row] /** * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a @@ -150,7 +150,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 2.2.0 */ - def fill(value: Long, cols: Array[String]): DS[Row] = fill(value, cols.toImmutableArraySeq) + def fill(value: Long, cols: Array[String]): Dataset[Row] = fill(value, cols.toImmutableArraySeq) /** * Returns a new `DataFrame` that replaces null or NaN values in specified numeric columns. If a @@ -158,7 +158,8 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(value: Double, cols: Array[String]): DS[Row] = fill(value, cols.toImmutableArraySeq) + def fill(value: Double, cols: Array[String]): Dataset[Row] = + fill(value, cols.toImmutableArraySeq) /** * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified @@ -166,7 +167,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 2.2.0 */ - def fill(value: Long, cols: Seq[String]): DS[Row] + def fill(value: Long, cols: Seq[String]): Dataset[Row] /** * (Scala-specific) Returns a new `DataFrame` that replaces null or NaN values in specified @@ -174,7 +175,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(value: Double, cols: Seq[String]): DS[Row] + def fill(value: Double, cols: Seq[String]): Dataset[Row] /** * Returns a new `DataFrame` that replaces null values in specified string columns. If a @@ -182,7 +183,8 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(value: String, cols: Array[String]): DS[Row] = fill(value, cols.toImmutableArraySeq) + def fill(value: String, cols: Array[String]): Dataset[Row] = + fill(value, cols.toImmutableArraySeq) /** * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified string @@ -190,14 +192,14 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(value: String, cols: Seq[String]): DS[Row] + def fill(value: String, cols: Seq[String]): Dataset[Row] /** * Returns a new `DataFrame` that replaces null values in boolean columns with `value`. * * @since 2.3.0 */ - def fill(value: Boolean): DS[Row] + def fill(value: Boolean): Dataset[Row] /** * (Scala-specific) Returns a new `DataFrame` that replaces null values in specified boolean @@ -205,7 +207,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 2.3.0 */ - def fill(value: Boolean, cols: Seq[String]): DS[Row] + def fill(value: Boolean, cols: Seq[String]): Dataset[Row] /** * Returns a new `DataFrame` that replaces null values in specified boolean columns. If a @@ -213,7 +215,8 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 2.3.0 */ - def fill(value: Boolean, cols: Array[String]): DS[Row] = fill(value, cols.toImmutableArraySeq) + def fill(value: Boolean, cols: Array[String]): Dataset[Row] = + fill(value, cols.toImmutableArraySeq) /** * Returns a new `DataFrame` that replaces null values. 
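A companion sketch for the `fill` and `replace` variants in this file (the data and replacement values are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val people = Seq(("alice", Some(30)), ("bob", None)).toDF("name", "age")

    people.na.fill(0)                                 // numeric columns: null/NaN -> 0
    people.na.fill("unknown", Seq("name"))            // listed string columns only
    people.na.fill(Map("age" -> 18, "name" -> "n/a")) // per-column replacement values
    people.na.replace("name", Map("bob" -> "robert")) // value-for-value replacement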
@@ -231,7 +234,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(valueMap: util.Map[String, Any]): DS[Row] = fillMap(valueMap.asScala.toSeq) + def fill(valueMap: util.Map[String, Any]): Dataset[Row] = fillMap(valueMap.asScala.toSeq) /** * (Scala-specific) Returns a new `DataFrame` that replaces null values. @@ -251,9 +254,9 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def fill(valueMap: Map[String, Any]): DS[Row] = fillMap(valueMap.toSeq) + def fill(valueMap: Map[String, Any]): Dataset[Row] = fillMap(valueMap.toSeq) - protected def fillMap(values: Seq[(String, Any)]): DS[Row] + protected def fillMap(values: Seq[(String, Any)]): Dataset[Row] /** * Replaces values matching keys in `replacement` map with the corresponding values. @@ -280,7 +283,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def replace[T](col: String, replacement: util.Map[T, T]): DS[Row] = { + def replace[T](col: String, replacement: util.Map[T, T]): Dataset[Row] = { replace[T](col, replacement.asScala.toMap) } @@ -306,7 +309,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def replace[T](cols: Array[String], replacement: util.Map[T, T]): DS[Row] = { + def replace[T](cols: Array[String], replacement: util.Map[T, T]): Dataset[Row] = { replace(cols.toImmutableArraySeq, replacement.asScala.toMap) } @@ -333,7 +336,7 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def replace[T](col: String, replacement: Map[T, T]): DS[Row] + def replace[T](col: String, replacement: Map[T, T]): Dataset[Row] /** * (Scala-specific) Replaces values matching keys in `replacement` map. @@ -355,5 +358,5 @@ abstract class DataFrameNaFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.3.1 */ - def replace[T](cols: Seq[String], replacement: Map[T, T]): DS[Row] + def replace[T](cols: Seq[String], replacement: Map[T, T]): Dataset[Row] } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala index 6e6ab7b9d95a..c101c52fd066 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala @@ -34,7 +34,8 @@ import org.apache.spark.sql.types.StructType * @since 1.4.0 */ @Stable -abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { +abstract class DataFrameReader { + type DS[U] <: Dataset[U] /** * Specifies the input data source format. @@ -149,7 +150,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def load(): DS[Row] + def load(): Dataset[Row] /** * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by a @@ -157,7 +158,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def load(path: String): DS[Row] + def load(path: String): Dataset[Row] /** * Loads input in as a `DataFrame`, for data sources that support multiple paths. 
Only works if @@ -166,7 +167,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 1.6.0 */ @scala.annotation.varargs - def load(paths: String*): DS[Row] + def load(paths: String*): Dataset[Row] /** * Construct a `DataFrame` representing the database table accessible via JDBC URL url named @@ -179,7 +180,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def jdbc(url: String, table: String, properties: util.Properties): DS[Row] = { + def jdbc(url: String, table: String, properties: util.Properties): Dataset[Row] = { assertNoSpecifiedSchema("jdbc") // properties should override settings in extraOptions. this.extraOptions ++= properties.asScala @@ -223,7 +224,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { lowerBound: Long, upperBound: Long, numPartitions: Int, - connectionProperties: util.Properties): DS[Row] = { + connectionProperties: util.Properties): Dataset[Row] = { // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions. this.extraOptions ++= Map( "partitionColumn" -> columnName, @@ -260,7 +261,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { url: String, table: String, predicates: Array[String], - connectionProperties: util.Properties): DS[Row] + connectionProperties: util.Properties): Dataset[Row] /** * Loads a JSON file and returns the results as a `DataFrame`. @@ -269,7 +270,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def json(path: String): DS[Row] = { + def json(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 json(Seq(path): _*) } @@ -290,7 +291,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @scala.annotation.varargs - def json(paths: String*): DS[Row] = { + def json(paths: String*): Dataset[Row] = { validateJsonSchema() format("json").load(paths: _*) } @@ -306,7 +307,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * input Dataset with one JSON object per record * @since 2.2.0 */ - def json(jsonDataset: DS[String]): DS[Row] + def json(jsonDataset: DS[String]): Dataset[Row] /** * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other @@ -314,7 +315,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def csv(path: String): DS[Row] = { + def csv(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 csv(Seq(path): _*) } @@ -340,7 +341,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * input Dataset with one CSV row per record * @since 2.2.0 */ - def csv(csvDataset: DS[String]): DS[Row] + def csv(csvDataset: DS[String]): Dataset[Row] /** * Loads CSV files and returns the result as a `DataFrame`. @@ -356,7 +357,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @scala.annotation.varargs - def csv(paths: String*): DS[Row] = format("csv").load(paths: _*) + def csv(paths: String*): Dataset[Row] = format("csv").load(paths: _*) /** * Loads a XML file and returns the result as a `DataFrame`. 
See the documentation on the other @@ -364,7 +365,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 4.0.0 */ - def xml(path: String): DS[Row] = { + def xml(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 xml(Seq(path): _*) } @@ -383,7 +384,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 4.0.0 */ @scala.annotation.varargs - def xml(paths: String*): DS[Row] = { + def xml(paths: String*): Dataset[Row] = { validateXmlSchema() format("xml").load(paths: _*) } @@ -398,7 +399,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * input Dataset with one XML object per record * @since 4.0.0 */ - def xml(xmlDataset: DS[String]): DS[Row] + def xml(xmlDataset: DS[String]): Dataset[Row] /** * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the @@ -406,7 +407,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def parquet(path: String): DS[Row] = { + def parquet(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 parquet(Seq(path): _*) } @@ -421,7 +422,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 1.4.0 */ @scala.annotation.varargs - def parquet(paths: String*): DS[Row] = format("parquet").load(paths: _*) + def parquet(paths: String*): Dataset[Row] = format("parquet").load(paths: _*) /** * Loads an ORC file and returns the result as a `DataFrame`. @@ -430,7 +431,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * input path * @since 1.5.0 */ - def orc(path: String): DS[Row] = { + def orc(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 orc(Seq(path): _*) } @@ -447,7 +448,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @scala.annotation.varargs - def orc(paths: String*): DS[Row] = format("orc").load(paths: _*) + def orc(paths: String*): Dataset[Row] = format("orc").load(paths: _*) /** * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch @@ -462,7 +463,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * database. Note that, the global temporary view database is also valid here. * @since 1.4.0 */ - def table(tableName: String): DS[Row] + def table(tableName: String): Dataset[Row] /** * Loads text files and returns a `DataFrame` whose schema starts with a string column named @@ -471,7 +472,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * * @since 2.0.0 */ - def text(path: String): DS[Row] = { + def text(path: String): Dataset[Row] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 text(Seq(path): _*) } @@ -499,14 +500,14 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 1.6.0 */ @scala.annotation.varargs - def text(paths: String*): DS[Row] = format("text").load(paths: _*) + def text(paths: String*): Dataset[Row] = format("text").load(paths: _*) /** * Loads text files and returns a [[Dataset]] of String. See the documentation on the other * overloaded `textFile()` method for more details. 
* @since 2.0.0 */ - def textFile(path: String): DS[String] = { + def textFile(path: String): Dataset[String] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 textFile(Seq(path): _*) } @@ -534,7 +535,7 @@ abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] { * @since 2.0.0 */ @scala.annotation.varargs - def textFile(paths: String*): DS[String] = { + def textFile(paths: String*): Dataset[String] = { assertNoSpecifiedSchema("textFile") text(paths: _*).select("value").as(StringEncoder) } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameStatFunctions.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameStatFunctions.scala index fc1680231be5..ae7c256b30ac 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameStatFunctions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameStatFunctions.scala @@ -34,8 +34,8 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch} * @since 1.4.0 */ @Stable -abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { - protected def df: DS[Row] +abstract class DataFrameStatFunctions { + protected def df: Dataset[Row] /** * Calculates the approximate quantiles of a numerical column of a DataFrame. @@ -202,7 +202,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def crosstab(col1: String, col2: String): DS[Row] + def crosstab(col1: String, col2: String): Dataset[Row] /** * Finding frequent items for columns, possibly with false positives. Using the frequent element @@ -246,7 +246,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * }}} * @since 1.4.0 */ - def freqItems(cols: Array[String], support: Double): DS[Row] = + def freqItems(cols: Array[String], support: Double): Dataset[Row] = freqItems(cols.toImmutableArraySeq, support) /** @@ -263,7 +263,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * A Local DataFrame with the Array of frequent items for each column. * @since 1.4.0 */ - def freqItems(cols: Array[String]): DS[Row] = freqItems(cols, 0.01) + def freqItems(cols: Array[String]): Dataset[Row] = freqItems(cols, 0.01) /** * (Scala-specific) Finding frequent items for columns, possibly with false positives. Using the @@ -307,7 +307,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.4.0 */ - def freqItems(cols: Seq[String], support: Double): DS[Row] + def freqItems(cols: Seq[String], support: Double): Dataset[Row] /** * (Scala-specific) Finding frequent items for columns, possibly with false positives. Using the @@ -324,7 +324,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * A Local DataFrame with the Array of frequent items for each column. * @since 1.4.0 */ - def freqItems(cols: Seq[String]): DS[Row] = freqItems(cols, 0.01) + def freqItems(cols: Seq[String]): Dataset[Row] = freqItems(cols, 0.01) /** * Returns a stratified sample without replacement based on the fraction given on each stratum. 
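The stat functions keep their existing semantics under the new return types; a small usage sketch (column names, support threshold, and fractions are made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val events = Seq(("click", 1), ("view", 2), ("click", 3)).toDF("kind", "id")

    events.stat.crosstab("kind", "id")      // pair-wise contingency table
    events.stat.freqItems(Seq("kind"), 0.4) // frequent items per column
    events.stat.sampleBy(col("kind"), Map("click" -> 0.5, "view" -> 1.0), 7L)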
@@ -356,7 +356,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.5.0 */ - def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DS[Row] = { + def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): Dataset[Row] = { sampleBy(Column(col), fractions, seed) } @@ -376,7 +376,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * * @since 1.5.0 */ - def sampleBy[T](col: String, fractions: ju.Map[T, jl.Double], seed: Long): DS[Row] = { + def sampleBy[T](col: String, fractions: ju.Map[T, jl.Double], seed: Long): Dataset[Row] = { sampleBy(col, fractions.asScala.toMap.asInstanceOf[Map[T, Double]], seed) } @@ -413,7 +413,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * * @since 3.0.0 */ - def sampleBy[T](col: Column, fractions: Map[T, Double], seed: Long): DS[Row] + def sampleBy[T](col: Column, fractions: Map[T, Double], seed: Long): Dataset[Row] /** * (Java-specific) Returns a stratified sample without replacement based on the fraction given @@ -432,7 +432,7 @@ abstract class DataFrameStatFunctions[DS[U] <: Dataset[U, DS]] { * a new `DataFrame` that represents the stratified sample * @since 3.0.0 */ - def sampleBy[T](col: Column, fractions: ju.Map[T, jl.Double], seed: Long): DS[Row] = { + def sampleBy[T](col: Column, fractions: ju.Map[T, jl.Double], seed: Long): Dataset[Row] = { sampleBy(col, fractions.asScala.toMap.asInstanceOf[Map[T, Double]], seed) } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index fb8b6f2f483a..df0190319c55 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -119,10 +119,11 @@ import org.apache.spark.util.SparkClassUtils * @since 1.6.0 */ @Stable -abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { - type RGD <: RelationalGroupedDataset[DS] +abstract class Dataset[T] extends Serializable { + // TODO check signatures for the implementations! + type DS[U] <: Dataset[U] - def sparkSession: SparkSession[DS] + def sparkSession: SparkSession val encoder: Encoder[T] @@ -136,7 +137,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { */ // This is declared with parentheses to prevent the Scala compiler from treating // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. - def toDF(): DS[Row] + def toDF(): Dataset[Row] /** * Returns a new Dataset where each record has been mapped on to the specified type. The method @@ -157,7 +158,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 1.6.0 */ - def as[U: Encoder]: DS[U] + def as[U: Encoder]: Dataset[U] /** * Returns a new DataFrame where each row is reconciled to match the specified schema. Spark @@ -175,7 +176,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 3.4.0 */ - def to(schema: StructType): DS[Row] + def to(schema: StructType): Dataset[Row] /** * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed. @@ -191,7 +192,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def toDF(colNames: String*): DS[Row] + def toDF(colNames: String*): Dataset[Row] /** * Returns the schema of this Dataset. 
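This hunk is the heart of the patch: the F-bounded `DS[U] <: Dataset[U, DS]` parameter disappears from the shared interface, and only the methods that genuinely need the concrete child type keep an abstract type member. A toy, Spark-independent sketch of the two shapes, with made-up `OldApi`/`NewApi`/`Impl` names:

    // Before: the self type is threaded through every signature.
    trait OldApi[T, DS[U] <: OldApi[U, DS]] {
      def filter(p: T => Boolean): DS[T]
    }

    // After: signatures use the interface type directly; an abstract type member
    // remains only where the concrete child type must be accepted as an argument.
    trait NewApi[T] {
      type DS[U] <: NewApi[U]
      def filter(p: T => Boolean): NewApi[T]
      def unionSame(other: DS[T]): NewApi[T]
    }

    // The implementation pins the member to itself, as the Connect/Classic
    // Datasets do with `type DS[U] = Dataset[U]`.
    class Impl[T](val values: Seq[T]) extends NewApi[T] {
      type DS[U] = Impl[U]
      def filter(p: T => Boolean): Impl[T] = new Impl(values.filter(p))
      def unionSame(other: Impl[T]): Impl[T] = new Impl(values ++ other.values)
    }

In the patch itself the member surfaces in signatures such as `json(jsonDataset: DS[String])`, which must accept the implementation's own Dataset rather than an arbitrary one.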
@@ -312,7 +313,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 2.1.0 */ - def checkpoint(): DS[T] = checkpoint(eager = true, reliableCheckpoint = true) + def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true) /** * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the @@ -331,7 +332,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 2.1.0 */ - def checkpoint(eager: Boolean): DS[T] = checkpoint(eager = eager, reliableCheckpoint = true) + def checkpoint(eager: Boolean): Dataset[T] = + checkpoint(eager = eager, reliableCheckpoint = true) /** * Eagerly locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used @@ -342,7 +344,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 2.3.0 */ - def localCheckpoint(): DS[T] = checkpoint(eager = true, reliableCheckpoint = false) + def localCheckpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = false) /** * Locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used to @@ -361,7 +363,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group basic * @since 2.3.0 */ - def localCheckpoint(eager: Boolean): DS[T] = + def localCheckpoint(eager: Boolean): Dataset[T] = checkpoint(eager = eager, reliableCheckpoint = false) /** @@ -373,7 +375,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * Whether to create a reliable checkpoint saved to files inside the checkpoint directory. If * false creates a local checkpoint using the caching subsystem */ - protected def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): DS[T] + protected def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] /** * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time @@ -400,7 +402,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { */ // We only accept an existing column name, not a derived column here as a watermark that is // defined on a derived column cannot referenced elsewhere in the plan. - def withWatermark(eventTime: String, delayThreshold: String): DS[T] + def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] /** * Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated, @@ -551,7 +553,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 1.6.0 */ - def na: DataFrameNaFunctions[DS] + def na: DataFrameNaFunctions /** * Returns a [[DataFrameStatFunctions]] for working statistic functions support. @@ -563,7 +565,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 1.6.0 */ - def stat: DataFrameStatFunctions[DS] + def stat: DataFrameStatFunctions /** * Join with another `DataFrame`. @@ -575,7 +577,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_]): DS[Row] + def join(right: DS[_]): Dataset[Row] /** * Inner equi-join with another `DataFrame` using the given column. 
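The checkpoint and watermark variants above differ only in their `eager`/`reliableCheckpoint` flags and the delay threshold; a usage sketch (the event-time column is fabricated from the ids, purely for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()

    val events = spark.range(1000).toDF("id")
      .withColumn("eventTime", col("id").cast("timestamp"))

    val pruned = events.localCheckpoint()           // eager, cache-backed lineage cut
    val reliable = events.checkpoint(eager = false) // reliable checkpoint, computed lazily;
                                                    // needs a checkpoint dir before materializing
    val withLateData = events.withWatermark("eventTime", "10 minutes") // no-op for batch data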
@@ -601,7 +603,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_], usingColumn: String): DS[Row] = { + def join(right: DS[_], usingColumn: String): Dataset[Row] = { join(right, Seq(usingColumn)) } @@ -617,7 +619,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def join(right: DS[_], usingColumns: Array[String]): DS[Row] = { + def join(right: DS[_], usingColumns: Array[String]): Dataset[Row] = { join(right, usingColumns.toImmutableArraySeq) } @@ -645,7 +647,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_], usingColumns: Seq[String]): DS[Row] = { + def join(right: DS[_], usingColumns: Seq[String]): Dataset[Row] = { join(right, usingColumns, "inner") } @@ -675,7 +677,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def join(right: DS[_], usingColumn: String, joinType: String): DS[Row] = { + def join(right: DS[_], usingColumn: String, joinType: String): Dataset[Row] = { join(right, Seq(usingColumn), joinType) } @@ -696,7 +698,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def join(right: DS[_], usingColumns: Array[String], joinType: String): DS[Row] = { + def join(right: DS[_], usingColumns: Array[String], joinType: String): Dataset[Row] = { join(right, usingColumns.toImmutableArraySeq, joinType) } @@ -726,7 +728,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_], usingColumns: Seq[String], joinType: String): DS[Row] + def join(right: DS[_], usingColumns: Seq[String], joinType: String): Dataset[Row] /** * Inner join with another `DataFrame`, using the given join expression. @@ -740,7 +742,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_], joinExprs: Column): DS[Row] = + def join(right: DS[_], joinExprs: Column): Dataset[Row] = join(right, joinExprs, "inner") /** @@ -770,7 +772,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def join(right: DS[_], joinExprs: Column, joinType: String): DS[Row] + def join(right: DS[_], joinExprs: Column, joinType: String): Dataset[Row] /** * Explicit cartesian join with another `DataFrame`. @@ -782,7 +784,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.1.0 */ - def crossJoin(right: DS[_]): DS[Row] + def crossJoin(right: DS[_]): Dataset[Row] /** * Joins this Dataset returning a `Tuple2` for each pair where `condition` evaluates to true. 
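A usage sketch for the join overloads above (frames and keys are made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val left = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
    val right = Seq((1, 100L), (3, 300L)).toDF("id", "amount")

    left.join(right, "id")                                     // inner equi-join, single "id" column
    left.join(right, Seq("id"), "left_outer")                  // keep unmatched left rows
    left.join(right, left("id") === right("id"), "inner")      // explicit join expression
    left.crossJoin(right)                                      // explicit cartesian product
    left.joinWith(right, left("id") === right("id"), "inner")  // Dataset[(Row, Row)]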
@@ -806,7 +808,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def joinWith[U](other: DS[U], condition: Column, joinType: String): DS[(T, U)] + def joinWith[U](other: DS[U], condition: Column, joinType: String): Dataset[(T, U)] /** * Using inner equi-join to join this Dataset returning a `Tuple2` for each pair where @@ -819,11 +821,11 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def joinWith[U](other: DS[U], condition: Column): DS[(T, U)] = { + def joinWith[U](other: DS[U], condition: Column): Dataset[(T, U)] = { joinWith(other, condition, "inner") } - protected def sortInternal(global: Boolean, sortExprs: Seq[Column]): DS[T] + protected def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] /** * Returns a new Dataset with each partition sorted by the given expressions. @@ -834,7 +836,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def sortWithinPartitions(sortCol: String, sortCols: String*): DS[T] = { + def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = { sortWithinPartitions((sortCol +: sortCols).map(Column(_)): _*) } @@ -847,7 +849,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def sortWithinPartitions(sortExprs: Column*): DS[T] = { + def sortWithinPartitions(sortExprs: Column*): Dataset[T] = { sortInternal(global = false, sortExprs) } @@ -864,7 +866,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def sort(sortCol: String, sortCols: String*): DS[T] = { + def sort(sortCol: String, sortCols: String*): Dataset[T] = { sort((sortCol +: sortCols).map(Column(_)): _*) } @@ -878,7 +880,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def sort(sortExprs: Column*): DS[T] = { + def sort(sortExprs: Column*): Dataset[T] = { sortInternal(global = true, sortExprs) } @@ -890,7 +892,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def orderBy(sortCol: String, sortCols: String*): DS[T] = sort(sortCol, sortCols: _*) + def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols: _*) /** * Returns a new Dataset sorted by the given expressions. This is an alias of the `sort` @@ -900,7 +902,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def orderBy(sortExprs: Column*): DS[T] = sort(sortExprs: _*) + def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs: _*) /** * Specifies some hint on the current Dataset. As an example, the following code specifies that @@ -926,7 +928,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.2.0 */ @scala.annotation.varargs - def hint(name: String, parameters: Any*): DS[T] + def hint(name: String, parameters: Any*): Dataset[T] /** * Selects column based on the column name and returns it as a [[org.apache.spark.sql.Column]]. @@ -975,7 +977,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def as(alias: String): DS[T] + def as(alias: String): Dataset[T] /** * (Scala-specific) Returns a new Dataset with an alias set. 
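The sort family above all routes through `sortInternal`; a short sketch of the public surface (the data and hint name are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val users = Seq((2, "bob"), (1, "alice"), (3, "carol")).toDF("id", "name")

    users.sort(col("name").desc, col("id")) // global sort by expressions
    users.orderBy("id")                     // alias for sort
    users.sortWithinPartitions("id")        // sorts inside each partition only
    users.hint("broadcast")                 // hint picked up by a later join
    users.as("u")                           // alias, e.g. for self-join disambiguation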
@@ -983,7 +985,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def as(alias: Symbol): DS[T] = as(alias.name) + def as(alias: Symbol): Dataset[T] = as(alias.name) /** * Returns a new Dataset with an alias set. Same as `as`. @@ -991,7 +993,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def alias(alias: String): DS[T] = as(alias) + def alias(alias: String): Dataset[T] = as(alias) /** * (Scala-specific) Returns a new Dataset with an alias set. Same as `as`. @@ -999,7 +1001,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def alias(alias: Symbol): DS[T] = as(alias) + def alias(alias: Symbol): Dataset[T] = as(alias) /** * Selects a set of column based expressions. @@ -1011,7 +1013,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def select(cols: Column*): DS[Row] + def select(cols: Column*): Dataset[Row] /** * Selects a set of columns. This is a variant of `select` that can only select existing columns @@ -1027,7 +1029,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def select(col: String, cols: String*): DS[Row] = select((col +: cols).map(Column(_)): _*) + def select(col: String, cols: String*): Dataset[Row] = select((col +: cols).map(Column(_)): _*) /** * Selects a set of SQL expressions. This is a variant of `select` that accepts SQL expressions. @@ -1042,7 +1044,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def selectExpr(exprs: String*): DS[Row] = select(exprs.map(functions.expr): _*) + def selectExpr(exprs: String*): Dataset[Row] = select(exprs.map(functions.expr): _*) /** * Returns a new Dataset by computing the given [[org.apache.spark.sql.Column]] expression for @@ -1056,14 +1058,14 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def select[U1](c1: TypedColumn[T, U1]): DS[U1] + def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] /** * Internal helper function for building typed selects that return tuples. For simplicity and * code reuse, we do this without the help of the type system and then use helper functions that * cast appropriately for the user facing interface. 
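A small projection sketch (illustrative only; a `df` with `name` and `age` columns plus `spark.implicits._` is assumed) covering the untyped and single-column typed `select` variants changed above:

{{{
  // Untyped projections return a DataFrame (Dataset[Row]).
  val projected = df.select($"name", ($"age" + 1).as("age_next_year"))
  val byName    = df.select("name", "age")
  val viaSql    = df.selectExpr("name", "age + 1 AS age_next_year")

  // A single TypedColumn keeps the result typed: Dataset[String].
  val names = df.select($"name".as[String])
}}}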
*/ - protected def selectUntyped(columns: TypedColumn[_, _]*): DS[_] + protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] /** * Returns a new Dataset by computing the given [[org.apache.spark.sql.Column]] expressions for @@ -1072,8 +1074,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): DS[(U1, U2)] = - selectUntyped(c1, c2).asInstanceOf[DS[(U1, U2)]] + def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] = + selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]] /** * Returns a new Dataset by computing the given [[org.apache.spark.sql.Column]] expressions for @@ -1085,8 +1087,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { def select[U1, U2, U3]( c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], - c3: TypedColumn[T, U3]): DS[(U1, U2, U3)] = - selectUntyped(c1, c2, c3).asInstanceOf[DS[(U1, U2, U3)]] + c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] = + selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]] /** * Returns a new Dataset by computing the given [[org.apache.spark.sql.Column]] expressions for @@ -1099,8 +1101,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], - c4: TypedColumn[T, U4]): DS[(U1, U2, U3, U4)] = - selectUntyped(c1, c2, c3, c4).asInstanceOf[DS[(U1, U2, U3, U4)]] + c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] = + selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]] /** * Returns a new Dataset by computing the given [[org.apache.spark.sql.Column]] expressions for @@ -1114,8 +1116,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4], - c5: TypedColumn[T, U5]): DS[(U1, U2, U3, U4, U5)] = - selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[DS[(U1, U2, U3, U4, U5)]] + c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = + selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]] /** * Filters rows using the given condition. @@ -1128,7 +1130,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def filter(condition: Column): DS[T] + def filter(condition: Column): Dataset[T] /** * Filters rows using the given SQL expression. @@ -1139,7 +1141,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def filter(conditionExpr: String): DS[T] = + def filter(conditionExpr: String): Dataset[T] = filter(functions.expr(conditionExpr)) /** @@ -1149,7 +1151,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def filter(func: T => Boolean): DS[T] + def filter(func: T => Boolean): Dataset[T] /** * (Java-specific) Returns a new Dataset that only contains elements where `func` returns @@ -1158,7 +1160,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def filter(func: FilterFunction[T]): DS[T] + def filter(func: FilterFunction[T]): Dataset[T] /** * Filters rows using the given condition. This is an alias for `filter`. 
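Continuing in the same hypothetical setting (a `people: Dataset[Person]` with a `Person(name: String, age: Int)` case class), a sketch of the tuple-returning typed selects and the three filter flavours:

{{{
  // Multi-column typed select yields a Dataset of tuples.
  val nameAndAge: Dataset[(String, Int)] =
    people.select($"name".as[String], $"age".as[Int])

  // Column expression, SQL string, and Scala function filters are equivalent here.
  val adultsByColumn = people.filter($"age" >= 18)
  val adultsBySql    = people.filter("age >= 18")
  val adultsByFunc   = people.filter((p: Person) => p.age >= 18)
}}}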
@@ -1171,7 +1173,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def where(condition: Column): DS[T] = filter(condition) + def where(condition: Column): Dataset[T] = filter(condition) /** * Filters rows using the given SQL expression. @@ -1182,7 +1184,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def where(conditionExpr: String): DS[T] = filter(conditionExpr) + def where(conditionExpr: String): Dataset[T] = filter(conditionExpr) /** * Groups the Dataset using the specified columns, so we can run aggregation on them. See @@ -1203,7 +1205,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def groupBy(cols: Column*): RGD + def groupBy(cols: Column*): RelationalGroupedDataset /** * Groups the Dataset using the specified columns, so that we can run aggregation on them. See @@ -1227,7 +1229,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def groupBy(col1: String, cols: String*): RGD = groupBy((col1 +: cols).map(col): _*) + def groupBy(col1: String, cols: String*): RelationalGroupedDataset = groupBy( + (col1 +: cols).map(col): _*) /** * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we @@ -1249,7 +1252,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def rollup(cols: Column*): RGD + def rollup(cols: Column*): RelationalGroupedDataset /** * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we @@ -1274,7 +1277,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def rollup(col1: String, cols: String*): RGD = rollup((col1 +: cols).map(col): _*) + def rollup(col1: String, cols: String*): RelationalGroupedDataset = rollup( + (col1 +: cols).map(col): _*) /** * Create a multi-dimensional cube for the current Dataset using the specified columns, so we @@ -1296,7 +1300,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def cube(cols: Column*): RGD + def cube(cols: Column*): RelationalGroupedDataset /** * Create a multi-dimensional cube for the current Dataset using the specified columns, so we @@ -1321,7 +1325,8 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def cube(col1: String, cols: String*): RGD = cube((col1 +: cols).map(col): _*) + def cube(col1: String, cols: String*): RelationalGroupedDataset = cube( + (col1 +: cols).map(col): _*) /** * Create multi-dimensional aggregation for the current Dataset using the specified grouping @@ -1343,7 +1348,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 4.0.0 */ @scala.annotation.varargs - def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RGD + def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RelationalGroupedDataset /** * (Scala-specific) Aggregates on the entire Dataset without groups. 
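For the grouping entry points above, a hedged example over a hypothetical `sales` DataFrame (with `org.apache.spark.sql.functions._` imported); each call hands back a RelationalGroupedDataset that is then aggregated:

{{{
  // Plain grouping.
  val perDept = sales.groupBy($"department").agg(sum($"amount").as("total"))
  // rollup adds hierarchical subtotals; cube adds every combination of the grouping columns.
  val withSubtotals   = sales.rollup($"department", $"team").avg("amount")
  val allCombinations = sales.cube("city", "year").count()
}}}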
@@ -1356,7 +1361,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def agg(aggExpr: (String, String), aggExprs: (String, String)*): DS[Row] = { + def agg(aggExpr: (String, String), aggExprs: (String, String)*): Dataset[Row] = { groupBy().agg(aggExpr, aggExprs: _*) } @@ -1371,7 +1376,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def agg(exprs: Map[String, String]): DS[Row] = groupBy().agg(exprs) + def agg(exprs: Map[String, String]): Dataset[Row] = groupBy().agg(exprs) /** * (Java-specific) Aggregates on the entire Dataset without groups. @@ -1384,7 +1389,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def agg(exprs: util.Map[String, String]): DS[Row] = groupBy().agg(exprs) + def agg(exprs: util.Map[String, String]): Dataset[Row] = groupBy().agg(exprs) /** * Aggregates on the entire Dataset without groups. @@ -1398,7 +1403,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def agg(expr: Column, exprs: Column*): DS[Row] = groupBy().agg(expr, exprs: _*) + def agg(expr: Column, exprs: Column*): Dataset[Row] = groupBy().agg(expr, exprs: _*) /** * (Scala-specific) Reduces the elements of this Dataset using the specified binary function. @@ -1479,7 +1484,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { ids: Array[Column], values: Array[Column], variableColumnName: String, - valueColumnName: String): DS[Row] + valueColumnName: String): Dataset[Row] /** * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns @@ -1502,7 +1507,10 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def unpivot(ids: Array[Column], variableColumnName: String, valueColumnName: String): DS[Row] + def unpivot( + ids: Array[Column], + variableColumnName: String, + valueColumnName: String): Dataset[Row] /** * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns @@ -1526,7 +1534,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { ids: Array[Column], values: Array[Column], variableColumnName: String, - valueColumnName: String): DS[Row] = + valueColumnName: String): Dataset[Row] = unpivot(ids, values, variableColumnName, valueColumnName) /** @@ -1548,7 +1556,10 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def melt(ids: Array[Column], variableColumnName: String, valueColumnName: String): DS[Row] = + def melt( + ids: Array[Column], + variableColumnName: String, + valueColumnName: String): Dataset[Row] = unpivot(ids, variableColumnName, valueColumnName) /** @@ -1611,7 +1622,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 4.0.0 */ - def transpose(indexColumn: Column): DS[Row] + def transpose(indexColumn: Column): Dataset[Row] /** * Transposes a DataFrame, switching rows to columns. This function transforms the DataFrame @@ -1630,7 +1641,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 4.0.0 */ - def transpose(): DS[Row] + def transpose(): Dataset[Row] /** * Define (named) metrics to observe on the Dataset. 
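A quick sketch of whole-Dataset aggregation and wide-to-long reshaping (illustrative; the `sales` columns are hypothetical):

{{{
  // agg without groupBy is shorthand for groupBy().agg(...).
  val totals = sales.agg(sum($"amount").as("total"), max($"amount").as("largest"))
  val byName = sales.agg("amount" -> "sum", "quantity" -> "avg")

  // unpivot turns the q1/q2 columns into (metric, value) rows, keeping id as the identifier.
  val long = sales.unpivot(Array($"id"), Array($"q1", $"q2"), "metric", "value")
}}}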
This method returns an 'observed' Dataset @@ -1651,7 +1662,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 3.0.0 */ @scala.annotation.varargs - def observe(name: String, expr: Column, exprs: Column*): DS[T] + def observe(name: String, expr: Column, exprs: Column*): Dataset[T] /** * Observe (named) metrics through an `org.apache.spark.sql.Observation` instance. This method @@ -1674,7 +1685,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 3.3.0 */ @scala.annotation.varargs - def observe(observation: Observation, expr: Column, exprs: Column*): DS[T] + def observe(observation: Observation, expr: Column, exprs: Column*): Dataset[T] /** * Returns a new Dataset by taking the first `n` rows. The difference between this function and @@ -1684,7 +1695,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def limit(n: Int): DS[T] + def limit(n: Int): Dataset[T] /** * Returns a new Dataset by skipping the first `n` rows. @@ -1692,7 +1703,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 3.4.0 */ - def offset(n: Int): DS[T] + def offset(n: Int): Dataset[T] /** * Returns a new Dataset containing union of rows in this Dataset and another Dataset. @@ -1724,7 +1735,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def union(other: DS[T]): DS[T] + def union(other: DS[T]): Dataset[T] /** * Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is @@ -1738,7 +1749,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def unionAll(other: DS[T]): DS[T] = union(other) + def unionAll(other: DS[T]): Dataset[T] = union(other) /** * Returns a new Dataset containing union of rows in this Dataset and another Dataset. @@ -1769,7 +1780,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.3.0 */ - def unionByName(other: DS[T]): DS[T] = unionByName(other, allowMissingColumns = false) + def unionByName(other: DS[T]): Dataset[T] = unionByName(other, allowMissingColumns = false) /** * Returns a new Dataset containing union of rows in this Dataset and another Dataset. @@ -1813,7 +1824,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 3.1.0 */ - def unionByName(other: DS[T], allowMissingColumns: Boolean): DS[T] + def unionByName(other: DS[T], allowMissingColumns: Boolean): Dataset[T] /** * Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is @@ -1825,7 +1836,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def intersect(other: DS[T]): DS[T] + def intersect(other: DS[T]): Dataset[T] /** * Returns a new Dataset containing rows only in both this Dataset and another Dataset while @@ -1838,7 +1849,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.4.0 */ - def intersectAll(other: DS[T]): DS[T] + def intersectAll(other: DS[T]): Dataset[T] /** * Returns a new Dataset containing rows in this Dataset but not in another Dataset. 
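An illustrative sketch, under the same assumptions, of the observation and set-combination methods above (`events`, `thisYear`, and `lastYear` are hypothetical Datasets with matching schemas):

{{{
  // Named metrics are computed as a side effect of the first action on `observed`;
  // observation.get then exposes the collected values.
  val observation = Observation("row_stats")
  val observed    = events.observe(observation, count(lit(1)).as("rows"), max($"ts").as("latest"))

  // unionByName matches columns by name rather than by position.
  val combined = thisYear.union(lastYear).limit(1000)
  val padded   = thisYear.unionByName(lastYear, allowMissingColumns = true)
  val shared   = thisYear.intersect(lastYear)
}}}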
This is @@ -1850,7 +1861,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def except(other: DS[T]): DS[T] + def except(other: DS[T]): Dataset[T] /** * Returns a new Dataset containing rows in this Dataset but not in another Dataset while @@ -1863,7 +1874,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.4.0 */ - def exceptAll(other: DS[T]): DS[T] + def exceptAll(other: DS[T]): Dataset[T] /** * Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a @@ -1879,7 +1890,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.3.0 */ - def sample(fraction: Double, seed: Long): DS[T] = { + def sample(fraction: Double, seed: Long): Dataset[T] = { sample(withReplacement = false, fraction = fraction, seed = seed) } @@ -1895,7 +1906,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.3.0 */ - def sample(fraction: Double): DS[T] = { + def sample(fraction: Double): Dataset[T] = { sample(withReplacement = false, fraction = fraction) } @@ -1914,7 +1925,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def sample(withReplacement: Boolean, fraction: Double, seed: Long): DS[T] + def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] /** * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed. @@ -1931,7 +1942,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 1.6.0 */ - def sample(withReplacement: Boolean, fraction: Double): DS[T] = { + def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = { sample(withReplacement, fraction, SparkClassUtils.random.nextLong) } @@ -1948,7 +1959,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def randomSplit(weights: Array[Double], seed: Long): Array[_ <: DS[T]] + def randomSplit(weights: Array[Double], seed: Long): Array[_ <: Dataset[T]] /** * Returns a Java list that contains randomly split Dataset with the provided weights. @@ -1960,7 +1971,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def randomSplitAsList(weights: Array[Double], seed: Long): util.List[_ <: DS[T]] + def randomSplitAsList(weights: Array[Double], seed: Long): util.List[_ <: Dataset[T]] /** * Randomly splits this Dataset with the provided weights. 
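For the difference, sampling, and splitting methods above, a minimal sketch (hypothetical Datasets; the seed only makes the results reproducible):

{{{
  // Rows present here but absent from the other Dataset (EXCEPT DISTINCT semantics).
  val newCustomers = currentCustomers.except(previousCustomers)

  val tenPercent         = events.sample(withReplacement = false, fraction = 0.1, seed = 42L)
  val Array(train, test) = events.randomSplit(Array(0.8, 0.2), seed = 42L)
}}}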
@@ -1970,7 +1981,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def randomSplit(weights: Array[Double]): Array[_ <: DS[T]] + def randomSplit(weights: Array[Double]): Array[_ <: Dataset[T]] /** * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more rows @@ -1983,7 +1994,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * * {{{ * case class Book(title: String, words: String) - * val ds: DS[Book] + * val ds: Dataset[Book] * * val allWords = ds.select($"title", explode(split($"words", " ")).as("word")) * @@ -2000,7 +2011,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0") - def explode[A <: Product: TypeTag](input: Column*)(f: Row => IterableOnce[A]): DS[Row] + def explode[A <: Product: TypeTag](input: Column*)(f: Row => IterableOnce[A]): Dataset[Row] /** * (Scala-specific) Returns a new Dataset where a single column has been expanded to zero or @@ -2026,7 +2037,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { */ @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0") def explode[A, B: TypeTag](inputColumn: String, outputColumn: String)( - f: A => IterableOnce[B]): DS[Row] + f: A => IterableOnce[B]): Dataset[Row] /** * Returns a new Dataset by adding a column or replacing the existing column that has the same @@ -2043,7 +2054,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def withColumn(colName: String, col: Column): DS[Row] = withColumns(Seq(colName), Seq(col)) + def withColumn(colName: String, col: Column): Dataset[Row] = withColumns(Seq(colName), Seq(col)) /** * (Scala-specific) Returns a new Dataset by adding columns or replacing the existing columns @@ -2055,7 +2066,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.3.0 */ - def withColumns(colsMap: Map[String, Column]): DS[Row] = { + def withColumns(colsMap: Map[String, Column]): Dataset[Row] = { val (colNames, newCols) = colsMap.toSeq.unzip withColumns(colNames, newCols) } @@ -2070,13 +2081,14 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.3.0 */ - def withColumns(colsMap: util.Map[String, Column]): DS[Row] = withColumns(colsMap.asScala.toMap) + def withColumns(colsMap: util.Map[String, Column]): Dataset[Row] = withColumns( + colsMap.asScala.toMap) /** * Returns a new Dataset by adding columns or replacing the existing columns that has the same * names. */ - protected def withColumns(colNames: Seq[String], cols: Seq[Column]): DS[Row] + protected def withColumns(colNames: Seq[String], cols: Seq[Column]): Dataset[Row] /** * Returns a new Dataset with a column renamed. 
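A hedged example of the column-addition methods above, plus the replacement that the deprecated explode variants point to (hypothetical `df` and `books` DataFrames, with `functions._` imported):

{{{
  // withColumns applies several new or replacement columns in one pass.
  val enriched = df
    .withColumn("amount_usd", $"amount" * $"fx_rate")
    .withColumns(Map(
      "year"  -> year($"order_date"),
      "month" -> month($"order_date")))

  // Preferred alternative to the deprecated explode(...) overloads.
  val words = books.select($"title", explode(split($"words", " ")).as("word"))
}}}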
This is a no-op if schema doesn't contain @@ -2085,7 +2097,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def withColumnRenamed(existingName: String, newName: String): DS[Row] = + def withColumnRenamed(existingName: String, newName: String): Dataset[Row] = withColumnsRenamed(Seq(existingName), Seq(newName)) /** @@ -2100,7 +2112,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 3.4.0 */ @throws[AnalysisException] - def withColumnsRenamed(colsMap: Map[String, String]): DS[Row] = { + def withColumnsRenamed(colsMap: Map[String, String]): Dataset[Row] = { val (colNames, newColNames) = colsMap.toSeq.unzip withColumnsRenamed(colNames, newColNames) } @@ -2114,10 +2126,10 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.4.0 */ - def withColumnsRenamed(colsMap: util.Map[String, String]): DS[Row] = + def withColumnsRenamed(colsMap: util.Map[String, String]): Dataset[Row] = withColumnsRenamed(colsMap.asScala.toMap) - protected def withColumnsRenamed(colNames: Seq[String], newColNames: Seq[String]): DS[Row] + protected def withColumnsRenamed(colNames: Seq[String], newColNames: Seq[String]): Dataset[Row] /** * Returns a new Dataset by updating an existing column with metadata. @@ -2125,7 +2137,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 3.3.0 */ - def withMetadata(columnName: String, metadata: Metadata): DS[Row] + def withMetadata(columnName: String, metadata: Metadata): Dataset[Row] /** * Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column @@ -2198,7 +2210,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def drop(colName: String): DS[Row] = drop(colName :: Nil: _*) + def drop(colName: String): Dataset[Row] = drop(colName :: Nil: _*) /** * Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column @@ -2211,7 +2223,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def drop(colNames: String*): DS[Row] + def drop(colNames: String*): Dataset[Row] /** * Returns a new Dataset with column dropped. @@ -2226,7 +2238,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group untypedrel * @since 2.0.0 */ - def drop(col: Column): DS[Row] = drop(col, Nil: _*) + def drop(col: Column): Dataset[Row] = drop(col, Nil: _*) /** * Returns a new Dataset with columns dropped. @@ -2238,7 +2250,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 3.4.0 */ @scala.annotation.varargs - def drop(col: Column, cols: Column*): DS[Row] + def drop(col: Column, cols: Column*): Dataset[Row] /** * Returns a new Dataset that contains only the unique rows from this Dataset. 
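A brief sketch of bulk renaming and the drop overloads above (`df`, `joined`, and `rightDF` are hypothetical; dropping an absent name is a no-op):

{{{
  val renamed = df.withColumnsRenamed(Map("dob" -> "date_of_birth", "addr" -> "address"))

  // Dropping by name, and by Column reference to disambiguate after a join.
  val trimmed = renamed.drop("debug_info", "tmp")
  val cleaned = joined.drop(rightDF("id"))
}}}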
This is an alias @@ -2253,7 +2265,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def dropDuplicates(): DS[T] + def dropDuplicates(): Dataset[T] /** * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the @@ -2268,7 +2280,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Seq[String]): DS[T] + def dropDuplicates(colNames: Seq[String]): Dataset[T] /** * Returns a new Dataset with duplicate rows removed, considering only the subset of columns. @@ -2282,7 +2294,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Array[String]): DS[T] = + def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toImmutableArraySeq) /** @@ -2299,7 +2311,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 2.0.0 */ @scala.annotation.varargs - def dropDuplicates(col1: String, cols: String*): DS[T] = { + def dropDuplicates(col1: String, cols: String*): Dataset[T] = { val colNames: Seq[String] = col1 +: cols dropDuplicates(colNames) } @@ -2321,7 +2333,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(): DS[T] + def dropDuplicatesWithinWatermark(): Dataset[T] /** * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, @@ -2341,7 +2353,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Seq[String]): DS[T] + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] /** * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, @@ -2361,7 +2373,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Array[String]): DS[T] = { + def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = { dropDuplicatesWithinWatermark(colNames.toImmutableArraySeq) } @@ -2384,7 +2396,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 3.5.0 */ @scala.annotation.varargs - def dropDuplicatesWithinWatermark(col1: String, cols: String*): DS[T] = { + def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = { val colNames: Seq[String] = col1 +: cols dropDuplicatesWithinWatermark(colNames) } @@ -2418,7 +2430,7 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable { * @since 1.6.0 */ @scala.annotation.varargs - def describe(cols: String*): DS[Row] + def describe(cols: String*): Dataset[Row] /** * Computes specified statistics for numeric and string columns. Available statistics are:
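Finally, a tentative sketch of deduplication and summary statistics (hypothetical batch `events` and streaming `streamingEvents` Datasets, the latter with an `event_time` column):

{{{
  // Batch deduplication on a subset of columns.
  val deduped = events.dropDuplicates("user_id", "event_id")

  // Streaming deduplication bounded by the event-time watermark.
  val streamDeduped = streamingEvents
    .withWatermark("event_time", "10 minutes")
    .dropDuplicatesWithinWatermark("event_id")

  // Basic statistics (count, mean, stddev, min, max) for the named columns.
  events.describe("latency_ms", "payload_size").show()
}}}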