@@ -194,10 +194,20 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }

test("Sort metrics") {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
val ds = spark.range(10).sort('id)
testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
// Assume the execution plan with node id is
// Sort(nodeId = 0)
// Exchange(nodeId = 1)
// Project(nodeId = 2)
// LocalTableScan(nodeId = 3)
// Because of SPARK-25267, ConvertToLocalRelation is disabled in the test cases of sql/core,
// so Project here is not collapsed into LocalTableScan.
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
"sort time total (min, med, max)" -> {_.toString.matches(timingMetricPattern)},
"peak memory total (min, med, max)" -> {_.toString.matches(sizeMetricPattern)},
"spill size total (min, med, max)" -> {_.toString.matches(sizeMetricPattern)})))
))
}

test("SortMergeJoin metrics") {
@@ -40,6 +40,18 @@ trait SQLMetricsTestUtils extends SQLTestUtils {

   protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore

+  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+  protected val sizeMetricPattern = {
+    val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
+    s"\\n$bytes \\($bytes, $bytes, $bytes\\)"
+  }
+
+  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  protected val timingMetricPattern = {
+    val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
+    s"\\n$duration \\($duration, $duration, $duration\\)"
+  }
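As a quick illustration (not part of the diff; the sample values are made up), these patterns match rendered metric strings like the following:

    // A rendered SQLMetric value starts with "\n" and then lists
    // "total (min, med, max)" stats, which the patterns mirror.
    val sizeSample = "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
    val timingSample = "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
    assert(sizeSample.matches(sizeMetricPattern))
    assert(timingSample.matches(timingMetricPattern))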

   /**
    * Get execution metrics for the SQL execution and verify metrics values.
    *
@@ -185,15 +197,34 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       df: DataFrame,
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-    val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetrics.keySet)
+    val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
+      (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+        (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
+    }
+    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
   }
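To see the refactoring concretely, here is a sketch of how an exact-value expectation now becomes an equality predicate (the Filter node, its id, and the metric name are assumed for illustration, not taken from the diff):

    // Exact-match form...
    testSparkPlanMetrics(df, 1, Map(
      0L -> (("Filter", Map("number of output rows" -> 2L)))))
    // ...is rewritten internally into roughly this predicate form:
    testSparkPlanMetricsWithPredicates(df, 1, Map(
      0L -> (("Filter", Map(
        "number of output rows" -> { (v: Any) => v.toString == "2" })))))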

+  /**
+   * Call `df.collect()` and verify if the collected metrics satisfy the specified predicates.
+   * @param df `DataFrame` to run
+   * @param expectedNumOfJobs number of jobs that will run
+   * @param expectedMetricsPredicates the expected metrics predicates. The format is
Contributor:
nit: go to 100 chars and the next line has a bad indentation

Contributor Author (@seancxmao, Dec 28, 2018):

Because metric values are usually numbers, predicates can be more natural for checking them than regular expressions, which are better suited to text matching. For simple metric values, helper functions are not needed. However, timing and size metric values are a little more complex:

  • size metric value example: "\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)"
  • timing metric value example: "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"

With helper functions, we extract the stats (via the timingMetricStats or sizeMetricStats method) and can then apply predicates to check them (all stats, or any single one). timingMetricAllStatsShould and sizeMetricAllStatsShould are not required; they are syntactic sugar to eliminate boilerplate, since timing and size metrics are frequently used. If we want to check a single value (e.g. sum >= 0), we can provide a predicate like the one below:

timingMetricStats(_)(0)._1 >= 0

BTW, maybe timing and size metric values should be stored in a more structured way rather than in pure text format (even with "\n" in the values).
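For context, a minimal sketch of what such a helper might have looked like (the name timingMetricStats comes from this discussion; the parsing below is an assumption, and these helpers were dropped from the final change):

    // Parse a rendered timing metric such as "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
    // into (value, unit) pairs, e.g. Seq((2.0, "ms"), (1.0, "ms"), ...), so a
    // predicate can inspect individual stats: timingMetricStats(v)(0)._1 >= 0.
    def timingMetricStats(metricValue: Any): Seq[(Float, String)] = {
      val stat = """([0-9]+(?:\.[0-9]+)?) (ms|s|m|h)""".r
      stat.findAllMatchIn(metricValue.toString)
        .map(m => (m.group(1).toFloat, m.group(2)))
        .toSeq
    }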

Contributor Author:

Yes, the indentation is not right. I have fixed it in the new commit.

Contributor:

My point is: as of now, pattern matching is enough for what we need to check, and we do not have a use case where we actually need to parse the exact values. Doing that, we can simplify this PR and considerably reduce the size of this change, so I think we should go this way. If in the future we need something like what you proposed here, because we want to check the actual values, then we can introduce all the methods you are suggesting. But as of now this can be skipped IMO.

Member:

This does look like a load of additional code that, I think, duplicates some existing code in Utils. Is it really necessary in order to make some basic assertions about metric values?

Contributor Author:

@mgaido91 I agree. Thanks for your detailed and clear explanation. Checking metric values does make things unnecessarily complex.

@srowen As @mgaido91 said, it is currently not necessary to check metric values; pattern matching is enough, and we can eliminate these methods. As for code duplication, the methods here do not duplicate the code in Utils: Utils provides a bunch of methods to convert between strings and bytes, where the bytes are of type Long, whereas the bytes in metric values are of type Float, e.g. 96.2 MB.
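To illustrate the Long-vs-Float point (a sketch, not from the PR; Utils.bytesToString is the existing Spark helper being referred to, and the values are examples):

    import org.apache.spark.util.Utils
    // The existing helper formats a whole-byte Long for display:
    Utils.bytesToString(100856391L) // e.g. "96.2 MiB"
    // A rendered metric value instead embeds fractional sizes as text,
    // so reading one back requires a Float rather than a Long:
    "96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)".split(" ")(0).toFloat // 96.2f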

Contributor Author:

Hi, I have switched to pattern matching and also removed unnecessary helper methods in the new commit.

+   *                                  `nodeId -> (operatorName, metric name -> metric predicate)`.
+   */
+  protected def testSparkPlanMetricsWithPredicates(
+      df: DataFrame,
+      expectedNumOfJobs: Int,
+      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
+    val optActualMetrics =
+      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
     optActualMetrics.foreach { actualMetrics =>
-      assert(expectedMetrics.keySet === actualMetrics.keySet)
-      for (nodeId <- expectedMetrics.keySet) {
-        val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId)
+      assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
+      for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap))
+          <- expectedMetricsPredicates) {
         val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
         assert(expectedNodeName === actualNodeName)
-        for (metricName <- expectedMetricsMap.keySet) {
-          assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName))
+        for ((metricName, metricPredicate) <- expectedMetricsPredicatesMap) {
+          assert(metricPredicate(actualMetricsMap(metricName)))
         }
       }
     }