
Commit 1faa113

Revert "[SPARK-2805] Upgrade to akka 2.3.4"
This reverts commit b9df8af.
1 parent: ec4d40e

6 files changed: +77 −6 lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)

-    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       println(s"Cause was: $cause")
       System.exit(-1)
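
Note: this hunk, and the matching ones in AppClient.scala and WorkerWatcher.scala below, restore the four-field form of AssociationErrorEvent. Akka 2.3 added a fifth logLevel field to that event, so reverting to Akka 2.2.3 means dropping the trailing wildcard from each pattern match. A minimal sketch of how such a listener is wired up, assuming the Akka 2.2.x remoting API (AssociationListener is a hypothetical name, not part of this commit):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.remote.{AssociationErrorEvent, RemotingLifecycleEvent}

    class AssociationListener extends Actor {
      // Subscribe to remoting lifecycle events on the system event stream,
      // the same mechanism the Spark actors in this commit rely on.
      override def preStart(): Unit =
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      def receive = {
        // Akka 2.2.x field order: (cause, localAddress, remoteAddress, inbound).
        // Under Akka 2.3 this match would need a fifth field for the log level.
        case AssociationErrorEvent(cause, _, remoteAddress, _) =>
          println(s"Association with $remoteAddress failed: $cause")
      }
    }

    object AssociationListenerDemo extends App {
      val system = ActorSystem("demo")
      system.actorOf(Props(new AssociationListener), "association-listener")
    }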

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 1 deletion

@@ -154,7 +154,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()

-      case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
+      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
         logWarning(s"Could not connect to $address: $cause")

       case StopAppClient =>

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")

-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
         if isWorker(remoteAddress) =>
       // These logs may not be seen if the worker (and associated pipe) has died
       logError(s"Could not initialize connection to worker $workerUrl. Exiting.")

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor

     // Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor

     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
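
Note: these test changes reflect a testkit API difference. Akka 2.2's TestActorRef still accepts a by-name actor factory directly, while 2.3 only takes Props, so the revert unwraps the constructor call. A minimal sketch of the 2.2-style usage (EchoActor is a hypothetical example actor, not part of this commit):

    import akka.actor.{Actor, ActorSystem}
    import akka.testkit.TestActorRef

    // Hypothetical actor used only to illustrate the call shape.
    class EchoActor extends Actor {
      def receive = { case msg => sender ! msg }
    }

    object TestActorRefSketch extends App {
      implicit val system = ActorSystem("test")
      // Akka 2.2.x testkit: pass the actor instance by name, no Props wrapper;
      // underlyingActor then gives synchronous access to the actor's internals.
      val ref = TestActorRef[EchoActor](new EchoActor)
      val actor: EchoActor = ref.underlyingActor
      system.shutdown()
    }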

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -118,7 +118,7 @@
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.3.4-spark</akka.version>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
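
Note: the reverted property points back at the org.spark-project.akka artifacts, a repackaging of Akka 2.2.3 whose embedded protobuf is, as the naming (and the mesos shaded-protobuf classifier above) suggests, shaded to avoid version conflicts. For a build that wanted to depend on the same artifacts directly, a minimal sbt sketch (coordinates assumed to mirror the pom properties above):

    // build.sbt sketch, assuming the spark-project Akka fork's coordinates
    libraryDependencies ++= Seq(
      "org.spark-project.akka" %% "akka-actor"  % "2.2.3-shaded-protobuf",
      "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf"
    )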

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 71 additions & 0 deletions

@@ -18,6 +18,8 @@
 package org.apache.spark.streaming

 import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString

@@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
   }

+  // TODO: This test works in IntelliJ but not through SBT
+  ignore("actor input stream") {
+    // Start the server
+    val testServer = new TestServer()
+    val port = testServer.port
+    testServer.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+      // Had to pass the local value of port to prevent from closing over entire scope
+      StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(networkStream, outputBuffer)
+    def output = outputBuffer.flatMap(x => x)
+    outputStream.register()
+    ssc.start()
+
+    // Feed data to the server to send to the network receiver
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = 1 to 9
+    val expectedOutput = input.map(x => x.toString)
+    Thread.sleep(1000)
+    for (i <- 0 until input.size) {
+      testServer.send(input(i).toString)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping server")
+    testServer.stop()
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    // (whether the elements were received one in each interval is not verified)
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10

@@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging {
   def port = serverSocket.getLocalPort
 }

+/** This is an actor for testing actor input stream */
+class TestActor(port: Int) extends Actor with ActorHelper {
+
+  def bytesToString(byteString: ByteString) = byteString.utf8String
+
+  override def preStart(): Unit = {
+    @deprecated("suppress compile time deprecation warning", "1.0.0")
+    val unit = IOManager(context.system).connect(new InetSocketAddress(port))
+  }
+
+  def receive = {
+    case IO.Read(socket, bytes) =>
+      store(bytesToString(bytes))
+  }
+}
+
 /** This is a receiver to test multiple threads inserting data using block generator */
 class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
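
Note: the restored test and its TestActor depend on akka.actor.IO and IOManager, which were deprecated in Akka 2.2 (hence the @deprecated suppression above) and removed in 2.3; that is why the upgrade commit had to delete this test and the revert can bring it back. For contrast, a minimal sketch of the newer akka.io API that Akka 2.3 would require instead (TcpLineReader is a hypothetical name, not part of this commit):

    import java.net.InetSocketAddress
    import akka.actor.Actor
    import akka.io.{IO, Tcp}

    // Connects to a TCP server and prints each received chunk as UTF-8 text,
    // the akka.io equivalent of what TestActor does with IOManager above.
    class TcpLineReader(host: String, port: Int) extends Actor {
      import context.system  // implicit ActorSystem for the IO(Tcp) extension

      override def preStart(): Unit =
        IO(Tcp) ! Tcp.Connect(new InetSocketAddress(host, port))

      def receive = {
        case Tcp.Connected(remote, local) =>
          sender ! Tcp.Register(self)  // route incoming data events to this actor
        case Tcp.Received(data) =>
          println(data.utf8String)
      }
    }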

0 commit comments