Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,24 +252,8 @@ private[spark] object SQLConf {
"not be provided to ExchangeCoordinator.",
isPublic = false)

val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
defaultValue = Some(true),
doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
"manages memory and dynamically generates bytecode for expression evaluation.")

val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
" a specific query.",
isPublic = false)

val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
doc = "When true, use the new optimized Tungsten physical execution backend.",
isPublic = false)

val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
defaultValue = Some(true), // use CODEGEN_ENABLED as default
defaultValue = Some(true),
doc = "When true, common subexpressions will be eliminated.",
isPublic = false)

Expand Down Expand Up @@ -475,6 +459,9 @@ private[spark] object SQLConf {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
}
}

Expand Down Expand Up @@ -541,14 +528,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)

private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))

private[spark] def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED, codegenEnabled)
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class Exchange(
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
GenerateUnsafeProjection.canSupport(child.schema) &&
!newPartitioning.isInstanceOf[RangePartitioning]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
120 changes: 45 additions & 75 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def sparkContext = sqlContext.sparkContext

// sqlContext will be null when we are being deserialized on the slaves. In this instance
// the value of codegenEnabled/unsafeEnabled will be set by the deserializer after the
// the value of subexpressionEliminationEnabled will be set by the deserializer after the
// constructor has run.
val codegenEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.codegenEnabled
} else {
false
}
val unsafeEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.unsafeEnabled
} else {
false
}
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.subexpressionEliminationEnabled
} else {
Expand Down Expand Up @@ -233,83 +223,63 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

protected def newProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate projection, fallback to interpret", e)
new InterpretedProjection(expressions, inputSchema)
}
}
} else {
new InterpretedProjection(expressions, inputSchema)
log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate projection, fallback to interpret", e)
new InterpretedProjection(expressions, inputSchema)
}
}
}

protected def newMutableProjection(
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if(codegenEnabled) {
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
}

protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
if (codegenEnabled) {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
} else {
InterpretedPredicate.create(expression, inputSchema)
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
}

protected def newOrdering(
order: Seq[SortOrder],
inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
if (codegenEnabled) {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
} else {
new InterpretedOrdering(order, inputSchema)
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext

def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled

def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled

def numPartitions: Int = sqlContext.conf.numShufflePartitions

def strategies: Seq[Strategy] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* if necessary.
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
TungstenSort.supportsSchema(child.schema)) {
if (TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
} else {
execution.Sort(sortExprs, global, child)
Expand Down Expand Up @@ -368,8 +367,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Project(projectList, child) =>
// If we support these data types in Unsafe, use the Tungsten project.
// Otherwise, use the normal project.
if (sqlContext.conf.unsafeEnabled &&
UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
if (UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnsafeProjection should support all data types now, so we could also remove this canSupport check (in another PR).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's do that in a separate PR. If it does support all types, we should also just remove the normal Project.

execution.TungstenProject(projectList, planLater(child)) :: Nil
} else {
execution.Project(projectList, planLater(child)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ object Utils {
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
// Check if we can use TungstenAggregate.
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
TungstenAggregate.supportsAggregate(
val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))


// 1. Create an Aggregate Operator for partial aggregations.

val groupingAttributes = groupingExpressions.map(_.toAttribute)
Expand Down Expand Up @@ -144,11 +141,9 @@ object Utils {
child: SparkPlan): Seq[SparkPlan] = {

val aggregateExpressions = functionsWithDistinct ++ functionsWithoutDistinct
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))

// functionsWithDistinct is guaranteed to be non-empty. Even though it may contain more than one
// DISTINCT aggregate function, all of those functions will have the same column expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,33 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
s"will be ignored. Tungsten will continue to be used.")
Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
s"will be ignored. Codegen will continue to be used.")
Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true"))
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
s"will be ignored. Unsafe mode will continue to be used.")
Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true"))
}
(keyValueOutput, runFunc)

// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ trait HashJoin {
override def output: Seq[Attribute] = left.output ++ right.output

protected[this] def isUnsafeMode: Boolean = {
(self.codegenEnabled && self.unsafeEnabled
&& UnsafeProjection.canSupport(buildKeys)
&& UnsafeProjection.canSupport(self.schema))
UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ trait HashOuterJoin {
}

protected[this] def isUnsafeMode: Boolean = {
(self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter
&& UnsafeProjection.canSupport(buildKeys)
&& UnsafeProjection.canSupport(self.schema))
joinType != FullOuter &&
UnsafeProjection.canSupport(buildKeys) &&
UnsafeProjection.canSupport(self.schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ trait HashSemiJoin {
override def output: Seq[Attribute] = left.output

protected[this] def supportUnsafe: Boolean = {
(self.codegenEnabled && self.unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(left.schema)
&& UnsafeProjection.canSupport(right.schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(left.schema) &&
UnsafeProjection.canSupport(right.schema)
}

override def outputsUnsafeRows: Boolean = supportUnsafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ case class SortMergeJoin(
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil

protected[this] def isUnsafeMode: Boolean = {
(codegenEnabled && unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ case class SortMergeOuterJoin(
}

private def isUnsafeMode: Boolean = {
(codegenEnabled && unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ trait HashJoinNode {
private[this] var joinKeys: Projection = _

protected def isUnsafeMode: Boolean = {
(codegenEnabled &&
unsafeEnabled &&
UnsafeProjection.canSupport(schema) &&
UnsafeProjection.canSupport(streamedKeys))
UnsafeProjection.canSupport(schema) && UnsafeProjection.canSupport(streamedKeys)
}

private def streamSideKeyGenerator: Projection = {
Expand Down
Loading