
Commit 8fff2f3

aokolnychyi authored and dongjoon-hyun committed
rdar://70004937 Rewrite row-level operations for Iceberg (apache#922)
1 parent 8a7c1a9 commit 8fff2f3

File tree

15 files changed: +1292 −5 lines changed
Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.SetAccumulator
import org.apache.spark.sql.types.{DataType, IntegerType}

// copied from Iceberg Spark extensions
case class AccumulateFiles(
    filesAccumulator: SetAccumulator[String],
    child: Expression) extends UnaryExpression with CodegenFallback {

  override def dataType: DataType = IntegerType
  override def nullable: Boolean = true
  override def prettyName: String = "AccumulateFiles"
  override lazy val deterministic: Boolean = false
  private val RETURN_VAL: Integer = 1

  override def eval(input: InternalRow): Any = {
    val resultVal = child.eval(input)
    filesAccumulator.add(resultVal.toString)
    RETURN_VAL
  }
}
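
AccumulateFiles is evaluated once per row purely for its side effect: it records the row's value (presumably the source file name of a matched row in this rewrite) in a Spark accumulator, and it is marked non-deterministic so the optimizer cannot elide or reorder it. Below is a minimal runnable sketch of that side-channel pattern, assuming the public CollectionAccumulator API in place of the fork-internal SetAccumulator (which also deduplicates on the executor side); the file names are synthetic.

import org.apache.spark.sql.SparkSession

object AccumulateFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // driver-registered accumulator; tasks add to it as rows are evaluated
    val files = spark.sparkContext.collectionAccumulator[String]("matched-files")

    spark.range(0, 10).toDF("id").foreach { row =>
      // stand-in value; the real expression would record something like
      // input_file_name() for the matched row (an assumption, not the fork's code)
      files.add(s"file-${row.getLong(0) % 3}")
    }

    // the driver now sees every value the tasks recorded
    println(files.value)
    spark.stop()
  }
}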
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression

// copied from Iceberg Spark extensions
case class MergeInto(
    mergeIntoParams: MergeIntoParams,
    output: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode

// copied from Iceberg Spark extensions
case class MergeIntoParams(
    isSourceRowPresent: Expression,
    isTargetRowPresent: Expression,
    matchedConditions: Seq[Expression],
    matchedOutputs: Seq[Option[Seq[Expression]]],
    notMatchedConditions: Seq[Expression],
    notMatchedOutputs: Seq[Option[Seq[Expression]]],
    targetOutput: Seq[Expression],
    joinedAttributes: Seq[Attribute]) extends Serializable
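
MergeInto itself is just a unary node; the merge semantics travel in MergeIntoParams, which bundles the row-presence predicates, the per-clause conditions, their output projections (None meaning DELETE), and the pass-through targetOutput. The following is a hypothetical, Catalyst-free sketch of the dispatch a row processor could perform over these fields. Plain Scala functions stand in for Expression evaluation; this is not the fork's actual executor code.

// Row is any concrete row type; conditions and outputs are pre-bound functions.
def processRow[Row](
    sourcePresent: Boolean,                                // isSourceRowPresent, evaluated
    targetPresent: Boolean,                                // isTargetRowPresent, evaluated
    matched: Seq[(Row => Boolean, Option[Row => Row])],    // matchedConditions zip matchedOutputs
    notMatched: Seq[(Row => Boolean, Option[Row => Row])], // notMatched pairs
    keepTarget: Row => Row                                 // projection to targetOutput
)(row: Row): Option[Row] = {
  if (sourcePresent && targetPresent) {
    matched.find { case (cond, _) => cond(row) } match {
      case Some((_, Some(output))) => Some(output(row))    // WHEN MATCHED ... UPDATE
      case Some((_, None))         => None                 // WHEN MATCHED ... DELETE
      case None                    => Some(keepTarget(row)) // no clause fired: keep row
    }
  } else if (sourcePresent) {
    // source-only row: first WHEN NOT MATCHED clause whose condition holds
    notMatched.collectFirst { case (cond, Some(output)) if cond(row) => output(row) }
  } else {
    Some(keepTarget(row)) // target-only row passes through unchanged
  }
}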
Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule

// copied from Iceberg Spark extensions
object AlignRowLevelOperations
  extends Rule[LogicalPlan] with AssignmentAlignmentSupport with IcebergSupport with SQLConfHelper {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case u: UpdateTable if u.resolved && isIcebergRelation(u.table) =>
      u.copy(assignments = alignAssignments(u.table, u.assignments))

    case m: MergeIntoTable if m.resolved && isIcebergRelation(m.targetTable) =>
      val alignedMatchedActions = m.matchedActions.map {
        case u @ UpdateAction(_, assignments) =>
          u.copy(assignments = alignAssignments(m.targetTable, assignments))
        case d: DeleteAction =>
          d
        case _ =>
          throw new AnalysisException("Matched actions can only contain UPDATE or DELETE")
      }

      val alignedNotMatchedActions = m.notMatchedActions.map {
        case i @ InsertAction(_, assignments) =>
          // check no nested columns are present
          val refs = assignments.map(_.key).map(asAssignmentReference)
          refs.foreach { ref =>
            if (ref.size > 1) {
              throw new AnalysisException(
                "Nested fields are not supported inside INSERT clauses of MERGE operations: " +
                s"${ref.mkString("`", "`.`", "`")}")
            }
          }

          val colNames = refs.map(_.head)

          // check there are no duplicates
          val duplicateColNames = colNames.groupBy(identity).collect {
            case (name, matchingNames) if matchingNames.size > 1 => name
          }

          if (duplicateColNames.nonEmpty) {
            throw new AnalysisException(
              s"Duplicate column names inside INSERT clause: ${duplicateColNames.mkString(", ")}")
          }

          // reorder assignments by the target table column order
          val assignmentMap = colNames.zip(assignments).toMap
          i.copy(assignments = alignInsertActionAssignments(m.targetTable, assignmentMap))

        case _ =>
          throw new AnalysisException("Not matched actions can only contain INSERT")
      }

      m.copy(matchedActions = alignedMatchedActions, notMatchedActions = alignedNotMatchedActions)
  }

  private def alignInsertActionAssignments(
      targetTable: LogicalPlan,
      assignmentMap: Map[String, Assignment]): Seq[Assignment] = {

    val resolver = conf.resolver

    targetTable.output.map { targetAttr =>
      val assignment = assignmentMap
        .find { case (name, _) => resolver(name, targetAttr.name) }
        .map { case (_, assignment) => assignment }

      if (assignment.isEmpty) {
        throw new AnalysisException(
          s"Cannot find column '${targetAttr.name}' of the target table among " +
          s"the INSERT columns: ${assignmentMap.keys.mkString(", ")}. " +
          "INSERT clauses must provide values for all columns of the target table.")
      }

      val key = assignment.get.key
      val value = assignment.get.value
      Assignment(key, castIfNeeded(targetAttr, value, resolver))
    }
  }
}
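
Two of the INSERT-clause validations above are easy to see in isolation: a reference with more than one name part is a nested field, and groupBy(identity) buckets repeated top-level names. Here are the same idioms over plain strings (a toy stand-in; the real rule works on resolved Assignment keys and the session resolver).

// hypothetical column references from an INSERT clause, already split on '.'
val refs = Seq(Seq("id"), Seq("data"), Seq("id"), Seq("point", "x"))

// nested-field check: more than one name part means a nested reference
refs.filter(_.size > 1).foreach { ref =>
  println(s"rejected nested field: ${ref.mkString("`", "`.`", "`")}") // `point`.`x`
}

// duplicate check: bucket top-level names, keep those appearing twice or more
val duplicates = refs.map(_.head).groupBy(identity).collect {
  case (name, occurrences) if occurrences.size > 1 => name
}
println(s"duplicates: ${duplicates.mkString(", ")}") // duplicates: id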
Lines changed: 200 additions & 0 deletions
@@ -0,0 +1,200 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, AttributeReference, Cast, CreateNamedStruct, Expression, ExtractValue, GetStructField, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{DataType, StructField, StructType}

// copied from Iceberg Spark extensions
trait AssignmentAlignmentSupport extends CastSupport {

  self: SQLConfHelper =>

  private case class ColumnUpdate(ref: Seq[String], expr: Expression)

  /**
   * Aligns assignments to match table columns.
   * <p>
   * This method processes and reorders given assignments so that each target column gets
   * an expression it should be set to. If a column does not have a matching assignment,
   * it will be set to its current value. For example, if one passes a table with columns c1, c2
   * and an assignment c2 = 1, this method will return c1 = c1, c2 = 1.
   * <p>
   * This method also handles updates to nested columns. If there is an assignment to a particular
   * nested field, this method will construct a new struct with one field updated
   * preserving other fields that have not been modified. For example, if one passes a table with
   * columns c1, c2 where c2 is a struct with fields n1 and n2 and an assignment c2.n2 = 1,
   * this method will return c1 = c1, c2 = struct(c2.n1, 1).
   *
   * @param table a target table
   * @param assignments assignments to align
   * @return aligned assignments that match table columns
   */
  protected def alignAssignments(
      table: LogicalPlan,
      assignments: Seq[Assignment]): Seq[Assignment] = {

    val columnUpdates = assignments.map(a => ColumnUpdate(asAssignmentReference(a.key), a.value))
    val outputExprs = applyUpdates(table.output, columnUpdates)
    outputExprs.zip(table.output).map {
      case (expr, attr) => Assignment(attr, expr)
    }
  }

  private def applyUpdates(
      cols: Seq[NamedExpression],
      updates: Seq[ColumnUpdate],
      resolver: Resolver = conf.resolver,
      namePrefix: Seq[String] = Nil): Seq[Expression] = {

    // iterate through columns at the current level and find which column updates match
    cols.map { col =>
      // find matches for this column or any of its children
      val prefixMatchedUpdates = updates.filter(a => resolver(a.ref.head, col.name))
      prefixMatchedUpdates match {
        // if there is no exact match and no match for children, return the column as is
        case updates if updates.isEmpty =>
          col

        // if there is an exact match, return the assigned expression
        case Seq(update) if isExactMatch(update, col, resolver) =>
          castIfNeeded(col, update.expr, resolver)

        // if there are matches only for children
        case updates if !hasExactMatch(updates, col, resolver) =>
          col.dataType match {
            case StructType(fields) =>
              // build field expressions
              val fieldExprs = fields.zipWithIndex.map { case (field, ordinal) =>
                Alias(GetStructField(col, ordinal, Some(field.name)), field.name)()
              }

              // recursively apply this method on nested fields
              val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
              val updatedFieldExprs = applyUpdates(
                fieldExprs,
                newUpdates,
                resolver,
                namePrefix :+ col.name)

              // construct a new struct with updated field expressions
              toNamedStruct(fields, updatedFieldExprs)

            case otherType =>
              val colName = (namePrefix :+ col.name).mkString(".")
              throw new AnalysisException(
                "Updating nested fields is only supported for StructType " +
                s"but $colName is of type $otherType"
              )
          }

        // if there are conflicting updates, throw an exception
        // there are two illegal scenarios:
        // - multiple updates to the same column
        // - updates to a top-level struct and its nested fields (e.g., a.b and a.b.c)
        case updates if hasExactMatch(updates, col, resolver) =>
          val conflictingCols = updates.map(u => (namePrefix ++ u.ref).mkString("."))
          throw new AnalysisException(
            "Updates are in conflict for these columns: " +
            conflictingCols.distinct.mkString(", "))
      }
    }
  }

  private def toNamedStruct(fields: Seq[StructField], fieldExprs: Seq[Expression]): Expression = {
    val namedStructExprs = fields.zip(fieldExprs).flatMap { case (field, expr) =>
      Seq(Literal(field.name), expr)
    }
    CreateNamedStruct(namedStructExprs)
  }

  private def hasExactMatch(
      updates: Seq[ColumnUpdate],
      col: NamedExpression,
      resolver: Resolver): Boolean = {

    updates.exists(assignment => isExactMatch(assignment, col, resolver))
  }

  private def isExactMatch(
      update: ColumnUpdate,
      col: NamedExpression,
      resolver: Resolver): Boolean = {

    update.ref match {
      case Seq(namePart) if resolver(namePart, col.name) => true
      case _ => false
    }
  }

  protected def castIfNeeded(
      tableAttr: NamedExpression,
      expr: Expression,
      resolver: Resolver): Expression = {

    val storeAssignmentPolicy = conf.storeAssignmentPolicy

    // run the type check and catch type errors
    storeAssignmentPolicy match {
      case StoreAssignmentPolicy.STRICT | StoreAssignmentPolicy.ANSI =>
        if (expr.nullable && !tableAttr.nullable) {
          throw new AnalysisException(
            s"Cannot write nullable values to non-null column '${tableAttr.name}'")
        }

        // we use byName = true to catch cases when struct field names don't match
        // e.g. a struct with fields (a, b) is assigned as a struct with fields (a, c) or (b, a)
        val errors = new mutable.ArrayBuffer[String]()
        val canWrite = DataType.canWrite(
          expr.dataType, tableAttr.dataType, byName = true, resolver, tableAttr.name,
          storeAssignmentPolicy, err => errors += err)

        if (!canWrite) {
          throw new AnalysisException(
            s"Cannot write incompatible data:\n- ${errors.mkString("\n- ")}")
        }

      case _ => // OK
    }

    storeAssignmentPolicy match {
      case _ if tableAttr.dataType.sameType(expr.dataType) =>
        expr
      case StoreAssignmentPolicy.ANSI =>
        AnsiCast(expr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
      case _ =>
        Cast(expr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
    }
  }

  protected def asAssignmentReference(expr: Expression): Seq[String] = expr match {
    case attr: AttributeReference => Seq(attr.name)
    case Alias(child, _) => asAssignmentReference(child)
    case GetStructField(child, _, Some(name)) => asAssignmentReference(child) :+ name
    case other: ExtractValue =>
      throw new AnalysisException(s"Updating nested fields is only supported for structs: $other")
    case other =>
      throw new AnalysisException(s"Cannot convert to a reference, unsupported expression: $other")
  }
}
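
The Scaladoc example (a table with columns c1, c2 where c2 is struct<n1, n2>, plus the assignment c2.n2 = 1, aligning to c1 = c1, c2 = struct(c2.n1, 1)) can be made concrete with a toy model of the recursion. Strings stand in for Catalyst expressions: an exact path match replaces a column, while a partial match rebuilds the struct around the updated field. This mirrors only the shape of applyUpdates, skipping the resolver, casts, and conflict checks.

sealed trait Col
case class Leaf(name: String) extends Col
case class Struct(name: String, fields: Seq[Col]) extends Col

// updates map a full column path to the (stringly-typed) assigned expression
def align(
    cols: Seq[Col],
    updates: Map[Seq[String], String],
    prefix: Seq[String] = Nil): Seq[String] = cols.map {
  case Leaf(n) =>
    // exact match replaces the column, otherwise keep its current value
    updates.getOrElse(prefix :+ n, (prefix :+ n).mkString("."))
  case Struct(n, fields) =>
    updates.get(prefix :+ n) match {
      case Some(expr) => expr // whole-struct assignment
      case None =>
        // rebuild the struct, recursing so untouched fields survive as-is
        "struct(" + align(fields, updates, prefix :+ n).mkString(", ") + ")"
    }
}

// table (c1, c2 struct<n1, n2>) with the single assignment c2.n2 = 1
val aligned = align(
  Seq(Leaf("c1"), Struct("c2", Seq(Leaf("n1"), Leaf("n2")))),
  Map(Seq("c2", "n2") -> "1"))
println(aligned) // List(c1, struct(c2.n1, 1))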
