
Commit 88d8de9

[SPARK-23581][SQL] Add interpreted unsafe projection
## What changes were proposed in this pull request?

We can currently only create unsafe rows using code generation. This is a problem for situations in which code generation fails; there is no fallback, and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The implementation is modeled after `InterpretedMutableProjection`: it stores the expression results in a `GenericInternalRow` and then uses a conversion function to convert that `GenericInternalRow` into an `UnsafeRow`.

This PR does not implement the actual code-generation-to-interpreted fallback logic. This will be done in a follow-up.

## How was this patch tested?

I am piggybacking on existing `UnsafeProjection` tests, and I have added an interpreted version for each of them.

Author: Herman van Hovell <[email protected]>

Closes #20750 from hvanhovell/SPARK-23581.
1 parent: dffeac3
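A minimal Scala sketch of the two-phase evaluation strategy described in the commit message, not the code added by this PR: the class name, the `toUnsafe` converter parameter, and the overall shape are illustrative assumptions. Expressions are evaluated interpretively into a `GenericInternalRow`, which is then converted into an `UnsafeRow`.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, UnsafeRow}

// Hypothetical sketch, not the PR's InterpretedUnsafeProjection.
class SketchedInterpretedUnsafeProjection(
    expressions: Array[Expression],
    toUnsafe: InternalRow => UnsafeRow) {  // stand-in for the actual row-to-UnsafeRow conversion

  // Intermediate buffer holding the interpreted expression results for one input row.
  private[this] val values = new Array[Any](expressions.length)

  def apply(input: InternalRow): UnsafeRow = {
    var i = 0
    while (i < expressions.length) {
      values(i) = expressions(i).eval(input)  // interpreted evaluation, no code generation
      i += 1
    }
    // Wrap the results and convert them into the UnsafeRow binary format.
    toUnsafe(new GenericInternalRow(values.clone()))
  }
}
```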

File tree: 14 files changed (+555, -74 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java

Lines changed: 7 additions & 25 deletions
@@ -30,7 +30,7 @@
  * A helper class to write data into global row buffer using `UnsafeArrayData` format,
  * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
  */
-public class UnsafeArrayWriter {
+public final class UnsafeArrayWriter extends UnsafeWriter {

   private BufferHolder holder;

@@ -83,7 +83,7 @@ private long getElementOffset(int ordinal, int elementSize) {
     return startingOffset + headerInBytes + ordinal * elementSize;
   }

-  public void setOffsetAndSize(int ordinal, long currentCursor, int size) {
+  public void setOffsetAndSize(int ordinal, int currentCursor, int size) {
     assertIndexIsValid(ordinal);
     final long relativeOffset = currentCursor - startingOffset;
     final long offsetAndSize = (relativeOffset << 32) | (long)size;

@@ -96,49 +96,31 @@ private void setNullBit(int ordinal) {
     BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }

-  public void setNullBoolean(int ordinal) {
-    setNullBit(ordinal);
-    // put zero into the corresponding field when set null
-    Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), false);
-  }
-
-  public void setNullByte(int ordinal) {
+  public void setNull1Bytes(int ordinal) {
     setNullBit(ordinal);
     // put zero into the corresponding field when set null
     Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }

-  public void setNullShort(int ordinal) {
+  public void setNull2Bytes(int ordinal) {
     setNullBit(ordinal);
     // put zero into the corresponding field when set null
     Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), (short)0);
   }

-  public void setNullInt(int ordinal) {
+  public void setNull4Bytes(int ordinal) {
     setNullBit(ordinal);
     // put zero into the corresponding field when set null
     Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), 0);
   }

-  public void setNullLong(int ordinal) {
+  public void setNull8Bytes(int ordinal) {
     setNullBit(ordinal);
     // put zero into the corresponding field when set null
     Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), (long)0);
   }

-  public void setNullFloat(int ordinal) {
-    setNullBit(ordinal);
-    // put zero into the corresponding field when set null
-    Platform.putFloat(holder.buffer, getElementOffset(ordinal, 4), (float)0);
-  }
-
-  public void setNullDouble(int ordinal) {
-    setNullBit(ordinal);
-    // put zero into the corresponding field when set null
-    Platform.putDouble(holder.buffer, getElementOffset(ordinal, 8), (double)0);
-  }
-
-  public void setNull(int ordinal) { setNullLong(ordinal); }
+  public void setNull(int ordinal) { setNull8Bytes(ordinal); }

   public void write(int ordinal, boolean value) {
     assertIndexIsValid(ordinal);
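The hunk above narrows the `currentCursor` parameter to `int` while keeping the existing scheme of packing a field's relative offset and its size into a single 64-bit word. A small Scala sketch of that packing and how it can be read back (helper names are illustrative, not part of the patch; sizes and offsets are assumed non-negative):

```scala
// Pack a non-negative relative offset (upper 32 bits) and size (lower 32 bits)
// into one long, mirroring `(relativeOffset << 32) | (long) size` above.
def packOffsetAndSize(relativeOffset: Long, size: Int): Long =
  (relativeOffset << 32) | size.toLong

// Read the two halves back out.
def unpackOffset(word: Long): Int = (word >>> 32).toInt
def unpackSize(word: Long): Int = word.toInt

// Example: a 12-byte value written 40 bytes past the element's starting offset.
val word = packOffsetAndSize(40L, 12)
assert(unpackOffset(word) == 40 && unpackSize(word) == 12)
```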

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java

Lines changed: 25 additions & 5 deletions
@@ -38,7 +38,7 @@
  * beginning of the global row buffer, we don't need to update `startingOffset` and can just call
  * `zeroOutNullBytes` before writing new data.
  */
-public class UnsafeRowWriter {
+public final class UnsafeRowWriter extends UnsafeWriter {

   private final BufferHolder holder;
   // The offset of the global buffer where we start to write this row.

@@ -93,18 +93,38 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }

+  @Override
+  public void setNull1Bytes(int ordinal) {
+    setNullAt(ordinal);
+  }
+
+  @Override
+  public void setNull2Bytes(int ordinal) {
+    setNullAt(ordinal);
+  }
+
+  @Override
+  public void setNull4Bytes(int ordinal) {
+    setNullAt(ordinal);
+  }
+
+  @Override
+  public void setNull8Bytes(int ordinal) {
+    setNullAt(ordinal);
+  }
+
   public long getFieldOffset(int ordinal) {
     return startingOffset + nullBitsSize + 8 * ordinal;
   }

-  public void setOffsetAndSize(int ordinal, long size) {
+  public void setOffsetAndSize(int ordinal, int size) {
     setOffsetAndSize(ordinal, holder.cursor, size);
   }

-  public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
+  public void setOffsetAndSize(int ordinal, int currentCursor, int size) {
     final long relativeOffset = currentCursor - startingOffset;
     final long fieldOffset = getFieldOffset(ordinal);
-    final long offsetAndSize = (relativeOffset << 32) | size;
+    final long offsetAndSize = (relativeOffset << 32) | (long) size;

     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
   }

@@ -174,7 +194,7 @@ public void write(int ordinal, Decimal input, int precision, int scale) {
     if (input == null || !input.changePrecision(precision, scale)) {
       BitSetMethods.set(holder.buffer, startingOffset, ordinal);
       // keep the offset for future update
-      setOffsetAndSize(ordinal, 0L);
+      setOffsetAndSize(ordinal, 0);
     } else {
       final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
       assert bytes.length <= 16;
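In `UnsafeRowWriter`, every field occupies a full 8-byte word (see `getFieldOffset` above), which is why all four `setNullNBytes` overrides can simply delegate to `setNullAt`: the whole word is zeroed regardless of the value's width. A tiny sketch of that layout arithmetic (illustrative only, not part of the patch):

```scala
// Fixed-width UnsafeRow layout: null bits first, then one 8-byte slot per field,
// mirroring `startingOffset + nullBitsSize + 8 * ordinal` above.
def fieldOffset(startingOffset: Long, nullBitsSize: Int, ordinal: Int): Long =
  startingOffset + nullBitsSize + 8L * ordinal

// Example: a row at offset 0 with 8 bytes of null bits; field 3 starts at byte 32.
assert(fieldOffset(0L, nullBitsSize = 8, ordinal = 3) == 32L)
```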
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.codegen;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Base class for writing Unsafe* structures.
+ */
+public abstract class UnsafeWriter {
+  public abstract void setNull1Bytes(int ordinal);
+  public abstract void setNull2Bytes(int ordinal);
+  public abstract void setNull4Bytes(int ordinal);
+  public abstract void setNull8Bytes(int ordinal);
+  public abstract void write(int ordinal, boolean value);
+  public abstract void write(int ordinal, byte value);
+  public abstract void write(int ordinal, short value);
+  public abstract void write(int ordinal, int value);
+  public abstract void write(int ordinal, long value);
+  public abstract void write(int ordinal, float value);
+  public abstract void write(int ordinal, double value);
+  public abstract void write(int ordinal, Decimal input, int precision, int scale);
+  public abstract void write(int ordinal, UTF8String input);
+  public abstract void write(int ordinal, byte[] input);
+  public abstract void write(int ordinal, CalendarInterval input);
+  public abstract void setOffsetAndSize(int ordinal, int currentCursor, int size);
+}
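With the new `UnsafeWriter` base class, writing logic can be expressed once against the abstraction and used with either the row writer or the array writer. A hedged usage sketch (the helper below is hypothetical, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter

// Hypothetical helper: write a nullable int field through the shared abstraction.
def writeNullableInt(writer: UnsafeWriter, ordinal: Int, value: java.lang.Integer): Unit = {
  if (value == null) {
    writer.setNull4Bytes(ordinal)      // an int occupies 4 bytes
  } else {
    writer.write(ordinal, value.intValue())
  }
}
```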

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 26 additions & 0 deletions
@@ -328,6 +328,32 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }

+/**
+ * An expression that contains mutable state. A stateful expression is always non-deterministic
+ * because the results it produces during evaluation are not only dependent on the given input
+ * but also on its internal state.
+ *
+ * The state of the expressions is generally not exposed in the parameter list and this makes
+ * comparing stateful expressions problematic because similar stateful expressions (with the same
+ * parameter list) but with different internal state will be considered equal. This is especially
+ * problematic during tree transformations. In order to counter this the `fastEquals` method for
+ * stateful expressions only returns `true` for the same reference.
+ *
+ * A stateful expression should never be evaluated multiple times for a single row. This should
+ * only be a problem for interpreted execution. This can be prevented by creating fresh copies
+ * of the stateful expression before execution, these can be made using the `freshCopy` function.
+ */
+trait Stateful extends Nondeterministic {
+  /**
+   * Return a fresh uninitialized copy of the stateful expression.
+   */
+  def freshCopy(): Stateful
+
+  /**
+   * Only the same reference is considered equal.
+   */
+  override def fastEquals(other: TreeNode[_]): Boolean = this eq other
+}

 /**
  * A leaf expression, i.e. one without any child expressions.