From 5ff90ee6842483d8fc26501a7154379015d0bf3e Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Wed, 22 Oct 2014 16:12:04 -0700
Subject: [PATCH 1/2] Fix tests to not ignore ordering and also assert all data is present

---
 .../streaming/util/WriteAheadLogSuite.scala   | 40 ++++++++++---------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index d93db9995fda2..f6a622a3b1f93 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   val hadoopConf = new Configuration()
   val dfsDir = Files.createTempDir()
   val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY)
+  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
+  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
   val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
   val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
+  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
   var pathForTest: String = null

   override def beforeAll() {
-    System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
     cluster.waitActive()
   }

@@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   override def afterAll() {
     cluster.shutdown()
     FileUtils.deleteDirectory(dfsDir)
-    System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp)
+    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
   }

   test("WriteAheadLogWriter - writing data") {
@@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     assert(writtenData.toArray === dataToWrite.toArray)
   }

-  test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " +
-    "Minicluster") {
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
     val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
     dataToWrite.foreach { data =>
@@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     reader.close()
   }

-  test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") {
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(pathForTest, dataToWrite)
@@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     reader.close()
   }

-  test("WriteAheadLogRandomReader - reading data using random reader written with writer using " +
-    "Minicluster") {
+  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
     val segments = writeDataUsingWriter(pathForTest, data)
@@ -148,17 +146,16 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     // Read data manually to verify the written data
     val logFiles = getLogFilesInDirectory(dir)
     assert(logFiles.size > 1)
-    val writtenData = logFiles.flatMap { file => readDataManually(file) }
-    assert(writtenData.toSet === dataToWrite.toSet)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData.toList === dataToWrite.toList)
   }

-  // This one is failing right now -- commenting out for now.
   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
     val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData(10)
-      val file = dir + "/log-" + i
+      val file = dir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten
@@ -169,7 +166,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte

     // Read data using manager and verify
     val readData = readDataUsingManager(dir)
-//    assert(readData.toList === writtenData.toList)
+    assert(readData.toList === writtenData.toList)
   }

   test("WriteAheadLogManager - recover past logs when creating new manager") {
@@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
       assert(getLogFilesInDirectory(dir).size < logFiles.size)
     }
   }
-
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
   //  - Failure while reading incomplete/corrupt file
 }
@@ -271,7 +267,8 @@ object WriteAheadLogSuite {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     val buffer = new ArrayBuffer[String]
     try {
-      while (true) { // Read till EOF is thrown
+      while (true) {
+        // Read till EOF is thrown
         val length = reader.readInt()
         val bytes = new Array[Byte](length)
         reader.read(bytes)
@@ -294,15 +291,20 @@ object WriteAheadLogSuite {
     data
   }

   def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-    (1 to numItems).map { _.toString }
+    (1 to numItems).map {
+      _.toString
+    }
   }

   def getLogFilesInDirectory(directory: String): Seq[String] = {
     val logDirectoryPath = new Path(directory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+    implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering
+      .by(f => f.getModificationTime)
+
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
-      fileSystem.listStatus(logDirectoryPath).map {
+      fileSystem.listStatus(logDirectoryPath).sorted.map {
         _.getPath.toString
       }
     } else {

From 82ce56e8e5c8bed36fb6ea7a61b5b143775f0143 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Wed, 22 Oct 2014 16:40:59 -0700
Subject: [PATCH 2/2] Fix file ordering issue in WALManager tests

---
 .../streaming/util/WriteAheadLogSuite.scala   | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index f6a622a3b1f93..21a2c6a58b0c1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,7 +139,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte

   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
-    val dataToWrite = generateRandomData(10)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)

@@ -154,7 +154,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     // Write data manually for testing reading through manager
     val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
-      val data = generateRandomData(10)
+      val data = generateRandomData()
       val file = dir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
@@ -171,7 +171,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte

   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
     val logFiles = getLogFilesInDirectory(dir)
@@ -183,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
     val dir = pathForTest
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
     val manager = new WriteAheadLogManager(dir, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -239,8 +239,10 @@ object WriteAheadLogSuite {

   def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
     val fakeClock = new ManualClock
+    fakeClock.setTime(1000000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       fakeClock.addToTime(500)
       manager.writeToLog(item)
@@ -290,8 +292,8 @@ object WriteAheadLogSuite {
     data
   }

-  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-    (1 to numItems).map {
+  def generateRandomData(): Seq[String] = {
+    (1 to 50).map {
       _.toString
     }
   }
@@ -300,11 +302,8 @@ object WriteAheadLogSuite {
   def getLogFilesInDirectory(directory: String): Seq[String] = {
     val logDirectoryPath = new Path(directory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
-    implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering
-      .by(f => f.getModificationTime)
-
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
-      fileSystem.listStatus(logDirectoryPath).sorted.map {
+      fileSystem.listStatus(logDirectoryPath).map {
        _.getPath.toString
      }
    } else {
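
Note on the ordering fix in the second commit: the log files created by these tests embed clock times in their names (the tests write paths of the form dir + s"/log-$i-$i"), and the added patch comment says that "500" must not get sorted after "2000". Comparing embedded times as strings diverges from numeric order as soon as digit counts differ, which is why fakeClock.setTime(1000000) seeds a high base value: it keeps every timestamp the same width for the duration of a test. The following is a minimal standalone Scala sketch of that pitfall; the LogFileOrderingSketch object, the log-<start>-<end> pattern, and the sample names are illustrative assumptions, not code from the patch:

import scala.util.matching.Regex

// Standalone sketch: lexicographic vs. numeric ordering of timestamps
// embedded in log file names. All names below are made up for illustration.
object LogFileOrderingSketch {
  // Assumed name format, mirroring the test's dir + s"/log-$i-$i" paths.
  val LogPattern: Regex = """log-(\d+)-(\d+)""".r

  // Extract the start time embedded in a file name, if the name matches.
  def startTime(name: String): Option[Long] = name match {
    case LogPattern(start, _) => Some(start.toLong)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // Low base value: digit counts differ, so string order misplaces "500".
    val low = Seq("log-500-500", "log-1000-1000", "log-2000-2000")
    println(low.sorted)
    // -> List(log-1000-1000, log-2000-2000, log-500-500): wrong order

    println(low.sortBy(n => startTime(n).getOrElse(0L)))
    // -> List(log-500-500, log-1000-1000, log-2000-2000): numeric order

    // High base value (cf. fakeClock.setTime(1000000)): equal widths, so
    // string order and numeric order agree and plain name sorting is safe.
    val high = Seq("log-1000500-1000500", "log-1001000-1001000", "log-1002000-1002000")
    assert(high.sorted == high.sortBy(n => startTime(n).getOrElse(0L)))
  }
}

With the base at 1000000 and 500 added per write, the 50 timestamps produced by writeDataUsingManager stay seven digits wide, so lexicographic and numeric order coincide and the rotating-log tests see files in write order without the modification-time Ordering that the second commit removes.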