diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 8061f7b5a6cbf..5041ef3330ac2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -471,6 +471,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} Slot slot = null; ShortCircuitCache cache = clientContext.getShortCircuitCache(); try { @@ -486,6 +490,10 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { curPeer = nextDomainPeer(); if (curPeer == null) break; peer = (DomainPeer)curPeer.peer; + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} } ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); clientContext.getPeerCache().put(datanode, peer); @@ -629,6 +637,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException { if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; + peer.setReadTimeout(conf.socketTimeout); BlockReader blockReader = null; try { blockReader = getRemoteBlockReader(peer); @@ -694,6 +703,7 @@ private BlockReader getRemoteBlockReaderFromTcp() throws IOException { curPeer = nextTcpPeer(); if (curPeer.fromCache) remainingCacheTries--; peer = curPeer.peer; + peer.setReadTimeout(conf.socketTimeout); blockReader = getRemoteBlockReader(peer); return blockReader; } catch (IOException ioe) { @@ -743,6 +753,10 @@ private BlockReaderPeer nextDomainPeer() { if (remainingCacheTries > 0) { Peer peer = clientContext.getPeerCache().get(datanode, true); if (peer != null) { + try { + peer.setReadTimeout(conf.socketTimeout); + } + catch(IOException e){} if (LOG.isTraceEnabled()) { LOG.trace("nextDomainPeer: reusing existing peer " + peer); } @@ -767,6 +781,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException { if (remainingCacheTries > 0) { Peer peer = clientContext.getPeerCache().get(datanode, false); if (peer != null) { + peer.setReadTimeout(conf.socketTimeout); if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: reusing existing peer " + peer); } @@ -776,6 +791,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException { try { Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, datanode); + peer.setReadTimeout(conf.socketTimeout); if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); }