From ceb1681620b83e46425d20f20ff5a228c0a8d75b Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Sun, 24 Apr 2016 20:37:06 -0700 Subject: [PATCH 01/21] mvn.cmd for windows Helping script for windows to download dependency and start zinc to support incremental building on windows. --- build/installdep.sh | 148 ++++++++++++++++++++++++++++++++++++++++++++ build/mvn.cmd | 24 +++++++ 2 files changed, 172 insertions(+) create mode 100644 build/installdep.sh create mode 100644 build/mvn.cmd diff --git a/build/installdep.sh b/build/installdep.sh new file mode 100644 index 0000000000000..17e36e5dd3097 --- /dev/null +++ b/build/installdep.sh @@ -0,0 +1,148 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Determine the current working directory +_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +# Preserve the calling directory +_CALLING_DIR="$(pwd)" +# Options used during compilation +_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" + +# Installs any application tarball given a URL, the expected tarball name, +# and, optionally, a checkable binary path to determine if the binary has +# already been installed +## Arg1 - URL +## Arg2 - Tarball Name +## Arg3 - Checkable Binary +install_app() { + local remote_tarball="$1/$2" + local local_tarball="${_DIR}/$2" + local binary="${_DIR}/$3" + + # setup `curl` and `wget` silent options if we're running on Jenkins + local curl_opts="-L" + local wget_opts="" + if [ -n "$AMPLAB_JENKINS" ]; then + curl_opts="-s ${curl_opts}" + wget_opts="--quiet ${wget_opts}" + else + curl_opts="--progress-bar ${curl_opts}" + wget_opts="--progress=bar:force ${wget_opts}" + fi + + if [ -z "$3" -o ! -f "$binary" ]; then + # check if we already have the tarball + # check if we have curl installed + # download application + [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ + echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ + curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" + # if the file still doesn't exist, lets try `wget` and cross our fingers + [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ + echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ + wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" + # if both were unsuccessful, exit + [ ! -f "${local_tarball}" ] && \ + echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ + echo "please install manually and try again." && \ + exit 2 + cd "${_DIR}" && tar -xzf "$2" + rm -rf "$local_tarball" + fi +} + +# Install maven under the build/ folder +install_mvn() { + local MVN_VERSION="3.3.9" + local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='} + + install_app \ + "${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \ + "apache-maven-${MVN_VERSION}-bin.tar.gz" \ + "apache-maven-${MVN_VERSION}/bin/mvn" + + MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn" +} + +# Install zinc under the build/ folder +install_zinc() { + local zinc_path="zinc-0.3.9/bin/zinc" + [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 + local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} + + install_app \ + "${TYPESAFE_MIRROR}/zinc/0.3.9" \ + "zinc-0.3.9.tgz" \ + "${zinc_path}" + ZINC_BIN="${_DIR}/${zinc_path}" +} + +# Determine the Scala version from the root pom.xml file, set the Scala URL, +# and, with that, download the specific version of Scala necessary under +# the build/ folder +install_scala() { + # determine the Scala version used in Spark + local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | \ + head -1 | cut -f2 -d'>' | cut -f1 -d'<'` + local scala_bin="${_DIR}/scala-${scala_version}/bin/scala" + local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} + + install_app \ + "${TYPESAFE_MIRROR}/scala/${scala_version}" \ + "scala-${scala_version}.tgz" \ + "scala-${scala_version}/bin/scala" + + SCALA_COMPILER="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-compiler.jar" + SCALA_LIBRARY="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-library.jar" +} + +# Setup healthy defaults for the Zinc port if none were provided from +# the environment +ZINC_PORT=${ZINC_PORT:-"3030"} + +# Check for the `--force` flag dictating that `mvn` should be downloaded +# regardless of whether the system already has a `mvn` install +if [ "$1" == "--force" ]; then + FORCE_MVN=1 + shift +fi + +# Install Maven if necessary +MVN_BIN="$(command -v mvn)" + +if [ ! "$MVN_BIN" -o -n "$FORCE_MVN" ]; then + install_mvn +fi + +# Install the proper version of Scala and Zinc for the build +install_zinc +install_scala + +# Reset the current working directory +cd "${_CALLING_DIR}" + +# Now that zinc is ensured to be installed, check its status and, if its +# not running or just installed, start it +if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then + export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} + "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} + "${ZINC_BIN}" -start -port ${ZINC_PORT} \ + -scala-compiler "${SCALA_COMPILER}" \ + -scala-library "${SCALA_LIBRARY}" &>/dev/null +fi diff --git a/build/mvn.cmd b/build/mvn.cmd new file mode 100644 index 0000000000000..f5da15c55c5c5 --- /dev/null +++ b/build/mvn.cmd @@ -0,0 +1,24 @@ +@echo off +set _DIR=%~dp0 +set ZINC_PORT=3030 +bash %_DIR%installdep.sh +set _CALLING_DIR="%_DIR%..\" +set _COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" + + +if defined MAVEN_OPTS ( + echo %MAVEN_OPTS% +) else ( + MAVEN_OPTS=%_COMPILE_JVM_OPTS% +) + +where /q mvn +IF ERRORLEVEL 1 ( + set MAVEN_BIN="%_DIR%\apache-maven-3.3.3\bin\mvn" +) ELSE ( + set MAVEN_BIN="mvn" +) + +ECHO "using maven from %MAVEN_BIN%" + +%MAVEN_BIN% -DzincPort=%ZINC_PORT% %* From 98ee4ab6a96b3d8ad8acd8dbeb6dedc10523546e Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Sun, 24 Apr 2016 23:50:59 -0700 Subject: [PATCH 02/21] Fix file not closed on FileSuite --- .../src/test/scala/org/apache/spark/FileSuite.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index cc52bb1d23cd5..40af728159ae2 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { nums.saveAsTextFile(outputDir) // Read the plain text file and check it's OK val outputFile = new File(outputDir, "part-00000") - val content = Source.fromFile(outputFile).mkString - assert(content === "1\n2\n3\n4\n") - // Also try reading it in as a text file RDD - assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4")) + val bufferSrc = Source.fromFile(outputFile) + try { + val content = bufferSrc.mkString + assert(content === "1\n2\n3\n4\n") + // Also try reading it in as a text file RDD + assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4")) + } finally { + bufferSrc.close() + } } test("text files (compressed)") { From ed52ec72fee774e9b8a1bec3183ae4594d758b9f Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 00:45:39 -0700 Subject: [PATCH 03/21] Fix another file closing --- .../spark/deploy/history/FsHistoryProviderSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index a5eda7b5a5a75..2c41c432d1fe2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val bstream = new BufferedOutputStream(cstream) if (isNewFormat) { - EventLoggingListener.initEventLog(new FileOutputStream(file)) + val newFormatStream = new FileOutputStream(file) + Utils.tryWithSafeFinally { + EventLoggingListener.initEventLog(newFormatStream) + } { + newFormatStream.close() + } } + val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) From 448508f467c0401f5461d79dadc1454b03210368 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 00:57:42 -0700 Subject: [PATCH 04/21] close log data. --- .../scheduler/EventLoggingListenerSuite.scala | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 7f4859206e257..e6d2adfa84ef9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -202,33 +202,37 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // Make sure expected events exist in the log file. val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) - val logStart = SparkListenerLogStart(SPARK_VERSION) - val lines = readLines(logData) - val eventSet = mutable.Set( - SparkListenerApplicationStart, - SparkListenerBlockManagerAdded, - SparkListenerExecutorAdded, - SparkListenerEnvironmentUpdate, - SparkListenerJobStart, - SparkListenerJobEnd, - SparkListenerStageSubmitted, - SparkListenerStageCompleted, - SparkListenerTaskStart, - SparkListenerTaskEnd, - SparkListenerApplicationEnd).map(Utils.getFormattedClassName) - lines.foreach { line => - eventSet.foreach { event => - if (line.contains(event)) { - val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) - val eventType = Utils.getFormattedClassName(parsedEvent) - if (eventType == event) { - eventSet.remove(event) + try { + val logStart = SparkListenerLogStart(SPARK_VERSION) + val lines = readLines(logData) + val eventSet = mutable.Set( + SparkListenerApplicationStart, + SparkListenerBlockManagerAdded, + SparkListenerExecutorAdded, + SparkListenerEnvironmentUpdate, + SparkListenerJobStart, + SparkListenerJobEnd, + SparkListenerStageSubmitted, + SparkListenerStageCompleted, + SparkListenerTaskStart, + SparkListenerTaskEnd, + SparkListenerApplicationEnd).map(Utils.getFormattedClassName) + lines.foreach { line => + eventSet.foreach { event => + if (line.contains(event)) { + val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) + val eventType = Utils.getFormattedClassName(parsedEvent) + if (eventType == event) { + eventSet.remove(event) + } } } } + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) + } finally { + logData.close() } - assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) - assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) } private def readLines(in: InputStream): Seq[String] = { From 34078a3facde46f09430baf1a1a5976b1c2d2869 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:01:55 -0700 Subject: [PATCH 05/21] Close the class loader --- .../org/apache/spark/scheduler/TaskResultGetterSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 9e472f900b655..173276f77f379 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local // ensure we reset the classloader after the test completes val originalClassLoader = Thread.currentThread.getContextClassLoader + val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) try { // load the exception from the jar - val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) loader.addURL(jarFile.toURI.toURL) Thread.currentThread().setContextClassLoader(loader) val excClass: Class[_] = Utils.classForName("repro.MyException") @@ -210,6 +210,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined) assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty) } finally { + loader.close() Thread.currentThread.setContextClassLoader(originalClassLoader) } } From efb7227518e9fdb8c1a35ae2adb3971c9cfc1ac2 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:20:48 -0700 Subject: [PATCH 06/21] Another file not closed. --- .../scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index e4e9be39ff6f9..69842f18bcfc7 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -155,12 +155,14 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val tempDir = Utils.createTempDir() val outputDir = new File(tempDir, "output") MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString) - val lines = outputDir.listFiles() + val sources = outputDir.listFiles() .filter(_.getName.startsWith("part-")) - .flatMap(Source.fromFile(_).getLines()) - .toSet + .map(Source.fromFile) + + val lines = sources.flatMap(_.getLines()).toSet val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03") assert(lines === expected) + sources.foreach(_.close()) Utils.deleteRecursively(tempDir) } From a06bffc02e4d2cd7e723e73102c160c1c57f0915 Mon Sep 17 00:00:00 2001 From: Tao LI Date: Mon, 25 Apr 2016 01:29:16 -0700 Subject: [PATCH 07/21] Stop to release resources. --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 5 ++++- .../test/java/org/apache/spark/streaming/JavaAPISuite.java | 1 + .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 6 ++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b9d898a72362e..6f8222d0b7a0c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -194,10 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Finally, stop the endpoint ssc.env.rpcEnv.stop(endpoint) endpoint = null - receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped } + + // note that the output writer is created at construction time, we have to close + // them even if it hasn't been started. + receivedBlockTracker.stop() } /** Allocate all unallocated blocks to the given batch. */ diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 3d54abd903b6d..648a5abe0b89a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1805,6 +1805,7 @@ public Integer call(String s) { // will be re-processed after recovery List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); + ssc.stop(); Utils.deleteRecursively(tempDir); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index b79cc65d8b5e9..e9762debd012c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -646,8 +646,10 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val mappedStream = fileStream.map(s => { val i = s.toInt if (i == 3) { - while (CheckpointSuite.batchThreeShouldBlockIndefinitely) { - Thread.sleep(Long.MaxValue) + if (CheckpointSuite.batchThreeShouldBlockIndefinitely) { + // It's not a good idea to let the thread run forever + // as resource won't be correctly released + Thread.sleep(6000) } } i From 45262dcc58073a99417b9d1a6c0e24c393716c8f Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 01:34:45 -0700 Subject: [PATCH 08/21] More closing problem --- .../org/apache/spark/streaming/MapWithStateSuite.scala | 8 ++++---- .../spark/streaming/ReceivedBlockTrackerSuite.scala | 5 +++++ .../apache/spark/streaming/util/WriteAheadLogSuite.scala | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 3b662ec1833aa..28b4fba5caef3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -39,18 +39,15 @@ class MapWithStateSuite extends SparkFunSuite before { StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } - checkpointDir = Utils.createTempDir("checkpoint") } after { StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) - } } override def beforeAll(): Unit = { super.beforeAll() + checkpointDir = Utils.createTempDir("checkpoint") val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite") conf.set("spark.streaming.clock", classOf[ManualClock].getName()) sc = new SparkContext(conf) @@ -64,6 +61,9 @@ class MapWithStateSuite extends SparkFunSuite } finally { super.afterAll() } + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) + } } test("state - get, exists, update, remove, ") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 851013bb1e846..107c3f5dcc08d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent) getWrittenLogData() shouldEqual expectedWrittenData1 getWriteAheadLogFiles() should have size 1 + tracker1.stop() incrementTime() @@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false) tracker1_.getUnallocatedBlocks(streamId) shouldBe empty tracker1_.hasUnallocatedReceivedBlocks should be (false) + tracker1_.stop() // Restart tracker and verify recovered list of unallocated blocks val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) @@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker2.stop() // Verify whether log has correct contents val expectedWrittenData2 = expectedWrittenData1 ++ @@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite getWriteAheadLogFiles() should not contain oldestLogFile } printLogFiles("After clean") + tracker3.stop() // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch @@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker4.stop() } test("disable write ahead log when checkpoint directory is not set") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 24cb5afee33c2..4bec52b9fe4fe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests( assert(getLogFilesInDirectory(testDir).size < logFiles.size) } } + writeAheadLog.close() } test(testPrefix + "handling file errors while reading rotating logs") { From b3c0c96fb4cbe19344cc220e234c98644aa0efcd Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 06:25:42 -0700 Subject: [PATCH 09/21] Fix the zip file and jar file in RPackageUtilsSuite --- .../scala/org/apache/spark/deploy/RPackageUtilsSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 13cba94578a6a..bd426c778d6db 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -77,6 +77,7 @@ class RPackageUtilsSuite assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code") assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code") assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code") + jars.foreach(_.close()) } } @@ -144,13 +145,15 @@ class RPackageUtilsSuite IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc") val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip") assert(finalZip.exists()) - val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq + val zipFile = new ZipFile(finalZip) + val entries = zipFile.entries().asScala.map(_.getName).toSeq assert(entries.contains("/test.R")) assert(entries.contains("/SparkR/abc.R")) assert(entries.contains("/SparkR/DESCRIPTION")) assert(!entries.contains("/package.zip")) assert(entries.contains("/packageTest/def.R")) assert(entries.contains("/packageTest/DESCRIPTION")) + zipFile.close() } finally { FileUtils.deleteDirectory(tempDir) } From a176adbadfa68cb0819a1e958129c4d96b42b42c Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 25 Apr 2016 22:35:35 -0700 Subject: [PATCH 10/21] Stop ssc in MasterFailureTest --- .../scala/org/apache/spark/streaming/MasterFailureTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 60c8e702352cf..fff2d6fbace3a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -164,6 +164,7 @@ object MasterFailureTest extends Logging { val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) fileGeneratingThread.join() + ssc.stop() fs.delete(checkpointDir, true) fs.delete(testDir, true) logInfo("Finished test after " + killCount + " failures") From 35aacd29c4667550a4f870ee521ed185c5f9800c Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Tue, 26 Apr 2016 01:09:04 -0700 Subject: [PATCH 11/21] Remove accidentally added files --- build/installdep.sh | 148 -------------------------------------------- build/mvn.cmd | 24 ------- 2 files changed, 172 deletions(-) delete mode 100644 build/installdep.sh delete mode 100644 build/mvn.cmd diff --git a/build/installdep.sh b/build/installdep.sh deleted file mode 100644 index 17e36e5dd3097..0000000000000 --- a/build/installdep.sh +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Determine the current working directory -_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -# Preserve the calling directory -_CALLING_DIR="$(pwd)" -# Options used during compilation -_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - -# Installs any application tarball given a URL, the expected tarball name, -# and, optionally, a checkable binary path to determine if the binary has -# already been installed -## Arg1 - URL -## Arg2 - Tarball Name -## Arg3 - Checkable Binary -install_app() { - local remote_tarball="$1/$2" - local local_tarball="${_DIR}/$2" - local binary="${_DIR}/$3" - - # setup `curl` and `wget` silent options if we're running on Jenkins - local curl_opts="-L" - local wget_opts="" - if [ -n "$AMPLAB_JENKINS" ]; then - curl_opts="-s ${curl_opts}" - wget_opts="--quiet ${wget_opts}" - else - curl_opts="--progress-bar ${curl_opts}" - wget_opts="--progress=bar:force ${wget_opts}" - fi - - if [ -z "$3" -o ! -f "$binary" ]; then - # check if we already have the tarball - # check if we have curl installed - # download application - [ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \ - echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \ - curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" - # if the file still doesn't exist, lets try `wget` and cross our fingers - [ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \ - echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \ - wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}" - # if both were unsuccessful, exit - [ ! -f "${local_tarball}" ] && \ - echo -n "ERROR: Cannot download $2 with cURL or wget; " && \ - echo "please install manually and try again." && \ - exit 2 - cd "${_DIR}" && tar -xzf "$2" - rm -rf "$local_tarball" - fi -} - -# Install maven under the build/ folder -install_mvn() { - local MVN_VERSION="3.3.9" - local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='} - - install_app \ - "${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \ - "apache-maven-${MVN_VERSION}-bin.tar.gz" \ - "apache-maven-${MVN_VERSION}/bin/mvn" - - MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn" -} - -# Install zinc under the build/ folder -install_zinc() { - local zinc_path="zinc-0.3.9/bin/zinc" - [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1 - local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} - - install_app \ - "${TYPESAFE_MIRROR}/zinc/0.3.9" \ - "zinc-0.3.9.tgz" \ - "${zinc_path}" - ZINC_BIN="${_DIR}/${zinc_path}" -} - -# Determine the Scala version from the root pom.xml file, set the Scala URL, -# and, with that, download the specific version of Scala necessary under -# the build/ folder -install_scala() { - # determine the Scala version used in Spark - local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | \ - head -1 | cut -f2 -d'>' | cut -f1 -d'<'` - local scala_bin="${_DIR}/scala-${scala_version}/bin/scala" - local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com} - - install_app \ - "${TYPESAFE_MIRROR}/scala/${scala_version}" \ - "scala-${scala_version}.tgz" \ - "scala-${scala_version}/bin/scala" - - SCALA_COMPILER="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-compiler.jar" - SCALA_LIBRARY="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-library.jar" -} - -# Setup healthy defaults for the Zinc port if none were provided from -# the environment -ZINC_PORT=${ZINC_PORT:-"3030"} - -# Check for the `--force` flag dictating that `mvn` should be downloaded -# regardless of whether the system already has a `mvn` install -if [ "$1" == "--force" ]; then - FORCE_MVN=1 - shift -fi - -# Install Maven if necessary -MVN_BIN="$(command -v mvn)" - -if [ ! "$MVN_BIN" -o -n "$FORCE_MVN" ]; then - install_mvn -fi - -# Install the proper version of Scala and Zinc for the build -install_zinc -install_scala - -# Reset the current working directory -cd "${_CALLING_DIR}" - -# Now that zinc is ensured to be installed, check its status and, if its -# not running or just installed, start it -if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} - "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - "${ZINC_BIN}" -start -port ${ZINC_PORT} \ - -scala-compiler "${SCALA_COMPILER}" \ - -scala-library "${SCALA_LIBRARY}" &>/dev/null -fi diff --git a/build/mvn.cmd b/build/mvn.cmd deleted file mode 100644 index f5da15c55c5c5..0000000000000 --- a/build/mvn.cmd +++ /dev/null @@ -1,24 +0,0 @@ -@echo off -set _DIR=%~dp0 -set ZINC_PORT=3030 -bash %_DIR%installdep.sh -set _CALLING_DIR="%_DIR%..\" -set _COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" - - -if defined MAVEN_OPTS ( - echo %MAVEN_OPTS% -) else ( - MAVEN_OPTS=%_COMPILE_JVM_OPTS% -) - -where /q mvn -IF ERRORLEVEL 1 ( - set MAVEN_BIN="%_DIR%\apache-maven-3.3.3\bin\mvn" -) ELSE ( - set MAVEN_BIN="mvn" -) - -ECHO "using maven from %MAVEN_BIN%" - -%MAVEN_BIN% -DzincPort=%ZINC_PORT% %* From 9f50128da0660fed97d64b8a5e0d63285dbf93d5 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Mon, 2 May 2016 23:11:12 -0700 Subject: [PATCH 12/21] Code cleanup with respect to comments --- .../org/apache/spark/streaming/CheckpointSuite.scala | 8 ++++---- .../org/apache/spark/streaming/MapWithStateSuite.scala | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index e9762debd012c..32d17e366bb20 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -642,11 +642,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch - CheckpointSuite.batchThreeShouldBlockIndefinitely = true + CheckpointSuite.batchThreeShouldBlockALongTime = true val mappedStream = fileStream.map(s => { val i = s.toInt if (i == 3) { - if (CheckpointSuite.batchThreeShouldBlockIndefinitely) { + if (CheckpointSuite.batchThreeShouldBlockALongTime) { // It's not a good idea to let the thread run forever // as resource won't be correctly released Thread.sleep(6000) @@ -693,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } // The original StreamingContext has now been stopped. - CheckpointSuite.batchThreeShouldBlockIndefinitely = false + CheckpointSuite.batchThreeShouldBlockALongTime = false // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { @@ -930,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } private object CheckpointSuite extends Serializable { - var batchThreeShouldBlockIndefinitely: Boolean = true + var batchThreeShouldBlockALongTime: Boolean = true } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 28b4fba5caef3..55c048b3c586a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -38,11 +38,11 @@ class MapWithStateSuite extends SparkFunSuite protected val batchDuration = Seconds(1) before { - StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) } after { - StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) } override def beforeAll(): Unit = { @@ -60,9 +60,9 @@ class MapWithStateSuite extends SparkFunSuite } } finally { super.afterAll() - } - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) + } } } From 55b360e276968eecea970267b0fa438b56e5e703 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Wed, 4 May 2016 20:51:50 -0700 Subject: [PATCH 13/21] Style fixes --- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- .../scala/org/apache/spark/streaming/MapWithStateSuite.scala | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 32d17e366bb20..6904bbde619d7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -643,7 +643,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch CheckpointSuite.batchThreeShouldBlockALongTime = true - val mappedStream = fileStream.map(s => { + val mappedStream = fileStream.map{ s => val i = s.toInt if (i == 3) { if (CheckpointSuite.batchThreeShouldBlockALongTime) { @@ -653,7 +653,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } } i - }) + } // Reducing over a large window to ensure that recovery from driver failure // requires reprocessing of all the files seen before the failure diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 55c048b3c586a..f8a329a4c075c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -60,9 +60,7 @@ class MapWithStateSuite extends SparkFunSuite } } finally { super.afterAll() - if (checkpointDir != null) { - Utils.deleteRecursively(checkpointDir) - } + Utils.deleteRecursively(checkpointDir) } } From 91f82b5fac48afefeceadd764ff0e7b61944d875 Mon Sep 17 00:00:00 2001 From: "U-FAREAST\\tl" Date: Thu, 5 May 2016 00:29:32 -0700 Subject: [PATCH 14/21] Minor code cleanup --- .../scheduler/EventLoggingListenerSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index e6d2adfa84ef9..e5e4e037ad665 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -202,21 +202,21 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // Make sure expected events exist in the log file. val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val eventSet = mutable.Set( + SparkListenerApplicationStart, + SparkListenerBlockManagerAdded, + SparkListenerExecutorAdded, + SparkListenerEnvironmentUpdate, + SparkListenerJobStart, + SparkListenerJobEnd, + SparkListenerStageSubmitted, + SparkListenerStageCompleted, + SparkListenerTaskStart, + SparkListenerTaskEnd, + SparkListenerApplicationEnd).map(Utils.getFormattedClassName) try { val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) - val eventSet = mutable.Set( - SparkListenerApplicationStart, - SparkListenerBlockManagerAdded, - SparkListenerExecutorAdded, - SparkListenerEnvironmentUpdate, - SparkListenerJobStart, - SparkListenerJobEnd, - SparkListenerStageSubmitted, - SparkListenerStageCompleted, - SparkListenerTaskStart, - SparkListenerTaskEnd, - SparkListenerApplicationEnd).map(Utils.getFormattedClassName) lines.foreach { line => eventSet.foreach { event => if (line.contains(event)) { From f3713d1ff59e5bd45a8a207eaf36ab8e6c285812 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 25 Oct 2016 14:44:15 +0900 Subject: [PATCH 15/21] ex -> e and indentation --- .../org/apache/spark/rdd/ReliableCheckpointRDD.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index eac901d10067c..0cd7c0ce7f659 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -239,7 +239,14 @@ private[spark] object ReliableCheckpointRDD extends Logging { val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration) val fileInputStream = fs.open(partitionerFilePath, bufferSize) val serializer = SparkEnv.get.serializer.newInstance() - val deserializeStream = serializer.deserializeStream(fileInputStream) + val deserializeStream = try { + serializer.deserializeStream(fileInputStream) + } catch { + case e : Throwable => + fileInputStream.close() + throw e + } + val partitioner = Utils.tryWithSafeFinally[Partitioner] { deserializeStream.readObject[Partitioner] } { From 863ea7f66d4919a3e2c8ee6e3ca575a80a3115dc Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 28 Oct 2016 00:55:39 +0900 Subject: [PATCH 16/21] Use Utils.tryWithSafeFinally where possible. --- .../spark/rdd/ReliableCheckpointRDD.scala | 20 +++++------ .../scala/org/apache/spark/FileSuite.scala | 4 +-- .../spark/deploy/RPackageUtilsSuite.scala | 36 +++++++++++-------- .../scheduler/EventLoggingListenerSuite.scala | 4 +-- .../scheduler/TaskResultGetterSuite.scala | 4 +-- .../spark/mllib/util/MLUtilsSuite.scala | 14 ++++---- .../spark/streaming/CheckpointSuite.scala | 2 +- 7 files changed, 45 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 0cd7c0ce7f659..9f800e3a0953c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -239,19 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging { val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration) val fileInputStream = fs.open(partitionerFilePath, bufferSize) val serializer = SparkEnv.get.serializer.newInstance() - val deserializeStream = try { - serializer.deserializeStream(fileInputStream) - } catch { - case e : Throwable => - fileInputStream.close() - throw e - } - - val partitioner = Utils.tryWithSafeFinally[Partitioner] { - deserializeStream.readObject[Partitioner] + val partitioner = Utils.tryWithSafeFinally { + val deserializeStream = serializer.deserializeStream(fileInputStream) + Utils.tryWithSafeFinally { + deserializeStream.readObject[Partitioner] + } { + deserializeStream.close() + } } { - deserializeStream.close() + fileInputStream.close() } + logDebug(s"Read partitioner from $partitionerFilePath") Some(partitioner) } catch { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 40af728159ae2..89f0b1cb5b56a 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -59,12 +59,12 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Read the plain text file and check it's OK val outputFile = new File(outputDir, "part-00000") val bufferSrc = Source.fromFile(outputFile) - try { + Utils.tryWithSafeFinally { val content = bufferSrc.mkString assert(content === "1\n2\n3\n4\n") // Also try reading it in as a text file RDD assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4")) - } finally { + } { bufferSrc.close() } } diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index bd426c778d6db..005587051b6ad 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate -import org.apache.spark.util.ResetSystemProperties +import org.apache.spark.util.{ResetSystemProperties, Utils} class RPackageUtilsSuite extends SparkFunSuite @@ -74,10 +74,13 @@ class RPackageUtilsSuite val deps = Seq(dep1, dep2).mkString(",") IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo => val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo))))) - assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code") - assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code") - assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code") - jars.foreach(_.close()) + Utils.tryWithSafeFinally { + assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code") + assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code") + assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code") + } { + jars.foreach(_.close()) + } } } @@ -132,7 +135,7 @@ class RPackageUtilsSuite test("SparkR zipping works properly") { val tempDir = Files.createTempDir() - try { + Utils.tryWithSafeFinally { IvyTestUtils.writeFile(tempDir, "test.R", "abc") val fakeSparkRDir = new File(tempDir, "SparkR") assert(fakeSparkRDir.mkdirs()) @@ -146,15 +149,18 @@ class RPackageUtilsSuite val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip") assert(finalZip.exists()) val zipFile = new ZipFile(finalZip) - val entries = zipFile.entries().asScala.map(_.getName).toSeq - assert(entries.contains("/test.R")) - assert(entries.contains("/SparkR/abc.R")) - assert(entries.contains("/SparkR/DESCRIPTION")) - assert(!entries.contains("/package.zip")) - assert(entries.contains("/packageTest/def.R")) - assert(entries.contains("/packageTest/DESCRIPTION")) - zipFile.close() - } finally { + Utils.tryWithSafeFinally { + val entries = zipFile.entries().asScala.map(_.getName).toSeq + assert(entries.contains("/test.R")) + assert(entries.contains("/SparkR/abc.R")) + assert(entries.contains("/SparkR/DESCRIPTION")) + assert(!entries.contains("/package.zip")) + assert(entries.contains("/packageTest/def.R")) + assert(entries.contains("/packageTest/DESCRIPTION")) + } { + zipFile.close() + } + } { FileUtils.deleteDirectory(tempDir) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index e5e4e037ad665..8a5ec37eeb66c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -214,7 +214,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit SparkListenerTaskStart, SparkListenerTaskEnd, SparkListenerApplicationEnd).map(Utils.getFormattedClassName) - try { + Utils.tryWithSafeFinally { val logStart = SparkListenerLogStart(SPARK_VERSION) val lines = readLines(logData) lines.foreach { line => @@ -230,7 +230,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) - } finally { + } { logData.close() } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 173276f77f379..f48ab5513efcf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -184,7 +184,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local // ensure we reset the classloader after the test completes val originalClassLoader = Thread.currentThread.getContextClassLoader val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) - try { + Utils.tryWithSafeFinally { // load the exception from the jar loader.addURL(jarFile.toURI.toURL) Thread.currentThread().setContextClassLoader(loader) @@ -209,7 +209,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined) assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty) - } finally { + } { loader.close() Thread.currentThread.setContextClassLoader(originalClassLoader) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 69842f18bcfc7..665708a780c48 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -158,12 +158,14 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val sources = outputDir.listFiles() .filter(_.getName.startsWith("part-")) .map(Source.fromFile) - - val lines = sources.flatMap(_.getLines()).toSet - val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03") - assert(lines === expected) - sources.foreach(_.close()) - Utils.deleteRecursively(tempDir) + Utils.tryWithSafeFinally { + val lines = sources.flatMap(_.getLines()).toSet + val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03") + assert(lines === expected) + } { + sources.foreach(_.close()) + Utils.deleteRecursively(tempDir) + } } test("appendBias") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6904bbde619d7..41f16bfa5fc7f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -643,7 +643,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch CheckpointSuite.batchThreeShouldBlockALongTime = true - val mappedStream = fileStream.map{ s => + val mappedStream = fileStream.map { s => val i = s.toInt if (i == 3) { if (CheckpointSuite.batchThreeShouldBlockALongTime) { From 3949dbeda632e215c64dcc089365bdc7334dacaf Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 29 Oct 2016 11:02:43 +0900 Subject: [PATCH 17/21] close loader later --- .../org/apache/spark/scheduler/TaskResultGetterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index f48ab5513efcf..ee95e4ff7dbc3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -210,8 +210,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined) assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty) } { - loader.close() Thread.currentThread.setContextClassLoader(originalClassLoader) + loader.close() } } From 49cb4e7f259ba0a236b0a977e69719cfc165c265 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 30 Oct 2016 21:07:32 +0900 Subject: [PATCH 18/21] Initialize receivedBlockTracker in start() --- .../streaming/scheduler/ReceiverTracker.scala | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 6f8222d0b7a0c..183e5cd1f5196 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -106,14 +106,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private val receiverInputStreams = ssc.graph.getReceiverInputStreams() private val receiverInputStreamIds = receiverInputStreams.map { _.id } - private val receivedBlockTracker = new ReceivedBlockTracker( - ssc.sparkContext.conf, - ssc.sparkContext.hadoopConfiguration, - receiverInputStreamIds, - ssc.scheduler.clock, - ssc.isCheckpointPresent, - Option(ssc.checkpointDir) - ) + private var receivedBlockTracker: ReceivedBlockTracker = null private val listenerBus = ssc.scheduler.listenerBus /** Enumeration to identify current state of the ReceiverTracker */ @@ -154,6 +147,17 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false throw new SparkException("ReceiverTracker already started") } + if (receivedBlockTracker == null) { + receivedBlockTracker = new ReceivedBlockTracker( + ssc.sparkContext.conf, + ssc.sparkContext.hadoopConfiguration, + receiverInputStreamIds, + ssc.scheduler.clock, + ssc.isCheckpointPresent, + Option(ssc.checkpointDir) + ) + } + if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) @@ -194,17 +198,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Finally, stop the endpoint ssc.env.rpcEnv.stop(endpoint) endpoint = null + receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped } - - // note that the output writer is created at construction time, we have to close - // them even if it hasn't been started. - receivedBlockTracker.stop() } /** Allocate all unallocated blocks to the given batch. */ def allocateBlocksToBatch(batchTime: Time): Unit = { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") if (receiverInputStreams.nonEmpty) { receivedBlockTracker.allocateBlocksToBatch(batchTime) } @@ -212,11 +214,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Get the blocks for the given batch and all input streams. */ def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.getBlocksOfBatch(batchTime) } /** Get the blocks allocated to the given batch and stream. */ def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId) } @@ -225,6 +229,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * older than the threshold time. Note that this does not */ def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") // Clean up old block and batch metadata receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) @@ -344,6 +349,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.addBlock(receivedBlockInfo) } @@ -398,6 +404,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Check if any blocks are left to be processed */ def hasUnallocatedBlocks: Boolean = { + require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.hasUnallocatedReceivedBlocks } From 15215722dfe2de0785d38cce5713f33fac5e4b03 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Nov 2016 00:43:39 +0900 Subject: [PATCH 19/21] Require nulls for eventpoint and the tracker in start and remove other checks in methods --- .../streaming/scheduler/ReceiverTracker.scala | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 183e5cd1f5196..8e0dd6ca0e58a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -147,16 +147,17 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false throw new SparkException("ReceiverTracker already started") } - if (receivedBlockTracker == null) { - receivedBlockTracker = new ReceivedBlockTracker( - ssc.sparkContext.conf, - ssc.sparkContext.hadoopConfiguration, - receiverInputStreamIds, - ssc.scheduler.clock, - ssc.isCheckpointPresent, - Option(ssc.checkpointDir) - ) - } + require(endpoint == null) + require(receivedBlockTracker == null) + + receivedBlockTracker = new ReceivedBlockTracker( + ssc.sparkContext.conf, + ssc.sparkContext.hadoopConfiguration, + receiverInputStreamIds, + ssc.scheduler.clock, + ssc.isCheckpointPresent, + Option(ssc.checkpointDir) + ) if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( @@ -199,6 +200,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ssc.env.rpcEnv.stop(endpoint) endpoint = null receivedBlockTracker.stop() + receivedBlockTracker = null logInfo("ReceiverTracker stopped") trackerState = Stopped } @@ -206,7 +208,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Allocate all unallocated blocks to the given batch. */ def allocateBlocksToBatch(batchTime: Time): Unit = { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") if (receiverInputStreams.nonEmpty) { receivedBlockTracker.allocateBlocksToBatch(batchTime) } @@ -214,13 +215,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Get the blocks for the given batch and all input streams. */ def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.getBlocksOfBatch(batchTime) } /** Get the blocks allocated to the given batch and stream. */ def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId) } @@ -229,7 +228,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * older than the threshold time. Note that this does not */ def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") // Clean up old block and batch metadata receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) @@ -349,7 +347,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.addBlock(receivedBlockInfo) } @@ -404,7 +401,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Check if any blocks are left to be processed */ def hasUnallocatedBlocks: Boolean = { - require(receivedBlockTracker != null, "ReceiverTracker should be started first.") receivedBlockTracker.hasUnallocatedReceivedBlocks } From a9a5f061b8536c8b3173bb5701cb0de0bad99fcc Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 6 Nov 2016 16:28:20 +0900 Subject: [PATCH 20/21] Get rid of the changes in ReceiverTracker --- .../streaming/scheduler/ReceiverTracker.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 8e0dd6ca0e58a..b9d898a72362e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -106,7 +106,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private val receiverInputStreams = ssc.graph.getReceiverInputStreams() private val receiverInputStreamIds = receiverInputStreams.map { _.id } - private var receivedBlockTracker: ReceivedBlockTracker = null + private val receivedBlockTracker = new ReceivedBlockTracker( + ssc.sparkContext.conf, + ssc.sparkContext.hadoopConfiguration, + receiverInputStreamIds, + ssc.scheduler.clock, + ssc.isCheckpointPresent, + Option(ssc.checkpointDir) + ) private val listenerBus = ssc.scheduler.listenerBus /** Enumeration to identify current state of the ReceiverTracker */ @@ -147,18 +154,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false throw new SparkException("ReceiverTracker already started") } - require(endpoint == null) - require(receivedBlockTracker == null) - - receivedBlockTracker = new ReceivedBlockTracker( - ssc.sparkContext.conf, - ssc.sparkContext.hadoopConfiguration, - receiverInputStreamIds, - ssc.scheduler.clock, - ssc.isCheckpointPresent, - Option(ssc.checkpointDir) - ) - if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) @@ -200,7 +195,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ssc.env.rpcEnv.stop(endpoint) endpoint = null receivedBlockTracker.stop() - receivedBlockTracker = null logInfo("ReceiverTracker stopped") trackerState = Stopped } From d680a2f47f25daf2a9fe18a84fe081a688348fb9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 7 Nov 2016 21:53:15 +0900 Subject: [PATCH 21/21] Revert the changes in MapWithStateSuite --- .../org/apache/spark/streaming/MapWithStateSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index f8a329a4c075c..3b662ec1833aa 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -38,16 +38,19 @@ class MapWithStateSuite extends SparkFunSuite protected val batchDuration = Seconds(1) before { - StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + checkpointDir = Utils.createTempDir("checkpoint") } after { - StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + if (checkpointDir != null) { + Utils.deleteRecursively(checkpointDir) + } } override def beforeAll(): Unit = { super.beforeAll() - checkpointDir = Utils.createTempDir("checkpoint") val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite") conf.set("spark.streaming.clock", classOf[ManualClock].getName()) sc = new SparkContext(conf) @@ -60,7 +63,6 @@ class MapWithStateSuite extends SparkFunSuite } } finally { super.afterAll() - Utils.deleteRecursively(checkpointDir) } }