6 changes: 0 additions & 6 deletions pom.xml
@@ -406,12 +406,6 @@
         <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-minicluster</artifactId>
-        <version>${hadoop.version}</version>
-        <scope>test</scope>
-      </dependency>
       <dependency>
         <groupId>${akka.group}</groupId>
         <artifactId>akka-testkit_${scala.binary.version}</artifactId>
5 changes: 0 additions & 5 deletions streaming/pom.xml
@@ -68,11 +68,6 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -17,13 +17,12 @@
 package org.apache.spark.streaming.util

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}

 private[streaming] object HdfsUtils {

   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
-
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
@@ -63,6 +62,10 @@ private[streaming] object HdfsUtils {
   }

   def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
-    path.getFileSystem(conf)
+    val fs = path.getFileSystem(conf)
+    fs match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case _ => fs
+    }
   }
 }
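
For readers skimming the diff: routing `file://` paths through `getRawFileSystem` unwraps the checksumming `LocalFileSystem`, which is presumably what lets the write-ahead log's create/append/flush calls behave on a plain local disk much as they do on HDFS. A minimal standalone sketch of that resolution logic (the object and method names below are illustrative, not Spark's):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}

// Illustrative helper mirroring the pattern above: resolve the FileSystem for
// a path and, for local paths, unwrap to the raw (non-checksumming) filesystem.
object RawLocalFsExample {
  def fileSystemFor(path: Path, conf: Configuration): FileSystem =
    path.getFileSystem(conf) match {
      case localFs: LocalFileSystem => localFs.getRawFileSystem
      case fs => fs
    }

  def main(args: Array[String]): Unit = {
    val fs = fileSystemFor(new Path("file:///tmp/wal-example"), new Configuration())
    // For a file:// path this prints RawLocalFileSystem rather than LocalFileSystem.
    println(fs.getClass.getSimpleName)
  }
}
```
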
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -73,6 +73,7 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   }

   private def flush() {
+    stream.getWrappedStream.flush()
     hadoopFlushMethod.foreach {
       _.invoke(stream)
     }
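
The one added line above makes the writer flush the wrapped output stream before attempting Hadoop's durable flush. A hedged sketch of that flush pattern (the reflective lookup here is written from scratch and only assumed to resemble what the writer does):

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

object FlushSketch {
  // Look up hflush (Hadoop 2.x) or the older sync (Hadoop 1.x), if either exists.
  private val hadoopFlushMethod =
    Seq("hflush", "sync").flatMap { name =>
      Try(classOf[FSDataOutputStream].getMethod(name)).toOption
    }.headOption

  def flush(stream: FSDataOutputStream): Unit = {
    // Flushing the wrapped stream pushes buffered bytes to the underlying local
    // file even when the FileSystem implementation has no usable hflush/sync.
    stream.getWrappedStream.flush()
    hadoopFlushMethod.foreach(_.invoke(stream))
  }

  def main(args: Array[String]): Unit = {
    val path = new Path("file:///tmp/flush-sketch-example")
    val out = path.getFileSystem(new Configuration()).create(path)
    out.write("example".getBytes("UTF-8"))
    flush(out)
    out.close()
  }
}
```
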
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -31,63 +31,53 @@ import WriteAheadLogSuite._
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.util.Utils
 import org.scalatest.concurrent.Eventually._

-class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {

   val hadoopConf = new Configuration()
-  val dfsDir = Files.createTempDir()
-  val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
-  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
-  val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
-  val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
-  var pathForTest: String = null
-
-  override def beforeAll() {
-    cluster.waitActive()
-  }
+  var tempDir: File = null
+  var dirForTest: String = null
+  var fileForTest: String = null

   before {
-    pathForTest = hdfsUrl + getRandomString()
+    tempDir = Files.createTempDir()
+    dirForTest = "file:///" + tempDir.toString

Owner commented on the line above: Good! That's what I had in mind when I asked to put all the common initializations in before - both the file and the dir are initialized here.

+    fileForTest = "file:///" + new File(tempDir, getRandomString()).toString
   }

-  override def afterAll() {
-    cluster.shutdown()
-    FileUtils.deleteDirectory(dfsDir)
-    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
+  after {
+    FileUtils.deleteDirectory(tempDir)
   }

   test("WriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     val segments = dataToWrite.map(data => writer.write(data))
     writer.close()
-    val writtenData = readDataManually(pathForTest, segments)
+    val writtenData = readDataManually(fileForTest, segments)
     assert(writtenData.toArray === dataToWrite.toArray)
   }

   test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     dataToWrite.foreach { data =>
-      val segment = writer.write(ByteBuffer.wrap(data.getBytes()))
-      val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+      val segment = writer.write(stringToByteBuffer(data))
+      val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
       val dataRead = reader.read(segment)
-      assert(data === new String(dataRead.array()))
+      assert(data === byteBufferToString(dataRead))
     }
     writer.close()
   }

   test("WriteAheadLogReader - sequentially reading data") {
     // Write data manually for testing the sequential reader
     val writtenData = generateRandomData()
-    writeDataManually(writtenData, pathForTest)
-    val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+    writeDataManually(writtenData, fileForTest)
+    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
     assert(readData.toList === writtenData.toList)
     assert(reader.hasNext === false)
@@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogReader - sequentially reading data written with writer") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
-    writeDataUsingWriter(pathForTest, dataToWrite)
+    writeDataUsingWriter(fileForTest, dataToWrite)
     val iter = dataToWrite.iterator
-    val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
     reader.foreach { byteBuffer =>
       assert(byteBufferToString(byteBuffer) === iter.next())
     }
@@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
-    val segments = writeDataManually(writtenData, pathForTest)
+    val segments = writeDataManually(writtenData, fileForTest)

     // Get a random order of these segments and read them back
     val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
     writtenDataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
-    val segments = writeDataUsingWriter(pathForTest, data)
+    val segments = writeDataUsingWriter(fileForTest, data)

     // Read a random sequence of segments and verify read data
     val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
     dataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -140,62 +130,58 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
     val dataToWrite = generateRandomData()
-    val dir = pathForTest
-    writeDataUsingManager(dir, dataToWrite)
+    writeDataUsingManager(dirForTest, dataToWrite)

     // Read data manually to verify the written data
-    val logFiles = getLogFilesInDirectory(dir)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
     val writtenData = logFiles.flatMap { file => readDataManually(file)}
     assert(writtenData.toList === dataToWrite.toList)
   }

   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
-    val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData()
-      val file = dir + s"/log-$i-$i"
+      val file = dirForTest + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten

-    val logDirectoryPath = new Path(dir)
+    val logDirectoryPath = new Path(dirForTest)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
     assert(fileSystem.exists(logDirectoryPath) === true)

     // Read data using manager and verify
-    val readData = readDataUsingManager(dir)
+    val readData = readDataUsingManager(dirForTest)
     assert(readData.toList === writtenData.toList)
   }

   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
     val dataToWrite = generateRandomData()
-    val dir = pathForTest
-    writeDataUsingManager(dir, dataToWrite)
-    val logFiles = getLogFilesInDirectory(dir)
+    writeDataUsingManager(dirForTest, dataToWrite)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
-    val readData = readDataUsingManager(dir)
+    val readData = readDataUsingManager(dirForTest)
     assert(dataToWrite.toList === readData.toList)
   }

   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
-    val dir = pathForTest
     val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
-    val manager = new WriteAheadLogManager(dir, hadoopConf,
+    val manager = new WriteAheadLogManager(dirForTest, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
     dataToWrite.foreach { item =>
       fakeClock.addToTime(500) // half second for each
       manager.writeToLog(item)
     }
-    val logFiles = getLogFilesInDirectory(dir)
+    val logFiles = getLogFilesInDirectory(dirForTest)
     assert(logFiles.size > 1)
     manager.cleanupOldLogs(fakeClock.currentTime() / 2)
     eventually(timeout(1 second), interval(10 milliseconds)) {
-      assert(getLogFilesInDirectory(dir).size < logFiles.size)
+      assert(getLogFilesInDirectory(dirForTest).size < logFiles.size)
     }
   }
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
@@ -305,7 +291,7 @@ object WriteAheadLogSuite {
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       fileSystem.listStatus(logDirectoryPath).map {
         _.getPath.toString
-      }
+      }.sorted
     } else {
       Seq.empty
     }
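
The net effect of the suite changes is a plain ScalaTest fixture: create a temporary directory in before, address it via file:/// URIs, and delete it in after, with no MiniDFSCluster involved. A self-contained sketch of that fixture shape (the suite and test names below are made up, and only Hadoop's public FileSystem API is used):

```scala
import java.io.File

import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, FunSuite}

// Hypothetical suite mirroring the fixture pattern adopted above: a temp
// directory created in before, exposed as file:/// URIs, removed in after.
class LocalFsFixtureSketch extends FunSuite with BeforeAndAfter {

  val hadoopConf = new Configuration()
  var tempDir: File = null
  var dirForTest: String = null

  before {
    tempDir = Files.createTempDir()
    dirForTest = "file:///" + tempDir.toString
  }

  after {
    FileUtils.deleteDirectory(tempDir)
  }

  test("local file:/// paths work without a MiniDFSCluster") {
    val path = new Path(dirForTest + "/log-1")
    val fs = path.getFileSystem(hadoopConf)
    val out = fs.create(path)
    out.write("hello".getBytes("UTF-8"))
    out.close()
    assert(fs.exists(path))
  }
}
```
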