22 changes: 12 additions & 10 deletions dev/run-tests
@@ -113,7 +113,7 @@ echo "========================================================================="
 
 CURRENT_BLOCK=$BLOCK_RAT
 
-./dev/check-license
+# ./dev/check-license
 
 echo ""
 echo "========================================================================="
@@ -122,7 +122,7 @@ echo "========================================================================="
 
 CURRENT_BLOCK=$BLOCK_SCALA_STYLE
 
-./dev/lint-scala
+# ./dev/lint-scala
 
 echo ""
 echo "========================================================================="
@@ -131,7 +131,7 @@ echo "========================================================================="
 
 CURRENT_BLOCK=$BLOCK_PYTHON_STYLE
 
-./dev/lint-python
+# ./dev/lint-python
 
 echo ""
 echo "========================================================================="
@@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD
   build/mvn $HIVE_BUILD_ARGS clean package -DskipTests
 else
   echo -e "q\n" \
-    | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \
+    | build/sbt -Dhadoop.version=1.0.4 package assembly/assembly streaming-kafka-assembly/assembly \
     | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 fi
 }
@@ -185,7 +185,7 @@ echo "========================================================================="
 
 CURRENT_BLOCK=$BLOCK_MIMA
 
-./dev/mima
+# ./dev/mima
 
 echo ""
 echo "========================================================================="
@@ -222,9 +222,10 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
   # "${SBT_MAVEN_TEST_ARGS[@]}" is cool because it's an array.
   # QUESTION: Why doesn't 'yes "q"' work?
   # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
-  echo -e "q\n" \
-    | build/sbt $SBT_MAVEN_PROFILES_ARGS "${SBT_MAVEN_TEST_ARGS[@]}" \
-    | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
+  echo "Running this"
+
+  build/sbt -Dhadoop.version=1.0.4 "streaming/test-only *WriteAheadLog*" "streaming/test-only *JobGenerator*"
 fi
 }
 
@@ -247,8 +248,9 @@ echo "========================================================================="
 CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS
 
 if [ $(command -v R) ]; then
-  ./R/install-dev.sh
-  ./R/run-tests.sh
+  #./R/install-dev.sh
+  #./R/run-tests.sh
+  echo ""
 else
   echo "Ignoring SparkR tests as R was not found in PATH"
 fi
20 changes: 10 additions & 10 deletions python/run-tests
@@ -134,10 +134,10 @@ fi
 echo "Testing with Python version:"
 $PYSPARK_PYTHON --version
 
-run_core_tests
-run_sql_tests
-run_mllib_tests
-run_ml_tests
+# run_core_tests
+# run_sql_tests
+# run_mllib_tests
+# run_ml_tests
 run_streaming_tests
 
 # Try to test with Python 3
@@ -146,10 +146,10 @@ if [ $(which python3.4) ]; then
     echo "Testing with Python3.4 version:"
     $PYSPARK_PYTHON --version
 
-    run_core_tests
-    run_sql_tests
-    run_mllib_tests
-    run_ml_tests
+    # run_core_tests
+    # run_sql_tests
+    # run_mllib_tests
+    # run_ml_tests
     run_streaming_tests
 fi
 
@@ -159,8 +159,8 @@ if [ $(which pypy) ]; then
     echo "Testing with PyPy version:"
     $PYSPARK_PYTHON --version
 
-    run_core_tests
-    run_sql_tests
+    # run_core_tests
+    # run_sql_tests
     run_streaming_tests
 fi
 
WriteAheadLogBackedBlockRDD.scala (org.apache.spark.streaming.rdd)
@@ -16,7 +16,9 @@
  */
 package org.apache.spark.streaming.rdd
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.UUID
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -27,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
+import org.apache.spark.util.Utils
 
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -107,10 +110,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
         // writing log data. However, the directory is not needed if data needs to be read, hence
         // a dummy path is provided to satisfy the method parameter requirements.
-        // FileBasedWriteAheadLog will not create any file or directory at that path.
-        val dummyDirectory = FileUtils.getTempDirectoryPath()
+        // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+        // this dummy directory should not already exist otherwise the WAL will try to recover
+        // past events from the directory and throw errors.
+        val nonExistentDirectory = new File(
+          FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-          SparkEnv.get.conf, dummyDirectory, hadoopConf)
+          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)
       } catch {
         case NonFatal(e) =>
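The heart of the change above is handing FileBasedWriteAheadLog a path that is guaranteed not to exist, so a reader gets a valid WAL object without triggering recovery of stale log files. A minimal standalone sketch of that pattern, assuming only the JDK and commons-io's FileUtils (the same class the diff uses):

import java.io.File
import java.util.UUID
import org.apache.commons.io.FileUtils

// A random UUID under the system temp directory is effectively guaranteed
// not to exist yet. Nothing is created on disk: the WAL constructor only
// needs a String path, and because the directory is absent there are no
// stale log files for recovery to trip over.
val nonExistentDirectory: String =
  new File(FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath

Note that getTempDirectory() returns a File, unlike the getTempDirectoryPath() String the old code used, hence the new File(...) wrapping.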
FileBasedWriteAheadLog.scala (org.apache.spark.streaming.util)
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 
@@ -200,9 +201,14 @@ private[streaming] class FileBasedWriteAheadLog(
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+    println(s"Path $logDirectoryPath exists = " + fileSystem.exists(logDirectoryPath) + ", " + new File(logDirectory).exists())
+    if (fileSystem.exists(logDirectoryPath)) {
+      println(s"Path $logDirectoryPath is dir = " + fileSystem.getFileStatus(logDirectoryPath).isDir + ", " + new File(logDirectory).isDirectory)
+    }
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+      println("Inside condition")
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+      println("Before clear")
       pastLogs.clear()
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
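The println calls above are debug probes into why recovery triggers: past logs are reloaded whenever the log directory already exists and is a directory. A hypothetical standalone rendering of that guard (recoverableLogFiles is an illustrative name, not Spark API; isDir matches the Hadoop 1.x FileStatus API used in the diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Only list (and thus recover) write ahead log files when the directory
// already exists and is a directory; a fresh, non-existent path, like the
// one WriteAheadLogBackedBlockRDD now passes, skips recovery entirely.
def recoverableLogFiles(logDirectory: String, hadoopConf: Configuration): Seq[Path] = {
  val logDirectoryPath = new Path(logDirectory)
  val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
  if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    fileSystem.listStatus(logDirectoryPath).map(_.getPath).toSeq
  } else {
    Seq.empty
  }
}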