@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
val hadoopConf = new Configuration()
val dfsDir = Files.createTempDir()
val TEST_BUILD_DATA_KEY: String = "test.build.data"
-val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY)
+val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
-System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
+val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
var pathForTest: String = null

override def beforeAll() {
+System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
cluster.waitActive()
}

@@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
override def afterAll() {
cluster.shutdown()
FileUtils.deleteDirectory(dfsDir)
-System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp)
+oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
}
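
For reference, the Option-based save/restore of the system property that this hunk introduces can be sketched on its own as below; the try/finally wrapper and the clearProperty branch are illustrative additions, not code from this PR.

// Sketch: save a system property, override it for the test run,
// then restore it afterwards. Option(...) maps an unset property (null) to None.
val key = "test.build.data"
val saved: Option[String] = Option(System.getProperty(key))
System.setProperty(key, "/tmp/test-dfs-dir") // hypothetical override
try {
  // ... run tests that depend on the overridden property ...
} finally {
  saved match {
    case Some(old) => System.setProperty(key, old) // restore the previous value
    case None => System.clearProperty(key) // the property was unset before
  }
}

Note that the foreach-based restore in afterAll covers only the Some case: if the property was unset beforehand, the test value stays set after the suite, which the clearProperty branch above would avoid.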

test("WriteAheadLogWriter - writing data") {
@@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
assert(writtenData.toArray === dataToWrite.toArray)
}

test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " +
"Minicluster") {
test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
val dataToWrite = generateRandomData()
val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
dataToWrite.foreach { data =>
@@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
reader.close()
}

test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") {
test("WriteAheadLogReader - sequentially reading data written with writer") {
// Write data manually for testing the sequential reader
val dataToWrite = generateRandomData()
writeDataUsingWriter(pathForTest, dataToWrite)
@@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
reader.close()
}

test("WriteAheadLogRandomReader - reading data using random reader written with writer using " +
"Minicluster") {
test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
// Write data using writer for testing the random reader
val data = generateRandomData()
val segments = writeDataUsingWriter(pathForTest, data)
@@ -141,24 +139,23 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll

test("WriteAheadLogManager - write rotating logs") {
// Write data using manager
-val dataToWrite = generateRandomData(10)
+val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)

// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(dir)
assert(logFiles.size > 1)
-val writtenData = logFiles.flatMap { file => readDataManually(file) }
-assert(writtenData.toSet === dataToWrite.toSet)
+val writtenData = logFiles.flatMap { file => readDataManually(file)}
+assert(writtenData.toList === dataToWrite.toList)
}

-// This one is failing right now -- commenting out for now.
test("WriteAheadLogManager - read rotating logs") {
// Write data manually for testing reading through manager
val dir = pathForTest
val writtenData = (1 to 10).map { i =>
-val data = generateRandomData(10)
-val file = dir + "/log-" + i
+val data = generateRandomData()
+val file = dir + s"/log-$i-$i"
writeDataManually(data, file)
data
}.flatten
@@ -169,12 +166,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll

// Read data using manager and verify
val readData = readDataUsingManager(dir)
-// assert(readData.toList === writtenData.toList)
+assert(readData.toList === writtenData.toList)
}

test("WriteAheadLogManager - recover past logs when creating new manager") {
// Write data with manager, recover with new manager and verify
-val dataToWrite = generateRandomData(100)
+val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)
val logFiles = getLogFilesInDirectory(dir)
@@ -186,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
test("WriteAheadLogManager - cleanup old logs") {
// Write data with manager, recover with new manager and verify
val dir = pathForTest
-val dataToWrite = generateRandomData(100)
+val dataToWrite = generateRandomData()
val fakeClock = new ManualClock
val manager = new WriteAheadLogManager(dir, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
assert(getLogFilesInDirectory(dir).size < logFiles.size)
}
}
-
// TODO (Hari, TD): Test different failure conditions of writers and readers.
// - Failure while reading incomplete/corrupt file
}
@@ -243,8 +239,10 @@ object WriteAheadLogSuite {

def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
val fakeClock = new ManualClock
+fakeClock.setTime(1000000)
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+// Ensure that 500 does not get sorted after 2000, so put a high base value.
Owner:
Good catch! But this is a baaaaaaaad hidden non-determinism. Not good. I will fix it in the actual code logic, to sort semantically by parsed start time, and not alphabetically.
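
For reference, a minimal sketch of that semantic sort, assuming the log-<startTime>-<endTime> file naming used in this suite (the regex and helper names are illustrative, not the PR's actual fix). Lexicographic ordering misplaces these names because "log-500-..." compares greater than "log-2000-..." ('5' > '2'); the test's setTime(1000000) base value sidesteps this only by giving all timestamps the same digit count.

// Order WAL segment files numerically by parsed start time,
// not lexicographically by file name.
val logFileRegex = """log-(\d+)-(\d+)""".r

def sortByStartTime(fileNames: Seq[String]): Seq[String] = {
  fileNames.flatMap { name =>
    logFileRegex.findFirstIn(name) match {
      case Some(logFileRegex(start, _)) => Some((start.toLong, name))
      case None => None // ignore files that are not WAL segments
    }
  }.sortBy(_._1).map(_._2)
}

// sortByStartTime(Seq("log-2000-2000", "log-500-500"))
// => Seq("log-500-500", "log-2000-2000"), whereas .sorted would reverse them.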

data.foreach { item =>
fakeClock.addToTime(500)
manager.writeToLog(item)
@@ -271,7 +269,8 @@ object WriteAheadLogSuite {
val reader = HdfsUtils.getInputStream(file, hadoopConf)
val buffer = new ArrayBuffer[String]
try {
-while (true) { // Read till EOF is thrown
+while (true) {
+// Read till EOF is thrown
val length = reader.readInt()
val bytes = new Array[Byte](length)
reader.read(bytes)
@@ -293,8 +292,10 @@ object WriteAheadLogSuite {
data
}

-def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-(1 to numItems).map { _.toString }
+def generateRandomData(): Seq[String] = {
+(1 to 50).map {
+_.toString
Owner:
itemSize is not used. Remove.

Owner:
We probably don't need numItems either. It's only used in one place, which can fall back to the default.

+}
}

def getLogFilesInDirectory(directory: String): Seq[String] = {