Skip to content

Commit a55a65c

Browse files
committed
Added a unit test to test replication behavior.
1 parent 012afa3 commit a55a65c

File tree

2 files changed

+69
-21
lines changed

2 files changed

+69
-21
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ private[spark] class BlockManager(
112112
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
113113
private val broadcastCleaner = new MetadataCleaner(
114114
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
115+
116+
// Field related to peer block managers that are necessary for block replication
115117
@volatile private var cachedPeers: Seq[BlockManagerId] = _
116118
private val peerFetchLock = new Object
117119
private var lastPeerFetchTime = 0L
@@ -807,7 +809,8 @@ private[spark] class BlockManager(
807809
}
808810

809811
/**
810-
* Replicate block to another node.
812+
* Replicate block to another node. Not that this is a blocking call that returns after
813+
* the block has been replicated.
811814
*/
812815
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
813816
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
4848
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
4949
import org.apache.spark.storage.StorageLevel._
5050
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
51+
import org.apache.spark.network.{ManagedBuffer, BlockTransferService}
5152

5253

5354
class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
@@ -1370,41 +1371,85 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
13701371
assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
13711372
}
13721373

1374+
test("block replication - replication failures") {
1375+
1376+
/*
1377+
Create a system of three block managers / stores. One of them (say, failableStore)
1378+
cannot receive blocks. So attempts to use that as replication target fails.
1379+
1380+
store <-----------/fails/-----------> failableStore
1381+
A A
1382+
| |
1383+
| |
1384+
-----/works/-----> store1 <----/fails/------
1385+
1386+
We are first going to add a normal store and a failable store, and test
1387+
whether 2x replication fails to create two copies of a block.
1388+
Then we are going to add the third normal store,
1389+
and test that now 2x replication works as the new store will be used for replication.
1390+
*/
1391+
1392+
1393+
// Insert a block with 2x replication and return the number of copies of the block
1394+
def replicateAndGetNumCopies(blockId: String): Int = {
1395+
store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
1396+
val numLocations = master.getLocations(blockId).size
1397+
allStores.foreach { _.removeBlock(blockId) }
1398+
numLocations
1399+
}
1400+
1401+
// Add a normal block manager
1402+
store = makeBlockManager(10000, "store")
1403+
1404+
// Add a failable block manager with a mock transfer service that does not
1405+
// allow receiving of blocks. So attempts to use it as a replication target will fail.
1406+
val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work
1407+
when(failableTransfer.hostName).thenReturn("some-hostname")
1408+
when(failableTransfer.port).thenReturn(1000)
1409+
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer, 10000, conf,
1410+
mapOutputTracker, shuffleManager, failableTransfer)
1411+
allStores += failableStore // so that this gets stopped after test
1412+
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
1413+
1414+
// Test that 2x replication fails by creating only one copy of the block
1415+
assert(replicateAndGetNumCopies("a1") === 1)
1416+
1417+
// Add another normal block manager and test that 2x replication works
1418+
store = makeBlockManager(10000, "store2")
1419+
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
1420+
assert(replicateAndGetNumCopies("a2") === 2)
1421+
}
1422+
}
1423+
13731424
test("block replication - addition and deletion of block managers") {
13741425
val blockSize = 1000
13751426
val storeSize = 10000
13761427
val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
13771428

1378-
/**
1379-
* Function to test whether insert a block with replication achieves the expected replication.
1380-
* Since this function can be call on the same block id repeatedly through an `eventually`,
1381-
* it needs to be ensured that the method leaves block manager + master in the same state as
1382-
* it was before attempting to insert the block.
1383-
*/
1384-
def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
1385-
try {
1386-
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
1387-
assert(master.getLocations(blockId).size === expectedNumLocations)
1388-
} finally {
1389-
allStores.foreach { _.removeBlock(blockId) }
1390-
}
1429+
// Insert a block with given replication factor and return the number of copies of the block\
1430+
def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
1431+
val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
1432+
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
1433+
val numLocations = master.getLocations(blockId).size
1434+
allStores.foreach { _.removeBlock(blockId) }
1435+
numLocations
13911436
}
13921437

13931438
// 2x replication should work, 3x replication should only replicate 2x
1394-
testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
1395-
testPut("a2", StorageLevel(true, true, false, true, 3), 2)
1439+
assert(replicateAndGetNumCopies("a1", 2) === 2)
1440+
assert(replicateAndGetNumCopies("a2", 3) === 2)
13961441

13971442
// Add another store, 3x replication should work now, 4x replication should only replicate 3x
13981443
val newStore1 = makeBlockManager(storeSize, s"newstore1")
13991444
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
1400-
testPut("a3", StorageLevel(true, true, false, true, 3), 3)
1445+
assert(replicateAndGetNumCopies("a3", 3) === 3)
14011446
}
1402-
testPut("a4",StorageLevel(true, true, false, true, 4), 3)
1447+
assert(replicateAndGetNumCopies("a4", 4) === 3)
14031448

14041449
// Add another store, 4x replication should work now
14051450
val newStore2 = makeBlockManager(storeSize, s"newstore2")
14061451
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
1407-
testPut("a5", StorageLevel(true, true, false, true, 4), 4)
1452+
assert(replicateAndGetNumCopies("a5", 4) === 4)
14081453
}
14091454

14101455
// Remove all but the 1st store, 2x replication should fail
@@ -1413,12 +1458,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
14131458
master.removeExecutor(store.blockManagerId.executorId)
14141459
store.stop()
14151460
}
1416-
testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1)
1461+
assert(replicateAndGetNumCopies("a6", 2) === 1)
14171462

14181463
// Add new stores, 3x replication should work
14191464
val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
14201465
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
1421-
testPut("a7", StorageLevel(true, true, false, true, 3), 3)
1466+
assert(replicateAndGetNumCopies("a7", 3) === 3)
14221467
}
14231468
}
14241469

0 commit comments

Comments
 (0)