diff --git a/pom.xml b/pom.xml
index 1adff2243e816..288bbf1114bea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -406,12 +406,6 @@
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 5bb5f3e159e3f..12f900c91eb98 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,11 +68,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 5449b87e65b8e..4a6f3006922cd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -17,13 +17,12 @@
package org.apache.spark.streaming.util
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
private[streaming] object HdfsUtils {
def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
// HDFS is not thread-safe when getFileSystem is called, so synchronize on that
-
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of creating a new file
@@ -63,6 +62,10 @@ private[streaming] object HdfsUtils {
}
def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
- path.getFileSystem(conf)
+ val fs = path.getFileSystem(conf)
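+ // LocalFileSystem wraps a checksumming file system whose output streams buffer data;
+ // return the raw file system so data flushed by a writer is immediately visible to readers.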
+ fs match {
+ case localFs: LocalFileSystem => localFs.getRawFileSystem
+ case _ => fs
+ }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
index 47f1e2544caea..d8341f0b1c936 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -73,6 +73,7 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
}
private def flush() {
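+ // Flush the wrapped stream so locally buffered bytes reach the file system before the
+ // hflush/sync method (if available) is invoked via reflection below.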
+ stream.getWrappedStream.flush()
hadoopFlushMethod.foreach {
_.invoke(stream)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 21a2c6a58b0c1..bd21f462b46b4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -31,54 +31,44 @@ import WriteAheadLogSuite._
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.util.Utils
import org.scalatest.concurrent.Eventually._
-class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val hadoopConf = new Configuration()
- val dfsDir = Files.createTempDir()
- val TEST_BUILD_DATA_KEY: String = "test.build.data"
- val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
- System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
- val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
- val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
- val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
- var pathForTest: String = null
-
- override def beforeAll() {
- cluster.waitActive()
- }
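+ // Each test now runs against the local file system: a fresh temp directory is created
+ // per test and addressed through file:/// URIs, so no MiniDFSCluster is needed.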
+ var tempDir: File = null
+ var dirForTest: String = null
+ var fileForTest: String = null
before {
- pathForTest = hdfsUrl + getRandomString()
+ tempDir = Files.createTempDir()
+ dirForTest = "file:///" + tempDir.toString
+ fileForTest = "file:///" + new File(tempDir, getRandomString()).toString
}
- override def afterAll() {
- cluster.shutdown()
- FileUtils.deleteDirectory(dfsDir)
- oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
+ after {
+ FileUtils.deleteDirectory(tempDir)
}
test("WriteAheadLogWriter - writing data") {
val dataToWrite = generateRandomData()
- val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+ val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
val segments = dataToWrite.map(data => writer.write(data))
writer.close()
- val writtenData = readDataManually(pathForTest, segments)
+ val writtenData = readDataManually(fileForTest, segments)
assert(writtenData.toArray === dataToWrite.toArray)
}
test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
val dataToWrite = generateRandomData()
- val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+ val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
dataToWrite.foreach { data =>
- val segment = writer.write(ByteBuffer.wrap(data.getBytes()))
- val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+ val segment = writer.write(stringToByteBuffer(data))
+ val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
val dataRead = reader.read(segment)
- assert(data === new String(dataRead.array()))
+ assert(data === byteBufferToString(dataRead))
}
writer.close()
}
@@ -86,8 +76,8 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogReader - sequentially reading data") {
// Write data manually for testing the sequential reader
val writtenData = generateRandomData()
- writeDataManually(writtenData, pathForTest)
- val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+ writeDataManually(writtenData, fileForTest)
+ val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
val readData = reader.toSeq.map(byteBufferToString)
assert(readData.toList === writtenData.toList)
assert(reader.hasNext === false)
@@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogReader - sequentially reading data written with writer") {
// Write data manually for testing the sequential reader
val dataToWrite = generateRandomData()
- writeDataUsingWriter(pathForTest, dataToWrite)
+ writeDataUsingWriter(fileForTest, dataToWrite)
val iter = dataToWrite.iterator
- val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+ val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
reader.foreach { byteBuffer =>
assert(byteBufferToString(byteBuffer) === iter.next())
}
@@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogRandomReader - reading data using random reader") {
// Write data manually for testing the random reader
val writtenData = generateRandomData()
- val segments = writeDataManually(writtenData, pathForTest)
+ val segments = writeDataManually(writtenData, fileForTest)
// Get a random order of these segments and read them back
val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
- val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+ val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
writtenDataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
@@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
// Write data using writer for testing the random reader
val data = generateRandomData()
- val segments = writeDataUsingWriter(pathForTest, data)
+ val segments = writeDataUsingWriter(fileForTest, data)
// Read a random sequence of segments and verify read data
val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
- val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+ val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
dataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
@@ -140,11 +130,10 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogManager - write rotating logs") {
// Write data using manager
val dataToWrite = generateRandomData()
- val dir = pathForTest
- writeDataUsingManager(dir, dataToWrite)
+ writeDataUsingManager(dirForTest, dataToWrite)
// Read data manually to verify the written data
- val logFiles = getLogFilesInDirectory(dir)
+ val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
val writtenData = logFiles.flatMap { file => readDataManually(file)}
assert(writtenData.toList === dataToWrite.toList)
@@ -152,50 +141,47 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogManager - read rotating logs") {
// Write data manually for testing reading through manager
- val dir = pathForTest
val writtenData = (1 to 10).map { i =>
val data = generateRandomData()
- val file = dir + s"/log-$i-$i"
+ val file = dirForTest + s"/log-$i-$i"
writeDataManually(data, file)
data
}.flatten
- val logDirectoryPath = new Path(dir)
+ val logDirectoryPath = new Path(dirForTest)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
assert(fileSystem.exists(logDirectoryPath) === true)
// Read data using manager and verify
- val readData = readDataUsingManager(dir)
+ val readData = readDataUsingManager(dirForTest)
assert(readData.toList === writtenData.toList)
}
test("WriteAheadLogManager - recover past logs when creating new manager") {
// Write data with manager, recover with new manager and verify
val dataToWrite = generateRandomData()
- val dir = pathForTest
- writeDataUsingManager(dir, dataToWrite)
- val logFiles = getLogFilesInDirectory(dir)
+ writeDataUsingManager(dirForTest, dataToWrite)
+ val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
- val readData = readDataUsingManager(dir)
+ val readData = readDataUsingManager(dirForTest)
assert(dataToWrite.toList === readData.toList)
}
test("WriteAheadLogManager - cleanup old logs") {
// Write data with manager, recover with new manager and verify
- val dir = pathForTest
val dataToWrite = generateRandomData()
val fakeClock = new ManualClock
- val manager = new WriteAheadLogManager(dir, hadoopConf,
+ val manager = new WriteAheadLogManager(dirForTest, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
dataToWrite.foreach { item =>
fakeClock.addToTime(500) // half second for each
manager.writeToLog(item)
}
- val logFiles = getLogFilesInDirectory(dir)
+ val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
manager.cleanupOldLogs(fakeClock.currentTime() / 2)
eventually(timeout(1 second), interval(10 milliseconds)) {
- assert(getLogFilesInDirectory(dir).size < logFiles.size)
+ assert(getLogFilesInDirectory(dirForTest).size < logFiles.size)
}
}
// TODO (Hari, TD): Test different failure conditions of writers and readers.
@@ -305,7 +291,7 @@ object WriteAheadLogSuite {
if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
fileSystem.listStatus(logDirectoryPath).map {
_.getPath.toString
- }
+ }.sorted // listing order is not guaranteed on the local file system, so sort by name
} else {
Seq.empty
}