@@ -224,7 +224,7 @@ object Block {
     } else {
       args.foreach {
         case _: ExprValue | _: Inline | _: Block =>
-        case _: Int | _: Long | _: Float | _: Double | _: String =>
+        case _: Boolean | _: Int | _: Long | _: Float | _: Double | _: String =>
         case other => throw new IllegalArgumentException(
           s"Can not interpolate ${other.getClass.getName} into code block.")
       }
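The one-line change above, accepting Boolean alongside the other primitives in code-block interpolation, is needed because the switch-based codegen added below splices the Boolean field `hasNull` directly into a code block (`${ev.isNull} = $hasNull;`). A minimal sketch of the pattern this enables, assuming the `code` interpolator from Block._:

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.Block._

// Sketch only: with this PR, a plain Scala Boolean interpolates into
// generated Java source like any other primitive literal.
val hasNull: Boolean = false
val snippet = code"boolean mightBeNull = $hasNull;"
```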
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._


@@ -375,6 +376,19 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
   }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    if (canBeComputedUsingSwitch && hset.size <= SQLConf.get.optimizerInSetSwitchThreshold) {
+      genCodeWithSwitch(ctx, ev)
+    } else {
+      genCodeWithSet(ctx, ev)
+    }
+  }
+
+  private def canBeComputedUsingSwitch: Boolean = child.dataType match {
+    case ByteType | ShortType | IntegerType | DateType => true
+    case _ => false
+  }
+
+  private def genCodeWithSet(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     nullSafeCodeGen(ctx, ev, c => {
       val setTerm = ctx.addReferenceObj("set", set)
       val setIsNull = if (hasNull) {
@@ -389,6 +403,34 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
     })
   }

+  // spark.sql.optimizer.inSetSwitchThreshold has an appropriate upper limit,
+  // so the code size should not exceed 64KB
+  private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val caseValuesGen = hset.filter(_ != null).map(Literal(_).genCode(ctx))
+    val valueGen = child.genCode(ctx)
+
+    val caseBranches = caseValuesGen.map(literal =>
+      code"""
+        case ${literal.value}:
+          ${ev.value} = true;
+          break;
+       """)
+
+    ev.copy(code =
+      code"""
+        ${valueGen.code}
+        ${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = ${valueGen.isNull};
+        ${CodeGenerator.JAVA_BOOLEAN} ${ev.value} = false;
+        if (!${valueGen.isNull}) {
+          switch (${valueGen.value}) {
+            ${caseBranches.mkString("\n")}
+            default:
+              ${ev.isNull} = $hasNull;
+          }
+        }
+       """)
+  }
+
   override def sql: String = {
     val valueSQL = child.sql
     val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
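To make the template above concrete, here is a hand-written sketch of the Java that genCodeWithSwitch would emit for an int column with hset = Set(1, 5) and hasNull = false. This is illustrative only, not actual codegen output; the value_0/isNull_0 names stand in for the fresh names a CodegenContext would generate:

```java
// Illustrative expansion of the code""" template above for InSet(intCol, Set(1, 5)).
boolean isNull_0 = i.isNullAt(0);           // ${valueGen.code} evaluates the child
int value_0 = isNull_0 ? -1 : i.getInt(0);
boolean isNull_1 = isNull_0;                // ${ev.isNull} = ${valueGen.isNull}
boolean value_1 = false;                    // ${ev.value}
if (!isNull_0) {
  switch (value_0) {
    case 1:
      value_1 = true;
      break;
    case 5:
      value_1 = true;
      break;
    default:
      isNull_1 = false;                     // $hasNull interpolated as a boolean literal
  }
}
```

Java's switch only accepts int-compatible operands, which is why canBeComputedUsingSwitch restricts the fast path to bytes, shorts, ints, and dates (DateType values are stored internally as int days since the epoch).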
@@ -171,6 +171,16 @@ object SQLConf {
       .intConf
       .createWithDefault(10)

+  val OPTIMIZER_INSET_SWITCH_THRESHOLD =
+    buildConf("spark.sql.optimizer.inSetSwitchThreshold")
+      .internal()
+      .doc("Configures the max set size in InSet for which Spark will generate code with " +
+        "switch statements. This is applicable only to bytes, shorts, ints, dates.")
+      .intConf
Member (@dongjoon-hyun):

To prevent user configuration errors, can we have a meaningful min/max check?

.checkValue(v => v > 0 && v < ???, ...)

Contributor Author (@aokolnychyi), Feb 28, 2019:

@kiszk @mgaido91 we had a discussion about generating code bigger than 64KB.

I am wondering if we still want to split the switch-based logic into multiple methods if we have the check suggested by @dongjoon-hyun. I've implemented the split logic locally. However, the code looks more complicated and we would need some extensions to splitExpressionsWithCurrentInputs.

Contributor (@mgaido91):

I am not sure why you'd need any extension. We have other parts of the code with switch statements which are split. I think in general it is safer to have it.

Contributor Author (@aokolnychyi):

@mgaido91 could you point me to an example?

Contributor (@mgaido91):

Ah, you're right, sorry, I was remembering wrongly. There were switch-based expressions, and to split them we migrated them to a do-while approach. Since the whole point of this PR is to introduce the switch construct, I agree with you that the best way is to add a constraint here so that the number stays small enough not to cause issues with code generation.
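For reference, the do-while approach mentioned here, used elsewhere in Spark codegen so that long branch chains can be split across methods, looks roughly like this. A schematic of the general trick, not code from this PR; conditions and values are placeholders:

```java
// Schematic: a do { ... } while (false) block gives every branch a common
// exit (`break`), so each branch can later be moved into its own method
// that reports a match through a result-state flag instead of breaking.
boolean cond1 = false, cond2 = true;  // placeholder conditions
int v1 = 10, v2 = 20;                 // placeholder branch values
int result = 0;
byte resultState = -1;                // -1 means no branch has matched yet
do {
  if (cond1) { result = v1; resultState = 1; break; }
  if (cond2) { result = v2; resultState = 1; break; }
} while (false);
boolean matched = (resultState == 1);
```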

Contributor Author (@aokolnychyi):

What about the default and max values then? The switch logic was faster than HashSet on 500 elements for every data type and on every machine I tested. In some cases, HashSet started to outperform on 550+ elements. Also, I had to generate a set of 6000+ elements to hit the 64KB limit. My proposal is to have 400 as the default and 600 as the max. Then we should be safe.
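For context on these numbers, a crude way to compare the two lookup strategies outside Spark is a micro-benchmark along the following lines. Purely illustrative, not the benchmark used for this PR: a literal Scala match on int constants compiles to a JVM tableswitch/lookupswitch, so it approximates the generated code; a real comparison would use JMH, warm-up, and hundreds of cases:

```scala
import scala.collection.immutable.HashSet

object InLookupBench {
  val set: HashSet[Int] = HashSet(1, 2, 3, 4, 5)

  def viaSet(x: Int): Boolean = set.contains(x)

  // Constant int patterns compile to a JVM switch instruction.
  def viaSwitch(x: Int): Boolean = x match {
    case 1 | 2 | 3 | 4 | 5 => true
    case _ => false
  }

  def time(f: Int => Boolean): Long = {
    var hits = 0
    val start = System.nanoTime()
    var i = 0
    while (i < 100000000) { if (f(i & 7)) hits += 1; i += 1 }
    val elapsed = System.nanoTime() - start
    println(s"hits=$hits") // keep the loop from being optimized away
    elapsed
  }

  def main(args: Array[String]): Unit = {
    println(s"set:    ${time(viaSet)} ns")
    println(s"switch: ${time(viaSwitch)} ns")
  }
}
```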

Contributor (@mgaido91):

Yes, sounds fine to me. Please add a comment in the codegen part in order to explain why we are not splitting the code. Thanks.

Contributor Author (@aokolnychyi):

Yeah, I'll add a comment.

+      .checkValue(threshold => threshold >= 0 && threshold <= 600, "The max set size " +
+        "for using switch statements in InSet must be non-negative and less than or equal to 600")
+      .createWithDefault(400)

   val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
@@ -1701,6 +1711,8 @@ class SQLConf extends Serializable with Logging {

   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

+  def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)
+
   def optimizerPlanChangeLogLevel: String = getConf(OPTIMIZER_PLAN_CHANGE_LOG_LEVEL)

   def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
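Since the new entry is internal() it will not show up in public documentation, but it can still be set like any other SQL conf. A hypothetical tuning session; the key and its 0-600 range are confirmed by the diff above, the scenario is illustrative:

```scala
// Assuming `spark` is an active SparkSession.
// Disable the switch-based path entirely:
spark.conf.set("spark.sql.optimizer.inSetSwitchThreshold", "0")

// Or allow it for IN lists up to the maximum permitted size:
spark.conf.set("spark.sql.optimizer.inSetSwitchThreshold", "600")
```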
@@ -23,11 +23,12 @@ import scala.collection.immutable.HashSet

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.RandomDataGenerator
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._


@@ -241,6 +242,52 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }

+  test("switch statements in InSet for bytes, shorts, ints, dates") {
+    val byteValues = Set[Any](1.toByte, 2.toByte, Byte.MinValue, Byte.MaxValue)
+    val shortValues = Set[Any](-10.toShort, 20.toShort, Short.MinValue, Short.MaxValue)
+    val intValues = Set[Any](20, -100, 30, Int.MinValue, Int.MaxValue)
+    val dateValues = Set[Any](
+      CatalystTypeConverters.convertToCatalyst(Date.valueOf("2017-01-01")),
+      CatalystTypeConverters.convertToCatalyst(Date.valueOf("1950-01-02")))
+
+    def check(presentValue: Expression, absentValue: Expression, values: Set[Any]): Unit = {
+      require(presentValue.dataType == absentValue.dataType)
+
+      val nullLiteral = Literal(null, presentValue.dataType)
+
+      checkEvaluation(InSet(nullLiteral, values), expected = null)
+      checkEvaluation(InSet(nullLiteral, values + null), expected = null)
+      checkEvaluation(InSet(presentValue, values), expected = true)
+      checkEvaluation(InSet(presentValue, values + null), expected = true)
+      checkEvaluation(InSet(absentValue, values), expected = false)
+      checkEvaluation(InSet(absentValue, values + null), expected = null)
+    }
+
+    def checkAllTypes(): Unit = {
+      check(presentValue = Literal(2.toByte), absentValue = Literal(3.toByte), byteValues)
+      check(presentValue = Literal(Byte.MinValue), absentValue = Literal(5.toByte), byteValues)
+      check(presentValue = Literal(20.toShort), absentValue = Literal(-14.toShort), shortValues)
+      check(presentValue = Literal(Short.MaxValue), absentValue = Literal(30.toShort), shortValues)
+      check(presentValue = Literal(20), absentValue = Literal(-14), intValues)
+      check(presentValue = Literal(Int.MinValue), absentValue = Literal(2), intValues)
+      check(
+        presentValue = Literal(Date.valueOf("2017-01-01")),
+        absentValue = Literal(Date.valueOf("2017-01-02")),
+        dateValues)
+      check(
+        presentValue = Literal(Date.valueOf("1950-01-02")),
+        absentValue = Literal(Date.valueOf("2017-10-02")),
+        dateValues)
+    }
+
+    withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> "0") {
Member:

After https://github.com/apache/spark/pull/23171/files#r261888276, we need to increase this from 0 to 1.

+      checkAllTypes()
+    }
+    withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> "20") {
+      checkAllTypes()
+    }
+  }
Member:

Could you please add a test case where spark.sql.optimizer.inSetSwitchThreshold has the maximum value and this optimization calls genCodeWithSwitch()?

Contributor Author (@aokolnychyi):

Do you mean testing that if the set size is 100 and spark.sql.optimizer.inSetSwitchThreshold is 100, then genCodeWithSwitch is still applied?

Member:

My question is about exactly what you are discussing here. The current implementation can accept a large int value (e.g. Integer.MAX_VALUE) for spark.sql.optimizer.inSetSwitchThreshold, so I am afraid the switch code could require more than 64KB of Java bytecode. If the option has an appropriate upper limit, it is fine.

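A sketch of the kind of test being requested here: pin the threshold at its maximum and check that a large InSet still evaluates correctly through the switch path. It reuses the suite's helpers shown above; the test name and values are hypothetical:

```scala
test("InSet via switch at the maximum threshold") {
  withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> "600") {
    val values = (1 to 600).map(i => i: Any).toSet
    checkEvaluation(InSet(Literal(1), values), expected = true)
    checkEvaluation(InSet(Literal(601), values), expected = false)
  }
}
```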

test("SPARK-22501: In should not generate codes beyond 64KB") {
val N = 3000
val sets = (1 to N).map(i => Literal(i.toDouble))