
Conversation

@caneGuy (Contributor) commented Jul 25, 2017

What changes were proposed in this pull request?

Right now, ChunkedByteBuffer#writeFully does not slice the bytes first. We observed the following code in Java NIO's Util#getTemporaryDirectBuffer:

    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }

If we slice the buffer into pieces of a fixed size first, each write can reuse NIO's per-thread buffer cache, and a direct buffer only needs to be allocated on the first write call. Otherwise, every oversized write falls through to ByteBuffer.allocateDirect, and we cannot control when those buffers are freed; this once caused a memory issue in our production cluster.
In this patch, I add a new API that slices the buffer into fixed-size pieces before writing.
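A minimal sketch of the idea, assuming a hypothetical sliceSize parameter in place of the patch's configurable chunk size:

    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel

    // Write `bytes` in slices of at most `sliceSize`, so each channel.write()
    // can borrow a cached temporary direct buffer of bounded size instead of
    // forcing allocateDirect() for the whole chunk.
    def writeSliced(channel: WritableByteChannel, bytes: ByteBuffer, sliceSize: Int): Unit = {
      val originalLimit = bytes.limit()
      while (bytes.hasRemaining) {
        val ioSize = Math.min(bytes.remaining(), sliceSize)
        bytes.limit(bytes.position() + ioSize)
        while (bytes.hasRemaining) {
          channel.write(bytes) // drain the current slice
        }
        bytes.limit(originalLimit) // make the rest of the chunk visible again
      }
    }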

How was this patch tested?

Unit tests and testing in production.

@caneGuy closed this Jul 26, 2017
@caneGuy deleted the zhoukang/improve-chunkwrite branch July 26, 2017 02:44
@caneGuy restored the zhoukang/improve-chunkwrite branch August 22, 2017 07:33
@caneGuy reopened this Aug 22, 2017
@cloud-fan (Contributor):

ok to test

@cloud-fan (Contributor):

looks reasonable, do you have some performance numbers?


/**
 * Write this buffer to a channel with slice.
 */
Contributor:

can we use this one to replace writeFully?

@SparkQA commented Aug 23, 2017

Test build #81018 has finished for PR 18730 at commit 7cbadc5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 23, 2017

Test build #81020 has finished for PR 18730 at commit fc91f96.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

retest this please

@caneGuy (Contributor, Author) commented Aug 23, 2017

@cloud-fan Thanks for taking the time to review this PR. Actually, an application in our cluster kept failing with direct memory OOM. I have not measured performance with a benchmark, but this feature has been running in our cluster for a long time (since last year) and I have not observed any performance issue. Moreover, the direct-memory problem caused by the BufferCache growing out of control has not occurred since.

@SparkQA commented Aug 23, 2017

Test build #81023 has finished for PR 18730 at commit fc91f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor):

It would be great to benchmark this improvement; otherwise we cannot be sure there is no regression.

def writeWithSlice(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    val capacity = bytes.limit()
    while (bytes.position() < capacity) {
@kiszk (Member) commented Aug 23, 2017

Should we replace Math.min(...) with Math.min(capacity, bytes.position + NIO_BUFFER_LIMIT.toLong)? I am afraid of int overflow. For example, if capacity = 0x7FFFFFF0 and bytes.position = 0x7FFFFF00, then bytes.position + NIO_BUFFER_LIMIT computed as an int is negative (the sum wraps past 0x7FFFFFFF).
To avoid this overflow, it would be good to compare them using longs.
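A quick illustration of the wraparound; the NIO_BUFFER_LIMIT value below is a hypothetical stand-in, not the actual constant:

    val capacity = 0x7FFFFFF0    // 2147483632
    val position = 0x7FFFFF00    // 2147483392
    val NIO_BUFFER_LIMIT = 0x200 // hypothetical slice limit

    // Int arithmetic wraps: 2147483392 + 512 becomes -2147483392 as an Int,
    // so Math.min(capacity, position + NIO_BUFFER_LIMIT) would pick the negative value.
    val wrapped = position + NIO_BUFFER_LIMIT

    // Long arithmetic keeps the true sum; the min is bounded by capacity,
    // so narrowing back to Int afterwards is safe.
    val safe = Math.min(capacity.toLong, position.toLong + NIO_BUFFER_LIMIT).toInt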

@caneGuy (Contributor, Author):

Good review. I refactored the code.

@caneGuy (Contributor, Author) commented Aug 24, 2017

@jiangxb1987 OK, I will try to do some benchmark testing.

@caneGuy (Contributor, Author) commented Aug 24, 2017

I mocked up some local tests for the two APIs. From these simple results, slicing does not affect write performance. Test results below:

【Test 10 chunks each with 30m for 1 loop】
Time cost with 1 loop for writeFully(): 83 ms
Time cost with 1 loop for writeWithSlice(): 76 ms
【Ending】

【Test 10 chunks each with 100m for 1 loop】
Time cost with 1 loop for writeFully(): 219 ms
Time cost with 1 loop for writeWithSlice(): 213 ms
【Ending】

【Test 10 chunks each with 30m for 10 loop】
Time cost with 10 loop for writeFully(): 982 ms
Time cost with 10 loop for writeWithSlice(): 1000 ms
【Ending】

【Test 10 chunks each with 100m for 10 loop】
Time cost with 10 loop for writeFully(): 3298 ms
Time cost with 10 loop for writeWithSlice(): 3454 ms
【Ending】

【Test 10 chunks each with 30m for 50 loop】
Time cost with 50 loop for writeFully(): 3444 ms
Time cost with 50 loop for writeWithSlice(): 3329 ms
【Ending】

【Test 10 chunks each with 100m for 50 loop】
Time cost with 50 loop for writeFully(): 21913 ms
Time cost with 50 loop for writeWithSlice(): 17574 ms
【Ending】

Test code below:

test("benchmark testing") {
    // scalastyle:off
    val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100)
    val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30)
    testWithLoop(1, new ChunkedByteBuffer(Array.fill(10)(buffer30)), "Test 10 chunks each with 30m for 1 loop")
    testWithLoop(1, new ChunkedByteBuffer(Array.fill(10)(buffer100)), "Test 10 chunks each with 100m for 1 loop")

    testWithLoop(10, new ChunkedByteBuffer(Array.fill(10)(buffer30)), "Test 10 chunks each with 30m for 10 loop")
    testWithLoop(10, new ChunkedByteBuffer(Array.fill(10)(buffer100)), "Test 10 chunks each with 100m for 10 loop")

    testWithLoop(50, new ChunkedByteBuffer(Array.fill(10)(buffer30)), "Test 10 chunks each with 30m for 50 loop")
    testWithLoop(50, new ChunkedByteBuffer(Array.fill(10)(buffer100)), "Test 10 chunks each with 100m for 50 loop")
  }

  // scalastyle:off
  private def testWithLoop(loopTimes : Int, chunkedByteBuffer: ChunkedByteBuffer, testString: String) {
    System.out.println(s"【$testString】")
    var starTime = System.currentTimeMillis()
    for (i <- 1 to loopTimes) {
      chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
    }
    System.out.println(s"Time cost with $loopTimes loop for writeFully():${Utils.getUsedTimeMs(starTime)}")
    starTime = System.currentTimeMillis()
    for (i <- 1 to loopTimes) {
      chunkedByteBuffer.writeWithSlice(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
    }
    System.out.println(s"Time cost with $loopTimes loop for writeWithSlice():${Utils.getUsedTimeMs(starTime)}")
    System.out.println("【Ending】")
    System.out.println("")
  }

@SparkQA commented Aug 24, 2017

Test build #81068 has finished for PR 18730 at commit 4789772.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81061 has finished for PR 18730 at commit bab91db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81064 has finished for PR 18730 at commit 72aef67.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81069 has finished for PR 18730 at commit aeabe1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Aug 24, 2017

Thank you for preparing a benchmark. Could you please rewrite it using the Benchmark class?
Java/Scala benchmarks usually need warmup runs to avoid measuring interpreted execution.
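A sketch of what such a benchmark could look like with Spark 2.x's org.apache.spark.util.Benchmark utility; the case names and buffer sizes here are illustrative, not from the PR:

    val chunked = new ChunkedByteBuffer(Array.fill(10)(ByteBuffer.allocate(1024 * 1024 * 30)))
    // Benchmark runs warmup iterations before the timed ones, so the measured
    // runs are not dominated by interpreted (not-yet-JIT-compiled) execution.
    val benchmark = new Benchmark("writeWithSlice() vs writeFully()", chunked.size)
    benchmark.addCase("writeFully(), 10 chunks of 30m") { _ =>
      chunked.writeFully(new ByteArrayWritableChannel(chunked.size.toInt))
    }
    benchmark.addCase("writeWithSlice(), 10 chunks of 30m") { _ =>
      chunked.writeWithSlice(new ByteArrayWritableChannel(chunked.size.toInt))
    }
    benchmark.run()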

@caneGuy (Contributor, Author) commented Aug 24, 2017

Updated with the Benchmark class, @kiszk:

Running benchmark: Benchmark writeWithSlice() and writeFully()
  Running case: Test writeFully() chunks each with 30m for 10 loop
  Stopped after 10 iterations, 2365 ms
  Running case: Test writeWithSlice() chunks each with 30m for 10 loop
  Stopped after 10 iterations, 1440 ms
  Running case: Test writeFully() chunks each with 100m for 50 loop
  Stopped after 50 iterations, 2860 ms
  Running case: Test writeWithSlice() chunks each with 100m for 50 loop
  Stopped after 50 iterations, 2834 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_25-b17 on Linux 4.4.0-64-generic
Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz

Benchmark writeWithSlice() and writeFully():             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------
Test writeFully() chunks each with 30m for 10 loop              98 /  237        160.8           6.2       1.0X
Test writeWithSlice() chunks each with 30m for 10 loop          55 /  144        287.6           3.5       1.8X
Test writeFully() chunks each with 100m for 50 loop             54 /   57        290.3           3.4       1.8X
Test writeWithSlice() chunks each with 100m for 50 loop         54 /   57        288.9           3.5       1.8X

private[spark] val BUFFER_WRITE_CHUNK_SIZE =
  ConfigBuilder("spark.buffer.write.chunkSize")
    .internal()
    .doc("The block size limit when use ChunkedByteBuffer to writeFully bytes.")
Contributor:

The chunk size during writing out the bytes of ChunkedByteBuffer

@SparkQA commented Aug 24, 2017

Test build #81085 has finished for PR 18730 at commit 717f886.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

LGTM, pending jenkins

@caneGuy (Contributor, Author) commented Aug 24, 2017

Thanks for your time @cloud-fan

@SparkQA commented Aug 24, 2017

Test build #81087 has finished for PR 18730 at commit e48f44f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81086 has finished for PR 18730 at commit 9d3004d.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81089 has finished for PR 18730 at commit fc184aa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81091 has finished for PR 18730 at commit f1d67a3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81080 has finished for PR 18730 at commit d708142.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81081 has finished for PR 18730 at commit 33a2796.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81082 has finished for PR 18730 at commit ab384d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2017

Test build #81083 has finished for PR 18730 at commit b669351.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 25, 2017

Test build #81112 has finished for PR 18730 at commit 14ca824.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy (Contributor, Author) commented Aug 25, 2017

@cloud-fan Can we retest this? Thanks!

@cloud-fan (Contributor):

retest this please

-    while (bytes.remaining > 0) {
+    while (bytes.remaining() > 0) {
       val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
       bytes.limit(bytes.position + ioSize.toInt)
Contributor:

let's avoid this per-loop type cast, we can make bufferWriteChunkSize an int.
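A sketch of the suggestion; the field and access pattern here are assumptions, not the patch's exact code:

    // Resolve the byte-size config once, narrowing to Int up front, so the
    // write loop does no per-iteration toInt. The narrowing is safe because
    // the config value is validated to fit in an Int.
    private val bufferWriteChunkSize: Int =
      math.min(conf.get(BUFFER_WRITE_CHUNK_SIZE), Int.MaxValue.toLong).toInt

    // ...the loop body then becomes:
    //   bytes.limit(bytes.position() + math.min(bytes.remaining(), bufferWriteChunkSize))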

  .internal()
  .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefault(64 * 1024 * 1024)
Contributor:

add a checkValue to make sure the value is smaller than Int.Max
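One way to apply this, mirroring ConfigBuilder's checkValue; the error message is illustrative:

    private[spark] val BUFFER_WRITE_CHUNK_SIZE =
      ConfigBuilder("spark.buffer.write.chunkSize")
        .internal()
        .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
        .bytesConf(ByteUnit.BYTE)
        // Reject values that cannot be represented as an Int, since the
        // chunk size ultimately feeds ByteBuffer.limit(int).
        .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
          " ChunkedByteBuffer should not be larger than Int.MaxValue.")
        .createWithDefault(64 * 1024 * 1024)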

@SparkQA commented Aug 25, 2017

Test build #81125 has finished for PR 18730 at commit 14ca824.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 25, 2017

Test build #81128 has finished for PR 18730 at commit a02575e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy (Contributor, Author) commented Aug 25, 2017

@cloud-fan Jenkins is done!

@asfgit closed this in 574ef6c Aug 25, 2017
@cloud-fan (Contributor):

thanks, merging to master!
