core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,13 +48,15 @@ class KryoSerializer(conf: SparkConf)

  private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
  private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
+  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
  private val registrator = conf.getOption("spark.kryo.registrator")

  def newKryoOutput() = new KryoOutput(bufferSize)

  def newKryo(): Kryo = {
    val instantiator = new EmptyScalaKryoInstantiator
    val kryo = instantiator.newKryo()
+    kryo.setRegistrationRequired(registrationRequired)
    val classLoader = Thread.currentThread.getContextClassLoader

    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
    classOf[MapStatus],
    classOf[BlockManagerId],
    classOf[Array[Byte]],
-    classOf[BoundedPriorityQueue[_]]
+    classOf[BoundedPriorityQueue[_]],
+    classOf[SparkConf]
  )
}
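For context, here is how the two settings this patch reads, spark.kryo.registrator and spark.kryo.registrationRequired, are typically wired together from an application. A minimal sketch; MyRegistrator and Point are hypothetical names, not part of this patch:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class, used only for illustration.
case class Point(x: Double, y: Double)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // With registrationRequired enabled, every class the job serializes
    // must be registered here, or Kryo throws at runtime.
    kryo.register(classOf[Point])
    kryo.register(classOf[Array[Point]])
  }
}

// In practice spark.kryo.registrator takes the fully qualified class name.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
  .set("spark.kryo.registrationRequired", "true")

Failing fast on an unregistered class is what makes the new option useful: without it, Kryo silently falls back to writing full class names with each object.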

docs/configuration.md (19 changes: 15 additions & 4 deletions)
@@ -381,6 +381,17 @@ Apart from these, the following properties are also available, and may be useful
    case.
  </td>
</tr>
+<tr>
+  <td><code>spark.kryo.registrationRequired</code></td>
+  <td>false</td>
+  <td>
+    Whether to require registration with Kryo. If set to true, Kryo throws an exception
+    when an unregistered class is serialized; if set to false (the default), Kryo writes
+    the unregistered class name along with each serialized object. Since writing class
+    names adds significant overhead, enabling this option enforces that no class has
+    been accidentally omitted from registration.
+  </td>
+</tr>
<tr>
  <td><code>spark.kryoserializer.buffer.mb</code></td>
  <td>2</td>
@@ -490,9 +501,9 @@ Apart from these, the following properties are also available, and may be useful
<tr>
  <td>spark.hadoop.validateOutputSpecs</td>
  <td>true</td>
-  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
-  used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
-  output directories. We recommend that users do not disable this except if trying to achieve compatibility with
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+  used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+  output directories. We recommend that users do not disable this except if trying to achieve compatibility with
  previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
</tr>
</table>
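As a sketch of the recommended alternative the description mentions, deleting a stale output directory through Hadoop's FileSystem API rather than disabling validation; the path here is a made-up example:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical output location, for illustration only.
val outputDir = "hdfs:///tmp/word-counts"
val path = new Path(outputDir)
val fs = FileSystem.get(new URI(outputDir), new Configuration())

// Recursively delete the old output so saveAsHadoopFile can recreate it,
// leaving spark.hadoop.validateOutputSpecs at its default of true.
if (fs.exists(path)) {
  fs.delete(path, true)
}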
@@ -835,7 +846,7 @@ Apart from these, the following properties are also available, and may be useful
</table>

#### Cluster Managers
-Each cluster manager in Spark has additional configuration options. Configurations
+Each cluster manager in Spark has additional configuration options. Configurations
can be found on the pages for each mode:

* [YARN](running-on-yarn.html#configuration)
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -24,10 +24,10 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.AllScalaRegistrar
+import com.twitter.chill.{AllScalaRegistrar, ResourcePool}

import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.Utils

@@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf)
  }
}

-private[sql] object SparkSqlSerializer {
-  // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
-  // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
-  // related error.
-  @transient lazy val ser: KryoSerializer = {
+private[execution] class KryoResourcePool(size: Int)
+    extends ResourcePool[SerializerInstance](size) {
+
+  val ser: KryoSerializer = {
    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+    // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+    // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+    // related error.
    new KryoSerializer(sparkConf)
  }

-  def serialize[T: ClassTag](o: T): Array[Byte] = {
-    ser.newInstance().serialize(o).array()
-  }
+  def newInstance() = ser.newInstance()
+}

-  def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
-    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+private[sql] object SparkSqlSerializer {
+  @transient lazy val resourcePool = new KryoResourcePool(30)
+
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val kryo = resourcePool.borrow
+    try {
+      fn(kryo)
+    } finally {
+      resourcePool.release(kryo)
+    }
  }
+
+  def serialize[T: ClassTag](o: T): Array[Byte] =
+    acquireRelease { k =>
+      k.serialize(o).array()
+    }
+
+  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
+    acquireRelease { k =>
+      k.deserialize[T](ByteBuffer.wrap(bytes))
+    }
}
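The pool addresses a cost visible in the removed code: the old serialize and deserialize called ser.newInstance() on every invocation, and each call builds a fresh Kryo instance, which is comparatively expensive. Borrowing from a fixed pool of 30 instances amortizes that cost, and acquireRelease returns the instance in a finally block even when fn throws. Callers keep the same API; a round trip looks like this (illustrative payload; the object is private[sql], so this compiles only inside that package):

// Serialize and deserialize through pooled Kryo instances.
val bytes: Array[Byte] = SparkSqlSerializer.serialize(Map("answer" -> 42))
val restored = SparkSqlSerializer.deserialize[Map[String, Int]](bytes)
assert(restored("answer") == 42)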

private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
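The diff view collapses the body of BigDecimalSerializer. For orientation, a minimal sketch of the shape such a custom Kryo Serializer takes, assuming a string-based round trip; this is an illustration, not necessarily the file's actual implementation:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
  // Assumed encoding: BigDecimal's string form round-trips exactly.
  def write(kryo: Kryo, output: Output, bd: BigDecimal) {
    output.writeString(bd.toString())
  }

  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
    BigDecimal(input.readString())
  }
}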