Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
0fba10a
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 12, 2016
797aabb
When doing the test, issue an exception if hitting the max iteration.
gatorsmile Mar 15, 2016
98cab44
Merge remote-tracking branch 'upstream/master' into maxIterationExcep…
gatorsmile Mar 15, 2016
f351362
address comments.
gatorsmile Mar 15, 2016
1281f36
code clean
gatorsmile Mar 15, 2016
23663fe
update comments.
gatorsmile Mar 15, 2016
cbf73b3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 21, 2016
c08f561
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
474df88
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
23a6cfc
Merge branch 'maxIterationException' into maxIterationExceptionNew
gatorsmile Mar 22, 2016
48b2c32
address comments.
gatorsmile Mar 23, 2016
1a59fcf
clean code.
gatorsmile Mar 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import scala.collection.JavaConverters._
import com.google.common.util.concurrent.AtomicLongMap

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.util.Utils

object RuleExecutor {
protected val timeMap = AtomicLongMap.create[String]()
Expand All @@ -46,15 +48,24 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
/**
* An execution strategy for rules that indicates the maximum number of executions. If the
* execution reaches fix point (i.e. converge) before maxIterations, it will stop.
* If throwsExceptionUponMaxIterations is equal to true, it will issue an exception
* TreeNodeException.
*/
abstract class Strategy { def maxIterations: Int }
abstract class Strategy {
def maxIterations: Int
def throwsExceptionUponMaxIterations: Boolean
}

/** A strategy that only runs once. */
case object Once extends Strategy { val maxIterations = 1 }
case object Once extends Strategy {
override val maxIterations = 1
override val throwsExceptionUponMaxIterations = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this kind of janky to insert a field into the non-test code just to get the behavior you want in a test? how about mocking subclasses?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen This is used in multiple components, e.g., Analyzer and Optimizer. I am not sure how to do it in a more efficient way. Could you give me some hints how we did in the Spark? Any example in the existing code base? Thanks!

}

/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy

case class FixedPoint(maxIterations: Int) extends Strategy {
override val throwsExceptionUponMaxIterations: Boolean = Utils.isTesting
}
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Expand Down Expand Up @@ -98,7 +109,12 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
val msg = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
if (batch.strategy.throwsExceptionUponMaxIterations) {
throw new TreeNodeException(curPlan, msg, null)
} else {
logTrace(msg)
}
}
continue = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.trees

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

class RuleExecutorSuite extends SparkFunSuite {
Expand Down Expand Up @@ -46,9 +48,13 @@ class RuleExecutorSuite extends SparkFunSuite {

test("to maxIterations") {
object ToFixedPoint extends RuleExecutor[Expression] {
System.setProperty("spark.testing", "true")
val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
}

assert(ToFixedPoint.execute(Literal(100)) === Literal(90))
val message = intercept[TreeNodeException[LogicalPlan]] {
ToFixedPoint.execute(Literal(100))
}.getMessage
assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
}
}