Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ intervalUnit
;

colPosition
: FIRST | AFTER multipartIdentifier
: position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
;

dataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.spark.sql.connector.catalog;

import com.google.common.base.Preconditions;
import org.apache.spark.annotation.Experimental;

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Preconditions;

import org.apache.spark.annotation.Experimental;

/**
* An {@link Identifier} implementation.
*/
Expand All @@ -51,19 +52,11 @@ public String name() {
return name;
}

private String escapeQuote(String part) {
if (part.contains("`")) {
return part.replace("`", "``");
} else {
return part;
}
}

@Override
public String toString() {
return Stream.concat(Stream.of(namespace), Stream.of(name))
.map(part -> '`' + escapeQuote(part) + '`')
.collect(Collectors.joining("."));
.map(CatalogV2Implicits::quote)
.collect(Collectors.joining("."));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;

import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nullable;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this causing validation to fail? I think we generally want to avoid changes like this unless they are enforced by a linter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not enforced by the linter but we do require it in the style guide.


/**
* TableChange subclasses represent requested changes to a table. These are passed to
Expand Down Expand Up @@ -76,7 +77,7 @@ static TableChange removeProperty(String property) {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType) {
return new AddColumn(fieldNames, dataType, true, null);
return new AddColumn(fieldNames, dataType, true, null, null);
}

/**
Expand All @@ -92,7 +93,7 @@ static TableChange addColumn(String[] fieldNames, DataType dataType) {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
return new AddColumn(fieldNames, dataType, isNullable, null);
return new AddColumn(fieldNames, dataType, isNullable, null, null);
}

/**
Expand All @@ -113,7 +114,30 @@ static TableChange addColumn(
DataType dataType,
boolean isNullable,
String comment) {
return new AddColumn(fieldNames, dataType, isNullable, comment);
return new AddColumn(fieldNames, dataType, isNullable, comment, null);
}

/**
* Create a TableChange for adding a column.
* <p>
* If the field already exists, the change will result in an {@link IllegalArgumentException}.
* If the new field is nested and its parent does not exist or is not a struct, the change will
* result in an {@link IllegalArgumentException}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick nit. I noticed that the error was wrapped in a SparkException when running alterTable in AlterTableExec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc is for catalog implementations. They should throw IllegalArgumentException if something goes wrong.

*
* @param fieldNames field names of the new column
* @param dataType the new column's data type
* @param isNullable whether the new column can contain null
* @param comment the new field's comment string
* @param position the new columns's position
* @return a TableChange for the addition
*/
static TableChange addColumn(
String[] fieldNames,
DataType dataType,
boolean isNullable,
String comment,
ColumnPosition position) {
return new AddColumn(fieldNames, dataType, isNullable, comment, position);
}

/**
Expand Down Expand Up @@ -180,6 +204,21 @@ static TableChange updateColumnComment(String[] fieldNames, String newComment) {
return new UpdateColumnComment(fieldNames, newComment);
}

/**
* Create a TableChange for updating the position of a field.
* <p>
* The name is used to find the field to update.
* <p>
* If the field does not exist, the change will result in an {@link IllegalArgumentException}.
*
* @param fieldNames field names of the column to update
* @param newPosition the new position
* @return a TableChange for the update
*/
static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
return new UpdateColumnPosition(fieldNames, newPosition);
}

/**
* Create a TableChange for deleting a field.
* <p>
Expand Down Expand Up @@ -259,6 +298,69 @@ public int hashCode() {
}
}

interface ColumnPosition {

static ColumnPosition first() {
return First.SINGLETON;
}

static ColumnPosition after(String column) {
return new After(column);
}
}

/**
* Column position FIRST means the specified column should be the first column.
* Note that, the specified column may be a nested field, and then FIRST means this field should
* be the first one within the struct.
*/
final class First implements ColumnPosition {
private static final First SINGLETON = new First();
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jan 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, All.
Sorry for late nit-picking, but shall we rename this to INSTANCE for consistency with the other singletons?
During review a new PR, #27380, I found that we are starting to lose the consistency due to this. I'll make a follow-up.

cc @brkyvz


private First() {}

@Override
public String toString() {
return "FIRST";
}
}

/**
* Column position AFTER means the specified column should be put after the given `column`.
* Note that, the specified column may be a nested field, and then the given `column` refers to
* a field in the same struct.
*/
final class After implements ColumnPosition {
private final String column;

private After(String column) {
assert column != null;
this.column = column;
}

public String column() {
return column;
}

@Override
public String toString() {
return "AFTER " + column;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
After after = (After) o;
return column.equals(after.column);
}

@Override
public int hashCode() {
return Objects.hash(column);
}
}

interface ColumnChange extends TableChange {
String[] fieldNames();
}
Expand All @@ -275,12 +377,19 @@ final class AddColumn implements ColumnChange {
private final DataType dataType;
private final boolean isNullable;
private final String comment;

private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
private final ColumnPosition position;

private AddColumn(
String[] fieldNames,
DataType dataType,
boolean isNullable,
String comment,
ColumnPosition position) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.isNullable = isNullable;
this.comment = comment;
this.position = position;
}

@Override
Expand All @@ -296,10 +405,16 @@ public boolean isNullable() {
return isNullable;
}

@Nullable
public String comment() {
return comment;
}

@Nullable
public ColumnPosition position() {
return position;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -308,12 +423,13 @@ public boolean equals(Object o) {
return isNullable == addColumn.isNullable &&
Arrays.equals(fieldNames, addColumn.fieldNames) &&
dataType.equals(addColumn.dataType) &&
comment.equals(addColumn.comment);
Objects.equals(comment, addColumn.comment) &&
Objects.equals(position, addColumn.position);
}

@Override
public int hashCode() {
int result = Objects.hash(dataType, isNullable, comment);
int result = Objects.hash(dataType, isNullable, comment, position);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
Expand Down Expand Up @@ -453,6 +569,48 @@ public int hashCode() {
}
}

/**
* A TableChange to update the position of a field.
* <p>
* The field names are used to find the field to update.
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
final class UpdateColumnPosition implements ColumnChange {
private final String[] fieldNames;
private final ColumnPosition position;

private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
this.fieldNames = fieldNames;
this.position = position;
}

@Override
public String[] fieldNames() {
return fieldNames;
}

public ColumnPosition position() {
return position;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateColumnPosition that = (UpdateColumnPosition) o;
return Arrays.equals(fieldNames, that.fieldNames) &&
position.equals(that.position);
}

@Override
public int hashCode() {
int result = Objects.hash(position);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
}

/**
* A TableChange to delete a field.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map { col =>
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
TableChange.addColumn(
col.name.toArray,
col.dataType,
true,
col.comment.orNull,
col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
val colNameArray = colName.toArray
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName.toArray, newDataType, true)
TableChange.updateColumnType(colNameArray, newDataType, true)
}
val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName.toArray, newComment)
TableChange.updateColumnComment(colNameArray, newComment)
}
createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
val positionChange = pos.map { newPosition =>
TableChange.updateColumnPosition(colNameArray, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ commentChange ++ positionChange)

case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
Expand Down
Loading