4444import java .util .Collections ;
4545import java .util .List ;
4646import java .util .Map ;
47+ import java .util .Set ;
48+ import java .util .TreeSet ;
4749import java .util .concurrent .atomic .AtomicBoolean ;
4850import java .util .concurrent .atomic .AtomicInteger ;
4951
5557import org .apache .hadoop .ha .HAServiceProtocol .HAServiceState ;
5658import org .apache .hadoop .hdfs .DFSTestUtil ;
5759import org .apache .hadoop .hdfs .protocol .Block ;
60+ import org .apache .hadoop .hdfs .protocol .BlockListAsLongs .BlockReportReplica ;
5861import org .apache .hadoop .hdfs .protocol .ExtendedBlock ;
5962import org .apache .hadoop .hdfs .protocol .LocatedBlock ;
6063import org .apache .hadoop .hdfs .protocolPB .DatanodeProtocolClientSideTranslatorPB ;
@@ -109,8 +112,6 @@ public class TestBPOfferService {
109112 private long firstLeaseId = 0 ;
110113 private long secondLeaseId = 0 ;
111114 private long nextFullBlockReportLeaseId = 1L ;
112- private int fullBlockReportCount = 0 ;
113- private int incrBlockReportCount = 0 ;
114115
115116 static {
116117 GenericTestUtils .setLogLevel (DataNode .LOG , Level .ALL );
@@ -233,14 +234,6 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
233234 }
234235 }
235236
236- private void setBlockReportCount (int count ) {
237- fullBlockReportCount = count ;
238- }
239-
240- private void setIncreaseBlockReportCount (int count ) {
241- incrBlockReportCount += count ;
242- }
243-
244237 /**
245238 * Test that the BPOS can register to talk to two different NNs,
246239 * sends block reports to both, etc.
@@ -288,6 +281,7 @@ public void testMissBlocksWhenReregister() throws Exception {
288281 Thread addNewBlockThread = null ;
289282 final AtomicInteger count = new AtomicInteger (0 );
290283 DataNodeFaultInjector prevDNFaultInjector = null ;
284+ Set <Long > blocks = new TreeSet <>();
291285 try {
292286 waitForBothActors (bpos );
293287 waitForInitialization (bpos );
@@ -303,7 +297,7 @@ public void blockUtilSendFullBlockReport() {
303297 }
304298 });
305299
306- countBlockReportItems (FAKE_BLOCK , mockNN1 );
300+ countBlockReportItems (FAKE_BLOCK , mockNN1 , blocks );
307301 addNewBlockThread = new Thread (() -> {
308302 for (int i = 0 ; i < totalTestBlocks ; i ++) {
309303 SimulatedFSDataset fsDataset = (SimulatedFSDataset ) mockFSDataset ;
@@ -334,14 +328,12 @@ public void blockUtilSendFullBlockReport() {
334328 addNewBlockThread = null ;
335329 // Verify FBR/IBR count is equal to generate number.
336330 try {
337- GenericTestUtils .waitFor (() ->
338- (fullBlockReportCount == totalTestBlocks ||
339- incrBlockReportCount == totalTestBlocks ), 1000 , 15000 );
331+ GenericTestUtils .waitFor (() -> blocks .size () == totalTestBlocks ,
332+ 1000 , 15000 );
340333 } catch (Exception e ) {
341- fail (String .format ("Timed out wait for IBR counts FBRCount = %d,"
342- + " IBRCount = %d; expected = %d. Exception: %s" ,
343- fullBlockReportCount , incrBlockReportCount , totalTestBlocks ,
344- e .getMessage ()));
334+ fail (String .format ("Timed out waiting for blocks count. "
335+ + "reported = %d, expected = %d. Exception: %s" ,
336+ blocks .size (), totalTestBlocks , e .getMessage ()));
345337 }
346338
347339 } finally {
@@ -711,7 +703,8 @@ private void setTimeForSynchronousBPOSCalls() {
711703 * which assume no deleting blocks here.
712704 */
713705 private void countBlockReportItems (final ExtendedBlock fakeBlock ,
714- final DatanodeProtocolClientSideTranslatorPB mockNN ) throws Exception {
706+ final DatanodeProtocolClientSideTranslatorPB mockNN ,
707+ final Set <Long > blocks ) throws Exception {
715708 final String fakeBlockPoolId = fakeBlock .getBlockPoolId ();
716709 final ArgumentCaptor <StorageBlockReport []> captor =
717710 ArgumentCaptor .forClass (StorageBlockReport [].class );
@@ -720,7 +713,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
720713 Mockito .doAnswer ((Answer <Object >) invocation -> {
721714 Object [] arguments = invocation .getArguments ();
722715 StorageBlockReport [] list = (StorageBlockReport [])arguments [2 ];
723- setBlockReportCount (list [0 ].getBlocks ().getNumberOfBlocks ());
716+ for (BlockReportReplica brr : list [0 ].getBlocks ()) {
717+ blocks .add (brr .getBlockId ());
718+ }
724719 return null ;
725720 }).when (mockNN ).blockReport (
726721 Mockito .any (),
@@ -734,7 +729,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
734729 Object [] arguments = invocation .getArguments ();
735730 StorageReceivedDeletedBlocks [] list =
736731 (StorageReceivedDeletedBlocks [])arguments [2 ];
737- setIncreaseBlockReportCount (list [0 ].getBlocks ().length );
732+ for (ReceivedDeletedBlockInfo rdbi : list [0 ].getBlocks ()) {
733+ blocks .add (rdbi .getBlock ().getBlockId ());
734+ }
738735 return null ;
739736 }).when (mockNN ).blockReceivedAndDeleted (
740737 Mockito .any (),
@@ -1233,4 +1230,4 @@ public void testCommandProcessingThreadExit() throws Exception {
12331230 }
12341231 }
12351232 }
1236- }
1233+ }
0 commit comments