@@ -224,7 +224,7 @@ boolean deadNodesContain(DatanodeInfo nodeInfo) {
224224  }
225225
226226  /** 
227-    * Grab the open-file info from namenode 
227+    * Grab the open-file info from namenode.  
228228   * @param refreshLocatedBlocks whether to re-fetch locatedblocks 
229229   */ 
230230  void  openInfo (boolean  refreshLocatedBlocks ) throws  IOException  {
@@ -940,7 +940,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
940940   * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is 
941941   * false. 
942942   */ 
943-   private  DNAddrPair  chooseDataNode (LocatedBlock  block ,
943+   @ VisibleForTesting 
944+   DNAddrPair  chooseDataNode (LocatedBlock  block ,
944945      Collection <DatanodeInfo > ignoredNodes , boolean  refetchIfRequired )
945946      throws  IOException  {
946947    while  (true ) {
@@ -955,6 +956,14 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
955956    }
956957  }
957958
959+   /** 
960+    * RefetchLocations should only be called when there are no active requests 
961+    * to datanodes. In the hedged read case this means futures should be empty. 
962+    * @param block The locatedBlock to get new datanode locations for. 
963+    * @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared. 
964+    * @return the locatedBlock with updated datanode locations. 
965+    * @throws IOException if interrupted while choosing a DataNode for read, or if re-opening the file info fails. 
966+    */ 
958967  private  LocatedBlock  refetchLocations (LocatedBlock  block ,
959968      Collection <DatanodeInfo > ignoredNodes ) throws  IOException  {
960969    String  errMsg  = getBestNodeDNAddrPairErrorString (block .getLocations (),
@@ -999,13 +1008,24 @@ private LocatedBlock refetchLocations(LocatedBlock block,
9991008      throw  new  InterruptedIOException (
10001009          "Interrupted while choosing DataNode for read." );
10011010    }
1002-     clearLocalDeadNodes ();  //2nd option is to remove only nodes[blockId] 
1011+     clearCachedNodeState ( ignoredNodes ); 
10031012    openInfo (true );
10041013    block  = refreshLocatedBlock (block );
10051014    failures ++;
10061015    return  block ;
10071016  }
10081017
1018+   /** 
1019+    * Clears the local dead nodes and, when it is non-null, the given ignored nodes. 
1020+    * @param ignoredNodes collection emptied in place via clear(); may be null, in which case it is skipped 
1021+    */ 
1022+   private  void  clearCachedNodeState (Collection <DatanodeInfo > ignoredNodes ) {
1023+     clearLocalDeadNodes (); //2nd option is to remove only nodes[blockId] 
1024+     if  (ignoredNodes  != null ) {
1025+       ignoredNodes .clear ();
1026+     }
1027+   }
1028+ 
10091029  /** 
10101030   * Get the best node from which to stream the data. 
10111031   * @param block LocatedBlock, containing nodes in priority order. 
@@ -1337,8 +1357,12 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
13371357        } catch  (InterruptedException  ie ) {
13381358          // Ignore and retry 
13391359        }
1340-         if  (refetch ) {
1341-           refetchLocations (block , ignored );
1360+         // If refetch is true, then all nodes are in deadNodes or ignoredNodes. 
1361+         // We should loop through all futures and remove them, so we do not 
1362+         // have concurrent requests to the same node. 
1363+         // Once all futures are cleared, we can clear the ignoredNodes and retry. 
1364+         if  (refetch  && futures .isEmpty ()) {
1365+           block  = refetchLocations (block , ignored );
13421366        }
13431367        // We got here if exception. Ignore this node on next go around IFF 
13441368        // we found a chosenNode to hedge read against. 
0 commit comments