Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
12b591e
remove unboxing operations when an array is primitive type array
kiszk Jun 26, 2016
909d210
addressed comments
kiszk Jun 27, 2016
d481cb0
fix test failure
kiszk Jun 27, 2016
66454f3
revert miss operation of git
kiszk Nov 13, 2016
03e0cfa
addressed review comments
kiszk Nov 13, 2016
5620733
fix scala style error
kiszk Nov 13, 2016
2906c74
support CreateMap
kiszk Nov 14, 2016
d5b3a8a
addressed review comments
kiszk Nov 15, 2016
d29bb97
addressed review comments
kiszk Nov 17, 2016
88daf42
addressed review comments
kiszk Nov 18, 2016
da82efe
addressed review comment
kiszk Nov 18, 2016
7914230
A patch for expected catalystValue. (#1)
viirya Nov 21, 2016
69e0eed
address review comment
kiszk Nov 22, 2016
597dc72
add a test for Byte array to ComplexTypeSuite
kiszk Dec 3, 2016
a971336
address review comments
kiszk Dec 6, 2016
be01d91
commit a file
kiszk Dec 6, 2016
1c7f972
remove two test suites that I newly added
kiszk Dec 10, 2016
360d139
Create UnsafeArrayData by using UnsafeArrayWriter
kiszk Dec 10, 2016
b66d0f6
fix test failure - DataFrameSuite.Star Expansion
kiszk Dec 11, 2016
08262b1
support createMap
kiszk Dec 11, 2016
438944b
calculate initial buffer size at compilation time
kiszk Dec 11, 2016
f418062
addressed review comments
kiszk Dec 12, 2016
d24c7b1
address review comment
kiszk Dec 19, 2016
c159f03
Create UnsafeArrayData by using UnsafeRow and UnsafeArrayWriter
kiszk Dec 21, 2016
0af0828
Create UnsafeArrayData by making ArrayData mutable
kiszk Dec 23, 2016
f6e9a83
addressed review comments
kiszk Dec 23, 2016
327c8ac
addressed review comments
kiszk Dec 24, 2016
7a7e9c3
addressed review comment
kiszk Dec 24, 2016
ee237b4
addressed review comments
kiszk Dec 24, 2016
28df09f
fixed test failure
kiszk Dec 24, 2016
293b344
addressed review comments
kiszk Dec 26, 2016
69d5e33
addressed review comments
kiszk Dec 26, 2016
2556ba5
fix a test failure
kiszk Dec 26, 2016
34bff15
fix a test failure
kiszk Dec 26, 2016
4a0409a
addressed review comments
kiszk Dec 26, 2016
dcce4c5
addressed review comment
kiszk Dec 27, 2016
2f67ac2
address a review comment
kiszk Dec 27, 2016
c986361
address a review comment
kiszk Dec 27, 2016
cfe2e3d
revert a change of an exception message
kiszk Dec 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,58 @@ public UnsafeMapData getMap(int ordinal) {
return map;
}

/**
 * Generic, untyped element update. This implementation rejects every call.
 *
 * @param ordinal index of the element (unused)
 * @param value   replacement value (unused)
 * @throws UnsupportedOperationException always
 */
@Override
public void update(int ordinal, Object value) {
  throw new UnsupportedOperationException();
}

/**
 * Flags the element at {@code ordinal} as null.
 *
 * Only the bit for {@code ordinal} is set, in the bit set that starts 8 bytes past
 * {@code baseOffset}; the element's value slot is not written here. The caller is
 * expected to have zeroed that slot already, or to zero it afterwards.
 *
 * @param ordinal index of the element to mark; passed to assertIndexIsValid first
 */
public void setNullAt(int ordinal) {
  assertIndexIsValid(ordinal);
  final long bitSetStart = baseOffset + 8;
  BitSetMethods.set(baseObject, bitSetStart, ordinal);
}

/**
 * Writes a boolean into the element slot at {@code ordinal}.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid first
 * @param value   value to store
 */
public void setBoolean(int ordinal, boolean value) {
  assertIndexIsValid(ordinal);
  // The second argument (1) is presumably the element width in bytes -- confirm
  // against getElementOffset.
  final long slot = getElementOffset(ordinal, 1);
  Platform.putBoolean(baseObject, slot, value);
}

/**
 * Writes a byte into the element slot at {@code ordinal}.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid first
 * @param value   value to store
 */
public void setByte(int ordinal, byte value) {
  assertIndexIsValid(ordinal);
  // The second argument (1) is presumably the element width in bytes -- confirm
  // against getElementOffset.
  final long slot = getElementOffset(ordinal, 1);
  Platform.putByte(baseObject, slot, value);
}

/**
 * Writes a short into the element slot at {@code ordinal}.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid first
 * @param value   value to store
 */
public void setShort(int ordinal, short value) {
  assertIndexIsValid(ordinal);
  // The second argument (2) is presumably the element width in bytes -- confirm
  // against getElementOffset.
  final long slot = getElementOffset(ordinal, 2);
  Platform.putShort(baseObject, slot, value);
}

/**
 * Writes an int into the element slot at {@code ordinal}.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid first
 * @param value   value to store
 */
public void setInt(int ordinal, int value) {
  assertIndexIsValid(ordinal);
  // The second argument (4) is presumably the element width in bytes -- confirm
  // against getElementOffset.
  final long slot = getElementOffset(ordinal, 4);
  Platform.putInt(baseObject, slot, value);
}

/**
 * Writes a long into the element slot at {@code ordinal}.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid first
 * @param value   value to store
 */
public void setLong(int ordinal, long value) {
  assertIndexIsValid(ordinal);
  // The second argument (8) is presumably the element width in bytes -- confirm
  // against getElementOffset.
  final long slot = getElementOffset(ordinal, 8);
  Platform.putLong(baseObject, slot, value);
}

/**
 * Writes a float into the element slot at {@code ordinal}.
 *
 * Any NaN input is replaced by the {@code Float.NaN} constant before it is stored,
 * so every NaN is written as that single value.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid
 * @param value   value to store
 */
public void setFloat(int ordinal, float value) {
  final float toStore = Float.isNaN(value) ? Float.NaN : value;
  assertIndexIsValid(ordinal);
  final long slot = getElementOffset(ordinal, 4);
  Platform.putFloat(baseObject, slot, toStore);
}

/**
 * Writes a double into the element slot at {@code ordinal}.
 *
 * Any NaN input is replaced by the {@code Double.NaN} constant before it is stored,
 * so every NaN is written as that single value.
 *
 * @param ordinal index of the element to overwrite; passed to assertIndexIsValid
 * @param value   value to store
 */
public void setDouble(int ordinal, double value) {
  final double toStore = Double.isNaN(value) ? Double.NaN : value;
  assertIndexIsValid(ordinal);
  final long slot = getElementOffset(ordinal, 8);
  Platform.putDouble(baseObject, slot, toStore);
}

// This `hashCode` computation could consume much processor time for large data.
// If the computation becomes a bottleneck, we can use a light-weight logic; the first fixed bytes
// are used to compute `hashCode` (See `Vector.hashCode`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, TypeUtils}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String

/**
Expand All @@ -43,7 +44,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), "function array")

override def dataType: DataType = {
override def dataType: ArrayType = {
ArrayType(
children.headOption.map(_.dataType).getOrElse(NullType),
containsNull = children.exists(_.nullable))
Expand All @@ -56,33 +57,99 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val arrayClass = classOf[GenericArrayData].getName
val values = ctx.freshName("values")
ctx.addMutableState("Object[]", values, s"this.$values = null;")

ev.copy(code = s"""
this.$values = new Object[${children.size}];""" +
ctx.splitExpressions(
ctx.INPUT_ROW,
children.zipWithIndex.map { case (e, i) =>
val eval = e.genCode(ctx)
eval.code + s"""
if (${eval.isNull}) {
$values[$i] = null;
} else {
$values[$i] = ${eval.value};
}
"""
}) +
s"""
final ArrayData ${ev.value} = new $arrayClass($values);
this.$values = null;
""", isNull = "false")
val et = dataType.elementType
val evals = children.map(e => e.genCode(ctx))
val (preprocess, assigns, postprocess, arrayData) =
GenArrayData.genCodeToCreateArrayData(ctx, et, evals, false)
ev.copy(
code = preprocess + ctx.splitExpressions(ctx.INPUT_ROW, assigns) + postprocess,
value = arrayData,
isNull = "false")
}

override def prettyName: String = "array"
}

private [sql] object GenArrayData {
/**
* Return Java code pieces that allocate an ArrayData instance, chosen by whether elementType
* is a primitive type
*
* @param ctx a [[CodegenContext]]
* @param elementType data type of underlying array elements
* @param elementsCode a set of [[ExprCode]] for each element of an underlying array
* @param isMapKey if true, throw an exception when the element is null
* @return (code pre-assignments, assignments to each array elements, code post-assignments,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No param doc for allowNull.

* arrayData name)
*/
def genCodeToCreateArrayData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need 2 methods....
can we just write

def genCodeToCreateArrayData(
    ctx: CodegenContext,
    elementType: DataType,
    elementCodes: Seq[ExprCode],
    allowNull: Boolean): (String, String, String, String) = {
  val arrayName = ctx.freshName("array")
  val arrayDataName = ctx.freshName("arrayData")
  val numElements = elementCodes.length

  if (ctx.isPrimitiveType(elementType)) {
    ctx.addMutableState("UnsafeArrayData", arrayDataName, "new UnsafeArrayData();")
    val unsafeArraySizeInBytes =
      UnsafeArrayData.calculateHeaderPortionInBytes(numElements) +
        ByteArrayMethods.roundNumberOfBytesToNearestWord(elementType.defaultSize * numElements)
    val baseOffset = Platform.BYTE_ARRAY_OFFSET
    val preprocess =
      s"""
        byte[] $arrayName = new byte[$unsafeArraySizeInBytes];
        Platform.putLong($arrayName, $baseOffset, $numElements);
        $arrayDataName.pointTo($arrayName, $baseOffset, $unsafeArraySizeInBytes);
      """

      val primitiveTypeName = ctx.primitiveTypeName(elementType)
      val assignElements = ctx.splitExpressions(elementCodes.zipWithIndex.map { case (eval, i) => ... })
      
      val createArrayData = ""
      (preprocess, assignElements, createArrayData, arrayData)
  } else {
    ......
  }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see. I will do it tonight

ctx: CodegenContext,
elementType: DataType,
elementsCode: Seq[ExprCode],
isMapKey: Boolean): (String, Seq[String], String, String) = {
val arrayName = ctx.freshName("array")
val arrayDataName = ctx.freshName("arrayData")
val numElements = elementsCode.length

if (!ctx.isPrimitiveType(elementType)) {
val genericArrayClass = classOf[GenericArrayData].getName
ctx.addMutableState("Object[]", arrayName,
s"this.$arrayName = new Object[${numElements}];")

val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
val isNullAssignment = if (!isMapKey) {
s"$arrayName[$i] = null;"
} else {
"throw new RuntimeException(\"Cannot use null as map key!\");"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message seems strange. This is not only for map key.

}
eval.code + s"""
if (${eval.isNull}) {
$isNullAssignment
} else {
$arrayName[$i] = ${eval.value};
}
"""
}

("",
assignments,
s"final ArrayData $arrayDataName = new $genericArrayClass($arrayName);",
arrayDataName)
} else {
val unsafeArraySizeInBytes =
UnsafeArrayData.calculateHeaderPortionInBytes(numElements) +
ByteArrayMethods.roundNumberOfBytesToNearestWord(elementType.defaultSize * numElements)
val baseOffset = Platform.BYTE_ARRAY_OFFSET
ctx.addMutableState("UnsafeArrayData", arrayDataName, "");

val primitiveValueTypeName = ctx.primitiveTypeName(elementType)
val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
val isNullAssignment = if (!isMapKey) {
s"$arrayDataName.setNullAt($i);"
} else {
"throw new RuntimeException(\"Cannot use null as map key!\");"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}
eval.code + s"""
if (${eval.isNull}) {
$isNullAssignment
} else {
$arrayDataName.set$primitiveValueTypeName($i, ${eval.value});
}
"""
}

(s"""
byte[] $arrayName = new byte[$unsafeArraySizeInBytes];
$arrayDataName = new UnsafeArrayData();
Platform.putLong($arrayName, $baseOffset, $numElements);
$arrayDataName.pointTo($arrayName, $baseOffset, $unsafeArraySizeInBytes);
""",
assignments,
"",
arrayDataName)
}
}
}

