From 3b98a0f6bbcf6bd56341276f1cdb20e32a743faf Mon Sep 17 00:00:00 2001
From: maryannxue
Date: Wed, 12 Sep 2018 16:32:12 -0500
Subject: [PATCH] [SPARK-25415][SQL] Make plan change log in RuleExecutor
 configurable by SQLConf

---
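
Notes (editorial, not part of the change itself): with this patch applied, the
plan change log can be tuned per session through SQLConf. A minimal usage
sketch in Scala, assuming an active SparkSession named `spark`; rule names are
matched against Rule.ruleName, which for these rules is the fully qualified
class name:

    // Surface plan changes at INFO so they show up without enabling TRACE
    // logging for the whole RuleExecutor logger.
    spark.conf.set("spark.sql.optimizer.planChangeLog.level", "info")
    // Log only the changes made by CollapseProject; leaving this conf unset
    // logs every rule that changes the plan.
    spark.conf.set("spark.sql.optimizer.planChangeLog.rules",
      "org.apache.spark.sql.catalyst.optimizer.CollapseProject")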
 .../sql/catalyst/rules/RuleExecutor.scala          |  33 +++-
 .../apache/spark/sql/internal/SQLConf.scala        |  24 +++
 .../optimizer/OptimizerLoggingSuite.scala          | 148 ++++++++++++++++++
 3 files changed, 200 insertions(+), 5 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index dccb44ddebfa4..183be5a027ec5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 object RuleExecutor {
@@ -72,6 +73,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   def execute(plan: TreeType): TreeType = {
     var curPlan = plan
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
+    val planChangeLogger = new PlanChangeLogger()
 
     batches.foreach { batch =>
       val batchStartPlan = curPlan
@@ -90,11 +92,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           if (!result.fastEquals(plan)) {
             queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
             queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
-            logTrace(
-              s"""
-                |=== Applying Rule ${rule.ruleName} ===
-                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
-              """.stripMargin)
+            planChangeLogger.log(rule.ruleName, plan, result)
           }
           queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
           queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -143,4 +141,29 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
     curPlan
   }
+
+  private class PlanChangeLogger {
+
+    private val logLevel = SQLConf.get.optimizerPlanChangeLogLevel.toUpperCase
+
+    private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
+
+    def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+      if (logRules.isEmpty || logRules.get.contains(ruleName)) {
+        lazy val message =
+          s"""
+             |=== Applying Rule ${ruleName} ===
+             |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+          """.stripMargin
+        logLevel match {
+          case "TRACE" => logTrace(message)
+          case "DEBUG" => logDebug(message)
+          case "INFO" => logInfo(message)
+          case "WARN" => logWarning(message)
+          case "ERROR" => logError(message)
+          case _ => logTrace(message)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 738d8fee891d1..4928560eacb1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -171,6 +171,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10)
 
+  val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
+    .internal()
+    .doc("Configures the log level for logging the change from the original plan to the new " +
+      "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
+      "'error'. The default log level is 'trace'.")
+    .stringConf
+    .checkValue(
+      str => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(str.toUpperCase),
+      "Invalid value for 'spark.sql.optimizer.planChangeLog.level'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
+  val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
+    .internal()
+    .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
+      "applying the rules specified in this configuration. The value is a comma-separated " +
+      "list of rule names.")
+    .stringConf
+    .createOptional
+
   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
       "column based on statistics of the data.")
@@ -1570,6 +1590,10 @@ class SQLConf extends Serializable with Logging {
 
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
+  def optimizerPlanChangeLogLevel: String = getConf(OPTIMIZER_PLAN_CHANGE_LOG_LEVEL)
+
+  def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
new file mode 100644
index 0000000000000..915f408089fe9
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf
+
+class OptimizerLoggingSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch", FixedPoint(100),
+      PushDownPredicate,
+      ColumnPruning,
+      CollapseProject) :: Nil
+  }
+
+  class MockAppender extends AppenderSkeleton {
+    val loggingEvents = new ArrayBuffer[LoggingEvent]()
+
+    override def append(loggingEvent: LoggingEvent): Unit = {
+      if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+        loggingEvents.append(loggingEvent)
+      }
+    }
+
+    override def close(): Unit = {}
+    override def requiresLayout(): Boolean = false
+  }
+
+  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+    val restoreLevel = logger.getLevel
+    logger.setLevel(level)
+    logger.addAppender(appender)
+    try f finally {
+      logger.setLevel(restoreLevel)
+      logger.removeAppender(appender)
+    }
+  }
+
+  private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+    val logAppender = new MockAppender()
+    withLogLevelAndAppender(Level.TRACE, logAppender) {
+      val input = LocalRelation('a.int, 'b.string, 'c.double)
+      val query = input.select('a, 'b).select('a).where('a > 1).analyze
+      val expected = input.where('a > 1).select('a).analyze
+      comparePlans(Optimize.execute(query), expected)
+    }
+    val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
+    assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+    assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
+  }
+
+  test("test log level") {
+    val levels = Seq(
+      "TRACE" -> Level.TRACE,
+      "trace" -> Level.TRACE,
+      "DEBUG" -> Level.DEBUG,
+      "debug" -> Level.DEBUG,
+      "INFO" -> Level.INFO,
+      "info" -> Level.INFO,
+      "WARN" -> Level.WARN,
+      "warn" -> Level.WARN,
+      "ERROR" -> Level.ERROR,
+      "error" -> Level.ERROR,
+      "deBUG" -> Level.DEBUG)
+
+    levels.foreach { level =>
+      withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> level._1) {
+        verifyLog(
+          level._2,
+          Seq(
+            PushDownPredicate.ruleName,
+            ColumnPruning.ruleName,
+            CollapseProject.ruleName))
+      }
+    }
+  }
+
+  test("test invalid log level conf") {
+    val levels = Seq(
+      "",
+      "*d_",
+      "infoo")
+
+    levels.foreach { level =>
+      val error = intercept[IllegalArgumentException] {
+        withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> level) {}
+      }
+      assert(error.getMessage.contains(
+        "Invalid value for 'spark.sql.optimizer.planChangeLog.level'."))
+    }
+  }
+
+  test("test log rules") {
+    val rulesSeq = Seq(
+      Seq(PushDownPredicate.ruleName,
+        ColumnPruning.ruleName,
+        CollapseProject.ruleName).reduce(_ + "," + _) ->
+        Seq(PushDownPredicate.ruleName,
+          ColumnPruning.ruleName,
+          CollapseProject.ruleName),
+      Seq(PushDownPredicate.ruleName,
+        ColumnPruning.ruleName).reduce(_ + "," + _) ->
+        Seq(PushDownPredicate.ruleName,
+          ColumnPruning.ruleName),
+      CollapseProject.ruleName ->
+        Seq(CollapseProject.ruleName),
+      Seq(ColumnPruning.ruleName,
+        "DummyRule").reduce(_ + "," + _) ->
+        Seq(ColumnPruning.ruleName),
+      "DummyRule" -> Seq(),
+      "" -> Seq()
+    )
+
+    rulesSeq.foreach { case (rulesConf, expectedRules) =>
+      withSQLConf(
+        SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key -> rulesConf,
+        SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+        verifyLog(Level.INFO, expectedRules)
+      }
+    }
+  }
+}
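
Editorial note on the rule-list semantics exercised by "test log rules": the
conf value is parsed with Utils.stringToSeq, so entries are trimmed and empty
entries are dropped. A standalone Scala sketch of the resulting filter,
mirroring `logRules.isEmpty || logRules.get.contains(ruleName)` in
PlanChangeLogger, with the assumed trim-and-drop-empty parsing inlined:

    // None: conf unset, so every effective rule is logged.
    // Some(""): conf set but empty, so nothing is logged.
    def shouldLog(rulesConf: Option[String], ruleName: String): Boolean = {
      val logRules = rulesConf.map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
      logRules.isEmpty || logRules.get.contains(ruleName)
    }

    shouldLog(None, "SomeRule")                    // true
    shouldLog(Some(""), "SomeRule")                // false, matches the "" -> Seq() case
    shouldLog(Some("SomeRule,Other"), "SomeRule")  // true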