Skip to content

Commit 1684d67

Browse files
committed
Merge remote-tracking branch 'upstream/master' into SPARK-33847
2 parents 7f3529d + ec1560a commit 1684d67

File tree

15 files changed

+157
-32
lines changed

15 files changed

+157
-32
lines changed

.github/workflows/publish_snapshot.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ on:
66

77
jobs:
88
publish-snapshot:
9+
if: github.repository == 'apache/spark'
910
runs-on: ubuntu-latest
1011
strategy:
1112
fail-fast: false

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public boolean dropTable(Identifier ident) {
9999
return asTableCatalog().dropTable(ident);
100100
}
101101

102+
@Override
103+
public boolean purgeTable(Identifier ident) {
104+
return asTableCatalog().purgeTable(ident);
105+
}
106+
102107
@Override
103108
public void renameTable(
104109
Identifier oldIdent,

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,26 +173,23 @@ Table alterTable(
173173
boolean dropTable(Identifier ident);
174174

175175
/**
176-
* Drop a table in the catalog with an option to purge.
176+
* Drop a table in the catalog and completely remove its data, bypassing the trash even if the
177+
* catalog supports one.
177178
* <p>
178179
* If the catalog supports views and contains a view for the identifier and not a table, this
179180
* must not drop the view and must return false.
180181
* <p>
181-
* If the catalog supports the option to purge a table, this method must be overridden.
182-
* The default implementation falls back to {@link #dropTable(Identifier)} dropTable} if the
183-
* purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
182+
* If the catalog supports purging a table, this method should be overridden.
183+
* The default implementation throws {@link UnsupportedOperationException}.
184184
*
185185
* @param ident a table identifier
186-
* @param purge whether a table should be purged
187186
* @return true if a table was deleted, false if no table exists for the identifier
187+
* @throws UnsupportedOperationException If table purging is not supported
188188
*
189189
* @since 3.1.0
190190
*/
191-
default boolean dropTable(Identifier ident, boolean purge) {
192-
if (purge) {
193-
throw new UnsupportedOperationException("Purge option is not supported.");
194-
}
195-
return dropTable(ident);
191+
default boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
192+
throw new UnsupportedOperationException("Purge table is not supported.");
196193
}
197194

198195
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ object ScalaReflection extends ScalaReflection {
232232
case t if isSubtype(t, localTypeOf[java.time.Instant]) =>
233233
createDeserializerForInstant(path)
234234

235+
case t if isSubtype(t, localTypeOf[java.lang.Enum[_]]) =>
236+
createDeserializerForTypesSupportValueOf(
237+
Invoke(path, "toString", ObjectType(classOf[String]), returnNullable = false),
238+
getClassFromType(t))
239+
235240
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
236241
createDeserializerForSqlTimestamp(path)
237242

@@ -526,6 +531,9 @@ object ScalaReflection extends ScalaReflection {
526531
case t if isSubtype(t, localTypeOf[java.math.BigInteger]) =>
527532
createSerializerForJavaBigInteger(inputObject)
528533

534+
case t if isSubtype(t, localTypeOf[java.lang.Enum[_]]) =>
535+
createSerializerForJavaEnum(inputObject)
536+
529537
case t if isSubtype(t, localTypeOf[scala.math.BigInt]) =>
530538
createSerializerForScalaBigInt(inputObject)
531539

@@ -749,6 +757,7 @@ object ScalaReflection extends ScalaReflection {
749757
case t if isSubtype(t, localTypeOf[java.lang.Short]) => Schema(ShortType, nullable = true)
750758
case t if isSubtype(t, localTypeOf[java.lang.Byte]) => Schema(ByteType, nullable = true)
751759
case t if isSubtype(t, localTypeOf[java.lang.Boolean]) => Schema(BooleanType, nullable = true)
760+
case t if isSubtype(t, localTypeOf[java.lang.Enum[_]]) => Schema(StringType, nullable = true)
752761
case t if isSubtype(t, definitions.IntTpe) => Schema(IntegerType, nullable = false)
753762
case t if isSubtype(t, definitions.LongTpe) => Schema(LongType, nullable = false)
754763
case t if isSubtype(t, definitions.DoubleTpe) => Schema(DoubleType, nullable = false)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ object SerializerBuildHelper {
7474
returnNullable = false)
7575
}
7676

77+
def createSerializerForJavaEnum(inputObject: Expression): Expression =
78+
createSerializerForString(Invoke(inputObject, "name", ObjectType(classOf[String])))
79+
7780
def createSerializerForSqlTimestamp(inputObject: Expression): Expression = {
7881
StaticInvoke(
7982
DateTimeUtils.getClass,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,25 +127,36 @@ object CharVarcharUtils extends Logging {
127127
}
128128

129129
/**
130-
* Returns expressions to apply read-side char type padding for the given attributes. String
131-
* values should be right-padded to N characters if it's from a CHAR(N) column/field.
130+
* Returns expressions to apply read-side char type padding for the given attributes.
131+
*
132+
* For a CHAR(N) column/field whose string value has length M:
133+
* If M > N, raise runtime error
134+
* If M <= N, the value should be right-padded to N characters.
135+
*
136+
* For a VARCHAR(N) column/field whose string value has length M:
137+
* If M > N, raise runtime error
138+
* If M <= N, the value should be left unchanged.
132139
*/
133-
def charTypePadding(output: Seq[AttributeReference]): Seq[NamedExpression] = {
140+
def paddingWithLengthCheck(output: Seq[AttributeReference]): Seq[NamedExpression] = {
134141
output.map { attr =>
135142
getRawType(attr.metadata).filter { rawType =>
136-
rawType.existsRecursively(_.isInstanceOf[CharType])
143+
rawType.existsRecursively(dt => dt.isInstanceOf[CharType] || dt.isInstanceOf[VarcharType])
137144
}.map { rawType =>
138-
Alias(charTypePadding(attr, rawType), attr.name)(explicitMetadata = Some(attr.metadata))
145+
Alias(paddingWithLengthCheck(attr, rawType), attr.name)(
146+
explicitMetadata = Some(attr.metadata))
139147
}.getOrElse(attr)
140148
}
141149
}
142150

143-
private def charTypePadding(expr: Expression, dt: DataType): Expression = dt match {
144-
case CharType(length) => StringRPad(expr, Literal(length))
151+
private def paddingWithLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
152+
case CharType(length) => StringRPad(stringLengthCheck(expr, dt), Literal(length))
153+
154+
case VarcharType(_) => stringLengthCheck(expr, dt)
145155

146156
case StructType(fields) =>
147157
val struct = CreateNamedStruct(fields.zipWithIndex.flatMap { case (f, i) =>
148-
Seq(Literal(f.name), charTypePadding(GetStructField(expr, i, Some(f.name)), f.dataType))
158+
Seq(Literal(f.name),
159+
paddingWithLengthCheck(GetStructField(expr, i, Some(f.name)), f.dataType))
149160
})
150161
if (expr.nullable) {
151162
If(IsNull(expr), Literal(null, struct.dataType), struct)
@@ -166,7 +177,7 @@ object CharVarcharUtils extends Logging {
166177
private def charTypePaddingInArray(
167178
arr: Expression, et: DataType, containsNull: Boolean): Expression = {
168179
val param = NamedLambdaVariable("x", replaceCharVarcharWithString(et), containsNull)
169-
val func = LambdaFunction(charTypePadding(param, et), Seq(param))
180+
val func = LambdaFunction(paddingWithLengthCheck(param, et), Seq(param))
170181
ArrayTransform(arr, func)
171182
}
172183

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,11 @@ class TableCatalogSuite extends SparkFunSuite {
643643
assert(!catalog.tableExists(testIdent))
644644
}
645645

646+
test("purgeTable") {
647+
val catalog = newCatalog()
648+
intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
649+
}
650+
646651
test("renameTable") {
647652
val catalog = newCatalog()
648653

sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
8888
/** @since 3.0.0 */
8989
implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT
9090

91+
/** @since 3.2.0 */
92+
implicit def newJavaEnumEncoder[A <: java.lang.Enum[_] : TypeTag]: Encoder[A] =
93+
ExpressionEncoder()
94+
9195
// Boxed primitives
9296

9397
/** @since 2.0.0 */

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ApplyCharTypePadding.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PaddingAndLengthCheckForCharVarchar.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,21 @@ import org.apache.spark.sql.types.{CharType, StringType}
2727
import org.apache.spark.unsafe.types.UTF8String
2828

2929
/**
30-
* This rule applies char type padding in two places:
31-
* 1. When reading values from column/field of type CHAR(N), right-pad the values to length N.
32-
* 2. When comparing char type column/field with string literal or char type column/field,
33-
* right-pad the shorter one to the longer length.
30+
* This rule performs char type padding and length check for both char and varchar.
31+
*
32+
* When reading values from column/field of type CHAR(N) or VARCHAR(N), the underlying string value
33+
* might be over length (e.g. tables w/ external locations); the read will fail in this case.
34+
* Otherwise, right-pad the values to length N for CHAR(N) and leave VARCHAR(N) values unchanged.
35+
*
36+
* When comparing char type column/field with string literal or char type column/field,
37+
* right-pad the shorter one to the longer length.
3438
*/
35-
object ApplyCharTypePadding extends Rule[LogicalPlan] {
39+
object PaddingAndLengthCheckForCharVarchar extends Rule[LogicalPlan] {
3640

3741
override def apply(plan: LogicalPlan): LogicalPlan = {
3842
val padded = plan.resolveOperatorsUpWithNewOutput {
3943
case r: LogicalRelation =>
40-
val projectList = CharVarcharUtils.charTypePadding(r.output)
44+
val projectList = CharVarcharUtils.paddingWithLengthCheck(r.output)
4145
if (projectList == r.output) {
4246
r -> Nil
4347
} else {
@@ -47,7 +51,7 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
4751
}
4852

4953
case r: DataSourceV2Relation =>
50-
val projectList = CharVarcharUtils.charTypePadding(r.output)
54+
val projectList = CharVarcharUtils.paddingWithLengthCheck(r.output)
5155
if (projectList == r.output) {
5256
r -> Nil
5357
} else {
@@ -57,7 +61,7 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
5761
}
5862

5963
case r: HiveTableRelation =>
60-
val projectList = CharVarcharUtils.charTypePadding(r.output)
64+
val projectList = CharVarcharUtils.paddingWithLengthCheck(r.output)
6165
if (projectList == r.output) {
6266
r -> Nil
6367
} else {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ case class DropTableExec(
3535
override def run(): Seq[InternalRow] = {
3636
if (catalog.tableExists(ident)) {
3737
invalidateCache()
38-
catalog.dropTable(ident, purge)
38+
if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
3939
} else if (!ifExists) {
4040
throw new NoSuchTableException(ident)
4141
}

0 commit comments

Comments
 (0)