From 1b531f864745ba4c5d0322d02a71173e0789e27b Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Sun, 24 Apr 2016 20:37:06 -0700 Subject: [PATCH 01/16] mvn.cmd for windows Helping script for windows to download dependency and start zinc to support incremental building on windows. --- build/installdep.sh | 148 ++++++++++++++++++++++++++++++++++++++++++++ build/mvn.cmd | 24 +++++++ 2 files changed, 172 insertions(+) create mode 100644 build/installdep.sh create mode 100644 build/mvn.cmd diff --git a/build/installdep.sh b/build/installdep.sh new file mode 100644 index 000000000000..17e36e5dd309 --- /dev/null +++ b/build/installdep.sh @@ -0,0 +1,148 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Determine the current working directory +_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +# Preserve the calling directory +_CALLING_DIR="$(pwd)" +# Options used during compilation +_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" + +# Installs any application tarball given a URL, the expected tarball name, +# and, optionally, a checkable binary path to determine if the binary has +# already been installed +## Arg1 - URL +## Arg2 - Tarball Name +## Arg3 - Checkable Binary +install_app() { + local remote_tarball="$1/$2" + local local_tarball="${_DIR}/$2" + local binary="${_DIR}/$3" + + # setup `curl` and `wget` silent options if we're running on Jenkins + local curl_opts="-L" + local wget_opts="" + if [ -n "$AMPLAB_JENKINS" ]; then + curl_opts="-s ${curl_opts}" + wget_opts="--quiet ${wget_opts}" + else + curl_opts="--progress-bar ${curl_opts}" + wget_opts="--progress=bar:force ${wget_opts}" + fi + + if [ -z "$3" -o ! -f "$binary" ]; then + # check if we already have the tarball + # check if we have curl installed + # download application + [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ + echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ + curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" + # if the file still doesn't exist, lets try `wget` and cross our fingers + [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ + echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ + wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" + # if both were unsuccessful, exit + [ ! -f "${local_tarball}" ] && \ + echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ + echo "please install manually and try again." 
&& \ + exit 2 + cd "${_DIR}" && tar -xzf "$2" + rm -rf "$local_tarball" + fi +} + +# Install maven under the build/ folder +install_mvn() { + local MVN_VERSION="3.3.9" + local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='} + + install_app \ + "${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \ + "apache-maven-${MVN_VERSION}-bin.tar.gz" \ + "apache-maven-${MVN_VERSION}/bin/mvn" + + MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn" +} + +# Install zinc under the build/ folder +install_zinc() { + local zinc_path="zinc-0.3.9/bin/zinc" + [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 + local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} + + install_app \ + "${TYPESAFE_MIRROR}/zinc/0.3.9" \ + "zinc-0.3.9.tgz" \ + "${zinc_path}" + ZINC_BIN="${_DIR}/${zinc_path}" +} + +# Determine the Scala version from the root pom.xml file, set the Scala URL, +# and, with that, download the specific version of Scala necessary under +# the build/ folder +install_scala() { + # determine the Scala version used in Spark + local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | \ + head -1 | cut -f2 -d'>' | cut -f1 -d'<'` + local scala_bin="${_DIR}/scala-${scala_version}/bin/scala" + local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} + + install_app \ + "${TYPESAFE_MIRROR}/scala/${scala_version}" \ + "scala-${scala_version}.tgz" \ + "scala-${scala_version}/bin/scala" + + SCALA_COMPILER="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-compiler.jar" + SCALA_LIBRARY="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-library.jar" +} + +# Setup healthy defaults for the Zinc port if none were provided from +# the environment +ZINC_PORT=${ZINC_PORT:-"3030"} + +# Check for the `--force` flag dictating that `mvn` should be downloaded +# regardless of whether the system already has a `mvn` install +if [ "$1" == "--force" ]; then + FORCE_MVN=1 + shift +fi + +# Install Maven if necessary +MVN_BIN="$(command -v mvn)" + +if [ ! 
"$MVN_BIN" -o -n "$FORCE_MVN" ]; then + install_mvn +fi + +# Install the proper version of Scala and Zinc for the build +install_zinc +install_scala + +# Reset the current working directory +cd "${_CALLING_DIR}" + +# Now that zinc is ensured to be installed, check its status and, if its +# not running or just installed, start it +if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then + export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} + "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} + "${ZINC_BIN}" -start -port ${ZINC_PORT} \ + -scala-compiler "${SCALA_COMPILER}" \ + -scala-library "${SCALA_LIBRARY}" &>/dev/null +fi diff --git a/build/mvn.cmd b/build/mvn.cmd new file mode 100644 index 000000000000..f5da15c55c5c --- /dev/null +++ b/build/mvn.cmd @@ -0,0 +1,24 @@ +@echo off +set _DIR=%~dp0 +set ZINC_PORT=3030 +bash %_DIR%installdep.sh +set _CALLING_DIR="%_DIR%..\" +set _COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" + + +if defined MAVEN_OPTS ( + echo %MAVEN_OPTS% +) else ( + MAVEN_OPTS=%_COMPILE_JVM_OPTS% +) + +where /q mvn +IF ERRORLEVEL 1 ( + set MAVEN_BIN="%_DIR%\apache-maven-3.3.3\bin\mvn" +) ELSE ( + set MAVEN_BIN="mvn" +) + +ECHO "using maven from %MAVEN_BIN%" + +%MAVEN_BIN% -DzincPort=%ZINC_PORT% %* From aa31dd908988f37bd3def550d8d95a9802c3ca3b Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Sun, 24 Apr 2016 23:50:59 -0700 Subject: [PATCH 02/16] Fix file not closed on FileSuite --- .../src/test/scala/org/apache/spark/FileSuite.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 993834f8d7d4..26fbd0ca224f 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -56,10 +56,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { nums.saveAsTextFile(outputDir) // Read the plain text file and check it's OK val outputFile = new File(outputDir, "part-00000") - val content = Source.fromFile(outputFile).mkString - assert(content === "1\n2\n3\n4\n") - // Also try reading it in as a text file RDD - assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4")) + val bufferSrc = Source.fromFile(outputFile) + try { + val content = bufferSrc.mkString + assert(content === "1\n2\n3\n4\n") + // Also try reading it in as a text file RDD + assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4")) + } finally { + bufferSrc.close() + } } test("text files (compressed)") { From 151a1b0f4c561e324cecd9e61f2d75f70f52370c Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 00:28:01 -0700 Subject: [PATCH 03/16] Fix ReliableCheckpointRDD file won't correctly closed if error on the first stream --- .../org/apache/spark/rdd/ReliableCheckpointRDD.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index fddb9353018a..7b301dc5f046 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -243,7 +243,16 @@ private[spark] object ReliableCheckpointRDD extends Logging { if (fs.exists(partitionerFilePath)) { val fileInputStream = fs.open(partitionerFilePath, bufferSize) val serializer = 
SparkEnv.get.serializer.newInstance() - val deserializeStream = serializer.deserializeStream(fileInputStream) + // make sure that the file is closed if error occurrs during deserialization + val deserializeStream = + try { + serializer.deserializeStream(fileInputStream) + } catch { + case ex => + fileInputStream.close() + throw ex + } + val partitioner = Utils.tryWithSafeFinally[Partitioner] { deserializeStream.readObject[Partitioner] } { From 781b1ac81fb56e5789474b0ac51ee6f4396e6c5b Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 00:45:39 -0700 Subject: [PATCH 04/16] Fix another file closing --- .../spark/deploy/history/FsHistoryProviderSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 39c5857b1345..327b33aef005 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -415,8 +415,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val bstream = new BufferedOutputStream(cstream) if (isNewFormat) { - EventLoggingListener.initEventLog(new FileOutputStream(file)) + val newFormatStream = new FileOutputStream(file) + Utils.tryWithSafeFinally { + EventLoggingListener.initEventLog(newFormatStream) + } { + newFormatStream.close() + } } + val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) From 5b83cf6a862d3fd26aced2c205c30ce4d50db984 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 00:57:42 -0700 Subject: [PATCH 05/16] close log data. --- .../scheduler/EventLoggingListenerSuite.scala | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 176d8930aad1..26e64c0ef73f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -202,33 +202,37 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // Make sure expected events exist in the log file. 
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) - val logStart = SparkListenerLogStart(SPARK_VERSION) - val lines = readLines(logData) - val eventSet = mutable.Set( - SparkListenerApplicationStart, - SparkListenerBlockManagerAdded, - SparkListenerExecutorAdded, - SparkListenerEnvironmentUpdate, - SparkListenerJobStart, - SparkListenerJobEnd, - SparkListenerStageSubmitted, - SparkListenerStageCompleted, - SparkListenerTaskStart, - SparkListenerTaskEnd, - SparkListenerApplicationEnd).map(Utils.getFormattedClassName) - lines.foreach { line => - eventSet.foreach { event => - if (line.contains(event)) { - val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) - val eventType = Utils.getFormattedClassName(parsedEvent) - if (eventType == event) { - eventSet.remove(event) + try { + val logStart = SparkListenerLogStart(SPARK_VERSION) + val lines = readLines(logData) + val eventSet = mutable.Set( + SparkListenerApplicationStart, + SparkListenerBlockManagerAdded, + SparkListenerExecutorAdded, + SparkListenerEnvironmentUpdate, + SparkListenerJobStart, + SparkListenerJobEnd, + SparkListenerStageSubmitted, + SparkListenerStageCompleted, + SparkListenerTaskStart, + SparkListenerTaskEnd, + SparkListenerApplicationEnd).map(Utils.getFormattedClassName) + lines.foreach { line => + eventSet.foreach { event => + if (line.contains(event)) { + val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) + val eventType = Utils.getFormattedClassName(parsedEvent) + if (eventType == event) { + eventSet.remove(event) + } } } } + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) + } finally { + logData.close() } - assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) - assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) } private def readLines(in: InputStream): Seq[String] = { From aa5c0996ce5fb5d70f103b8aaf802004fd5efd41 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:01:55 -0700 Subject: [PATCH 06/16] Close the class loader --- .../org/apache/spark/scheduler/TaskResultGetterSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index b5385c11a926..8a811751e318 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local // ensure we reset the classloader after the test completes val originalClassLoader = Thread.currentThread.getContextClassLoader + val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) try { // load the exception from the jar - val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) loader.addURL(jarFile.toURI.toURL) Thread.currentThread().setContextClassLoader(loader) val excClass: Class[_] = Utils.classForName("repro.MyException") @@ -210,6 +210,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined) assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty) } finally { + loader.close() Thread.currentThread.setContextClassLoader(originalClassLoader) } } 
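
Patches 02 through 06 all apply the same close-on-every-path idiom: acquire the resource before the try block, use it inside, and release it in a finally clause (or via Spark's Utils.tryWithSafeFinally, as patches 03 and 04 do) so the handle is closed even when an assertion or exception fires. Below is a minimal standalone sketch of that idiom with plain try/finally; the object name, temp-file helper, and file contents are illustrative only and are not part of the patches themselves.

    import java.nio.file.Files
    import scala.io.Source

    object CloseOnAllPaths {
      // Acquire the resource outside the try body so the finally block can always
      // reach it; mkString may throw, but close() still runs.
      def readAll(path: String): String = {
        val src = Source.fromFile(path)
        try {
          src.mkString
        } finally {
          src.close()
        }
      }

      def main(args: Array[String]): Unit = {
        // Write a small temporary file so the sketch is runnable end to end.
        val tmp = Files.createTempFile("close-on-all-paths", ".txt")
        Files.write(tmp, "1\n2\n3\n4\n".getBytes("UTF-8"))
        println(readAll(tmp.toString))
        Files.delete(tmp)
      }
    }
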
From 6108f610ae811e398a5b43aeb8a3ae9320d1f655 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:20:48 -0700 Subject: [PATCH 07/16] Another file not closed. --- .../scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 0c6aabf1926e..2aab018d13b0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -151,12 +151,14 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val tempDir = Utils.createTempDir() val outputDir = new File(tempDir, "output") MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString) - val lines = outputDir.listFiles() + val sources = outputDir.listFiles() .filter(_.getName.startsWith("part-")) - .flatMap(Source.fromFile(_).getLines()) - .toSet + .map(Source.fromFile) + + val lines = sources.flatMap(_.getLines()).toSet val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03") assert(lines === expected) + sources.foreach(_.close()) Utils.deleteRecursively(tempDir) } From 8b988269d06eeb0be5f28b501313ac2fd49a2bae Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:29:16 -0700 Subject: [PATCH 08/16] Stop to release resources. --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 5 ++++- .../test/java/org/apache/spark/streaming/JavaAPISuite.java | 1 + .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 6 ++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 9aa2f0bbb995..4e2d6594c523 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -194,10 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Finally, stop the endpoint ssc.env.rpcEnv.stop(endpoint) endpoint = null - receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped } + + // note that the output writer is created at construction time, we have to close + // them even if it hasn't been started. + receivedBlockTracker.stop() } /** Allocate all unallocated blocks to the given batch. 
*/ diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 01f0c4de9e3c..a47b2faa8b2f 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1805,6 +1805,7 @@ public Integer call(String s) { // will be re-processed after recovery List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); + ssc.stop(); Utils.deleteRecursively(tempDir); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index bdbac64b9bc7..af0a239a92b2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -644,8 +644,10 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val mappedStream = fileStream.map(s => { val i = s.toInt if (i == 3) { - while (CheckpointSuite.batchThreeShouldBlockIndefinitely) { - Thread.sleep(Long.MaxValue) + if (CheckpointSuite.batchThreeShouldBlockIndefinitely) { + // It's not a good idea to let the thread run forever + // as resource won't be correctly released + Thread.sleep(6000) } } i From 16edc2efb170c4427e68a77055ccee81f2051a8c Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 01:34:45 -0700 Subject: [PATCH 09/16] More closing problem --- .../org/apache/spark/streaming/MapWithStateSuite.scala | 8 ++++---- .../spark/streaming/ReceivedBlockTrackerSuite.scala | 5 +++++ .../apache/spark/streaming/util/WriteAheadLogSuite.scala | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 3b662ec1833a..28b4fba5caef 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -39,18 +39,15 @@ class MapWithStateSuite extends SparkFunSuite before { StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } - checkpointDir = Utils.createTempDir("checkpoint") } after { StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) - } } override def beforeAll(): Unit = { super.beforeAll() + checkpointDir = Utils.createTempDir("checkpoint") val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite") conf.set("spark.streaming.clock", classOf[ManualClock].getName()) sc = new SparkContext(conf) @@ -64,6 +61,9 @@ class MapWithStateSuite extends SparkFunSuite } finally { super.afterAll() } + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) + } } test("state - get, exists, update, remove, ") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 851013bb1e84..107c3f5dcc08 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite val 
expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent) getWrittenLogData() shouldEqual expectedWrittenData1 getWriteAheadLogFiles() should have size 1 + tracker1.stop() incrementTime() @@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false) tracker1_.getUnallocatedBlocks(streamId) shouldBe empty tracker1_.hasUnallocatedReceivedBlocks should be (false) + tracker1_.stop() // Restart tracker and verify recovered list of unallocated blocks val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) @@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker2.stop() // Verify whether log has correct contents val expectedWrittenData2 = expectedWrittenData1 ++ @@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite getWriteAheadLogFiles() should not contain oldestLogFile } printLogFiles("After clean") + tracker3.stop() // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch @@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker4.stop() } test("disable write ahead log when checkpoint directory is not set") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 24cb5afee33c..4bec52b9fe4f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests( assert(getLogFilesInDirectory(testDir).size < logFiles.size) } } + writeAheadLog.close() } test(testPrefix + "handling file errors while reading rotating logs") { From 1f91a8d2c15458c58c87a5342cf528ab7bde17d7 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 06:25:42 -0700 Subject: [PATCH 10/16] Fix the zip file and jar file in RPackageUtilsSuite --- .../scala/org/apache/spark/deploy/RPackageUtilsSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 13cba94578a6..bd426c778d6d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -77,6 +77,7 @@ class RPackageUtilsSuite assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code") assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code") assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code") + jars.foreach(_.close()) } } @@ -144,13 +145,15 @@ class RPackageUtilsSuite IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc") val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip") assert(finalZip.exists()) - val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq + val zipFile = new ZipFile(finalZip) + val entries = 
zipFile.entries().asScala.map(_.getName).toSeq assert(entries.contains("/test.R")) assert(entries.contains("/SparkR/abc.R")) assert(entries.contains("/SparkR/DESCRIPTION")) assert(!entries.contains("/package.zip")) assert(entries.contains("/packageTest/def.R")) assert(entries.contains("/packageTest/DESCRIPTION")) + zipFile.close() } finally { FileUtils.deleteDirectory(tempDir) } From e9399a3dddf7528d056e075ee05636c060a60459 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 22:35:35 -0700 Subject: [PATCH 11/16] Stop ssc in MasterFailureTest --- .../scala/org/apache/spark/streaming/MasterFailureTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 60c8e702352c..fff2d6fbace3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -164,6 +164,7 @@ object MasterFailureTest extends Logging { val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) fileGeneratingThread.join() + ssc.stop() fs.delete(checkpointDir, true) fs.delete(testDir, true) logInfo("Finished test after " + killCount + " failures") From 0948fd239139cb46d38f96d0a28f69c432f17bd6 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Tue, 26 Apr 2016 01:09:04 -0700 Subject: [PATCH 12/16] Remove accidentally added files --- build/installdep.sh | 148 -------------------------------------------- build/mvn.cmd | 24 ------- 2 files changed, 172 deletions(-) delete mode 100644 build/installdep.sh delete mode 100644 build/mvn.cmd diff --git a/build/installdep.sh b/build/installdep.sh deleted file mode 100644 index 17e36e5dd309..000000000000 --- a/build/installdep.sh +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# Determine the current working directory -_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -# Preserve the calling directory -_CALLING_DIR="$(pwd)" -# Options used during compilation -_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - -# Installs any application tarball given a URL, the expected tarball name, -# and, optionally, a checkable binary path to determine if the binary has -# already been installed -## Arg1 - URL -## Arg2 - Tarball Name -## Arg3 - Checkable Binary -install_app() { - local remote_tarball="$1/$2" - local local_tarball="${_DIR}/$2" - local binary="${_DIR}/$3" - - # setup `curl` and `wget` silent options if we're running on Jenkins - local curl_opts="-L" - local wget_opts="" - if [ -n "$AMPLAB_JENKINS" ]; then - curl_opts="-s ${curl_opts}" - wget_opts="--quiet ${wget_opts}" - else - curl_opts="--progress-bar ${curl_opts}" - wget_opts="--progress=bar:force ${wget_opts}" - fi - - if [ -z "$3" -o ! -f "$binary" ]; then - # check if we already have the tarball - # check if we have curl installed - # download application - [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ - echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ - curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" - # if the file still doesn't exist, lets try `wget` and cross our fingers - [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ - echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ - wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" - # if both were unsuccessful, exit - [ ! -f "${local_tarball}" ] && \ - echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ - echo "please install manually and try again." && \ - exit 2 - cd "${_DIR}" && tar -xzf "$2" - rm -rf "$local_tarball" - fi -} - -# Install maven under the build/ folder -install_mvn() { - local MVN_VERSION="3.3.9" - local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='} - - install_app \ - "${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \ - "apache-maven-${MVN_VERSION}-bin.tar.gz" \ - "apache-maven-${MVN_VERSION}/bin/mvn" - - MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn" -} - -# Install zinc under the build/ folder -install_zinc() { - local zinc_path="zinc-0.3.9/bin/zinc" - [ ! 
-f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 - local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} - - install_app \ - "${TYPESAFE_MIRROR}/zinc/0.3.9" \ - "zinc-0.3.9.tgz" \ - "${zinc_path}" - ZINC_BIN="${_DIR}/${zinc_path}" -} - -# Determine the Scala version from the root pom.xml file, set the Scala URL, -# and, with that, download the specific version of Scala necessary under -# the build/ folder -install_scala() { - # determine the Scala version used in Spark - local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | \ - head -1 | cut -f2 -d'>' | cut -f1 -d'<'` - local scala_bin="${_DIR}/scala-${scala_version}/bin/scala" - local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} - - install_app \ - "${TYPESAFE_MIRROR}/scala/${scala_version}" \ - "scala-${scala_version}.tgz" \ - "scala-${scala_version}/bin/scala" - - SCALA_COMPILER="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-compiler.jar" - SCALA_LIBRARY="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-library.jar" -} - -# Setup healthy defaults for the Zinc port if none were provided from -# the environment -ZINC_PORT=${ZINC_PORT:-"3030"} - -# Check for the `--force` flag dictating that `mvn` should be downloaded -# regardless of whether the system already has a `mvn` install -if [ "$1" == "--force" ]; then - FORCE_MVN=1 - shift -fi - -# Install Maven if necessary -MVN_BIN="$(command -v mvn)" - -if [ ! "$MVN_BIN" -o -n "$FORCE_MVN" ]; then - install_mvn -fi - -# Install the proper version of Scala and Zinc for the build -install_zinc -install_scala - -# Reset the current working directory -cd "${_CALLING_DIR}" - -# Now that zinc is ensured to be installed, check its status and, if its -# not running or just installed, start it -if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} - "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - "${ZINC_BIN}" -start -port ${ZINC_PORT} \ - -scala-compiler "${SCALA_COMPILER}" \ - -scala-library "${SCALA_LIBRARY}" &>/dev/null -fi diff --git a/build/mvn.cmd b/build/mvn.cmd deleted file mode 100644 index f5da15c55c5c..000000000000 --- a/build/mvn.cmd +++ /dev/null @@ -1,24 +0,0 @@ -@echo off -set _DIR=%~dp0 -set ZINC_PORT=3030 -bash %_DIR%installdep.sh -set _CALLING_DIR="%_DIR%..\" -set _COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - - -if defined MAVEN_OPTS ( - echo %MAVEN_OPTS% -) else ( - MAVEN_OPTS=%_COMPILE_JVM_OPTS% -) - -where /q mvn -IF ERRORLEVEL 1 ( - set MAVEN_BIN="%_DIR%\apache-maven-3.3.3\bin\mvn" -) ELSE ( - set MAVEN_BIN="mvn" -) - -ECHO "using maven from %MAVEN_BIN%" - -%MAVEN_BIN% -DzincPort=%ZINC_PORT% %* From 488cd3ab07795d1926b2286a31005ebab4f42cc1 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 2 May 2016 23:11:12 -0700 Subject: [PATCH 13/16] Code cleanup with respect to comments --- .../org/apache/spark/streaming/CheckpointSuite.scala | 8 ++++---- .../org/apache/spark/streaming/MapWithStateSuite.scala | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index af0a239a92b2..0b7e596573bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -640,11 +640,11 @@ class CheckpointSuite extends TestSuiteBase with 
DStreamCheckpointTester val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch - CheckpointSuite.batchThreeShouldBlockIndefinitely = true + CheckpointSuite.batchThreeShouldBlockALongTime = true val mappedStream = fileStream.map(s => { val i = s.toInt if (i == 3) { - if (CheckpointSuite.batchThreeShouldBlockIndefinitely) { + if (CheckpointSuite.batchThreeShouldBlockALongTime) { // It's not a good idea to let the thread run forever // as resource won't be correctly released Thread.sleep(6000) @@ -691,7 +691,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } // The original StreamingContext has now been stopped. - CheckpointSuite.batchThreeShouldBlockIndefinitely = false + CheckpointSuite.batchThreeShouldBlockALongTime = false // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { @@ -928,5 +928,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } private object CheckpointSuite extends Serializable { - var batchThreeShouldBlockIndefinitely: Boolean = true + var batchThreeShouldBlockALongTime: Boolean = true } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 28b4fba5caef..55c048b3c586 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -38,11 +38,11 @@ class MapWithStateSuite extends SparkFunSuite protected val batchDuration = Seconds(1) before { - StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) } after { - StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) } override def beforeAll(): Unit = { @@ -60,9 +60,9 @@ class MapWithStateSuite extends SparkFunSuite } } finally { super.afterAll() - } - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) + } } } From aa74679c0fb24a63b7eec200f58106fae4cd0c5f Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Wed, 4 May 2016 20:51:50 -0700 Subject: [PATCH 14/16] Style fixes --- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- .../scala/org/apache/spark/streaming/MapWithStateSuite.scala | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0b7e596573bd..5bf430c02b5e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -641,7 +641,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch CheckpointSuite.batchThreeShouldBlockALongTime = true - val mappedStream = fileStream.map(s => { + val mappedStream = fileStream.map{ s => val i = s.toInt if (i == 3) { if (CheckpointSuite.batchThreeShouldBlockALongTime) { @@ -651,7 +651,7 @@ class CheckpointSuite extends TestSuiteBase with 
DStreamCheckpointTester } } i - }) + } // Reducing over a large window to ensure that recovery from driver failure // requires reprocessing of all the files seen before the failure diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 55c048b3c586..f8a329a4c075 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -60,9 +60,7 @@ class MapWithStateSuite extends SparkFunSuite } } finally { super.afterAll() - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) - } + Utils.deleteRecursively(checkpointDir) } } From 69dbceb838e4d00ed88b87c358af8d95bfeea0cf Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Thu, 5 May 2016 00:29:32 -0700 Subject: [PATCH 15/16] Minor code cleanup --- .../scheduler/EventLoggingListenerSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 26e64c0ef73f..c7e607076309 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -202,21 +202,21 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // Make sure expected events exist in the log file. val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val eventSet = mutable.Set( + SparkListenerApplicationStart, + SparkListenerBlockManagerAdded, + SparkListenerExecutorAdded, + SparkListenerEnvironmentUpdate, + SparkListenerJobStart, + SparkListenerJobEnd, + SparkListenerStageSubmitted, + SparkListenerStageCompleted, + SparkListenerTaskStart, + SparkListenerTaskEnd, + SparkListenerApplicationEnd).map(Utils.getFormattedClassName) try { val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) - val eventSet = mutable.Set( - SparkListenerApplicationStart, - SparkListenerBlockManagerAdded, - SparkListenerExecutorAdded, - SparkListenerEnvironmentUpdate, - SparkListenerJobStart, - SparkListenerJobEnd, - SparkListenerStageSubmitted, - SparkListenerStageCompleted, - SparkListenerTaskStart, - SparkListenerTaskEnd, - SparkListenerApplicationEnd).map(Utils.getFormattedClassName) lines.foreach { line => eventSet.foreach { event => if (line.contains(event)) { From 3ae2396f146c382e9ec3b45a269cd3b1d2970d43 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Tue, 10 May 2016 11:18:46 +0800 Subject: [PATCH 16/16] Clear build warning --- .../main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 7b301dc5f046..63f271740973 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -248,7 +248,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { try { serializer.deserializeStream(fileInputStream) } catch { - case ex => + case ex : Throwable => fileInputStream.close() throw ex }