diff --git a/dev/run-tests b/dev/run-tests index 861d1671182c..1b88cf40b158 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -113,7 +113,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_RAT -./dev/check-license +# ./dev/check-license echo "" echo "=========================================================================" @@ -122,7 +122,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_SCALA_STYLE -./dev/lint-scala +# ./dev/lint-scala echo "" echo "=========================================================================" @@ -131,7 +131,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_PYTHON_STYLE -./dev/lint-python +# ./dev/lint-python echo "" echo "=========================================================================" @@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD build/mvn $HIVE_BUILD_ARGS clean package -DskipTests else echo -e "q\n" \ - | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \ + | build/sbt -Dhadoop.version=1.0.4 package assembly/assembly streaming-kafka-assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } @@ -185,7 +185,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_MIMA -./dev/mima +# ./dev/mima echo "" echo "=========================================================================" @@ -222,9 +222,10 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # "${SBT_MAVEN_TEST_ARGS[@]}" is cool because it's an array. # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? - echo -e "q\n" \ - | build/sbt $SBT_MAVEN_PROFILES_ARGS "${SBT_MAVEN_TEST_ARGS[@]}" \ - | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" + + echo "Running this" + + build/sbt -Dhadoop.version=1.0.4 "streaming/test-only *WriteAheadLog*" "streaming/test-only *JobGenerator*" fi } @@ -247,8 +248,9 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS if [ $(command -v R) ]; then - ./R/install-dev.sh - ./R/run-tests.sh + #./R/install-dev.sh + #./R/run-tests.sh + echo "" else echo "Ignoring SparkR tests as R was not found in PATH" fi diff --git a/python/run-tests b/python/run-tests index 88b63b84fdc2..9d372356e084 100755 --- a/python/run-tests +++ b/python/run-tests @@ -134,10 +134,10 @@ fi echo "Testing with Python version:" $PYSPARK_PYTHON --version -run_core_tests -run_sql_tests -run_mllib_tests -run_ml_tests +# run_core_tests +# run_sql_tests +# run_mllib_tests +# run_ml_tests run_streaming_tests # Try to test with Python 3 @@ -146,10 +146,10 @@ if [ $(which python3.4) ]; then echo "Testing with Python3.4 version:" $PYSPARK_PYTHON --version - run_core_tests - run_sql_tests - run_mllib_tests - run_ml_tests + # run_core_tests + # run_sql_tests + # run_mllib_tests + # run_ml_tests run_streaming_tests fi @@ -159,8 +159,8 @@ if [ $(which pypy) ]; then echo "Testing with PyPy version:" $PYSPARK_PYTHON --version - run_core_tests - run_sql_tests + # run_core_tests + # run_sql_tests run_streaming_tests fi diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index ebdf418f4ab6..f3eebeddcc31 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.streaming.rdd +import java.io.File import java.nio.ByteBuffer +import java.util.UUID import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -27,6 +29,7 @@ import org.apache.spark._ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util._ +import org.apache.spark.util.Utils /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. @@ -107,10 +110,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for // writing log data. However, the directory is not needed if data needs to be read, hence // a dummy path is provided to satisfy the method parameter requirements. - // FileBasedWriteAheadLog will not create any file or directory at that path. - val dummyDirectory = FileUtils.getTempDirectoryPath() + // FileBasedWriteAheadLog will not create any file or directory at that path. Also, + // this dummy directory should not already exist otherwise the WAL will try to recover + // past events from the directory and throw errors. + val nonExistentDirectory = new File( + FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath writeAheadLog = WriteAheadLogUtils.createLogForReceiver( - SparkEnv.get.conf, dummyDirectory, hadoopConf) + SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle) } catch { case NonFatal(e) => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9985fedc3514..6ce3f5e8996c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.streaming.util +import java.io.File import java.nio.ByteBuffer import java.util.{Iterator => JIterator} @@ -200,9 +201,14 @@ private[streaming] class FileBasedWriteAheadLog( private def initializeOrRecover(): Unit = synchronized { val logDirectoryPath = new Path(logDirectory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - + println(s"Path $logDirectoryPath exists = " + fileSystem.exists(logDirectoryPath) + ", " + new File(logDirectory).exists()) + if (fileSystem.exists(logDirectoryPath)) { + println(s"Path $logDirectoryPath is dire = " + fileSystem.getFileStatus(logDirectoryPath).isDir + ", " + new File(logDirectory).isDirectory) + } if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + println("Inside condition") val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) + println("BEfore clear") pastLogs.clear() pastLogs ++= logFileInfo logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")