
Commit efdaeb1

ianoc authored and marmbrus committed
[SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.
Author: Ian O Connell <[email protected]>

Closes apache#1377 from ianoc/feature/SPARK-2102 and squashes the following commits:

5498566 [Ian O Connell] Docs update suggested by Patrick
20e8555 [Ian O Connell] Slight style change
f92c294 [Ian O Connell] Add docs for new KryoSerializer option
f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer
4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization
665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer
1 parent 1871574 commit efdaeb1

File tree: 3 files changed (+50 −17 lines)


core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 4 additions & 1 deletion

@@ -48,13 +48,15 @@ class KryoSerializer(conf: SparkConf)
 
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
+  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
   def newKryoOutput() = new KryoOutput(bufferSize)
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
+    kryo.setRegistrationRequired(registrationRequired)
     val classLoader = Thread.currentThread.getContextClassLoader
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
     classOf[MapStatus],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
-    classOf[BoundedPriorityQueue[_]]
+    classOf[BoundedPriorityQueue[_]],
+    classOf[SparkConf]
   )
 }
docs/configuration.md

Lines changed: 15 additions & 4 deletions

@@ -388,6 +388,17 @@ Apart from these, the following properties are also available, and may be useful
     case.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kryo.registrationRequired</code></td>
+  <td>false</td>
+  <td>
+    Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception
+    if an unregistered class is serialized. If set to false (the default), Kryo will write
+    unregistered class names along with each object. Writing class names can cause
+    significant performance overhead, so enabling this option can enforce strictly that a
+    user has not omitted classes from registration.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kryoserializer.buffer.mb</code></td>
   <td>2</td>

The remaining two hunks are whitespace-only cleanup; the removed and re-added lines differ only in trailing spaces:

@@ -497,9 +508,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td>spark.hadoop.validateOutputSpecs</td>
   <td>true</td>
-  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
-    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
-    output directories. We recommend that users do not disable this except if trying to achieve compatibility with
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+    output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
 </table>
@@ -861,7 +872,7 @@ Apart from these, the following properties are also available, and may be useful
 </table>
 
 #### Cluster Managers
-Each cluster manager in Spark has additional configuration options. Configurations
+Each cluster manager in Spark has additional configuration options. Configurations
 can be found on the pages for each mode:
 
 * [YARN](running-on-yarn.html#configuration)
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala

Lines changed: 31 additions & 12 deletions

@@ -24,10 +24,10 @@ import scala.reflect.ClassTag
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.AllScalaRegistrar
+import com.twitter.chill.{AllScalaRegistrar, ResourcePool}
 
 import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.Utils
 
@@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
   }
 }
 
-private[sql] object SparkSqlSerializer {
-  // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
-  // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
-  // related error.
-  @transient lazy val ser: KryoSerializer = {
+private[execution] class KryoResourcePool(size: Int)
+  extends ResourcePool[SerializerInstance](size) {
+
+  val ser: KryoSerializer = {
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+    // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+    // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+    // related error.
     new KryoSerializer(sparkConf)
   }
 
-  def serialize[T: ClassTag](o: T): Array[Byte] = {
-    ser.newInstance().serialize(o).array()
-  }
+  def newInstance() = ser.newInstance()
+}
 
-  def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
-    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+private[sql] object SparkSqlSerializer {
+  @transient lazy val resourcePool = new KryoResourcePool(30)
+
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val kryo = resourcePool.borrow
+    try {
+      fn(kryo)
+    } finally {
+      resourcePool.release(kryo)
+    }
   }
+
+  def serialize[T: ClassTag](o: T): Array[Byte] =
+    acquireRelease { k =>
+      k.serialize(o).array()
+    }
+
+  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
+    acquireRelease { k =>
+      k.deserialize[T](ByteBuffer.wrap(bytes))
+    }
 }
 
 private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
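The pool exists because KryoSerializer.newInstance() builds a fresh Kryo, which is comparatively expensive; borrowing from a bounded pool of 30 amortizes that cost while keeping each instance confined to one caller at a time. A sketch of a round trip through the new entry points, callable from code inside the private[sql] package (the payload here is arbitrary):

import org.apache.spark.sql.execution.SparkSqlSerializer

// serialize borrows a pooled SerializerInstance, writes the object, and
// releases the instance in a finally block; deserialize does the same.
val bytes: Array[Byte] = SparkSqlSerializer.serialize(Seq(1, 2, 3))
val back: Seq[Int] = SparkSqlSerializer.deserialize[Seq[Int]](bytes)
assert(back == Seq(1, 2, 3))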