/**
* Returns a catalyst Map containing the evaluation of all children expressions as keys and values.
* The children are a flattened sequence of kv pairs, e.g. (key1, value1, key2, value2, ...)
Expand Down Expand Up @@ -133,49 +200,26 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val arrayClass = classOf[GenericArrayData].getName
val mapClass = classOf[ArrayBasedMapData].getName
val keyArray = ctx.freshName("keyArray")
val valueArray = ctx.freshName("valueArray")
ctx.addMutableState("Object[]", keyArray, s"this.$keyArray = null;")
ctx.addMutableState("Object[]", valueArray, s"this.$valueArray = null;")

val keyData = s"new $arrayClass($keyArray)"
val valueData = s"new $arrayClass($valueArray)"
ev.copy(code = s"""
$keyArray = new Object[${keys.size}];
$valueArray = new Object[${values.size}];""" +
ctx.splitExpressions(
ctx.INPUT_ROW,
keys.zipWithIndex.map { case (key, i) =>
val eval = key.genCode(ctx)
s"""
${eval.code}
if (${eval.isNull}) {
throw new RuntimeException("Cannot use null as map key!");
} else {
$keyArray[$i] = ${eval.value};
}
"""
}) +
ctx.splitExpressions(
ctx.INPUT_ROW,
values.zipWithIndex.map { case (value, i) =>
val eval = value.genCode(ctx)
s"""
${eval.code}
if (${eval.isNull}) {
$valueArray[$i] = null;
} else {
$valueArray[$i] = ${eval.value};
}
"""
}) +
val MapType(keyDt, valueDt, _) = dataType
val evalKeys = keys.map(e => e.genCode(ctx))
val evalValues = values.map(e => e.genCode(ctx))
val (preprocessKeyData, assignKeys, postprocessKeyData, keyArrayData) =
GenArrayData.genCodeToCreateArrayData(ctx, keyDt, evalKeys, true)
val (preprocessValueData, assignValues, postprocessValueData, valueArrayData) =
GenArrayData.genCodeToCreateArrayData(ctx, valueDt, evalValues, false)
val code =
s"""
final MapData ${ev.value} = new $mapClass($keyData, $valueData);
this.$keyArray = null;
this.$valueArray = null;
""", isNull = "false")
final boolean ${ev.isNull} = false;
$preprocessKeyData
${ctx.splitExpressions(ctx.INPUT_ROW, assignKeys)}
$postprocessKeyData
$preprocessValueData
${ctx.splitExpressions(ctx.INPUT_ROW, assignValues)}
$postprocessValueData
final MapData ${ev.value} = new $mapClass($keyArrayData, $valueArrayData);
"""
ev.copy(code = code)
}

override def prettyName: String = "map"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ abstract class ArrayData extends SpecializedGetters with Serializable {

def array: Array[Any]

/** Marks the element at index `i` as null. Abstract; implemented by subclasses. */
def setNullAt(i: Int): Unit

