Skip to content

Commit ae4083a

Browse files
avatiaarondav
authored andcommitted
[SPARK-2805] Upgrade Akka to 2.3.4
This is a second rev of the Akka upgrade (earlier merged, but reverted). I made a slight modification which is that I also upgrade Hive to deal with a compatibility issue related to the protocol buffers library. Author: Anand Avati <[email protected]> Author: Patrick Wendell <[email protected]> Closes #2752 from pwendell/akka-upgrade and squashes the following commits: 4c7ca3f [Patrick Wendell] Upgrading to new hive->protobuf version 57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO 2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4
1 parent 29c6dcf commit ae4083a

File tree

6 files changed

+7
-78
lines changed

6 files changed

+7
-78
lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
130130
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
131131
System.exit(-1)
132132

133-
case AssociationErrorEvent(cause, _, remoteAddress, _) =>
133+
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
134134
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
135135
println(s"Cause was: $cause")
136136
System.exit(-1)

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private[spark] class AppClient(
154154
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
155155
markDisconnected()
156156

157-
case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
157+
case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
158158
logWarning(s"Could not connect to $address: $cause")
159159

160160
case StopAppClient =>

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
5454
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
5555
logInfo(s"Successfully connected to $workerUrl")
5656

57-
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
57+
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
5858
if isWorker(remoteAddress) =>
5959
// These logs may not be seen if the worker (and associated pipe) has died
6060
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
146146
val masterTracker = new MapOutputTrackerMaster(conf)
147147
val actorSystem = ActorSystem("test")
148148
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
149-
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
149+
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
150150
val masterActor = actorRef.underlyingActor
151151

152152
// Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
164164
val masterTracker = new MapOutputTrackerMaster(conf)
165165
val actorSystem = ActorSystem("test")
166166
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
167-
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
167+
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
168168
val masterActor = actorRef.underlyingActor
169169

170170
// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@
119119
<mesos.version>0.18.1</mesos.version>
120120
<mesos.classifier>shaded-protobuf</mesos.classifier>
121121
<akka.group>org.spark-project.akka</akka.group>
122-
<akka.version>2.2.3-shaded-protobuf</akka.version>
122+
<akka.version>2.3.4-spark</akka.version>
123123
<slf4j.version>1.7.5</slf4j.version>
124124
<log4j.version>1.2.17</log4j.version>
125125
<hadoop.version>1.0.4</hadoop.version>
@@ -128,7 +128,7 @@
128128
<hbase.version>0.94.6</hbase.version>
129129
<flume.version>1.4.0</flume.version>
130130
<zookeeper.version>3.4.5</zookeeper.version>
131-
<hive.version>0.12.0</hive.version>
131+
<hive.version>0.12.0-protobuf</hive.version>
132132
<parquet.version>1.4.3</parquet.version>
133133
<jblas.version>1.2.3</jblas.version>
134134
<jetty.version>8.1.14.v20131031</jetty.version>

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package org.apache.spark.streaming
1919

2020
import akka.actor.Actor
21-
import akka.actor.IO
22-
import akka.actor.IOManager
2321
import akka.actor.Props
2422
import akka.util.ByteString
2523

@@ -143,59 +141,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
143141
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
144142
}
145143

146-
// TODO: This test works in IntelliJ but not through SBT
147-
ignore("actor input stream") {
148-
// Start the server
149-
val testServer = new TestServer()
150-
val port = testServer.port
151-
testServer.start()
152-
153-
// Set up the streaming context and input streams
154-
val ssc = new StreamingContext(conf, batchDuration)
155-
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
156-
// Had to pass the local value of port to prevent from closing over entire scope
157-
StorageLevel.MEMORY_AND_DISK)
158-
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
159-
val outputStream = new TestOutputStream(networkStream, outputBuffer)
160-
def output = outputBuffer.flatMap(x => x)
161-
outputStream.register()
162-
ssc.start()
163-
164-
// Feed data to the server to send to the network receiver
165-
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
166-
val input = 1 to 9
167-
val expectedOutput = input.map(x => x.toString)
168-
Thread.sleep(1000)
169-
for (i <- 0 until input.size) {
170-
testServer.send(input(i).toString)
171-
Thread.sleep(500)
172-
clock.addToTime(batchDuration.milliseconds)
173-
}
174-
Thread.sleep(1000)
175-
logInfo("Stopping server")
176-
testServer.stop()
177-
logInfo("Stopping context")
178-
ssc.stop()
179-
180-
// Verify whether data received was as expected
181-
logInfo("--------------------------------")
182-
logInfo("output.size = " + outputBuffer.size)
183-
logInfo("output")
184-
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
185-
logInfo("expected output.size = " + expectedOutput.size)
186-
logInfo("expected output")
187-
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
188-
logInfo("--------------------------------")
189-
190-
// Verify whether all the elements received are as expected
191-
// (whether the elements were received one in each interval is not verified)
192-
assert(output.size === expectedOutput.size)
193-
for (i <- 0 until output.size) {
194-
assert(output(i) === expectedOutput(i))
195-
}
196-
}
197-
198-
199144
test("multi-thread receiver") {
200145
// set up the test receiver
201146
val numThreads = 10
@@ -377,22 +322,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
377322
def port = serverSocket.getLocalPort
378323
}
379324

380-
/** This is an actor for testing actor input stream */
381-
class TestActor(port: Int) extends Actor with ActorHelper {
382-
383-
def bytesToString(byteString: ByteString) = byteString.utf8String
384-
385-
override def preStart(): Unit = {
386-
@deprecated("suppress compile time deprecation warning", "1.0.0")
387-
val unit = IOManager(context.system).connect(new InetSocketAddress(port))
388-
}
389-
390-
def receive = {
391-
case IO.Read(socket, bytes) =>
392-
store(bytesToString(bytes))
393-
}
394-
}
395-
396325
/** This is a receiver to test multiple threads inserting data using block generator */
397326
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
398327
extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {

0 commit comments

Comments
 (0)