core/src/main/scala/org/apache/spark/deploy/Client.scala (1 addition, 1 deletion)
@@ -128,7 +128,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)

-case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
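Context for reviewers: Akka 2.3 appears to add a fifth field to `akka.remote.AssociationErrorEvent` (a log level for the failure, if I read the Akka source right), so every four-field pattern match stops compiling; the fix throughout this PR is a trailing wildcard. A minimal sketch of the new shape, assuming Akka 2.3.x on the classpath:

```scala
import akka.actor.Address
import akka.remote.AssociationErrorEvent

// Sketch only: the trailing `_` absorbs the field Akka 2.3 added to the
// event; the old four-field pattern no longer compiles against 2.3.x.
def logAssociationError(event: AssociationErrorEvent, master: Address): Unit =
  event match {
    case AssociationErrorEvent(cause, _, remoteAddress, _, _) if remoteAddress == master =>
      println(s"Error connecting to master ($remoteAddress), cause: $cause")
    case _ => // error for some other remote address; ignore it here
  }
```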
@@ -154,7 +154,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()

-case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")

case StopAppClient =>
@@ -52,7 +52,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")

-case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
+case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
if isWorker(remoteAddress) =>
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
@@ -168,7 +168,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTrackerMaster(conf)
val actorSystem = ActorSystem("test")
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~123B, and no exception should be thrown
@@ -186,7 +186,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTrackerMaster(conf)
val actorSystem = ActorSystem("test")
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
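The test changes above follow from akka-testkit 2.3, whose `TestActorRef` factory takes a `Props` rather than a by-name actor instance, which is what the `Props(...)` wrapper is doing. A small self-contained sketch; `EchoActor` is invented here for illustration:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestActorRef

// Hypothetical actor, used only to demonstrate the Akka 2.3 TestActorRef API.
class EchoActor extends Actor {
  var last: Option[Any] = None
  def receive = { case msg => last = Some(msg) }
}

object TestActorRefSketch extends App {
  implicit val system = ActorSystem("test")
  // Akka 2.3 style: construction is wrapped in Props, as in the diff above.
  // TestActorRef still runs the actor on the calling thread, so its state
  // can be inspected synchronously through underlyingActor.
  val ref = TestActorRef[EchoActor](Props(new EchoActor))(system)
  ref ! "ping"
  assert(ref.underlyingActor.last == Some("ping"))
  system.shutdown()
}
```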
pom.xml (2 additions, 2 deletions)
@@ -118,15 +118,15 @@
<mesos.version>0.18.1</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<akka.group>org.spark-project.akka</akka.group>
-<akka.version>2.2.3-shaded-protobuf</akka.version>
+<akka.version>2.3.4-spark</akka.version>
<slf4j.version>1.7.5</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.0.4</hadoop.version>
<protobuf.version>2.4.1</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
<hbase.version>0.94.6</hbase.version>
<zookeeper.version>3.4.5</zookeeper.version>
-<hive.version>0.12.0</hive.version>
+<hive.version>0.12.0-protobuf</hive.version>
<parquet.version>1.4.3</parquet.version>
<jblas.version>1.2.3</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
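For anyone consuming these properties downstream: `org.spark-project.akka` is Spark's shaded republish of Akka, so the groupId stays put and only the version string moves from the protobuf-shaded 2.2.3 build to the 2.3.4 build (the `0.12.0-protobuf` Hive version looks like the analogous shaded republish). A hypothetical sbt rendering of the same coordinates, for illustration only; the artifact names below are assumptions, not taken from this PR:

```scala
// Hypothetical sbt equivalent of the Maven properties above.
val akkaGroup   = "org.spark-project.akka"
val akkaVersion = "2.3.4-spark"  // previously "2.2.3-shaded-protobuf"

libraryDependencies ++= Seq(
  akkaGroup %% "akka-remote"  % akkaVersion,
  akkaGroup %% "akka-slf4j"   % akkaVersion,
  akkaGroup %% "akka-testkit" % akkaVersion % "test"
)
```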
@@ -18,8 +18,6 @@
package org.apache.spark.streaming

import akka.actor.Actor
-import akka.actor.IO
-import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString

@@ -144,59 +142,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}

-  // TODO: This test works in IntelliJ but not through SBT
-  ignore("actor input stream") {
-    // Start the server
-    val testServer = new TestServer()
-    val port = testServer.port
-    testServer.start()
-
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
-      // Had to pass the local value of port to prevent from closing over entire scope
-      StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    val outputStream = new TestOutputStream(networkStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
-    outputStream.register()
-    ssc.start()
-
-    // Feed data to the server to send to the network receiver
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = 1 to 9
-    val expectedOutput = input.map(x => x.toString)
-    Thread.sleep(1000)
-    for (i <- 0 until input.size) {
-      testServer.send(input(i).toString)
-      Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
-    }
-    Thread.sleep(1000)
-    logInfo("Stopping server")
-    testServer.stop()
-    logInfo("Stopping context")
-    ssc.stop()
-
-    // Verify whether data received was as expected
-    logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
-    logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output.size = " + expectedOutput.size)
-    logInfo("expected output")
-    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    // (whether the elements were received one in each interval is not verified)
-    assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
-  }


test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
@@ -378,22 +323,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
def port = serverSocket.getLocalPort
}

-/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with ActorHelper {
-
-  def bytesToString(byteString: ByteString) = byteString.utf8String
-
-  override def preStart(): Unit = {
-    @deprecated("suppress compile time deprecation warning", "1.0.0")
-    val unit = IOManager(context.system).connect(new InetSocketAddress(port))
-  }
-
-  def receive = {
-    case IO.Read(socket, bytes) =>
-      store(bytesToString(bytes))
-  }
-}

/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
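Why the actor-input-stream test is deleted outright rather than ported: it depended on `akka.actor.IO`/`IOManager`, which were deprecated in Akka 2.2 and removed in 2.3, so there is nothing left to compile against. If the test were rewritten, the 2.3-era replacement is the `akka.io` Tcp extension; a rough sketch of what the deleted `TestActor` might look like on that API (`TcpTestActor` is invented here, not part of the PR):

```scala
import java.net.InetSocketAddress

import akka.actor.Actor
import akka.io.{IO, Tcp}
import akka.util.ByteString

// Hypothetical port of the deleted TestActor to akka.io.Tcp, the Akka 2.3
// replacement for akka.actor.IO/IOManager. Not part of this PR.
class TcpTestActor(port: Int) extends Actor {
  import Tcp._
  import context.system

  override def preStart(): Unit = {
    // Ask the Tcp manager for a connection; replies come back as messages.
    IO(Tcp) ! Connect(new InetSocketAddress("localhost", port))
  }

  def receive = {
    case Connected(_, _) =>
      // Register ourselves as the handler so we get Tcp.Received messages.
      sender() ! Register(self)
    case Received(data: ByteString) =>
      // Counterpart of the old IO.Read case; the original handed the decoded
      // string to ActorHelper's store(...).
      println(data.utf8String)
    case CommandFailed(_: Connect) =>
      context.stop(self)
  }
}
```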