
Conversation

@witgo
Contributor

@witgo witgo commented Sep 7, 2016

What changes were proposed in this pull request?

Motivation

Spark has various 2G limits.

  1. When a data block stored on disk is read, the following code fragment is called.

      val iterToReturn: Iterator[Any] = {
        val diskBytes = diskStore.getBytes(blockId)
        if (level.deserialized) {
          val diskValues = serializerManager.dataDeserializeStream(
            blockId,
            diskBytes.toInputStream(dispose = true))(info.classTag)
          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
        } else {
          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
            .map {_.toInputStream(dispose = false)}
            .getOrElse { diskBytes.toInputStream(dispose = true) }
          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
        }
      }
    
      def getBytes(blockId: BlockId): ChunkedByteBuffer = {
        val file = diskManager.getFile(blockId.name)
        val channel = new RandomAccessFile(file, "r").getChannel
        Utils.tryWithSafeFinally {
          // For small files, directly read rather than memory map
          if (file.length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(file.length.toInt)
            channel.position(0)
            while (buf.remaining() != 0) {
              if (channel.read(buf) == -1) {
                throw new IOException("Reached EOF before filling buffer\n" +
                  s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
              }
            }
            buf.flip()
            new ChunkedByteBuffer(buf)
          } else {
            new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
          }
        } {
          channel.close()
        }
      }
    

    The above code has the following problems:

    • channel.map(MapMode.READ_ONLY, 0, file.length) returns a MappedByteBuffer, and the size of a MappedByteBuffer cannot exceed 2G (see the chunked-mapping sketch after this list).
    • Generating an Iterator[Any] requires loading all of the data into memory, which may take up a lot of memory.
  2. When data is serialized with Kryo, the following code fragment is called:

      override def serialize[T: ClassTag](t: T): ByteBuffer = {
        output.clear()
        val kryo = borrowKryo()
        try {
          kryo.writeClassAndObject(output, t)
        } catch {
          case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
            throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
              "increase spark.kryoserializer.buffer.max value.")
        } finally {
          releaseKryo(kryo)
        }
        ByteBuffer.wrap(output.toBytes)
      }
    

    The above code has the following problems:

    • The serialized data is stored in output's internal byte[], and the size of a byte[] cannot exceed 2G.
  3. When the RPC layer writes the data to be sent to the Channel, the following code fragment is called:

      public long transferTo(final WritableByteChannel target, final long position) throws IOException {
        Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
        // Bytes written for header in this call.
        long writtenHeader = 0;
        if (header.readableBytes() > 0) {
          writtenHeader = copyByteBuf(header, target);
          totalBytesTransferred += writtenHeader;
          if (header.readableBytes() > 0) {
            return writtenHeader;
          }
        }
    
        // Bytes written for body in this call.
        long writtenBody = 0;
        if (body instanceof FileRegion) {
          writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
        } else if (body instanceof ByteBuf) {
          writtenBody = copyByteBuf((ByteBuf) body, target);
        }
        totalBytesTransferred += writtenBody;
        return writtenHeader + writtenBody;
      }

    The above code has the following problems:

    • The size of a ByteBuf cannot exceed 2G.
    • Data larger than 2G that is held in memory cannot be transferred.
  4. When a received RPC message is decoded, the following code fragment is called:

    public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    
      private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
    
      @Override
      public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        Message.Type msgType = Message.Type.decode(in);
        Message decoded = decode(msgType, in);
        assert decoded.type() == msgType;
        logger.trace("Received message {}: {}", msgType, decoded);
        out.add(decoded);
      }
    
      private Message decode(Message.Type msgType, ByteBuf in) {
        switch (msgType) {
          case ChunkFetchRequest:
            return ChunkFetchRequest.decode(in);
    
          case ChunkFetchSuccess:
            return ChunkFetchSuccess.decode(in);
    
          case ChunkFetchFailure:
            return ChunkFetchFailure.decode(in);
    
          default:
            throw new IllegalArgumentException("Unexpected message type: " + msgType);
        }
      }
    }
    

    The above code has the following problems:

    • The size of a ByteBuf cannot exceed 2G.
    • The receiver must hold the complete message in memory before it can be decoded.
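
For the first limit, here is a rough, illustrative sketch (not code from this PR) of the direction the design takes: a file larger than 2G cannot be exposed as a single MappedByteBuffer, but it can be mapped as a sequence of buffers, each at most Integer.MAX_VALUE bytes, which is exactly the shape a chunked buffer abstraction can wrap. The helper name is hypothetical.

    import java.io.{File, RandomAccessFile}
    import java.nio.MappedByteBuffer
    import java.nio.channels.FileChannel.MapMode
    import scala.collection.mutable.ArrayBuffer

    // Illustrative sketch only: map a file of arbitrary size as a sequence of
    // MappedByteBuffers, since a single mapping cannot exceed Integer.MAX_VALUE bytes.
    def mapFileInChunks(file: File, maxChunk: Int = Integer.MAX_VALUE): Array[MappedByteBuffer] = {
      val channel = new RandomAccessFile(file, "r").getChannel
      try {
        val chunks = ArrayBuffer.empty[MappedByteBuffer]
        var offset = 0L
        while (offset < file.length()) {
          val size = math.min(maxChunk.toLong, file.length() - offset)
          chunks += channel.map(MapMode.READ_ONLY, offset, size)
          offset += size
        }
        chunks.toArray
      } finally {
        channel.close()
      }
    }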

Goals

  • Set up the groundwork for eliminating the various 2G limits in Spark. (2G limits 1, 2, 3, 4)
  • Support back-pressure flow control for remote data reads (experimental goal). (2G limit 4)
  • Add a buffer pool (long-term goal).

Design

Setup for eliminating the various 2G limits in Spark.

Replace ByteBuffer with ChunkedByteBuffer. (2G limits 1, 2)
  • Support reference counting (reference-counted objects), a prerequisite for the buffer pool feature.
  • Support serialization for easy transport.
  • Support slice, duplicate, and copy operations.
  • Can be efficiently converted to InputStream, ByteBuffer, byte[], ByteBuf, etc.
  1. Move the ChunkedByteBuffer class to common/network-common/src/main/java/org/apache/spark/network/buffer/.

  2. Change ManagedBuffer.nioByteBuffer's return type to a ChunkedByteBuffer instance. (2G limit 1)

  3. Change the parameter of SerializerInstance.deserialize and the return value of SerializerInstance.serialize to a ChunkedByteBuffer instance (2G limit 2); a sketch of the chunk-based output stream this relies on appears after this list.

    def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
      output.clear()
      val out = ChunkedByteBufferOutputStream.newInstance()
      output.setOutputStream(out)
      val kryo = borrowKryo()
      try {
        kryo.writeClassAndObject(output, t)
        output.close()
      } finally {
        releaseKryo(kryo)
      }
      out.toChunkedByteBuffer
    }
  4. Other changes.
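
A minimal sketch of the idea behind the ChunkedByteBufferOutputStream used in point 3 above (the class name and chunk size here are illustrative, not the PR's actual implementation): serialized output accumulates in a list of fixed-size chunks instead of a single ever-growing byte[], so the total size is no longer capped at 2G.

    import java.io.OutputStream
    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    // Illustrative sketch only: collect output in fixed-size chunks so the total
    // is not limited by the maximum size of a single byte[] or ByteBuffer.
    class ChunkedOutputStreamSketch(chunkSize: Int = 4 * 1024 * 1024) extends OutputStream {
      private val chunks = ArrayBuffer.empty[Array[Byte]]
      private var position = chunkSize  // forces allocation of the first chunk

      override def write(b: Int): Unit = {
        if (position == chunkSize) {
          chunks += new Array[Byte](chunkSize)
          position = 0
        }
        chunks.last(position) = b.toByte
        position += 1
      }

      /** Expose the written data as read-only ByteBuffers; the last chunk is trimmed. */
      def toByteBuffers: Array[ByteBuffer] = {
        chunks.zipWithIndex.map { case (chunk, i) =>
          val len = if (i == chunks.length - 1) position else chunkSize
          ByteBuffer.wrap(chunk, 0, len).asReadOnlyBuffer()
        }.toArray
      }
    }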

Replace ByteBuf with InputStream.
  1. Modify the NioManagedBuffer.convertToNetty method to return an InputStream instance when the data is larger than Integer.MAX_VALUE. (2G limit 3)

  2. Add an InputStreamManagedBuffer class, used to convert an InputStream instance into a ManagedBuffer instance. (2G limit 4)

  3. Modify the MessageWithHeader class to support processing InputStream instances. (2G limit 3)

  4. Change the parameter of the Encodable.encode method to an OutputStream instance. (2G limit 4)

  5. Change the decode method parameter of the classes that implement the Encodable interface to an InputStream instance; this makes it possible to handle data stored partly in memory and partly on disk, as in the following example. (2G limit 3)

    public InputStream toInputStream() throws IOException {
      ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
      Encoders.Bytes.encode(out, type().id());
      encodeWithoutBlockData(out);
      // out.toChunkedByteBuffer().toInputStream() data in memory
      // blockData.createInputStream()  data in hard disk(FileInputStream)
      return new SequenceInputStream(out.toChunkedByteBuffer().toInputStream(),
          blockData.createInputStream());
    }
  6. Modify the TransportFrameDecoder class to represent a frame as a LinkedList<ByteBuf>, removing the frame size limit. (2G limit 4)

  7. Add a ByteBufInputStream class, used to convert a LinkedList<ByteBuf> instance into an InputStream instance; a minimal sketch appears after this list. (2G limit 4)

  8. Change the parameter of the RpcHandler.receive method to an InputStream instance. (2G limit 4)
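
A minimal sketch of what the ByteBufInputStream in point 7 could look like (illustrative, not the PR's actual code): it presents a LinkedList of Netty ByteBufs as a single InputStream and releases each buffer once it has been fully consumed.

    import java.io.InputStream
    import java.util.LinkedList
    import io.netty.buffer.ByteBuf

    // Illustrative sketch only: present a list of ByteBufs as one InputStream,
    // releasing each ByteBuf as soon as it has been fully read.
    class ByteBufInputStreamSketch(buffers: LinkedList[ByteBuf]) extends InputStream {

      override def read(): Int = {
        val buf = current()
        if (buf == null) -1 else buf.readByte() & 0xFF
      }

      override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
        val buf = current()
        if (buf == null) {
          -1
        } else {
          val toRead = math.min(length, buf.readableBytes())
          buf.readBytes(dest, offset, toRead)
          toRead
        }
      }

      // Return the first ByteBuf that still has readable bytes, releasing drained ones.
      private def current(): ByteBuf = {
        while (!buffers.isEmpty && buffers.getFirst.readableBytes() == 0) {
          buffers.removeFirst().release()
        }
        if (buffers.isEmpty) null else buffers.getFirst
      }
    }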

Read data

Local data
  1. Only data stored in memory is represented by ChunkedByteBuffer; everything else is represented by ManagedBuffer. (2G limit 1)
    • Change DiskStore.getBytes's return type to a ManagedBuffer instance, and call ManagedBuffer.nioByteBuffer only when there is enough memory to hold the ManagedBuffer's data (a sketch of such a deferred, file-backed handle follows).
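
A minimal sketch of the deferred, file-backed handle mentioned above (the class name and methods are illustrative, not Spark's actual API): the block data stays on disk behind a handle that can stream it, and it is only materialized in memory when the caller has verified it fits.

    import java.io.{File, FileInputStream, InputStream}

    // Illustrative sketch only: a file-backed handle that defers reading instead
    // of eagerly materializing the whole block in memory.
    class FileBackedBlockSketch(file: File) {
      def size: Long = file.length()

      // Stream the data straight from disk; no 2G ceiling applies here.
      def createInputStream(): InputStream = new FileInputStream(file)

      // Materialize in memory only when the caller knows the data fits.
      def toBytes(): Array[Byte] = {
        require(size <= Int.MaxValue, s"block of $size bytes does not fit in one array")
        val in = createInputStream()
        try {
          val bytes = new Array[Byte](size.toInt)
          var read = 0
          while (read < bytes.length) {
            val n = in.read(bytes, read, bytes.length - read)
            require(n >= 0, "unexpected end of file")
            read += n
          }
          bytes
        } finally {
          in.close()
        }
      }
    }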
Remote data (2G limit 4)

There are three options:

  1. Add an InputStreamInterceptor to propagate back-pressure to the shuffle server (this option has been implemented); a sketch of the handler appears after this list:
    • When the number of ByteBufs in the cache exceeds a certain amount, call channel.config().setAutoRead(false) to disable AUTO_READ, so channel.read() is no longer called automatically.
    • When the number of ByteBufs in the cache falls below a certain amount, call channel.config().setAutoRead(true) to enable AUTO_READ.
    • The advantage of this option is that it supports propagating back-pressure; the drawback is that it changes the semantics of the existing API, and in some cases the IO retry function no longer works.
  2. When a message is larger than a certain size, write it to disk so that it does not take up memory.
    • The advantage of this option is that it takes up very little memory; the disadvantage is that it increases disk IO.
  3. Combined with the buffer pool, store data in memory as far as possible.
    • Write the message to the buffer pool when there is enough memory; otherwise write it to disk.
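
A minimal sketch of the back-pressure handler described in option 1 (the class name, thresholds, and poll hook are illustrative, not the PR's actual InputStreamInterceptor): incoming ByteBufs are queued, AUTO_READ is switched off above a high-water mark, and it is switched back on once the consumer drains the queue below a low-water mark.

    import java.util.concurrent.ConcurrentLinkedQueue
    import io.netty.buffer.ByteBuf
    import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

    // Illustrative sketch only: buffer incoming ByteBufs and propagate
    // back-pressure by toggling the channel's AUTO_READ flag.
    class BackPressureInterceptorSketch(highWaterMark: Int = 64, lowWaterMark: Int = 16)
      extends ChannelInboundHandlerAdapter {

      private val buffered = new ConcurrentLinkedQueue[ByteBuf]()

      override def channelRead(ctx: ChannelHandlerContext, msg: Object): Unit = msg match {
        case buf: ByteBuf =>
          buffered.add(buf)
          if (buffered.size() >= highWaterMark && ctx.channel().config().isAutoRead) {
            ctx.channel().config().setAutoRead(false)  // stop reading from the socket
          }
        case other =>
          ctx.fireChannelRead(other)
      }

      /** Called by the consumer when it takes a buffer; re-enables reading once drained. */
      def poll(ctx: ChannelHandlerContext): ByteBuf = {
        val buf = buffered.poll()
        if (buffered.size() <= lowWaterMark && !ctx.channel().config().isAutoRead) {
          ctx.channel().config().setAutoRead(true)
        }
        buf
      }
    }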

How was this patch tested?

Each block holds 4G of data and each map's data file holds 16G of data, for a total of 64G of data.

val rdd = sc.makeRDD(1 to (1024 * 64), 4).flatMap { _ =>
  (1 to 1024).map { _ =>
    val bytes = new Array[Byte](1024)
    scala.util.Random.nextBytes(bytes)
    bytes
  }
}
rdd.localCheckpoint()
rdd.count

(1 to 4).foreach{ i=>
  val serializeStart = System.currentTimeMillis()
  rdd.repartition(4).count()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}

Results (in seconds):

Test 1: 183.73                                                                  
Test 2: 187.17                                                                  
Test 3: 203.13                                                                  
Test 4: 226.96  

@SparkQA

SparkQA commented Sep 7, 2016

Test build #65039 has finished for PR 14995 at commit 8a80539.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 7, 2016

Test build #65041 has finished for PR 14995 at commit 46e641d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2016

Test build #65069 has finished for PR 14995 at commit 774a412.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2016

Test build #65070 has finished for PR 14995 at commit e48d701.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2016

Test build #65075 has finished for PR 14995 at commit 6e58182.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2016

Test build #65090 has finished for PR 14995 at commit 2b9c946.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2016

Test build #65098 has finished for PR 14995 at commit a8f89d4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo
Contributor Author

witgo commented Sep 9, 2016

retest please.

@witgo witgo changed the title [Test Only][not ready for review][SPARK-6235][CORE]Address various 2G limits [Test Only][SPARK-6235][CORE]Address various 2G limits Sep 9, 2016
@witgo
Contributor Author

witgo commented Sep 9, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Sep 9, 2016

Test build #65126 has finished for PR 14995 at commit a8f89d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from a8f89d4 to b31fbcd Compare September 12, 2016 08:31
@SparkQA

SparkQA commented Sep 12, 2016

Test build #65247 has finished for PR 14995 at commit b31fbcd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from b31fbcd to 11401ac Compare September 13, 2016 01:03
@SparkQA

SparkQA commented Sep 13, 2016

Test build #65291 has finished for PR 14995 at commit 11401ac.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 13, 2016

Test build #65305 has finished for PR 14995 at commit a5403fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from a5403fe to 57833e3 Compare September 19, 2016 01:30
@SparkQA

SparkQA commented Sep 19, 2016

Test build #65584 has finished for PR 14995 at commit 57833e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 5634dea to 58d4b7c Compare September 21, 2016 01:58
@SparkQA

SparkQA commented Sep 21, 2016

Test build #65696 has finished for PR 14995 at commit 58d4b7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 21, 2016

Test build #65695 has finished for PR 14995 at commit 5634dea.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 58d4b7c to 81fd814 Compare September 27, 2016 11:24
@SparkQA

SparkQA commented Sep 27, 2016

Test build #65967 has finished for PR 14995 at commit 81fd814.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo
Contributor Author

witgo commented Sep 27, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Sep 27, 2016

Test build #65968 has finished for PR 14995 at commit 81fd814.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 81fd814 to 754b9dc Compare September 28, 2016 08:33
@SparkQA

SparkQA commented Sep 28, 2016

Test build #66032 has finished for PR 14995 at commit 754b9dc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 754b9dc to 53d6ad6 Compare October 8, 2016 14:06
@SparkQA

SparkQA commented Oct 8, 2016

Test build #66579 has finished for PR 14995 at commit 53d6ad6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 53d6ad6 to 043845f Compare October 14, 2016 08:34
@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 043845f to be1efc4 Compare October 16, 2016 03:04
@SparkQA

SparkQA commented Oct 16, 2016

Test build #67024 has finished for PR 14995 at commit be1efc4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from be1efc4 to 540a65a Compare November 3, 2016 13:38
@SparkQA

SparkQA commented Nov 3, 2016

Test build #68069 has finished for PR 14995 at commit 540a65a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@opme

opme commented Nov 20, 2016

What is the plan to incorporate this patch into a released version? I am hitting this limit when doing simple joins of a table with 2 million records to a table of 300 million records and performing a count.

@srowen
Member

srowen commented Nov 20, 2016

@witgo if this is a stale WIP, would you mind closing it? as far as I know it doesn't necessarily solve the problem in the JIRA and may be misleading to those that think there is a solution available. It's actually many problems in one.

@witgo
Contributor Author

witgo commented Nov 21, 2016

@srowen
@srowen
This PR is a comprehensive solution, used to address the various 2G limits, the RPC memory footprint, and other issues.
Users often encounter these problems. Why shouldn't we solve them?

@srowen
Member

srowen commented Nov 21, 2016

This change is marked as not ready to merge, does not merge, hasn't been reviewed, and isn't going to be reviewed as a huge 143-file, 4400-line change. This issue has been discussed in many different ways over time.

@opme

opme commented Nov 21, 2016

I compiled this pull request into the main branch. I can confirm it fixes the problem where I was performing joins between tables with 2 million and 300 million records, plus counts, and getting the Java integer error. Before building the patched version, I had tried setting 2000 partitions on the DataFrames with spark.sql.shuffle.partitions, and the failure still occurred. I don't see a valid workaround through partitioning, nor do I think the failure is a program design issue.

If the patch is not merged, I will likely make a Docker image that contains it and publish it through docker.io. What is the way forward for getting this into a released version?

@witgo
Contributor Author

witgo commented Nov 22, 2016

This PR is test only; it is used to

  1. verify the code through CI
  2. verify the effectiveness of the solution

It includes two underlying API changes:

  1. Replace ByteBuffer with ChunkedByteBuffer.
  2. Replace ByteBuf with InputStream.

There should not be much debate about 1 (the master branch has already made some of the relevant changes), but @rxin has a different idea about 2.
We should reach a consensus on these two underlying changes, and then take the next step.

@srowen What do you think of the above two changes?
@opme Have you done more testing on large-scale data shuffle?

@witgo witgo force-pushed the SPARK-6235_Address_various_2G_limits branch from 540a65a to 04172e0 Compare November 24, 2016 07:31
@SparkQA

SparkQA commented Nov 24, 2016

Test build #69116 has started for PR 14995 at commit 04172e0.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69116/
Test FAILed.

@opme

opme commented Nov 24, 2016

@witgo I have a PySpark application that was failing in 3 different places but is able to run without errors now. I'm glad for this patch, as I am not sure how I would have explained to my professors why the big data application I chose for my analysis has 32-bit limitations. This is my final project for a Georgia Tech big data class, and I will write about these limitations of Spark in my paper. My app is called the Surgeon Scorecard, and it computes surgical complication rates for surgeons on the Medicare synthetic CMS dataset, which is about 1.6 billion records. https://github.com/opme/SurgeonScorecard.

@witgo
Contributor Author

witgo commented Nov 25, 2016

@opme Thanks, I am glad to hear this, and I want to solve the issues of reading, storing and transmitting data as much as possible.

@vanzin vanzin mentioned this pull request Jun 7, 2017
@asfgit asfgit closed this in b771fed Jun 8, 2017
@lklong

lklong commented Jun 30, 2017

Hi @witgo, I am glad to see this patch, but I do not know how I should check out this patch, and I need it very much. Thank you for giving me the patch URL. Please and thanks!

@lklong

lklong commented Jun 30, 2017

@witgo Hi, I want to know whether this patch can be used in a production environment?

@witgo
Contributor Author

witgo commented Jul 1, 2017

I did not do much testing, but I think it can be used in a production environment.
The URL: https://github.com/witgo/spark/tree/SPARK-6235_Address_various_2G_limits

@lklong

lklong commented Jul 2, 2017

@witgo Thank you very much. My next-generation product will use Spark 2.1 (I currently use Spark 1.6), and I often hit the 2G error, so I am very glad to see your code! Thanks! I only use spark-ml in production.
However, I want to know why this is not merged into the master branch? If it were, we could all benefit from your work!

@lklong

lklong commented Jul 2, 2017

@witgo I am very sorry to ask, but could you send a zip of the branch to my email? I have tried to get this branch more than 10 times, but I cannot download it. My email is [email protected]. Please help me, thanks very much!

@j143-zz

j143-zz commented Jul 16, 2017

Hi @lklong,
Did you test this PR in a production environment? If yes, can you share the results on the JIRA? If you have problems downloading this branch, please let me know.
