Skip to content

Commit 7618712

Browse files
committed
support column position in DS v2
1 parent 187f3c1 commit 7618712

File tree

12 files changed

+300
-72
lines changed

12 files changed

+300
-72
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -846,7 +846,7 @@ intervalUnit
846846
;
847847

848848
colPosition
849-
: FIRST | AFTER multipartIdentifier
849+
: position=FIRST | position=AFTER multipartIdentifier
850850
;
851851

852852
dataType

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.google.common.base.Preconditions;
2121
import org.apache.spark.annotation.Experimental;
2222

23+
import java.util.ArrayList;
2324
import java.util.Arrays;
25+
import java.util.List;
2426
import java.util.Objects;
2527
import java.util.stream.Collectors;
2628
import java.util.stream.Stream;
@@ -51,19 +53,10 @@ public String name() {
5153
return name;
5254
}
5355

54-
private String escapeQuote(String part) {
55-
if (part.contains("`")) {
56-
return part.replace("`", "``");
57-
} else {
58-
return part;
59-
}
60-
}
61-
6256
@Override
6357
public String toString() {
64-
return Stream.concat(Stream.of(namespace), Stream.of(name))
65-
.map(part -> '`' + escapeQuote(part) + '`')
66-
.collect(Collectors.joining("."));
58+
return CatalogV2Implicits.quoteNameParts(Stream.concat(
59+
Stream.of(namespace), Stream.of(name)).toArray(String[]::new));
6760
}
6861

6962
@Override

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java

Lines changed: 140 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.spark.annotation.Experimental;
2121
import org.apache.spark.sql.types.DataType;
2222

23+
import javax.annotation.Nullable;
2324
import java.util.Arrays;
2425
import java.util.Objects;
2526

@@ -76,7 +77,7 @@ static TableChange removeProperty(String property) {
7677
* @return a TableChange for the addition
7778
*/
7879
static TableChange addColumn(String[] fieldNames, DataType dataType) {
79-
return new AddColumn(fieldNames, dataType, true, null);
80+
return new AddColumn(fieldNames, dataType, true, null, null);
8081
}
8182

8283
/**
@@ -92,7 +93,7 @@ static TableChange addColumn(String[] fieldNames, DataType dataType) {
9293
* @return a TableChange for the addition
9394
*/
9495
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
95-
return new AddColumn(fieldNames, dataType, isNullable, null);
96+
return new AddColumn(fieldNames, dataType, isNullable, null, null);
9697
}
9798

9899
/**
@@ -113,7 +114,30 @@ static TableChange addColumn(
113114
DataType dataType,
114115
boolean isNullable,
115116
String comment) {
116-
return new AddColumn(fieldNames, dataType, isNullable, comment);
117+
return new AddColumn(fieldNames, dataType, isNullable, comment, null);
118+
}
119+
120+
/**
121+
* Create a TableChange for adding a column.
122+
* <p>
123+
* If the field already exists, the change will result in an {@link IllegalArgumentException}.
124+
* If the new field is nested and its parent does not exist or is not a struct, the change will
125+
* result in an {@link IllegalArgumentException}.
126+
*
127+
* @param fieldNames field names of the new column
128+
* @param dataType the new column's data type
129+
* @param isNullable whether the new column can contain null
130+
* @param comment the new field's comment string
131+
* @param position the new columns's position
132+
* @return a TableChange for the addition
133+
*/
134+
static TableChange addColumn(
135+
String[] fieldNames,
136+
DataType dataType,
137+
boolean isNullable,
138+
String comment,
139+
ColumnPosition position) {
140+
return new AddColumn(fieldNames, dataType, isNullable, comment, position);
117141
}
118142

