@@ -186,146 +186,62 @@ class ReceivedBlockHandlerSuite
186186 }
187187 }
188188
189- test(" BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages" ) {
190- storageLevel = StorageLevel .MEMORY_ONLY
191- // Create a non-trivial (not all zeros) byte array
192- val bytes = Array .tabulate(100 )(i => i.toByte)
193- val byteBufferBlock = ByteBuffer .wrap(bytes)
194- withBlockManagerBasedBlockHandler { handler =>
195- val blockStoreResult = storeBlock(handler, ByteBufferBlock (byteBufferBlock))
196- // ByteBufferBlock is counted as single record
197- assert(blockStoreResult.numRecords === Some (1 ))
198- }
199- }
200-
201- test(" WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages" ) {
202- storageLevel = StorageLevel .MEMORY_ONLY
203- // Create a non-trivial (not all zeros) byte array
204- val bytes = Array .tabulate(100 )(i => i.toByte)
205- val byteBufferBlock = ByteBuffer .wrap(bytes)
206- withWriteAheadLogBasedBlockHandler { handler =>
207- val blockStoreResult = storeBlock(handler, ByteBufferBlock (byteBufferBlock))
208- // ByteBufferBlock is counted as single record
209- assert(blockStoreResult.numRecords === Some (1 ))
210- }
211- }
212-
213- test(" BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages" ) {
214- storageLevel = StorageLevel .MEMORY_ONLY
215- val block = ArrayBuffer .fill(100 )(0 )
216- withBlockManagerBasedBlockHandler { handler =>
217- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
218- assert(blockStoreResult.numRecords === Some (100 ))
219- }
220- }
221-
222- test(" BlockManagerBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages" ) {
223- storageLevel = StorageLevel .DISK_ONLY
224- val block = ArrayBuffer .fill(100 )(0 )
225- withBlockManagerBasedBlockHandler { handler =>
226- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
227- assert(blockStoreResult.numRecords === Some (100 ))
228- }
229- }
230-
231- test(" BlockManagerBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages" ) {
232- storageLevel = StorageLevel .MEMORY_AND_DISK
233- val block = ArrayBuffer .fill(100 )(0 )
234- withBlockManagerBasedBlockHandler { handler =>
235- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
236- assert(blockStoreResult.numRecords === Some (100 ))
237- }
238- }
239-
240- test(" BlockManagerBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages" ) {
241- storageLevel = StorageLevel .MEMORY_ONLY
242- val block = ArrayBuffer .fill(100 )(0 )
243- withBlockManagerBasedBlockHandler { handler =>
244- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
245- assert(blockStoreResult.numRecords === Some (100 ))
246- }
247- }
248-
249- test(" BlockManagerBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages" ) {
250- storageLevel = StorageLevel .DISK_ONLY
251- val block = ArrayBuffer .fill(100 )(0 )
252- withBlockManagerBasedBlockHandler { handler =>
253- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
254- assert(blockStoreResult.numRecords === Some (100 ))
255- }
256- }
257-
258- test(" BlockManagerBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages" ) {
259- storageLevel = StorageLevel .MEMORY_AND_DISK
260- val block = ArrayBuffer .fill(100 )(0 )
261- withBlockManagerBasedBlockHandler { handler =>
262- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
263- assert(blockStoreResult.numRecords === Some (100 ))
264- }
265- }
266-
267- test(" WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages" ) {
268- storageLevel = StorageLevel .MEMORY_ONLY
269- val block = ArrayBuffer .fill(100 )(0 )
270- withWriteAheadLogBasedBlockHandler { handler =>
271- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
272- assert(blockStoreResult.numRecords === Some (100 ))
273- }
274- }
275-
276- test(" WriteAheadLogBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages" ) {
277- storageLevel = StorageLevel .DISK_ONLY
278- val block = ArrayBuffer .fill(100 )(0 )
279- withWriteAheadLogBasedBlockHandler { handler =>
280- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
281- assert(blockStoreResult.numRecords === Some (100 ))
282- }
283- }
284-
285- test(" WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages" ) {
286- storageLevel = StorageLevel .MEMORY_AND_DISK
287- val block = ArrayBuffer .fill(100 )(0 )
288- withWriteAheadLogBasedBlockHandler { handler =>
289- val blockStoreResult = storeBlock(handler, ArrayBufferBlock (block))
290- assert(blockStoreResult.numRecords === Some (100 ))
291- }
292- }
293-
294- test(" WriteAheadLogBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages" ) {
295- storageLevel = StorageLevel .MEMORY_ONLY
296- val block = ArrayBuffer .fill(100 )(0 )
297- withWriteAheadLogBasedBlockHandler { handler =>
298- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
299- assert(blockStoreResult.numRecords === Some (100 ))
300- }
301- }
302-
303- test(" WriteAheadLogBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages " ) {
304- storageLevel = StorageLevel .DISK_ONLY
305- val block = ArrayBuffer .fill(100 )(0 )
306- withWriteAheadLogBasedBlockHandler { handler =>
307- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
308- assert(blockStoreResult.numRecords === Some (100 ))
309- }
310- }
311-
312- test(" WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages" ) {
313- storageLevel = StorageLevel .MEMORY_AND_DISK
314- val block = ArrayBuffer .fill(100 )(0 )
315- withWriteAheadLogBasedBlockHandler { handler =>
316- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
317- assert(blockStoreResult.numRecords === Some (100 ))
318- }
319- }
320-
321- test(" BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_ONLY" ) {
189+ test(" BlockManagerBasedBlockHandler - count messages" ) {
190+ // ByteBufferBlock-MEMORY_ONLY
191+ testRecordcount(true , StorageLevel .MEMORY_ONLY ,
192+ ByteBufferBlock (ByteBuffer .wrap(Array .tabulate(100 )(i => i.toByte))), blockManager, None )
193+ // ArrayBufferBlock-MEMORY_ONLY
194+ testRecordcount(true , StorageLevel .MEMORY_ONLY ,
195+ ArrayBufferBlock (ArrayBuffer .fill(25 )(0 )), blockManager, Some (25 ))
196+ // ArrayBufferBlock-DISK_ONLY
197+ testRecordcount(true , StorageLevel .DISK_ONLY ,
198+ ArrayBufferBlock (ArrayBuffer .fill(50 )(0 )), blockManager, Some (50 ))
199+ // ArrayBufferBlock-MEMORY_AND_DISK
200+ testRecordcount(true , StorageLevel .MEMORY_AND_DISK ,
201+ ArrayBufferBlock (ArrayBuffer .fill(75 )(0 )), blockManager, Some (75 ))
202+ // IteratorBlock-MEMORY_ONLY
203+ testRecordcount(true , StorageLevel .MEMORY_ONLY ,
204+ IteratorBlock ((ArrayBuffer .fill(100 )(0 )).iterator), blockManager, Some (100 ))
205+ // IteratorBlock-DISK_ONLY
206+ testRecordcount(true , StorageLevel .DISK_ONLY ,
207+ IteratorBlock ((ArrayBuffer .fill(125 )(0 )).iterator), blockManager, Some (125 ))
208+ // IteratorBlock-MEMORY_AND_DISK
209+ testRecordcount(true , StorageLevel .MEMORY_AND_DISK ,
210+ IteratorBlock ((ArrayBuffer .fill(150 )(0 )).iterator), blockManager, Some (150 ))
211+ }
212+
213+ test(" WriteAheadLogBasedBlockHandler - count messages" ) {
214+ // ByteBufferBlock-MEMORY_ONLY
215+ testRecordcount(false , StorageLevel .MEMORY_ONLY ,
216+ ByteBufferBlock (ByteBuffer .wrap(Array .tabulate(100 )(i => i.toByte))), blockManager, None )
217+ // ArrayBufferBlock-MEMORY_ONLY
218+ testRecordcount(false , StorageLevel .MEMORY_ONLY ,
219+ ArrayBufferBlock (ArrayBuffer .fill(25 )(0 )), blockManager, Some (25 ))
220+ // ArrayBufferBlock-DISK_ONLY
221+ testRecordcount(false , StorageLevel .DISK_ONLY ,
222+ ArrayBufferBlock (ArrayBuffer .fill(50 )(0 )), blockManager, Some (50 ))
223+ // ArrayBufferBlock-MEMORY_AND_DISK
224+ testRecordcount(false , StorageLevel .MEMORY_AND_DISK ,
225+ ArrayBufferBlock (ArrayBuffer .fill(75 )(0 )), blockManager, Some (75 ))
226+ // IteratorBlock-MEMORY_ONLY
227+ testRecordcount(false , StorageLevel .MEMORY_ONLY ,
228+ IteratorBlock ((ArrayBuffer .fill(100 )(0 )).iterator), blockManager, Some (100 ))
229+ // IteratorBlock-DISK_ONLY
230+ testRecordcount(false , StorageLevel .DISK_ONLY ,
231+ IteratorBlock ((ArrayBuffer .fill(125 )(0 )).iterator), blockManager, Some (125 ))
232+ // IteratorBlock-MEMORY_AND_DISK
233+ testRecordcount(false , StorageLevel .MEMORY_AND_DISK ,
234+ IteratorBlock ((ArrayBuffer .fill(150 )(0 )).iterator), blockManager, Some (150 ))
235+ }
236+
237+ test(" BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed" ) {
322238 storageLevel = StorageLevel .MEMORY_ONLY
323239 blockManager = createBlockManager(12000 )
324240 val block = List .fill(70 )(new Array [Byte ](100 ))
325241 // spark.storage.unrollFraction set to 0.4 for BlockManager
326- // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
242+ // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
327243 // this block With MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block
328- // and hence it will not tryToPut this block , resulting the SparkException
244+ // and hence it will not tryToPut this block, resulting the SparkException
329245 withBlockManagerBasedBlockHandler { handler =>
330246 val thrown = intercept[SparkException ] {
331247 val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
@@ -335,34 +251,55 @@ class ReceivedBlockHandlerSuite
335251 }
336252 }
337253
338- test(" BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_AND_DISK" ) {
339- storageLevel = StorageLevel .MEMORY_AND_DISK
254+ test(" BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed" ) {
340255 blockManager = createBlockManager(12000 )
341- val block = List .fill(70 )(new Array [Byte ](100 ))
342256 // spark.storage.unrollFraction set to 0.4 for BlockManager
343- // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
344- // this block in MEMORY , But BlockManager will be able to sereliaze this block to DISK
257+ // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
258+ // this block in MEMORY, But BlockManager will be able to sereliaze this block to DISK
345259 // and hence count returns correct value.
346- withBlockManagerBasedBlockHandler { handler =>
347- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
348- assert(blockStoreResult.numRecords === Some (70 ))
349- }
260+ testRecordcount(true , StorageLevel .MEMORY_AND_DISK ,
261+ IteratorBlock ((List .fill(70 )(new Array [Byte ](100 ))).iterator), blockManager, Some (70 ))
350262 }
351263
352- test(" WriteAheadLogBasedBlockHandler - isFullyConsumed-MEMORY_ONLY" ) {
353- storageLevel = StorageLevel .MEMORY_ONLY
264+ test(" WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed" ) {
354265 blockManager = createBlockManager(12000 )
355- val block = List .fill(70 )(new Array [Byte ](100 ))
356266 // spark.storage.unrollFraction set to 0.4 for BlockManager
357- // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
358- // this block in MEMORY , But BlockManager will be able to sereliaze this block to WAL
267+ // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
268+ // this block in MEMORY, But BlockManager will be able to sereliaze this block to WAL
359269 // and hence count returns correct value.
360- withWriteAheadLogBasedBlockHandler { handler =>
361- val blockStoreResult = storeBlock(handler, IteratorBlock (block.iterator))
362- assert(blockStoreResult.numRecords === Some (70 ))
363- }
270+ testRecordcount(false , StorageLevel .MEMORY_ONLY ,
271+ IteratorBlock ((List .fill(70 )(new Array [Byte ](100 ))).iterator), blockManager, Some (70 ))
364272 }
365273
274+ /**
275+ * Test storing of data using different types of Handler, StorageLevle and ReceivedBlocks
276+ * and verify the correct record count
277+ */
278+ private def testRecordcount (isBlockManagedBasedBlockHandler : Boolean ,
279+ sLevel : StorageLevel ,
280+ receivedBlock : ReceivedBlock ,
281+ bManager : BlockManager ,
282+ expectedNumRecords : Option [Long ]
283+ ) {
284+ storageLevel = sLevel
285+ blockManager = bManager
286+ if (isBlockManagedBasedBlockHandler) {
287+ // test received block with BlockManager based handler
288+ withBlockManagerBasedBlockHandler { handler =>
289+ val blockStoreResult = storeBlock(handler, receivedBlock)
290+ assert(blockStoreResult.numRecords === expectedNumRecords)
291+ }
292+ } else {
293+ // test received block with WAL based handler
294+ withWriteAheadLogBasedBlockHandler { handler =>
295+ val blockStoreResult = storeBlock(handler, receivedBlock)
296+ assert(blockStoreResult.numRecords === expectedNumRecords)
297+ }
298+ }
299+ // Removing the Block Id to use same blockManager for next test
300+ blockManager.removeBlock(StreamBlockId (streamId, 1000L ), true )
301+ }
302+
366303 /**
367304 * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
368305 * using the given verification function
0 commit comments