13 changes: 13 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
],
"sqlState" : "42846"
},
"CANNOT_CREATE_DATA_SOURCE_TABLE" : {
"message" : [
"Failed to create data source table <tableName>:"
],
"subClass" : {
"EXTERNAL_METADATA_UNSUPPORTED" : {
"message" : [
"provider '<provider>' does not support external metadata but a schema is provided. Please remove the schema when creating the table."
]
}
},
"sqlState" : "42KDE"
},
"CANNOT_DECODE_URL" : {
"message" : [
"The provided URL cannot be decoded: <url>. Please ensure that the URL is properly formatted and try again."
32 changes: 32 additions & 0 deletions docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
@@ -0,0 +1,32 @@
---
layout: global
title: CANNOT_CREATE_DATA_SOURCE_TABLE error class
displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to create data source table `<tableName>`:

This error class has the following derived error classes:

## EXTERNAL_METADATA_UNSUPPORTED

provider '`<provider>`' does not support external metadata but a schema is provided. Please remove the schema when creating the table.
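
For illustration, the condition is raised when `CREATE TABLE` supplies an explicit column list for a provider whose `TableProvider.supportsExternalMetadata()` returns `false`. A minimal sketch, assuming a hypothetical provider named `my_source`:

```scala
// Hypothetical: `my_source` stands in for a DSv2 provider whose
// TableProvider.supportsExternalMetadata() returns false.
spark.sql("CREATE TABLE t (id INT, name STRING) USING my_source")
// fails with CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED,
// because a schema was supplied but the provider cannot accept external metadata.

// Omitting the column list lets the provider derive the schema itself.
spark.sql("CREATE TABLE t USING my_source OPTIONS (path '/tmp/data')")
```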


8 changes: 8 additions & 0 deletions docs/sql-error-conditions.md
@@ -163,6 +163,14 @@ Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because schema i

Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because `<data>` is not in defined values for enum: `<enumString>`.

### [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)

[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to create data source table `<tableName>`:

For more details see [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)

### CANNOT_DECODE_URL

[SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception)
@@ -61,6 +61,14 @@ object QuotingUtils {
}
}

def fullyQuoted(ident: Identifier): String = {
if (ident.namespace.nonEmpty) {
ident.namespace.map(quoteIdentifier).mkString(".") + "." + quoteIdentifier(ident.name)
} else {
quoteIdentifier(ident.name)
}
}

def escapeSingleQuotedString(str: String): String = {
val builder = new StringBuilder

@@ -144,10 +144,16 @@ private[sql] object CatalogV2Implicits {
}

implicit class IdentifierHelper(ident: Identifier) {
/* Quote the identifier if needed. */
def quoted: String = {
QuotingUtils.quoted(ident)
}

/* Always quote the identifier. */
def fullyQuoted: String = {
QuotingUtils.fullyQuoted(ident)
}

def original: String = ident.namespace() :+ ident.name() mkString "."

def asMultipartIdentifier: Seq[String] = (ident.namespace :+ ident.name).toImmutableArraySeq
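
A minimal sketch of how the two helpers differ, assuming `quoted` only adds backticks when a part needs them while `fullyQuoted` always quotes every part; `CatalogV2Implicits` is `private[sql]`, so this only compiles inside Spark's own `org.apache.spark.sql` packages, and the identifiers are made up:

```scala
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val plain = Identifier.of(Array("ns"), "tbl")
plain.quoted       // "ns.tbl"       -- no quoting required for simple names
plain.fullyQuoted  // "`ns`.`tbl`"   -- every part is backtick-quoted

val tricky = Identifier.of(Array("my ns"), "t-1")
tricky.quoted      // "`my ns`.`t-1`" -- quoting added because the parts need it
tricky.fullyQuoted // "`my ns`.`t-1`"
```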
@@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -61,7 +62,19 @@ case class DataSourceV2Relation(
Nil
}

override def name: String = table.name()
override def name: String = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
(catalog, identifier) match {
case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
case (None, None) => table.name()
case _ =>
throw SparkException.internalError(
"Invalid catalog and identifier pair. Both 'catalog' and 'identifier' must be " +
s"specified or leave as None. Current input - " +
s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " +
s"identifier: ${identifier.map(_.quoted).getOrElse(None)}.")
}
}
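
For intuition, a standalone approximation of the naming rule this override implements; `quoteIfNeeded` is simplified here and the catalog and identifier values are made up:

```scala
// Approximation: catalog name, namespace parts and table name are joined with '.',
// each part quoted only when it is not a plain identifier.
def relationName(catalogName: String, namespace: Seq[String], table: String): String = {
  def quoteIfNeeded(part: String): String =
    if (part.matches("[a-zA-Z0-9_]+")) part else s"`${part.replace("`", "``")}`"
  (quoteIfNeeded(catalogName) +: (namespace :+ table).map(quoteIfNeeded)).mkString(".")
}

relationName("testcat", Seq("ns"), "tbl")    // "testcat.ns.tbl"
relationName("testcat", Seq("my ns"), "t-1") // "testcat.`my ns`.`t-1`"
```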

override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)

Expand Down Expand Up @@ -127,7 +140,7 @@ case class DataSourceV2ScanRelation(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {

override def name: String = relation.table.name()
override def name: String = relation.name

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
@@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -612,15 +612,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

private def isV2Provider(provider: String): Boolean = {
// Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return false
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(_: FileDataSourceV2) => false
case Some(_) => true
case _ => false
}
DataSourceV2Utils.getTableProvider(provider, conf).isDefined
}

private object DatabaseInSessionCatalog {
@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,6 +153,20 @@
}
}

/**
* Returns the table provider for the given format, or None if it cannot be found.
*/
def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = {
// Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return None
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
case _ => None
}
}
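
A hedged usage sketch of the new helper; it is `private[sql]`, so callers live inside Spark's own modules, and the provider names below are examples only:

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.SQLConf

val conf = SQLConf.get
// Hive tables are short-circuited and never treated as V2 providers.
DataSourceV2Utils.getTableProvider("hive", conf)          // None
// File sources are excluded until SPARK-28396 is resolved.
DataSourceV2Utils.getTableProvider("parquet", conf)       // None
// A registered non-file TableProvider is returned as Some(provider).
DataSourceV2Utils.getTableProvider("my_v2_source", conf)  // Some(...) if such a source is on the classpath
```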

private lazy val objectMapper = new ObjectMapper()
def getOptionsWithPaths(
extraOptions: CaseInsensitiveMap[String],
@@ -23,9 +23,10 @@ import java.util
import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -71,9 +72,44 @@
}
}

// Get data source options from the catalog table properties with the path option.
private def getDataSourceOptions(
properties: Map[String, String],
storage: CatalogStorageFormat): CaseInsensitiveStringMap = {
val propertiesWithPath = properties ++
storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
new CaseInsensitiveStringMap(propertiesWithPath.asJava)
}
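
A small standalone sketch of the option map this helper builds, with made-up values:

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val tableProps = Map("mergeSchema" -> "true")
// storage.locationUri, when present, is exposed under the conventional "path" key.
val withPath = tableProps + ("path" -> "file:/tmp/warehouse/t")
val dsOptions = new CaseInsensitiveStringMap(withPath.asJava)
dsOptions.get("PATH")  // "file:/tmp/warehouse/t" -- lookups are case-insensitive
```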

override def loadTable(ident: Identifier): Table = {
try {
V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
val table = catalog.getTableMetadata(ident.asTableIdentifier)
if (table.provider.isDefined) {
DataSourceV2Utils.getTableProvider(table.provider.get, conf) match {
case Some(provider) =>
// Get the table properties during creation and append the path option
// to the properties.
val dsOptions = getDataSourceOptions(table.properties, table.storage)
// If the source accepts external table metadata, we can pass the schema and
// partitioning information stored in Hive to `getTable` to avoid expensive
// schema/partitioning inference.
if (provider.supportsExternalMetadata()) {
provider.getTable(
table.schema,
getV2Partitioning(table),
dsOptions.asCaseSensitiveMap())
} else {
provider.getTable(
provider.inferSchema(dsOptions),
provider.inferPartitioning(dsOptions),
dsOptions.asCaseSensitiveMap())
}
case _ =>
V1Table(table)
}
} else {
V1Table(table)
}
} catch {
case _: NoSuchDatabaseException =>
throw QueryCompilationErrors.noSuchTableError(ident)
@@ -96,6 +132,16 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts))
}

private def getV2Partitioning(table: CatalogTable): Array[Transform] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val v2Partitioning = table.partitionColumnNames.asTransforms
val v2Bucketing = table.bucketSpec.map(
spec => Array(spec.asTransform)).getOrElse(Array.empty)
val v2Clustering = table.clusterBySpec.map(
spec => Array(spec.asTransform)).getOrElse(Array.empty)
v2Partitioning ++ v2Bucketing ++ v2Clustering
}
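
A hedged sketch of the transforms this helper combines, using the `CatalogV2Implicits` conversions on made-up specs; a `ClusterBySpec` would contribute its transform analogously:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Partition columns become identity transforms; a bucket spec becomes a bucket transform.
val partitioning = Seq("dt").asTransforms                  // roughly Array(identity(dt))
val bucketing = BucketSpec(4, Seq("id"), Nil).asTransform  // roughly bucket(4, id)
val combined = partitioning :+ bucketing
```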

override def invalidateTable(ident: Identifier): Unit = {
catalog.refreshTable(ident.asTableIdentifier)
}
@@ -114,32 +160,68 @@
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
partitions.toImmutableArraySeq.convertTransforms
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
val tableProperties = properties.asScala
val tableProperties = properties.asScala.toMap
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
.copy(locationUri = location.map(CatalogUtils.stringToURI))
val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties))
.copy(locationUri = location.map(CatalogUtils.stringToURI))
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
val tableType = if (isExternal || location.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}

val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
// If the provider does not support external metadata, users should not be allowed to
// specify custom schema when creating the data source table, since the schema will not
// be used when loading the table.
case Some(p) if !p.supportsExternalMetadata() =>
if (schema.nonEmpty) {
throw new SparkUnsupportedOperationException(
errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
messageParameters = Map("tableName" -> ident.fullyQuoted, "provider" -> provider))
}
// V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
// is checked in `PreProcessTableCreation` rule.
assert(partitions.isEmpty,
s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
(schema, partitions)

case Some(tableProvider) =>
assert(tableProvider.supportsExternalMetadata())
lazy val dsOptions = getDataSourceOptions(tableProperties, storage)
if (schema.isEmpty) {
assert(partitions.isEmpty,
s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
// Infer the schema and partitions and store them in the catalog.
(tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
} else if (partitions.isEmpty) {
(schema, tableProvider.inferPartitioning(dsOptions))
} else {
(schema, partitions)
}

case _ =>
// The provider is not a V2 provider so we return the schema and partitions as is.
(schema, partitions)
Review comment (Contributor): shall we fail here if it's not a valid data source?

Reply (Contributor): maybe we can do it later. It's the current behavior that allows any table provider.
}

val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
newPartitions.toImmutableArraySeq.convertTransforms

val tableDesc = CatalogTable(
identifier = ident.asTableIdentifier,
tableType = tableType,
storage = storage,
schema = schema,
schema = newSchema,
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap ++
properties = tableProperties ++
maybeClusterBySpec.map(
clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, conf.resolver)),
clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
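
Putting the branches above together, a hedged SQL-level sketch of the new `createTable` behaviour; the provider names and paths are placeholders:

```scala
// 1. Provider supports external metadata and a schema is given: the schema is stored
//    as-is and the partitioning, if omitted, is inferred from the source.
spark.sql("CREATE TABLE t1 (id BIGINT, dt STRING) USING my_ext_source OPTIONS (path '/tmp/t1')")

// 2. No schema given: schema and partitioning are inferred once at CREATE time
//    and persisted with the catalog entry.
spark.sql("CREATE TABLE t2 USING my_ext_source OPTIONS (path '/tmp/t2')")

// 3. Provider without external-metadata support plus an explicit schema fails with
//    CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED.
spark.sql("CREATE TABLE t3 (id BIGINT) USING my_no_meta_source")
```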

@@ -44,7 +44,9 @@ public StructType readSchema() {

@Override
public InputPartition[] planInputPartitions() {
return new InputPartition[0];
InputPartition[] partitions = new InputPartition[1];
partitions[0] = new JavaRangeInputPartition(0, 2);
return partitions;
}
}
