
Commit 8463c73

SPARK-27964: Move v2 catalog update methods to CatalogV2Util.

1 parent aec0869

4 files changed: +161 -124 lines

project/SparkBuild.scala

Lines changed: 1 addition & 0 deletions

@@ -823,6 +823,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/util")))
   }

   private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala (new file)

Lines changed: 152 additions & 0 deletions

@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.util
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalog.v2.TableChange
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+object CatalogV2Util {
+  /**
+   * Apply properties changes to a map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: Map[String, String],
+      changes: Seq[TableChange]): Map[String, String] = {
+    applyPropertiesChanges(properties.asJava, changes).asScala.toMap
+  }
+
+  /**
+   * Apply properties changes to a Java map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: util.Map[String, String],
+      changes: Seq[TableChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+        // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply schema changes to a schema and return the result.
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val newField = StructField(name, add.dataType, nullable = add.isNullable)
+              Option(add.comment) match {
+                case Some(comment) =>
+                  schema.add(newField.withComment(comment))
+                case _ =>
+                  schema.add(newField)
+              }
+
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                  val newParentType = Option(add.comment) match {
+                    case Some(comment) =>
+                      parentType.add(field.withComment(comment))
+                    case None =>
+                      parentType.add(field)
+                  }
+
+                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+              })
+          }
+
+        case rename: RenameColumn =>
+          replace(schema, rename.fieldNames, field =>
+            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(schema, update.fieldNames, field => {
+            if (!update.isNullable && field.nullable) {
+              throw new IllegalArgumentException(
+                s"Cannot change optional column to required: ${field.name}")
+            }
+            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+          })
+
+        case update: UpdateColumnComment =>
+          replace(schema, update.fieldNames, field =>
+            Some(field.withComment(update.newComment)))
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  private def replace(
+      struct: StructType,
+      path: Seq[String],
+      update: StructField => Option[StructField]): StructType = {
+
+    val pos = struct.getFieldIndex(path.head)
+        .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = if (path.tail.isEmpty) {
+      update(field)
+    } else {
+      field.dataType match {
+        case nestedStruct: StructType =>
+          val updatedType: StructType = replace(nestedStruct, path.tail, update)
+          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+        case _ =>
+          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+      }
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}

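The moved helpers are side-effect free: applyPropertiesChanges copies the input into a HashMap and returns an unmodifiable view, and applySchemaChanges folds each change into a fresh StructType. A minimal usage sketch, assuming the setProperty/removeProperty static factories on the v2 TableChange interface (the property names are invented for illustration):

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.util.CatalogV2Util

// Hypothetical starting properties for some table.
val props = Map("provider" -> "parquet", "owner" -> "etl")

// TableChange.setProperty/removeProperty build the SetProperty and
// RemoveProperty instances that applyPropertiesChanges matches on.
val changes = Seq(
  TableChange.setProperty("owner", "analytics"),
  TableChange.removeProperty("provider"))

// Non-property changes are ignored, so a mixed list from a single
// ALTER TABLE statement can be passed through as-is.
val updated: Map[String, String] = CatalogV2Util.applyPropertiesChanges(props, changes)
// updated == Map("owner" -> "analytics")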
sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala

Lines changed: 4 additions & 121 deletions
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.catalog.v2

 import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._

-import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 class TestTableCatalog extends TableCatalog {
@@ -79,8 +78,8 @@ class TestTableCatalog extends TableCatalog {

   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident)
-    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
     val newTable = InMemoryTable(table.name, schema, properties)

     tables.put(ident, newTable)
@@ -91,122 +90,6 @@ class TestTableCatalog extends TableCatalog {
   override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
 }

-object TestTableCatalog {
-  /**
-   * Apply properties changes to a map and return the result.
-   */
-  def applyPropertiesChanges(
-      properties: util.Map[String, String],
-      changes: Seq[TableChange]): util.Map[String, String] = {
-    val newProperties = new util.HashMap[String, String](properties)
-
-    changes.foreach {
-      case set: SetProperty =>
-        newProperties.put(set.property, set.value)
-
-      case unset: RemoveProperty =>
-        newProperties.remove(unset.property)
-
-      case _ =>
-        // ignore non-property changes
-    }
-
-    Collections.unmodifiableMap(newProperties)
-  }
-
-  /**
-   * Apply schema changes to a schema and return the result.
-   */
-  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
-    changes.foldLeft(schema) { (schema, change) =>
-      change match {
-        case add: AddColumn =>
-          add.fieldNames match {
-            case Array(name) =>
-              val newField = StructField(name, add.dataType, nullable = add.isNullable)
-              Option(add.comment) match {
-                case Some(comment) =>
-                  schema.add(newField.withComment(comment))
-                case _ =>
-                  schema.add(newField)
-              }
-
-            case names =>
-              replace(schema, names.init, parent => parent.dataType match {
-                case parentType: StructType =>
-                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newParentType = Option(add.comment) match {
-                    case Some(comment) =>
-                      parentType.add(field.withComment(comment))
-                    case None =>
-                      parentType.add(field)
-                  }
-
-                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
-
-                case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
-              })
-          }
-
-        case rename: RenameColumn =>
-          replace(schema, rename.fieldNames, field =>
-            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
-
-        case update: UpdateColumnType =>
-          replace(schema, update.fieldNames, field => {
-            if (!update.isNullable && field.nullable) {
-              throw new IllegalArgumentException(
-                s"Cannot change optional column to required: $field.name")
-            }
-            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
-          })
-
-        case update: UpdateColumnComment =>
-          replace(schema, update.fieldNames, field =>
-            Some(field.withComment(update.newComment)))
-
-        case delete: DeleteColumn =>
-          replace(schema, delete.fieldNames, _ => None)
-
-        case _ =>
-          // ignore non-schema changes
-          schema
-      }
-    }
-  }
-
-  private def replace(
-      struct: StructType,
-      path: Seq[String],
-      update: StructField => Option[StructField]): StructType = {
-
-    val pos = struct.getFieldIndex(path.head)
-        .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
-    val field = struct.fields(pos)
-    val replacement: Option[StructField] = if (path.tail.isEmpty) {
-      update(field)
-    } else {
-      field.dataType match {
-        case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, path.tail, update)
-          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
-        case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
-      }
-    }
-
-    val newFields = struct.fields.zipWithIndex.flatMap {
-      case (_, index) if pos == index =>
-        replacement
-      case (other, _) =>
-        Some(other)
-    }
-
-    new StructType(newFields)
-  }
-}
-
 case class InMemoryTable(
     name: String,
     schema: StructType,

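The object deleted above is a verbatim copy of what now lives in CatalogV2Util, so the schema logic keeps its existing test coverage. A sketch of the multi-part field-name path that the private replace() helper handles, assuming the TableChange.addColumn(Array[String], DataType) factory (field names and types are illustrative):

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

// A schema with a nested struct column.
val schema = new StructType()
  .add("id", LongType)
  .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))

// Every name except the last must resolve to an existing struct;
// replace() recurses down that path and adds the final field there.
val withZ = CatalogV2Util.applySchemaChanges(
  schema,
  Seq(TableChange.addColumn(Array("point", "z"), DoubleType)))

// withZ("point").dataType is now struct<x, y, z>; a non-struct parent
// would throw IllegalArgumentException("Not a struct: point").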
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala

Lines changed: 4 additions & 3 deletions
@@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable

-import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange, TestTableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
@@ -85,8 +86,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     Option(tables.get(ident)) match {
       case Some(table) =>
-        val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-        val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+        val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
         val newTable = new InMemoryTable(table.name, schema, properties, table.data)

         tables.put(ident, newTable)

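With both test catalogs delegating to the shared helpers, an alterTable round trip looks the same in either one. A hedged end-to-end sketch, assuming the Identifier.of factory and the v2 TableCatalog initialize/createTable signatures at this commit (catalog name, namespace, and columns are invented):

import org.apache.spark.sql.catalog.v2.{Identifier, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog
import org.apache.spark.sql.types.{LongType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val catalog = new TestInMemoryTableCatalog
catalog.initialize("test", CaseInsensitiveStringMap.empty())

val ident = Identifier.of(Array("ns"), "events")
catalog.createTable(
  ident,
  new StructType().add("id", LongType),
  Array.empty[Transform],
  new java.util.HashMap[String, String]())

// alterTable funnels both kinds of change through CatalogV2Util:
// properties via applyPropertiesChanges, schema via applySchemaChanges.
val altered = catalog.alterTable(
  ident,
  TableChange.setProperty("owner", "tests"),
  TableChange.addColumn(Array("ts"), TimestampType))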