From fcf64b2e28898eebd0124cb10cd1befa4d7ed9e3 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 23 Oct 2019 14:18:05 -0700 Subject: [PATCH 1/5] Use Spark's CompressionCodec and optimize deserializeMapStatuses --- .../org/apache/spark/MapOutputTracker.scala | 82 ++++++++----------- .../spark/MapStatusesSerDeserBenchmark.scala | 9 +- 2 files changed, 39 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6f4a6239a09ed..aff46c343891d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,28 +17,28 @@ package org.apache.spark -import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} -import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} +import java.io.{ObjectOutputStream, ByteArrayInputStream, ObjectInputStream} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit, LinkedBlockingQueue, ThreadPoolExecutor} import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, ListBuffer, Map} -import scala.concurrent.{ExecutionContext, Future} +import scala.collection.mutable.{HashMap, Map, ListBuffer} +import scala.concurrent.{Future, ExecutionContext} import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal - import com.github.luben.zstd.ZstdInputStream import com.github.luben.zstd.ZstdOutputStream import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} - import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} -import 
org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus} +import org.apache.spark.io.CompressionCodec +import org.apache.spark.io.ZStdCompressionCodec +import org.apache.spark.rpc.{RpcEndpointRef, RpcCallContext, RpcEndpoint, RpcEnv} +import org.apache.spark.scheduler.{MapStatus, ExecutorCacheTaskLocation} import org.apache.spark.shuffle.MetadataFetchFailedException -import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} +import org.apache.spark.storage.{ShuffleBlockId, BlockManagerId, BlockId} import org.apache.spark.util._ /** @@ -195,7 +195,8 @@ private class ShuffleStatus(numPartitions: Int) { def serializedMapStatus( broadcastManager: BroadcastManager, isLocal: Boolean, - minBroadcastSize: Int): Array[Byte] = { + minBroadcastSize: Int, + conf: SparkConf): Array[Byte] = { var result: Array[Byte] = null withReadLock { @@ -207,7 +208,7 @@ private class ShuffleStatus(numPartitions: Int) { if (result == null) withWriteLock { if (cachedSerializedMapStatus == null) { val serResult = MapOutputTracker.serializeMapStatuses( - mapStatuses, broadcastManager, isLocal, minBroadcastSize) + mapStatuses, broadcastManager, isLocal, minBroadcastSize, conf) cachedSerializedMapStatus = serResult._1 cachedSerializedBroadcast = serResult._2 } @@ -450,7 +451,8 @@ private[spark] class MapOutputTrackerMaster( " to " + hostPort) val shuffleStatus = shuffleStatuses.get(shuffleId).head context.reply( - shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast)) + shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast, + conf)) } catch { case NonFatal(e) => logError(e.getMessage, e) } @@ -799,7 +801,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId) + val statuses = 
getStatuses(shuffleId, conf) try { MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses) @@ -818,7 +820,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" + s"partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId) + val statuses = getStatuses(shuffleId, conf) try { MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, Some(mapIndex)) @@ -836,7 +838,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr * * (It would be nice to remove this restriction in the future.) */ - private def getStatuses(shuffleId: Int): Array[MapStatus] = { + private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") @@ -846,7 +848,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr if (fetchedStatuses == null) { logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) - fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes) + fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) } @@ -890,16 +892,20 @@ private[spark] object MapOutputTracker extends Logging { // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. 
- def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, - isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { + def serializeMapStatuses( + statuses: Array[MapStatus], + broadcastManager: BroadcastManager, + isLocal: Boolean, + minBroadcastSize: Int, + conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = { // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one // This implementation doesn't reallocate the whole memory block but allocates // additional buffers. This way no buffers need to be garbage collected and // the contents don't have to be copied to the new buffer. val out = new ApacheByteArrayOutputStream() - val compressedOut = new ApacheByteArrayOutputStream() - - val objOut = new ObjectOutputStream(out) + out.write(DIRECT) + val codec = CompressionCodec.createCodec(conf, "zstd") + val objOut = new ObjectOutputStream(codec.compressedOutputStream(out)) Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -908,42 +914,21 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - - val arr: Array[Byte] = { - val zos = new ZstdOutputStream(compressedOut) - Utils.tryWithSafeFinally { - compressedOut.write(DIRECT) - // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos` - // without copying to avoid unnecessary allocation and copy of byte[]. - out.writeTo(zos) - } { - zos.close() - } - compressedOut.toByteArray - } + val arr = out.toByteArray if (arr.length >= minBroadcastSize) { // Use broadcast instead. // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! 
val bcast = broadcastManager.newBroadcast(arr, isLocal) // toByteArray creates copy, so we can reuse out out.reset() - val oos = new ObjectOutputStream(out) + out.write(BROADCAST) + val oos = new ObjectOutputStream(codec.compressedOutputStream(out)) Utils.tryWithSafeFinally { oos.writeObject(bcast) } { oos.close() } - val outArr = { - compressedOut.reset() - val zos = new ZstdOutputStream(compressedOut) - Utils.tryWithSafeFinally { - compressedOut.write(BROADCAST) - out.writeTo(zos) - } { - zos.close() - } - compressedOut.toByteArray - } + val outArr = out.toByteArray logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length) (outArr, bcast) } else { @@ -952,11 +937,12 @@ private[spark] object MapOutputTracker extends Logging { } // Opposite of serializeMapStatuses. - def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = { + def deserializeMapStatuses(bytes: Array[Byte], conf: SparkConf): Array[MapStatus] = { assert (bytes.length > 0) def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { - val objIn = new ObjectInputStream(new ZstdInputStream( + val codec = CompressionCodec.createCodec(conf, "zstd") + val objIn = new ObjectInputStream(codec.compressedInputStream( new ByteArrayInputStream(arr, off, len))) Utils.tryWithSafeFinally { objIn.readObject() diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala index 53afe141981f4..5dbef88e73a9e 100644 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala @@ -67,19 +67,20 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase { var serializedBroadcastSizes = 0 val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + 
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize, + sc.getConf) serializedMapStatusSizes = serializedMapStatus.length if (serializedBroadcast != null) { serializedBroadcastSizes = serializedBroadcast.value.length } benchmark.addCase("Serialization") { _ => - MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + MapOutputTracker.serializeMapStatuses(shuffleStatus.mapStatuses, tracker.broadcastManager, + tracker.isLocal, minBroadcastSize, sc.getConf) } benchmark.addCase("Deserialization") { _ => - val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus) + val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus, sc.getConf) assert(result.length == numMaps) } From da276aa6f9ca4ea40f425165d8c56467599cf965 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 23 Oct 2019 14:22:16 -0700 Subject: [PATCH 2/5] fix import --- .../org/apache/spark/MapOutputTracker.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index aff46c343891d..f7e740c0eed8c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,28 +17,27 @@ package org.apache.spark -import java.io.{ObjectOutputStream, ByteArrayInputStream, ObjectInputStream} -import java.util.concurrent.{ConcurrentHashMap, TimeUnit, LinkedBlockingQueue, ThreadPoolExecutor} +import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} +import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, Map, ListBuffer} -import scala.concurrent.{Future, 
ExecutionContext} +import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.github.luben.zstd.ZstdInputStream -import com.github.luben.zstd.ZstdOutputStream + import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} + import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec -import org.apache.spark.io.ZStdCompressionCodec -import org.apache.spark.rpc.{RpcEndpointRef, RpcCallContext, RpcEndpoint, RpcEnv} -import org.apache.spark.scheduler.{MapStatus, ExecutorCacheTaskLocation} +import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} +import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus} import org.apache.spark.shuffle.MetadataFetchFailedException -import org.apache.spark.storage.{ShuffleBlockId, BlockManagerId, BlockId} +import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} import org.apache.spark.util._ /** From d0b1f4c2003c495d4db8bf35163a3989dcfa8d1a Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 23 Oct 2019 14:53:34 -0700 Subject: [PATCH 3/5] add comment --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index f7e740c0eed8c..873efa76468ed 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -941,6 +941,9 @@ private[spark] object MapOutputTracker extends Logging { def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { val codec = CompressionCodec.createCodec(conf, "zstd") + 
// The ZStd codec is wrapped in a `BufferedInputStream` which avoids excessive overhead + // of JNI calls while trying to decompress a small amount of data for each element + // of `MapStatuses` val objIn = new ObjectInputStream(codec.compressedInputStream( new ByteArrayInputStream(arr, off, len))) Utils.tryWithSafeFinally { From 013e7b2bdda24e81b1c69486f5fe71e8e520ebc4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 23 Oct 2019 22:12:06 +0000 Subject: [PATCH 4/5] Add JDK11 --- ...tatusesSerDeserBenchmark-jdk11-results.txt | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 7a6cfb7b23b94..db23cf5c12ea7 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 205 213 13 1.0 1023.6 1.0X -Deserialization 908 939 27 0.2 4540.2 0.2X +Serialization 170 178 9 1.2 849.7 1.0X +Deserialization 530 535 9 0.4 2651.1 0.3X -Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized MapStatus sizes: 411 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB @@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------
-Serialization 195 204 24 1.0 976.9 1.0X -Deserialization 913 940 33 0.2 4566.7 0.2X +Serialization 157 165 7 1.3 785.4 1.0X +Deserialization 495 588 79 0.4 2476.7 0.3X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 616 619 3 0.3 3079.1 1.0X -Deserialization 936 954 22 0.2 4680.5 0.7X +Serialization 344 351 4 0.6 1720.4 1.0X +Deserialization 527 579 99 0.4 2635.9 0.7X -Compressed Serialized MapStatus sizes: 418 bytes -Compressed Serialized Broadcast MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 427 bytes +Compressed Serialized Broadcast MapStatus sizes: 13 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 586 588 3 0.3 2928.8 1.0X -Deserialization 929 933 4 0.2 4647.0 0.6X +Serialization 317 321 4 0.6 1583.8 1.0X +Deserialization 530 540 15 0.4 2648.3 0.6X -Compressed Serialized MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------ -Serialization 4740 4916 249 0.0 23698.5 1.0X -Deserialization 1578 1597 27 0.1 7890.6 3.0X +Serialization 1738 1849 156 0.1 8692.0 1.0X +Deserialization 946 977 33 0.2 4730.2 1.8X -Compressed Serialized MapStatus sizes: 546 bytes -Compressed Serialized Broadcast MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 556 bytes +Compressed Serialized Broadcast MapStatus sizes: 121 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4492 4573 115 0.0 22458.3 1.0X -Deserialization 1533 1547 20 0.1 7664.8 2.9X +Serialization 1379 1432 76 0.1 6892.6 1.0X +Deserialization 929 941 19 0.2 4645.5 1.5X -Compressed Serialized MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes From 6212855b8bf707ab6fcc9b696a3192ecf92934fb Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 23 Oct 2019 22:47:02 +0000 Subject: [PATCH 5/5] Add JDK8 --- .../MapStatusesSerDeserBenchmark-results.txt | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt index 0c649694f6b6e..053f4bf771923 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 236 245 18 0.8 1179.1 1.0X -Deserialization 842 885 37 0.2 4211.4 0.3X +Serialization 178 187 15 1.1 887.5 1.0X +Deserialization 530 558 32 0.4 2647.5 0.3X -Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized MapStatus sizes: 411 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB @@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 213 219 8 0.9 1065.1 1.0X -Deserialization 846 870 33 0.2 4228.6 0.3X +Serialization 167 175 7 1.2 835.7 1.0X +Deserialization 523 537 22 0.4 2616.2 0.3X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 624 709 167 0.3 3121.1 1.0X -Deserialization 885 908 22 0.2 4427.0 0.7X +Serialization 351 416 147 0.6 1754.4 1.0X +Deserialization 546 551 8 0.4 2727.6 0.6X -Compressed Serialized MapStatus sizes: 418 bytes -Compressed Serialized Broadcast MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 427 bytes +Compressed Serialized Broadcast MapStatus sizes: 13 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 603 604 2 0.3 3014.9 1.0X -Deserialization 892 895 5 0.2 4458.7 0.7X +Serialization 320 321 1 0.6 1598.0 1.0X +Deserialization 542 549 7 0.4 2709.0 0.6X -Compressed Serialized MapStatus sizes: 14 MB +Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4612 4945 471 0.0 23061.0 1.0X -Deserialization 1493 1495 2 0.1 7466.3 3.1X +Serialization 1671 1877 290 0.1 8357.3 1.0X +Deserialization 943 970 32 0.2 4715.8 1.8X -Compressed Serialized MapStatus sizes: 546 bytes -Compressed Serialized Broadcast MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 556 bytes +Compressed Serialized Broadcast MapStatus sizes: 121 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 4452 4595 202 0.0 22261.4 1.0X -Deserialization 1464 1477 18 0.1 7321.4 3.0X +Serialization 1373 1436 89 0.1 6865.0 1.0X +Deserialization 940 970 37 0.2 4699.1 1.5X -Compressed Serialized MapStatus sizes: 123 MB +Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes