diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index cd9748eaa6f28..c102dd251e34a 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -855,7 +855,7 @@ intervalUnit ; colPosition - : FIRST | AFTER multipartIdentifier + : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier ; dataType diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java index 56d13ef742cea..a56007b2a5ab8 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java @@ -17,14 +17,15 @@ package org.apache.spark.sql.connector.catalog; -import com.google.common.base.Preconditions; -import org.apache.spark.annotation.Experimental; - import java.util.Arrays; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.base.Preconditions; + +import org.apache.spark.annotation.Experimental; + /** * An {@link Identifier} implementation. 
*/ @@ -51,19 +52,11 @@ public String name() { return name; } - private String escapeQuote(String part) { - if (part.contains("`")) { - return part.replace("`", "``"); - } else { - return part; - } - } - @Override public String toString() { return Stream.concat(Stream.of(namespace), Stream.of(name)) - .map(part -> '`' + escapeQuote(part) + '`') - .collect(Collectors.joining(".")); + .map(CatalogV2Implicits::quote) + .collect(Collectors.joining(".")); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 20c22388b0ef9..783439935c8d2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -17,11 +17,12 @@ package org.apache.spark.sql.connector.catalog; -import org.apache.spark.annotation.Experimental; -import org.apache.spark.sql.types.DataType; - import java.util.Arrays; import java.util.Objects; +import javax.annotation.Nullable; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.types.DataType; /** * TableChange subclasses represent requested changes to a table. 
These are passed to @@ -76,7 +77,7 @@ static TableChange removeProperty(String property) { * @return a TableChange for the addition */ static TableChange addColumn(String[] fieldNames, DataType dataType) { - return new AddColumn(fieldNames, dataType, true, null); + return new AddColumn(fieldNames, dataType, true, null, null); } /** @@ -92,7 +93,7 @@ static TableChange addColumn(String[] fieldNames, DataType dataType) { * @return a TableChange for the addition */ static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) { - return new AddColumn(fieldNames, dataType, isNullable, null); + return new AddColumn(fieldNames, dataType, isNullable, null, null); } /** @@ -113,7 +114,30 @@ static TableChange addColumn( DataType dataType, boolean isNullable, String comment) { - return new AddColumn(fieldNames, dataType, isNullable, comment); + return new AddColumn(fieldNames, dataType, isNullable, comment, null); + } + + /** + * Create a TableChange for adding a column. + *

+ * If the field already exists, the change will result in an {@link IllegalArgumentException}. + * If the new field is nested and its parent does not exist or is not a struct, the change will + * result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the new column + * @param dataType the new column's data type + * @param isNullable whether the new column can contain null + * @param comment the new field's comment string + * @param position the new column's position + * @return a TableChange for the addition + */ + static TableChange addColumn( + String[] fieldNames, + DataType dataType, + boolean isNullable, + String comment, + ColumnPosition position) { + return new AddColumn(fieldNames, dataType, isNullable, comment, position); } /** @@ -180,6 +204,21 @@ static TableChange updateColumnComment(String[] fieldNames, String newComment) { return new UpdateColumnComment(fieldNames, newComment); } + /** + * Create a TableChange for updating the position of a field. + *

+ * The name is used to find the field to update. + *

+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to update + * @param newPosition the new position + * @return a TableChange for the update + */ + static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) { + return new UpdateColumnPosition(fieldNames, newPosition); + } + /** * Create a TableChange for deleting a field. *

@@ -259,6 +298,69 @@ public int hashCode() { } } + interface ColumnPosition { + + static ColumnPosition first() { + return First.SINGLETON; + } + + static ColumnPosition after(String column) { + return new After(column); + } + } + + /** + * Column position FIRST means the specified column should be the first column. + * Note that, the specified column may be a nested field, and then FIRST means this field should + * be the first one within the struct. + */ + final class First implements ColumnPosition { + private static final First SINGLETON = new First(); + + private First() {} + + @Override + public String toString() { + return "FIRST"; + } + } + + /** + * Column position AFTER means the specified column should be put after the given `column`. + * Note that, the specified column may be a nested field, and then the given `column` refers to + * a field in the same struct. + */ + final class After implements ColumnPosition { + private final String column; + + private After(String column) { + assert column != null; + this.column = column; + } + + public String column() { + return column; + } + + @Override + public String toString() { + return "AFTER " + column; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + After after = (After) o; + return column.equals(after.column); + } + + @Override + public int hashCode() { + return Objects.hash(column); + } + } + interface ColumnChange extends TableChange { String[] fieldNames(); } @@ -275,12 +377,19 @@ final class AddColumn implements ColumnChange { private final DataType dataType; private final boolean isNullable; private final String comment; - - private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) { + private final ColumnPosition position; + + private AddColumn( + String[] fieldNames, + DataType dataType, + boolean isNullable, + String comment, + ColumnPosition position) { 
this.fieldNames = fieldNames; this.dataType = dataType; this.isNullable = isNullable; this.comment = comment; + this.position = position; } @Override @@ -296,10 +405,16 @@ public boolean isNullable() { return isNullable; } + @Nullable public String comment() { return comment; } + @Nullable + public ColumnPosition position() { + return position; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -308,12 +423,13 @@ public boolean equals(Object o) { return isNullable == addColumn.isNullable && Arrays.equals(fieldNames, addColumn.fieldNames) && dataType.equals(addColumn.dataType) && - comment.equals(addColumn.comment); + Objects.equals(comment, addColumn.comment) && + Objects.equals(position, addColumn.position); } @Override public int hashCode() { - int result = Objects.hash(dataType, isNullable, comment); + int result = Objects.hash(dataType, isNullable, comment, position); result = 31 * result + Arrays.hashCode(fieldNames); return result; } @@ -453,6 +569,48 @@ public int hashCode() { } } + /** + * A TableChange to update the position of a field. + *

+ * The field names are used to find the field to update. + *

+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}. + */ + final class UpdateColumnPosition implements ColumnChange { + private final String[] fieldNames; + private final ColumnPosition position; + + private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) { + this.fieldNames = fieldNames; + this.position = position; + } + + @Override + public String[] fieldNames() { + return fieldNames; + } + + public ColumnPosition position() { + return position; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UpdateColumnPosition that = (UpdateColumnPosition) o; + return Arrays.equals(fieldNames, that.fieldNames) && + position.equals(that.position); + } + + @Override + public int hashCode() { + int result = Objects.hash(position); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } + } + /** * A TableChange to delete a field. *

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 8183aa36a5b90..3361173c9962f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -35,19 +35,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case AlterTableAddColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => val changes = cols.map { col => - TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) + TableChange.addColumn( + col.name.toArray, + col.dataType, + true, + col.comment.orNull, + col.position.orNull) } createAlterTable(nameParts, catalog, tbl, changes) case AlterTableAlterColumnStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment) => + nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) => + val colNameArray = colName.toArray val typeChange = dataType.map { newDataType => - TableChange.updateColumnType(colName.toArray, newDataType, true) + TableChange.updateColumnType(colNameArray, newDataType, true) } val commentChange = comment.map { newComment => - TableChange.updateColumnComment(colName.toArray, newComment) + TableChange.updateColumnComment(colNameArray, newComment) } - createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange) + val positionChange = pos.map { newPosition => + TableChange.updateColumnPosition(colNameArray, newPosition) + } + createAlterTable( + nameParts, + catalog, + tbl, + typeChange.toSeq ++ commentChange ++ positionChange) case AlterTableRenameColumnStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 1beadc5e37801..8e51f65144042 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit import org.apache.spark.sql.connector.catalog.SupportsNamespaces +import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -2803,19 +2804,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } + override def visitColPosition(ctx: ColPositionContext): ColumnPosition = { + ctx.position.getType match { + case SqlBaseParser.FIRST => ColumnPosition.first() + case SqlBaseParser.AFTER => ColumnPosition.after(ctx.afterCol.getText) + } + } + /** * Parse new column info from ADD COLUMN into a QualifiedColType. */ override def visitQualifiedColTypeWithPosition( ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) { - if (ctx.colPosition != null) { - operationNotAllowed("ALTER TABLE table ADD COLUMN ... 
FIRST | AFTER otherCol", ctx) - } - QualifiedColType( typedVisit[Seq[String]](ctx.name), typedVisit[DataType](ctx.dataType), - Option(ctx.comment).map(string)) + Option(ctx.comment).map(string), + Option(ctx.colPosition).map(typedVisit[ColumnPosition])) } /** @@ -2863,19 +2868,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitAlterTableColumn( ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) { val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER" - if (ctx.colPosition != null) { - operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx) - } - - if (ctx.dataType == null && ctx.comment == null) { - operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx) + if (ctx.dataType == null && ctx.comment == null && ctx.colPosition == null) { + operationNotAllowed( + s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx) } AlterTableAlterColumnStatement( visitMultipartIdentifier(ctx.table), typedVisit[Seq[String]](ctx.column), Option(ctx.dataType).map(typedVisit[DataType]), - Option(ctx.comment).map(string)) + Option(ctx.comment).map(string), + Option(ctx.colPosition).map(typedVisit[ColumnPosition])) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 13356bfd04ffd..a818cc441ec2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.ViewType import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute +import 
org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.{DataType, StructType} @@ -141,7 +142,11 @@ case class ReplaceTableAsSelectStatement( /** * Column data as parsed by ALTER TABLE ... ADD COLUMNS. */ -case class QualifiedColType(name: Seq[String], dataType: DataType, comment: Option[String]) +case class QualifiedColType( + name: Seq[String], + dataType: DataType, + comment: Option[String], + position: Option[ColumnPosition]) /** * ALTER TABLE ... ADD COLUMNS command, as parsed from SQL. @@ -157,7 +162,8 @@ case class AlterTableAlterColumnStatement( tableName: Seq[String], column: Seq[String], dataType: Option[DataType], - comment: Option[String]) extends ParsedStatement + comment: Option[String], + position: Option[ColumnPosition]) extends ParsedStatement /** * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 882e968f34b59..86e5894b369aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -118,7 +118,7 @@ private[sql] object CatalogV2Implicits { def quoted: String = parts.map(quote).mkString(".") } - private def quote(part: String): String = { + def quote(part: String): String = { if (part.contains(".") || part.contains("`")) { s"`${part.replace("`", "``")}`" } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 0dcd595ded191..2f4914dd7db30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -104,26 +104,16 @@ private[sql] object CatalogV2Util { case add: AddColumn => add.fieldNames match { case Array(name) => - val newField = StructField(name, add.dataType, nullable = add.isNullable) - Option(add.comment) match { - case Some(comment) => - schema.add(newField.withComment(comment)) - case _ => - schema.add(newField) - } + val field = StructField(name, add.dataType, nullable = add.isNullable) + val newField = Option(add.comment).map(field.withComment).getOrElse(field) + addField(schema, newField, add.position()) case names => replace(schema, names.init, parent => parent.dataType match { case parentType: StructType => val field = StructField(names.last, add.dataType, nullable = add.isNullable) - val newParentType = Option(add.comment) match { - case Some(comment) => - parentType.add(field.withComment(comment)) - case None => - parentType.add(field) - } - - Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata)) + val newField = Option(add.comment).map(field.withComment).getOrElse(field) + Some(parent.copy(dataType = addField(parentType, newField, add.position()))) case _ => throw new IllegalArgumentException(s"Not a struct: ${names.init.last}") @@ -147,6 +137,27 @@ private[sql] object CatalogV2Util { replace(schema, update.fieldNames, field => Some(field.withComment(update.newComment))) + case update: UpdateColumnPosition => + def updateFieldPos(struct: StructType, name: String): StructType = { + val oldField = struct.fields.find(_.name == name).getOrElse { + throw new IllegalArgumentException("Field not found: " + name) + } + val withFieldRemoved = StructType(struct.fields.filter(_ != oldField)) + addField(withFieldRemoved, oldField, update.position()) + } + + update.fieldNames() match { + case Array(name) => + updateFieldPos(schema, name) + case names => + replace(schema, names.init, parent => parent.dataType match { + case parentType: 
StructType => + Some(parent.copy(dataType = updateFieldPos(parentType, names.last))) + case _ => + throw new IllegalArgumentException(s"Not a struct: ${names.init.last}") + }) + } + case delete: DeleteColumn => replace(schema, delete.fieldNames, _ => None) @@ -157,6 +168,25 @@ private[sql] object CatalogV2Util { } } + private def addField( + schema: StructType, + field: StructField, + position: ColumnPosition): StructType = { + if (position == null) { + schema.add(field) + } else if (position.isInstanceOf[First]) { + StructType(field +: schema.fields) + } else { + val afterCol = position.asInstanceOf[After].column() + val fieldIndex = schema.fields.indexWhere(_.name == afterCol) + if (fieldIndex == -1) { + throw new IllegalArgumentException("AFTER column not found: " + afterCol) + } + val (before, after) = schema.fields.splitAt(fieldIndex + 1) + StructType(before ++ (field +: after)) + } + } + private def replace( struct: StructType, fieldNames: Seq[String], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index b0d9a00d653ce..2d4a19a0a2ea7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, Loc import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType} @@ -492,7 +493,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, None) + QualifiedColType(Seq("x"), IntegerType, None, None) ))) } @@ -500,8 +501,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, None), - QualifiedColType(Seq("y"), StringType, None) + QualifiedColType(Seq("x"), IntegerType, None, None), + QualifiedColType(Seq("y"), StringType, None, None) ))) } @@ -509,7 +510,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, None) + QualifiedColType(Seq("x"), IntegerType, None, None) ))) } @@ -517,7 +518,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, None) + QualifiedColType(Seq("x"), IntegerType, None, None) ))) } @@ -525,7 +526,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, Some("doc")) + QualifiedColType(Seq("x"), IntegerType, Some("doc"), None) ))) } @@ -533,7 +534,21 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x"), IntegerType, 
Some("doc")) + QualifiedColType(Seq("x"), IntegerType, Some("doc"), None) + ))) + } + + test("alter table: add column with position") { + comparePlans( + parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"), + AlterTableAddColumnsStatement(Seq("table_name"), Seq( + QualifiedColType(Seq("x"), IntegerType, None, Some(first())) + ))) + + comparePlans( + parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"), + AlterTableAddColumnsStatement(Seq("table_name"), Seq( + QualifiedColType(Seq("x"), IntegerType, None, Some(after("y"))) ))) } @@ -541,25 +556,19 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc")) + QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None) ))) } test("alter table: add multiple columns with nested column name") { comparePlans( - parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string"), + parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"), AlterTableAddColumnsStatement(Seq("table_name"), Seq( - QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc")), - QualifiedColType(Seq("a", "b"), StringType, None) + QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None), + QualifiedColType(Seq("a", "b"), StringType, None, Some(first())) ))) } - test("alter table: add column at position (not supported)") { - assertUnsupported("ALTER TABLE table_name ADD COLUMNS name bigint COMMENT 'doc' FIRST, a.b int") - assertUnsupported("ALTER TABLE table_name ADD COLUMN name bigint COMMENT 'doc' FIRST") - assertUnsupported("ALTER TABLE table_name ADD COLUMN name string AFTER a.b") - } - test("alter table: set location") { comparePlans( parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"), @@ -589,6 +598,7 @@ class DDLParserSuite extends AnalysisTest { 
Seq("table_name"), Seq("a", "b", "c"), Some(LongType), + None, None)) } @@ -599,6 +609,7 @@ class DDLParserSuite extends AnalysisTest { Seq("table_name"), Seq("a", "b", "c"), Some(LongType), + None, None)) } @@ -609,22 +620,31 @@ class DDLParserSuite extends AnalysisTest { Seq("table_name"), Seq("a", "b", "c"), None, - Some("new comment"))) + Some("new comment"), + None)) } - test("alter table: update column type and comment") { + test("alter table: update column position") { comparePlans( - parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint COMMENT 'new comment'"), + parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"), AlterTableAlterColumnStatement( Seq("table_name"), Seq("a", "b", "c"), - Some(LongType), - Some("new comment"))) + None, + None, + Some(first()))) } - test("alter table: change column position (not supported)") { - assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name COMMENT 'doc' FIRST") - assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name TYPE INT AFTER other_col") + test("alter table: update column type, comment and position") { + comparePlans( + parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " + + "TYPE bigint COMMENT 'new comment' AFTER d"), + AlterTableAlterColumnStatement( + Seq("table_name"), + Seq("a", "b", "c"), + Some(LongType), + Some("new comment"), + Some(after("d")))) } test("alter table: drop column") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 53eb7dae2ca0a..75651bf5e24d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -55,13 +55,18 @@ class ResolveSessionCatalog( AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField)) }.getOrElse { val changes = 
cols.map { col => - TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) + TableChange.addColumn( + col.name.toArray, + col.dataType, + true, + col.comment.orNull, + col.position.orNull) } createAlterTable(nameParts, catalog, tbl, changes) } case AlterTableAlterColumnStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment) => + nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) => loadTable(catalog, tbl.asIdentifier).collect { case v1Table: V1Table => if (colName.length > 1) { @@ -72,6 +77,10 @@ class ResolveSessionCatalog( throw new AnalysisException( "ALTER COLUMN with v1 tables must specify new data type.") } + if (pos.isDefined) { + throw new AnalysisException("" + + "ALTER COLUMN ... FIRST | AFTER is only supported with v2 tables.") + } val builder = new MetadataBuilder // Add comment to metadata comment.map(c => builder.putString("comment", c)) @@ -87,13 +96,21 @@ class ResolveSessionCatalog( builder.build()) AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName(0), newColumn) }.getOrElse { + val colNameArray = colName.toArray val typeChange = dataType.map { newDataType => - TableChange.updateColumnType(colName.toArray, newDataType, true) + TableChange.updateColumnType(colNameArray, newDataType, true) } val commentChange = comment.map { newComment => - TableChange.updateColumnComment(colName.toArray, newComment) + TableChange.updateColumnComment(colNameArray, newComment) + } + val positionChange = pos.map { newPosition => + TableChange.updateColumnPosition(colNameArray, newPosition) } - createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange) + createAlterTable( + nameParts, + catalog, + tbl, + typeChange.toSeq ++ commentChange ++ positionChange) } case AlterTableRenameColumnStatement( diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index
21a344c071bc4..82326346b361c 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -27,7 +27,7 @@ struct<> -- !query 2 output org.apache.spark.sql.catalyst.parser.ParseException -Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT(line 1, pos 0) +Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER(line 1, pos 0) == SQL == ALTER TABLE test_change CHANGE a @@ -87,13 +87,8 @@ ALTER TABLE test_change CHANGE a TYPE INT AFTER b -- !query 8 schema struct<> -- !query 8 output -org.apache.spark.sql.catalyst.parser.ParseException - -Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0) - -== SQL == -ALTER TABLE test_change CHANGE a TYPE INT AFTER b -^^^ +org.apache.spark.sql.AnalysisException +ALTER COLUMN ... FIRST | AFTER is only supported with v2 tables.; -- !query 9 @@ -101,13 +96,8 @@ ALTER TABLE test_change CHANGE b TYPE STRING FIRST -- !query 9 schema struct<> -- !query 9 output -org.apache.spark.sql.catalyst.parser.ParseException - -Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0) - -== SQL == -ALTER TABLE test_change CHANGE b TYPE STRING FIRST -^^^ +org.apache.spark.sql.AnalysisException +ALTER COLUMN ...
FIRST | AFTER is only supported with v2 tables.; -- !query 10 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 7392850f276cc..2ba3c99dfbefd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -101,6 +101,49 @@ trait AlterTableTests extends SharedSparkSession { } } + test("AlterTable: add column with position") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (point struct) USING $v2Format") + + sql(s"ALTER TABLE $t ADD COLUMN a string FIRST") + assert(getTableMetadata(t).schema == new StructType() + .add("a", StringType) + .add("point", new StructType().add("x", IntegerType))) + + sql(s"ALTER TABLE $t ADD COLUMN b string AFTER point") + assert(getTableMetadata(t).schema == new StructType() + .add("a", StringType) + .add("point", new StructType().add("x", IntegerType)) + .add("b", StringType)) + + val e1 = intercept[SparkException]( + sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist")) + assert(e1.getMessage().contains("AFTER column not found")) + + sql(s"ALTER TABLE $t ADD COLUMN point.y int FIRST") + assert(getTableMetadata(t).schema == new StructType() + .add("a", StringType) + .add("point", new StructType() + .add("y", IntegerType) + .add("x", IntegerType)) + .add("b", StringType)) + + sql(s"ALTER TABLE $t ADD COLUMN point.z int AFTER x") + assert(getTableMetadata(t).schema == new StructType() + .add("a", StringType) + .add("point", new StructType() + .add("y", IntegerType) + .add("x", IntegerType) + .add("z", IntegerType)) + .add("b", StringType)) + + val e2 = intercept[SparkException]( + sql(s"ALTER TABLE $t ADD COLUMN point.x2 int AFTER non_exist")) + assert(e2.getMessage().contains("AFTER column not found")) + } + } + test("AlterTable: add multiple columns") { val t
= s"${catalogAndNamespace}table_name" withTable(t) { @@ -471,6 +514,61 @@ trait AlterTableTests extends SharedSparkSession { } } + test("AlterTable: update column position") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (a int, b int, point struct<x: int, y: int, z: int>) USING $v2Format") + + sql(s"ALTER TABLE $t ALTER COLUMN b FIRST") + assert(getTableMetadata(t).schema == new StructType() + .add("b", IntegerType) + .add("a", IntegerType) + .add("point", new StructType() + .add("x", IntegerType) + .add("y", IntegerType) + .add("z", IntegerType))) + + sql(s"ALTER TABLE $t ALTER COLUMN b AFTER point") + assert(getTableMetadata(t).schema == new StructType() + .add("a", IntegerType) + .add("point", new StructType() + .add("x", IntegerType) + .add("y", IntegerType) + .add("z", IntegerType)) + .add("b", IntegerType)) + + val e1 = intercept[SparkException]( + sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist")) + assert(e1.getMessage.contains("AFTER column not found")) + + sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST") + assert(getTableMetadata(t).schema == new StructType() + .add("a", IntegerType) + .add("point", new StructType() + .add("y", IntegerType) + .add("x", IntegerType) + .add("z", IntegerType)) + .add("b", IntegerType)) + + sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER z") + assert(getTableMetadata(t).schema == new StructType() + .add("a", IntegerType) + .add("point", new StructType() + .add("x", IntegerType) + .add("z", IntegerType) + .add("y", IntegerType)) + .add("b", IntegerType)) + + val e2 = intercept[SparkException]( + sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist")) + assert(e2.getMessage.contains("AFTER column not found")) + + // `AlterTable.resolved` checks column existence. 
+ intercept[AnalysisException]( + sql(s"ALTER TABLE $t ALTER COLUMN a.y AFTER x")) + } + } + test("AlterTable: update column type and comment") { val t = s"${catalogAndNamespace}table_name" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 6675636c0e62f..60e6c018a3b66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1191,7 +1191,7 @@ class DataSourceV2SQLSuite } test("tableCreation: duplicate column names in the table definition") { - val errorMsg = "Found duplicate column(s) in the table definition of `t`" + val errorMsg = "Found duplicate column(s) in the table definition of t" Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { assertAnalysisError( @@ -1215,7 +1215,7 @@ class DataSourceV2SQLSuite } test("tableCreation: duplicate nested column names in the table definition") { - val errorMsg = "Found duplicate column(s) in the table definition of `t`" + val errorMsg = "Found duplicate column(s) in the table definition of t" Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { assertAnalysisError( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 10873678e05f2..2bb121b27e7d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -181,6 +181,16 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { 
assert(e.contains("Hive built-in ORC data source must be used with Hive support enabled")) } } + + test("ALTER TABLE ALTER COLUMN with position is not supported") { + withTable("t") { + sql("CREATE TABLE t(i INT) USING parquet") + val e = intercept[AnalysisException] { + sql("ALTER TABLE t ALTER COLUMN i TYPE INT FIRST") + } + assert(e.message.contains("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables")) + } + } } abstract class DDLSuite extends QueryTest with SQLTestUtils {