Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -868,29 +868,27 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue

override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
(0 to 2).foreach { fallbackStartsAt =>
sqlContext.setConf(
"spark.sql.TungstenAggregate.testFallbackStartsAt",
fallbackStartsAt.toString)

// Create a new df to make sure its physical operator picks up
// spark.sql.TungstenAggregate.testFallbackStartsAt.
// todo: remove it?
val newActual = DataFrame(sqlContext, actual.logicalPlan)

QueryTest.checkAnswer(newActual, expectedAnswer) match {
case Some(errorMessage) =>
val newErrorMessage =
s"""
|The following aggregation query failed when using TungstenAggregate with
|controlled fallback (it falls back to sort-based aggregation once it has processed
|$fallbackStartsAt input rows). The query is
|${actual.queryExecution}
|
|$errorMessage
""".stripMargin

fail(newErrorMessage)
case None =>
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> fallbackStartsAt.toString) {
// Create a new df to make sure its physical operator picks up
// spark.sql.TungstenAggregate.testFallbackStartsAt.
// todo: remove it?
val newActual = DataFrame(sqlContext, actual.logicalPlan)

QueryTest.checkAnswer(newActual, expectedAnswer) match {
case Some(errorMessage) =>
val newErrorMessage =
s"""
|The following aggregation query failed when using TungstenAggregate with
|controlled fallback (it falls back to sort-based aggregation once it has processed
|$fallbackStartsAt input rows). The query is
|${actual.queryExecution}
|
|$errorMessage
""".stripMargin

fail(newErrorMessage)
case None =>
}
}
}
}
Expand Down