Commit 8c0e961
[SPARK-29081][CORE] Replace calls to SerializationUtils.clone on properties with a faster implementation

Replace use of `SerializationUtils.clone` with the new `Utils.cloneProperties` method, and add a benchmark with results showing a dramatic speed-up for effectively equivalent functionality.

### What changes were proposed in this pull request?

While I am not sure that `SerializationUtils.clone` is a performance issue in production, I am sure that it is overkill for the task it is doing (providing a distinct copy of a `Properties` object). This PR adds a benchmark showing the dramatic improvement over the clone operation and replaces uses of `SerializationUtils.clone` on `Properties` with the more specialized `Utils.cloneProperties`.

### Does this PR introduce any user-facing change?

Strings are immutable, so there is no reason to serialize and deserialize them; doing so just creates extra garbage. The only behavior that changes is the (unsupported) insertion of non-String objects into the Spark local properties.

### How was this patch tested?

1. Pass Jenkins with the existing tests.
2. Since this is a performance-improvement PR, manually run the benchmark.

Closes #25787 from databricks-david-lewis/SPARK-29081.

Authored-by: David Lewis <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 956f6e9 commit 8c0e961
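In essence, the change swaps a deep copy via Java serialization for a shallow, entry-by-entry copy. A minimal sketch of the two approaches (the property value is illustrative):

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

val props = new Properties()
props.setProperty("spark.job.description", "nightly ETL")

// Old approach: round-trips the whole object through Java serialization.
// Every key and value is serialized to bytes and deserialized back into
// brand-new String instances, creating avoidable garbage.
val viaSerialization: Properties = SerializationUtils.clone(props)

// New approach: a shallow copy of the entries. Because String is immutable,
// sharing the key/value references between the two objects is safe.
val viaShallowCopy = new Properties()
props.forEach((k, v) => viaShallowCopy.put(k, v))
```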

File tree

9 files changed: +134 -18 lines
core/benchmarks/PropertiesCloneBenchmark-results.txt

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
================================================================================================
Properties Cloning
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
Empty Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SerializationUtils.clone                              0              0           0          0.2        4184.0       1.0X
Utils.cloneProperties                                 0              0           0         55.6          18.0     232.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
System Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SerializationUtils.clone                              0              0           0          0.0      107612.0       1.0X
Utils.cloneProperties                                 0              0           0          1.0         962.0     111.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
Small Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SerializationUtils.clone                              0              0           0          0.0      330210.0       1.0X
Utils.cloneProperties                                 0              0           0          0.9        1082.0     305.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
Medium Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SerializationUtils.clone                              1              2           0          0.0     1336301.0       1.0X
Utils.cloneProperties                                 0              0           0          0.2        5456.0     244.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
Large Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SerializationUtils.clone                              3              3           0          0.0     2634336.0       1.0X
Utils.cloneProperties                                 0              0           0          0.1       10822.0     243.4X
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 2 deletions
@@ -31,7 +31,6 @@ import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal

 import com.google.common.collect.MapMaker
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -346,7 +345,7 @@ class SparkContext(config: SparkConf) extends Logging {
     override protected def childValue(parent: Properties): Properties = {
       // Note: make a clone such that changes in the parent properties aren't reflected in
       // those of the children threads, which has confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent)
+      Utils.cloneProperties(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
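For context, `localProperties` in `SparkContext` is an `InheritableThreadLocal[Properties]`, and `childValue` runs when a child thread is constructed to decide what it inherits. A sketch of the behavior this guarantees (the property key is illustrative):

```scala
import java.util.Properties

import org.apache.spark.util.Utils

val localProps = new InheritableThreadLocal[Properties] {
  // Clone at inheritance time so that later mutations in the parent thread
  // are invisible to the child, and vice versa (SPARK-10563).
  override protected def childValue(parent: Properties): Properties =
    Utils.cloneProperties(parent)
  override protected def initialValue(): Properties = new Properties()
}

localProps.get().setProperty("spark.jobGroup.id", "group-A")
val child = new Thread(() => {
  // childValue ran when the Thread object was constructed, so the child
  // sees a snapshot, not a live reference to the parent's Properties.
  assert(localProps.get().getProperty("spark.jobGroup.id") == "group-A")
})
localProps.get().setProperty("spark.jobGroup.id", "group-B")
child.start()
child.join()
```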

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 6 deletions
@@ -29,8 +29,6 @@ import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal

-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -698,7 +696,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), Utils.cloneProperties(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
@@ -710,7 +708,7 @@ private[spark] class DAGScheduler(
     val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, callSite, waiter,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     waiter
   }

@@ -782,7 +780,7 @@ private[spark] class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     listener.awaitResult() // Will throw an exception if the job fails
   }

@@ -819,7 +817,7 @@ private[spark] class DAGScheduler(
       this, jobId, 1,
       (_: Int, r: MapOutputStatistics) => callback(r))
     eventProcessLoop.post(MapStageSubmitted(
-      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
+      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
     waiter
   }
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 0 deletions
@@ -2950,6 +2950,13 @@ private[spark] object Utils extends Logging {
     val codec = codecFactory.getCodec(path)
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.forEach((k, v) => resultProps.put(k, v))
+    resultProps
+  }
 }

 private[util] object CallerContext extends Logging {
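A usage sketch of the new helper, showing the isolation it provides plus one subtlety; the caveat about defaults is our reading of `Hashtable.forEach`, not something this diff states:

```scala
import java.util.Properties

import org.apache.spark.util.Utils

val parent = new Properties()
parent.setProperty("spark.job.description", "stage 1")

val snapshot = Utils.cloneProperties(parent)
parent.setProperty("spark.job.description", "stage 2")

// The clone is a distinct object: later writes to the parent don't leak in.
assert(snapshot.getProperty("spark.job.description") == "stage 1")

// Caveat: forEach iterates only the table's own entries, so values reachable
// solely through a chained defaults Properties object are not copied. Spark's
// local properties don't use defaults, so this doesn't matter here.
```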

core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala

Lines changed: 3 additions & 1 deletion
@@ -141,12 +141,14 @@ private[spark] class Benchmark(
     val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
     val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
     val runTimes = ArrayBuffer[Long]()
+    var totalTime = 0L
     var i = 0
-    while (i < minIters || runTimes.sum < minDuration) {
+    while (i < minIters || totalTime < minDuration) {
       val timer = new Benchmark.Timer(i)
       f(timer)
       val runTime = timer.totalTime()
       runTimes += runTime
+      totalTime += runTime

       if (outputPerIteration) {
         // scalastyle:off
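This matters because the old loop condition recomputed `runTimes.sum` on every pass, making the bookkeeping quadratic in the number of iterations; a running total keeps it linear. A self-contained sketch of the pattern (the workload and thresholds are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

// Times one iteration of some workload, returning nanoseconds.
def runOnce(): Long = {
  val start = System.nanoTime()
  java.util.UUID.randomUUID() // stand-in workload
  System.nanoTime() - start
}

val runTimes = ArrayBuffer[Long]()
var totalTime = 0L // O(1) to maintain, vs. runTimes.sum at O(n) per check
var i = 0
val minIters = 5
val minDurationNs = 2L * 1000 * 1000 * 1000 // run for at least ~2 seconds

while (i < minIters || totalTime < minDurationNs) {
  val t = runOnce()
  runTimes += t
  totalTime += t
  i += 1
}
```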
core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.util

import java.util.Properties

import scala.util.Random

import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

/**
 * Benchmark for cloning Properties objects.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar>
 *   2. build/sbt "core/test:runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
 *      Results will be written to "benchmarks/PropertiesCloneBenchmark-results.txt".
 * }}}
 */
object PropertiesCloneBenchmark extends BenchmarkBase {
  /**
   * Benchmark various cases of cloning properties objects
   */
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Properties Cloning") {
      def compareSerialization(name: String, props: Properties): Unit = {
        val benchmark = new Benchmark(name, 1, output = output)
        benchmark.addCase("SerializationUtils.clone") { _ =>
          SerializationUtils.clone(props)
        }
        benchmark.addCase("Utils.cloneProperties") { _ =>
          Utils.cloneProperties(props)
        }
        benchmark.run()
      }
      compareSerialization("Empty Properties", new Properties)
      compareSerialization("System Properties", System.getProperties)
      compareSerialization("Small Properties", makeRandomProps(10, 40, 100))
      compareSerialization("Medium Properties", makeRandomProps(50, 40, 100))
      compareSerialization("Large Properties", makeRandomProps(100, 40, 100))
    }
  }

  def makeRandomProps(numProperties: Int, keySize: Int, valueSize: Int): Properties = {
    val props = new Properties
    for (_ <- 1 to numProperties) {
      // Materialize the lazy character streams: Properties should hold Strings.
      props.put(
        Random.alphanumeric.take(keySize).mkString,
        Random.alphanumeric.take(valueSize).mkString
      )
    }
    props
  }
}
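For a rough sanity check outside the harness, the two paths can be timed directly; this is a hypothetical one-shot measurement (no warm-up or averaging, unlike the real `Benchmark` class), so treat the numbers as indicative only:

```scala
import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.util.Utils

def timeIt[T](label: String)(f: => T): Unit = {
  val start = System.nanoTime()
  f
  println(s"$label took ${(System.nanoTime() - start) / 1000} us")
}

val props = PropertiesCloneBenchmark.makeRandomProps(100, 40, 100)
timeIt("SerializationUtils.clone")(SerializationUtils.clone(props))
timeIt("Utils.cloneProperties")(Utils.cloneProperties(props))
```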

core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala

Lines changed: 2 additions & 3 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.util

 import java.util.Properties

-import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}

 /**
@@ -43,11 +42,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null

   override def beforeEach(): Unit = {
-    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // we need Utils.cloneProperties instead of `new Properties(System.getProperties())` because
     // the latter way of creating a copy does not copy the properties but initializes a new
     // Properties object with the given properties as defaults. Those defaults are then not
     // recognized at all by the standard Scala wrapper over Java Properties.
-    oldProperties = SerializationUtils.clone(System.getProperties)
+    oldProperties = Utils.cloneProperties(System.getProperties)
     super.beforeEach()
   }

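The pitfall that comment describes can be seen directly; a small sketch, assuming the test consults properties through the Scala collection wrapper:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.util.Utils

val original = new Properties()
original.setProperty("k", "v")

// Not a copy: the constructor argument becomes the *defaults* of an empty table.
val viaDefaults = new Properties(original)
assert(viaDefaults.getProperty("k") == "v") // lookups fall through to defaults
assert(viaDefaults.asScala.isEmpty)         // but the Scala wrapper sees nothing

// A real copy: every entry lands in the new object's own table.
val viaClone = Utils.cloneProperties(original)
assert(viaClone.asScala.get("k").contains("v"))
```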
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 2 deletions
@@ -26,7 +26,6 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -586,7 +585,7 @@ class StreamingContext private[streaming] (
     sparkContext.setCallSite(startSite.get)
     sparkContext.clearJobGroup()
     sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-    savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
+    savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
     scheduler.start()
   }
   state = StreamingContextState.ACTIVE

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 2 additions & 4 deletions
@@ -22,16 +22,14 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.Failure

-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}


 private[scheduler] sealed trait JobSchedulerEvent
@@ -231,7 +229,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
-        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
+        ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
         val formattedTime = UIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