/** Replaces the element at index `i` with `value`. Abstract; implemented by subclasses. */
def update(i: Int, value: Any): Unit

// Default implementations (slow): each typed setter just forwards to the generic `update`,
// so the primitive argument is passed as `Any`. Subclasses may override them.
def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
def setByte(i: Int, value: Byte): Unit = update(i, value)
def setShort(i: Int, value: Short): Unit = update(i, value)
def setInt(i: Int, value: Int): Unit = update(i, value)
def setLong(i: Int, value: Long): Unit = update(i, value)
def setFloat(i: Int, value: Float): Unit = update(i, value)
def setDouble(i: Int, value: Double): Unit = update(i, value)

def toBooleanArray(): Array[Boolean] = {
val size = numElements()
val values = new Array[Boolean](size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
override def getArray(ordinal: Int): ArrayData = getAs(ordinal)
override def getMap(ordinal: Int): MapData = getAs(ordinal)

/** Marks the element at `ordinal` as null by storing `null` in the backing array. */
override def setNullAt(ordinal: Int): Unit = {
  array.update(ordinal, null)
}

/** Overwrites the element at `ordinal` in the backing array with `value`. */
override def update(ordinal: Int, value: Any): Unit = {
  array.update(ordinal, value)
}

override def toString(): String = array.mkString("[", ",", "]")

override def equals(o: Any): Boolean = {
Expand Down
Loading