From 6f11db4f813b596789b36fcc141713b6343abac4 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 5 Oct 2014 00:29:07 -0700 Subject: [PATCH 01/14] Initial Implementation of HDFS based RDD --- .../storage/HDFSBackedBlockRDD.scala | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala new file mode 100644 index 0000000000000..a0bb3d8399109 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.serializer.Deserializer + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.BlockId +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} + +private[spark] +class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, + val segment: FileSegment) extends Partition { + val index = idx +} + +private[spark] +class HDFSBackedBlockRDD[T: ClassTag]( + @transient sc: SparkContext, + @transient hadoopConf: Configuration, + @transient override val blockIds: Array[BlockId], + @transient val segments: Array[FileSegment] + ) extends BlockRDD[T](sc, blockIds) { + + override def getPartitions: Array[Partition] = { + assertValid() + var i = 0 + (0 until blockIds.size).map { i => + new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i)) + }.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + assertValid() + val blockManager = SparkEnv.get.blockManager + val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] + val blockId = partition.blockId + blockManager.get(blockId) match { + // Data is in Block Manager, grab it from there. + case Some(block) => block.data.asInstanceOf[Iterator[T]] + // Data not found in Block Manager, grab it from HDFS + case None => + // Perhaps we should cache readers at some point + val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) + blockManager.dataDeserialize(blockId, reader.read(partition.segment)) + .asInstanceOf[Iterator[T]] + } + } +} From 8b1b29cf4bee9a2922904adfaff84a0af70c9dc8 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 5 Oct 2014 00:43:52 -0700 Subject: [PATCH 02/14] Close reader when done. 
--- .../spark/streaming/storage/HDFSBackedBlockRDD.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index a0bb3d8399109..51c96e4bee270 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -57,10 +57,13 @@ class HDFSBackedBlockRDD[T: ClassTag]( case Some(block) => block.data.asInstanceOf[Iterator[T]] // Data not found in Block Manager, grab it from HDFS case None => - // Perhaps we should cache readers at some point + // TODO: Perhaps we should cache readers at some point? val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) - blockManager.dataDeserialize(blockId, reader.read(partition.segment)) - .asInstanceOf[Iterator[T]] + val dataRead = reader.read(partition.segment) + reader.close() + blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + // TODO: Should we put the data back into the Block Manager? How to get the persistance + // TODO: level though } } } From bf0ac9b92d088b5df2cbf718083ccee0c6d2584a Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 5 Oct 2014 00:48:39 -0700 Subject: [PATCH 03/14] Minor fixes --- .../apache/spark/streaming/storage/HDFSBackedBlockRDD.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index 51c96e4bee270..caaf0f635690e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -19,15 +19,14 @@ package org.apache.spark.streaming.storage import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.serializer.Deserializer import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.BlockId import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} private[spark] -class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, - val segment: FileSegment) extends Partition { +class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int,val segment: FileSegment) + extends Partition { val index = idx } @@ -41,7 +40,6 @@ class HDFSBackedBlockRDD[T: ClassTag]( override def getPartitions: Array[Partition] = { assertValid() - var i = 0 (0 until blockIds.size).map { i => new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i)) }.toArray From 6fc4cd877aafa4f9a27b3f2b351c0040a1fb4e5a Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 5 Oct 2014 00:50:53 -0700 Subject: [PATCH 04/14] Minor formatting changes --- .../org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index caaf0f635690e..debeede6d69f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -25,7 +25,7 @@ import 
org.apache.spark.storage.BlockId import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} private[spark] -class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int,val segment: FileSegment) +class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, val segment: FileSegment) extends Partition { val index = idx } From 7b492162f1eb75d7970c685f1ad7d81c15ad32d1 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 6 Oct 2014 00:48:55 -0700 Subject: [PATCH 05/14] Put data back into BlockManager once retrieved from HDFS. --- .../spark/streaming/storage/HDFSBackedBlockRDD.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index debeede6d69f0..e7185bc663468 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -21,7 +21,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.spark.rdd.BlockRDD -import org.apache.spark.storage.BlockId +import org.apache.spark.storage.{BlockManager, StorageLevel, BlockId} import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} private[spark] @@ -35,7 +35,8 @@ class HDFSBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, @transient hadoopConf: Configuration, @transient override val blockIds: Array[BlockId], - @transient val segments: Array[FileSegment] + @transient val segments: Array[FileSegment], + @transient val persistance: StorageLevel ) extends BlockRDD[T](sc, blockIds) { override def getPartitions: Array[Partition] = { @@ -59,9 +60,9 @@ class HDFSBackedBlockRDD[T: ClassTag]( val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) val dataRead = reader.read(partition.segment) reader.close() - blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] - // TODO: Should we put the data back into the Block Manager? How to get the persistance - // TODO: level though + val data = blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + blockManager.putIterator(blockId, data, persistance) + data } } } From 6be55a8c26d11c4871765aba156c102bcd912722 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 6 Oct 2014 11:53:19 -0700 Subject: [PATCH 06/14] Make hadoopConf and storageLevel non-transient. 
--- .../spark/streaming/storage/HDFSBackedBlockRDD.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index e7185bc663468..b46334c3dc500 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -21,7 +21,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.spark.rdd.BlockRDD -import org.apache.spark.storage.{BlockManager, StorageLevel, BlockId} +import org.apache.spark.storage.{StorageLevel, BlockId} import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} private[spark] @@ -33,10 +33,10 @@ class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, val segment: F private[spark] class HDFSBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, - @transient hadoopConf: Configuration, + hadoopConf: Configuration, @transient override val blockIds: Array[BlockId], @transient val segments: Array[FileSegment], - @transient val persistance: StorageLevel + val storageLevel: StorageLevel ) extends BlockRDD[T](sc, blockIds) { override def getPartitions: Array[Partition] = { @@ -61,7 +61,7 @@ class HDFSBackedBlockRDD[T: ClassTag]( val dataRead = reader.read(partition.segment) reader.close() val data = blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] - blockManager.putIterator(blockId, data, persistance) + blockManager.putIterator(blockId, data, storageLevel) data } } From 15d05d4e951178326e264387a41e6f3eef4c5909 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 6 Oct 2014 12:58:53 -0700 Subject: [PATCH 07/14] Overriding getPreferredLocs --- .../apache/spark/streaming/storage/HDFSBackedBlockRDD.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index b46334c3dc500..caa710205c889 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -65,4 +65,8 @@ class HDFSBackedBlockRDD[T: ClassTag]( data } } + + override def getPreferredLocations(split: Partition): Seq[String] = { + locations_.getOrElse(split.asInstanceOf[HDFSBackedBlockRDDPartition].blockId, Seq.empty[String]) + } } From 098cbd1b1653611530680f56ab9b5011f8c7cb33 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 6 Oct 2014 15:29:05 -0700 Subject: [PATCH 08/14] HDFS Tests initial commit --- .../storage/HDFSBackedBlockRDDSuite.scala | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala new file mode 100644 index 0000000000000..745a557cfd4c8 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.File + +import com.google.common.io.Files +import org.apache.commons.lang.RandomStringUtils +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.SparkContext +import org.apache.spark.streaming.TestSuiteBase + +class HDFSBackedBlockRDDSuite extends TestSuiteBase { + val sparkContext = new SparkContext(conf) + val hadoopConf = new Configuration() + + test("block manager has data") { + val dir = Files.createTempDir() + val file = new File(dir, "BlockManagerWrite") + + } + + private def writeDataToHDFS(count: Int, file: File): Seq[String] = { + val str: Seq[String] = for (i <- 1 to count) yield RandomStringUtils.random(50) + + var writerOpt: Option[WriteAheadLogWriter] = None + try { + writerOpt = Some(new WriteAheadLogWriter(file.toString, hadoopConf)) + val writer = writerOpt.get + + } finally { + writerOpt.foreach(_.close()) + } + str + } + + + +} From 389acea49889146e48fc4d4f0fdeb47163801416 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 6 Oct 2014 23:42:24 -0700 Subject: [PATCH 09/14] HDFS Block RDD must create a new iterator to return to caller. Added unit tests for the RDD. 
--- .../storage/HDFSBackedBlockRDD.scala | 6 +- .../storage/HDFSBackedBlockRDDSuite.scala | 106 ++++++++++++++++-- 2 files changed, 97 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index caa710205c889..81462f2ca9d31 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -60,9 +60,9 @@ class HDFSBackedBlockRDD[T: ClassTag]( val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) val dataRead = reader.read(partition.segment) reader.close() - val data = blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] - blockManager.putIterator(blockId, data, storageLevel) - data + val data = blockManager.dataDeserialize(blockId, dataRead).toIterable + blockManager.putIterator(blockId, data.iterator, storageLevel) + data.iterator.asInstanceOf[Iterator[T]] } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala index 745a557cfd4c8..9b7afdcf73d94 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala @@ -17,38 +17,120 @@ package org.apache.spark.streaming.storage import java.io.File +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer import com.google.common.io.Files import org.apache.commons.lang.RandomStringUtils import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkContext +import org.apache.spark.{SparkEnv, TaskContext, SparkContext} +import org.apache.spark.storage.{BlockId, StreamBlockId, StorageLevel} import org.apache.spark.streaming.TestSuiteBase -class HDFSBackedBlockRDDSuite extends TestSuiteBase { +class HDFSBackedBlockRDDSuite extends TestSuiteBase with BeforeAndAfter { val sparkContext = new SparkContext(conf) val hadoopConf = new Configuration() + val blockIdCounter = new AtomicInteger(0) + val streamCounter = new AtomicInteger(0) + val blockManager = sparkContext.env.blockManager + var file: File = null + var dir: File = null + + override def beforeFunction() { + super.beforeFunction() + dir = Files.createTempDir() + file = new File(dir, "BlockManagerWrite") + } + + override def afterFunction() { + super.afterFunction() + file.delete() + dir.delete() + } + + test("Verify all data is available when part of the data is only on HDFS") { + doTestHDFSWrites(writeAllToBM = false, 20, 5) + } + + test("Verify all data is available when all data is in BM") { + doTestHDFSWrites(writeAllToBM = true, 20, 5) + } + + test("Verify all data is available when part of the data is in BM with one string per block") { + doTestHDFSWrites(writeAllToBM = false, 20, 20) + } - test("block manager has data") { - val dir = Files.createTempDir() - val file = new File(dir, "BlockManagerWrite") + test("Verify all data is available when all data is in BM with one string per block") { + doTestHDFSWrites(writeAllToBM = true, 20, 20) + } + + /** + * Write a bunch of events into the HDFS Block RDD. 
Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param writeAllToBM - If true, all data is written to BlockManager + * @param total - Total number of Strings to write + * @param blockCount - Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def doTestHDFSWrites(writeAllToBM: Boolean, total: Int, blockCount: Int) { + val countPerBlock = total / blockCount + val blockIds = (0 until blockCount).map { _ => + StreamBlockId(streamCounter.incrementAndGet(), blockIdCounter.incrementAndGet()) + } + + val (writtenStrings, segments) = writeDataToHDFS(total, countPerBlock, file, blockIds) + + for (i <- 0 until writtenStrings.length) { + if (i % 2 == 0 || writeAllToBM) { + blockManager.putIterator(blockIds(i), writtenStrings(i).iterator, + StorageLevel.MEMORY_ONLY) + } + } + val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, + segments.toArray, StorageLevel.MEMORY_ONLY) + val partitions = rdd.getPartitions + // The task context is not used in this RDD directly, so ok to be null + val dataFromRDD = partitions.flatMap(rdd.compute(_, null)) + assert(writtenStrings.flatten === dataFromRDD) } - private def writeDataToHDFS(count: Int, file: File): Seq[String] = { - val str: Seq[String] = for (i <- 1 to count) yield RandomStringUtils.random(50) + /** + * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that + * went into one block. + * @param count - Number of Strings to write + * @param countPerBlock - Number of Strings per block + * @param file - The file to write to + * @param blockIds - List of block ids to use. + * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of FileSegments, + * each representing the block being written to HDFS. 
+ */ + private def writeDataToHDFS( + count: Int, + countPerBlock: Int, + file: File, + blockIds: Seq[BlockId] + ): (Seq[Seq[String]], Seq[FileSegment]) = { + + val strings: Seq[String] = (0 until count).map(_ => RandomStringUtils.randomAlphabetic(50)) var writerOpt: Option[WriteAheadLogWriter] = None try { writerOpt = Some(new WriteAheadLogWriter(file.toString, hadoopConf)) val writer = writerOpt.get - + val blockData = + 0.until(count, countPerBlock).map(y => (0 until countPerBlock).map(x => strings(x + y))) + val blockIdIter = blockIds.iterator + (blockData, blockData.map { + x => + writer.write(blockManager.dataSerialize(blockIdIter.next(), x.iterator)) + }) } finally { writerOpt.foreach(_.close()) } - str } - - - } From e70d390fec954595f4cf08ab5b1204a844815095 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 7 Oct 2014 00:00:30 -0700 Subject: [PATCH 10/14] Don't extend BeforeAndAfter --- .../spark/streaming/storage/HDFSBackedBlockRDDSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala index 9b7afdcf73d94..1d9bcc0346a3a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.{SparkEnv, TaskContext, SparkContext} import org.apache.spark.storage.{BlockId, StreamBlockId, StorageLevel} import org.apache.spark.streaming.TestSuiteBase -class HDFSBackedBlockRDDSuite extends TestSuiteBase with BeforeAndAfter { +class HDFSBackedBlockRDDSuite extends TestSuiteBase { val sparkContext = new SparkContext(conf) val hadoopConf = new Configuration() val blockIdCounter = new AtomicInteger(0) From cf397501458bfdcf3490b334e2d45e05e7bfdb3c Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 7 Oct 2014 12:42:43 -0700 Subject: [PATCH 11/14] Refactor tests for HDFS backed RDD --- .../storage/HDFSBackedBlockRDD.scala | 28 +++++++++++++++++-- .../storage/HDFSBackedBlockRDDSuite.scala | 14 ++++++++-- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala index 81462f2ca9d31..c9b5075f614b6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.streaming.storage +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration @@ -39,6 +40,18 @@ class HDFSBackedBlockRDD[T: ClassTag]( val storageLevel: StorageLevel ) extends BlockRDD[T](sc, blockIds) { + private var isTest = false + private var bmList: ArrayBuffer[Iterable[T]] = ArrayBuffer.empty[Iterable[T]] + + private [storage] def test() { + isTest = true + bmList = new ArrayBuffer[Iterable[T]]() + } + + private [storage] def getBmList: ArrayBuffer[Iterable[T]] = { + bmList + } + override def getPartitions: Array[Partition] = { assertValid() (0 until blockIds.size).map { i => @@ -48,18 +61,29 @@ class HDFSBackedBlockRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[T] = { assertValid() - val blockManager = 
SparkEnv.get.blockManager + val blockManager = sc.env.blockManager val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] val blockId = partition.blockId blockManager.get(blockId) match { // Data is in Block Manager, grab it from there. - case Some(block) => block.data.asInstanceOf[Iterator[T]] + case Some(block) => + val data = block.data.asInstanceOf[Iterator[T]] + if (isTest) { + val dataCopies = data.duplicate + bmList += dataCopies._1.toIterable + dataCopies._2 + } else { + data + } // Data not found in Block Manager, grab it from HDFS case None => // TODO: Perhaps we should cache readers at some point? val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) val dataRead = reader.read(partition.segment) reader.close() + // Should we make it configurable whether we want to insert data into BM? If we don't + // need to insert it into BM we can avoid duplicating the iterator. This is the only + // option since each of val data = blockManager.dataDeserialize(blockId, dataRead).toIterable blockManager.putIterator(blockId, data.iterator, storageLevel) data.iterator.asInstanceOf[Iterator[T]] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala index 1d9bcc0346a3a..de5615cc2c21b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala @@ -84,8 +84,10 @@ class HDFSBackedBlockRDDSuite extends TestSuiteBase { val (writtenStrings, segments) = writeDataToHDFS(total, countPerBlock, file, blockIds) + val writtenToBM = new ArrayBuffer[Iterable[String]]() for (i <- 0 until writtenStrings.length) { if (i % 2 == 0 || writeAllToBM) { + writtenToBM += writtenStrings(i) blockManager.putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY) } @@ -93,10 +95,18 @@ class HDFSBackedBlockRDDSuite extends TestSuiteBase { val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, segments.toArray, StorageLevel.MEMORY_ONLY) + rdd.test() val partitions = rdd.getPartitions // The task context is not used in this RDD directly, so ok to be null - val dataFromRDD = partitions.flatMap(rdd.compute(_, null)) - assert(writtenStrings.flatten === dataFromRDD) + val dataFromRDD = partitions.map(rdd.compute(_, null)) + val copiedData = dataFromRDD.map(_.duplicate) + // verify each partition is equal to the data pulled out + for(i <- 0 until writtenStrings.length) { + assert(writtenStrings(i) === copiedData(i)._1.toIterable) + } + assert(writtenStrings.flatten === copiedData.map(_._2.toIterable).flatten) + assert(writtenToBM === rdd.getBmList) + assert(writtenToBM.flatten === rdd.getBmList.flatten) } /** From ea227f0e35a19a5417979635cb4ad819fd865a15 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 7 Oct 2014 22:54:24 -0700 Subject: [PATCH 12/14] Broadcast the Hadoop configuration. 
Refactor the tests to use rdd.collect() --- .../scala/org/apache/spark/rdd/BlockRDD.scala | 4 + .../spark/streaming/storage/FileSegment.scala | 1 + .../spark/streaming/storage/HdfsUtils.scala | 11 ++ .../{ => rdd}/HDFSBackedBlockRDD.scala | 63 +++++----- .../{ => rdd}/HDFSBackedBlockRDDSuite.scala | 115 ++++++++---------- 5 files changed, 94 insertions(+), 100 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/storage/{ => rdd}/HDFSBackedBlockRDD.scala (58%) rename streaming/src/test/scala/org/apache/spark/streaming/storage/{ => rdd}/HDFSBackedBlockRDDSuite.scala (50%) diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 2673ec22509e9..03d406654d8e3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds "Attempted to use %s after its blocks have been removed!".format(toString)) } } + + protected def getBlockIdLocations: Map[BlockId, Seq[String]] = { + locations_ + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala index eb9c07e9cf61f..2ee92c1ffc12c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala @@ -16,4 +16,5 @@ */ package org.apache.spark.streaming.storage + private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala index 079b2fef904a0..62724a17ce531 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala @@ -59,4 +59,15 @@ private[streaming] object HdfsUtils { } } + def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { + val dfsPath = new Path(path) + val dfs = + this.synchronized { + dfsPath.getFileSystem(conf) + } + val fileStatus = dfs.getFileStatus(dfsPath) + val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) + blockLocs.map(_.flatMap(_.getHosts)) + } + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala similarity index 58% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala rename to streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala index c9b5075f614b6..b67554bbb405c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala @@ -14,44 +14,41 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.storage.rdd -import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.BlockRDD -import org.apache.spark.storage.{StorageLevel, BlockId} -import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.storage.{FileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ -private[spark] +private[streaming] class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, val segment: FileSegment) extends Partition { val index = idx } -private[spark] +private[streaming] class HDFSBackedBlockRDD[T: ClassTag]( @transient sc: SparkContext, - hadoopConf: Configuration, + @transient hadoopConfiguration: Configuration, @transient override val blockIds: Array[BlockId], @transient val segments: Array[FileSegment], + val storeInBlockManager: Boolean, val storageLevel: StorageLevel ) extends BlockRDD[T](sc, blockIds) { - private var isTest = false - private var bmList: ArrayBuffer[Iterable[T]] = ArrayBuffer.empty[Iterable[T]] - - private [storage] def test() { - isTest = true - bmList = new ArrayBuffer[Iterable[T]]() - } - - private [storage] def getBmList: ArrayBuffer[Iterable[T]] = { - bmList + if (blockIds.length != segments.length) { + throw new IllegalStateException("Number of block ids must be the same as number of segments!") } + // Hadoop Configuration is not serializable, so broadcast it as a serializable. + val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]] override def getPartitions: Array[Partition] = { assertValid() (0 until blockIds.size).map { i => @@ -61,36 +58,34 @@ class HDFSBackedBlockRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[T] = { assertValid() - val blockManager = sc.env.blockManager + val hadoopConf = broadcastedHadoopConf.value.value + val blockManager = SparkEnv.get.blockManager val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] val blockId = partition.blockId blockManager.get(blockId) match { // Data is in Block Manager, grab it from there. case Some(block) => - val data = block.data.asInstanceOf[Iterator[T]] - if (isTest) { - val dataCopies = data.duplicate - bmList += dataCopies._1.toIterable - dataCopies._2 - } else { - data - } + block.data.asInstanceOf[Iterator[T]] // Data not found in Block Manager, grab it from HDFS case None => - // TODO: Perhaps we should cache readers at some point? val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) val dataRead = reader.read(partition.segment) reader.close() - // Should we make it configurable whether we want to insert data into BM? If we don't - // need to insert it into BM we can avoid duplicating the iterator. 
This is the only - // option since each of - val data = blockManager.dataDeserialize(blockId, dataRead).toIterable - blockManager.putIterator(blockId, data.iterator, storageLevel) - data.iterator.asInstanceOf[Iterator[T]] + // Currently, we support storing the data to BM only in serialized form and not in + // deserialized form + if (storeInBlockManager) { + blockManager.putBytes(blockId, dataRead, storageLevel) + } + dataRead.rewind() + blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] } } override def getPreferredLocations(split: Partition): Seq[String] = { - locations_.getOrElse(split.asInstanceOf[HDFSBackedBlockRDDPartition].blockId, Seq.empty[String]) + val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] + val locations = getBlockIdLocations + locations.getOrElse(partition.blockId, + HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration) + .getOrElse(new Array[String](0)).toSeq) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala similarity index 50% rename from streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala rename to streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala index de5615cc2c21b..0cbf970b187d8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala @@ -14,60 +14,64 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.storage.rdd import java.io.File -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer import com.google.common.io.Files -import org.apache.commons.lang.RandomStringUtils import org.apache.hadoop.conf.Configuration -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.{SparkEnv, TaskContext, SparkContext} -import org.apache.spark.storage.{BlockId, StreamBlockId, StorageLevel} -import org.apache.spark.streaming.TestSuiteBase +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.storage.{FileSegment, WriteAheadLogWriter} +import org.apache.spark.{SparkConf, SparkContext} -class HDFSBackedBlockRDDSuite extends TestSuiteBase { +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { + // Name of the framework for Spark context + def framework = this.getClass.getSimpleName + + // Master for Spark context + def master = "local[2]" + + val conf = new SparkConf() + .setMaster(master) + .setAppName(framework) val sparkContext = new SparkContext(conf) val hadoopConf = new Configuration() - val blockIdCounter = new AtomicInteger(0) - val streamCounter = new AtomicInteger(0) val blockManager = sparkContext.env.blockManager var file: File = null var dir: File = null - override def beforeFunction() { - super.beforeFunction() + before { dir = Files.createTempDir() file = new File(dir, "BlockManagerWrite") } - override def afterFunction() { - super.afterFunction() + after { file.delete() dir.delete() } - test("Verify all data is available when part of the data is only on HDFS") { - doTestHDFSWrites(writeAllToBM = false, 20, 5) + 
test("Verify all data is available when all data is in BM") { + doTestHDFSBackedRDD(writeAllToBM = true, 20, 5) } - test("Verify all data is available when all data is in BM") { - doTestHDFSWrites(writeAllToBM = true, 20, 5) + + test("Verify all data is available when all data is in BM with one string per block") { + doTestHDFSBackedRDD(writeAllToBM = true, 20, 20) } - test("Verify all data is available when part of the data is in BM with one string per block") { - doTestHDFSWrites(writeAllToBM = false, 20, 20) + test("Verify all data is available no data is in BM") { + doTestHDFSBackedRDD(writeAllToBM = false, 20, 5) } - test("Verify all data is available when all data is in BM with one string per block") { - doTestHDFSWrites(writeAllToBM = true, 20, 20) + test("Verify all data is available no data is in BM with one string per block") { + doTestHDFSBackedRDD(writeAllToBM = false, 20, 20) } + /** * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the * BlockManager, so all reads need not happen from HDFS. @@ -76,37 +80,19 @@ class HDFSBackedBlockRDDSuite extends TestSuiteBase { * @param blockCount - Number of blocks to write (therefore, total # of events per block = * total/blockCount */ - private def doTestHDFSWrites(writeAllToBM: Boolean, total: Int, blockCount: Int) { + private def doTestHDFSBackedRDD(writeAllToBM: Boolean, total: Int, blockCount: Int) { val countPerBlock = total / blockCount - val blockIds = (0 until blockCount).map { _ => - StreamBlockId(streamCounter.incrementAndGet(), blockIdCounter.incrementAndGet()) - } + val blockIds = (0 until blockCount).map(i => StreamBlockId(i, i)) val (writtenStrings, segments) = writeDataToHDFS(total, countPerBlock, file, blockIds) - val writtenToBM = new ArrayBuffer[Iterable[String]]() - for (i <- 0 until writtenStrings.length) { - if (i % 2 == 0 || writeAllToBM) { - writtenToBM += writtenStrings(i) - blockManager.putIterator(blockIds(i), writtenStrings(i).iterator, - StorageLevel.MEMORY_ONLY) - } - } - val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, - segments.toArray, StorageLevel.MEMORY_ONLY) - rdd.test() - val partitions = rdd.getPartitions + segments.toArray, false, StorageLevel.MEMORY_ONLY) + // The task context is not used in this RDD directly, so ok to be null - val dataFromRDD = partitions.map(rdd.compute(_, null)) - val copiedData = dataFromRDD.map(_.duplicate) + val dataFromRDD = rdd.collect() // verify each partition is equal to the data pulled out - for(i <- 0 until writtenStrings.length) { - assert(writtenStrings(i) === copiedData(i)._1.toIterable) - } - assert(writtenStrings.flatten === copiedData.map(_._2.toIterable).flatten) - assert(writtenToBM === rdd.getBmList) - assert(writtenToBM.flatten === rdd.getBmList.flatten) + assert(writtenStrings.flatten === dataFromRDD) } /** @@ -120,27 +106,24 @@ class HDFSBackedBlockRDDSuite extends TestSuiteBase { * each representing the block being written to HDFS. 
*/ private def writeDataToHDFS( - count: Int, - countPerBlock: Int, - file: File, - blockIds: Seq[BlockId] + count: Int, + countPerBlock: Int, + file: File, + blockIds: Seq[BlockId] ): (Seq[Seq[String]], Seq[FileSegment]) = { - val strings: Seq[String] = (0 until count).map(_ => RandomStringUtils.randomAlphabetic(50)) - - var writerOpt: Option[WriteAheadLogWriter] = None - try { - writerOpt = Some(new WriteAheadLogWriter(file.toString, hadoopConf)) - val writer = writerOpt.get - val blockData = - 0.until(count, countPerBlock).map(y => (0 until countPerBlock).map(x => strings(x + y))) - val blockIdIter = blockIds.iterator - (blockData, blockData.map { - x => - writer.write(blockManager.dataSerialize(blockIdIter.next(), x.iterator)) - }) - } finally { - writerOpt.foreach(_.close()) + assert(count / countPerBlock === blockIds.size) + + val strings = (0 until count).map { _ => scala.util.Random.nextString(50)} + + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + val blockData = strings.grouped(countPerBlock).toSeq + val segments = new ArrayBuffer[FileSegment]() + blockData.zip(blockIds).foreach { + case (data, id) => + segments += writer.write(blockManager.dataSerialize(id, data.iterator)) } + writer.close() + (blockData, segments.toSeq) } } From c4211d7cc7e20546f03cfbb4a056a60fe40438df Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 8 Oct 2014 13:29:40 -0700 Subject: [PATCH 13/14] Update tests to test conditions where data is not read from HDFS at all --- .../scala/org/apache/spark/rdd/BlockRDD.scala | 2 +- .../spark/streaming/storage/FileSegment.scala | 1 - .../spark/streaming/storage/HdfsUtils.scala | 1 - .../storage/rdd/HDFSBackedBlockRDD.scala | 2 +- .../storage/rdd/HDFSBackedBlockRDDSuite.scala | 88 +++++++++++++------ 5 files changed, 61 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 03d406654d8e3..fffa1911f5bc2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -85,7 +85,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds } } - protected def getBlockIdLocations: Map[BlockId, Seq[String]] = { + protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = { locations_ } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala index 2ee92c1ffc12c..eb9c07e9cf61f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala @@ -16,5 +16,4 @@ */ package org.apache.spark.streaming.storage - private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala index 62724a17ce531..efb12b82ae949 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala @@ -69,5 +69,4 @@ private[streaming] object HdfsUtils { val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) blockLocs.map(_.flatMap(_.getHosts)) } - } diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala index b67554bbb405c..c672574ee2ed4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala @@ -83,7 +83,7 @@ class HDFSBackedBlockRDD[T: ClassTag]( override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] - val locations = getBlockIdLocations + val locations = getBlockIdLocations() locations.getOrElse(partition.blockId, HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration) .getOrElse(new Array[String](0)).toSeq) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala index 0cbf970b187d8..7a83b98f7feb5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.storage.rdd import java.io.File +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -41,6 +42,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { val sparkContext = new SparkContext(conf) val hadoopConf = new Configuration() val blockManager = sparkContext.env.blockManager + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) var file: File = null var dir: File = null @@ -54,42 +57,64 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { dir.delete() } - test("Verify all data is available when all data is in BM") { - doTestHDFSBackedRDD(writeAllToBM = true, 20, 5) + test("Verify all data is available when all data is in BM and HDFS") { + doTestHDFSBackedRDD(5, 5, 20, 5) } - - test("Verify all data is available when all data is in BM with one string per block") { - doTestHDFSBackedRDD(writeAllToBM = true, 20, 20) + test("Verify all data is available when all data is in BM but not in HDFS") { + doTestHDFSBackedRDD(5, 0, 20, 5) } - test("Verify all data is available no data is in BM") { - doTestHDFSBackedRDD(writeAllToBM = false, 20, 5) + test("Verify all data is available when all data is in HDFS and no data is in BM") { + doTestHDFSBackedRDD(0, 5, 20, 5) } - test("Verify all data is available no data is in BM with one string per block") { - doTestHDFSBackedRDD(writeAllToBM = false, 20, 20) - } + test("Verify part of the data is in BM, and the remaining in HDFS") { + doTestHDFSBackedRDD(3, 2, 20, 5) + } /** * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the * BlockManager, so all reads need not happen from HDFS. 
- * @param writeAllToBM - If true, all data is written to BlockManager * @param total - Total number of Strings to write * @param blockCount - Number of blocks to write (therefore, total # of events per block = * total/blockCount */ - private def doTestHDFSBackedRDD(writeAllToBM: Boolean, total: Int, blockCount: Int) { + private def doTestHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int + ) { val countPerBlock = total / blockCount - val blockIds = (0 until blockCount).map(i => StreamBlockId(i, i)) + val blockIds = (0 until blockCount).map { + i => + StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) + } + + val writtenStrings = generateData(total, countPerBlock) - val (writtenStrings, segments) = writeDataToHDFS(total, countPerBlock, file, blockIds) + if (writeToBMCount != 0) { + (0 until writeToBMCount).foreach { i => + blockManager + .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER) + } + } + + val segments = new ArrayBuffer[FileSegment] + if (writeToHDFSCount != 0) { + // Generate some fake segments for the blocks in BM so the RDD does not complain + segments ++= generateFakeSegments(writeToBMCount) + segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), + blockIds.slice(writeToBMCount, blockCount)) + } else { + segments ++= generateFakeSegments(blockCount) + } val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray, segments.toArray, false, StorageLevel.MEMORY_ONLY) - // The task context is not used in this RDD directly, so ok to be null val dataFromRDD = rdd.collect() // verify each partition is equal to the data pulled out assert(writtenStrings.flatten === dataFromRDD) @@ -100,30 +125,35 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { * went into one block. * @param count - Number of Strings to write * @param countPerBlock - Number of Strings per block - * @param file - The file to write to - * @param blockIds - List of block ids to use. * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of FileSegments, * each representing the block being written to HDFS. 
*/ - private def writeDataToHDFS( - count: Int, - countPerBlock: Int, - file: File, - blockIds: Seq[BlockId] - ): (Seq[Seq[String]], Seq[FileSegment]) = { - - assert(count / countPerBlock === blockIds.size) - + private def generateData( + count: Int, + countPerBlock: Int + ): Seq[Seq[String]] = { val strings = (0 until count).map { _ => scala.util.Random.nextString(50)} + strings.grouped(countPerBlock).toSeq + } - val writer = new WriteAheadLogWriter(file.toString, hadoopConf) - val blockData = strings.grouped(countPerBlock).toSeq + private def writeDataToHDFS( + blockData: Seq[Seq[String]], + blockIds: Seq[BlockId] + ): Seq[FileSegment] = { + assert(blockData.size === blockIds.size) val segments = new ArrayBuffer[FileSegment]() + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) blockData.zip(blockIds).foreach { case (data, id) => segments += writer.write(blockManager.dataSerialize(id, data.iterator)) } writer.close() - (blockData, segments.toSeq) + segments + } + + private def generateFakeSegments(count: Int): Seq[FileSegment] = { + (0 until count).map { + _ => new FileSegment("random", 0l, 0) + } } } From 1fe35671ff904c882f189c5246e31929c6d28b37 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 8 Oct 2014 14:26:19 -0700 Subject: [PATCH 14/14] Removing extra line --- .../spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala index 7a83b98f7feb5..3d7f9e1ce6e74 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala @@ -69,7 +69,6 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter { doTestHDFSBackedRDD(0, 5, 20, 5) } - test("Verify part of the data is in BM, and the remaining in HDFS") { doTestHDFSBackedRDD(3, 2, 20, 5) }
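
For reference, the end state of this series gives roughly the following usage, mirroring what HDFSBackedBlockRDDSuite does. This is a sketch only: the class is private[streaming], and the blockIds/segments arrays are assumed to be parallel arrays produced by an earlier write path such as WriteAheadLogWriter.write.

// Sketch of constructing the RDD as it stands after PATCH 14 (see
// HDFSBackedBlockRDDSuite). blockIds and segments are assumed to come from
// the write-ahead log write path and must have the same length.
val rdd = new HDFSBackedBlockRDD[String](
  sparkContext,               // SparkContext, also used to broadcast the Hadoop conf
  hadoopConf,                 // Hadoop Configuration (broadcast internally as SerializableWritable)
  blockIds.toArray,           // Array[BlockId], one per partition
  segments.toArray,           // Array[FileSegment], parallel to blockIds
  false,                      // storeInBlockManager: skip re-inserting HDFS reads into the BlockManager
  StorageLevel.MEMORY_ONLY)   // storage level, used only when storeInBlockManager is true
val data = rdd.collect()      // each partition is served from the BlockManager if present,
                              // otherwise read back from its WAL segment on HDFS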