Skip to content

Commit 11f80a3

Browse files
committed
[SPARK-9368][SQL] Support get(ordinal, dataType) generic getter in UnsafeRow.
1 parent c025c3d commit 11f80a3

File tree

5 files changed

+51
-7
lines changed

5 files changed

+51
-7
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.HashSet;
2525
import java.util.Set;
2626

27-
import org.apache.spark.sql.types.DataType;
27+
import org.apache.spark.sql.types.*;
2828
import org.apache.spark.unsafe.PlatformDependent;
2929
import org.apache.spark.unsafe.array.ByteArrayMethods;
3030
import org.apache.spark.unsafe.bitset.BitSetMethods;
@@ -235,6 +235,35 @@ public Object get(int ordinal) {
235235
throw new UnsupportedOperationException();
236236
}
237237

238+
@Override
239+
public Object get(int ordinal, DataType dataType) {
240+
if (dataType instanceof NullType) {
241+
return null;
242+
} else if (dataType instanceof BooleanType) {
243+
return getBoolean(ordinal);
244+
} else if (dataType instanceof ByteType) {
245+
return getByte(ordinal);
246+
} else if (dataType instanceof ShortType) {
247+
return getShort(ordinal);
248+
} else if (dataType instanceof IntegerType) {
249+
return getInt(ordinal);
250+
} else if (dataType instanceof LongType) {
251+
return getLong(ordinal);
252+
} else if (dataType instanceof FloatType) {
253+
return getFloat(ordinal);
254+
} else if (dataType instanceof DoubleType) {
255+
return getDouble(ordinal);
256+
} else if (dataType instanceof DecimalType) {
257+
return getDecimal(ordinal);
258+
} else if (dataType instanceof StringType) {
259+
return getUTF8String(ordinal);
260+
} else if (dataType instanceof StructType) {
261+
return getStruct(ordinal, ((StructType) dataType).size());
262+
} else {
263+
throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString());
264+
}
265+
}
266+
238267
@Override
239268
public boolean isNullAt(int ordinal) {
240269
assertIndexIsValid(ordinal);
@@ -436,4 +465,19 @@ public String toString() {
436465
public boolean anyNull() {
437466
return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
438467
}
468+
469+
/**
470+
* Writes the content of this row into a memory address, identified by an object and an offset.
471+
* The target memory address must already been allocated, and have enough space to hold all the
472+
* bytes in this string.
473+
*/
474+
public void writeToMemory(Object target, long targetOffset) {
475+
PlatformDependent.copyMemory(
476+
baseObject,
477+
baseOffset,
478+
target,
479+
targetOffset,
480+
sizeInBytes
481+
);
482+
}
439483
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ abstract class InternalRow extends Serializable {
3030

3131
def numFields: Int
3232

33-
def get(ordinal: Int): Any
33+
def get(ordinal: Int): Any = get(ordinal, null)
3434

3535
def genericGet(ordinal: Int): Any = get(ordinal, null)
3636

37-
def get(ordinal: Int, dataType: DataType): Any = get(ordinal)
37+
def get(ordinal: Int, dataType: DataType): Any
3838

3939
def getAs[T](ordinal: Int, dataType: DataType): T = get(ordinal, dataType).asInstanceOf[T]
4040

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
219219
values(i).isNull = true
220220
}
221221

222-
override def get(i: Int): Any = values(i).boxed
222+
override def get(i: Int, dataType: DataType): Any = values(i).boxed
223223

224224
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
225225
values(ordinal).boxed.asInstanceOf[InternalRow]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
183183
public void setNullAt(int i) { nullBits[i] = true; }
184184
public boolean isNullAt(int i) { return nullBits[i]; }
185185

186-
public Object get(int i) {
186+
public Object get(int i, ${classOf[DataType].getName} dataType) {
187187
if (isNullAt(i)) return null;
188188
switch (i) {
189189
$getCases

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class GenericInternalRow(protected[sql] val values: Array[Any]) extends Internal
9999

100100
override def numFields: Int = values.length
101101

102-
override def get(i: Int): Any = values(i)
102+
override def get(i: Int, dataType: DataType): Any = values(i)
103103

104104
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
105105
values(ordinal).asInstanceOf[InternalRow]
@@ -130,7 +130,7 @@ class GenericMutableRow(val values: Array[Any]) extends MutableRow {
130130

131131
override def numFields: Int = values.length
132132

133-
override def get(i: Int): Any = values(i)
133+
override def get(i: Int, dataType: DataType): Any = values(i)
134134

135135
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
136136
values(ordinal).asInstanceOf[InternalRow]

0 commit comments

Comments
 (0)