From 7ad543ddd0ef354b7147a16b75dbde5d649c8329 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 22 Jan 2019 12:40:01 -0800
Subject: [PATCH] Changes:

- add exceptions to API
- fix write side: use serializer manager, fix usage of ByteBuffer.
- misc compilation / style fixes to get things to build.

---
 .../spark/shuffle/api/ShuffleDataIO.java      |  8 +++--
 .../shuffle/api/ShuffleMapOutputWriter.java   |  8 +++--
 .../shuffle/api/ShufflePartitionReader.java   |  8 +++--
 .../shuffle/api/ShufflePartitionWriter.java   |  7 +++--
 .../spark/shuffle/api/ShuffleReadSupport.java |  5 +++-
 .../shuffle/api/ShuffleWriteSupport.java      |  5 +++-
 .../external/ExternalShuffleReadSupport.java  |  3 +-
 .../shuffle/BlockStoreShuffleReader.scala     |  2 +-
 .../ShufflePartitionWriterOutputStream.scala  | 30 +++++++++----------
 .../shuffle/sort/SortShuffleWriter.scala      |  2 +-
 .../apache/spark/storage/BlockManager.scala   |  5 ++--
 .../ShufflePartitionObjectWriter.scala        | 11 +++----
 .../util/collection/ExternalSorter.scala      | 13 +++++---
 ...ernetesShuffleServiceAddressProvider.scala |  6 ----
 .../cluster/mesos/MesosClusterManager.scala   |  3 +-
 .../cluster/YarnClusterManager.scala          |  4 ++-
 16 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
index b091e231f2cd..19cd94712a8a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -16,11 +16,13 @@
  */
 package org.apache.spark.shuffle.api;
 
+import java.io.IOException;
+
 public interface ShuffleDataIO {
 
-  void initialize();
+  void initialize() throws IOException;
 
-  ShuffleReadSupport readSupport();
+  ShuffleReadSupport readSupport() throws IOException;
 
-  ShuffleWriteSupport writeSupport();
+  ShuffleWriteSupport writeSupport() throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
index 06415dba72d3..becb9413a8f4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
@@ -17,11 +17,13 @@
 
 package org.apache.spark.shuffle.api;
 
+import java.io.IOException;
+
 public interface ShuffleMapOutputWriter {
 
-  ShufflePartitionWriter newPartitionWriter(int partitionId);
+  ShufflePartitionWriter newPartitionWriter(int partitionId) throws IOException;
 
-  void commitAllPartitions();
+  void commitAllPartitions() throws IOException;
 
-  void abort(Exception exception);
+  void abort(Exception exception) throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java
index 817d213cd8cc..46d169972498 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java
@@ -17,12 +17,14 @@
 
 package org.apache.spark.shuffle.api;
 
-import org.apache.spark.storage.ShuffleLocation;
-
 import java.io.InputStream;
+import java.io.IOException;
 import java.util.Optional;
 
+import org.apache.spark.storage.ShuffleLocation;
+
 public interface ShufflePartitionReader {
 
-  InputStream fetchPartition(int reduceId, Optional<ShuffleLocation> shuffleLocation);
+  InputStream fetchPartition(int reduceId, Optional<ShuffleLocation> shuffleLocation)
+      throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
index bdc0fd45474f..e7cc6dd913d1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.api;
 
+import java.io.IOException;
 import java.io.OutputStream;
 
 /**
@@ -27,18 +28,18 @@ public interface ShufflePartitionWriter {
   /**
    * Return a stream that should persist the bytes for this partition.
    */
-  OutputStream openPartitionStream();
+  OutputStream openPartitionStream() throws IOException;
 
   /**
    * Indicate that the partition was written successfully and there are no more incoming bytes.
    * Returns a {@link CommittedPartition} indicating information about that written partition.
    */
-  CommittedPartition commitPartition();
+  CommittedPartition commitPartition() throws IOException;
 
   /**
    * Indicate that the write has failed for some reason and the implementation can handle the
    * failure reason. After this method is called, this writer will be discarded; it's expected that
    * the implementation will close any underlying resources.
    */
-  void abort(Exception failureReason);
+  void abort(Exception failureReason) throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleReadSupport.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleReadSupport.java
index b1be7c1de98a..ebe8fd12dccd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleReadSupport.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleReadSupport.java
@@ -17,8 +17,11 @@
 
 package org.apache.spark.shuffle.api;
 
+import java.io.IOException;
+
 public interface ShuffleReadSupport {
 
-  ShufflePartitionReader newPartitionReader(String appId, int shuffleId, int mapId);
+  ShufflePartitionReader newPartitionReader(String appId, int shuffleId, int mapId)
+      throws IOException;
 }
 
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java
index 2f61dbaa17c6..f88555f8a1bd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java
@@ -17,7 +17,10 @@
 
 package org.apache.spark.shuffle.api;
 
+import java.io.IOException;
+
 public interface ShuffleWriteSupport {
 
-  ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId);
+  ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId)
+      throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java
index a671b80904ed..a9eac0443fdc 100644
--- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java
+++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java
@@ -1,5 +1,7 @@
 package org.apache.spark.shuffle.external;
 
+import scala.compat.java8.OptionConverters;
+
 import com.google.common.collect.Lists;
 import org.apache.spark.MapOutputTracker;
 import org.apache.spark.network.TransportContext;
@@ -13,7 +15,6 @@
 import org.apache.spark.storage.ShuffleLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.compat.java8.OptionConverters;
 
 import java.util.List;
 import java.util.Optional;
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 70c76d594815..caeecedc5d36 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -20,7 +20,7 @@ package org.apache.spark.shuffle
 import scala.compat.java8.OptionConverters
 
 import org.apache.spark._
-import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.api.ShuffleReadSupport
 import org.apache.spark.storage._
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionWriterOutputStream.scala b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionWriterOutputStream.scala
index 2eed51962181..8a776281041d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionWriterOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionWriterOutputStream.scala
@@ -20,38 +20,36 @@ package org.apache.spark.shuffle
 import java.io.{InputStream, OutputStream}
 import java.nio.ByteBuffer
 
-import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.api.ShufflePartitionWriter
+import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
 
 class ShufflePartitionWriterOutputStream(
-    partitionWriter: ShufflePartitionWriter, buffer: ByteBuffer, bufferSize: Int)
-    extends OutputStream {
+    blockId: ShuffleBlockId,
+    partitionWriter: ShufflePartitionWriter,
+    buffer: ByteBuffer,
+    serializerManager: SerializerManager)
+  extends OutputStream {
 
-  private var currentChunkSize = 0
-  private val bufferForRead = buffer.asReadOnlyBuffer()
   private var underlyingOutputStream: OutputStream = _
 
   override def write(b: Int): Unit = {
-    buffer.putInt(b)
-    currentChunkSize += 1
-    if (currentChunkSize == bufferSize) {
+    buffer.put(b.asInstanceOf[Byte])
+    if (buffer.remaining() == 0) {
       pushBufferedBytesToUnderlyingOutput()
     }
   }
 
   private def pushBufferedBytesToUnderlyingOutput(): Unit = {
-    bufferForRead.reset()
-    var bufferInputStream: InputStream = new ByteBufferInputStream(bufferForRead)
-    if (currentChunkSize < bufferSize) {
-      bufferInputStream = new LimitedInputStream(bufferInputStream, currentChunkSize)
-    }
+    buffer.flip()
+    var bufferInputStream: InputStream = new ByteBufferInputStream(buffer)
     if (underlyingOutputStream == null) {
-      underlyingOutputStream = partitionWriter.openPartitionStream()
+      underlyingOutputStream = serializerManager.wrapStream(blockId,
+        partitionWriter.openPartitionStream())
     }
     Utils.copyStream(bufferInputStream, underlyingOutputStream, false, false)
-    buffer.reset()
-    currentChunkSize = 0
+    buffer.clear()
   }
 
   override def flush(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 98388d80cbe5..b6ab2f354e81 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -71,7 +71,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     try {
       val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
       val committedPartitions = pluggableWriteSupport.map { writeSupport =>
-        sorter.writePartitionedToExternalShuffleWriteSupport(mapId, dep.shuffleId, writeSupport)
+        sorter.writePartitionedToExternalShuffleWriteSupport(blockId, writeSupport)
       }.getOrElse(sorter.writePartitionedFile(blockId, tmp))
       if (pluggableWriteSupport.isEmpty) {
         shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1575b076d3fa..fb1ed02c857a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io._
-import java.lang.ref.{WeakReference, ReferenceQueue => JReferenceQueue}
+import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference}
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.util.Collections
@@ -31,11 +31,12 @@ import scala.concurrent.duration._
 import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.NonFatal
+
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
diff --git a/core/src/main/scala/org/apache/spark/storage/ShufflePartitionObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/ShufflePartitionObjectWriter.scala
index baaee46f8123..b2263a51051a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShufflePartitionObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShufflePartitionObjectWriter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.nio.ByteBuffer
 
-import org.apache.spark.serializer.{SerializationStream, SerializerInstance}
+import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShufflePartitionWriterOutputStream
 import org.apache.spark.shuffle.api.{CommittedPartition, ShuffleMapOutputWriter, ShufflePartitionWriter}
 
@@ -30,10 +30,12 @@ import org.apache.spark.shuffle.api.{CommittedPartition, ShuffleMapOutputWriter,
  * left to the implementation of the underlying implementation of the writer plugin.
  */
 private[spark] class ShufflePartitionObjectWriter(
+    blockId: ShuffleBlockId,
     bufferSize: Int,
     serializerInstance: SerializerInstance,
+    serializerManager: SerializerManager,
     mapOutputWriter: ShuffleMapOutputWriter)
-    extends PairsWriter {
+  extends PairsWriter {
 
   // Reused buffer. Experiments should be done with off-heap at some point.
   private val buffer = ByteBuffer.allocate(bufferSize)
@@ -44,10 +46,9 @@ private[spark] class ShufflePartitionObjectWriter(
   def startNewPartition(partitionId: Int): Unit = {
     require(buffer.position() == 0,
       "Buffer was not flushed to the underlying output on the previous partition.")
-    buffer.reset()
     currentWriter = mapOutputWriter.newPartitionWriter(partitionId)
     val currentWriterStream = new ShufflePartitionWriterOutputStream(
-      currentWriter, buffer, bufferSize)
+      blockId, currentWriter, buffer, serializerManager)
     objectOutputStream = serializerInstance.serializeStream(currentWriterStream)
   }
 
@@ -56,7 +57,7 @@ private[spark] class ShufflePartitionObjectWriter(
     require(currentWriter != null, "Cannot commit a partition that has not been started.")
     objectOutputStream.close()
     val committedPartition = currentWriter.commitPartition()
-    buffer.reset()
+    buffer.clear()
     currentWriter = null
     objectOutputStream = null
     committedPartition
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 69077c644dc7..70c36c40865b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -24,13 +24,13 @@ import com.google.common.io.ByteStreams
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.{util, _}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.serializer._
 import org.apache.spark.shuffle.api.{CommittedPartition, ShuffleWriteSupport}
 import org.apache.spark.shuffle.sort.LocalCommittedPartition
-import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, PairsWriter, ShuffleLocation, ShufflePartitionObjectWriter}
-import org.apache.spark.{util, _}
+import org.apache.spark.storage._
 
 /**
  * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
@@ -727,14 +727,18 @@ private[spark] class ExternalSorter[K, V, C](
    * Write all partitions to some backend that is pluggable.
    */
   def writePartitionedToExternalShuffleWriteSupport(
-      mapId: Int, shuffleId: Int, writeSupport: ShuffleWriteSupport): Array[CommittedPartition] = {
+      blockId: ShuffleBlockId,
+      writeSupport: ShuffleWriteSupport): Array[CommittedPartition] = {
     // Track location of each range in the output file
     val committedPartitions = new Array[CommittedPartition](numPartitions)
 
-    val mapOutputWriter = writeSupport.newMapOutputWriter(conf.getAppId, shuffleId, mapId)
+    val mapOutputWriter = writeSupport.newMapOutputWriter(conf.getAppId, blockId.shuffleId,
+      blockId.mapId)
     val writer = new ShufflePartitionObjectWriter(
+      blockId,
       Math.min(serializerBatchSize, Integer.MAX_VALUE).toInt,
       serInstance,
+      serializerManager,
       mapOutputWriter)
 
     try {
@@ -781,6 +785,7 @@
       mapOutputWriter.commitAllPartitions()
     } catch {
       case e: Exception =>
+        logError("Error writing shuffle data.", e)
         util.Utils.tryLogNonFatalError {
           writer.abortCurrentPartition(e)
           mapOutputWriter.abort(e)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/k8s/KubernetesShuffleServiceAddressProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/k8s/KubernetesShuffleServiceAddressProvider.scala
index 63074f6f14d7..420a82bd7d8e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/k8s/KubernetesShuffleServiceAddressProvider.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/k8s/KubernetesShuffleServiceAddressProvider.scala
@@ -139,10 +139,4 @@
 
     override def onClose(e: KubernetesClientException): Unit = {}
   }
-
-  private implicit def toRunnable(func: () => Unit): Runnable = {
-    new Runnable {
-      override def run(): Unit = func()
-    }
-  }
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
index 48ef8df37ecc..a69b0d305035 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -60,8 +60,9 @@ private[spark] class MesosClusterManager extends ExternalClusterManager {
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+  }
 
-  override def createShuffleServiceAddressProvider(): ShuffleServiceAddressProvider =
+  def createShuffleServiceAddressProvider(): ShuffleServiceAddressProvider = {
     DefaultShuffleServiceAddressProvider
   }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
index b2a4fd42c60f..8e83d49d2332 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
@@ -54,6 +54,8 @@ private[spark] class YarnClusterManager extends ExternalClusterManager {
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
   }
-  def createShuffleServiceAddressProvider(): ShuffleServiceAddressProvider =
+
+  def createShuffleServiceAddressProvider(): ShuffleServiceAddressProvider = {
     DefaultShuffleServiceAddressProvider
+  }
 }
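
Reviewer note (not part of the patch): the write-side fix above replaces the
ByteBuffer.reset()/manual-chunk-size bookkeeping with the standard NIO
fill/drain cycle. reset() rewinds to a previously mark()ed position, which is
not what the old code needed; flip() and clear() are. Below is a minimal,
standalone Java sketch of the cycle the new write() /
pushBufferedBytesToUnderlyingOutput() pair relies on; the demo class name is
hypothetical and for illustration only.

    import java.nio.ByteBuffer;

    public class BufferCycleDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        for (byte b : new byte[] {1, 2, 3, 4, 5, 6}) {
          buf.put(b);                   // fill, mirrors write(b)
          if (buf.remaining() == 0) {   // buffer full: drain it
            buf.flip();                 // limit = position, position = 0
            while (buf.hasRemaining()) {
              System.out.print(buf.get() + " ");
            }
            buf.clear();                // position = 0, limit = capacity; refill
          }
        }
      }
    }

With six input bytes and capacity four, this drains "1 2 3 4" on the fourth
put and leaves 5 and 6 buffered, which is exactly the partial-chunk state that
flush() has to push out without any LimitedInputStream wrapping.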