
Conversation

@a-roberts
Contributor

What changes were proposed in this pull request?

This change is very similar to my pull request for improving PartitionedPairAppendOnlyMap: #15735

Summarising (more detail above): we avoid the slow iterator wrapping in favour of helping the inliner. We observed that this, when combined with the above change, leads to a 3% performance increase on the HiBench large PageRank benchmark with both IBM's SDK for Java and OpenJDK 8.

How was this patch tested?

Existing unit tests, plus the HiBench large profile (specifically the PageRank benchmark) with both IBM's SDK for Java and OpenJDK 8.

@SparkQA

SparkQA commented Nov 2, 2016

Test build #67984 has finished for PR 15736 at commit 0d1411c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} else
new Comparator[(Int, K)] {
override def compare(a: (Int, K), b: (Int, K)): Int = {
val partitionDiff = a._1 - b._1
Member

There are some indentation problems here and the else clause is missing a brace. I think you can omit the type of comparator; no space before the colon in any event.

This subtraction can overflow in theory and give the wrong answer, but the existing code does it, so, pass on that.
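To illustrate the overflow concern, here is a minimal sketch (not code from this PR) of why returning `a - b` from a comparator can give the wrong sign:

```scala
// Minimal sketch (not from this PR) of why a subtraction-based comparator
// result can overflow: Int subtraction wraps around on underflow.
object OverflowSketch {
  def main(args: Array[String]): Unit = {
    val a = -2000000000 // conceptually less than b
    val b = 2000000000
    // Mathematically a - b = -4000000000, which doesn't fit in an Int and
    // wraps to a positive value, so `a - b` wrongly signals a > b.
    assert(a - b > 0)
    // Integer.compare sidesteps the wraparound and gives the correct sign.
    assert(Integer.compare(a, b) < 0)
  }
}
```

In this code the operands are partition IDs, which are small and non-negative, so the subtraction is safe in practice; hence "in theory" above.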

While optimizing, do you want to call keyComparator.get outside the class definition?

There's a similar construct in PartitionedAppendOnlyMap that should be changed too. Can this be refactored maybe?

Can the method partitionKeyComparator go away? I think the whole WritablePartitionedPairCollection object goes away after this if you care to 'inline' it too in the one refactored instance.

@a-roberts
Contributor Author

Will be adding the commit from #15735 here upon addressing the feedback

Inline benefit with this approach as we avoid the bad iterator wrapping
@a-roberts
Contributor Author

Addressed the scalastyle comments and added the PartitionedAppendOnlyMap change here as per the above suggestions; I'll look at the review comments next.

Two unrelated asides:

  1. I'm wary of hogging the build machines; it would be useful not to autotest every time.
  2. dev/scalastyle should accept a parameter so we can quickly check just the one file; it typically takes a long time.

@SparkQA

SparkQA commented Nov 2, 2016

Test build #67986 has finished for PR 15736 at commit a3d85b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

partitionDiff
} else {
keyComparator.get.compare(a._2, b._2)
}
Contributor

You can dereference the Option up front to avoid get in the inner loop.
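A small sketch of that suggestion (hypothetical names): resolve the Option once when building the Comparator, rather than calling .get inside every compare():

```scala
import java.util.Comparator

// Hypothetical sketch of the review suggestion: dereference the Option once,
// outside the hot compare() path, rather than calling keyComparator.get on
// every comparison.
object HoistGetSketch {
  def partitionKeyComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
    val keyComp = keyComparator.get // resolved once, up front
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) partitionDiff else keyComp.compare(a._2, b._2)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val cmp = partitionKeyComparator(Some(Comparator.naturalOrder[String]()))
    assert(cmp.compare((0, "b"), (1, "a")) < 0) // different partitions: partition wins
    assert(cmp.compare((1, "a"), (1, "b")) < 0) // same partition: key comparator decides
  }
}
```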

Member

Yeah, I think GitHub collapsed it, but there's this and other suggestions at ... #15736 (review)

@SparkQA

SparkQA commented Nov 2, 2016

Test build #67987 has finished for PR 15736 at commit af4aea3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Nov 5, 2016

To recap some of my feedback here, I think this will be a fine change but it can be refactored further.

I think we can refactor this logic, which appears twice, into one place, perhaps in object WritablePartitionedPairCollection? That's where its support code currently lives, anyway.

Since the two methods there are only used by these call sites that are changing, they can be 'inlined' into the one common implementation, which might open up more optimization.

It'd be nice to fix the subtraction issue while we're here unless someone is convinced the difference can never overflow.

keyComparator.get can be lifted out of the compare() method.

@srowen
Member

srowen commented Nov 9, 2016

@a-roberts this does look like a worthy change, what do you think of the further simplifications here?

@a-roberts
Contributor Author

Sean, they are great suggestions, thanks -- I'll find the time (as with the other outstanding pull requests) to get your feedback integrated, tested and profiled; I'm currently caught up in packaging our own Apache Spark releases for both 1.6.3 and 2.0.2. I also plan to create a JIRA proposing regular performance runs using the latest Spark snapshot builds to track regressions (I have this all set up with scripts already).

@srowen
Member

srowen commented Nov 19, 2016

I know you're busy but this does look like a good change to finish off. That it's a win is self-evident, just a question of how much, and benchmarks you have already show it is an improvement. I can take it on (credit remains with you) or will just wait if you're getting back to it.

@a-roberts
Contributor Author

I'm resuming the work for all of these related PRs this week, after the London Spark meetup on Wednesday. If you are keen to take it on, I'm more than happy to help out, and I'll share some information here that you and others should find useful.

Useful tools
As well as simple microbenchmarking we use the Linux perf tools, tprof with Visual Performance Analyzer and also IBM's Healthcenter for Java for method profiling (this is bundled with the JDK and you provide -Xhealthcenter as a driver/executor option then open the files in said tool).

Benchmarks
We'd want to run our improvement ideas with and without the changes using HiBench 6 (large profile) and SparkSqlPerf against all 100 TPCDS queries.

@a-roberts
Contributor Author

Back to working on the performance-related JIRAs now; based on the above helpful comments, here's what I'll do.

Remove the .get.compare from the loop as suggested above - we'll do a .get upfront to get our comparator to use, eliminating the .get later

Move the duplicated code into the WritablePartitionedPairCollection object so the two methods optimised here will call the above new method (let's say it's called getComparator) before returning accordingly (both methods are the same apart from the final few lines).

PartitionedAppendOnlyMap returns

destructiveSortedIterator(comparator)

and PartitionedPairBuffer returns:

new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
iterator

I'll then build/test/profile this again

}
}

/* Takes an optional parameter (keyComparator), use if provided
Member

Javadoc/scaladoc starts with /** and usually you leave that alone on one line and start documentation on the next.

* and returns a comparator for the partitions
*/
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
val comparator : Comparator[(Int, K)] =
Member

comparator is now entirely redundant. The whole body is just the if statement

: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
new Sorter(new KVArraySortDataFormat[(Int, K),
Member

I think breaking the line here is odd. If necessary, pull out the result of getComparator to a statement above to shorten this line, like it was before.

} else {
new Comparator[(Int, K)] {
// We know we have a non-empty comparator here
val ourKeyComp = keyComparator.get
Member

I think this should be outside the body of the anonymous class. You don't need a reference to the Option here even in the anonymous class.

/* Takes an optional parameter (keyComparator), use if provided
* and returns a comparator for the partitions
*/
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
Member

Nit: no space before colon

*/
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
val comparator : Comparator[(Int, K)] =
if (keyComparator.isEmpty) {
Member

isDefined is probably a tiny bit more conventional (and then flip the logic here of course)

comparator
}


Member

You can delete partitionKeyComparator below now, right?

def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
val comparator : Comparator[(Int, K)] =
if (keyComparator.isEmpty) {
partitionComparator
Member

This can be inlined now

// We know we have a non-empty comparator here
val ourKeyComp = keyComparator.get
override def compare(a: (Int, K), b: (Int, K)): Int = {
val partitionDiff = a._1 - b._1
Member

I'm still not thrilled about the subtraction here but maybe leave it for now

@SparkQA

SparkQA commented Nov 25, 2016

Test build #69164 has finished for PR 15736 at commit fc8f98e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 25, 2016

Test build #69165 has finished for PR 15736 at commit d342394.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 25, 2016

Test build #69172 has finished for PR 15736 at commit 53ed170.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@a-roberts
Contributor Author

I've conducted a lot of performance tests and gathered .hcd files so I can investigate this next week, but it looks like either the first commit is the best for performance or my current configuration with this benchmark results in us being unable to infer if our changes really make a difference.

Sharing some raw data, the format is as follows.

Benchmark name, date, time, data size in bytes (the same each run), the elapsed time and the throughput (bytes per second).

With the above suggestions for Partitioned*Buffer

ScalaSparkPagerank 2016-11-25 18:49:23 259928115            49.577               5242917              
ScalaSparkPagerank 2016-11-25 18:56:55 259928115            49.946               5204182              
ScalaSparkPagerank 2016-11-25 19:00:04 259928115            46.510               5588650              
ScalaSparkPagerank 2016-11-25 19:02:23 259928115            49.018               5302707              
ScalaSparkPagerank 2016-11-25 19:05:25 259928115            49.270               5275585              

Vanilla, no changes at all

ScalaSparkPagerank 2016-11-25 19:08:45 259928115            48.068               5407508              
ScalaSparkPagerank 2016-11-25 19:11:20 259928115            47.712               5447856              
ScalaSparkPagerank 2016-11-25 19:13:50 259928115            44.517               5838850              
ScalaSparkPagerank 2016-11-25 19:16:07 259928115            49.942               5204599              
ScalaSparkPagerank 2016-11-25 19:19:08 259928115            48.521               5357023              

Original commit

ScalaSparkPagerank 2016-11-25 19:47:59 259928115            45.486               5714464              
ScalaSparkPagerank 2016-11-25 19:50:48 259928115            48.507               5358569              
ScalaSparkPagerank 2016-11-25 19:53:09 259928115            47.063               5522982              
ScalaSparkPagerank 2016-11-25 19:56:58 259928115            46.154               5631757              
ScalaSparkPagerank 2016-11-25 20:00:01 259928115            48.935               5311701        

In Healthcenter I do see that these methods are still great candidates for optimisation as they are all very commonly used.

Open to more suggestions, I have exclusive access to lots of hardware, can easily churn out more custom builds and have lots of profiling software we can use. I'll be committing code for the SizeEstimator soon as that's a good candidate for optimisation here as well.

Member

@srowen srowen left a comment


Hm, I don't see why this would be slower than the original version. It should be nearly identical anyway or better, as it further inlines a few things. It could be some weird interactions with the JIT and benchmark or whatever, or maybe some difference in how it was tested.

Try one more round of changes here and benchmark again. In any event it would be worthwhile just for the code streamlining.

keyComparator.compare(a._2, b._2)
def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
if (!keyComparator.isDefined) return partitionComparator
else {
Member

Style is off here -- you need braces in both clauses, return is redundant, and there's no point in inverting the condition as opposed to just flipping the clauses.

} else {
keyComparator.compare(a._2, b._2)
def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
if (!keyComparator.isDefined) return partitionComparator
Member

inline and remove partitionComparator, as I think it's not used

// We know we have a non-empty comparator here
override def compare(a: (Int, K), b: (Int, K)): Int = {
val partitionDiff = a._1 - b._1
if (partitionDiff != 0) {
Member

Probably very very slightly better to say

if (a._1 == b._1) {
  theKeyComp.compare(a._2, b._2)
} else {
  a._1 - b._1
}

@a-roberts
Contributor Author

Before progressing, I've discussed what I'm seeing with our JIT compiler team. With the refactoring to reduce code duplication, the following occurs, which solves some of the mystery -- although it's bad news as, like you, I wanted to remove the duplicate method.

Summarising:

By having both of these classes share the getComparator method one level up in the hierarchy, the JIT profiling won't function as expected.

Let's assume we have A (calling method) -> getComparator -> B (returns the comparator) and then you have C (another calling method) -> getComparator -> D (returns the comparator).

A and C are the actual methods calling getComparator; B and D are the comparators that are returned.

As a JIT compiler, if we profile getComparator on its own, we will see it calling B and calling D, since profiling in most JITs is context insensitive, and we think that's what's happening here.

When we inline getComparator into A and into C, the JIT doesn't know whether it should inline B or D, and given that inlining is critical for performance, we see the slight drop in performance. Inlining is critical for eliminating call overheads, improving code locality, and widening the scope for optimisation.
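A sketch of the shape being described (illustrative names only, not the Spark code): two call sites funnel through one shared factory, so the compare() call site consuming its result sees two receiver classes and a context-insensitive profile can't tell them apart.

```scala
import java.util.Comparator

// Illustrative only: one shared factory returns two different anonymous
// Comparator classes ("B" and "D"), so a context-insensitive profile of the
// downstream compare() call site sees both receivers and the JIT cannot
// confidently inline either one.
object ProfilingSketch {
  def getComparator(byKey: Boolean): Comparator[(Int, String)] =
    if (byKey) {
      new Comparator[(Int, String)] { // "B": partition, then key
        override def compare(a: (Int, String), b: (Int, String)): Int =
          if (a._1 != b._1) a._1 - b._1 else a._2.compareTo(b._2)
      }
    } else {
      new Comparator[(Int, String)] { // "D": partition only
        override def compare(a: (Int, String), b: (Int, String)): Int =
          a._1 - b._1
      }
    }

  def main(args: Array[String]): Unit = {
    // Call sites "A" and "C" both go through getComparator, so at runtime
    // the receiver of compare() alternates between B and D.
    val d = getComparator(byKey = false)
    val b = getComparator(byKey = true)
    assert(d.compare((1, "b"), (1, "a")) == 0) // D ignores keys
    assert(b.compare((1, "b"), (1, "a")) > 0)  // B falls back to key compare
  }
}
```

Duplicating the construction at each call site keeps each compare() call site monomorphic, which is consistent with the JIT team's explanation above.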

@srowen
Member

srowen commented Nov 29, 2016

If that's true then again doesn't my suggestion to inline partitionComparator fix it?

@a-roberts
Contributor Author

@srowen how about this for profiling?

private[spark] object WritablePartitionedPairCollection {
  /**
   * Takes an optional parameter (keyComparator), use if provided
   * and returns a comparator for the partitions
   */
  def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
    if (keyComparator.isDefined) {
      val theKeyComp = keyComparator.get
      new Comparator[(Int, K)] {
        // We know we have a non-empty comparator here
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          if (a._1 == b._1) {
            theKeyComp.compare(a._2, b._2)
          } else {
            a._1 - b._1
          }
        }
      }
    } else return new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        a._1 - b._1
      }
    }
  }
}

@srowen
Member

srowen commented Nov 30, 2016

Looks right except you just want to write

if (...) {
  ...
} else {
  new Comparator...
}

@a-roberts
Contributor Author

Good point, done. So I can go ahead and profile the code below? It builds fine with no scalastyle problems.

  def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
    if (keyComparator.isDefined) {
      val theKeyComp = keyComparator.get
      new Comparator[(Int, K)] {
        // We know we have a non-empty comparator here
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          if (a._1 == b._1) {
            theKeyComp.compare(a._2, b._2)
          } else {
            a._1 - b._1
          }
        }
      }
    } else {
      new Comparator[(Int, K)] {
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          a._1 - b._1
        }
      }
    }
  }

@mridulm
Contributor

mridulm commented Nov 30, 2016

(Particularly) as the number of partitions increases, "if (a._1 != b._1)" might be better for branch-prediction reasons.

@a-roberts
Contributor Author

I see, so we do the != comparison first, which is likely to be true more of the time, so we're not consistently failing this check and then entering the else.

  def getComparator[K](keyComparator: Option[Comparator[K]]): Comparator[(Int, K)] = {
    if (keyComparator.isDefined) {
      val theKeyComp = keyComparator.get
      new Comparator[(Int, K)] {
        // We know we have a non-empty comparator here
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          if (a._1 != b._1) {
            a._1 - b._1
          } else {
            theKeyComp.compare(a._2, b._2)
          }
        }
      }
    } else {
      new Comparator[(Int, K)] {
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          a._1 - b._1
        }
      }
    }
  }

Again that builds fine

@srowen
Member

srowen commented Nov 30, 2016

Hm why does the order matter - maybe helps branch prediction? I doubt we even know how the bytecode orders this let alone how it is JITted and whether it will gather branching info on this one branch. Either way. I usually prefer == for code clarity all else equal. No need to benchmark both just pick one.

@a-roberts
Contributor Author

Passed on your question to our JIT developers

The sense* of the test can have an impact on VM interpreter performance, but that is not usually much of a component of actual throughput, since important methods will be JIT'd very quickly regardless of which specific JVM you use. The J9 VM is capable of profiling the branch and flipping the sense when JITting the code.

The sense refers to the way a code branches, so either down the equals branch or not equals branch

Numbers for us

Refactored further as above

ScalaSparkPagerank 2016-11-30 14:27:49 259928115            49.841               5215146
ScalaSparkPagerank 2016-11-30 14:29:52 259928115            51.310               5065837
ScalaSparkPagerank 2016-11-30 14:31:59 259928115            52.086               4990364
ScalaSparkPagerank 2016-11-30 14:34:05 259928115            50.667               5130126
ScalaSparkPagerank 2016-11-30 14:36:04 259928115            47.096               5519112
ScalaSparkPagerank 2016-11-30 14:38:04 259928115            48.244               5387781
ScalaSparkPagerank 2016-11-30 14:40:10 259928115            48.734               5333609
ScalaSparkPagerank 2016-11-30 14:42:12 259928115            49.295               5272910
397.273 / 8 = 49.659 sec average

initial commit

ScalaSparkPagerank 2016-11-30 14:48:01 259928115            46.442               5596832
ScalaSparkPagerank 2016-11-30 14:50:06 259928115            50.016               5196899
ScalaSparkPagerank 2016-11-30 14:52:12 259928115            51.113               5085362
ScalaSparkPagerank 2016-11-30 14:54:12 259928115            46.424               5599002
ScalaSparkPagerank 2016-11-30 14:56:15 259928115            47.604               5460215
ScalaSparkPagerank 2016-11-30 14:58:14 259928115            46.802               5553782
ScalaSparkPagerank 2016-11-30 15:00:16 259928115            47.021               5527915
ScalaSparkPagerank 2016-11-30 15:02:16 259928115            47.072               5521926
382.494 / 8 = 47.811s average

The first commit performs better on average. Next, I'd like to add the improved compare code as above and "push it down" into the subclasses to see how that performs.

@srowen
Member

srowen commented Dec 4, 2016

I'd certainly be curious to see a benchmark of the 'final' version with inlined comparator. I would honestly be surprised if that's not fastest of all.

@a-roberts
Contributor Author

New data for us, inlined comparator scores here (code provided below to check I've not profiled something useless!):

ScalaSparkPagerank 2016-12-05 13:44:41 259928115            48.149               5398411
ScalaSparkPagerank 2016-12-05 13:46:43 259928115            46.897               5542531
ScalaSparkPagerank 2016-12-05 13:48:46 259928115            49.130               5290619
ScalaSparkPagerank 2016-12-05 13:50:49 259928115            49.793               5220173
ScalaSparkPagerank 2016-12-05 13:52:50 259928115            48.061               5408296
ScalaSparkPagerank 2016-12-05 13:54:52 259928115            46.468               5593701
ScalaSparkPagerank 2016-12-05 13:56:56 259928115            51.385               5058443
ScalaSparkPagerank 2016-12-05 13:58:59 259928115            47.857               5431349
ScalaSparkPagerank 2016-12-05 14:00:59 259928115            46.515               5588049
ScalaSparkPagerank 2016-12-05 14:03:03 259928115            47.791               5438850
Avg 48.2046s

Remember our "vanilla" average time is 47.752s and our first commit averaged 47.229s (so not much of a difference really).

I think we're splitting hairs and I've got another PR I am seeing good results on that I plan to focus on instead: the SizeEstimator.

This is what I've benchmarked (PartitionedAppendOnlyMap first), so let me know if there are any further suggestions; otherwise I propose leaving this one for later, as against the Spark master codebase I'm not noticing anything exciting.

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = {
      if (keyComparator.isDefined) {
        val theKeyComp = keyComparator.get
        new Comparator[(Int, K)] {
          // We know we have a non-empty comparator here
          override def compare(a: (Int, K), b: (Int, K)): Int = {
            if (a._1 != b._1) {
              a._1 - b._1
            } else {
              theKeyComp.compare(a._2, b._2)
            }
          }
        }
      } else {
        new Comparator[(Int, K)] {
          override def compare(a: (Int, K), b: (Int, K)): Int = {
            a._1 - b._1
          }
        }
      }
    }
    destructiveSortedIterator(comparator)
  }

In PartitionedPairBuffer


  /** Iterate through the data in a given order. For this class this is not really destructive. */
  override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = {
      if (keyComparator.isDefined) {
        val theKeyComp = keyComparator.get
        new Comparator[(Int, K)] {
          // We know we have a non-empty comparator here
          override def compare(a: (Int, K), b: (Int, K)): Int = {
            if (a._1 != b._1) {
              a._1 - b._1
            } else {
              theKeyComp.compare(a._2, b._2)
            }
          }
        }
      } else {
        new Comparator[(Int, K)] {
          override def compare(a: (Int, K), b: (Int, K)): Int = {
            a._1 - b._1
          }
        }
      }
    }
    new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
    iterator
  }

WritablePartitionedPairCollection remains unchanged.

@srowen
Member

srowen commented Dec 6, 2016

It does seem like nice cleanup in any event. I am not sure why the first commit was faster as this seems like a 'superset' of optimization. We can't use that one in any event. If you want to update the PR with what you posted above, I think it'd be OK to commit just for the code simplification.

@srowen
Member

srowen commented Dec 11, 2016

@a-roberts let's either finish the thought and merge this as mostly a code cleanup and maybe marginal win, or just close it.

@srowen
Member

srowen commented Dec 19, 2016

Ping @a-roberts to resolve this

@srowen
Member

srowen commented Dec 30, 2016

I'm going to manually close this

srowen added a commit to srowen/spark that referenced this pull request Feb 2, 2017
@srowen srowen mentioned this pull request Feb 2, 2017
@asfgit asfgit closed this in 20b4ca1 Feb 3, 2017