From 8463c735fce834e6725f3eb2b055f89509e7320b Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 5 Jun 2019 16:51:01 -0700
Subject: [PATCH 1/3] SPARK-27964: Move v2 catalog update methods to CatalogV2Util.

---
 project/SparkBuild.scala                      |   1 +
 .../sql/catalog/v2/util/CatalogV2Util.scala   | 152 ++++++++++++++++++
 .../sql/catalog/v2/TestTableCatalog.scala     | 125 +-------------
 .../sources/v2/TestInMemoryTableCatalog.scala |   7 +-
 4 files changed, 161 insertions(+), 124 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 83fe904cc1f0..88dbe2cb3d90 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -823,6 +823,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/util")))
 }
 
 private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
new file mode 100644
index 000000000000..a4c94a525048
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.util
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalog.v2.TableChange
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+object CatalogV2Util {
+  /**
+   * Apply properties changes to a map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: Map[String, String],
+      changes: Seq[TableChange]): Map[String, String] = {
+    applyPropertiesChanges(properties.asJava, changes).asScala.toMap
+  }
+
+  /**
+   * Apply properties changes to a Java map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: util.Map[String, String],
+      changes: Seq[TableChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+        // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply schema changes to a schema and return the result.
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val newField = StructField(name, add.dataType, nullable = add.isNullable)
+              Option(add.comment) match {
+                case Some(comment) =>
+                  schema.add(newField.withComment(comment))
+                case _ =>
+                  schema.add(newField)
+              }
+
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                  val newParentType = Option(add.comment) match {
+                    case Some(comment) =>
+                      parentType.add(field.withComment(comment))
+                    case None =>
+                      parentType.add(field)
+                  }
+
+                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+              })
+          }
+
+        case rename: RenameColumn =>
+          replace(schema, rename.fieldNames, field =>
+            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(schema, update.fieldNames, field => {
+            if (!update.isNullable && field.nullable) {
+              throw new IllegalArgumentException(
+                s"Cannot change optional column to required: ${field.name}")
+            }
+            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+          })
+
+        case update: UpdateColumnComment =>
+          replace(schema, update.fieldNames, field =>
+            Some(field.withComment(update.newComment)))
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  private def replace(
+      struct: StructType,
+      path: Seq[String],
+      update: StructField => Option[StructField]): StructType = {
+
+    val pos = struct.getFieldIndex(path.head)
+      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = if (path.tail.isEmpty) {
+      update(field)
+    } else {
+      field.dataType match {
+        case nestedStruct: StructType =>
+          val updatedType: StructType = replace(nestedStruct, path.tail, update)
+          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+        case _ =>
+          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+      }
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 78b4763484cc..a2489f9f52f3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.catalog.v2
 
 import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class TestTableCatalog extends TableCatalog {
@@ -79,8 +78,8 @@ class TestTableCatalog extends TableCatalog {
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident)
-    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
     val newTable = InMemoryTable(table.name, schema, properties)
 
     tables.put(ident, newTable)
@@ -91,122 +90,6 @@ class TestTableCatalog extends TableCatalog {
 
   override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
 }
 
-object TestTableCatalog {
-  /**
-   * Apply properties changes to a map and return the result.
-   */
-  def applyPropertiesChanges(
-      properties: util.Map[String, String],
-      changes: Seq[TableChange]): util.Map[String, String] = {
-    val newProperties = new util.HashMap[String, String](properties)
-
-    changes.foreach {
-      case set: SetProperty =>
-        newProperties.put(set.property, set.value)
-
-      case unset: RemoveProperty =>
-        newProperties.remove(unset.property)
-
-      case _ =>
-        // ignore non-property changes
-    }
-
-    Collections.unmodifiableMap(newProperties)
-  }
-
-  /**
-   * Apply schema changes to a schema and return the result.
-   */
-  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
-    changes.foldLeft(schema) { (schema, change) =>
-      change match {
-        case add: AddColumn =>
-          add.fieldNames match {
-            case Array(name) =>
-              val newField = StructField(name, add.dataType, nullable = add.isNullable)
-              Option(add.comment) match {
-                case Some(comment) =>
-                  schema.add(newField.withComment(comment))
-                case _ =>
-                  schema.add(newField)
-              }
-
-            case names =>
-              replace(schema, names.init, parent => parent.dataType match {
-                case parentType: StructType =>
-                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newParentType = Option(add.comment) match {
-                    case Some(comment) =>
-                      parentType.add(field.withComment(comment))
-                    case None =>
-                      parentType.add(field)
-                  }
-
-                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
-
-                case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
-              })
-          }
-
-        case rename: RenameColumn =>
-          replace(schema, rename.fieldNames, field =>
-            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
-
-        case update: UpdateColumnType =>
-          replace(schema, update.fieldNames, field => {
-            if (!update.isNullable && field.nullable) {
-              throw new IllegalArgumentException(
-                s"Cannot change optional column to required: $field.name")
-            }
-            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
-          })
-
-        case update: UpdateColumnComment =>
-          replace(schema, update.fieldNames, field =>
-            Some(field.withComment(update.newComment)))
-
-        case delete: DeleteColumn =>
-          replace(schema, delete.fieldNames, _ => None)
-
-        case _ =>
-          // ignore non-schema changes
-          schema
-      }
-    }
-  }
-
-  private def replace(
-      struct: StructType,
-      path: Seq[String],
-      update: StructField => Option[StructField]): StructType = {
-
-    val pos = struct.getFieldIndex(path.head)
-      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
-    val field = struct.fields(pos)
-    val replacement: Option[StructField] = if (path.tail.isEmpty) {
-      update(field)
-    } else {
-      field.dataType match {
-        case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, path.tail, update)
-          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
-        case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
-      }
-    }
-
-    val newFields = struct.fields.zipWithIndex.flatMap {
-      case (_, index) if pos == index =>
-        replacement
-      case (other, _) =>
-        Some(other)
-    }
-
-    new StructType(newFields)
-  }
-}
-
 case class InMemoryTable(
     name: String,
     schema: StructType,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 2ecf1c2f184f..d1900fd553bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange, TestTableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
@@ -85,8 +86,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     Option(tables.get(ident)) match {
      case Some(table) =>
-        val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
-        val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+        val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
         val newTable = new InMemoryTable(table.name, schema, properties, table.data)
 
         tables.put(ident, newTable)

From b1ac505120d3e29119f7507ad1a225ddb2577d24 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 5 Jun 2019 16:58:12 -0700
Subject: [PATCH 2/3] Rename path to fieldNames.

---
 .../spark/sql/catalog/v2/util/CatalogV2Util.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
index a4c94a525048..d9f867969c19 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
@@ -122,21 +122,21 @@ object CatalogV2Util {
 
   private def replace(
       struct: StructType,
-      path: Seq[String],
+      fieldNames: Seq[String],
       update: StructField => Option[StructField]): StructType = {
 
-    val pos = struct.getFieldIndex(path.head)
-      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+    val pos = struct.getFieldIndex(fieldNames.head)
+      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
     val field = struct.fields(pos)
-    val replacement: Option[StructField] = if (path.tail.isEmpty) {
+    val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
       update(field)
     } else {
       field.dataType match {
         case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, path.tail, update)
+          val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
           Some(StructField(field.name, updatedType, field.nullable, field.metadata))
         case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+          throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
       }
     }

From ca848baa83c7d14c0abbcfad3af22e6a88887b05 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 5 Jun 2019 17:19:16 -0700
Subject: [PATCH 3/3] Rename util package to utils to avoid name conflict with java.util.
---
 project/SparkBuild.scala                                       | 2 +-
 .../spark/sql/catalog/v2/{util => utils}/CatalogV2Util.scala   | 2 +-
 .../org/apache/spark/sql/catalog/v2/TestTableCatalog.scala     | 2 +-
 .../apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
 rename sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/{util => utils}/CatalogV2Util.scala (99%)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 88dbe2cb3d90..7a2b5969bc4c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -823,7 +823,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
-      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/util")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalog/v2/utils")))
 }
 
 private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
similarity index 99%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
rename to sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
index d9f867969c19..a00bcab602e8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/util/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalog.v2.util
+package org.apache.spark.sql.catalog.v2.utils
 
 import java.util
 import java.util.Collections
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index a2489f9f52f3..6ba140fa5dde 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalog.v2.expressions.Transform
-import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
 import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index d1900fd553bb..4e9f961016de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
-import org.apache.spark.sql.catalog.v2.util.CatalogV2Util
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
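
Usage sketch (not part of the patch series): a minimal example of how the two
moved helpers compose, assuming the post-rename package from PATCH 3/3 and the
TableChange static factory methods on this branch (addColumn, renameColumn,
setProperty); those factory signatures, and all names below such as
CatalogV2UtilSketch, "point", and "record_id", are illustrative assumptions.

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.types.{IntegerType, StructType}

object CatalogV2UtilSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("point", new StructType().add("x", IntegerType))

    // One mixed sequence of changes, as a catalog's alterTable would receive.
    val changes = Seq[TableChange](
      TableChange.addColumn(Array("point", "y"), IntegerType), // nested add
      TableChange.renameColumn(Array("id"), "record_id"),      // top-level rename
      TableChange.setProperty("provider", "in-memory"))        // property change

    // applySchemaChanges folds each change over the schema: the nested
    // AddColumn recurses into "point" via the private replace() helper, and
    // SetProperty falls through the catch-all case with the schema unchanged.
    val newSchema = CatalogV2Util.applySchemaChanges(schema, changes)
    println(newSchema.treeString)

    // applyPropertiesChanges is the mirror image: it applies only
    // SetProperty/RemoveProperty and ignores the schema changes above.
    val newProps = CatalogV2Util.applyPropertiesChanges(Map.empty[String, String], changes)
    println(newProps) // expected: Map(provider -> in-memory)
  }
}

Both alterTable implementations in these patches run exactly this pair of
calls over the same TableChange sequence, which is why the helpers move
together into CatalogV2Util.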