Conversation

@rxin
Contributor

@rxin rxin commented Apr 12, 2014

This should fix the extra memory copy introduced by #266.

@mridulm @pwendell @mateiz

Note that because I created the util.io package, I also moved ByteBufferInputStream into the package.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14073/

Member

This still entails a copy, right? I don't see that this improves the situation by itself.

@mridulm
Contributor

mridulm commented Apr 12, 2014

I actually meant something like this:
(This is from an internal WIP branch that tackles the ByteBuffer to Seq[ByteBuffer] change.)
Ideally I should submit this via a PR, but unfortunately ...

Please take a look at needCompact and toByteBuffer (and replace direct creation of a ByteBuffer from a byte array with this method).
The idea is to potentially waste some memory in order to prevent memory fragmentation and/or an extra copy.

package org.apache.spark.io

import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.{Arrays => JArrays}
import org.apache.spark.storage.DiskStore

/**
 * A custom implementation of ByteArrayOutputStream which tries to minimize array copies by
 * reusing the underlying array if within bounds.
 *
 * Note: this is unsafe for general use; it directly exposes the underlying data array.
 *
 */
private[io] class SparkByteArrayOutputStream(initialSize: Int) extends OutputStream {

  if (initialSize < 0) {
    throw new IllegalArgumentException("Negative initial size: " + initialSize)
  }

  /**
   * The buffer where data is stored.
   */
  private var buf: Array[Byte] = new Array[Byte](initialSize)
  /**
   * The number of valid bytes in the buffer.
   */
  private var count: Int = 0

  /**
   * Creates a new byte array output stream. The buffer capacity is
   * initially 32 bytes, though its size increases if necessary.
   */
  def this() = this(32)

  private def ensureCapacity(minCapacity: Int) {
    if (minCapacity < 0) throw new IllegalArgumentException("required capacity " + minCapacity + " is negative")
    if (minCapacity > buf.length) grow(minCapacity)
  }

  /**
   * Increases the capacity to ensure that it can hold at least the
   * number of elements specified by the minimum capacity argument.
   *
   * @param minCapacity the desired minimum capacity
   */
  private def grow(minCapacity: Int) {
    val oldCapacity: Int = buf.length

    // TODO: This might be too expensive as size grows : work out something better ?
    var newCapacity: Int = oldCapacity << 1

    if (newCapacity - minCapacity < 0) newCapacity = minCapacity
    // set max size to DiskStore.MAX_BLOCK_SIZE
    if (newCapacity > DiskStore.MAX_BLOCK_SIZE) newCapacity = DiskStore.MAX_BLOCK_SIZE

    if (newCapacity < 0) {
      throw new IllegalArgumentException("computed cacacity = " + newCapacity +
        " is negative. minCapacity = " + minCapacity)
    }

    if (newCapacity <= buf.length || newCapacity < minCapacity) {
      throw new IllegalStateException("Cant grow the array anymore. Already at max size ?" +
        " newCapacity = " + newCapacity +
        ", minCapacity = " + minCapacity +
        ", blocksize = " + DiskStore.MAX_BLOCK_SIZE)
    }

    buf = JArrays.copyOf(buf, newCapacity)
  }

  /**
   * Writes the specified byte to this byte array output stream.
   *
   * @param   b   the byte to be written.
   */
  def write(b: Int) {
    ensureCapacity(count + 1)
    buf(count) = b.asInstanceOf[Byte]
    count += 1
  }

  /**
   * Writes <code>len</code> bytes from the specified byte array
   * starting at offset <code>off</code> to this byte array output stream.
   *
   * @param   b     the data.
   * @param   off   the start offset in the data.
   * @param   len   the number of bytes to write.
   */
  override def write(b: Array[Byte], off: Int, len: Int) {
    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
      throw new IndexOutOfBoundsException
    }
    ensureCapacity(count + len)
    System.arraycopy(b, off, buf, count, len)
    count += len
  }

  /**
   * Resets the <code>count</code> field of this byte array output
   * stream to zero, so that all currently accumulated output in the
   * output stream is discarded. The output stream can be used again,
   * reusing the already allocated buffer space.
   *
   * @see     java.io.ByteArrayInputStream#count
   */
  def reset() {
    count = 0
  }

  /**
   * Trim the underlying array : do this only if it makes sense - that is, if the space saving
   * is worth more than the cost of doing the allocation copy.
   *
   * Note, this is called after all data has been written to the stream to compact the array.
   */
  def compact() {
    if (SparkByteArrayOutputStream.needCompact(this)) {
      buf = JArrays.copyOf(buf, count)
    }
  }

  /**
   * Returns a ByteBuffer wrapping the valid contents of the buffer. No copy is
   * made (except in the empty case); the returned buffer shares the underlying
   * array.
   *
   * @return  the current contents of this output stream, as a ByteBuffer.
   */
  def toByteBuffer: ByteBuffer = {
    if (0 == count) return ByteBuffer.allocate(0)
    ByteBuffer.wrap(buf, 0, count)
  }

  /**
   * Returns the current size of the buffer.
   *
   * @return  the value of the <code>count</code> field, which is the number
   *          of valid bytes in this output stream.
   * @see     java.io.ByteArrayOutputStream#count
   */
  def size: Int = {
    count
  }

  /**
   * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
   * this class can be called after the stream has been closed without
   * generating an <tt>IOException</tt>.
   * <p>
   *
   */
  override def close() {
    // set a flag and not allow any more writes ?
  }
}


object SparkByteArrayOutputStream {
  private val MINIMUM_BUFFER_SIZE = System.getProperty("spark.io.baos.trim.min_buffer_size", (1024 * 1024).toString).toInt
  // 0.1 % by default.
  private val WASTAGE_FRACTION = System.getProperty("spark.io.baos.trim.wastage", 0.001.toString).toDouble

  def needCompact(stream: SparkByteArrayOutputStream): Boolean = {
    val capacity = stream.buf.length
    val used = stream.size
    val wastage = capacity - used

    // If no wastage, nothing to compact
    if (wastage <= 0) return false

    // If capacity is really low, always allow compaction, since the cost of compaction will be low.
    if (capacity < MINIMUM_BUFFER_SIZE) return true

    // If wastage is small enough, then don't compact.
    // Currently, set to X % of capacity.
    val allowedWastage = capacity * WASTAGE_FRACTION

    wastage >= allowedWastage
  }
}

@srowen
Member

srowen commented Apr 12, 2014

So I think I agree with part of @mridulm's direction here, but I want to make a few comments to clarify why. Apologies if I'm stating the obvious.

The management of the byte array here is not what's solving the immediate problem. Where this stuff is used in the code, the calling code wants to write N bytes in pieces, and then get a byte[] of exactly N bytes. Except in the lucky case that N exactly matches the buffer size, this entails an allocation and copy. Previously this was hidden in a trim() method. The compact() method would not work directly since it can result in an underlying array of more than N bytes.

What's needed is an abstraction that can take a byte[] that's possibly too large, and a limit N, and broker access in a way that makes it act like it is only of length N, without a copy.

Although this is not ByteBuffer's primary role in life, it can kind of play that role. You can wrap bytes 0 to N of a byte[] without a copy, and it has methods to read through the elements of the buffer. Note that its array() method itself does not copy, but it also provides access to the whole, possibly too large, array underneath.

For that reason, and because calling code in one case already wants a ByteBuffer, this feels like a good solution. However, other callers need to change to use ByteBuffer if they care about the allocation, so there's more to this than just a drop-in replacement.

However I'd also say that this doesn't need reimplementing ByteArrayOutputStream so entirely. Just subclass it and expose a toByteBuffer() method that wrap()s the internal byte[] with an appropriate limit.

I understand the idea of this code is to implement a compaction mechanism, but that's a separate issue really. (When does this help? I understand it can free up some heap, at the cost of a new second allocation and copy, and could be helpful if this object were sticking around a long time. But it's not, right?)
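
For illustration, a minimal sketch of the subclassing approach described above (the class name is hypothetical and this is not the code in this PR; ByteArrayOutputStream's protected buf and count fields are what make the no-copy wrap possible):

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Hypothetical sketch: expose a ByteBuffer view over the internal buffer
// without copying it.
class ByteBufferOutputStream(initialSize: Int) extends ByteArrayOutputStream(initialSize) {
  // Wraps bytes 0..count of the internal array; no allocation or copy.
  // Callers must stop writing before using the buffer, and must rely on
  // limit() rather than array().length for the valid size.
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}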

@mridulm
Contributor

mridulm commented Apr 12, 2014

Your summary is fairly accurate, @srowen, except for two points:

  • The compact method is not expected to resize the array always, only when the free space is deemed too high. The definition of "too high" is WIP, but there is some initial boilerplate code.
  • More importantly, it prevents a copy when we are hitting the 2GB limit. More below on why this is important (the stream-of-BAOS case).

To add, my initial approach was to subclass ByteArrayOutputStream to minimize code :-)
The reason I moved away from it was that I did not want to expose the toByteArray method, to prevent current or future accidental invocation of that (expensive) method.
Not to mention a bunch of other methods which actually won't work (more below) in our context.

Before going into details, please note that the main purpose of this stream is to actually get a ByteBuffer out of the data, which is subsequently used; that ByteBuffer need not span the entire byte[] and can work within a region of it.

And what is not mentioned above is that the actual use of this class is within another class which multiplexes over these BAOS instances, so that we are not bound by the 2GB limit: we have a Sequence of these streams, which are read in order to get to the actual data (the wrapper OutputStream moves from the first to the next as required, and returns a Seq[ByteBuffer] when we are done writing to it).
Which is why most of the methods won't work (reset, close, toByteArray, toString): the output stream is not starting at a data boundary, but inside the context of a larger stream. We could leave the methods around, but it did not look right to leave potentially broken functionality around.
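
For context, a rough sketch of the multiplexing wrapper described above (hypothetical names and behavior, not the actual WIP code): writes roll over to a new chunk before any single array grows too large, and the result is exposed as a Seq[ByteBuffer] rather than one giant byte[].

package org.apache.spark.io

import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

class ChunkedByteArrayOutputStream(maxChunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer(new SparkByteArrayOutputStream())

  override def write(b: Int): Unit = {
    if (chunks.last.size >= maxChunkSize) chunks += new SparkByteArrayOutputStream()
    chunks.last.write(b)
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    var offset = off
    var remaining = len
    while (remaining > 0) {
      if (chunks.last.size >= maxChunkSize) chunks += new SparkByteArrayOutputStream()
      // Write only as much as fits in the current chunk, then roll over.
      val toWrite = math.min(remaining, maxChunkSize - chunks.last.size)
      chunks.last.write(b, offset, toWrite)
      offset += toWrite
      remaining -= toWrite
    }
  }

  // One ByteBuffer per chunk; reading them in order reconstructs the stream.
  def toByteBuffers: Seq[ByteBuffer] = chunks.map(_.toByteBuffer).toSeq
}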

@srowen
Member

srowen commented Apr 12, 2014

You could deprecate and override toByteArray to throw an exception, etc., to be extra-safe. They "work", the result just may not have much meaning independently. Your class still has methods like close() either way. Dunno, still seems simpler than the duplication.

What's the compaction for? If you've got a series of ~2GB containers, I'd assume you'd fill them each pretty completely and transparently split a big write across the existing and next buffer. It saves a huge allocation, which could fail.

(In the grow() method, you would have to check that the new doubled size hasn't overflowed!)
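
(For example, one way to guard against that doubling overflow; an illustrative helper only, with assumed names, meant to sit inside the stream class:)

// Compute the doubled size as a Long so it cannot wrap around, then clamp.
private def grownCapacity(oldCapacity: Int, minCapacity: Int, maxCapacity: Int): Int = {
  val doubled = oldCapacity.toLong << 1
  val candidate = math.max(doubled, minCapacity.toLong)
  math.min(candidate, maxCapacity.toLong).toInt
}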

I agree with the use of ByteBuffer, but I suppose I'm pointing out that it has to get used in several other places in the code that use byte[] right now in order to get the benefit. I understand that wasn't the direct purpose of the code you're working on, but it is the purpose of this PR, I think. In which case, perhaps it's better to leverage your direction.

A simpler step in your direction could be the basis for the change that this PR is trying for. That's why I wonder if this piece could have a simpler, stand-alone purpose.

@mridulm
Contributor

mridulm commented Apr 12, 2014

There are two issues here:
a) If we are going to override and deprecate/throw exceptions for every method that is not exposed by OutputStream, while overriding functionality for most others in ByteArrayOutputStream, then I don't see the value in extending BAOS.

b) More importantly, we do not get to control how array growth happens, what the thresholds are, etc. For example, relying on Integer.MAX_VALUE always is not the best option; we would need to make that configurable, to control what the maximum block size in Spark can be per ByteBuffer. This has perf implications in terms of mmap'ing files, reading/writing to other ByteBuffers/sockets, etc.

@srowen
Member

srowen commented Apr 12, 2014

Sure, I myself was not suggesting that we should make them throw exceptions. If one really wanted to prohibit their use, that would be a way to do so even when subclassing, but I don't suggest they must be prohibited.

To me, the methods aren't broken or anything, and the resulting class can be reused for the purpose @rxin has in mind in this PR, if these methods are available.

Agree that the max size of such a buffer could be configurable. However, the limit can't be more than Integer.MAX_VALUE, just because this is the largest an array can be. I was just pointing this out in regard to var newCapacity: Int = oldCapacity << 1, which will fail if oldCapacity is already more than half of Integer.MAX_VALUE.

(There's another tangent here about whether the buffer class should even enforce a limit -- what does it do, fail with IOException? -- because the caller has to manage it either way.)

The gist of my strawman suggestion is: what if we had a simple subclass of ByteArrayOutputStream that exposes a ByteBuffer? I argue that is a basis for removing some long-standing array copies in various parts of the code, which is @rxin's purpose. And then I think it suits your purpose too, excepting the compaction logic, but I was wondering about whether it is needed. (Maybe I should take this to the JIRA issue you opened about your work?)

Contributor

can avoid using trim (/an array copy) here by using RateLimitedOutputStream#write(bytes, offset, length)

@aarondav
Contributor

Having a toByteBuffer method definitely seems reasonable to me; the only issue is that ByteBuffer does not provide a good stream-compatible API. So it would either still have to use write(bytes, offset, length), by getting these parameters from the ByteBuffer (and being careful that .array() returns the entire underlying array), or use some API external to both ByteBuffer and our fast output stream.

What if FastBufferedOutputStream has two methods:

def toByteBuffer: ByteBuffer
def toByteArraySegment: (Array[Byte], Int, Int)

where the latter returns the offset and length to be passed into any stream APIs.

In order to provide a lower level API, we often have to sacrifice some code niceties, but this at least provides pretty good safety for the user to not misuse it.
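
(A hypothetical usage sketch of the second method; fastStream and out are assumed names, and toByteArraySegment is the proposed API, not existing code:)

// Unpack the segment and hand it straight to a stream write, avoiding the
// intermediate toByteArray copy.
val (bytes, offset, length) = fastStream.toByteArraySegment
out.write(bytes, offset, length)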

@srowen
Member

srowen commented Apr 12, 2014

@aarondav I personally like your second method. That alone is probably just what is needed. Callers who actually want a ByteBuffer can wrap easily with this info. In practice the offset will always (?) be 0 so one of the Ints could be omitted I bet.

@rxin
Contributor Author

rxin commented Apr 13, 2014

Ok pushed a new version that avoids the extra trim.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14082/

@pwendell
Contributor

@rxin hm looks like this RAT exclude isn't working. Can take another crack at it later tonight.

https://github.com/apache/spark/blob/master/.rat-excludes#L43

@pwendell
Contributor

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14085/

@srowen
Member

srowen commented Apr 13, 2014

Yeah this looks a lot like what I personally have in mind. I think you could simply subclass java.io.ByteArrayOutputStream and add the two new methods, to avoid writing new code.

@rxin
Contributor Author

rxin commented Apr 13, 2014

Eh, the other reason fastutil implemented FastByteArrayOutputStream without subclassing ByteArrayOutputStream was to get rid of the synchronized writes. To do this properly, we should probably benchmark every corner case to see how well the jvm can do away with the lock that is always running in a single threaded mode. However, we are not adding much code anyway, so I think this is good as is.

@srowen
Member

srowen commented Apr 13, 2014

Yes, even if the lock is not removed (and it should be) its overhead is trivial compared to other operations here. Up to you.

@aarondav
Contributor

Apparently the JVM implements "biased locking" (http://www.oracle.com/technetwork/java/6-performance-137236.html#2.1.1) when an object's monitor is uncontended, which eliminates all synchronization overhead (at the cost of an extremely expensive "bias revocation" operation if any contention appears). So we would expect there to be no relevant performance loss of this synchronization in this case, except perhaps for very small streams which are completed before being biased.

However, the API of def toByteArray: Array[Byte] and def toArray: (Array[Byte], Int) with different performance characteristics is pretty strange. I think the current solution is probably fine, as the implementation is pretty straightforward. I'll eat those words when the first JIRA comes back with a bug introduced from this patch, though.

Contributor

We probably don't need this method, as it defeats the whole purpose of this stream. You might as well use ByteArrayOutputStream if you need trim().

@rxin
Contributor Author

rxin commented Apr 13, 2014

Yes I knew about biased locking. That's why I said "how well the jvm can do away with the lock that is always running in a single threaded mode". However, it is hard to test how well it works with all the random cases (large streams, small streams, and even with biased locking it is not free) we have for this and quantify the impact. And given how small and simple this code is, I think it is fine as is.

I removed the trim method.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@AmplabJenkins

Build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14101/

@aarondav
Contributor

Looking at our usages of ByteBuffer#array in Spark, most of them do not handle ByteBuffers where capacity != limit correctly. This change could turn these oversights into actual bugs, if the ByteBuffers returned here are used later as inputs. See TachyonStore for an example, where all the logging and accounting looks at limit(), but the actual bytes used will be capacity().
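
(To illustrate the capacity != limit pitfall with assumed values:)

import java.nio.ByteBuffer

val backing = new Array[Byte](1024)          // grown buffer; only 100 bytes are valid
val bb = ByteBuffer.wrap(backing, 0, 100)
assert(bb.limit() == 100)                    // accounting that uses limit() sees 100 bytes
assert(bb.array().length == 1024)            // code that uses array() sees all 1024 bytes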

This change also has the behavior of actually storing up to 2x as many bytes as there is actual data, since before we could throw away the oversized backing array and keep just the data. After all, the original solution using fastutil just called trim(), which means it wasn't really faster than ByteArrayOutputStream, but it did preserve the safety and memory characteristics that this implementation does not have.

This change makes the most sense for RawTextSender, where we immediately turn around and write the bytes to a stream, so we don't need to store them and hope no one misuses it later. It makes less sense when we actually want ByteBuffers later -- I think a copy is really the only reasonable solution there.
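
(For the retained-ByteBuffer case, a copy of just the valid bytes might look like this; buf and count stand in for the stream's internals, purely for illustration:)

val exact = ByteBuffer.allocate(count)   // exact-sized buffer, no wasted capacity
exact.put(buf, 0, count)
exact.flip()                             // ready for reading; limit == capacity == count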

The ideal solution in terms of API might actually just be @srowen's suggestion:

class FastByteArrayOutputStream extends ByteArrayOutputStream {
  // buf is ByteArrayOutputStream's protected backing array; it may be larger
  // than size().
  def getUnderlyingArray: Array[Byte] = buf
}

The only downside is that this still has the synchronized behavior, which may or may not be impactful, but it saves 100 lines of code and would allow RawTextSender to go without a copy. As far as I can tell, neither Task nor Serializer really benefited from the fastutil FastByteArrayOutputStream, as both copied the data anyway.

@rxin
Contributor Author

rxin commented Apr 17, 2014

Alright you guys convinced me. Let's close this one for now. When we clean up the downstream consumers of the byte buffers, we can revisit this (and maybe do crazier things like reusing byte arrays instead of constantly re-allocating new ones).

@rxin rxin closed this Apr 17, 2014
@rxin rxin deleted the FastByteArrayOutputStream branch August 13, 2014 08:01