diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 5a1d06eb87db9..dc2bee6f2bdca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,8 +17,6 @@
package org.apache.spark.deploy.worker.ui
-import java.io.File
-import java.net.URI
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
@@ -137,13 +135,6 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
return ("Error: Log type must be one of " + supportedLogTypes.mkString(", "), 0, 0, 0)
}
- // Verify that the normalized path of the log directory is in the working directory
- val normalizedUri = new URI(logDirectory).normalize()
- val normalizedLogDir = new File(normalizedUri.getPath)
- if (!Utils.isInDirectory(workDir, normalizedLogDir)) {
- return ("Error: invalid log directory " + logDirectory, 0, 0, 0)
- }
-
try {
val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
@@ -159,7 +150,7 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
offset
}
}
- val endIndex = math.min(startIndex + byteLength, totalLength)
+ val endIndex = math.min(startIndex + totalLength, totalLength)
logDebug(s"Getting log from $startIndex to $endIndex")
val logText = Utils.offsetBytes(files, startIndex, endIndex)
logDebug(s"Got log of length ${logText.length} bytes")
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 8ae76c5f72f2e..2ab41ba488ff6 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -43,5 +43,5 @@ package org.apache
package object spark {
// For package docs only
- val SPARK_VERSION = "1.5.0-SNAPSHOT"
+ val SPARK_VERSION = "1.4.0-SNAPSHOT"
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 82455b0426a5d..673cd0e19eba2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
*
* @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
- * @param maxTaskFailures if any particular task fails this number of times, the entire
+ * @param maxTaskFailures if any particular task fails more than this number of times, the entire
* task set will be aborted
*/
private[spark] class TaskSetManager(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7f70d8a193b..fcad959540f5a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -103,7 +103,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
- s"from unknown executor with ID $executorId")
+ "from unknown executor $sender with ID $executorId")
}
}
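A note on the message change above: only Scala strings prefixed with s are interpolated, so the added line prints $sender and $executorId as literal text rather than their values, while the removed line substitutes $executorId. A tiny self-contained illustration, not part of the patch:

// Illustration only: interpolated vs. plain string literals in Scala.
object InterpolationNote {
  def main(args: Array[String]): Unit = {
    val executorId = "exec-1"
    println(s"from unknown executor with ID $executorId") // prints the value: exec-1
    println("from unknown executor with ID $executorId")  // prints the text "$executorId" verbatim
  }
}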
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index 50b6ba67e9931..f73c742732dec 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.status.api.v1
-import java.util.zip.ZipOutputStream
import javax.servlet.ServletContext
import javax.ws.rs._
import javax.ws.rs.core.{Context, Response}
@@ -165,18 +164,6 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
}
}
- @Path("applications/{appId}/logs")
- def getEventLogs(
- @PathParam("appId") appId: String): EventLogDownloadResource = {
- new EventLogDownloadResource(uiRoot, appId, None)
- }
-
- @Path("applications/{appId}/{attemptId}/logs")
- def getEventLogs(
- @PathParam("appId") appId: String,
- @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
- new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
- }
}
private[spark] object ApiRootResource {
@@ -206,17 +193,6 @@ private[spark] trait UIRoot {
def getSparkUI(appKey: String): Option[SparkUI]
def getApplicationInfoList: Iterator[ApplicationInfo]
- /**
- * Write the event logs for the given app to the [[ZipOutputStream]] instance. If attemptId is
- * [[None]], event logs for all attempts of this application will be written out.
- */
- def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
- Response.serverError()
- .entity("Event logs are only available through the history server.")
- .status(Response.Status.SERVICE_UNAVAILABLE)
- .build()
- }
-
/**
* Get the spark UI with the given appID, and apply a function
* to it. If there is no such app, throw an appropriate exception
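The two methods removed above are JAX-RS sub-resource locators: a method carrying @Path but no HTTP verb annotation returns another resource object, and the remaining path segments are dispatched to it. A generic, hedged sketch of the pattern with illustrative names (not Spark's API):

import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

// `detail` has @Path but no @GET, so JAX-RS hands the rest of the request
// (e.g. GET /sketch/items/42/detail) to the returned DetailResource.
@Path("/sketch")
class ParentResource {
  @Path("items/{id}/detail")
  def detail(@PathParam("id") id: String): DetailResource = new DetailResource(id)
}

class DetailResource(val id: String) {
  @GET
  @Produces(Array(MediaType.TEXT_PLAIN))
  def get(): String = s"detail for $id"
}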
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
deleted file mode 100644
index 22e21f0c62a29..0000000000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import java.io.OutputStream
-import java.util.zip.ZipOutputStream
-import javax.ws.rs.{GET, Produces}
-import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-
-@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
-private[v1] class EventLogDownloadResource(
- val uIRoot: UIRoot,
- val appId: String,
- val attemptId: Option[String]) extends Logging {
- val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)
-
- @GET
- def getEventLogs(): Response = {
- try {
- val fileName = {
- attemptId match {
- case Some(id) => s"eventLogs-$appId-$id.zip"
- case None => s"eventLogs-$appId.zip"
- }
- }
-
- val stream = new StreamingOutput {
- override def write(output: OutputStream): Unit = {
- val zipStream = new ZipOutputStream(output)
- try {
- uIRoot.writeEventLogs(appId, attemptId, zipStream)
- } finally {
- zipStream.close()
- }
-
- }
- }
-
- Response.ok(stream)
- .header("Content-Disposition", s"attachment; filename=$fileName")
- .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
- .build()
- } catch {
- case NonFatal(e) =>
- Response.serverError()
- .entity(s"Event logs are not available for app: $appId.")
- .status(Response.Status.SERVICE_UNAVAILABLE)
- .build()
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 1d31fce4c697b..f39e961772c46 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,12 +17,8 @@
package org.apache.spark.ui.jobs
-import java.util.concurrent.TimeoutException
-
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
-import com.google.common.annotations.VisibleForTesting
-
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
@@ -530,30 +526,4 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
startTime = appStarted.time
}
-
- /**
- * For testing only. Wait until at least `numExecutors` executors are up, or throw
- * `TimeoutException` if the waiting time elapsed before `numExecutors` executors up.
- *
- * @param numExecutors the number of executors to wait at least
- * @param timeout time to wait in milliseconds
- */
- @VisibleForTesting
- private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
- val finishTime = System.currentTimeMillis() + timeout
- while (System.currentTimeMillis() < finishTime) {
- val numBlockManagers = synchronized {
- blockManagerIds.size
- }
- if (numBlockManagers >= numExecutors + 1) {
- // Need to count the block manager in driver
- return
- }
- // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- // add overhead in the general case.
- Thread.sleep(10)
- }
- throw new TimeoutException(
- s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
- }
}
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4cecddce..1861d38640102 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -120,22 +120,21 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
/**
* For testing only. Wait until there are no more events in the queue, or until the specified
- * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
- * emptied.
+ * time has elapsed. Return true if the queue has emptied and false if the specified time
+ * elapsed before the queue emptied.
*/
@VisibleForTesting
- @throws(classOf[TimeoutException])
- def waitUntilEmpty(timeoutMillis: Long): Unit = {
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
- throw new TimeoutException(
- s"The event queue is not empty after $timeoutMillis milliseconds")
+ return false
}
/* Sleep rather than using wait/notify, because this is used only for testing and
* wait/notify add overhead in the general case. */
Thread.sleep(10)
}
+ true
}
/**
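The hunk above trades a throwing contract for a Boolean one, which is why the test suites later in this diff wrap the call in assert(...). A compact sketch of the two contracts, using a stand-in class rather than the real listener bus:

import java.util.concurrent.TimeoutException

// Sketch only: `pending` stands in for the bus's internal event queue size.
class SketchBus(initialPending: Int) {
  @volatile private var pending: Int = initialPending
  private def queueIsEmpty: Boolean = pending <= 0
  def drainOne(): Unit = { pending -= 1 }

  // Contract on the + side: return false on timeout and let callers assert.
  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    val finishTime = System.currentTimeMillis + timeoutMillis
    while (!queueIsEmpty) {
      if (System.currentTimeMillis > finishTime) {
        return false
      }
      Thread.sleep(10)
    }
    true
  }

  // Contract on the - side: throw, so the bare call is enough to fail a test.
  @throws(classOf[TimeoutException])
  def waitUntilEmptyOrThrow(timeoutMillis: Long): Unit = {
    if (!waitUntilEmpty(timeoutMillis.toInt)) {
      throw new TimeoutException(s"The event queue is not empty after $timeoutMillis milliseconds")
    }
  }
}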
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f132410540fd..693e1a0a3d5f0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2227,22 +2227,6 @@ private[spark] object Utils extends Logging {
}
}
- /**
- * Return whether the specified file is a parent directory of the child file.
- */
- def isInDirectory(parent: File, child: File): Boolean = {
- if (child == null || parent == null) {
- return false
- }
- if (!child.exists() || !parent.exists() || !parent.isDirectory()) {
- return false
- }
- if (parent.equals(child)) {
- return true
- }
- isInDirectory(parent, child.getParentFile)
- }
-
}
private [util] class SparkShutdownHookManager {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 64e7102e3654c..1501111a06655 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -20,8 +20,6 @@ package org.apache.spark.util.collection
import scala.reflect._
import com.google.common.hash.Hashing
-import org.apache.spark.annotation.Private
-
/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
* removed.
@@ -39,7 +37,7 @@ import org.apache.spark.annotation.Private
* It uses quadratic probing with a power-of-2 hash table size, which is guaranteed
* to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
*/
-@Private
+private[spark]
class OpenHashSet[@specialized(Long, Int) T: ClassTag](
initialCapacity: Int,
loadFactor: Double)
@@ -112,14 +110,6 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
rehashIfNeeded(k, grow, move)
}
- def union(other: OpenHashSet[T]): OpenHashSet[T] = {
- val iterator = other.iterator
- while (iterator.hasNext) {
- add(iterator.next())
- }
- this
- }
-
/**
* Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
* The caller is responsible for calling rehashIfNeeded.
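With union removed above (and the class narrowed to private[spark]), a caller that still needs a union can inline the same iterator-and-add loop the deleted method used. A hedged sketch, assuming the calling code lives somewhere under the org.apache.spark package (required by the new visibility):

package org.apache.spark

import org.apache.spark.util.collection.OpenHashSet

// Sketch only: fold `other` into `target` element by element, mirroring the
// body of the removed union method.
object OpenHashSetUnionSketch {
  def unionInto(target: OpenHashSet[Long], other: OpenHashSet[Long]): OpenHashSet[Long] = {
    val it = other.iterator
    while (it.hasNext) {
      target.add(it.next())
    }
    target
  }
}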
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index d575bf2f284b9..ce4fe80b66aa5 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -7,22 +7,6 @@
"sparkUser" : "irashid",
"completed" : true
} ]
-}, {
- "id" : "local-1430917381535",
- "name" : "Spark shell",
- "attempts" : [ {
- "attemptId" : "2",
- "startTime" : "2015-05-06T13:03:00.893GMT",
- "endTime" : "2015-05-06T13:03:00.950GMT",
- "sparkUser" : "irashid",
- "completed" : true
- }, {
- "attemptId" : "1",
- "startTime" : "2015-05-06T13:03:00.880GMT",
- "endTime" : "2015-05-06T13:03:00.890GMT",
- "sparkUser" : "irashid",
- "completed" : true
- } ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index d575bf2f284b9..ce4fe80b66aa5 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -7,22 +7,6 @@
"sparkUser" : "irashid",
"completed" : true
} ]
-}, {
- "id" : "local-1430917381535",
- "name" : "Spark shell",
- "attempts" : [ {
- "attemptId" : "2",
- "startTime" : "2015-05-06T13:03:00.893GMT",
- "endTime" : "2015-05-06T13:03:00.950GMT",
- "sparkUser" : "irashid",
- "completed" : true
- }, {
- "attemptId" : "1",
- "startTime" : "2015-05-06T13:03:00.880GMT",
- "endTime" : "2015-05-06T13:03:00.890GMT",
- "sparkUser" : "irashid",
- "completed" : true
- } ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 15c2de8ef99ea..dca86fe5f7e6a 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -7,22 +7,6 @@
"sparkUser" : "irashid",
"completed" : true
} ]
-}, {
- "id" : "local-1430917381535",
- "name" : "Spark shell",
- "attempts" : [ {
- "attemptId" : "2",
- "startTime" : "2015-05-06T13:03:00.893GMT",
- "endTime" : "2015-05-06T13:03:00.950GMT",
- "sparkUser" : "irashid",
- "completed" : true
- }, {
- "attemptId" : "1",
- "startTime" : "2015-05-06T13:03:00.880GMT",
- "endTime" : "2015-05-06T13:03:00.890GMT",
- "sparkUser" : "irashid",
- "completed" : true
- } ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
@@ -40,14 +24,12 @@
"completed" : true
} ]
}, {
- "id": "local-1425081759269",
- "name": "Spark shell",
- "attempts": [
- {
- "startTime": "2015-02-28T00:02:38.277GMT",
- "endTime": "2015-02-28T00:02:46.912GMT",
- "sparkUser": "irashid",
- "completed": true
- }
- ]
+ "id" : "local-1425081759269",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "startTime" : "2015-02-28T00:02:38.277GMT",
+ "endTime" : "2015-02-28T00:02:46.912GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
} ]
\ No newline at end of file
diff --git a/core/src/test/resources/spark-events/local-1430917381535_1 b/core/src/test/resources/spark-events/local-1430917381535_1
deleted file mode 100644
index d5a1303344825..0000000000000
--- a/core/src/test/resources/spark-events/local-1430917381535_1
+++ /dev/null
@@ -1,5 +0,0 @@
-{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380880}
-{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
-{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380880,"User":"irashid","App Attempt ID":"1"}
-{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380890}
\ No newline at end of file
diff --git a/core/src/test/resources/spark-events/local-1430917381535_2 b/core/src/test/resources/spark-events/local-1430917381535_2
deleted file mode 100644
index abb637a22e1e3..0000000000000
--- a/core/src/test/resources/spark-events/local-1430917381535_2
+++ /dev/null
@@ -1,5 +0,0 @@
-{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380893}
-{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
-{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380893,"User":"irashid","App Attempt ID":"2"}
-{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380950}
\ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 803e1831bb269..1c2b681f0b843 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -90,7 +90,7 @@ class ExecutorAllocationManagerSuite
}
test("add executors") {
- sc = createSparkContext(1, 10, 1)
+ sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
@@ -135,7 +135,7 @@ class ExecutorAllocationManagerSuite
}
test("add executors capped by num pending tasks") {
- sc = createSparkContext(0, 10, 0)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
@@ -186,7 +186,7 @@ class ExecutorAllocationManagerSuite
}
test("cancel pending executors when no longer needed") {
- sc = createSparkContext(0, 10, 0)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
@@ -213,7 +213,7 @@ class ExecutorAllocationManagerSuite
}
test("remove executors") {
- sc = createSparkContext(5, 10, 5)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
@@ -263,7 +263,7 @@ class ExecutorAllocationManagerSuite
}
test ("interleaving add and remove") {
- sc = createSparkContext(5, 10, 5)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
@@ -331,7 +331,7 @@ class ExecutorAllocationManagerSuite
}
test("starting/canceling add timer") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -363,7 +363,7 @@ class ExecutorAllocationManagerSuite
}
test("starting/canceling remove timers") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -410,7 +410,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop with no events") {
- sc = createSparkContext(0, 20, 0)
+ sc = createSparkContext(0, 20)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(2020L)
manager.setClock(clock)
@@ -436,7 +436,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop add behavior") {
- sc = createSparkContext(0, 20, 0)
+ sc = createSparkContext(0, 20)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,7 +486,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop remove behavior") {
- sc = createSparkContext(1, 20, 1)
+ sc = createSparkContext(1, 20)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -547,7 +547,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger add executors correctly") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)
@@ -577,7 +577,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger remove executors correctly") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)
@@ -608,7 +608,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger add and remove executor callbacks correctly") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -641,7 +641,7 @@ class ExecutorAllocationManagerSuite
}
test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
- sc = createSparkContext(2, 10, 2)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -677,7 +677,7 @@ class ExecutorAllocationManagerSuite
}
test("avoid ramp up when target < running executors") {
- sc = createSparkContext(0, 100000, 0)
+ sc = createSparkContext(0, 100000)
val manager = sc.executorAllocationManager.get
val stage1 = createStageInfo(0, 1000)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
@@ -701,67 +701,13 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 16)
}
- test("avoid ramp down initial executors until first job is submitted") {
- sc = createSparkContext(2, 5, 3)
- val manager = sc.executorAllocationManager.get
- val clock = new ManualClock(10000L)
- manager.setClock(clock)
-
- // Verify the initial number of executors
- assert(numExecutorsTarget(manager) === 3)
- schedule(manager)
- // Verify whether the initial number of executors is kept with no pending tasks
- assert(numExecutorsTarget(manager) === 3)
-
- sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
- clock.advance(100L)
-
- assert(maxNumExecutorsNeeded(manager) === 2)
- schedule(manager)
-
- // Verify that current number of executors should be ramp down when first job is submitted
- assert(numExecutorsTarget(manager) === 2)
- }
-
- test("avoid ramp down initial executors until idle executor is timeout") {
- sc = createSparkContext(2, 5, 3)
- val manager = sc.executorAllocationManager.get
- val clock = new ManualClock(10000L)
- manager.setClock(clock)
-
- // Verify the initial number of executors
- assert(numExecutorsTarget(manager) === 3)
- schedule(manager)
- // Verify the initial number of executors is kept when no pending tasks
- assert(numExecutorsTarget(manager) === 3)
- (0 until 3).foreach { i =>
- onExecutorAdded(manager, s"executor-$i")
- }
-
- clock.advance(executorIdleTimeout * 1000)
-
- assert(maxNumExecutorsNeeded(manager) === 0)
- schedule(manager)
- // Verify executor is timeout but numExecutorsTarget is not recalculated
- assert(numExecutorsTarget(manager) === 3)
-
- // Schedule again to recalculate the numExecutorsTarget after executor is timeout
- schedule(manager)
- // Verify that current number of executors should be ramp down when executor is timeout
- assert(numExecutorsTarget(manager) === 2)
- }
-
- private def createSparkContext(
- minExecutors: Int = 1,
- maxExecutors: Int = 5,
- initialExecutors: Int = 1): SparkContext = {
+ private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
- .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout",
s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
@@ -845,10 +791,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _schedule()
}
- private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _maxNumExecutorsNeeded()
- }
-
private def addExecutors(manager: ExecutorAllocationManager): Int = {
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
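The helper signature above loses its initialExecutors argument; the bounds involved are ordinary SparkConf settings, and spark.dynamicAllocation.initialExecutors falls back to the configured minimum when it is not set. A minimal illustration with arbitrary values, not part of the patch:

import org.apache.spark.SparkConf

// Sketch only: the three dynamic-allocation bounds the removed tests exercised.
object DynamicAllocationConfSketch {
  val conf: SparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("dynamic-allocation-sketch")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "10")
    // Dropped from the test helper in this diff; when absent, the initial
    // target defaults to the minimum configured above.
    .set("spark.dynamicAllocation.initialExecutors", "5")
}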
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 140012226fdbb..bac6fdbcdc976 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -55,14 +55,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
- // In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
- // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
- // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
- // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
- // In this case, we won't receive FetchFailed. And it will make this test fail.
- // Therefore, we should wait until all slaves are up
- sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
-
val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c054c718075f8..c05e8bb6538ba 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.broadcast
+import scala.concurrent.duration._
import scala.util.Random
import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.io.SnappyCompressionCodec
@@ -310,7 +312,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
// Wait until all salves are up
- _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
+ _sc.jobProgressListener.synchronized {
+ val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
+ assert(numBlockManagers == numSlaves + 1,
+ s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
+ }
+ }
_sc
} else {
new SparkContext("local", "test", broadcastConf)
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index ddc92814c0acf..c215b0582889f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -41,7 +41,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
@@ -71,7 +71,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
val listener = listeners(0)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 46ea28d0f18f6..46369457f000a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -62,7 +62,7 @@ class SparkSubmitSuite
SparkSubmit.printStream = printStream
@volatile var exitedCleanly = false
- SparkSubmit.exitFn = (_) => exitedCleanly = true
+ SparkSubmit.exitFn = () => exitedCleanly = true
val thread = new Thread {
override def run() = try {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 09075eeb539aa..0f6933df9e6bc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,16 +17,12 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
- FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
-import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
-import com.google.common.base.Charsets
-import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
@@ -339,40 +335,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
- test("Event log copy") {
- val provider = new FsHistoryProvider(createTestConf())
- val logs = (1 to 2).map { i =>
- val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
- writeFile(log, true, None,
- SparkListenerApplicationStart(
- "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
- SparkListenerApplicationEnd(5001 * i)
- )
- log
- }
- provider.checkForLogs()
-
- (1 to 2).foreach { i =>
- val underlyingStream = new ByteArrayOutputStream()
- val outputStream = new ZipOutputStream(underlyingStream)
- provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
- outputStream.close()
- val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
- var totalEntries = 0
- var entry = inputStream.getNextEntry
- entry should not be null
- while (entry != null) {
- val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
- val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
- actual should be (expected)
- totalEntries += 1
- entry = inputStream.getNextEntry
- }
- totalEntries should be (1)
- inputStream.close()
- }
- }
-
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
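The test removed above zips event logs into memory and reads them back to compare contents. Stripped of FsHistoryProvider, the underlying java.util.zip round trip looks like this (a sketch with made-up entry names):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

// Sketch only: write one named entry into an in-memory zip, then read it back
// and verify the payload survives the round trip.
object ZipRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val payload = "fake event log contents"
    val buffer = new ByteArrayOutputStream()
    val zipOut = new ZipOutputStream(buffer)
    zipOut.putNextEntry(new ZipEntry("eventLogs-app-1"))
    zipOut.write(payload.getBytes(StandardCharsets.UTF_8))
    zipOut.closeEntry()
    zipOut.close()

    val zipIn = new ZipInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val entry = zipIn.getNextEntry
    assert(entry != null && entry.getName == "eventLogs-app-1")
    val contents = new ByteArrayOutputStream()
    val chunk = new Array[Byte](4096)
    var n = zipIn.read(chunk)
    while (n != -1) {
      contents.write(chunk, 0, n)
      n = zipIn.read(chunk)
    }
    zipIn.close()
    assert(new String(contents.toByteArray, StandardCharsets.UTF_8) == payload)
  }
}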
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index e5b5e1bb65337..14f2d1a5894b8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -16,13 +16,10 @@
*/
package org.apache.spark.deploy.history
-import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
+import java.io.{File, FileInputStream, FileWriter, IOException}
import java.net.{HttpURLConnection, URL}
-import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
-import com.google.common.base.Charsets
-import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.mockito.Mockito.when
import org.scalatest.{BeforeAndAfter, Matchers}
@@ -150,70 +147,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}
- test("download all logs for app with multiple attempts") {
- doDownloadTest("local-1430917381535", None)
- }
-
- test("download one log for app with multiple attempts") {
- (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
- }
-
- test("download legacy logs - all attempts") {
- doDownloadTest("local-1426533911241", None, legacy = true)
- }
-
- test("download legacy logs - single attempts") {
- (1 to 2). foreach {
- attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
- }
- }
-
- // Test that the files are downloaded correctly, and validate them.
- def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
-
- val url = attemptId match {
- case Some(id) =>
- new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
- case None =>
- new URL(s"${generateURL(s"applications/$appId")}/logs")
- }
-
- val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
- code should be (HttpServletResponse.SC_OK)
- inputStream should not be None
- error should be (None)
-
- val zipStream = new ZipInputStream(inputStream.get)
- var entry = zipStream.getNextEntry
- entry should not be null
- val totalFiles = {
- if (legacy) {
- attemptId.map { x => 3 }.getOrElse(6)
- } else {
- attemptId.map { x => 1 }.getOrElse(2)
- }
- }
- var filesCompared = 0
- while (entry != null) {
- if (!entry.isDirectory) {
- val expectedFile = {
- if (legacy) {
- val splits = entry.getName.split("/")
- new File(new File(logDir, splits(0)), splits(1))
- } else {
- new File(logDir, entry.getName)
- }
- }
- val expected = Files.toString(expectedFile, Charsets.UTF_8)
- val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
- actual should be (expected)
- filesCompared += 1
- }
- entry = zipStream.getNextEntry
- }
- filesCompared should be (totalFiles)
- }
-
test("response codes on bad paths") {
val badAppId = getContentAndCode("applications/foobar")
badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
@@ -269,11 +202,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
def getUrl(path: String): String = {
- HistoryServerSuite.getUrl(generateURL(path))
- }
-
- def generateURL(path: String): URL = {
- new URL(s"http://localhost:$port/api/v1/$path")
+ HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
}
def generateExpectation(name: String, path: String): Unit = {
@@ -304,18 +233,13 @@ object HistoryServerSuite {
}
def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
- val (code, in, errString) = connectAndGetInputStream(url)
- val inString = in.map(IOUtils.toString)
- (code, inString, errString)
- }
-
- def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("GET")
connection.connect()
val code = connection.getResponseCode()
- val inStream = try {
- Option(connection.getInputStream())
+ val inString = try {
+ val in = Option(connection.getInputStream())
+ in.map(IOUtils.toString)
} catch {
case io: IOException => None
}
@@ -325,7 +249,7 @@ object HistoryServerSuite {
} catch {
case io: IOException => None
}
- (code, inStream, errString)
+ (code, inString, errString)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
index 72eaffb416981..572360ddb95d4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker.ui
import java.io.{File, FileWriter}
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.mock
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkFunSuite
@@ -28,47 +28,33 @@ class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
test("get logs simple") {
val webui = mock(classOf[WorkerWebUI])
- val tmpDir = new File(sys.props("java.io.tmpdir"))
- val workDir = new File(tmpDir, "work-dir")
- workDir.mkdir()
- when(webui.workDir).thenReturn(workDir)
val logPage = new LogPage(webui)
// Prepare some fake log files to read later
val out = "some stdout here"
val err = "some stderr here"
- val tmpOut = new File(workDir, "stdout")
- val tmpErr = new File(workDir, "stderr")
- val tmpErrBad = new File(tmpDir, "stderr") // outside the working directory
- val tmpOutBad = new File(tmpDir, "stdout")
- val tmpRand = new File(workDir, "random")
+ val tmpDir = new File(sys.props("java.io.tmpdir"))
+ val tmpOut = new File(tmpDir, "stdout")
+ val tmpErr = new File(tmpDir, "stderr")
+ val tmpRand = new File(tmpDir, "random")
write(tmpOut, out)
write(tmpErr, err)
- write(tmpOutBad, out)
- write(tmpErrBad, err)
write(tmpRand, "1 6 4 5 2 7 8")
// Get the logs. All log types other than "stderr" or "stdout" will be rejected
val getLog = PrivateMethod[(String, Long, Long, Long)]('getLog)
val (stdout, _, _, _) =
- logPage invokePrivate getLog(workDir.getAbsolutePath, "stdout", None, 100)
+ logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stdout", None, 100)
val (stderr, _, _, _) =
- logPage invokePrivate getLog(workDir.getAbsolutePath, "stderr", None, 100)
+ logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stderr", None, 100)
val (error1, _, _, _) =
- logPage invokePrivate getLog(workDir.getAbsolutePath, "random", None, 100)
+ logPage invokePrivate getLog(tmpDir.getAbsolutePath, "random", None, 100)
val (error2, _, _, _) =
- logPage invokePrivate getLog(workDir.getAbsolutePath, "does-not-exist.txt", None, 100)
- // These files exist, but live outside the working directory
- val (error3, _, _, _) =
- logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stderr", None, 100)
- val (error4, _, _, _) =
- logPage invokePrivate getLog(tmpDir.getAbsolutePath, "stdout", None, 100)
+ logPage invokePrivate getLog(tmpDir.getAbsolutePath, "does-not-exist.txt", None, 100)
assert(stdout === out)
assert(stderr === err)
- assert(error1.startsWith("Error: Log type must be one of "))
- assert(error2.startsWith("Error: Log type must be one of "))
- assert(error3.startsWith("Error: invalid log directory"))
- assert(error4.startsWith("Error: invalid log directory"))
+ assert(error1.startsWith("Error"))
+ assert(error2.startsWith("Error"))
}
/** Write the specified string to the file. */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 47b2868753c0e..bfcf918e06162 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -254,7 +254,7 @@ class DAGSchedulerSuite
test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}
@@ -389,7 +389,7 @@ class DAGSchedulerSuite
submit(unserializableRdd, Array(0))
assert(failure.getMessage.startsWith(
"Job aborted due to stage failure: Task not serializable:"))
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
@@ -399,7 +399,7 @@ class DAGSchedulerSuite
submit(new MyRDD(sc, 1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
@@ -410,7 +410,7 @@ class DAGSchedulerSuite
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
@@ -462,7 +462,7 @@ class DAGSchedulerSuite
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.isEmpty)
assert(sparkListener.successfulStages.contains(0))
}
@@ -531,7 +531,7 @@ class DAGSchedulerSuite
Map[Long, Any](),
createFakeTaskInfo(),
null))
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1))
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -543,7 +543,7 @@ class DAGSchedulerSuite
createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.size == 1)
}
@@ -592,7 +592,7 @@ class DAGSchedulerSuite
// Listener bus should get told about the map stage failing, but not the reduce stage
// (since the reduce stage hasn't been started yet).
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.toSet === Set(0))
assertDataStructuresEmpty()
@@ -643,7 +643,7 @@ class DAGSchedulerSuite
assert(cancelledStages.toSet === Set(0, 2))
// Make sure the listeners got told about both failed stages.
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.successfulStages.isEmpty)
assert(sparkListener.failedStages.toSet === Set(0, 2))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 651295b7344c5..06fb909bf5419 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -47,7 +47,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Starting listener bus should flush all buffered events
bus.start(sc)
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(counter.count === 5)
// After listener bus has stopped, posting events should not increment counter
@@ -131,7 +131,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd2.setName("Target RDD")
rdd2.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
@@ -156,7 +156,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd3.setName("Trois")
rdd1.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
@@ -165,7 +165,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()
rdd2.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD
@@ -174,7 +174,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()
rdd3.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD
@@ -190,7 +190,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val rdd2 = rdd1.map(_.toString)
sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {1}
val (stageInfo, _) = listener.stageInfos.head
@@ -214,7 +214,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
d.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (1)
val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
@@ -225,7 +225,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
d4.setName("A Cogroup")
d4.collectAsMap()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (4)
listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
/**
@@ -281,7 +281,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
.reduce { case (x, y) => x }
assert(result === 1.to(akkaFrameSize).toArray)
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -297,7 +297,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.isEmpty)
@@ -352,7 +352,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
// The exception should be caught, and the event should be propagated to other listeners
assert(bus.listenerThreadIsAlive)
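The last hunk above checks that a listener which throws does not take the bus down. The sketch below shows the kind of listeners such a test could register; the class names are hypothetical and only illustrate the contract that an exception in one listener is caught while the event still reaches the others.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listeners for illustration only.
class ThrowingListener extends SparkListener {
  // Simulates a misbehaving listener; the bus is expected to catch this exception.
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    throw new RuntimeException("listener failure")
}

class CountingListener extends SparkListener {
  var jobEnds = 0
  // Should still see every event even when another listener throws.
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnds += 1
}
```

After registering both and posting job-end events, the counting listener's total should match the number of posted events, and `bus.listenerThreadIsAlive` should still hold once `waitUntilEmpty` returns true, which is what the assertions above verify.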
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index d97fba00976d2..c7f179e1483a5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -17,12 +17,12 @@
package org.apache.spark.scheduler
-import scala.collection.mutable
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
+import scala.collection.mutable
/**
* Unit tests for SparkListener that require a local cluster.
@@ -41,16 +41,12 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
- // This test will check if the number of executors received by "SparkListener" is same as the
- // number of all executors, so we need to wait until all executors are up
- sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
-
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(listener.addedExecutorInfo.size == 2)
assert(listener.addedExecutorInfo("0").totalCores == 1)
assert(listener.addedExecutorInfo("1").totalCores == 1)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a61ea3918f46a..a867cf83dc3f1 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -608,69 +608,4 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
manager.runAll()
assert(output.toList === List(4, 3, 2))
}
-
- test("isInDirectory") {
- val tmpDir = new File(sys.props("java.io.tmpdir"))
- val parentDir = new File(tmpDir, "parent-dir")
- val childDir1 = new File(parentDir, "child-dir-1")
- val childDir1b = new File(parentDir, "child-dir-1b")
- val childFile1 = new File(parentDir, "child-file-1.txt")
- val childDir2 = new File(childDir1, "child-dir-2")
- val childDir2b = new File(childDir1, "child-dir-2b")
- val childFile2 = new File(childDir1, "child-file-2.txt")
- val childFile3 = new File(childDir2, "child-file-3.txt")
- val nullFile: File = null
- parentDir.mkdir()
- childDir1.mkdir()
- childDir1b.mkdir()
- childDir2.mkdir()
- childDir2b.mkdir()
- childFile1.createNewFile()
- childFile2.createNewFile()
- childFile3.createNewFile()
-
- // Identity
- assert(Utils.isInDirectory(parentDir, parentDir))
- assert(Utils.isInDirectory(childDir1, childDir1))
- assert(Utils.isInDirectory(childDir2, childDir2))
-
- // Valid ancestor-descendant pairs
- assert(Utils.isInDirectory(parentDir, childDir1))
- assert(Utils.isInDirectory(parentDir, childFile1))
- assert(Utils.isInDirectory(parentDir, childDir2))
- assert(Utils.isInDirectory(parentDir, childFile2))
- assert(Utils.isInDirectory(parentDir, childFile3))
- assert(Utils.isInDirectory(childDir1, childDir2))
- assert(Utils.isInDirectory(childDir1, childFile2))
- assert(Utils.isInDirectory(childDir1, childFile3))
- assert(Utils.isInDirectory(childDir2, childFile3))
-
- // Inverted ancestor-descendant pairs should fail
- assert(!Utils.isInDirectory(childDir1, parentDir))
- assert(!Utils.isInDirectory(childDir2, parentDir))
- assert(!Utils.isInDirectory(childDir2, childDir1))
- assert(!Utils.isInDirectory(childFile1, parentDir))
- assert(!Utils.isInDirectory(childFile2, parentDir))
- assert(!Utils.isInDirectory(childFile3, parentDir))
- assert(!Utils.isInDirectory(childFile2, childDir1))
- assert(!Utils.isInDirectory(childFile3, childDir1))
- assert(!Utils.isInDirectory(childFile3, childDir2))
-
- // Non-existent files or directories should fail
- assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one.txt")))
- assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one/two.txt")))
- assert(!Utils.isInDirectory(parentDir, new File(parentDir, "one/two/three.txt")))
-
- // Siblings should fail
- assert(!Utils.isInDirectory(childDir1, childDir1b))
- assert(!Utils.isInDirectory(childDir1, childFile1))
- assert(!Utils.isInDirectory(childDir2, childDir2b))
- assert(!Utils.isInDirectory(childDir2, childFile2))
-
- // Null files should fail without throwing NPE
- assert(!Utils.isInDirectory(parentDir, nullFile))
- assert(!Utils.isInDirectory(childFile3, nullFile))
- assert(!Utils.isInDirectory(nullFile, parentDir))
- assert(!Utils.isInDirectory(nullFile, childFile3))
- }
}
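The removed test above pins down the expected behaviour of `Utils.isInDirectory`: identity and genuine ancestor/descendant pairs pass, while inverted pairs, siblings, non-existent files, and nulls all fail without throwing. A minimal sketch consistent with those assertions is given below, assuming a canonical-path walk; the real utility may be implemented differently.

```scala
import java.io.File

// Sketch only: returns true iff `child` resolves to `parent` or to something beneath it.
def isInDirectory(parent: File, child: File): Boolean = {
  if (parent == null || child == null) {
    false // nulls fail without an NPE
  } else if (!parent.isDirectory || !child.exists()) {
    false // parent must be an existing directory, child must exist
  } else if (parent.getCanonicalFile == child.getCanonicalFile) {
    true // identity, or we walked up from the child to the parent
  } else {
    // Walk one level up from the child until we hit the parent or the filesystem root.
    isInDirectory(parent, child.getCanonicalFile.getParentFile)
  }
}
```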
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 54274a83f6d66..0b14a618e755c 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -228,14 +228,14 @@ if [[ ! "$@" =~ --skip-package ]]; then
# We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
# share the same Zinc server.
- make_binary_release "hadoop1" "-Psparkr -Phadoop-1 -Phive -Phive-thriftserver" "3030" &
- make_binary_release "hadoop1-scala2.11" "-Psparkr -Phadoop-1 -Phive -Dscala-2.11" "3031" &
- make_binary_release "cdh4" "-Psparkr -Phadoop-1 -Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" "3032" &
- make_binary_release "hadoop2.3" "-Psparkr -Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
- make_binary_release "hadoop2.4" "-Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
- make_binary_release "mapr3" "-Pmapr3 -Psparkr -Phive -Phive-thriftserver" "3035" &
- make_binary_release "mapr4" "-Pmapr4 -Psparkr -Pyarn -Phive -Phive-thriftserver" "3036" &
- make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn" "3037" &
+ make_binary_release "hadoop1" "-Psparkr -Psparkr-docs -Phadoop-1 -Phive -Phive-thriftserver" "3030" &
+ make_binary_release "hadoop1-scala2.11" "-Psparkr -Psparkr-docs -Phadoop-1 -Phive -Dscala-2.11" "3031" &
+ make_binary_release "cdh4" "-Psparkr -Psparkr-docs -Phadoop-1 -Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" "3032" &
+ make_binary_release "hadoop2.3" "-Psparkr -Psparkr-docs -Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
+ make_binary_release "hadoop2.4" "-Psparkr -Psparkr-docs -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
+ make_binary_release "mapr3" "-Pmapr3 -Psparkr -Psparkr-docs -Phive -Phive-thriftserver" "3035" &
+ make_binary_release "mapr4" "-Pmapr4 -Psparkr -Psparkr-docs -Pyarn -Phive -Phive-thriftserver" "3036" &
+ make_binary_release "hadoop2.4-without-hive" "-Psparkr -Psparkr-docs -Phadoop-2.4 -Pyarn" "3037" &
wait
rm -rf spark-$RELEASE_VERSION-bin-*/
diff --git a/dev/run-tests b/dev/run-tests
index d178e2a4601ea..7dd8d31fd44e3 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -80,19 +80,18 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
# Only run Hive tests if there are SQL changes.
# Partial solution for SPARK-1455.
if [ -n "$AMPLAB_JENKINS" ]; then
- target_branch="$ghprbTargetBranch"
- git fetch origin "$target_branch":"$target_branch"
+ git fetch origin master:master
# AMP_JENKINS_PRB indicates if the current build is a pull request build.
if [ -n "$AMP_JENKINS_PRB" ]; then
# It is a pull request build.
sql_diffs=$(
- git diff --name-only "$target_branch" \
+ git diff --name-only master \
| grep -e "^sql/" -e "^bin/spark-sql" -e "^sbin/start-thriftserver.sh"
)
non_sql_diffs=$(
- git diff --name-only "$target_branch" \
+ git diff --name-only master \
| grep -v -e "^sql/" -e "^bin/spark-sql" -e "^sbin/start-thriftserver.sh"
)
diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins
index 641b0ff3c4be4..8b2a44fd72ba5 100755
--- a/dev/run-tests-jenkins
+++ b/dev/run-tests-jenkins
@@ -47,9 +47,7 @@ COMMIT_URL="https://github.com/apache/spark/commit/${ghprbActualCommit}"
# GitHub doesn't auto-link short hashes when submitted via the API, unfortunately. :(
SHORT_COMMIT_HASH="${ghprbActualCommit:0:7}"
-# format: http://linux.die.net/man/1/timeout
-# must be less than the timeout configured on Jenkins (currently 180m)
-TESTS_TIMEOUT="175m"
+TESTS_TIMEOUT="150m" # format: http://linux.die.net/man/1/timeout
# Array to capture all tests to run on the pull request. These tests are held under the
#+ dev/tests/ directory.
@@ -193,7 +191,7 @@ done
test_result="$?"
if [ "$test_result" -eq "124" ]; then
- fail_message="**[Test build ${BUILD_DISPLAY_NAME} timed out](${BUILD_URL}console)** \
+ fail_message="**[Test build ${BUILD_DISPLAY_NAME} timed out](${BUILD_URL}consoleFull)** \
for PR $ghprbPullId at commit [\`${SHORT_COMMIT_HASH}\`](${COMMIT_URL}) \
after a configured wait of \`${TESTS_TIMEOUT}\`."
@@ -233,7 +231,7 @@ done
# post end message
{
result_message="\
- [Test build ${BUILD_DISPLAY_NAME} has finished](${BUILD_URL}console) for \
+ [Test build ${BUILD_DISPLAY_NAME} has finished](${BUILD_URL}consoleFull) for \
PR $ghprbPullId at commit [\`${SHORT_COMMIT_HASH}\`](${COMMIT_URL})."
result_message="${result_message}\n${test_result_note}"
diff --git a/docs/_config.yml b/docs/_config.yml
index c0e031a83ba9c..b22b627f09007 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -14,8 +14,8 @@ include:
# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 1.5.0-SNAPSHOT
-SPARK_VERSION_SHORT: 1.5.0
+SPARK_VERSION: 1.4.0-SNAPSHOT
+SPARK_VERSION_SHORT: 1.4.0
SCALA_BINARY_VERSION: "2.10"
SCALA_VERSION: "2.10.4"
MESOS_VERSION: 0.21.0
diff --git a/docs/monitoring.md b/docs/monitoring.md
index bcf885fe4e681..e75018499003a 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -228,14 +228,6 @@ for a running application, at `http://localhost:4040/api/v1`.
/applications/[app-id]/storage/rdd/[rdd-id] | Details for the storage status of a given RDD
- /applications/[app-id]/logs | Download the event logs for all attempts of the given application as a zip file
- /applications/[app-id]/[attempt-id]/logs | Download the event logs for the specified attempt of the given application as a zip file
When running on Yarn, each application has multiple attempts, so `[app-id]` is actually
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cde5830c733e0..282ea75e1e785 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1785,13 +1785,6 @@ that these options will be deprecated in future release as more optimizations ar
Configures the number of partitions to use when shuffling data for joins or aggregations.
- spark.sql.planner.externalSort | false | When true, performs sorts that spill to disk as needed; otherwise, sorts each partition in memory.
# Distributed SQL Engine
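The table context above documents `spark.sql.shuffle.partitions`. As a quick illustration (assuming an existing `SQLContext` named `sqlContext`), the option can be adjusted at runtime:

```scala
// Raise the number of post-shuffle partitions for wide aggregations and joins.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// The equivalent SQL form:
sqlContext.sql("SET spark.sql.shuffle.partitions=400")
```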
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index d6d5605948a5a..64714f0b799fc 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -29,7 +29,7 @@ Next, we discuss how to use this approach in your streaming application.
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
- and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
import org.apache.spark.streaming.kafka.*;
@@ -39,7 +39,7 @@ Next, we discuss how to use this approach in your streaming application.
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
- and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
@@ -105,7 +105,7 @@ Next, we discuss how to use this approach in your streaming application.
streamingContext, [map of Kafka parameters], [set of topics to consume])
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
- and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
import org.apache.spark.streaming.kafka.*;
@@ -116,7 +116,7 @@ Next, we discuss how to use this approach in your streaming application.
[map of Kafka parameters], [set of topics to consume]);
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
- and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
@@ -153,4 +153,4 @@ Next, we discuss how to use this approach in your streaming application.
Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, those of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams, though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at which each Kafka partition will be read by this direct API.
-3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and then launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies, as those are already present in a Spark installation.
+3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and then launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies, as those are already present in a Spark installation.
\ No newline at end of file
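To make the two integration styles documented in this page concrete, here is a short sketch using the `KafkaUtils` API it describes. The ZooKeeper quorum, broker address, topic name, and the existing `SparkContext` `sc` are hypothetical placeholders; see the linked examples for complete programs.

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(2))

// Receiver-based approach: consume one partition of topic "events" via a ZK quorum.
val receiverStream = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-consumer-group", Map("events" -> 1))

// Direct (receiver-less) approach: read topic "events" straight from the Kafka brokers.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker-host:9092"), Set("events"))
```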
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 84629cb9a0ca0..ee0904c9e5d54 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -219,8 +219,7 @@ def parse_args():
"(default: %default).")
parser.add_option(
"--hadoop-major-version", default="1",
- help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
- "(Hadoop 2.4.0) (default: %default)")
+ help="Major version of Hadoop (default: %default)")
parser.add_option(
"-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
@@ -272,8 +271,7 @@ def parse_args():
help="Launch fresh slaves, but use an existing stopped master if possible")
parser.add_option(
"--worker-instances", type="int", default=1,
- help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
- "is used as Hadoop major version (default: %default)")
+ help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: %default)")
parser.add_option(
"--master-opts", type="string", default="",
help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
@@ -763,10 +761,6 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
if opts.ganglia:
modules.append('ganglia')
- # Clear SPARK_WORKER_INSTANCES if running on YARN
- if opts.hadoop_major_version == "yarn":
- opts.worker_instances = ""
-
# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
@@ -1004,7 +998,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
- worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
template_vars = {
"master_list": '\n'.join(master_addresses),
"active_master": active_master,
@@ -1018,7 +1011,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
"spark_version": spark_v,
"tachyon_version": tachyon_v,
"hadoop_major_version": opts.hadoop_major_version,
- "spark_worker_instances": worker_instances_str,
+ "spark_worker_instances": "%d" % opts.worker_instances,
"spark_master_opts": opts.master_opts
}
diff --git a/examples/pom.xml b/examples/pom.xml
index e6884b09dca94..e4efee7b5e647 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@