
Commit b9a922e

Authored by zsxwing and committed by rxin
[SPARK-6602][Core]Replace Akka Serialization with Spark Serializer
Replace Akka Serialization with Spark Serializer and add unit tests.

Author: zsxwing <[email protected]>

Closes apache#7159 from zsxwing/remove-akka-serialization and squashes the following commits:

fc0fca3 [zsxwing] Merge branch 'master' into remove-akka-serialization
cf81a58 [zsxwing] Fix the code style
73251c6 [zsxwing] Add test scope
9ef4af9 [zsxwing] Add AkkaRpcEndpointRef.hashCode
433115c [zsxwing] Remove final
be3edb0 [zsxwing] Support deserializing RpcEndpointRef
ecec410 [zsxwing] Replace Akka Serialization with Spark Serializer
1 parent: 536533c

File tree: 12 files changed, +214 -62 lines

core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -372,6 +372,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>net.razorvine</groupId>
       <artifactId>pyrolite</artifactId>
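The curator-test dependency is added with test scope; a plausible reading (an assumption, since the new test files are not shown in this diff) is that it provides an in-process ZooKeeper server for exercising the ZooKeeper-backed persistence engine. A minimal sketch of that pattern:

// Sketch only: start an embedded ZooKeeper with curator-test and point Spark's
// recovery config at it. The wiring below is illustrative, not taken from this patch.
import org.apache.curator.test.TestingServer
import org.apache.spark.SparkConf

val zkTestServer = new TestingServer()  // picks a free port automatically
val conf = new SparkConf()
  .set("spark.deploy.zookeeper.url", zkTestServer.getConnectString)
// ... construct and exercise a ZooKeeperPersistenceEngine against `conf` ...
zkTestServer.close()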

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 19 additions & 16 deletions
@@ -21,9 +21,8 @@ import java.io._
 
 import scala.reflect.ClassTag
 
-import akka.serialization.Serialization
-
 import org.apache.spark.Logging
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer}
 import org.apache.spark.util.Utils
 
 
@@ -32,11 +31,11 @@ import org.apache.spark.util.Utils
  * Files are deleted when applications and workers are removed.
  *
  * @param dir Directory to store files. Created if non-existent (but not recursively).
- * @param serialization Used to serialize our objects.
+ * @param serializer Used to serialize our objects.
  */
 private[master] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serialization)
+    val serializer: Serializer)
   extends PersistenceEngine with Logging {
 
   new File(dir).mkdir()
@@ -57,27 +56,31 @@ private[master] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-    val out = new FileOutputStream(file)
+    val fileOut = new FileOutputStream(file)
+    var out: SerializationStream = null
     Utils.tryWithSafeFinally {
-      out.write(serialized)
+      out = serializer.newInstance().serializeStream(fileOut)
+      out.writeObject(value)
     } {
-      out.close()
+      fileOut.close()
+      if (out != null) {
+        out.close()
+      }
     }
   }
 
   private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
-    val fileData = new Array[Byte](file.length().asInstanceOf[Int])
-    val dis = new DataInputStream(new FileInputStream(file))
+    val fileIn = new FileInputStream(file)
+    var in: DeserializationStream = null
     try {
-      dis.readFully(fileData)
+      in = serializer.newInstance().deserializeStream(fileIn)
+      in.readObject[T]()
     } finally {
-      dis.close()
+      fileIn.close()
+      if (in != null) {
+        in.close()
+      }
     }
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
   }
 
 }
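Read side by side, the change swaps Akka's byte-array serialization API for Spark's stream-based SerializationStream/DeserializationStream. A minimal usage sketch (not part of the patch; it uses only constructors and methods visible in this diff, and since the engine is private[master] it would have to live in the org.apache.spark.deploy.master package):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Build the engine exactly as Master now does, with a Spark JavaSerializer.
val conf = new SparkConf()
val engine = new FileSystemPersistenceEngine("/tmp/spark-recovery", new JavaSerializer(conf))

// `someWorkerInfo` is a placeholder value; persist/read are the public PersistenceEngine
// methods, which in this engine delegate to the serializeIntoFile/deserializeFromFile
// helpers shown above.
engine.persist("worker_example", someWorkerInfo)
val workers = engine.read[WorkerInfo]("worker_")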

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 11 deletions
@@ -27,11 +27,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.language.postfixOps
 import scala.util.Random
 
-import akka.serialization.Serialization
-import akka.serialization.SerializationExtension
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.rpc.akka.AkkaRpcEnv
 import org.apache.spark.rpc._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
@@ -44,6 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
+import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
@@ -58,9 +56,6 @@ private[master] class Master(
   private val forwardMessageThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
 
-  // TODO Remove it once we don't use akka.serialization.Serialization
-  private val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
-
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
 
   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -161,20 +156,21 @@ private[master] class Master(
     masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
     applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
 
+    val serializer = new JavaSerializer(conf)
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
         val zkFactory =
-          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(actorSystem))
+          new ZooKeeperRecoveryModeFactory(conf, serializer)
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
         val fsFactory =
-          new FileSystemRecoveryModeFactory(conf, SerializationExtension(actorSystem))
+          new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serialization])
-          .newInstance(conf, SerializationExtension(actorSystem))
+        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
+          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
@@ -213,7 +209,7 @@ private[master] class Master(
 
   override def receive: PartialFunction[Any, Unit] = {
     case ElectedLeader => {
-      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
+      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
       state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
         RecoveryState.ALIVE
       } else {
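Note the compatibility implication of the CUSTOM branch: the reflective constructor lookup now expects (SparkConf, org.apache.spark.serializer.Serializer), so external recovery-mode factories built against the old Akka Serialization signature must be recompiled against the new one. A hedged sketch of the configuration that reaches this branch (the recovery-mode key is stated from memory of the standalone Master, not from this diff; the factory key does appear above):

// Illustrative only: enable a user-supplied recovery mode factory.
// "other.supplier.CustomRecoveryModeFactory" is the test factory updated by this patch.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "CUSTOM")
  .set("spark.deploy.recoveryMode.factory", "other.supplier.CustomRecoveryModeFactory")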

core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala

Lines changed: 6 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.master
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rpc.RpcEnv
 
 import scala.reflect.ClassTag
 
@@ -80,8 +81,11 @@ abstract class PersistenceEngine {
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+  final def readPersistedData(
+      rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+    rpcEnv.deserialize { () =>
+      (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+    }
   }
 
   def close() {}

core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala

Lines changed: 4 additions & 5 deletions
@@ -17,10 +17,9 @@
 
 package org.apache.spark.deploy.master
 
-import akka.serialization.Serialization
-
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.serializer.Serializer
 
 /**
  * ::DeveloperApi::
@@ -30,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
  *
  */
 @DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serializer) {
 
   /**
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -49,7 +48,7 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial
  * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
  * recovery is made by restoring from filesystem.
  */
-private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
   extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
 
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
@@ -64,7 +63,7 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
-private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
   extends StandaloneRecoveryModeFactory(conf, serializer) {
 
   def createPersistenceEngine(): PersistenceEngine = {

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

Lines changed: 8 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -27,9 +27,10 @@ import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkCuratorUtil
+import org.apache.spark.serializer.Serializer
 
 
-private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
+private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
   extends PersistenceEngine
   with Logging {
 
@@ -57,17 +58,16 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializat
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
+    val serialized = serializer.newInstance().serialize(value)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
   }
 
   private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.fromBinary(fileData).asInstanceOf[T])
+      Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 6 additions & 0 deletions
@@ -139,6 +139,12 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * creating it manually because different [[RpcEnv]] may have different formats.
    */
   def uriOf(systemName: String, address: RpcAddress, endpointName: String): String
+
+  /**
+   * [[RpcEndpointRef]] cannot be deserialized without [[RpcEnv]]. So when deserializing any object
+   * that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
+   */
+  def deserialize[T](deserializationAction: () => T): T
 }
 
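This new hook is what PersistenceEngine.readPersistedData leans on: deserialization of anything holding RpcEndpointRefs is wrapped so the concrete RpcEnv can re-attach the refs to itself. A sketch of the call pattern (illustrative; restoreWorkers is not a real method, and it assumes WorkerInfo carries an RpcEndpointRef, which is the reason the hook exists):

// Hypothetical helper showing the intended wrapping pattern.
def restoreWorkers(rpcEnv: RpcEnv, engine: PersistenceEngine): Seq[WorkerInfo] =
  rpcEnv.deserialize { () =>
    engine.read[WorkerInfo]("worker_")  // refs inside WorkerInfo resolve against the live RpcEnv
  }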

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 13 additions & 1 deletion
@@ -28,7 +28,7 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Add
 import akka.event.Logging.Error
 import akka.pattern.{ask => akkaAsk}
 import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent}
-import com.google.common.util.concurrent.MoreExecutors
+import akka.serialization.JavaSerializer
 
 import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.rpc._
@@ -239,6 +239,12 @@ private[spark] class AkkaRpcEnv private[akka] (
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorSystem)"
+
+  override def deserialize[T](deserializationAction: () => T): T = {
+    JavaSerializer.currentSystem.withValue(actorSystem.asInstanceOf[ExtendedActorSystem]) {
+      deserializationAction()
+    }
+  }
 }
 
 private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {
@@ -315,6 +321,12 @@ private[akka] class AkkaRpcEndpointRef(
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"
 
+  final override def equals(that: Any): Boolean = that match {
+    case other: AkkaRpcEndpointRef => actorRef == other.actorRef
+    case _ => false
+  }
+
+  final override def hashCode(): Int = if (actorRef == null) 0 else actorRef.hashCode()
 }
 
 /**
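Two things happen here: the Akka-backed RpcEnv implements the new deserialize hook by setting Akka's JavaSerializer.currentSystem dynamic variable, and AkkaRpcEndpointRef gains equals/hashCode so a ref deserialized into a fresh wrapper still compares equal to the original. A hedged illustration of why that equality matters (a sketch only; it assumes a ref survives a Spark-serializer round trip, which is the behavior the commit message describes as "Support deserializing RpcEndpointRef"):

// Illustrative round trip of an endpoint ref through the Spark serializer.
// `serializer`, `rpcEnv`, and `endpointRef` are assumed to be in scope.
val bytes = serializer.newInstance().serialize(endpointRef)
val restored = rpcEnv.deserialize { () =>
  serializer.newInstance().deserialize[RpcEndpointRef](bytes)  // needs the RpcEnv context
}
assert(restored == endpointRef)  // holds because equality delegates to the wrapped ActorRef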

core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala

Lines changed: 13 additions & 18 deletions
@@ -19,18 +19,19 @@
 // when they are outside of org.apache.spark.
 package other.supplier
 
+import java.nio.ByteBuffer
+
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
-import akka.serialization.Serialization
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master._
+import org.apache.spark.serializer.Serializer
 
 class CustomRecoveryModeFactory(
   conf: SparkConf,
-  serialization: Serialization
-) extends StandaloneRecoveryModeFactory(conf, serialization) {
+  serializer: Serializer
+) extends StandaloneRecoveryModeFactory(conf, serializer) {
 
   CustomRecoveryModeFactory.instantiationAttempts += 1
 
@@ -40,7 +41,7 @@ class CustomRecoveryModeFactory(
    *
    */
   override def createPersistenceEngine(): PersistenceEngine =
-    new CustomPersistenceEngine(serialization)
+    new CustomPersistenceEngine(serializer)
 
   /**
    * Create an instance of LeaderAgent that decides who gets elected as master.
@@ -53,7 +54,7 @@ object CustomRecoveryModeFactory {
   @volatile var instantiationAttempts = 0
 }
 
-class CustomPersistenceEngine(serialization: Serialization) extends PersistenceEngine {
+class CustomPersistenceEngine(serializer: Serializer) extends PersistenceEngine {
   val data = mutable.HashMap[String, Array[Byte]]()
 
   CustomPersistenceEngine.lastInstance = Some(this)
@@ -64,10 +65,10 @@ class CustomPersistenceEngine(serialization: Serialization) extends PersistenceE
    */
   override def persist(name: String, obj: Object): Unit = {
     CustomPersistenceEngine.persistAttempts += 1
-    serialization.serialize(obj) match {
-      case util.Success(bytes) => data += name -> bytes
-      case util.Failure(cause) => throw new RuntimeException(cause)
-    }
+    val serialized = serializer.newInstance().serialize(obj)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    data += name -> bytes
   }
 
   /**
@@ -84,15 +85,9 @@ class CustomPersistenceEngine(serialization: Serialization) extends PersistenceE
    */
   override def read[T: ClassTag](prefix: String): Seq[T] = {
     CustomPersistenceEngine.readAttempts += 1
-    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
     val results = for ((name, bytes) <- data; if name.startsWith(prefix))
-      yield serialization.deserialize(bytes, clazz)
-
-    results.find(_.isFailure).foreach {
-      case util.Failure(cause) => throw new RuntimeException(cause)
-    }
-
-    results.flatMap(_.toOption).toSeq
+      yield serializer.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+    results.toSeq
   }
 }
 
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
     persistenceEngine.addDriver(driverToPersist)
     persistenceEngine.addWorker(workerToPersist)
 
-    val (apps, drivers, workers) = persistenceEngine.readPersistedData()
+    val (apps, drivers, workers) = persistenceEngine.readPersistedData(rpcEnv)
 
     apps.map(_.id) should contain(appToPersist.id)
     drivers.map(_.id) should contain(driverToPersist.id)