119143
/**
@@ -180,6 +204,21 @@ static TableChange updateColumnComment(String[] fieldNames, String newComment) {
180204
return new UpdateColumnComment(fieldNames, newComment);
181205
}
182206

207+
/**
208+
* Create a TableChange for updating the position of a field.
209+
* <p>
210+
* The name is used to find the field to update.
211+
* <p>
212+
* If the field does not exist, the change will result in an {@link IllegalArgumentException}.
213+
*
214+
* @param fieldNames field names of the column to update
215+
* @param newPosition the new position
216+
* @return a TableChange for the update
217+
*/
218+
static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
219+
return new UpdateColumnPosition(fieldNames, newPosition);
220+
}
221+
183222
/**
184223
* Create a TableChange for deleting a field.
185224
* <p>
@@ -259,6 +298,44 @@ public int hashCode() {
259298
}
260299
}
261300

301+
interface ColumnPosition {
302+
final class First implements ColumnPosition {
303+
@Override
304+
public String toString() {
305+
return "FIRST";
306+
}
307+
}
308+
309+
final class After implements ColumnPosition {
310+
private final String[] column;
311+
312+
public After(String[] column) {
313+
assert column != null;
314+
this.column = column;
315+
}
316+
317+
@Override
318+
public String toString() {
319+
return "AFTER " + CatalogV2Implicits.quoteNameParts(column);
320+
}
321+
322+
@Override
323+
public boolean equals(Object o) {
324+
if (this == o) return true;
325+
if (o == null || getClass() != o.getClass()) return false;
326+
After after = (After) o;
327+
return Arrays.equals(column, after.column);
328+
}
329+
330+
@Override
331+
public int hashCode() {
332+
return Arrays.hashCode(column);
333+
}
334+
}
335+
336+
First FIRST = new First();
337+
}
338+
262339
interface ColumnChange extends TableChange {
263340
String[] fieldNames();
264341
}
@@ -275,12 +352,19 @@ final class AddColumn implements ColumnChange {
275352
private final DataType dataType;
276353
private final boolean isNullable;
277354
private final String comment;
278-
279-
private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
355+
private final ColumnPosition position;
356+
357+
private AddColumn(
358+
String[] fieldNames,
359+
DataType dataType,
360+
boolean isNullable,
361+
String comment,
362+
ColumnPosition position) {
280363
this.fieldNames = fieldNames;
281364
this.dataType = dataType;
282365
this.isNullable = isNullable;
283366
this.comment = comment;
367+
this.position = position;
284368
}
285369

286370
@Override
@@ -296,10 +380,16 @@ public boolean isNullable() {
296380
return isNullable;
297381
}
298382

383+
@Nullable
299384
public String comment() {
300385
return comment;
301386
}
302387

388+
@Nullable
389+
public ColumnPosition position() {
390+
return position;
391+
}
392+
303393
@Override
304394
public boolean equals(Object o) {
305395
if (this == o) return true;
@@ -308,12 +398,13 @@ public boolean equals(Object o) {
308398
return isNullable == addColumn.isNullable &&
309399
Arrays.equals(fieldNames, addColumn.fieldNames) &&
310400
dataType.equals(addColumn.dataType) &&
311-
comment.equals(addColumn.comment);
401+
Objects.equals(comment, addColumn.comment) &&
402+
Objects.equals(position, addColumn.position);
312403
}
313404

314405
@Override
315406
public int hashCode() {
316-
int result = Objects.hash(dataType, isNullable, comment);
407+
int result = Objects.hash(dataType, isNullable, comment, position);
317408
result = 31 * result + Arrays.hashCode(fieldNames);
318409
return result;
319410
}
@@ -453,6 +544,48 @@ public int hashCode() {
453544
}
454545
}
455546

547+
/**
548+
* A TableChange to update the position of a field.
549+
* <p>
550+
* The field names are used to find the field to update.
551+
* <p>
552+
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
553+
*/
554+
final class UpdateColumnPosition implements ColumnChange {
555+
private final String[] fieldNames;
556+
private final ColumnPosition position;
557+
558+
private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
559+
this.fieldNames = fieldNames;
560+
this.position = position;
561+
}
562+
563+
@Override
564+
public String[] fieldNames() {
565+
return fieldNames;
566+
}
567+
568+
public ColumnPosition position() {
569+
return position;
570+
}
571+
572+
@Override
573+
public boolean equals(Object o) {
574+
if (this == o) return true;
575+
if (o == null || getClass() != o.getClass()) return false;
576+
UpdateColumnPosition that = (UpdateColumnPosition) o;
577+
return Arrays.equals(fieldNames, that.fieldNames) &&
578+
position.equals(that.position);
579+
}
580+
581+
@Override
582+
public int hashCode() {
583+
int result = Objects.hash(position);
584+
result = 31 * result + Arrays.hashCode(fieldNames);
585+
return result;
586+
}
587+
}
588+
456589
/**
457590
* A TableChange to delete a field.
458591
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
3535
case AlterTableAddColumnsStatement(
3636
nameParts @ NonSessionCatalog(catalog, tableName), cols) =>
3737
val changes = cols.map { col =>
38-
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
38+
TableChange.addColumn(
39+
col.name.toArray,
40+
col.dataType,
41+
true,
42+
col.comment.orNull,
43+
col.position.orNull)
3944
}
4045
createAlterTable(nameParts, catalog, tableName, changes)
4146

4247
case AlterTableAlterColumnStatement(
43-
nameParts @ NonSessionCatalog(catalog, tableName), colName, dataType, comment) =>
48+
nameParts @ NonSessionCatalog(catalog, tableName), colName, dataType, comment, position) =>
49+
val nameParts = colName.toArray
4450
val typeChange = dataType.map { newDataType =>
45-
TableChange.updateColumnType(colName.toArray, newDataType, true)
51+
TableChange.updateColumnType(nameParts, newDataType, true)
4652
}
4753
val commentChange = comment.map { newComment =>
48-
TableChange.updateColumnComment(colName.toArray, newComment)
54+
TableChange.updateColumnComment(nameParts, newComment)
4955
}
50-
createAlterTable(nameParts, catalog, tableName, typeChange.toSeq ++ commentChange)
56+
val positionChange = position.map { newPosition =>
57+
TableChange.updateColumnPosition(nameParts, newPosition)
58+
}
59+
createAlterTable(
60+
nameParts,
61+
catalog,
62+
tableName,
63+
typeChange.toSeq ++ commentChange ++ positionChange)
5164

5265
case AlterTableRenameColumnStatement(
5366
nameParts @ NonSessionCatalog(catalog, tableName), col, newName) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate
4040
import org.apache.spark.sql.catalyst.util.IntervalUtils
4141
import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
4242
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
43+
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
4344
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
4445
import org.apache.spark.sql.internal.SQLConf
4546
import org.apache.spark.sql.types._
@@ -2802,19 +2803,24 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
28022803
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
28032804
}
28042805

2806+
override def visitColPosition(ctx: ColPositionContext): ColumnPosition = {
2807+
ctx.position.getType match {
2808+
case SqlBaseParser.FIRST => ColumnPosition.FIRST
2809+
case SqlBaseParser.AFTER =>
2810+
new ColumnPosition.After(typedVisit[Seq[String]](ctx.multipartIdentifier).toArray)
2811+
}
2812+
}
2813+
28052814
/**
28062815
* Parse new column info from ADD COLUMN into a QualifiedColType.
28072816
*/
28082817
override def visitQualifiedColTypeWithPosition(
28092818
ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
2810-
if (ctx.colPosition != null) {
2811-
operationNotAllowed("ALTER TABLE table ADD COLUMN ... FIRST | AFTER otherCol", ctx)
2812-
}
2813-
28142819
QualifiedColType(
28152820
typedVisit[Seq[String]](ctx.name),
28162821
typedVisit[DataType](ctx.dataType),
2817-
Option(ctx.comment).map(string))
2822+
Option(ctx.comment).map(string),
2823+
Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
28182824
}
28192825

28202826
/**
@@ -2862,19 +2868,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
28622868
override def visitAlterTableColumn(
28632869
ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) {
28642870
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
2865-
if (ctx.colPosition != null) {
2866-
operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx)
2867-
}
2868-
2869-
if (ctx.dataType == null && ctx.comment == null) {
2870-
operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx)
2871+
if (ctx.dataType == null && ctx.comment == null && ctx.colPosition == null) {
2872+
operationNotAllowed(
2873+
s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx)
28712874
}
28722875

28732876
AlterTableAlterColumnStatement(
28742877
visitMultipartIdentifier(ctx.table),
28752878
typedVisit[Seq[String]](ctx.column),
28762879
Option(ctx.dataType).map(typedVisit[DataType]),
2877-
Option(ctx.comment).map(string))
2880+
Option(ctx.comment).map(string),
2881+
Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
28782882
}
28792883

28802884
/**

0 commit comments

Comments
 (0)