diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 7a6cfb7b23b94..db23cf5c12ea7 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 205 213 13 1.0 1023.6 1.0X -Deserialization 908 939 27 0.2 4540.2 0.2X +Serialization 170 178 9 1.2 849.7 1.0X +Deserialization 530 535 9 0.4 2651.1 0.3X -Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized MapStatus sizes: 411 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB @@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 195 204 24 1.0 976.9 1.0X -Deserialization 913 940 33 0.2 4566.7 0.2X +Serialization 157 165 7 1.3 785.4 1.0X +Deserialization 495 588 79 0.4 2476.7 0.3X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------ -Serialization 616 619 3 0.3 3079.1 1.0X -Deserialization 936 954 22 0.2 4680.5 0.7X +Serialization 344 351 4 0.6 1720.4 1.0X +Deserialization 527 579 99 0.4 2635.9 0.7X -Compressed Serialized MapStatus sizes: 418 bytes -Compressed Serialized Broadcast MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 427 bytes +Compressed Serialized Broadcast MapStatus sizes: 13 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 586 588 3 0.3 2928.8 1.0X -Deserialization 929 933 4 0.2 4647.0 0.6X +Serialization 317 321 4 0.6 1583.8 1.0X +Deserialization 530 540 15 0.4 2648.3 0.6X -Compressed Serialized MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4740 4916 249 0.0 23698.5 1.0X -Deserialization 1578 1597 27 0.1 7890.6 3.0X +Serialization 1738 1849 156 0.1 8692.0 1.0X +Deserialization 946 977 33 0.2 4730.2 1.8X -Compressed Serialized MapStatus sizes: 546 bytes -Compressed Serialized Broadcast MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 556 bytes +Compressed Serialized Broadcast MapStatus sizes: 121 MB OpenJDK 64-Bit Server VM 
11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4492 4573 115 0.0 22458.3 1.0X -Deserialization 1533 1547 20 0.1 7664.8 2.9X +Serialization 1379 1432 76 0.1 6892.6 1.0X +Deserialization 929 941 19 0.2 4645.5 1.5X -Compressed Serialized MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt index 0c649694f6b6e..053f4bf771923 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 236 245 18 0.8 1179.1 1.0X -Deserialization 842 885 37 0.2 4211.4 0.3X +Serialization 178 187 15 1.1 887.5 1.0X +Deserialization 530 558 32 0.4 2647.5 0.3X -Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized MapStatus sizes: 411 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB @@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 213 219 8 0.9 1065.1 1.0X -Deserialization 846 870 33 0.2 4228.6 0.3X +Serialization 167 175 7 1.2 835.7 1.0X +Deserialization 523 537 22 0.4 2616.2 0.3X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 624 709 167 0.3 3121.1 1.0X -Deserialization 885 908 22 0.2 4427.0 0.7X +Serialization 351 416 147 0.6 1754.4 1.0X +Deserialization 546 551 8 0.4 2727.6 0.6X -Compressed Serialized MapStatus sizes: 418 bytes -Compressed Serialized Broadcast MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 427 bytes +Compressed Serialized Broadcast MapStatus sizes: 13 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 603 604 2 0.3 3014.9 1.0X -Deserialization 892 895 5 0.2 4458.7 0.7X +Serialization 320 321 1 0.6 1598.0 1.0X +Deserialization 542 549 7 0.4 2709.0 0.6X -Compressed Serialized MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 
+46,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4612 4945 471 0.0 23061.0 1.0X -Deserialization 1493 1495 2 0.1 7466.3 3.1X +Serialization 1671 1877 290 0.1 8357.3 1.0X +Deserialization 943 970 32 0.2 4715.8 1.8X -Compressed Serialized MapStatus sizes: 546 bytes -Compressed Serialized Broadcast MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 556 bytes +Compressed Serialized Broadcast MapStatus sizes: 121 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4452 4595 202 0.0 22261.4 1.0X -Deserialization 1464 1477 18 0.1 7321.4 3.0X +Serialization 1373 1436 89 0.1 6865.0 1.0X +Deserialization 940 970 37 0.2 4699.1 1.5X -Compressed Serialized MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6f4a6239a09ed..873efa76468ed 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -28,13 +28,12 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.github.luben.zstd.ZstdInputStream -import com.github.luben.zstd.ZstdOutputStream import 
org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus} import org.apache.spark.shuffle.MetadataFetchFailedException @@ -195,7 +194,8 @@ private class ShuffleStatus(numPartitions: Int) { def serializedMapStatus( broadcastManager: BroadcastManager, isLocal: Boolean, - minBroadcastSize: Int): Array[Byte] = { + minBroadcastSize: Int, + conf: SparkConf): Array[Byte] = { var result: Array[Byte] = null withReadLock { @@ -207,7 +207,7 @@ private class ShuffleStatus(numPartitions: Int) { if (result == null) withWriteLock { if (cachedSerializedMapStatus == null) { val serResult = MapOutputTracker.serializeMapStatuses( - mapStatuses, broadcastManager, isLocal, minBroadcastSize) + mapStatuses, broadcastManager, isLocal, minBroadcastSize, conf) cachedSerializedMapStatus = serResult._1 cachedSerializedBroadcast = serResult._2 } @@ -450,7 +450,8 @@ private[spark] class MapOutputTrackerMaster( " to " + hostPort) val shuffleStatus = shuffleStatuses.get(shuffleId).head context.reply( - shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast)) + shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast, + conf)) } catch { case NonFatal(e) => logError(e.getMessage, e) } @@ -799,7 +800,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId) + val statuses = getStatuses(shuffleId, conf) try { MapOutputTracker.convertMapStatuses( 
shuffleId, startPartition, endPartition, statuses) @@ -818,7 +819,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" + s"partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId) + val statuses = getStatuses(shuffleId, conf) try { MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, Some(mapIndex)) @@ -836,7 +837,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr * * (It would be nice to remove this restriction in the future.) */ - private def getStatuses(shuffleId: Int): Array[MapStatus] = { + private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") @@ -846,7 +847,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr if (fetchedStatuses == null) { logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) - fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes) + fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) } @@ -890,16 +891,20 @@ private[spark] object MapOutputTracker extends Logging { // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. 
- def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, - isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { + def serializeMapStatuses( + statuses: Array[MapStatus], + broadcastManager: BroadcastManager, + isLocal: Boolean, + minBroadcastSize: Int, + conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = { // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one // This implementation doesn't reallocate the whole memory block but allocates // additional buffers. This way no buffers need to be garbage collected and // the contents don't have to be copied to the new buffer. val out = new ApacheByteArrayOutputStream() - val compressedOut = new ApacheByteArrayOutputStream() - - val objOut = new ObjectOutputStream(out) + out.write(DIRECT) + val codec = CompressionCodec.createCodec(conf, "zstd") + val objOut = new ObjectOutputStream(codec.compressedOutputStream(out)) Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -908,42 +913,21 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - - val arr: Array[Byte] = { - val zos = new ZstdOutputStream(compressedOut) - Utils.tryWithSafeFinally { - compressedOut.write(DIRECT) - // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos` - // without copying to avoid unnecessary allocation and copy of byte[]. - out.writeTo(zos) - } { - zos.close() - } - compressedOut.toByteArray - } + val arr = out.toByteArray if (arr.length >= minBroadcastSize) { // Use broadcast instead. // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! 
val bcast = broadcastManager.newBroadcast(arr, isLocal) // toByteArray creates copy, so we can reuse out out.reset() - val oos = new ObjectOutputStream(out) + out.write(BROADCAST) + val oos = new ObjectOutputStream(codec.compressedOutputStream(out)) Utils.tryWithSafeFinally { oos.writeObject(bcast) } { oos.close() } - val outArr = { - compressedOut.reset() - val zos = new ZstdOutputStream(compressedOut) - Utils.tryWithSafeFinally { - compressedOut.write(BROADCAST) - out.writeTo(zos) - } { - zos.close() - } - compressedOut.toByteArray - } + val outArr = out.toByteArray logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length) (outArr, bcast) } else { @@ -952,11 +936,15 @@ private[spark] object MapOutputTracker extends Logging { } // Opposite of serializeMapStatuses. - def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = { + def deserializeMapStatuses(bytes: Array[Byte], conf: SparkConf): Array[MapStatus] = { assert (bytes.length > 0) def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { - val objIn = new ObjectInputStream(new ZstdInputStream( + val codec = CompressionCodec.createCodec(conf, "zstd") + // The ZStd codec is wrapped in a `BufferedInputStream` which avoids excessive overhead + // of JNI calls while trying to decompress a small amount of data for each element + // of `MapStatuses` + val objIn = new ObjectInputStream(codec.compressedInputStream( new ByteArrayInputStream(arr, off, len))) Utils.tryWithSafeFinally { objIn.readObject() diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala index 53afe141981f4..5dbef88e73a9e 100644 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala @@ -67,19 +67,20 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase { var serializedBroadcastSizes = 0 
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize, + sc.getConf) serializedMapStatusSizes = serializedMapStatus.length if (serializedBroadcast != null) { serializedBroadcastSizes = serializedBroadcast.value.length } benchmark.addCase("Serialization") { _ => - MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + MapOutputTracker.serializeMapStatuses(shuffleStatus.mapStatuses, tracker.broadcastManager, + tracker.isLocal, minBroadcastSize, sc.getConf) } benchmark.addCase("Deserialization") { _ => - val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus) + val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus, sc.getConf) assert(result.length == numMaps) }